LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_stream_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 84.3 % 197 166 31
Test Date: 2026-04-13 22:45:57 Functions: 59.5 % 84 50 34

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_socket.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
      15                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      16                 : #include <boost/capy/buffers.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : 
      20                 : #include <errno.h>
      21                 : #include <sys/socket.h>
      22                 : #include <sys/uio.h>
      23                 : 
      24                 : namespace boost::corosio::detail {
      25                 : 
      26                 : /** CRTP base for reactor-backed stream socket implementations.
      27                 : 
      28                 :     Inherits shared data members and cancel/close/register logic
      29                 :     from reactor_basic_socket. Adds the stream-specific remote
      30                 :     endpoint, shutdown, and I/O dispatch (connect, read, write).
      31                 : 
      32                 :     @tparam Derived   The concrete socket type (CRTP).
      33                 :     @tparam Service   The backend's socket service type.
      34                 :     @tparam ConnOp    The backend's connect op type.
      35                 :     @tparam ReadOp    The backend's read op type.
      36                 :     @tparam WriteOp   The backend's write op type.
      37                 :     @tparam DescState The backend's descriptor_state type.
      38                 :     @tparam ImplBase  The public vtable base
      39                 :                       (tcp_socket::implementation or
      40                 :                        local_stream_socket::implementation).
      41                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      42                 : */
      43                 : template<
      44                 :     class Derived,
      45                 :     class Service,
      46                 :     class ConnOp,
      47                 :     class ReadOp,
      48                 :     class WriteOp,
      49                 :     class DescState,
      50                 :     class ImplBase = tcp_socket::implementation,
      51                 :     class Endpoint = endpoint>
      52                 : class reactor_stream_socket
      53                 :     : public reactor_basic_socket<
      54                 :           Derived,
      55                 :           ImplBase,
      56                 :           Service,
      57                 :           DescState,
      58                 :           Endpoint>
      59                 : {
      60                 :     using base_type = reactor_basic_socket<
      61                 :         Derived,
      62                 :         ImplBase,
      63                 :         Service,
      64                 :         DescState,
      65                 :         Endpoint>;
      66                 :     friend base_type;
      67                 :     friend Derived;
      68                 : 
      69 HIT       24466 :     explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
      70                 : 
      71                 : protected:
      72                 :     Endpoint remote_endpoint_;
      73                 : 
      74                 : public:
      75                 :     /// Pending connect operation slot.
      76                 :     ConnOp conn_;
      77                 : 
      78                 :     /// Pending read operation slot.
      79                 :     ReadOp rd_;
      80                 : 
      81                 :     /// Pending write operation slot.
      82                 :     WriteOp wr_;
      83                 : 
      84           24466 :     ~reactor_stream_socket() override = default;
      85                 : 
      86                 :     /// Return the cached remote endpoint.
      87              44 :     Endpoint remote_endpoint() const noexcept override
      88                 :     {
      89              44 :         return remote_endpoint_;
      90                 :     }
      91                 : 
      92                 :     /** Shut down part or all of the full-duplex connection.
      93                 : 
      94                 :         Not an override — concrete backend classes forward their
      95                 :         ImplBase-typed shutdown() here.
      96                 : 
      97                 :         @param what 0 = receive, 1 = send, 2 = both.
      98                 :     */
      99               6 :     std::error_code do_shutdown(int what) noexcept
     100                 :     {
     101                 :         int how;
     102               6 :         switch (what)
     103                 :         {
     104               2 :         case 0: // shutdown_receive
     105               2 :             how = SHUT_RD;
     106               2 :             break;
     107               2 :         case 1: // shutdown_send
     108               2 :             how = SHUT_WR;
     109               2 :             break;
     110               2 :         case 2: // shutdown_both
     111               2 :             how = SHUT_RDWR;
     112               2 :             break;
     113 MIS           0 :         default:
     114               0 :             return make_err(EINVAL);
     115                 :         }
     116 HIT           6 :         if (::shutdown(this->fd_, how) != 0)
     117 MIS           0 :             return make_err(errno);
     118 HIT           6 :         return {};
     119                 :     }
     120                 : 
     121                 :     /// Cache local and remote endpoints.
     122           16228 :     void set_endpoints(Endpoint local, Endpoint remote) noexcept
     123                 :     {
     124           16228 :         this->local_endpoint_ = std::move(local);
     125           16228 :         remote_endpoint_      = std::move(remote);
     126           16228 :     }
     127                 : 
     128                 :     /** Shared connect dispatch.
     129                 : 
     130                 :         Tries the connect syscall speculatively. On synchronous
     131                 :         completion, returns via inline budget or posts through queue.
     132                 :         On EINPROGRESS, registers with the reactor.
     133                 :     */
     134                 :     std::coroutine_handle<> do_connect(
     135                 :         std::coroutine_handle<>,
     136                 :         capy::executor_ref,
     137                 :         Endpoint const&,
     138                 :         std::stop_token const&,
     139                 :         std::error_code*);
     140                 : 
     141                 :     /** Shared scatter-read dispatch.
     142                 : 
     143                 :         Tries readv() speculatively. On success or hard error,
     144                 :         returns via inline budget or posts through queue.
     145                 :         On EAGAIN, registers with the reactor.
     146                 :     */
     147                 :     std::coroutine_handle<> do_read_some(
     148                 :         std::coroutine_handle<>,
     149                 :         capy::executor_ref,
     150                 :         buffer_param,
     151                 :         std::stop_token const&,
     152                 :         std::error_code*,
     153                 :         std::size_t*);
     154                 : 
     155                 :     /** Shared gather-write dispatch.
     156                 : 
     157                 :         Tries the write via WriteOp::write_policy speculatively.
     158                 :         On success or hard error, returns via inline budget or
     159                 :         posts through queue. On EAGAIN, registers with the reactor.
     160                 :     */
     161                 :     std::coroutine_handle<> do_write_some(
     162                 :         std::coroutine_handle<>,
     163                 :         capy::executor_ref,
     164                 :         buffer_param,
     165                 :         std::stop_token const&,
     166                 :         std::error_code*,
     167                 :         std::size_t*);
     168                 : 
     169                 :     /** Close the socket and cancel pending operations.
     170                 : 
     171                 :         Extends the base do_close_socket() to also reset
     172                 :         the remote endpoint.
     173                 :     */
     174           73402 :     void do_close_socket() noexcept
     175                 :     {
     176           73402 :         base_type::do_close_socket();
     177           73402 :         remote_endpoint_ = Endpoint{};
     178           73402 :     }
     179                 : 
     180                 : private:
     181                 :     // CRTP callbacks for reactor_basic_socket cancel/close
     182                 : 
     183                 :     template<class Op>
     184             199 :     reactor_op_base** op_to_desc_slot(Op& op) noexcept
     185                 :     {
     186             199 :         if (&op == static_cast<void*>(&conn_))
     187 MIS           0 :             return &this->desc_state_.connect_op;
     188 HIT         199 :         if (&op == static_cast<void*>(&rd_))
     189             199 :             return &this->desc_state_.read_op;
     190 MIS           0 :         if (&op == static_cast<void*>(&wr_))
     191               0 :             return &this->desc_state_.write_op;
     192               0 :         return nullptr;
     193                 :     }
     194                 : 
     195                 :     template<class Op>
     196               0 :     bool* op_to_cancel_flag(Op& op) noexcept
     197                 :     {
     198               0 :         if (&op == static_cast<void*>(&conn_))
     199               0 :             return &this->desc_state_.connect_cancel_pending;
     200               0 :         if (&op == static_cast<void*>(&rd_))
     201               0 :             return &this->desc_state_.read_cancel_pending;
     202               0 :         if (&op == static_cast<void*>(&wr_))
     203               0 :             return &this->desc_state_.write_cancel_pending;
     204               0 :         return nullptr;
     205                 :     }
     206                 : 
     207                 :     template<class Fn>
     208 HIT       73597 :     void for_each_op(Fn fn) noexcept
     209                 :     {
     210           73597 :         fn(conn_);
     211           73597 :         fn(rd_);
     212           73597 :         fn(wr_);
     213           73597 :     }
     214                 : 
     215                 :     template<class Fn>
     216           73597 :     void for_each_desc_entry(Fn fn) noexcept
     217                 :     {
     218           73597 :         fn(conn_, this->desc_state_.connect_op);
     219           73597 :         fn(rd_, this->desc_state_.read_op);
     220           73597 :         fn(wr_, this->desc_state_.write_op);
     221           73597 :     }
     222                 : };
     223                 : 
     224                 : template<
     225                 :     class Derived,
     226                 :     class Service,
     227                 :     class ConnOp,
     228                 :     class ReadOp,
     229                 :     class WriteOp,
     230                 :     class DescState,
     231                 :     class ImplBase,
     232                 :     class Endpoint>
     233                 : std::coroutine_handle<>
     234            8119 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     235                 :     do_connect(
     236                 :         std::coroutine_handle<> h,
     237                 :         capy::executor_ref ex,
     238                 :         Endpoint const& ep,
     239                 :         std::stop_token const& token,
     240                 :         std::error_code* ec)
     241                 : {
     242            8119 :     auto& op = conn_;
     243                 : 
     244            8119 :     sockaddr_storage storage{};
     245            8119 :     socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
     246                 :     int result =
     247            8119 :         ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     248                 : 
     249            8119 :     if (result == 0)
     250                 :     {
     251               4 :         sockaddr_storage local_storage{};
     252               4 :         socklen_t local_len = sizeof(local_storage);
     253               4 :         if (::getsockname(
     254                 :                 this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
     255               4 :                 &local_len) == 0)
     256 MIS           0 :             this->local_endpoint_ =
     257 HIT           4 :                 from_sockaddr_as(local_storage, local_len, Endpoint{});
     258               4 :         remote_endpoint_ = ep;
     259                 :     }
     260                 : 
     261            8119 :     if (result == 0 || errno != EINPROGRESS)
     262                 :     {
     263               4 :         int err = (result < 0) ? errno : 0;
     264               4 :         if (this->svc_.scheduler().try_consume_inline_budget())
     265                 :         {
     266 MIS           0 :             *ec = err ? make_err(err) : std::error_code{};
     267               0 :             op.cont_op.cont.h = h;
     268               0 :             return dispatch_coro(ex, op.cont_op.cont);
     269                 :         }
     270 HIT           4 :         op.reset();
     271               4 :         op.h               = h;
     272               4 :         op.ex              = ex;
     273               4 :         op.ec_out          = ec;
     274               4 :         op.fd              = this->fd_;
     275               4 :         op.target_endpoint = ep;
     276               4 :         op.start(token, static_cast<Derived*>(this));
     277               4 :         op.impl_ptr = this->shared_from_this();
     278               4 :         op.complete(err, 0);
     279               4 :         this->svc_.post(&op);
     280               4 :         return std::noop_coroutine();
     281                 :     }
     282                 : 
     283                 :     // EINPROGRESS — register with reactor
     284            8115 :     op.reset();
     285            8115 :     op.h               = h;
     286            8115 :     op.ex              = ex;
     287            8115 :     op.ec_out          = ec;
     288            8115 :     op.fd              = this->fd_;
     289            8115 :     op.target_endpoint = ep;
     290            8115 :     op.start(token, static_cast<Derived*>(this));
     291            8115 :     op.impl_ptr = this->shared_from_this();
     292                 : 
     293            8115 :     this->register_op(
     294            8115 :         op, this->desc_state_.connect_op, this->desc_state_.write_ready,
     295            8115 :         this->desc_state_.connect_cancel_pending);
     296            8115 :     return std::noop_coroutine();
     297                 : }
     298                 : 
     299                 : template<
     300                 :     class Derived,
     301                 :     class Service,
     302                 :     class ConnOp,
     303                 :     class ReadOp,
     304                 :     class WriteOp,
     305                 :     class DescState,
     306                 :     class ImplBase,
     307                 :     class Endpoint>
     308                 : std::coroutine_handle<>
     309          353803 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     310                 :     do_read_some(
     311                 :         std::coroutine_handle<> h,
     312                 :         capy::executor_ref ex,
     313                 :         buffer_param param,
     314                 :         std::stop_token const& token,
     315                 :         std::error_code* ec,
     316                 :         std::size_t* bytes_out)
     317                 : {
     318          353803 :     auto& op = rd_;
     319          353803 :     op.reset();
     320                 : 
     321          353803 :     capy::mutable_buffer bufs[ReadOp::max_buffers];
     322          353803 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
     323                 : 
     324          353803 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     325                 :     {
     326               2 :         op.empty_buffer_read = true;
     327               2 :         op.h                 = h;
     328               2 :         op.ex                = ex;
     329               2 :         op.ec_out            = ec;
     330               2 :         op.bytes_out         = bytes_out;
     331               2 :         op.start(token, static_cast<Derived*>(this));
     332               2 :         op.impl_ptr = this->shared_from_this();
     333               2 :         op.complete(0, 0);
     334               2 :         this->svc_.post(&op);
     335               2 :         return std::noop_coroutine();
     336                 :     }
     337                 : 
     338          707602 :     for (int i = 0; i < op.iovec_count; ++i)
     339                 :     {
     340          353801 :         op.iovecs[i].iov_base = bufs[i].data();
     341          353801 :         op.iovecs[i].iov_len  = bufs[i].size();
     342                 :     }
     343                 : 
     344                 :     // Speculative read
     345                 :     ssize_t n;
     346                 :     do
     347                 :     {
     348          353801 :         n = ::readv(this->fd_, op.iovecs, op.iovec_count);
     349                 :     }
     350          353801 :     while (n < 0 && errno == EINTR);
     351                 : 
     352          353801 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     353                 :     {
     354          353398 :         int err    = (n < 0) ? errno : 0;
     355          353398 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     356                 : 
     357          353398 :         if (this->svc_.scheduler().try_consume_inline_budget())
     358                 :         {
     359          282752 :             if (err)
     360 MIS           0 :                 *ec = make_err(err);
     361 HIT      282752 :             else if (n == 0)
     362              10 :                 *ec = capy::error::eof;
     363                 :             else
     364          282742 :                 *ec = {};
     365          282752 :             *bytes_out = bytes;
     366          282752 :             op.cont_op.cont.h = h;
     367          282752 :             return dispatch_coro(ex, op.cont_op.cont);
     368                 :         }
     369           70646 :         op.h         = h;
     370           70646 :         op.ex        = ex;
     371           70646 :         op.ec_out    = ec;
     372           70646 :         op.bytes_out = bytes_out;
     373           70646 :         op.start(token, static_cast<Derived*>(this));
     374           70646 :         op.impl_ptr = this->shared_from_this();
     375           70646 :         op.complete(err, bytes);
     376           70646 :         this->svc_.post(&op);
     377           70646 :         return std::noop_coroutine();
     378                 :     }
     379                 : 
     380                 :     // EAGAIN — register with reactor
     381             403 :     op.h         = h;
     382             403 :     op.ex        = ex;
     383             403 :     op.ec_out    = ec;
     384             403 :     op.bytes_out = bytes_out;
     385             403 :     op.fd        = this->fd_;
     386             403 :     op.start(token, static_cast<Derived*>(this));
     387             403 :     op.impl_ptr = this->shared_from_this();
     388                 : 
     389             403 :     this->register_op(
     390             403 :         op, this->desc_state_.read_op, this->desc_state_.read_ready,
     391             403 :         this->desc_state_.read_cancel_pending);
     392             403 :     return std::noop_coroutine();
     393                 : }
     394                 : 
     395                 : template<
     396                 :     class Derived,
     397                 :     class Service,
     398                 :     class ConnOp,
     399                 :     class ReadOp,
     400                 :     class WriteOp,
     401                 :     class DescState,
     402                 :     class ImplBase,
     403                 :     class Endpoint>
     404                 : std::coroutine_handle<>
     405          353497 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     406                 :     do_write_some(
     407                 :         std::coroutine_handle<> h,
     408                 :         capy::executor_ref ex,
     409                 :         buffer_param param,
     410                 :         std::stop_token const& token,
     411                 :         std::error_code* ec,
     412                 :         std::size_t* bytes_out)
     413                 : {
     414          353497 :     auto& op = wr_;
     415          353497 :     op.reset();
     416                 : 
     417          353497 :     capy::mutable_buffer bufs[WriteOp::max_buffers];
     418          353497 :     op.iovec_count =
     419          353497 :         static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
     420                 : 
     421          353497 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     422                 :     {
     423               2 :         op.h         = h;
     424               2 :         op.ex        = ex;
     425               2 :         op.ec_out    = ec;
     426               2 :         op.bytes_out = bytes_out;
     427               2 :         op.start(token, static_cast<Derived*>(this));
     428               2 :         op.impl_ptr = this->shared_from_this();
     429               2 :         op.complete(0, 0);
     430               2 :         this->svc_.post(&op);
     431               2 :         return std::noop_coroutine();
     432                 :     }
     433                 : 
     434          706990 :     for (int i = 0; i < op.iovec_count; ++i)
     435                 :     {
     436          353495 :         op.iovecs[i].iov_base = bufs[i].data();
     437          353495 :         op.iovecs[i].iov_len  = bufs[i].size();
     438                 :     }
     439                 : 
     440                 :     // Speculative write via backend-specific write policy
     441                 :     ssize_t n =
     442          353495 :         WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
     443                 : 
     444          353495 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     445                 :     {
     446          353495 :         int err    = (n < 0) ? errno : 0;
     447          353495 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     448                 : 
     449          353495 :         if (this->svc_.scheduler().try_consume_inline_budget())
     450                 :         {
     451          282813 :             *ec        = err ? make_err(err) : std::error_code{};
     452          282813 :             *bytes_out = bytes;
     453          282813 :             op.cont_op.cont.h = h;
     454          282813 :             return dispatch_coro(ex, op.cont_op.cont);
     455                 :         }
     456           70682 :         op.h         = h;
     457           70682 :         op.ex        = ex;
     458           70682 :         op.ec_out    = ec;
     459           70682 :         op.bytes_out = bytes_out;
     460           70682 :         op.start(token, static_cast<Derived*>(this));
     461           70682 :         op.impl_ptr = this->shared_from_this();
     462           70682 :         op.complete(err, bytes);
     463           70682 :         this->svc_.post(&op);
     464           70682 :         return std::noop_coroutine();
     465                 :     }
     466                 : 
     467                 :     // EAGAIN — register with reactor
     468 MIS           0 :     op.h         = h;
     469               0 :     op.ex        = ex;
     470               0 :     op.ec_out    = ec;
     471               0 :     op.bytes_out = bytes_out;
     472               0 :     op.fd        = this->fd_;
     473               0 :     op.start(token, static_cast<Derived*>(this));
     474               0 :     op.impl_ptr = this->shared_from_this();
     475                 : 
     476               0 :     this->register_op(
     477               0 :         op, this->desc_state_.write_op, this->desc_state_.write_ready,
     478               0 :         this->desc_state_.write_cancel_pending);
     479               0 :     return std::noop_coroutine();
     480                 : }
     481                 : 
     482                 : } // namespace boost::corosio::detail
     483                 : 
     484                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
        

Generated by: LCOV version 2.3