TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <memory>
28 : #include <mutex>
29 : #include <utility>
30 :
31 : #include <errno.h>
32 : #include <fcntl.h>
33 : #include <netinet/in.h>
34 : #include <sys/select.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /** select acceptor service implementation.
41 :
42 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
43 : Uses key_type = tcp_acceptor_service for service lookup.
44 : */
45 : class BOOST_COROSIO_DECL select_tcp_acceptor_service final
46 : : public reactor_acceptor_service<
47 : select_tcp_acceptor_service,
48 : tcp_acceptor_service,
49 : select_scheduler,
50 : select_tcp_acceptor,
51 : select_tcp_service>
52 : {
53 : using base_type = reactor_acceptor_service<
54 : select_tcp_acceptor_service,
55 : tcp_acceptor_service,
56 : select_scheduler,
57 : select_tcp_acceptor,
58 : select_tcp_service>;
59 : friend base_type;
60 :
61 : public:
62 : explicit select_tcp_acceptor_service(capy::execution_context& ctx);
63 : ~select_tcp_acceptor_service() override;
64 :
65 : std::error_code open_acceptor_socket(
66 : tcp_acceptor::implementation& impl,
67 : int family,
68 : int type,
69 : int protocol) override;
70 : std::error_code
71 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 : std::error_code
73 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74 : };
75 :
76 : inline void
77 MIS 0 : select_accept_op::cancel() noexcept
78 : {
79 0 : if (acceptor_impl_)
80 0 : acceptor_impl_->cancel_single_op(*this);
81 : else
82 0 : request_cancel();
83 0 : }
84 :
85 : inline void
86 HIT 3297 : select_accept_op::operator()()
87 : {
88 3297 : complete_accept_op<select_tcp_socket>(*this);
89 3297 : }
90 :
91 65 : inline select_tcp_acceptor::select_tcp_acceptor(
92 65 : select_tcp_acceptor_service& svc) noexcept
93 65 : : reactor_acceptor(svc)
94 : {
95 65 : }
96 :
97 : inline std::coroutine_handle<>
98 3297 : select_tcp_acceptor::accept(
99 : std::coroutine_handle<> h,
100 : capy::executor_ref ex,
101 : std::stop_token token,
102 : std::error_code* ec,
103 : io_object::implementation** impl_out)
104 : {
105 3297 : auto& op = acc_;
106 3297 : op.reset();
107 3297 : op.h = h;
108 3297 : op.ex = ex;
109 3297 : op.ec_out = ec;
110 3297 : op.impl_out = impl_out;
111 3297 : op.fd = fd_;
112 3297 : op.start(token, this);
113 :
114 3297 : sockaddr_storage peer_storage{};
115 3297 : socklen_t addrlen = sizeof(peer_storage);
116 : int accepted;
117 : do
118 : {
119 : accepted =
120 3297 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
121 : }
122 3297 : while (accepted < 0 && errno == EINTR);
123 :
124 3297 : if (accepted >= 0)
125 : {
126 3 : if (accepted >= FD_SETSIZE)
127 : {
128 MIS 0 : ::close(accepted);
129 0 : op.complete(EINVAL, 0);
130 0 : op.impl_ptr = shared_from_this();
131 0 : svc_.post(&op);
132 0 : return std::noop_coroutine();
133 : }
134 :
135 HIT 3 : int flags = ::fcntl(accepted, F_GETFL, 0);
136 3 : if (flags == -1)
137 : {
138 MIS 0 : int err = errno;
139 0 : ::close(accepted);
140 0 : op.complete(err, 0);
141 0 : op.impl_ptr = shared_from_this();
142 0 : svc_.post(&op);
143 0 : return std::noop_coroutine();
144 : }
145 :
146 HIT 3 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
147 : {
148 MIS 0 : int err = errno;
149 0 : ::close(accepted);
150 0 : op.complete(err, 0);
151 0 : op.impl_ptr = shared_from_this();
152 0 : svc_.post(&op);
153 0 : return std::noop_coroutine();
154 : }
155 :
156 HIT 3 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
157 : {
158 MIS 0 : int err = errno;
159 0 : ::close(accepted);
160 0 : op.complete(err, 0);
161 0 : op.impl_ptr = shared_from_this();
162 0 : svc_.post(&op);
163 0 : return std::noop_coroutine();
164 : }
165 :
166 : {
167 HIT 3 : std::lock_guard lock(desc_state_.mutex);
168 3 : desc_state_.read_ready = false;
169 3 : }
170 :
171 3 : if (svc_.scheduler().try_consume_inline_budget())
172 : {
173 MIS 0 : auto* socket_svc = svc_.stream_service();
174 0 : if (socket_svc)
175 : {
176 : auto& impl =
177 0 : static_cast<select_tcp_socket&>(*socket_svc->construct());
178 0 : impl.set_socket(accepted);
179 :
180 0 : impl.desc_state_.fd = accepted;
181 : {
182 0 : std::lock_guard lock(impl.desc_state_.mutex);
183 0 : impl.desc_state_.read_op = nullptr;
184 0 : impl.desc_state_.write_op = nullptr;
185 0 : impl.desc_state_.connect_op = nullptr;
186 0 : }
187 0 : socket_svc->scheduler().register_descriptor(
188 : accepted, &impl.desc_state_);
189 :
190 0 : impl.set_endpoints(
191 : local_endpoint_, from_sockaddr(peer_storage));
192 :
193 0 : *ec = {};
194 0 : if (impl_out)
195 0 : *impl_out = &impl;
196 : }
197 : else
198 : {
199 0 : ::close(accepted);
200 0 : *ec = make_err(ENOENT);
201 0 : if (impl_out)
202 0 : *impl_out = nullptr;
203 : }
204 0 : op.cont_op.cont.h = h;
205 0 : return dispatch_coro(ex, op.cont_op.cont);
206 : }
207 :
208 HIT 3 : op.accepted_fd = accepted;
209 3 : op.peer_storage = peer_storage;
210 3 : op.complete(0, 0);
211 3 : op.impl_ptr = shared_from_this();
212 3 : svc_.post(&op);
213 3 : return std::noop_coroutine();
214 : }
215 :
216 3294 : if (errno == EAGAIN || errno == EWOULDBLOCK)
217 : {
218 3294 : op.impl_ptr = shared_from_this();
219 3294 : svc_.work_started();
220 :
221 3294 : std::lock_guard lock(desc_state_.mutex);
222 3294 : bool io_done = false;
223 3294 : if (desc_state_.read_ready)
224 : {
225 MIS 0 : desc_state_.read_ready = false;
226 0 : op.perform_io();
227 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
228 0 : if (!io_done)
229 0 : op.errn = 0;
230 : }
231 :
232 HIT 3294 : if (io_done || op.cancelled.load(std::memory_order_acquire))
233 : {
234 MIS 0 : svc_.post(&op);
235 0 : svc_.work_finished();
236 : }
237 : else
238 : {
239 HIT 3294 : desc_state_.read_op = &op;
240 : }
241 3294 : return std::noop_coroutine();
242 3294 : }
243 :
244 MIS 0 : op.complete(errno, 0);
245 0 : op.impl_ptr = shared_from_this();
246 0 : svc_.post(&op);
247 0 : return std::noop_coroutine();
248 : }
249 :
250 : inline void
251 HIT 2 : select_tcp_acceptor::cancel() noexcept
252 : {
253 2 : do_cancel();
254 2 : }
255 :
256 : inline void
257 254 : select_tcp_acceptor::close_socket() noexcept
258 : {
259 254 : do_close_socket();
260 254 : }
261 :
262 228 : inline select_tcp_acceptor_service::select_tcp_acceptor_service(
263 228 : capy::execution_context& ctx)
264 228 : : base_type(ctx)
265 : {
266 228 : auto* svc = ctx_.find_service<detail::tcp_service>();
267 228 : stream_svc_ = svc ? dynamic_cast<select_tcp_service*>(svc) : nullptr;
268 228 : }
269 :
270 456 : inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
271 :
272 : inline std::error_code
273 62 : select_tcp_acceptor_service::open_acceptor_socket(
274 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
275 : {
276 62 : auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
277 62 : select_impl->close_socket();
278 :
279 62 : int fd = ::socket(family, type, protocol);
280 62 : if (fd < 0)
281 MIS 0 : return make_err(errno);
282 :
283 HIT 62 : int flags = ::fcntl(fd, F_GETFL, 0);
284 62 : if (flags == -1)
285 : {
286 MIS 0 : int errn = errno;
287 0 : ::close(fd);
288 0 : return make_err(errn);
289 : }
290 HIT 62 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
291 : {
292 MIS 0 : int errn = errno;
293 0 : ::close(fd);
294 0 : return make_err(errn);
295 : }
296 HIT 62 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
297 : {
298 MIS 0 : int errn = errno;
299 0 : ::close(fd);
300 0 : return make_err(errn);
301 : }
302 :
303 HIT 62 : if (fd >= FD_SETSIZE)
304 : {
305 MIS 0 : ::close(fd);
306 0 : return make_err(EMFILE);
307 : }
308 :
309 HIT 62 : if (family == AF_INET6)
310 : {
311 8 : int val = 0; // dual-stack default
312 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
313 : }
314 :
315 : #ifdef SO_NOSIGPIPE
316 : {
317 : int nosig = 1;
318 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
319 : }
320 : #endif
321 :
322 62 : select_impl->fd_ = fd;
323 :
324 : // Set up descriptor state but do NOT register with reactor yet
325 : // (registration happens in do_listen via reactor_acceptor base)
326 62 : select_impl->desc_state_.fd = fd;
327 : {
328 62 : std::lock_guard lock(select_impl->desc_state_.mutex);
329 62 : select_impl->desc_state_.read_op = nullptr;
330 62 : }
331 :
332 62 : return {};
333 : }
334 :
335 : inline std::error_code
336 61 : select_tcp_acceptor_service::bind_acceptor(
337 : tcp_acceptor::implementation& impl, endpoint ep)
338 : {
339 61 : return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
340 : }
341 :
342 : inline std::error_code
343 58 : select_tcp_acceptor_service::listen_acceptor(
344 : tcp_acceptor::implementation& impl, int backlog)
345 : {
346 58 : return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
347 : }
348 :
349 : } // namespace boost::corosio::detail
350 :
351 : #endif // BOOST_COROSIO_HAS_SELECT
352 :
353 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
|