LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_basic_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 89.5 % 162 145 17
Test Date: 2026-04-13 22:45:57 Functions: 58.1 % 310 180 130

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/intrusive.hpp>
      14                 : #include <boost/corosio/detail/native_handle.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      17                 : #include <boost/corosio/native/detail/make_err.hpp>
      18                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      19                 : 
      20                 : #include <memory>
      21                 : #include <mutex>
      22                 : #include <utility>
      23                 : 
      24                 : #include <errno.h>
      25                 : #include <netinet/in.h>
      26                 : #include <sys/socket.h>
      27                 : #include <unistd.h>
      28                 : 
      29                 : namespace boost::corosio::detail {
      30                 : 
      31                 : /** CRTP base for reactor-backed socket implementations.
      32                 : 
      33                 :     Extracts the shared data members, virtual overrides, and
      34                 :     cancel/close/register logic that is identical across TCP
      35                 :     (reactor_stream_socket) and UDP (reactor_datagram_socket).
      36                 : 
      37                 :     Derived classes provide CRTP callbacks that enumerate their
      38                 :     specific op slots so cancel/close can iterate them generically.
      39                 : 
      40                 :     @tparam Derived   The concrete socket type (CRTP).
      41                 :     @tparam ImplBase  The public vtable base (tcp_socket::implementation
      42                 :                       or udp_socket::implementation).
      43                 :     @tparam Service   The backend's service type.
      44                 :     @tparam DescState The backend's descriptor_state type.
      45                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      46                 : */
      47                 : template<
      48                 :     class Derived,
      49                 :     class ImplBase,
      50                 :     class Service,
      51                 :     class DescState,
      52                 :     class Endpoint = endpoint>
      53                 : class reactor_basic_socket
      54                 :     : public ImplBase
      55                 :     , public std::enable_shared_from_this<Derived>
      56                 :     , public intrusive_list<Derived>::node
      57                 : {
      58                 :     friend Derived;
      59                 : 
      60                 :     template<class, class, class, class, class, class, class, class>
      61                 :     friend class reactor_stream_socket;
      62                 : 
      63                 :     template<class, class, class, class, class, class, class, class, class, class>
      64                 :     friend class reactor_datagram_socket;
      65                 : 
      66 HIT       24586 :     explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
      67                 : 
      68                 : protected:
      69                 :     Service& svc_;
      70                 :     int fd_ = -1;
      71                 :     Endpoint local_endpoint_;
      72                 : 
      73                 : public:
      74                 :     /// Per-descriptor state for persistent reactor registration.
      75                 :     DescState desc_state_;
      76                 : 
      77           24586 :     ~reactor_basic_socket() override = default;
      78                 : 
      79                 :     /// Return the underlying file descriptor.
      80           50008 :     native_handle_type native_handle() const noexcept override
      81                 :     {
      82           50008 :         return fd_;
      83                 :     }
      84                 : 
      85                 :     /// Return the cached local endpoint.
      86              80 :     Endpoint local_endpoint() const noexcept override
      87                 :     {
      88              80 :         return local_endpoint_;
      89                 :     }
      90                 : 
      91                 :     /// Return true if the socket has an open file descriptor.
      92                 :     bool is_open() const noexcept
      93                 :     {
      94                 :         return fd_ >= 0;
      95                 :     }
      96                 : 
      97                 :     /// Set a socket option.
      98              80 :     std::error_code set_option(
      99                 :         int level,
     100                 :         int optname,
     101                 :         void const* data,
     102                 :         std::size_t size) noexcept override
     103                 :     {
     104              80 :         if (::setsockopt(
     105              80 :                 fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
     106 MIS           0 :             return make_err(errno);
     107 HIT          80 :         return {};
     108                 :     }
     109                 : 
     110                 :     /// Get a socket option.
     111                 :     std::error_code
     112              78 :     get_option(int level, int optname, void* data, std::size_t* size)
     113                 :         const noexcept override
     114                 :     {
     115              78 :         socklen_t len = static_cast<socklen_t>(*size);
     116              78 :         if (::getsockopt(fd_, level, optname, data, &len) != 0)
     117 MIS           0 :             return make_err(errno);
     118 HIT          78 :         *size = static_cast<std::size_t>(len);
     119              78 :         return {};
     120                 :     }
     121                 : 
     122                 :     /// Assign the file descriptor.
     123            8114 :     void set_socket(int fd) noexcept
     124                 :     {
     125            8114 :         fd_ = fd;
     126            8114 :     }
     127                 : 
     128                 :     /// Cache the local endpoint.
     129                 :     void set_local_endpoint(Endpoint ep) noexcept
     130                 :     {
     131                 :         local_endpoint_ = ep;
     132                 :     }
     133                 : 
     134                 :     /** Bind the socket to a local endpoint.
     135                 : 
     136                 :         Calls ::bind() and caches the resulting local endpoint
     137                 :         via getsockname().
     138                 : 
     139                 :         @param ep The endpoint to bind to.
     140                 :         @return Error code on failure, empty on success.
     141                 :     */
     142              76 :     std::error_code do_bind(Endpoint const& ep) noexcept
     143                 :     {
     144              76 :         sockaddr_storage storage{};
     145              76 :         socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
     146              76 :         if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
     147              10 :             return make_err(errno);
     148                 : 
     149              66 :         sockaddr_storage local_storage{};
     150              66 :         socklen_t local_len = sizeof(local_storage);
     151              66 :         if (::getsockname(
     152              66 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     153                 :             0)
     154              52 :             local_endpoint_ =
     155              66 :                 from_sockaddr_as(local_storage, local_len, Endpoint{});
     156                 : 
     157              66 :         return {};
     158                 :     }
     159                 : 
     160                 :     /** Register an op with the reactor.
     161                 : 
     162                 :         Handles cached edge events and deferred cancellation.
     163                 :         Called on the EAGAIN/EINPROGRESS path when speculative
     164                 :         I/O failed.
     165                 :     */
     166                 :     template<class Op>
     167                 :     void register_op(
     168                 :         Op& op,
     169                 :         reactor_op_base*& desc_slot,
     170                 :         bool& ready_flag,
     171                 :         bool& cancel_flag) noexcept;
     172                 : 
     173                 :     /** Cancel a single pending operation.
     174                 : 
     175                 :         Claims the operation from its descriptor_state slot under
     176                 :         the mutex and posts it to the scheduler as cancelled.
     177                 :         Derived must implement:
     178                 :           op_to_desc_slot(Op&) -> reactor_op_base**
     179                 :           op_to_cancel_flag(Op&) -> bool*
     180                 :     */
     181                 :     template<class Op>
     182                 :     void cancel_single_op(Op& op) noexcept;
     183                 : 
     184                 :     /** Cancel all pending operations.
     185                 : 
     186                 :         Invoked by the derived class's cancel() override.
     187                 :         Derived must implement:
     188                 :           for_each_op(auto fn)
     189                 :           for_each_desc_entry(auto fn)
     190                 :     */
     191                 :     void do_cancel() noexcept;
     192                 : 
     193                 :     /** Close the socket and cancel pending operations.
     194                 : 
     195                 :         Invoked by the derived class's close_socket(). The
     196                 :         derived class may add backend-specific cleanup after
     197                 :         calling this method.
     198                 :         Derived must implement:
     199                 :           for_each_op(auto fn)
     200                 :           for_each_desc_entry(auto fn)
     201                 :     */
     202                 :     void do_close_socket() noexcept;
     203                 : 
     204                 :     /** Release the socket without closing the fd.
     205                 : 
     206                 :         Like do_close_socket() but does not call ::close().
     207                 :         Returns the fd so the caller can take ownership.
     208                 :     */
     209                 :     native_handle_type do_release_socket() noexcept;
     210                 : };
     211                 : 
     212                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     213                 : template<class Op>
     214                 : void
     215            8528 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
     216                 :     Op& op,
     217                 :     reactor_op_base*& desc_slot,
     218                 :     bool& ready_flag,
     219                 :     bool& cancel_flag) noexcept
     220                 : {
     221            8528 :     svc_.work_started();
     222                 : 
     223            8528 :     std::lock_guard lock(desc_state_.mutex);
     224            8528 :     bool io_done = false;
     225            8528 :     if (ready_flag)
     226                 :     {
     227             193 :         ready_flag = false;
     228             193 :         op.perform_io();
     229             193 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     230             193 :         if (!io_done)
     231             193 :             op.errn = 0;
     232                 :     }
     233                 : 
     234            8528 :     if (cancel_flag)
     235                 :     {
     236 MIS           0 :         cancel_flag = false;
     237               0 :         op.cancelled.store(true, std::memory_order_relaxed);
     238                 :     }
     239                 : 
     240 HIT        8528 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     241                 :     {
     242 MIS           0 :         svc_.post(&op);
     243               0 :         svc_.work_finished();
     244                 :     }
     245                 :     else
     246                 :     {
     247 HIT        8528 :         desc_slot = &op;
     248                 :     }
     249            8528 : }
     250                 : 
     251                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     252                 : template<class Op>
     253                 : void
     254             201 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
     255                 :     Op& op) noexcept
     256                 : {
     257             201 :     auto self = this->weak_from_this().lock();
     258             201 :     if (!self)
     259 MIS           0 :         return;
     260                 : 
     261 HIT         201 :     op.request_cancel();
     262                 : 
     263             201 :     auto* d                       = static_cast<Derived*>(this);
     264             201 :     reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
     265                 : 
     266             201 :     if (desc_op_ptr)
     267                 :     {
     268             201 :         reactor_op_base* claimed = nullptr;
     269                 :         {
     270             201 :             std::lock_guard lock(desc_state_.mutex);
     271             201 :             if (*desc_op_ptr == &op)
     272             201 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     273                 :             else
     274                 :             {
     275 MIS           0 :                 bool* cflag = d->op_to_cancel_flag(op);
     276               0 :                 if (cflag)
     277               0 :                     *cflag = true;
     278                 :             }
     279 HIT         201 :         }
     280             201 :         if (claimed)
     281                 :         {
     282             201 :             op.impl_ptr = self;
     283             201 :             svc_.post(&op);
     284             201 :             svc_.work_finished();
     285                 :         }
     286                 :     }
     287             201 : }
     288                 : 
     289                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     290                 : void
     291             197 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     292                 :     do_cancel() noexcept
     293                 : {
     294             197 :     auto self = this->weak_from_this().lock();
     295             197 :     if (!self)
     296 MIS           0 :         return;
     297                 : 
     298 HIT         197 :     auto* d = static_cast<Derived*>(this);
     299                 : 
     300             796 :     d->for_each_op([](auto& op) { op.request_cancel(); });
     301                 : 
     302                 :     // Claim ops under a single lock acquisition
     303                 :     struct claimed_entry
     304                 :     {
     305                 :         reactor_op_base* op   = nullptr;
     306                 :         reactor_op_base* base = nullptr;
     307                 :     };
     308                 :     // Max 3 ops (conn, rd, wr)
     309             197 :     claimed_entry claimed[3];
     310             197 :     int count = 0;
     311                 : 
     312                 :     {
     313             197 :         std::lock_guard lock(desc_state_.mutex);
     314            1395 :         d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
     315             599 :             if (desc_slot == &op)
     316                 :             {
     317             105 :                 claimed[count].op   = std::exchange(desc_slot, nullptr);
     318             105 :                 claimed[count].base = &op;
     319             105 :                 ++count;
     320                 :             }
     321                 :         });
     322             197 :     }
     323                 : 
     324             302 :     for (int i = 0; i < count; ++i)
     325                 :     {
     326             105 :         claimed[i].base->impl_ptr = self;
     327             105 :         svc_.post(claimed[i].base);
     328             105 :         svc_.work_finished();
     329                 :     }
     330             197 : }
     331                 : 
     332                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     333                 : void
     334           73866 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     335                 :     do_close_socket() noexcept
     336                 : {
     337           73866 :     auto self = this->weak_from_this().lock();
     338           73866 :     if (self)
     339                 :     {
     340           73866 :         auto* d = static_cast<Derived*>(this);
     341                 : 
     342          296392 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     343                 : 
     344                 :         struct claimed_entry
     345                 :         {
     346                 :             reactor_op_base* base = nullptr;
     347                 :         };
     348           73866 :         claimed_entry claimed[3];
     349           73866 :         int count = 0;
     350                 : 
     351                 :         {
     352           73866 :             std::lock_guard lock(desc_state_.mutex);
     353           73866 :             d->for_each_desc_entry(
     354          445052 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     355          222526 :                     auto* c = std::exchange(desc_slot, nullptr);
     356          222526 :                     if (c)
     357                 :                     {
     358               4 :                         claimed[count].base = c;
     359               4 :                         ++count;
     360                 :                     }
     361                 :                 });
     362           73866 :             desc_state_.read_ready             = false;
     363           73866 :             desc_state_.write_ready            = false;
     364           73866 :             desc_state_.read_cancel_pending    = false;
     365           73866 :             desc_state_.write_cancel_pending   = false;
     366           73866 :             desc_state_.connect_cancel_pending = false;
     367                 : 
     368           73866 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     369             268 :                 desc_state_.impl_ref_ = self;
     370           73866 :         }
     371                 : 
     372           73870 :         for (int i = 0; i < count; ++i)
     373                 :         {
     374               4 :             claimed[i].base->impl_ptr = self;
     375               4 :             svc_.post(claimed[i].base);
     376               4 :             svc_.work_finished();
     377                 :         }
     378                 :     }
     379                 : 
     380           73866 :     if (fd_ >= 0)
     381                 :     {
     382           16403 :         if (desc_state_.registered_events != 0)
     383           16403 :             svc_.scheduler().deregister_descriptor(fd_);
     384           16403 :         ::close(fd_);
     385           16403 :         fd_ = -1;
     386                 :     }
     387                 : 
     388           73866 :     desc_state_.fd                = -1;
     389           73866 :     desc_state_.registered_events = 0;
     390                 : 
     391           73866 :     local_endpoint_ = Endpoint{};
     392           73866 : }
     393                 : 
     394                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     395                 : native_handle_type
     396               2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     397                 :     do_release_socket() noexcept
     398                 : {
     399                 :     // Cancel pending ops (same as do_close_socket)
     400               2 :     auto self = this->weak_from_this().lock();
     401               2 :     if (self)
     402                 :     {
     403               2 :         auto* d = static_cast<Derived*>(this);
     404                 : 
     405               8 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     406                 : 
     407                 :         struct claimed_entry
     408                 :         {
     409                 :             reactor_op_base* base = nullptr;
     410                 :         };
     411               2 :         claimed_entry claimed[3];
     412               2 :         int count = 0;
     413                 : 
     414                 :         {
     415               2 :             std::lock_guard lock(desc_state_.mutex);
     416               2 :             d->for_each_desc_entry(
     417              12 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     418               6 :                     auto* c = std::exchange(desc_slot, nullptr);
     419               6 :                     if (c)
     420                 :                     {
     421 MIS           0 :                         claimed[count].base = c;
     422               0 :                         ++count;
     423                 :                     }
     424                 :                 });
     425 HIT           2 :             desc_state_.read_ready             = false;
     426               2 :             desc_state_.write_ready            = false;
     427               2 :             desc_state_.read_cancel_pending    = false;
     428               2 :             desc_state_.write_cancel_pending   = false;
     429               2 :             desc_state_.connect_cancel_pending = false;
     430                 : 
     431               2 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     432 MIS           0 :                 desc_state_.impl_ref_ = self;
     433 HIT           2 :         }
     434                 : 
     435               2 :         for (int i = 0; i < count; ++i)
     436                 :         {
     437 MIS           0 :             claimed[i].base->impl_ptr = self;
     438               0 :             svc_.post(claimed[i].base);
     439               0 :             svc_.work_finished();
     440                 :         }
     441                 :     }
     442                 : 
     443 HIT           2 :     native_handle_type released = fd_;
     444                 : 
     445               2 :     if (fd_ >= 0)
     446                 :     {
     447               2 :         if (desc_state_.registered_events != 0)
     448               2 :             svc_.scheduler().deregister_descriptor(fd_);
     449                 :         // Do NOT close -- caller takes ownership
     450               2 :         fd_ = -1;
     451                 :     }
     452                 : 
     453               2 :     desc_state_.fd                = -1;
     454               2 :     desc_state_.registered_events = 0;
     455                 : 
     456               2 :     local_endpoint_ = Endpoint{};
     457                 : 
     458               4 :     return released;
     459               2 : }
     460                 : 
     461                 : } // namespace boost::corosio::detail
     462                 : 
     463                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
        

Generated by: LCOV version 2.3