include/boost/corosio/native/detail/epoll/epoll_local_stream_acceptor_service.hpp

50.9% Lines (59/116) 75.0% List of functions (9/12)
epoll_local_stream_acceptor_service.hpp
f(x) Functions (12)
Function Calls Lines Blocks
boost::corosio::detail::epoll_local_accept_op::cancel() :79 0 0.0% 0.0% boost::corosio::detail::epoll_local_accept_op::operator()() :88 2x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor::epoll_local_stream_acceptor(boost::corosio::detail::epoll_local_stream_acceptor_service&) :93 6x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :100 2x 34.3% 30.0% boost::corosio::detail::epoll_local_stream_acceptor::cancel() :221 0 0.0% 0.0% boost::corosio::detail::epoll_local_stream_acceptor::close_socket() :227 24x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor::release_socket() :233 0 0.0% 0.0% boost::corosio::detail::epoll_local_stream_acceptor_service::epoll_local_stream_acceptor_service(boost::capy::execution_context&) :238 356x 100.0% 73.0% boost::corosio::detail::epoll_local_stream_acceptor_service::~epoll_local_stream_acceptor_service() :250 712x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor_service::open_acceptor_socket(boost::corosio::local_stream_acceptor::implementation&, int, int, int) :254 6x 91.7% 91.0% boost::corosio::detail::epoll_local_stream_acceptor_service::bind_acceptor(boost::corosio::local_stream_acceptor::implementation&, boost::corosio::local_endpoint) :280 6x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor_service::listen_acceptor(boost::corosio::local_stream_acceptor::implementation&, int) :287 2x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_local_stream_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_local_stream_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <cassert>
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <sys/socket.h>
34 #include <sys/un.h>
35 #include <unistd.h>
36
37 namespace boost::corosio::detail {
38
39 /* epoll local stream acceptor service implementation.
40
41 Inherits from local_stream_acceptor_service to enable runtime
42 polymorphism. Uses key_type = local_stream_acceptor_service
43 for service lookup.
44 */
45 class BOOST_COROSIO_DECL epoll_local_stream_acceptor_service final
46 : public reactor_acceptor_service<
47 epoll_local_stream_acceptor_service,
48 local_stream_acceptor_service,
49 epoll_scheduler,
50 epoll_local_stream_acceptor,
51 epoll_local_stream_service>
52 {
53 using base_type = reactor_acceptor_service<
54 epoll_local_stream_acceptor_service,
55 local_stream_acceptor_service,
56 epoll_scheduler,
57 epoll_local_stream_acceptor,
58 epoll_local_stream_service>;
59 friend base_type;
60
61 public:
62 explicit epoll_local_stream_acceptor_service(capy::execution_context& ctx);
63 ~epoll_local_stream_acceptor_service() override;
64
65 std::error_code open_acceptor_socket(
66 local_stream_acceptor::implementation& impl,
67 int family,
68 int type,
69 int protocol) override;
70 std::error_code bind_acceptor(
71 local_stream_acceptor::implementation& impl,
72 corosio::local_endpoint ep) override;
73 std::error_code listen_acceptor(
74 local_stream_acceptor::implementation& impl,
75 int backlog) override;
76 };
77
78 inline void
79 epoll_local_accept_op::cancel() noexcept
80 {
81 if (acceptor_impl_)
82 acceptor_impl_->cancel_single_op(*this);
83 else
84 request_cancel();
85 }
86
87 inline void
88 2x epoll_local_accept_op::operator()()
89 {
90 2x complete_accept_op<epoll_local_stream_socket>(*this);
91 2x }
92
93 6x inline epoll_local_stream_acceptor::epoll_local_stream_acceptor(
94 6x epoll_local_stream_acceptor_service& svc) noexcept
95 6x : reactor_acceptor(svc)
96 {
97 6x }
98
99 inline std::coroutine_handle<>
100 2x epoll_local_stream_acceptor::accept(
101 std::coroutine_handle<> h,
102 capy::executor_ref ex,
103 std::stop_token token,
104 std::error_code* ec,
105 io_object::implementation** impl_out)
106 {
107 2x auto& op = acc_;
108 2x op.reset();
109 2x op.h = h;
110 2x op.ex = ex;
111 2x op.ec_out = ec;
112 2x op.impl_out = impl_out;
113 2x op.fd = fd_;
114 2x op.start(token, this);
115
116 2x sockaddr_storage peer_storage{};
117 socklen_t addrlen;
118 int accepted;
119 do
120 {
121 2x addrlen = sizeof(peer_storage);
122 2x accepted = ::accept4(
123 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
124 SOCK_NONBLOCK | SOCK_CLOEXEC);
125 }
126 2x while (accepted < 0 && errno == EINTR);
127
128 2x if (accepted >= 0)
129 {
130 {
131 std::lock_guard lock(desc_state_.mutex);
132 desc_state_.read_ready = false;
133 }
134
135 if (svc_.scheduler().try_consume_inline_budget())
136 {
137 auto* socket_svc = svc_.stream_service();
138 if (socket_svc)
139 {
140 auto& impl =
141 static_cast<epoll_local_stream_socket&>(
142 *socket_svc->construct());
143 impl.set_socket(accepted);
144
145 impl.desc_state_.fd = accepted;
146 {
147 std::lock_guard lock(impl.desc_state_.mutex);
148 impl.desc_state_.read_op = nullptr;
149 impl.desc_state_.write_op = nullptr;
150 impl.desc_state_.connect_op = nullptr;
151 }
152 socket_svc->scheduler().register_descriptor(
153 accepted, &impl.desc_state_);
154
155 impl.set_endpoints(
156 local_endpoint_,
157 from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
158
159 if (ec)
160 *ec = {};
161 if (impl_out)
162 *impl_out = &impl;
163 }
164 else
165 {
166 ::close(accepted);
167 if (ec)
168 *ec = make_err(ENOENT);
169 if (impl_out)
170 *impl_out = nullptr;
171 }
172 op.cont_op.cont.h = h;
173 return dispatch_coro(ex, op.cont_op.cont);
174 }
175
176 op.accepted_fd = accepted;
177 op.peer_storage = peer_storage;
178 op.peer_addrlen = addrlen;
179 op.complete(0, 0);
180 op.impl_ptr = shared_from_this();
181 svc_.post(&op);
182 return std::noop_coroutine();
183 }
184
185 2x if (errno == EAGAIN || errno == EWOULDBLOCK)
186 {
187 2x op.impl_ptr = shared_from_this();
188 2x svc_.work_started();
189
190 2x std::lock_guard lock(desc_state_.mutex);
191 2x bool io_done = false;
192 2x if (desc_state_.read_ready)
193 {
194 desc_state_.read_ready = false;
195 op.perform_io();
196 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
197 if (!io_done)
198 op.errn = 0;
199 }
200
201 2x if (io_done || op.cancelled.load(std::memory_order_acquire))
202 {
203 svc_.post(&op);
204 svc_.work_finished();
205 }
206 else
207 {
208 2x desc_state_.read_op = &op;
209 }
210 2x return std::noop_coroutine();
211 2x }
212
213 op.complete(errno, 0);
214 op.impl_ptr = shared_from_this();
215 svc_.post(&op);
216 // completion is always posted to scheduler queue, never inline.
217 return std::noop_coroutine();
218 }
219
220 inline void
221 epoll_local_stream_acceptor::cancel() noexcept
222 {
223 do_cancel();
224 }
225
226 inline void
227 24x epoll_local_stream_acceptor::close_socket() noexcept
228 {
229 24x do_close_socket();
230 24x }
231
232 inline native_handle_type
233 epoll_local_stream_acceptor::release_socket() noexcept
234 {
235 return this->do_release_socket();
236 }
237
238 356x inline epoll_local_stream_acceptor_service::
239 356x epoll_local_stream_acceptor_service(capy::execution_context& ctx)
240 356x : base_type(ctx)
241 {
242 356x auto* svc = ctx_.find_service<detail::local_stream_service>();
243 356x stream_svc_ = svc
244 356x ? dynamic_cast<epoll_local_stream_service*>(svc)
245 : nullptr;
246 356x assert(stream_svc_ &&
247 "local_stream_service must be registered before acceptor service");
248 356x }
249
250 712x inline epoll_local_stream_acceptor_service::
251 712x ~epoll_local_stream_acceptor_service() {}
252
253 inline std::error_code
254 6x epoll_local_stream_acceptor_service::open_acceptor_socket(
255 local_stream_acceptor::implementation& impl,
256 int family,
257 int type,
258 int protocol)
259 {
260 6x auto* epoll_impl = static_cast<epoll_local_stream_acceptor*>(&impl);
261 6x epoll_impl->close_socket();
262
263 6x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
264 6x if (fd < 0)
265 return make_err(errno);
266
267 6x epoll_impl->fd_ = fd;
268
269 // Set up descriptor state but do NOT register with epoll yet
270 6x epoll_impl->desc_state_.fd = fd;
271 {
272 6x std::lock_guard lock(epoll_impl->desc_state_.mutex);
273 6x epoll_impl->desc_state_.read_op = nullptr;
274 6x }
275
276 6x return {};
277 }
278
279 inline std::error_code
280 6x epoll_local_stream_acceptor_service::bind_acceptor(
281 local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
282 {
283 6x return static_cast<epoll_local_stream_acceptor*>(&impl)->do_bind(ep);
284 }
285
286 inline std::error_code
287 2x epoll_local_stream_acceptor_service::listen_acceptor(
288 local_stream_acceptor::implementation& impl, int backlog)
289 {
290 2x return static_cast<epoll_local_stream_acceptor*>(&impl)->do_listen(backlog);
291 }
292
293
294 } // namespace boost::corosio::detail
295
296 #endif // BOOST_COROSIO_HAS_EPOLL
297
298 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
299