LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_local_stream_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 40.5 % 158 64 94
Test Date: 2026-04-15 17:16:49 Functions: 76.9 % 13 10 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_local_stream_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_local_stream_service.hpp>
      23                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      24                 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
      25                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      26                 : 
      27                 : #include <cassert>
      28                 : #include <memory>
      29                 : #include <mutex>
      30                 : #include <utility>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <sys/select.h>
      35                 : #include <sys/socket.h>
      36                 : #include <sys/un.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : namespace boost::corosio::detail {
      40                 : 
      41                 : /* select local stream acceptor service implementation.
      42                 : 
      43                 :    Inherits from local_stream_acceptor_service to enable runtime
      44                 :    polymorphism. Uses key_type = local_stream_acceptor_service
      45                 :    for service lookup.
      46                 : */
      47                 : class BOOST_COROSIO_DECL select_local_stream_acceptor_service final
      48                 :     : public reactor_acceptor_service<
      49                 :           select_local_stream_acceptor_service,
      50                 :           local_stream_acceptor_service,
      51                 :           select_scheduler,
      52                 :           select_local_stream_acceptor,
      53                 :           select_local_stream_service>
      54                 : {
      55                 :     using base_type = reactor_acceptor_service<
      56                 :         select_local_stream_acceptor_service,
      57                 :         local_stream_acceptor_service,
      58                 :         select_scheduler,
      59                 :         select_local_stream_acceptor,
      60                 :         select_local_stream_service>;
      61                 :     friend base_type;
      62                 : 
      63                 : public:
      64                 :     explicit select_local_stream_acceptor_service(
      65                 :         capy::execution_context& ctx);
      66                 :     ~select_local_stream_acceptor_service() override;
      67                 : 
      68                 :     std::error_code open_acceptor_socket(
      69                 :         local_stream_acceptor::implementation& impl,
      70                 :         int family,
      71                 :         int type,
      72                 :         int protocol) override;
      73                 :     std::error_code bind_acceptor(
      74                 :         local_stream_acceptor::implementation& impl,
      75                 :         corosio::local_endpoint ep) override;
      76                 :     std::error_code listen_acceptor(
      77                 :         local_stream_acceptor::implementation& impl,
      78                 :         int backlog) override;
      79                 : };
      80                 : 
      81                 : inline void
      82 MIS           0 : select_local_accept_op::cancel() noexcept
      83                 : {
      84               0 :     if (acceptor_impl_)
      85               0 :         acceptor_impl_->cancel_single_op(*this);
      86                 :     else
      87               0 :         request_cancel();
      88               0 : }
      89                 : 
      90                 : inline void
      91 HIT           2 : select_local_accept_op::operator()()
      92                 : {
      93               2 :     complete_accept_op<select_local_stream_socket>(*this);
      94               2 : }
      95                 : 
      96               6 : inline select_local_stream_acceptor::select_local_stream_acceptor(
      97               6 :     select_local_stream_acceptor_service& svc) noexcept
      98               6 :     : reactor_acceptor(svc)
      99                 : {
     100               6 : }
     101                 : 
     102                 : inline std::coroutine_handle<>
     103               2 : select_local_stream_acceptor::accept(
     104                 :     std::coroutine_handle<> h,
     105                 :     capy::executor_ref ex,
     106                 :     std::stop_token token,
     107                 :     std::error_code* ec,
     108                 :     io_object::implementation** impl_out)
     109                 : {
     110               2 :     auto& op = acc_;
     111               2 :     op.reset();
     112               2 :     op.h        = h;
     113               2 :     op.ex       = ex;
     114               2 :     op.ec_out   = ec;
     115               2 :     op.impl_out = impl_out;
     116               2 :     op.fd       = fd_;
     117               2 :     op.start(token, this);
     118                 : 
     119               2 :     sockaddr_storage peer_storage{};
     120                 :     socklen_t addrlen;
     121                 :     int accepted;
     122                 :     do
     123                 :     {
     124               2 :         addrlen = sizeof(peer_storage);
     125                 :         accepted =
     126               2 :             ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
     127                 :     }
     128               2 :     while (accepted < 0 && errno == EINTR);
     129                 : 
     130               2 :     if (accepted >= 0)
     131                 :     {
     132 MIS           0 :         if (accepted >= FD_SETSIZE)
     133                 :         {
     134               0 :             ::close(accepted);
     135               0 :             op.complete(EMFILE, 0);
     136               0 :             op.impl_ptr = shared_from_this();
     137               0 :             svc_.post(&op);
     138               0 :             return std::noop_coroutine();
     139                 :         }
     140                 : 
     141               0 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     142               0 :         if (flags == -1)
     143                 :         {
     144               0 :             int err = errno;
     145               0 :             ::close(accepted);
     146               0 :             op.complete(err, 0);
     147               0 :             op.impl_ptr = shared_from_this();
     148               0 :             svc_.post(&op);
     149               0 :             return std::noop_coroutine();
     150                 :         }
     151                 : 
     152               0 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     153                 :         {
     154               0 :             int err = errno;
     155               0 :             ::close(accepted);
     156               0 :             op.complete(err, 0);
     157               0 :             op.impl_ptr = shared_from_this();
     158               0 :             svc_.post(&op);
     159               0 :             return std::noop_coroutine();
     160                 :         }
     161                 : 
     162               0 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     163                 :         {
     164               0 :             int err = errno;
     165               0 :             ::close(accepted);
     166               0 :             op.complete(err, 0);
     167               0 :             op.impl_ptr = shared_from_this();
     168               0 :             svc_.post(&op);
     169               0 :             return std::noop_coroutine();
     170                 :         }
     171                 : 
     172                 : #ifdef SO_NOSIGPIPE
     173                 :         {
     174                 :             int one = 1;
     175                 :             ::setsockopt(
     176                 :                 accepted, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
     177                 :         }
     178                 : #endif
     179                 : 
     180                 :         {
     181               0 :             std::lock_guard lock(desc_state_.mutex);
     182               0 :             desc_state_.read_ready = false;
     183               0 :         }
     184                 : 
     185               0 :         if (svc_.scheduler().try_consume_inline_budget())
     186                 :         {
     187               0 :             auto* socket_svc = svc_.stream_service();
     188               0 :             if (socket_svc)
     189                 :             {
     190                 :                 auto& impl =
     191                 :                     static_cast<select_local_stream_socket&>(
     192               0 :                         *socket_svc->construct());
     193               0 :                 impl.set_socket(accepted);
     194                 : 
     195               0 :                 impl.desc_state_.fd = accepted;
     196                 :                 {
     197               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     198               0 :                     impl.desc_state_.read_op    = nullptr;
     199               0 :                     impl.desc_state_.write_op   = nullptr;
     200               0 :                     impl.desc_state_.connect_op = nullptr;
     201               0 :                 }
     202               0 :                 socket_svc->scheduler().register_descriptor(
     203                 :                     accepted, &impl.desc_state_);
     204                 : 
     205               0 :                 impl.set_endpoints(
     206                 :                     local_endpoint_,
     207               0 :                     from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
     208                 : 
     209               0 :                 *ec = {};
     210               0 :                 if (impl_out)
     211               0 :                     *impl_out = &impl;
     212                 :             }
     213                 :             else
     214                 :             {
     215               0 :                 ::close(accepted);
     216               0 :                 *ec = make_err(ENOENT);
     217               0 :                 if (impl_out)
     218               0 :                     *impl_out = nullptr;
     219                 :             }
     220               0 :             op.cont_op.cont.h = h;
     221               0 :             return dispatch_coro(ex, op.cont_op.cont);
     222                 :         }
     223                 : 
     224               0 :         op.accepted_fd   = accepted;
     225               0 :         op.peer_storage  = peer_storage;
     226               0 :         op.peer_addrlen  = addrlen;
     227               0 :         op.complete(0, 0);
     228               0 :         op.impl_ptr = shared_from_this();
     229               0 :         svc_.post(&op);
     230               0 :         return std::noop_coroutine();
     231                 :     }
     232                 : 
     233 HIT           2 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     234                 :     {
     235               2 :         op.impl_ptr = shared_from_this();
     236               2 :         svc_.work_started();
     237                 : 
     238               2 :         std::lock_guard lock(desc_state_.mutex);
     239               2 :         bool io_done = false;
     240               2 :         if (desc_state_.read_ready)
     241                 :         {
     242 MIS           0 :             desc_state_.read_ready = false;
     243               0 :             op.perform_io();
     244               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     245               0 :             if (!io_done)
     246               0 :                 op.errn = 0;
     247                 :         }
     248                 : 
     249 HIT           2 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     250                 :         {
     251 MIS           0 :             svc_.post(&op);
     252               0 :             svc_.work_finished();
     253                 :         }
     254                 :         else
     255                 :         {
     256 HIT           2 :             desc_state_.read_op = &op;
     257                 :         }
     258               2 :         return std::noop_coroutine();
     259               2 :     }
     260                 : 
     261 MIS           0 :     op.complete(errno, 0);
     262               0 :     op.impl_ptr = shared_from_this();
     263               0 :     svc_.post(&op);
     264               0 :     return std::noop_coroutine();
     265                 : }
     266                 : 
     267                 : inline void
     268               0 : select_local_stream_acceptor::cancel() noexcept
     269                 : {
     270               0 :     do_cancel();
     271               0 : }
     272                 : 
     273                 : inline void
     274 HIT          24 : select_local_stream_acceptor::close_socket() noexcept
     275                 : {
     276              24 :     do_close_socket();
     277              24 : }
     278                 : 
     279                 : inline native_handle_type
     280 MIS           0 : select_local_stream_acceptor::release_socket() noexcept
     281                 : {
     282               0 :     return this->do_release_socket();
     283                 : }
     284                 : 
     285 HIT         229 : inline select_local_stream_acceptor_service::
     286             229 :     select_local_stream_acceptor_service(capy::execution_context& ctx)
     287             229 :     : base_type(ctx)
     288                 : {
     289             229 :     auto* svc = ctx_.find_service<detail::local_stream_service>();
     290             229 :     stream_svc_ = svc
     291             229 :         ? dynamic_cast<select_local_stream_service*>(svc)
     292                 :         : nullptr;
     293             229 :     assert(stream_svc_ &&
     294                 :         "local_stream_service must be registered before acceptor service");
     295             229 : }
     296                 : 
     297             458 : inline select_local_stream_acceptor_service::
     298             458 :     ~select_local_stream_acceptor_service() {}
     299                 : 
     300                 : inline std::error_code
     301               6 : select_local_stream_acceptor_service::open_acceptor_socket(
     302                 :     local_stream_acceptor::implementation& impl,
     303                 :     int family,
     304                 :     int type,
     305                 :     int protocol)
     306                 : {
     307               6 :     auto* select_impl = static_cast<select_local_stream_acceptor*>(&impl);
     308               6 :     select_impl->close_socket();
     309                 : 
     310               6 :     int fd = ::socket(family, type, protocol);
     311               6 :     if (fd < 0)
     312 MIS           0 :         return make_err(errno);
     313                 : 
     314 HIT           6 :     int flags = ::fcntl(fd, F_GETFL, 0);
     315               6 :     if (flags == -1)
     316                 :     {
     317 MIS           0 :         int errn = errno;
     318               0 :         ::close(fd);
     319               0 :         return make_err(errn);
     320                 :     }
     321 HIT           6 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     322                 :     {
     323 MIS           0 :         int errn = errno;
     324               0 :         ::close(fd);
     325               0 :         return make_err(errn);
     326                 :     }
     327 HIT           6 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     328                 :     {
     329 MIS           0 :         int errn = errno;
     330               0 :         ::close(fd);
     331               0 :         return make_err(errn);
     332                 :     }
     333                 : 
     334 HIT           6 :     if (fd >= FD_SETSIZE)
     335                 :     {
     336 MIS           0 :         ::close(fd);
     337               0 :         return make_err(EMFILE);
     338                 :     }
     339                 : 
     340 HIT           6 :     select_impl->fd_ = fd;
     341                 : 
     342                 :     // Set up descriptor state but do NOT register with select yet
     343                 :     // (registration happens in do_listen via reactor_acceptor base)
     344               6 :     select_impl->desc_state_.fd = fd;
     345                 :     {
     346               6 :         std::lock_guard lock(select_impl->desc_state_.mutex);
     347               6 :         select_impl->desc_state_.read_op = nullptr;
     348               6 :     }
     349                 : 
     350               6 :     return {};
     351                 : }
     352                 : 
     353                 : inline std::error_code
     354               6 : select_local_stream_acceptor_service::bind_acceptor(
     355                 :     local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
     356                 : {
     357               6 :     return static_cast<select_local_stream_acceptor*>(&impl)->do_bind(ep);
     358                 : }
     359                 : 
     360                 : inline std::error_code
     361               2 : select_local_stream_acceptor_service::listen_acceptor(
     362                 :     local_stream_acceptor::implementation& impl, int backlog)
     363                 : {
     364               2 :     return static_cast<select_local_stream_acceptor*>(&impl)->do_listen(backlog);
     365                 : }
     366                 : 
     367                 : } // namespace boost::corosio::detail
     368                 : 
     369                 : #endif // BOOST_COROSIO_HAS_SELECT
     370                 : 
     371                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3