include/boost/corosio/native/detail/epoll/epoll_tcp_acceptor_service.hpp

69.4% Lines (77/111) 100.0% List of functions (11/11)
epoll_tcp_acceptor_service.hpp
f(x) Functions (11)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <memory>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <netinet/in.h>
33 #include <sys/epoll.h>
34 #include <sys/socket.h>
35 #include <unistd.h>
36
37 namespace boost::corosio::detail {
38
39 /** epoll acceptor service implementation.
40
41 Derives from reactor_acceptor_service for shared construct/
42 destroy/shutdown/close logic. Provides epoll-specific socket
43 creation (SOCK_NONBLOCK | SOCK_CLOEXEC) and dual-stack defaults.
44 Uses key_type = tcp_acceptor_service for service lookup.
45 */
46 class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
47 : public reactor_acceptor_service<
48 epoll_tcp_acceptor_service,
49 tcp_acceptor_service,
50 epoll_scheduler,
51 epoll_tcp_acceptor,
52 epoll_tcp_service>
53 {
54 using base_type = reactor_acceptor_service<
55 epoll_tcp_acceptor_service,
56 tcp_acceptor_service,
57 epoll_scheduler,
58 epoll_tcp_acceptor,
59 epoll_tcp_service>;
60 friend base_type;
61
62 public:
63 explicit epoll_tcp_acceptor_service(capy::execution_context& ctx);
64 ~epoll_tcp_acceptor_service() override;
65
66 std::error_code open_acceptor_socket(
67 tcp_acceptor::implementation& impl,
68 int family,
69 int type,
70 int protocol) override;
71 std::error_code
72 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 std::error_code
74 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75 };
76
77 inline void
78 6x epoll_accept_op::cancel() noexcept
79 {
80 6x if (acceptor_impl_)
81 6x acceptor_impl_->cancel_single_op(*this);
82 else
83 request_cancel();
84 6x }
85
86 inline void
87 3404x epoll_accept_op::operator()()
88 {
89 3404x complete_accept_op<epoll_tcp_socket>(*this);
90 3404x }
91
92 84x inline epoll_tcp_acceptor::epoll_tcp_acceptor(
93 84x epoll_tcp_acceptor_service& svc) noexcept
94 84x : reactor_acceptor(svc)
95 {
96 84x }
97
98 inline std::coroutine_handle<>
99 3404x epoll_tcp_acceptor::accept(
100 std::coroutine_handle<> h,
101 capy::executor_ref ex,
102 std::stop_token token,
103 std::error_code* ec,
104 io_object::implementation** impl_out)
105 {
106 3404x auto& op = acc_;
107 3404x op.reset();
108 3404x op.h = h;
109 3404x op.ex = ex;
110 3404x op.ec_out = ec;
111 3404x op.impl_out = impl_out;
112 3404x op.fd = fd_;
113 3404x op.start(token, this);
114
115 3404x sockaddr_storage peer_storage{};
116 socklen_t addrlen;
117 int accepted;
118 do
119 {
120 3404x addrlen = sizeof(peer_storage);
121 3404x accepted = ::accept4(
122 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
123 SOCK_NONBLOCK | SOCK_CLOEXEC);
124 }
125 3404x while (accepted < 0 && errno == EINTR);
126
127 3404x if (accepted >= 0)
128 {
129 {
130 3x std::lock_guard lock(desc_state_.mutex);
131 3x desc_state_.read_ready = false;
132 3x }
133
134 3x if (svc_.scheduler().try_consume_inline_budget())
135 {
136 auto* socket_svc = svc_.stream_service();
137 if (socket_svc)
138 {
139 auto& impl =
140 static_cast<epoll_tcp_socket&>(*socket_svc->construct());
141 impl.set_socket(accepted);
142
143 impl.desc_state_.fd = accepted;
144 {
145 std::lock_guard lock(impl.desc_state_.mutex);
146 impl.desc_state_.read_op = nullptr;
147 impl.desc_state_.write_op = nullptr;
148 impl.desc_state_.connect_op = nullptr;
149 }
150 socket_svc->scheduler().register_descriptor(
151 accepted, &impl.desc_state_);
152
153 impl.set_endpoints(
154 local_endpoint_, from_sockaddr(peer_storage));
155
156 *ec = {};
157 if (impl_out)
158 *impl_out = &impl;
159 }
160 else
161 {
162 ::close(accepted);
163 *ec = make_err(ENOENT);
164 if (impl_out)
165 *impl_out = nullptr;
166 }
167 op.cont_op.cont.h = h;
168 return dispatch_coro(ex, op.cont_op.cont);
169 }
170
171 3x op.accepted_fd = accepted;
172 3x op.peer_storage = peer_storage;
173 3x op.peer_addrlen = addrlen;
174 3x op.complete(0, 0);
175 3x op.impl_ptr = shared_from_this();
176 3x svc_.post(&op);
177 3x return std::noop_coroutine();
178 }
179
180 3401x if (errno == EAGAIN || errno == EWOULDBLOCK)
181 {
182 3401x op.impl_ptr = shared_from_this();
183 3401x svc_.work_started();
184
185 3401x std::lock_guard lock(desc_state_.mutex);
186 3401x bool io_done = false;
187 3401x if (desc_state_.read_ready)
188 {
189 desc_state_.read_ready = false;
190 op.perform_io();
191 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
192 if (!io_done)
193 op.errn = 0;
194 }
195
196 3401x if (io_done || op.cancelled.load(std::memory_order_acquire))
197 {
198 svc_.post(&op);
199 svc_.work_finished();
200 }
201 else
202 {
203 3401x desc_state_.read_op = &op;
204 }
205 3401x return std::noop_coroutine();
206 3401x }
207
208 op.complete(errno, 0);
209 op.impl_ptr = shared_from_this();
210 svc_.post(&op);
211 // completion is always posted to scheduler queue, never inline.
212 return std::noop_coroutine();
213 }
214
215 inline void
216 2x epoll_tcp_acceptor::cancel() noexcept
217 {
218 2x do_cancel();
219 2x }
220
221 inline void
222 332x epoll_tcp_acceptor::close_socket() noexcept
223 {
224 332x do_close_socket();
225 332x }
226
227 356x inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
228 356x capy::execution_context& ctx)
229 356x : base_type(ctx)
230 {
231 356x auto* svc = ctx_.find_service<detail::tcp_service>();
232 356x stream_svc_ = svc ? dynamic_cast<epoll_tcp_service*>(svc) : nullptr;
233 356x }
234
235 712x inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
236
237 inline std::error_code
238 82x epoll_tcp_acceptor_service::open_acceptor_socket(
239 tcp_acceptor::implementation& impl, int family, int type, int protocol)
240 {
241 82x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
242 82x epoll_impl->close_socket();
243
244 82x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
245 82x if (fd < 0)
246 return make_err(errno);
247
248 82x if (family == AF_INET6)
249 {
250 8x int val = 0; // dual-stack default
251 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
252 }
253
254 82x epoll_impl->fd_ = fd;
255
256 // Set up descriptor state but do NOT register with epoll yet
257 82x epoll_impl->desc_state_.fd = fd;
258 {
259 82x std::lock_guard lock(epoll_impl->desc_state_.mutex);
260 82x epoll_impl->desc_state_.read_op = nullptr;
261 82x }
262
263 82x return {};
264 }
265
266 inline std::error_code
267 81x epoll_tcp_acceptor_service::bind_acceptor(
268 tcp_acceptor::implementation& impl, endpoint ep)
269 {
270 81x return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
271 }
272
273 inline std::error_code
274 76x epoll_tcp_acceptor_service::listen_acceptor(
275 tcp_acceptor::implementation& impl, int backlog)
276 {
277 76x return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
278 }
279
280 } // namespace boost::corosio::detail
281
282 #endif // BOOST_COROSIO_HAS_EPOLL
283
284 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
285