LCOV - code coverage report
Current view: top level - capy/test - stream.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 93.5 % 123 115
Test Date: 2026-02-14 08:39:14 Functions: 86.1 % 36 31

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_TEST_STREAM_HPP
      11              : #define BOOST_CAPY_TEST_STREAM_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/buffers.hpp>
      15              : #include <boost/capy/buffers/buffer_copy.hpp>
      16              : #include <boost/capy/buffers/make_buffer.hpp>
      17              : #include <coroutine>
      18              : #include <boost/capy/ex/io_env.hpp>
      19              : #include <boost/capy/io_result.hpp>
      20              : #include <boost/capy/error.hpp>
      21              : #include <boost/capy/read.hpp>
      22              : #include <boost/capy/task.hpp>
      23              : #include <boost/capy/test/fuse.hpp>
      24              : #include <boost/capy/test/run_blocking.hpp>
      25              : 
      26              : #include <memory>
      27              : #include <stop_token>
      28              : #include <string>
      29              : #include <string_view>
      30              : #include <utility>
      31              : 
      32              : namespace boost {
      33              : namespace capy {
      34              : namespace test {
      35              : 
      36              : /** A connected stream for testing bidirectional I/O.
      37              : 
      38              :     Streams are created in pairs via @ref make_stream_pair.
      39              :     Data written to one end becomes available for reading on
      40              :     the other. If no data is available when @ref read_some
      41              :     is called, the calling coroutine suspends until the peer
      42              :     calls @ref write_some. The shared @ref fuse enables error
      43              :     injection at controlled points in both directions.
      44              : 
      45              :     When the fuse injects an error or throws on one end, the
      46              :     other end is automatically closed: any suspended reader is
      47              :     resumed with `error::eof`, and subsequent operations on
      48              :     both ends return `error::eof`. Calling @ref close on one
      49              :     end signals eof to the peer's reads after draining any
      50              :     buffered data, while the peer may still write.
      51              : 
      52              :     @par Thread Safety
      53              :     Single-threaded only. Both ends of the pair must be
      54              :     accessed from the same thread. Concurrent access is
      55              :     undefined behavior.
      56              : 
      57              :     @par Example
      58              :     @code
      59              :     fuse f;
      60              :     auto [a, b] = make_stream_pair( f );
      61              : 
      62              :     auto r = f.armed( [&]( fuse& ) -> task<> {
      63              :         auto [ec, n] = co_await a.write_some(
      64              :             const_buffer( "hello", 5 ) );
      65              :         if( ec )
      66              :             co_return;
      67              : 
      68              :         char buf[32];
      69              :         auto [ec2, n2] = co_await b.read_some(
      70              :             mutable_buffer( buf, sizeof( buf ) ) );
      71              :         if( ec2 )
      72              :             co_return;
      73              :         // buf contains "hello"
      74              :     } );
      75              :     @endcode
      76              : 
      77              :     @see make_stream_pair, fuse
      78              : */
      79              : class stream
      80              : {
      81              :     // Single-threaded only. No concurrent access to either
      82              :     // end of the pair. Both streams and all operations must
      83              :     // run on the same thread.
      84              : 
      85              :     struct half
      86              :     {
      87              :         std::string buf;
      88              :         std::size_t max_read_size = std::size_t(-1);
      89              :         std::coroutine_handle<> pending_h{};
      90              :         executor_ref pending_ex;
      91              :         bool eof = false;
      92              :     };
      93              : 
      94              :     struct state
      95              :     {
      96              :         fuse f;
      97              :         bool closed = false;
      98              :         half sides[2];
      99              : 
     100          279 :         explicit state(fuse f_) noexcept
     101          837 :             : f(std::move(f_))
     102              :         {
     103          279 :         }
     104              : 
     105              :         // Set closed and resume any suspended readers
     106              :         // with eof on both sides.
     107          208 :         void close()
     108              :         {
     109          208 :             closed = true;
     110          624 :             for(auto& side : sides)
     111              :             {
     112          416 :                 if(side.pending_h)
     113              :                 {
     114           12 :                     auto h = side.pending_h;
     115           12 :                     side.pending_h = {};
     116           12 :                     auto ex = side.pending_ex;
     117           12 :                     side.pending_ex = {};
     118           12 :                     ex.post(h);
     119              :                 }
     120              :             }
     121          208 :         }
     122              :     };
     123              : 
     124              :     // Wraps the maybe_fail() call. If the guard is
     125              :     // not disarmed before destruction (fuse returned
     126              :     // an error, or threw an exception), closes both
     127              :     // ends so any suspended peer gets eof.
     128              :     struct close_guard
     129              :     {
     130              :         state* st;
     131              :         bool armed = true;
     132          298 :         void disarm() noexcept { armed = false; }
     133          506 :         ~close_guard() noexcept(false) { if(armed) st->close(); }
     134              :     };
     135              : 
     136              :     std::shared_ptr<state> state_;
     137              :     int index_;
     138              : 
     139          558 :     stream(
     140              :         std::shared_ptr<state> sp,
     141              :         int index) noexcept
     142          558 :         : state_(std::move(sp))
     143          558 :         , index_(index)
     144              :     {
     145          558 :     }
     146              : 
     147              :     friend std::pair<stream, stream>
     148              :     make_stream_pair(fuse);
     149              : 
     150              : public:
     151              :     stream(stream const&) = delete;
     152              :     stream& operator=(stream const&) = delete;
     153          658 :     stream(stream&&) = default;
     154              :     stream& operator=(stream&&) = default;
     155              : 
     156              :     /** Signal end-of-stream to the peer.
     157              : 
     158              :         Marks the peer's read direction as closed.
     159              :         If the peer is suspended in @ref read_some,
     160              :         it is resumed. The peer drains any buffered
     161              :         data before receiving `error::eof`. Writes
     162              :         from the peer are unaffected.
     163              :     */
     164              :     void
     165            3 :     close()
     166              :     {
     167            3 :         int peer = 1 - index_;
     168            3 :         auto& side = state_->sides[peer];
     169            3 :         side.eof = true;
     170            3 :         if(side.pending_h)
     171              :         {
     172            1 :             auto h = side.pending_h;
     173            1 :             side.pending_h = {};
     174            1 :             auto ex = side.pending_ex;
     175            1 :             side.pending_ex = {};
     176            1 :             ex.post(h);
     177              :         }
     178            3 :     }
     179              : 
     180              :     /** Set the maximum bytes returned per read.
     181              : 
     182              :         Limits how many bytes @ref read_some returns in
     183              :         a single call, simulating chunked network delivery.
     184              :         The default is unlimited.
     185              : 
     186              :         @param n Maximum bytes per read.
     187              :     */
     188              :     void
     189           54 :     set_max_read_size(std::size_t n) noexcept
     190              :     {
     191           54 :         state_->sides[index_].max_read_size = n;
     192           54 :     }
     193              : 
     194              :     /** Asynchronously read data from the stream.
     195              : 
     196              :         Transfers up to `buffer_size(buffers)` bytes from
     197              :         data written by the peer. If no data is available,
     198              :         the calling coroutine suspends until the peer calls
     199              :         @ref write_some. Before every read, the attached
     200              :         @ref fuse is consulted to possibly inject an error.
     201              :         If the fuse fires, the peer is automatically closed.
     202              :         If the stream is closed, returns `error::eof`.
     203              :         The returned `std::size_t` is the number of bytes
     204              :         transferred.
     205              : 
     206              :         @param buffers The mutable buffer sequence to receive data.
     207              : 
     208              :         @return An awaitable yielding `(error_code,std::size_t)`.
     209              : 
     210              :         @see fuse, close
     211              :     */
     212              :     template<MutableBufferSequence MB>
     213              :     auto
     214          274 :     read_some(MB buffers)
     215              :     {
     216              :         struct awaitable
     217              :         {
     218              :             stream* self_;
     219              :             MB buffers_;
     220              : 
     221          274 :             bool await_ready() const noexcept
     222              :             {
     223          274 :                 if(buffer_empty(buffers_))
     224            8 :                     return true;
     225          266 :                 auto* st = self_->state_.get();
     226          266 :                 auto& side = st->sides[self_->index_];
     227          530 :                 return st->closed || side.eof ||
     228          530 :                     !side.buf.empty();
     229              :             }
     230              : 
     231           25 :             std::coroutine_handle<> await_suspend(
     232              :                 std::coroutine_handle<> h,
     233              :                 io_env const* env) noexcept
     234              :             {
     235           25 :                 auto& side = self_->state_->sides[
     236           25 :                     self_->index_];
     237           25 :                 side.pending_h = h;
     238           25 :                 side.pending_ex = env->executor;
     239           25 :                 return std::noop_coroutine();
     240              :             }
     241              : 
     242              :             io_result<std::size_t>
     243          274 :             await_resume()
     244              :             {
     245          274 :                 if(buffer_empty(buffers_))
     246            8 :                     return {{}, 0};
     247              : 
     248          266 :                 auto* st = self_->state_.get();
     249          266 :                 auto& side = st->sides[
     250          266 :                     self_->index_];
     251              : 
     252          266 :                 if(st->closed)
     253           12 :                     return {error::eof, 0};
     254              : 
     255          254 :                 if(side.eof && side.buf.empty())
     256            3 :                     return {error::eof, 0};
     257              : 
     258          251 :                 if(!side.eof)
     259              :                 {
     260          251 :                     close_guard g{st};
     261          251 :                     auto ec = st->f.maybe_fail();
     262          198 :                     if(ec)
     263           53 :                         return {ec, 0};
     264          145 :                     g.disarm();
     265          251 :                 }
     266              : 
     267          290 :                 std::size_t const n = buffer_copy(
     268          145 :                     buffers_, make_buffer(side.buf),
     269              :                     side.max_read_size);
     270          145 :                 side.buf.erase(0, n);
     271          145 :                 return {{}, n};
     272              :             }
     273              :         };
     274          274 :         return awaitable{this, buffers};
     275              :     }
     276              : 
     277              :     /** Asynchronously write data to the stream.
     278              : 
     279              :         Transfers up to `buffer_size(buffers)` bytes to the
     280              :         peer's incoming buffer. If the peer is suspended in
     281              :         @ref read_some, it is resumed. Before every write,
     282              :         the attached @ref fuse is consulted to possibly inject
     283              :         an error. If the fuse fires, the peer is automatically
     284              :         closed. If the stream is closed, returns `error::eof`.
     285              :         The returned `std::size_t` is the number of bytes
     286              :         transferred.
     287              : 
     288              :         @param buffers The const buffer sequence containing
     289              :             data to write.
     290              : 
     291              :         @return An awaitable yielding `(error_code,std::size_t)`.
     292              : 
     293              :         @see fuse, close
     294              :     */
     295              :     template<ConstBufferSequence CB>
     296              :     auto
     297          259 :     write_some(CB buffers)
     298              :     {
     299              :         struct awaitable
     300              :         {
     301              :             stream* self_;
     302              :             CB buffers_;
     303              : 
     304          259 :             bool await_ready() const noexcept { return true; }
     305              : 
     306            0 :             void await_suspend(
     307              :                 std::coroutine_handle<>,
     308              :                 io_env const*) const noexcept
     309              :             {
     310            0 :             }
     311              : 
     312              :             io_result<std::size_t>
     313          259 :             await_resume()
     314              :             {
     315          259 :                 std::size_t n = buffer_size(buffers_);
     316          259 :                 if(n == 0)
     317            4 :                     return {{}, 0};
     318              : 
     319          255 :                 auto* st = self_->state_.get();
     320              : 
     321          255 :                 if(st->closed)
     322            0 :                     return {error::eof, 0};
     323              : 
     324          255 :                 close_guard g{st};
     325          255 :                 auto ec = st->f.maybe_fail();
     326          204 :                 if(ec)
     327           51 :                     return {ec, 0};
     328          153 :                 g.disarm();
     329              : 
     330          153 :                 int peer = 1 - self_->index_;
     331          153 :                 auto& side = st->sides[peer];
     332              : 
     333          153 :                 std::size_t const old_size = side.buf.size();
     334          153 :                 side.buf.resize(old_size + n);
     335          153 :                 buffer_copy(make_buffer(
     336          153 :                     side.buf.data() + old_size, n),
     337          153 :                     buffers_, n);
     338              : 
     339          153 :                 if(side.pending_h)
     340              :                 {
     341           12 :                     auto h = side.pending_h;
     342           12 :                     side.pending_h = {};
     343           12 :                     auto ex = side.pending_ex;
     344           12 :                     side.pending_ex = {};
     345           12 :                     ex.post(h);
     346              :                 }
     347              : 
     348          153 :                 return {{}, n};
     349          255 :             }
     350              :         };
     351          259 :         return awaitable{this, buffers};
     352              :     }
     353              : 
     354              :     /** Inject data into this stream's peer for reading.
     355              : 
     356              :         Appends data directly to the peer's incoming buffer,
     357              :         bypassing the fuse. If the peer is suspended in
     358              :         @ref read_some, it is resumed. This is test setup,
     359              :         not an operation under test.
     360              : 
     361              :         @param sv The data to inject.
     362              : 
     363              :         @see make_stream_pair
     364              :     */
     365              :     void
     366           86 :     provide(std::string_view sv)
     367              :     {
     368           86 :         int peer = 1 - index_;
     369           86 :         auto& side = state_->sides[peer];
     370           86 :         side.buf.append(sv);
     371           86 :         if(side.pending_h)
     372              :         {
     373            0 :             auto h = side.pending_h;
     374            0 :             side.pending_h = {};
     375            0 :             auto ex = side.pending_ex;
     376            0 :             side.pending_ex = {};
     377            0 :             ex.post(h);
     378              :         }
     379           86 :     }
     380              : 
     381              :     /** Read from this stream and verify the content.
     382              : 
     383              :         Reads exactly `expected.size()` bytes from the stream
     384              :         and compares against the expected string. The read goes
     385              :         through the normal path including the fuse.
     386              : 
     387              :         @param expected The expected content.
     388              : 
     389              :         @return A pair of `(error_code, bool)`. The error_code
     390              :             is set if a read error occurs (e.g. fuse injection).
     391              :             The bool is true if the data matches.
     392              : 
     393              :         @see provide
     394              :     */
     395              :     std::pair<std::error_code, bool>
     396           38 :     expect(std::string_view expected)
     397              :     {
     398           38 :         std::error_code result;
     399           38 :         bool match = false;
     400          141 :         run_blocking()([](
     401              :             stream& self,
     402              :             std::string_view expected,
     403              :             std::error_code& result,
     404              :             bool& match) -> task<>
     405              :         {
     406              :             std::string buf(expected.size(), '\0');
     407              :             auto [ec, n] = co_await read(
     408              :                 self, mutable_buffer(
     409              :                     buf.data(), buf.size()));
     410              :             if(ec)
     411              :             {
     412              :                 result = ec;
     413              :                 co_return;
     414              :             }
     415              :             match = (std::string_view(
     416              :                 buf.data(), n) == expected);
     417          161 :         }(*this, expected, result, match));
     418           58 :         return {result, match};
     419              :     }
     420              : 
     421              :     /** Return the stream's pending read data.
     422              : 
     423              :         Returns a view of the data waiting to be read
     424              :         from this stream. This is a direct peek at the
     425              :         internal buffer, bypassing the fuse.
     426              : 
     427              :         @return A view of the pending data.
     428              : 
     429              :         @see provide, expect
     430              :     */
     431              :     std::string_view
     432              :     data() const noexcept
     433              :     {
     434              :         return state_->sides[index_].buf;
     435              :     }
     436              : };
     437              : 
     438              : /** Create a connected pair of test streams.
     439              : 
     440              :     Data written to one stream becomes readable on the other.
     441              :     If a coroutine calls @ref stream::read_some when no data
     442              :     is available, it suspends until the peer writes. Before
     443              :     every read or write, the @ref fuse is consulted to
     444              :     possibly inject an error for testing fault scenarios.
     445              :     When the fuse fires, the peer is automatically closed.
     446              : 
     447              :     @param f The fuse used to inject errors during operations.
     448              : 
     449              :     @return A pair of connected streams.
     450              : 
     451              :     @see stream, fuse
     452              : */
     453              : inline std::pair<stream, stream>
     454          279 : make_stream_pair(fuse f = {})
     455              : {
     456          279 :     auto sp = std::make_shared<stream::state>(std::move(f));
     457          558 :     return {stream(sp, 0), stream(sp, 1)};
     458          279 : }
     459              : 
     460              : } // test
     461              : } // capy
     462              : } // boost
     463              : 
     464              : #endif
        

Generated by: LCOV version 2.3