LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_local_stream_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 40.8 % 157 64 93
Test Date: 2026-04-13 22:45:57 Functions: 76.9 % 13 10 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_local_stream_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_local_stream_service.hpp>
      23                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      24                 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
      25                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      26                 : 
      27                 : #include <memory>
      28                 : #include <mutex>
      29                 : #include <utility>
      30                 : 
      31                 : #include <errno.h>
      32                 : #include <fcntl.h>
      33                 : #include <sys/select.h>
      34                 : #include <sys/socket.h>
      35                 : #include <sys/un.h>
      36                 : #include <unistd.h>
      37                 : 
      38                 : namespace boost::corosio::detail {
      39                 : 
      40                 : /* select local stream acceptor service implementation.
      41                 : 
      42                 :    Inherits from local_stream_acceptor_service to enable runtime
      43                 :    polymorphism. Uses key_type = local_stream_acceptor_service
      44                 :    for service lookup.
      45                 : */
      46                 : class BOOST_COROSIO_DECL select_local_stream_acceptor_service final
      47                 :     : public reactor_acceptor_service<
      48                 :           select_local_stream_acceptor_service,
      49                 :           local_stream_acceptor_service,
      50                 :           select_scheduler,
      51                 :           select_local_stream_acceptor,
      52                 :           select_local_stream_service>
      53                 : {
      54                 :     using base_type = reactor_acceptor_service<
      55                 :         select_local_stream_acceptor_service,
      56                 :         local_stream_acceptor_service,
      57                 :         select_scheduler,
      58                 :         select_local_stream_acceptor,
      59                 :         select_local_stream_service>;
      60                 :     friend base_type;
      61                 : 
      62                 : public:
      63                 :     explicit select_local_stream_acceptor_service(
      64                 :         capy::execution_context& ctx);
      65                 :     ~select_local_stream_acceptor_service() override;
      66                 : 
      67                 :     std::error_code open_acceptor_socket(
      68                 :         local_stream_acceptor::implementation& impl,
      69                 :         int family,
      70                 :         int type,
      71                 :         int protocol) override;
      72                 :     std::error_code bind_acceptor(
      73                 :         local_stream_acceptor::implementation& impl,
      74                 :         corosio::local_endpoint ep) override;
      75                 :     std::error_code listen_acceptor(
      76                 :         local_stream_acceptor::implementation& impl,
      77                 :         int backlog) override;
      78                 : };
      79                 : 
      80                 : inline void
      81 MIS           0 : select_local_accept_op::cancel() noexcept
      82                 : {
      83               0 :     if (acceptor_impl_)
      84               0 :         acceptor_impl_->cancel_single_op(*this);
      85                 :     else
      86               0 :         request_cancel();
      87               0 : }
      88                 : 
      89                 : inline void
      90 HIT           2 : select_local_accept_op::operator()()
      91                 : {
      92               2 :     complete_accept_op<select_local_stream_socket>(*this);
      93               2 : }
      94                 : 
      95               6 : inline select_local_stream_acceptor::select_local_stream_acceptor(
      96               6 :     select_local_stream_acceptor_service& svc) noexcept
      97               6 :     : reactor_acceptor(svc)
      98                 : {
      99               6 : }
     100                 : 
     101                 : inline std::coroutine_handle<>
     102               2 : select_local_stream_acceptor::accept(
     103                 :     std::coroutine_handle<> h,
     104                 :     capy::executor_ref ex,
     105                 :     std::stop_token token,
     106                 :     std::error_code* ec,
     107                 :     io_object::implementation** impl_out)
     108                 : {
     109               2 :     auto& op = acc_;
     110               2 :     op.reset();
     111               2 :     op.h        = h;
     112               2 :     op.ex       = ex;
     113               2 :     op.ec_out   = ec;
     114               2 :     op.impl_out = impl_out;
     115               2 :     op.fd       = fd_;
     116               2 :     op.start(token, this);
     117                 : 
     118               2 :     sockaddr_storage peer_storage{};
     119               2 :     socklen_t addrlen = sizeof(peer_storage);
     120                 :     int accepted;
     121                 :     do
     122                 :     {
     123                 :         accepted =
     124               2 :             ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
     125                 :     }
     126               2 :     while (accepted < 0 && errno == EINTR);
     127                 : 
     128               2 :     if (accepted >= 0)
     129                 :     {
     130 MIS           0 :         if (accepted >= FD_SETSIZE)
     131                 :         {
     132               0 :             ::close(accepted);
     133               0 :             op.complete(EINVAL, 0);
     134               0 :             op.impl_ptr = shared_from_this();
     135               0 :             svc_.post(&op);
     136               0 :             return std::noop_coroutine();
     137                 :         }
     138                 : 
     139               0 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     140               0 :         if (flags == -1)
     141                 :         {
     142               0 :             int err = errno;
     143               0 :             ::close(accepted);
     144               0 :             op.complete(err, 0);
     145               0 :             op.impl_ptr = shared_from_this();
     146               0 :             svc_.post(&op);
     147               0 :             return std::noop_coroutine();
     148                 :         }
     149                 : 
     150               0 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     151                 :         {
     152               0 :             int err = errno;
     153               0 :             ::close(accepted);
     154               0 :             op.complete(err, 0);
     155               0 :             op.impl_ptr = shared_from_this();
     156               0 :             svc_.post(&op);
     157               0 :             return std::noop_coroutine();
     158                 :         }
     159                 : 
     160               0 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     161                 :         {
     162               0 :             int err = errno;
     163               0 :             ::close(accepted);
     164               0 :             op.complete(err, 0);
     165               0 :             op.impl_ptr = shared_from_this();
     166               0 :             svc_.post(&op);
     167               0 :             return std::noop_coroutine();
     168                 :         }
     169                 : 
     170                 :         {
     171               0 :             std::lock_guard lock(desc_state_.mutex);
     172               0 :             desc_state_.read_ready = false;
     173               0 :         }
     174                 : 
     175               0 :         if (svc_.scheduler().try_consume_inline_budget())
     176                 :         {
     177               0 :             auto* socket_svc = svc_.stream_service();
     178               0 :             if (socket_svc)
     179                 :             {
     180                 :                 auto& impl =
     181                 :                     static_cast<select_local_stream_socket&>(
     182               0 :                         *socket_svc->construct());
     183               0 :                 impl.set_socket(accepted);
     184                 : 
     185               0 :                 impl.desc_state_.fd = accepted;
     186                 :                 {
     187               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     188               0 :                     impl.desc_state_.read_op    = nullptr;
     189               0 :                     impl.desc_state_.write_op   = nullptr;
     190               0 :                     impl.desc_state_.connect_op = nullptr;
     191               0 :                 }
     192               0 :                 socket_svc->scheduler().register_descriptor(
     193                 :                     accepted, &impl.desc_state_);
     194                 : 
     195               0 :                 impl.set_endpoints(
     196                 :                     local_endpoint_,
     197               0 :                     from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
     198                 : 
     199               0 :                 *ec = {};
     200               0 :                 if (impl_out)
     201               0 :                     *impl_out = &impl;
     202                 :             }
     203                 :             else
     204                 :             {
     205               0 :                 ::close(accepted);
     206               0 :                 *ec = make_err(ENOENT);
     207               0 :                 if (impl_out)
     208               0 :                     *impl_out = nullptr;
     209                 :             }
     210               0 :             op.cont_op.cont.h = h;
     211               0 :             return dispatch_coro(ex, op.cont_op.cont);
     212                 :         }
     213                 : 
     214               0 :         op.accepted_fd  = accepted;
     215               0 :         op.peer_storage = peer_storage;
     216               0 :         op.complete(0, 0);
     217               0 :         op.impl_ptr = shared_from_this();
     218               0 :         svc_.post(&op);
     219               0 :         return std::noop_coroutine();
     220                 :     }
     221                 : 
     222 HIT           2 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     223                 :     {
     224               2 :         op.impl_ptr = shared_from_this();
     225               2 :         svc_.work_started();
     226                 : 
     227               2 :         std::lock_guard lock(desc_state_.mutex);
     228               2 :         bool io_done = false;
     229               2 :         if (desc_state_.read_ready)
     230                 :         {
     231 MIS           0 :             desc_state_.read_ready = false;
     232               0 :             op.perform_io();
     233               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     234               0 :             if (!io_done)
     235               0 :                 op.errn = 0;
     236                 :         }
     237                 : 
     238 HIT           2 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     239                 :         {
     240 MIS           0 :             svc_.post(&op);
     241               0 :             svc_.work_finished();
     242                 :         }
     243                 :         else
     244                 :         {
     245 HIT           2 :             desc_state_.read_op = &op;
     246                 :         }
     247               2 :         return std::noop_coroutine();
     248               2 :     }
     249                 : 
     250 MIS           0 :     op.complete(errno, 0);
     251               0 :     op.impl_ptr = shared_from_this();
     252               0 :     svc_.post(&op);
     253               0 :     return std::noop_coroutine();
     254                 : }
     255                 : 
     256                 : inline void
     257               0 : select_local_stream_acceptor::cancel() noexcept
     258                 : {
     259               0 :     do_cancel();
     260               0 : }
     261                 : 
     262                 : inline void
     263 HIT          24 : select_local_stream_acceptor::close_socket() noexcept
     264                 : {
     265              24 :     do_close_socket();
     266              24 : }
     267                 : 
     268                 : inline native_handle_type
     269 MIS           0 : select_local_stream_acceptor::release_socket() noexcept
     270                 : {
     271               0 :     return this->do_release_socket();
     272                 : }
     273                 : 
     274 HIT         228 : inline select_local_stream_acceptor_service::
     275             228 :     select_local_stream_acceptor_service(capy::execution_context& ctx)
     276             228 :     : base_type(ctx)
     277                 : {
     278             228 :     auto* svc = ctx_.find_service<detail::local_stream_service>();
     279             228 :     stream_svc_ = svc
     280             228 :         ? dynamic_cast<select_local_stream_service*>(svc)
     281                 :         : nullptr;
     282             228 :     assert(stream_svc_ &&
     283                 :         "local_stream_service must be registered before acceptor service");
     284             228 : }
     285                 : 
     286             456 : inline select_local_stream_acceptor_service::
     287             456 :     ~select_local_stream_acceptor_service() {}
     288                 : 
     289                 : inline std::error_code
     290               6 : select_local_stream_acceptor_service::open_acceptor_socket(
     291                 :     local_stream_acceptor::implementation& impl,
     292                 :     int family,
     293                 :     int type,
     294                 :     int protocol)
     295                 : {
     296               6 :     auto* select_impl = static_cast<select_local_stream_acceptor*>(&impl);
     297               6 :     select_impl->close_socket();
     298                 : 
     299               6 :     int fd = ::socket(family, type, protocol);
     300               6 :     if (fd < 0)
     301 MIS           0 :         return make_err(errno);
     302                 : 
     303 HIT           6 :     int flags = ::fcntl(fd, F_GETFL, 0);
     304               6 :     if (flags == -1)
     305                 :     {
     306 MIS           0 :         int errn = errno;
     307               0 :         ::close(fd);
     308               0 :         return make_err(errn);
     309                 :     }
     310 HIT           6 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     311                 :     {
     312 MIS           0 :         int errn = errno;
     313               0 :         ::close(fd);
     314               0 :         return make_err(errn);
     315                 :     }
     316 HIT           6 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     317                 :     {
     318 MIS           0 :         int errn = errno;
     319               0 :         ::close(fd);
     320               0 :         return make_err(errn);
     321                 :     }
     322                 : 
     323 HIT           6 :     if (fd >= FD_SETSIZE)
     324                 :     {
     325 MIS           0 :         ::close(fd);
     326               0 :         return make_err(EMFILE);
     327                 :     }
     328                 : 
     329 HIT           6 :     select_impl->fd_ = fd;
     330                 : 
     331                 :     // Set up descriptor state but do NOT register with select yet
     332                 :     // (registration happens in do_listen via reactor_acceptor base)
     333               6 :     select_impl->desc_state_.fd = fd;
     334                 :     {
     335               6 :         std::lock_guard lock(select_impl->desc_state_.mutex);
     336               6 :         select_impl->desc_state_.read_op = nullptr;
     337               6 :     }
     338                 : 
     339               6 :     return {};
     340                 : }
     341                 : 
     342                 : inline std::error_code
     343               6 : select_local_stream_acceptor_service::bind_acceptor(
     344                 :     local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
     345                 : {
     346               6 :     return static_cast<select_local_stream_acceptor*>(&impl)->do_bind(ep);
     347                 : }
     348                 : 
     349                 : inline std::error_code
     350               2 : select_local_stream_acceptor_service::listen_acceptor(
     351                 :     local_stream_acceptor::implementation& impl, int backlog)
     352                 : {
     353               2 :     return static_cast<select_local_stream_acceptor*>(&impl)->do_listen(backlog);
     354                 : }
     355                 : 
     356                 : } // namespace boost::corosio::detail
     357                 : 
     358                 : #endif // BOOST_COROSIO_HAS_SELECT
     359                 : 
     360                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3