1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_STREAM_HPP
10  
#ifndef BOOST_CAPY_TEST_STREAM_HPP
11  
#define BOOST_CAPY_TEST_STREAM_HPP
11  
#define BOOST_CAPY_TEST_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <coroutine>
17  
#include <coroutine>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/read.hpp>
21  
#include <boost/capy/read.hpp>
22  
#include <boost/capy/task.hpp>
22  
#include <boost/capy/task.hpp>
23  
#include <boost/capy/test/fuse.hpp>
23  
#include <boost/capy/test/fuse.hpp>
24  
#include <boost/capy/test/run_blocking.hpp>
24  
#include <boost/capy/test/run_blocking.hpp>
25  

25  

26  
#include <memory>
26  
#include <memory>
27  
#include <stop_token>
27  
#include <stop_token>
28  
#include <string>
28  
#include <string>
29  
#include <string_view>
29  
#include <string_view>
30  
#include <utility>
30  
#include <utility>
31  

31  

32  
namespace boost {
32  
namespace boost {
33  
namespace capy {
33  
namespace capy {
34  
namespace test {
34  
namespace test {
35  

35  

36  
/** A connected stream for testing bidirectional I/O.
36  
/** A connected stream for testing bidirectional I/O.
37  

37  

38  
    Streams are created in pairs via @ref make_stream_pair.
38  
    Streams are created in pairs via @ref make_stream_pair.
39  
    Data written to one end becomes available for reading on
39  
    Data written to one end becomes available for reading on
40  
    the other. If no data is available when @ref read_some
40  
    the other. If no data is available when @ref read_some
41  
    is called, the calling coroutine suspends until the peer
41  
    is called, the calling coroutine suspends until the peer
42  
    calls @ref write_some. The shared @ref fuse enables error
42  
    calls @ref write_some. The shared @ref fuse enables error
43  
    injection at controlled points in both directions.
43  
    injection at controlled points in both directions.
44  

44  

45  
    When the fuse injects an error or throws on one end, the
45  
    When the fuse injects an error or throws on one end, the
46  
    other end is automatically closed: any suspended reader is
46  
    other end is automatically closed: any suspended reader is
47  
    resumed with `error::eof`, and subsequent operations on
47  
    resumed with `error::eof`, and subsequent operations on
48  
    both ends return `error::eof`. Calling @ref close on one
48  
    both ends return `error::eof`. Calling @ref close on one
49  
    end signals eof to the peer's reads after draining any
49  
    end signals eof to the peer's reads after draining any
50  
    buffered data, while the peer may still write.
50  
    buffered data, while the peer may still write.
51  

51  

52  
    @par Thread Safety
52  
    @par Thread Safety
53  
    Single-threaded only. Both ends of the pair must be
53  
    Single-threaded only. Both ends of the pair must be
54  
    accessed from the same thread. Concurrent access is
54  
    accessed from the same thread. Concurrent access is
55  
    undefined behavior.
55  
    undefined behavior.
56  

56  

57  
    @par Example
57  
    @par Example
58  
    @code
58  
    @code
59  
    fuse f;
59  
    fuse f;
60  
    auto [a, b] = make_stream_pair( f );
60  
    auto [a, b] = make_stream_pair( f );
61  

61  

62  
    auto r = f.armed( [&]( fuse& ) -> task<> {
62  
    auto r = f.armed( [&]( fuse& ) -> task<> {
63  
        auto [ec, n] = co_await a.write_some(
63  
        auto [ec, n] = co_await a.write_some(
64  
            const_buffer( "hello", 5 ) );
64  
            const_buffer( "hello", 5 ) );
65  
        if( ec )
65  
        if( ec )
66  
            co_return;
66  
            co_return;
67  

67  

68  
        char buf[32];
68  
        char buf[32];
69  
        auto [ec2, n2] = co_await b.read_some(
69  
        auto [ec2, n2] = co_await b.read_some(
70  
            mutable_buffer( buf, sizeof( buf ) ) );
70  
            mutable_buffer( buf, sizeof( buf ) ) );
71  
        if( ec2 )
71  
        if( ec2 )
72  
            co_return;
72  
            co_return;
73  
        // buf contains "hello"
73  
        // buf contains "hello"
74  
    } );
74  
    } );
75  
    @endcode
75  
    @endcode
76  

76  

77  
    @see make_stream_pair, fuse
77  
    @see make_stream_pair, fuse
78  
*/
78  
*/
79  
class stream
79  
class stream
80  
{
80  
{
81  
    // Single-threaded only. No concurrent access to either
81  
    // Single-threaded only. No concurrent access to either
82  
    // end of the pair. Both streams and all operations must
82  
    // end of the pair. Both streams and all operations must
83  
    // run on the same thread.
83  
    // run on the same thread.
84  

84  

85  
    struct half
85  
    struct half
86  
    {
86  
    {
87  
        std::string buf;
87  
        std::string buf;
88  
        std::size_t max_read_size = std::size_t(-1);
88  
        std::size_t max_read_size = std::size_t(-1);
89  
        std::coroutine_handle<> pending_h{};
89  
        std::coroutine_handle<> pending_h{};
90  
        executor_ref pending_ex;
90  
        executor_ref pending_ex;
91  
        bool eof = false;
91  
        bool eof = false;
92  
    };
92  
    };
93  

93  

94  
    struct state
94  
    struct state
95  
    {
95  
    {
96  
        fuse f;
96  
        fuse f;
97  
        bool closed = false;
97  
        bool closed = false;
98  
        half sides[2];
98  
        half sides[2];
99  

99  

100  
        explicit state(fuse f_) noexcept
100  
        explicit state(fuse f_) noexcept
101  
            : f(std::move(f_))
101  
            : f(std::move(f_))
102  
        {
102  
        {
103  
        }
103  
        }
104  

104  

105  
        // Set closed and resume any suspended readers
105  
        // Set closed and resume any suspended readers
106  
        // with eof on both sides.
106  
        // with eof on both sides.
107  
        void close()
107  
        void close()
108  
        {
108  
        {
109  
            closed = true;
109  
            closed = true;
110  
            for(auto& side : sides)
110  
            for(auto& side : sides)
111  
            {
111  
            {
112  
                if(side.pending_h)
112  
                if(side.pending_h)
113  
                {
113  
                {
114  
                    auto h = side.pending_h;
114  
                    auto h = side.pending_h;
115  
                    side.pending_h = {};
115  
                    side.pending_h = {};
116  
                    auto ex = side.pending_ex;
116  
                    auto ex = side.pending_ex;
117  
                    side.pending_ex = {};
117  
                    side.pending_ex = {};
118  
                    ex.post(h);
118  
                    ex.post(h);
119  
                }
119  
                }
120  
            }
120  
            }
121  
        }
121  
        }
122  
    };
122  
    };
123  

123  

124  
    // Wraps the maybe_fail() call. If the guard is
124  
    // Wraps the maybe_fail() call. If the guard is
125  
    // not disarmed before destruction (fuse returned
125  
    // not disarmed before destruction (fuse returned
126  
    // an error, or threw an exception), closes both
126  
    // an error, or threw an exception), closes both
127  
    // ends so any suspended peer gets eof.
127  
    // ends so any suspended peer gets eof.
128  
    struct close_guard
128  
    struct close_guard
129  
    {
129  
    {
130  
        state* st;
130  
        state* st;
131  
        bool armed = true;
131  
        bool armed = true;
132  
        void disarm() noexcept { armed = false; }
132  
        void disarm() noexcept { armed = false; }
133  
        ~close_guard() noexcept(false) { if(armed) st->close(); }
133  
        ~close_guard() noexcept(false) { if(armed) st->close(); }
134  
    };
134  
    };
135  

135  

136  
    std::shared_ptr<state> state_;
136  
    std::shared_ptr<state> state_;
137  
    int index_;
137  
    int index_;
138  

138  

139  
    stream(
139  
    stream(
140  
        std::shared_ptr<state> sp,
140  
        std::shared_ptr<state> sp,
141  
        int index) noexcept
141  
        int index) noexcept
142  
        : state_(std::move(sp))
142  
        : state_(std::move(sp))
143  
        , index_(index)
143  
        , index_(index)
144  
    {
144  
    {
145  
    }
145  
    }
146  

146  

147  
    friend std::pair<stream, stream>
147  
    friend std::pair<stream, stream>
148  
    make_stream_pair(fuse);
148  
    make_stream_pair(fuse);
149  

149  

150  
public:
150  
public:
151  
    stream(stream const&) = delete;
151  
    stream(stream const&) = delete;
152  
    stream& operator=(stream const&) = delete;
152  
    stream& operator=(stream const&) = delete;
153  
    stream(stream&&) = default;
153  
    stream(stream&&) = default;
154  
    stream& operator=(stream&&) = default;
154  
    stream& operator=(stream&&) = default;
155  

155  

156  
    /** Signal end-of-stream to the peer.
156  
    /** Signal end-of-stream to the peer.
157  

157  

158  
        Marks the peer's read direction as closed.
158  
        Marks the peer's read direction as closed.
159  
        If the peer is suspended in @ref read_some,
159  
        If the peer is suspended in @ref read_some,
160  
        it is resumed. The peer drains any buffered
160  
        it is resumed. The peer drains any buffered
161  
        data before receiving `error::eof`. Writes
161  
        data before receiving `error::eof`. Writes
162  
        from the peer are unaffected.
162  
        from the peer are unaffected.
163  
    */
163  
    */
164  
    void
164  
    void
165  
    close()
165  
    close()
166  
    {
166  
    {
167  
        int peer = 1 - index_;
167  
        int peer = 1 - index_;
168  
        auto& side = state_->sides[peer];
168  
        auto& side = state_->sides[peer];
169  
        side.eof = true;
169  
        side.eof = true;
170  
        if(side.pending_h)
170  
        if(side.pending_h)
171  
        {
171  
        {
172  
            auto h = side.pending_h;
172  
            auto h = side.pending_h;
173  
            side.pending_h = {};
173  
            side.pending_h = {};
174  
            auto ex = side.pending_ex;
174  
            auto ex = side.pending_ex;
175  
            side.pending_ex = {};
175  
            side.pending_ex = {};
176  
            ex.post(h);
176  
            ex.post(h);
177  
        }
177  
        }
178  
    }
178  
    }
179  

179  

180  
    /** Set the maximum bytes returned per read.
180  
    /** Set the maximum bytes returned per read.
181  

181  

182  
        Limits how many bytes @ref read_some returns in
182  
        Limits how many bytes @ref read_some returns in
183  
        a single call, simulating chunked network delivery.
183  
        a single call, simulating chunked network delivery.
184  
        The default is unlimited.
184  
        The default is unlimited.
185  

185  

186  
        @param n Maximum bytes per read.
186  
        @param n Maximum bytes per read.
187  
    */
187  
    */
188  
    void
188  
    void
189  
    set_max_read_size(std::size_t n) noexcept
189  
    set_max_read_size(std::size_t n) noexcept
190  
    {
190  
    {
191  
        state_->sides[index_].max_read_size = n;
191  
        state_->sides[index_].max_read_size = n;
192  
    }
192  
    }
193  

193  

194  
    /** Asynchronously read data from the stream.
194  
    /** Asynchronously read data from the stream.
195  

195  

196  
        Transfers up to `buffer_size(buffers)` bytes from
196  
        Transfers up to `buffer_size(buffers)` bytes from
197  
        data written by the peer. If no data is available,
197  
        data written by the peer. If no data is available,
198  
        the calling coroutine suspends until the peer calls
198  
        the calling coroutine suspends until the peer calls
199  
        @ref write_some. Before every read, the attached
199  
        @ref write_some. Before every read, the attached
200  
        @ref fuse is consulted to possibly inject an error.
200  
        @ref fuse is consulted to possibly inject an error.
201  
        If the fuse fires, the peer is automatically closed.
201  
        If the fuse fires, the peer is automatically closed.
202  
        If the stream is closed, returns `error::eof`.
202  
        If the stream is closed, returns `error::eof`.
203  
        The returned `std::size_t` is the number of bytes
203  
        The returned `std::size_t` is the number of bytes
204  
        transferred.
204  
        transferred.
205  

205  

206  
        @param buffers The mutable buffer sequence to receive data.
206  
        @param buffers The mutable buffer sequence to receive data.
207  

207  

208  
        @return An awaitable yielding `(error_code,std::size_t)`.
208  
        @return An awaitable yielding `(error_code,std::size_t)`.
209  

209  

210  
        @see fuse, close
210  
        @see fuse, close
211  
    */
211  
    */
212  
    template<MutableBufferSequence MB>
212  
    template<MutableBufferSequence MB>
213  
    auto
213  
    auto
214  
    read_some(MB buffers)
214  
    read_some(MB buffers)
215  
    {
215  
    {
216  
        struct awaitable
216  
        struct awaitable
217  
        {
217  
        {
218  
            stream* self_;
218  
            stream* self_;
219  
            MB buffers_;
219  
            MB buffers_;
220  

220  

221  
            bool await_ready() const noexcept
221  
            bool await_ready() const noexcept
222  
            {
222  
            {
223  
                if(buffer_empty(buffers_))
223  
                if(buffer_empty(buffers_))
224  
                    return true;
224  
                    return true;
225  
                auto* st = self_->state_.get();
225  
                auto* st = self_->state_.get();
226  
                auto& side = st->sides[self_->index_];
226  
                auto& side = st->sides[self_->index_];
227  
                return st->closed || side.eof ||
227  
                return st->closed || side.eof ||
228  
                    !side.buf.empty();
228  
                    !side.buf.empty();
229  
            }
229  
            }
230  

230  

231  
            std::coroutine_handle<> await_suspend(
231  
            std::coroutine_handle<> await_suspend(
232  
                std::coroutine_handle<> h,
232  
                std::coroutine_handle<> h,
233  
                io_env const* env) noexcept
233  
                io_env const* env) noexcept
234  
            {
234  
            {
235  
                auto& side = self_->state_->sides[
235  
                auto& side = self_->state_->sides[
236  
                    self_->index_];
236  
                    self_->index_];
237  
                side.pending_h = h;
237  
                side.pending_h = h;
238  
                side.pending_ex = env->executor;
238  
                side.pending_ex = env->executor;
239  
                return std::noop_coroutine();
239  
                return std::noop_coroutine();
240  
            }
240  
            }
241  

241  

242  
            io_result<std::size_t>
242  
            io_result<std::size_t>
243  
            await_resume()
243  
            await_resume()
244  
            {
244  
            {
245  
                if(buffer_empty(buffers_))
245  
                if(buffer_empty(buffers_))
246  
                    return {{}, 0};
246  
                    return {{}, 0};
247  

247  

248  
                auto* st = self_->state_.get();
248  
                auto* st = self_->state_.get();
249  
                auto& side = st->sides[
249  
                auto& side = st->sides[
250  
                    self_->index_];
250  
                    self_->index_];
251  

251  

252  
                if(st->closed)
252  
                if(st->closed)
253  
                    return {error::eof, 0};
253  
                    return {error::eof, 0};
254  

254  

255  
                if(side.eof && side.buf.empty())
255  
                if(side.eof && side.buf.empty())
256  
                    return {error::eof, 0};
256  
                    return {error::eof, 0};
257  

257  

258  
                if(!side.eof)
258  
                if(!side.eof)
259  
                {
259  
                {
260  
                    close_guard g{st};
260  
                    close_guard g{st};
261  
                    auto ec = st->f.maybe_fail();
261  
                    auto ec = st->f.maybe_fail();
262  
                    if(ec)
262  
                    if(ec)
263  
                        return {ec, 0};
263  
                        return {ec, 0};
264  
                    g.disarm();
264  
                    g.disarm();
265  
                }
265  
                }
266  

266  

267  
                std::size_t const n = buffer_copy(
267  
                std::size_t const n = buffer_copy(
268  
                    buffers_, make_buffer(side.buf),
268  
                    buffers_, make_buffer(side.buf),
269  
                    side.max_read_size);
269  
                    side.max_read_size);
270  
                side.buf.erase(0, n);
270  
                side.buf.erase(0, n);
271  
                return {{}, n};
271  
                return {{}, n};
272  
            }
272  
            }
273  
        };
273  
        };
274  
        return awaitable{this, buffers};
274  
        return awaitable{this, buffers};
275  
    }
275  
    }
276  

276  

277  
    /** Asynchronously write data to the stream.
277  
    /** Asynchronously write data to the stream.
278  

278  

279  
        Transfers up to `buffer_size(buffers)` bytes to the
279  
        Transfers up to `buffer_size(buffers)` bytes to the
280  
        peer's incoming buffer. If the peer is suspended in
280  
        peer's incoming buffer. If the peer is suspended in
281  
        @ref read_some, it is resumed. Before every write,
281  
        @ref read_some, it is resumed. Before every write,
282  
        the attached @ref fuse is consulted to possibly inject
282  
        the attached @ref fuse is consulted to possibly inject
283  
        an error. If the fuse fires, the peer is automatically
283  
        an error. If the fuse fires, the peer is automatically
284  
        closed. If the stream is closed, returns `error::eof`.
284  
        closed. If the stream is closed, returns `error::eof`.
285  
        The returned `std::size_t` is the number of bytes
285  
        The returned `std::size_t` is the number of bytes
286  
        transferred.
286  
        transferred.
287  

287  

288  
        @param buffers The const buffer sequence containing
288  
        @param buffers The const buffer sequence containing
289  
            data to write.
289  
            data to write.
290  

290  

291  
        @return An awaitable yielding `(error_code,std::size_t)`.
291  
        @return An awaitable yielding `(error_code,std::size_t)`.
292  

292  

293  
        @see fuse, close
293  
        @see fuse, close
294  
    */
294  
    */
295  
    template<ConstBufferSequence CB>
295  
    template<ConstBufferSequence CB>
296  
    auto
296  
    auto
297  
    write_some(CB buffers)
297  
    write_some(CB buffers)
298  
    {
298  
    {
299  
        struct awaitable
299  
        struct awaitable
300  
        {
300  
        {
301  
            stream* self_;
301  
            stream* self_;
302  
            CB buffers_;
302  
            CB buffers_;
303  

303  

304  
            bool await_ready() const noexcept { return true; }
304  
            bool await_ready() const noexcept { return true; }
305  

305  

306  
            void await_suspend(
306  
            void await_suspend(
307  
                std::coroutine_handle<>,
307  
                std::coroutine_handle<>,
308  
                io_env const*) const noexcept
308  
                io_env const*) const noexcept
309  
            {
309  
            {
310  
            }
310  
            }
311  

311  

312  
            io_result<std::size_t>
312  
            io_result<std::size_t>
313  
            await_resume()
313  
            await_resume()
314  
            {
314  
            {
315  
                std::size_t n = buffer_size(buffers_);
315  
                std::size_t n = buffer_size(buffers_);
316  
                if(n == 0)
316  
                if(n == 0)
317  
                    return {{}, 0};
317  
                    return {{}, 0};
318  

318  

319  
                auto* st = self_->state_.get();
319  
                auto* st = self_->state_.get();
320  

320  

321  
                if(st->closed)
321  
                if(st->closed)
322  
                    return {error::eof, 0};
322  
                    return {error::eof, 0};
323  

323  

324  
                close_guard g{st};
324  
                close_guard g{st};
325  
                auto ec = st->f.maybe_fail();
325  
                auto ec = st->f.maybe_fail();
326  
                if(ec)
326  
                if(ec)
327  
                    return {ec, 0};
327  
                    return {ec, 0};
328  
                g.disarm();
328  
                g.disarm();
329  

329  

330  
                int peer = 1 - self_->index_;
330  
                int peer = 1 - self_->index_;
331  
                auto& side = st->sides[peer];
331  
                auto& side = st->sides[peer];
332  

332  

333  
                std::size_t const old_size = side.buf.size();
333  
                std::size_t const old_size = side.buf.size();
334  
                side.buf.resize(old_size + n);
334  
                side.buf.resize(old_size + n);
335  
                buffer_copy(make_buffer(
335  
                buffer_copy(make_buffer(
336  
                    side.buf.data() + old_size, n),
336  
                    side.buf.data() + old_size, n),
337  
                    buffers_, n);
337  
                    buffers_, n);
338  

338  

339  
                if(side.pending_h)
339  
                if(side.pending_h)
340  
                {
340  
                {
341  
                    auto h = side.pending_h;
341  
                    auto h = side.pending_h;
342  
                    side.pending_h = {};
342  
                    side.pending_h = {};
343  
                    auto ex = side.pending_ex;
343  
                    auto ex = side.pending_ex;
344  
                    side.pending_ex = {};
344  
                    side.pending_ex = {};
345  
                    ex.post(h);
345  
                    ex.post(h);
346  
                }
346  
                }
347  

347  

348  
                return {{}, n};
348  
                return {{}, n};
349  
            }
349  
            }
350  
        };
350  
        };
351  
        return awaitable{this, buffers};
351  
        return awaitable{this, buffers};
352  
    }
352  
    }
353  

353  

354  
    /** Inject data into this stream's peer for reading.
354  
    /** Inject data into this stream's peer for reading.
355  

355  

356  
        Appends data directly to the peer's incoming buffer,
356  
        Appends data directly to the peer's incoming buffer,
357  
        bypassing the fuse. If the peer is suspended in
357  
        bypassing the fuse. If the peer is suspended in
358  
        @ref read_some, it is resumed. This is test setup,
358  
        @ref read_some, it is resumed. This is test setup,
359  
        not an operation under test.
359  
        not an operation under test.
360  

360  

361  
        @param sv The data to inject.
361  
        @param sv The data to inject.
362  

362  

363  
        @see make_stream_pair
363  
        @see make_stream_pair
364  
    */
364  
    */
365  
    void
365  
    void
366  
    provide(std::string_view sv)
366  
    provide(std::string_view sv)
367  
    {
367  
    {
368  
        int peer = 1 - index_;
368  
        int peer = 1 - index_;
369  
        auto& side = state_->sides[peer];
369  
        auto& side = state_->sides[peer];
370  
        side.buf.append(sv);
370  
        side.buf.append(sv);
371  
        if(side.pending_h)
371  
        if(side.pending_h)
372  
        {
372  
        {
373  
            auto h = side.pending_h;
373  
            auto h = side.pending_h;
374  
            side.pending_h = {};
374  
            side.pending_h = {};
375  
            auto ex = side.pending_ex;
375  
            auto ex = side.pending_ex;
376  
            side.pending_ex = {};
376  
            side.pending_ex = {};
377  
            ex.post(h);
377  
            ex.post(h);
378  
        }
378  
        }
379  
    }
379  
    }
380  

380  

381  
    /** Read from this stream and verify the content.
381  
    /** Read from this stream and verify the content.
382  

382  

383  
        Reads exactly `expected.size()` bytes from the stream
383  
        Reads exactly `expected.size()` bytes from the stream
384  
        and compares against the expected string. The read goes
384  
        and compares against the expected string. The read goes
385  
        through the normal path including the fuse.
385  
        through the normal path including the fuse.
386  

386  

387  
        @param expected The expected content.
387  
        @param expected The expected content.
388  

388  

389  
        @return A pair of `(error_code, bool)`. The error_code
389  
        @return A pair of `(error_code, bool)`. The error_code
390  
            is set if a read error occurs (e.g. fuse injection).
390  
            is set if a read error occurs (e.g. fuse injection).
391  
            The bool is true if the data matches.
391  
            The bool is true if the data matches.
392  

392  

393  
        @see provide
393  
        @see provide
394  
    */
394  
    */
395  
    std::pair<std::error_code, bool>
395  
    std::pair<std::error_code, bool>
396  
    expect(std::string_view expected)
396  
    expect(std::string_view expected)
397  
    {
397  
    {
398  
        std::error_code result;
398  
        std::error_code result;
399  
        bool match = false;
399  
        bool match = false;
400  
        run_blocking()([](
400  
        run_blocking()([](
401  
            stream& self,
401  
            stream& self,
402  
            std::string_view expected,
402  
            std::string_view expected,
403  
            std::error_code& result,
403  
            std::error_code& result,
404  
            bool& match) -> task<>
404  
            bool& match) -> task<>
405  
        {
405  
        {
406  
            std::string buf(expected.size(), '\0');
406  
            std::string buf(expected.size(), '\0');
407  
            auto [ec, n] = co_await read(
407  
            auto [ec, n] = co_await read(
408  
                self, mutable_buffer(
408  
                self, mutable_buffer(
409  
                    buf.data(), buf.size()));
409  
                    buf.data(), buf.size()));
410  
            if(ec)
410  
            if(ec)
411  
            {
411  
            {
412  
                result = ec;
412  
                result = ec;
413  
                co_return;
413  
                co_return;
414  
            }
414  
            }
415  
            match = (std::string_view(
415  
            match = (std::string_view(
416  
                buf.data(), n) == expected);
416  
                buf.data(), n) == expected);
417  
        }(*this, expected, result, match));
417  
        }(*this, expected, result, match));
418  
        return {result, match};
418  
        return {result, match};
419  
    }
419  
    }
420  

420  

421  
    /** Return the stream's pending read data.
421  
    /** Return the stream's pending read data.
422  

422  

423  
        Returns a view of the data waiting to be read
423  
        Returns a view of the data waiting to be read
424  
        from this stream. This is a direct peek at the
424  
        from this stream. This is a direct peek at the
425  
        internal buffer, bypassing the fuse.
425  
        internal buffer, bypassing the fuse.
426  

426  

427  
        @return A view of the pending data.
427  
        @return A view of the pending data.
428  

428  

429  
        @see provide, expect
429  
        @see provide, expect
430  
    */
430  
    */
431  
    std::string_view
431  
    std::string_view
432  
    data() const noexcept
432  
    data() const noexcept
433  
    {
433  
    {
434  
        return state_->sides[index_].buf;
434  
        return state_->sides[index_].buf;
435  
    }
435  
    }
436  
};
436  
};
437  

437  

438  
/** Create a connected pair of test streams.
438  
/** Create a connected pair of test streams.
439  

439  

440  
    Data written to one stream becomes readable on the other.
440  
    Data written to one stream becomes readable on the other.
441  
    If a coroutine calls @ref stream::read_some when no data
441  
    If a coroutine calls @ref stream::read_some when no data
442  
    is available, it suspends until the peer writes. Before
442  
    is available, it suspends until the peer writes. Before
443  
    every read or write, the @ref fuse is consulted to
443  
    every read or write, the @ref fuse is consulted to
444  
    possibly inject an error for testing fault scenarios.
444  
    possibly inject an error for testing fault scenarios.
445  
    When the fuse fires, the peer is automatically closed.
445  
    When the fuse fires, the peer is automatically closed.
446  

446  

447  
    @param f The fuse used to inject errors during operations.
447  
    @param f The fuse used to inject errors during operations.
448  

448  

449  
    @return A pair of connected streams.
449  
    @return A pair of connected streams.
450  

450  

451  
    @see stream, fuse
451  
    @see stream, fuse
452  
*/
452  
*/
453  
inline std::pair<stream, stream>
453  
inline std::pair<stream, stream>
454  
make_stream_pair(fuse f = {})
454  
make_stream_pair(fuse f = {})
455  
{
455  
{
456  
    auto sp = std::make_shared<stream::state>(std::move(f));
456  
    auto sp = std::make_shared<stream::state>(std::move(f));
457  
    return {stream(sp, 0), stream(sp, 1)};
457  
    return {stream(sp, 0), stream(sp, 1)};
458  
}
458  
}
459  

459  

460  
} // test
460  
} // test
461  
} // capy
461  
} // capy
462  
} // boost
462  
} // boost
463  

463  

464  
#endif
464  
#endif