Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_TEST_STREAM_HPP
11 : #define BOOST_CAPY_TEST_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_copy.hpp>
16 : #include <boost/capy/buffers/make_buffer.hpp>
17 : #include <coroutine>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <boost/capy/io_result.hpp>
20 : #include <boost/capy/error.hpp>
21 : #include <boost/capy/read.hpp>
22 : #include <boost/capy/task.hpp>
23 : #include <boost/capy/test/fuse.hpp>
24 : #include <boost/capy/test/run_blocking.hpp>
25 :
26 : #include <memory>
27 : #include <stop_token>
28 : #include <string>
29 : #include <string_view>
30 : #include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 : namespace test {
35 :
36 : /** A connected stream for testing bidirectional I/O.
37 :
38 : Streams are created in pairs via @ref make_stream_pair.
39 : Data written to one end becomes available for reading on
40 : the other. If no data is available when @ref read_some
41 : is called, the calling coroutine suspends until the peer
42 : calls @ref write_some. The shared @ref fuse enables error
43 : injection at controlled points in both directions.
44 :
45 : When the fuse injects an error or throws on one end, the
46 : other end is automatically closed: any suspended reader is
47 : resumed with `error::eof`, and subsequent operations on
48 : both ends return `error::eof`. Calling @ref close on one
49 : end signals eof to the peer's reads after draining any
50 : buffered data, while the peer may still write.
51 :
52 : @par Thread Safety
53 : Single-threaded only. Both ends of the pair must be
54 : accessed from the same thread. Concurrent access is
55 : undefined behavior.
56 :
57 : @par Example
58 : @code
59 : fuse f;
60 : auto [a, b] = make_stream_pair( f );
61 :
62 : auto r = f.armed( [&]( fuse& ) -> task<> {
63 : auto [ec, n] = co_await a.write_some(
64 : const_buffer( "hello", 5 ) );
65 : if( ec )
66 : co_return;
67 :
68 : char buf[32];
69 : auto [ec2, n2] = co_await b.read_some(
70 : mutable_buffer( buf, sizeof( buf ) ) );
71 : if( ec2 )
72 : co_return;
73 : // buf contains "hello"
74 : } );
75 : @endcode
76 :
77 : @see make_stream_pair, fuse
78 : */
79 : class stream
80 : {
81 : // Single-threaded only. No concurrent access to either
82 : // end of the pair. Both streams and all operations must
83 : // run on the same thread.
84 :
85 : struct half
86 : {
87 : std::string buf;
88 : std::size_t max_read_size = std::size_t(-1);
89 : std::coroutine_handle<> pending_h{};
90 : executor_ref pending_ex;
91 : bool eof = false;
92 : };
93 :
94 : struct state
95 : {
96 : fuse f;
97 : bool closed = false;
98 : half sides[2];
99 :
100 279 : explicit state(fuse f_) noexcept
101 837 : : f(std::move(f_))
102 : {
103 279 : }
104 :
105 : // Set closed and resume any suspended readers
106 : // with eof on both sides.
107 208 : void close()
108 : {
109 208 : closed = true;
110 624 : for(auto& side : sides)
111 : {
112 416 : if(side.pending_h)
113 : {
114 12 : auto h = side.pending_h;
115 12 : side.pending_h = {};
116 12 : auto ex = side.pending_ex;
117 12 : side.pending_ex = {};
118 12 : ex.post(h);
119 : }
120 : }
121 208 : }
122 : };
123 :
124 : // Wraps the maybe_fail() call. If the guard is
125 : // not disarmed before destruction (fuse returned
126 : // an error, or threw an exception), closes both
127 : // ends so any suspended peer gets eof.
128 : struct close_guard
129 : {
130 : state* st;
131 : bool armed = true;
132 298 : void disarm() noexcept { armed = false; }
133 506 : ~close_guard() noexcept(false) { if(armed) st->close(); }
134 : };
135 :
136 : std::shared_ptr<state> state_;
137 : int index_;
138 :
139 558 : stream(
140 : std::shared_ptr<state> sp,
141 : int index) noexcept
142 558 : : state_(std::move(sp))
143 558 : , index_(index)
144 : {
145 558 : }
146 :
147 : friend std::pair<stream, stream>
148 : make_stream_pair(fuse);
149 :
150 : public:
151 : stream(stream const&) = delete;
152 : stream& operator=(stream const&) = delete;
153 658 : stream(stream&&) = default;
154 : stream& operator=(stream&&) = default;
155 :
156 : /** Signal end-of-stream to the peer.
157 :
158 : Marks the peer's read direction as closed.
159 : If the peer is suspended in @ref read_some,
160 : it is resumed. The peer drains any buffered
161 : data before receiving `error::eof`. Writes
162 : from the peer are unaffected.
163 : */
164 : void
165 3 : close()
166 : {
167 3 : int peer = 1 - index_;
168 3 : auto& side = state_->sides[peer];
169 3 : side.eof = true;
170 3 : if(side.pending_h)
171 : {
172 1 : auto h = side.pending_h;
173 1 : side.pending_h = {};
174 1 : auto ex = side.pending_ex;
175 1 : side.pending_ex = {};
176 1 : ex.post(h);
177 : }
178 3 : }
179 :
180 : /** Set the maximum bytes returned per read.
181 :
182 : Limits how many bytes @ref read_some returns in
183 : a single call, simulating chunked network delivery.
184 : The default is unlimited.
185 :
186 : @param n Maximum bytes per read.
187 : */
188 : void
189 54 : set_max_read_size(std::size_t n) noexcept
190 : {
191 54 : state_->sides[index_].max_read_size = n;
192 54 : }
193 :
194 : /** Asynchronously read data from the stream.
195 :
196 : Transfers up to `buffer_size(buffers)` bytes from
197 : data written by the peer. If no data is available,
198 : the calling coroutine suspends until the peer calls
199 : @ref write_some. Before every read, the attached
200 : @ref fuse is consulted to possibly inject an error.
201 : If the fuse fires, the peer is automatically closed.
202 : If the stream is closed, returns `error::eof`.
203 : The returned `std::size_t` is the number of bytes
204 : transferred.
205 :
206 : @param buffers The mutable buffer sequence to receive data.
207 :
208 : @return An awaitable yielding `(error_code,std::size_t)`.
209 :
210 : @see fuse, close
211 : */
212 : template<MutableBufferSequence MB>
213 : auto
214 274 : read_some(MB buffers)
215 : {
216 : struct awaitable
217 : {
218 : stream* self_;
219 : MB buffers_;
220 :
221 274 : bool await_ready() const noexcept
222 : {
223 274 : if(buffer_empty(buffers_))
224 8 : return true;
225 266 : auto* st = self_->state_.get();
226 266 : auto& side = st->sides[self_->index_];
227 530 : return st->closed || side.eof ||
228 530 : !side.buf.empty();
229 : }
230 :
231 25 : std::coroutine_handle<> await_suspend(
232 : std::coroutine_handle<> h,
233 : io_env const* env) noexcept
234 : {
235 25 : auto& side = self_->state_->sides[
236 25 : self_->index_];
237 25 : side.pending_h = h;
238 25 : side.pending_ex = env->executor;
239 25 : return std::noop_coroutine();
240 : }
241 :
242 : io_result<std::size_t>
243 274 : await_resume()
244 : {
245 274 : if(buffer_empty(buffers_))
246 8 : return {{}, 0};
247 :
248 266 : auto* st = self_->state_.get();
249 266 : auto& side = st->sides[
250 266 : self_->index_];
251 :
252 266 : if(st->closed)
253 12 : return {error::eof, 0};
254 :
255 254 : if(side.eof && side.buf.empty())
256 3 : return {error::eof, 0};
257 :
258 251 : if(!side.eof)
259 : {
260 251 : close_guard g{st};
261 251 : auto ec = st->f.maybe_fail();
262 198 : if(ec)
263 53 : return {ec, 0};
264 145 : g.disarm();
265 251 : }
266 :
267 290 : std::size_t const n = buffer_copy(
268 145 : buffers_, make_buffer(side.buf),
269 : side.max_read_size);
270 145 : side.buf.erase(0, n);
271 145 : return {{}, n};
272 : }
273 : };
274 274 : return awaitable{this, buffers};
275 : }
276 :
277 : /** Asynchronously write data to the stream.
278 :
279 : Transfers up to `buffer_size(buffers)` bytes to the
280 : peer's incoming buffer. If the peer is suspended in
281 : @ref read_some, it is resumed. Before every write,
282 : the attached @ref fuse is consulted to possibly inject
283 : an error. If the fuse fires, the peer is automatically
284 : closed. If the stream is closed, returns `error::eof`.
285 : The returned `std::size_t` is the number of bytes
286 : transferred.
287 :
288 : @param buffers The const buffer sequence containing
289 : data to write.
290 :
291 : @return An awaitable yielding `(error_code,std::size_t)`.
292 :
293 : @see fuse, close
294 : */
295 : template<ConstBufferSequence CB>
296 : auto
297 259 : write_some(CB buffers)
298 : {
299 : struct awaitable
300 : {
301 : stream* self_;
302 : CB buffers_;
303 :
304 259 : bool await_ready() const noexcept { return true; }
305 :
306 0 : void await_suspend(
307 : std::coroutine_handle<>,
308 : io_env const*) const noexcept
309 : {
310 0 : }
311 :
312 : io_result<std::size_t>
313 259 : await_resume()
314 : {
315 259 : std::size_t n = buffer_size(buffers_);
316 259 : if(n == 0)
317 4 : return {{}, 0};
318 :
319 255 : auto* st = self_->state_.get();
320 :
321 255 : if(st->closed)
322 0 : return {error::eof, 0};
323 :
324 255 : close_guard g{st};
325 255 : auto ec = st->f.maybe_fail();
326 204 : if(ec)
327 51 : return {ec, 0};
328 153 : g.disarm();
329 :
330 153 : int peer = 1 - self_->index_;
331 153 : auto& side = st->sides[peer];
332 :
333 153 : std::size_t const old_size = side.buf.size();
334 153 : side.buf.resize(old_size + n);
335 153 : buffer_copy(make_buffer(
336 153 : side.buf.data() + old_size, n),
337 153 : buffers_, n);
338 :
339 153 : if(side.pending_h)
340 : {
341 12 : auto h = side.pending_h;
342 12 : side.pending_h = {};
343 12 : auto ex = side.pending_ex;
344 12 : side.pending_ex = {};
345 12 : ex.post(h);
346 : }
347 :
348 153 : return {{}, n};
349 255 : }
350 : };
351 259 : return awaitable{this, buffers};
352 : }
353 :
354 : /** Inject data into this stream's peer for reading.
355 :
356 : Appends data directly to the peer's incoming buffer,
357 : bypassing the fuse. If the peer is suspended in
358 : @ref read_some, it is resumed. This is test setup,
359 : not an operation under test.
360 :
361 : @param sv The data to inject.
362 :
363 : @see make_stream_pair
364 : */
365 : void
366 86 : provide(std::string_view sv)
367 : {
368 86 : int peer = 1 - index_;
369 86 : auto& side = state_->sides[peer];
370 86 : side.buf.append(sv);
371 86 : if(side.pending_h)
372 : {
373 0 : auto h = side.pending_h;
374 0 : side.pending_h = {};
375 0 : auto ex = side.pending_ex;
376 0 : side.pending_ex = {};
377 0 : ex.post(h);
378 : }
379 86 : }
380 :
381 : /** Read from this stream and verify the content.
382 :
383 : Reads exactly `expected.size()` bytes from the stream
384 : and compares against the expected string. The read goes
385 : through the normal path including the fuse.
386 :
387 : @param expected The expected content.
388 :
389 : @return A pair of `(error_code, bool)`. The error_code
390 : is set if a read error occurs (e.g. fuse injection).
391 : The bool is true if the data matches.
392 :
393 : @see provide
394 : */
395 : std::pair<std::error_code, bool>
396 38 : expect(std::string_view expected)
397 : {
398 38 : std::error_code result;
399 38 : bool match = false;
400 141 : run_blocking()([](
401 : stream& self,
402 : std::string_view expected,
403 : std::error_code& result,
404 : bool& match) -> task<>
405 : {
406 : std::string buf(expected.size(), '\0');
407 : auto [ec, n] = co_await read(
408 : self, mutable_buffer(
409 : buf.data(), buf.size()));
410 : if(ec)
411 : {
412 : result = ec;
413 : co_return;
414 : }
415 : match = (std::string_view(
416 : buf.data(), n) == expected);
417 161 : }(*this, expected, result, match));
418 58 : return {result, match};
419 : }
420 :
421 : /** Return the stream's pending read data.
422 :
423 : Returns a view of the data waiting to be read
424 : from this stream. This is a direct peek at the
425 : internal buffer, bypassing the fuse.
426 :
427 : @return A view of the pending data.
428 :
429 : @see provide, expect
430 : */
431 : std::string_view
432 : data() const noexcept
433 : {
434 : return state_->sides[index_].buf;
435 : }
436 : };
437 :
438 : /** Create a connected pair of test streams.
439 :
440 : Data written to one stream becomes readable on the other.
441 : If a coroutine calls @ref stream::read_some when no data
442 : is available, it suspends until the peer writes. Before
443 : every read or write, the @ref fuse is consulted to
444 : possibly inject an error for testing fault scenarios.
445 : When the fuse fires, the peer is automatically closed.
446 :
447 : @param f The fuse used to inject errors during operations.
448 :
449 : @return A pair of connected streams.
450 :
451 : @see stream, fuse
452 : */
453 : inline std::pair<stream, stream>
454 279 : make_stream_pair(fuse f = {})
455 : {
456 279 : auto sp = std::make_shared<stream::state>(std::move(f));
457 558 : return {stream(sp, 0), stream(sp, 1)};
458 279 : }
459 :
460 : } // test
461 : } // capy
462 : } // boost
463 :
464 : #endif
|