include/boost/corosio/native/detail/epoll/epoll_local_stream_acceptor_service.hpp

50.9% Lines (59/116) 75.0% List of functions (9/12)
epoll_local_stream_acceptor_service.hpp
f(x) Functions (12)
Function Calls Lines Blocks
boost::corosio::detail::epoll_local_accept_op::cancel() :78 0 0.0% 0.0% boost::corosio::detail::epoll_local_accept_op::operator()() :87 2x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor::epoll_local_stream_acceptor(boost::corosio::detail::epoll_local_stream_acceptor_service&) :92 6x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :99 2x 34.3% 30.0% boost::corosio::detail::epoll_local_stream_acceptor::cancel() :219 0 0.0% 0.0% boost::corosio::detail::epoll_local_stream_acceptor::close_socket() :225 24x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor::release_socket() :231 0 0.0% 0.0% boost::corosio::detail::epoll_local_stream_acceptor_service::epoll_local_stream_acceptor_service(boost::capy::execution_context&) :236 356x 100.0% 73.0% boost::corosio::detail::epoll_local_stream_acceptor_service::~epoll_local_stream_acceptor_service() :248 712x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor_service::open_acceptor_socket(boost::corosio::local_stream_acceptor::implementation&, int, int, int) :252 6x 91.7% 91.0% boost::corosio::detail::epoll_local_stream_acceptor_service::bind_acceptor(boost::corosio::local_stream_acceptor::implementation&, boost::corosio::local_endpoint) :278 6x 100.0% 100.0% boost::corosio::detail::epoll_local_stream_acceptor_service::listen_acceptor(boost::corosio::local_stream_acceptor::implementation&, int) :285 2x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_local_stream_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_local_stream_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <memory>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <sys/socket.h>
33 #include <sys/un.h>
34 #include <unistd.h>
35
36 namespace boost::corosio::detail {
37
38 /* epoll local stream acceptor service implementation.
39
40 Inherits from local_stream_acceptor_service to enable runtime
41 polymorphism. Uses key_type = local_stream_acceptor_service
42 for service lookup.
43 */
44 class BOOST_COROSIO_DECL epoll_local_stream_acceptor_service final
45 : public reactor_acceptor_service<
46 epoll_local_stream_acceptor_service,
47 local_stream_acceptor_service,
48 epoll_scheduler,
49 epoll_local_stream_acceptor,
50 epoll_local_stream_service>
51 {
52 using base_type = reactor_acceptor_service<
53 epoll_local_stream_acceptor_service,
54 local_stream_acceptor_service,
55 epoll_scheduler,
56 epoll_local_stream_acceptor,
57 epoll_local_stream_service>;
58 friend base_type;
59
60 public:
61 explicit epoll_local_stream_acceptor_service(capy::execution_context& ctx);
62 ~epoll_local_stream_acceptor_service() override;
63
64 std::error_code open_acceptor_socket(
65 local_stream_acceptor::implementation& impl,
66 int family,
67 int type,
68 int protocol) override;
69 std::error_code bind_acceptor(
70 local_stream_acceptor::implementation& impl,
71 corosio::local_endpoint ep) override;
72 std::error_code listen_acceptor(
73 local_stream_acceptor::implementation& impl,
74 int backlog) override;
75 };
76
77 inline void
78 epoll_local_accept_op::cancel() noexcept
79 {
80 if (acceptor_impl_)
81 acceptor_impl_->cancel_single_op(*this);
82 else
83 request_cancel();
84 }
85
86 inline void
87 2x epoll_local_accept_op::operator()()
88 {
89 2x complete_accept_op<epoll_local_stream_socket>(*this);
90 2x }
91
92 6x inline epoll_local_stream_acceptor::epoll_local_stream_acceptor(
93 6x epoll_local_stream_acceptor_service& svc) noexcept
94 6x : reactor_acceptor(svc)
95 {
96 6x }
97
98 inline std::coroutine_handle<>
99 2x epoll_local_stream_acceptor::accept(
100 std::coroutine_handle<> h,
101 capy::executor_ref ex,
102 std::stop_token token,
103 std::error_code* ec,
104 io_object::implementation** impl_out)
105 {
106 2x auto& op = acc_;
107 2x op.reset();
108 2x op.h = h;
109 2x op.ex = ex;
110 2x op.ec_out = ec;
111 2x op.impl_out = impl_out;
112 2x op.fd = fd_;
113 2x op.start(token, this);
114
115 2x sockaddr_storage peer_storage{};
116 2x socklen_t addrlen = sizeof(peer_storage);
117 int accepted;
118 do
119 {
120 2x accepted = ::accept4(
121 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
122 SOCK_NONBLOCK | SOCK_CLOEXEC);
123 }
124 2x while (accepted < 0 && errno == EINTR);
125
126 2x if (accepted >= 0)
127 {
128 {
129 std::lock_guard lock(desc_state_.mutex);
130 desc_state_.read_ready = false;
131 }
132
133 if (svc_.scheduler().try_consume_inline_budget())
134 {
135 auto* socket_svc = svc_.stream_service();
136 if (socket_svc)
137 {
138 auto& impl =
139 static_cast<epoll_local_stream_socket&>(
140 *socket_svc->construct());
141 impl.set_socket(accepted);
142
143 impl.desc_state_.fd = accepted;
144 {
145 std::lock_guard lock(impl.desc_state_.mutex);
146 impl.desc_state_.read_op = nullptr;
147 impl.desc_state_.write_op = nullptr;
148 impl.desc_state_.connect_op = nullptr;
149 }
150 socket_svc->scheduler().register_descriptor(
151 accepted, &impl.desc_state_);
152
153 impl.set_endpoints(
154 local_endpoint_,
155 from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
156
157 if (ec)
158 *ec = {};
159 if (impl_out)
160 *impl_out = &impl;
161 }
162 else
163 {
164 ::close(accepted);
165 if (ec)
166 *ec = make_err(ENOENT);
167 if (impl_out)
168 *impl_out = nullptr;
169 }
170 op.cont_op.cont.h = h;
171 return dispatch_coro(ex, op.cont_op.cont);
172 }
173
174 op.accepted_fd = accepted;
175 op.peer_storage = peer_storage;
176 op.peer_addrlen = addrlen;
177 op.complete(0, 0);
178 op.impl_ptr = shared_from_this();
179 svc_.post(&op);
180 return std::noop_coroutine();
181 }
182
183 2x if (errno == EAGAIN || errno == EWOULDBLOCK)
184 {
185 2x op.impl_ptr = shared_from_this();
186 2x svc_.work_started();
187
188 2x std::lock_guard lock(desc_state_.mutex);
189 2x bool io_done = false;
190 2x if (desc_state_.read_ready)
191 {
192 desc_state_.read_ready = false;
193 op.perform_io();
194 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
195 if (!io_done)
196 op.errn = 0;
197 }
198
199 2x if (io_done || op.cancelled.load(std::memory_order_acquire))
200 {
201 svc_.post(&op);
202 svc_.work_finished();
203 }
204 else
205 {
206 2x desc_state_.read_op = &op;
207 }
208 2x return std::noop_coroutine();
209 2x }
210
211 op.complete(errno, 0);
212 op.impl_ptr = shared_from_this();
213 svc_.post(&op);
214 // completion is always posted to scheduler queue, never inline.
215 return std::noop_coroutine();
216 }
217
218 inline void
219 epoll_local_stream_acceptor::cancel() noexcept
220 {
221 do_cancel();
222 }
223
224 inline void
225 24x epoll_local_stream_acceptor::close_socket() noexcept
226 {
227 24x do_close_socket();
228 24x }
229
230 inline native_handle_type
231 epoll_local_stream_acceptor::release_socket() noexcept
232 {
233 return this->do_release_socket();
234 }
235
236 356x inline epoll_local_stream_acceptor_service::
237 356x epoll_local_stream_acceptor_service(capy::execution_context& ctx)
238 356x : base_type(ctx)
239 {
240 356x auto* svc = ctx_.find_service<detail::local_stream_service>();
241 356x stream_svc_ = svc
242 356x ? dynamic_cast<epoll_local_stream_service*>(svc)
243 : nullptr;
244 356x assert(stream_svc_ &&
245 "local_stream_service must be registered before acceptor service");
246 356x }
247
248 712x inline epoll_local_stream_acceptor_service::
249 712x ~epoll_local_stream_acceptor_service() {}
250
251 inline std::error_code
252 6x epoll_local_stream_acceptor_service::open_acceptor_socket(
253 local_stream_acceptor::implementation& impl,
254 int family,
255 int type,
256 int protocol)
257 {
258 6x auto* epoll_impl = static_cast<epoll_local_stream_acceptor*>(&impl);
259 6x epoll_impl->close_socket();
260
261 6x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
262 6x if (fd < 0)
263 return make_err(errno);
264
265 6x epoll_impl->fd_ = fd;
266
267 // Set up descriptor state but do NOT register with epoll yet
268 6x epoll_impl->desc_state_.fd = fd;
269 {
270 6x std::lock_guard lock(epoll_impl->desc_state_.mutex);
271 6x epoll_impl->desc_state_.read_op = nullptr;
272 6x }
273
274 6x return {};
275 }
276
277 inline std::error_code
278 6x epoll_local_stream_acceptor_service::bind_acceptor(
279 local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
280 {
281 6x return static_cast<epoll_local_stream_acceptor*>(&impl)->do_bind(ep);
282 }
283
284 inline std::error_code
285 2x epoll_local_stream_acceptor_service::listen_acceptor(
286 local_stream_acceptor::implementation& impl, int backlog)
287 {
288 2x return static_cast<epoll_local_stream_acceptor*>(&impl)->do_listen(backlog);
289 }
290
291
292 } // namespace boost::corosio::detail
293
294 #endif // BOOST_COROSIO_HAS_EPOLL
295
296 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
297