TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <memory>
28 : #include <mutex>
29 : #include <utility>
30 :
31 : #include <errno.h>
32 : #include <netinet/in.h>
33 : #include <sys/epoll.h>
34 : #include <sys/socket.h>
35 : #include <unistd.h>
36 :
37 : namespace boost::corosio::detail {
38 :
39 : /** epoll acceptor service implementation.
40 :
41 : Derives from reactor_acceptor_service for shared construct/
42 : destroy/shutdown/close logic. Provides epoll-specific socket
43 : creation (SOCK_NONBLOCK | SOCK_CLOEXEC) and dual-stack defaults.
44 : Uses key_type = tcp_acceptor_service for service lookup.
45 : */
46 : class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
47 : : public reactor_acceptor_service<
48 : epoll_tcp_acceptor_service,
49 : tcp_acceptor_service,
50 : epoll_scheduler,
51 : epoll_tcp_acceptor,
52 : epoll_tcp_service>
53 : {
54 : using base_type = reactor_acceptor_service<
55 : epoll_tcp_acceptor_service,
56 : tcp_acceptor_service,
57 : epoll_scheduler,
58 : epoll_tcp_acceptor,
59 : epoll_tcp_service>;
60 : friend base_type;
61 :
62 : public:
63 : explicit epoll_tcp_acceptor_service(capy::execution_context& ctx);
64 : ~epoll_tcp_acceptor_service() override;
65 :
66 : std::error_code open_acceptor_socket(
67 : tcp_acceptor::implementation& impl,
68 : int family,
69 : int type,
70 : int protocol) override;
71 : std::error_code
72 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 : std::error_code
74 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75 : };
76 :
77 : inline void
78 HIT 6 : epoll_accept_op::cancel() noexcept
79 : {
80 6 : if (acceptor_impl_)
81 6 : acceptor_impl_->cancel_single_op(*this);
82 : else
83 MIS 0 : request_cancel();
84 HIT 6 : }
85 :
86 : inline void
87 3404 : epoll_accept_op::operator()()
88 : {
89 3404 : complete_accept_op<epoll_tcp_socket>(*this);
90 3404 : }
91 :
92 84 : inline epoll_tcp_acceptor::epoll_tcp_acceptor(
93 84 : epoll_tcp_acceptor_service& svc) noexcept
94 84 : : reactor_acceptor(svc)
95 : {
96 84 : }
97 :
98 : inline std::coroutine_handle<>
99 3404 : epoll_tcp_acceptor::accept(
100 : std::coroutine_handle<> h,
101 : capy::executor_ref ex,
102 : std::stop_token token,
103 : std::error_code* ec,
104 : io_object::implementation** impl_out)
105 : {
106 3404 : auto& op = acc_;
107 3404 : op.reset();
108 3404 : op.h = h;
109 3404 : op.ex = ex;
110 3404 : op.ec_out = ec;
111 3404 : op.impl_out = impl_out;
112 3404 : op.fd = fd_;
113 3404 : op.start(token, this);
114 :
115 3404 : sockaddr_storage peer_storage{};
116 : socklen_t addrlen;
117 : int accepted;
118 : do
119 : {
120 3404 : addrlen = sizeof(peer_storage);
121 3404 : accepted = ::accept4(
122 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
123 : SOCK_NONBLOCK | SOCK_CLOEXEC);
124 : }
125 3404 : while (accepted < 0 && errno == EINTR);
126 :
127 3404 : if (accepted >= 0)
128 : {
129 : {
130 3 : std::lock_guard lock(desc_state_.mutex);
131 3 : desc_state_.read_ready = false;
132 3 : }
133 :
134 3 : if (svc_.scheduler().try_consume_inline_budget())
135 : {
136 MIS 0 : auto* socket_svc = svc_.stream_service();
137 0 : if (socket_svc)
138 : {
139 : auto& impl =
140 0 : static_cast<epoll_tcp_socket&>(*socket_svc->construct());
141 0 : impl.set_socket(accepted);
142 :
143 0 : impl.desc_state_.fd = accepted;
144 : {
145 0 : std::lock_guard lock(impl.desc_state_.mutex);
146 0 : impl.desc_state_.read_op = nullptr;
147 0 : impl.desc_state_.write_op = nullptr;
148 0 : impl.desc_state_.connect_op = nullptr;
149 0 : }
150 0 : socket_svc->scheduler().register_descriptor(
151 : accepted, &impl.desc_state_);
152 :
153 0 : impl.set_endpoints(
154 : local_endpoint_, from_sockaddr(peer_storage));
155 :
156 0 : *ec = {};
157 0 : if (impl_out)
158 0 : *impl_out = &impl;
159 : }
160 : else
161 : {
162 0 : ::close(accepted);
163 0 : *ec = make_err(ENOENT);
164 0 : if (impl_out)
165 0 : *impl_out = nullptr;
166 : }
167 0 : op.cont_op.cont.h = h;
168 0 : return dispatch_coro(ex, op.cont_op.cont);
169 : }
170 :
171 HIT 3 : op.accepted_fd = accepted;
172 3 : op.peer_storage = peer_storage;
173 3 : op.peer_addrlen = addrlen;
174 3 : op.complete(0, 0);
175 3 : op.impl_ptr = shared_from_this();
176 3 : svc_.post(&op);
177 3 : return std::noop_coroutine();
178 : }
179 :
180 3401 : if (errno == EAGAIN || errno == EWOULDBLOCK)
181 : {
182 3401 : op.impl_ptr = shared_from_this();
183 3401 : svc_.work_started();
184 :
185 3401 : std::lock_guard lock(desc_state_.mutex);
186 3401 : bool io_done = false;
187 3401 : if (desc_state_.read_ready)
188 : {
189 MIS 0 : desc_state_.read_ready = false;
190 0 : op.perform_io();
191 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
192 0 : if (!io_done)
193 0 : op.errn = 0;
194 : }
195 :
196 HIT 3401 : if (io_done || op.cancelled.load(std::memory_order_acquire))
197 : {
198 MIS 0 : svc_.post(&op);
199 0 : svc_.work_finished();
200 : }
201 : else
202 : {
203 HIT 3401 : desc_state_.read_op = &op;
204 : }
205 3401 : return std::noop_coroutine();
206 3401 : }
207 :
208 MIS 0 : op.complete(errno, 0);
209 0 : op.impl_ptr = shared_from_this();
210 0 : svc_.post(&op);
211 : // completion is always posted to scheduler queue, never inline.
212 0 : return std::noop_coroutine();
213 : }
214 :
215 : inline void
216 HIT 2 : epoll_tcp_acceptor::cancel() noexcept
217 : {
218 2 : do_cancel();
219 2 : }
220 :
221 : inline void
222 332 : epoll_tcp_acceptor::close_socket() noexcept
223 : {
224 332 : do_close_socket();
225 332 : }
226 :
227 356 : inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
228 356 : capy::execution_context& ctx)
229 356 : : base_type(ctx)
230 : {
231 356 : auto* svc = ctx_.find_service<detail::tcp_service>();
232 356 : stream_svc_ = svc ? dynamic_cast<epoll_tcp_service*>(svc) : nullptr;
233 356 : }
234 :
235 712 : inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
236 :
237 : inline std::error_code
238 82 : epoll_tcp_acceptor_service::open_acceptor_socket(
239 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
240 : {
241 82 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
242 82 : epoll_impl->close_socket();
243 :
244 82 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
245 82 : if (fd < 0)
246 MIS 0 : return make_err(errno);
247 :
248 HIT 82 : if (family == AF_INET6)
249 : {
250 8 : int val = 0; // dual-stack default
251 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
252 : }
253 :
254 82 : epoll_impl->fd_ = fd;
255 :
256 : // Set up descriptor state but do NOT register with epoll yet
257 82 : epoll_impl->desc_state_.fd = fd;
258 : {
259 82 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
260 82 : epoll_impl->desc_state_.read_op = nullptr;
261 82 : }
262 :
263 82 : return {};
264 : }
265 :
266 : inline std::error_code
267 81 : epoll_tcp_acceptor_service::bind_acceptor(
268 : tcp_acceptor::implementation& impl, endpoint ep)
269 : {
270 81 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
271 : }
272 :
273 : inline std::error_code
274 76 : epoll_tcp_acceptor_service::listen_acceptor(
275 : tcp_acceptor::implementation& impl, int backlog)
276 : {
277 76 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
278 : }
279 :
280 : } // namespace boost::corosio::detail
281 :
282 : #endif // BOOST_COROSIO_HAS_EPOLL
283 :
284 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
|