include/boost/corosio/native/detail/select/select_tcp_acceptor_service.hpp

53.5% Lines (83/155) 90.9% List of functions (10/11)
select_tcp_acceptor_service.hpp
f(x) Functions (11)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <memory>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <netinet/in.h>
34 #include <sys/select.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /** select acceptor service implementation.
41
42 Inherits from tcp_acceptor_service to enable runtime polymorphism.
43 Uses key_type = tcp_acceptor_service for service lookup.
44 */
45 class BOOST_COROSIO_DECL select_tcp_acceptor_service final
46 : public reactor_acceptor_service<
47 select_tcp_acceptor_service,
48 tcp_acceptor_service,
49 select_scheduler,
50 select_tcp_acceptor,
51 select_tcp_service>
52 {
53 using base_type = reactor_acceptor_service<
54 select_tcp_acceptor_service,
55 tcp_acceptor_service,
56 select_scheduler,
57 select_tcp_acceptor,
58 select_tcp_service>;
59 friend base_type;
60
61 public:
62 explicit select_tcp_acceptor_service(capy::execution_context& ctx);
63 ~select_tcp_acceptor_service() override;
64
65 std::error_code open_acceptor_socket(
66 tcp_acceptor::implementation& impl,
67 int family,
68 int type,
69 int protocol) override;
70 std::error_code
71 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 std::error_code
73 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74 };
75
76 inline void
77 select_accept_op::cancel() noexcept
78 {
79 if (acceptor_impl_)
80 acceptor_impl_->cancel_single_op(*this);
81 else
82 request_cancel();
83 }
84
85 inline void
86 3226x select_accept_op::operator()()
87 {
88 3226x complete_accept_op<select_tcp_socket>(*this);
89 3226x }
90
91 65x inline select_tcp_acceptor::select_tcp_acceptor(
92 65x select_tcp_acceptor_service& svc) noexcept
93 65x : reactor_acceptor(svc)
94 {
95 65x }
96
97 inline std::coroutine_handle<>
98 3226x select_tcp_acceptor::accept(
99 std::coroutine_handle<> h,
100 capy::executor_ref ex,
101 std::stop_token token,
102 std::error_code* ec,
103 io_object::implementation** impl_out)
104 {
105 3226x auto& op = acc_;
106 3226x op.reset();
107 3226x op.h = h;
108 3226x op.ex = ex;
109 3226x op.ec_out = ec;
110 3226x op.impl_out = impl_out;
111 3226x op.fd = fd_;
112 3226x op.start(token, this);
113
114 3226x sockaddr_storage peer_storage{};
115 3226x socklen_t addrlen = sizeof(peer_storage);
116 int accepted;
117 do
118 {
119 accepted =
120 3226x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
121 }
122 3226x while (accepted < 0 && errno == EINTR);
123
124 3226x if (accepted >= 0)
125 {
126 3x if (accepted >= FD_SETSIZE)
127 {
128 ::close(accepted);
129 op.complete(EINVAL, 0);
130 op.impl_ptr = shared_from_this();
131 svc_.post(&op);
132 return std::noop_coroutine();
133 }
134
135 3x int flags = ::fcntl(accepted, F_GETFL, 0);
136 3x if (flags == -1)
137 {
138 int err = errno;
139 ::close(accepted);
140 op.complete(err, 0);
141 op.impl_ptr = shared_from_this();
142 svc_.post(&op);
143 return std::noop_coroutine();
144 }
145
146 3x if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
147 {
148 int err = errno;
149 ::close(accepted);
150 op.complete(err, 0);
151 op.impl_ptr = shared_from_this();
152 svc_.post(&op);
153 return std::noop_coroutine();
154 }
155
156 3x if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
157 {
158 int err = errno;
159 ::close(accepted);
160 op.complete(err, 0);
161 op.impl_ptr = shared_from_this();
162 svc_.post(&op);
163 return std::noop_coroutine();
164 }
165
166 {
167 3x std::lock_guard lock(desc_state_.mutex);
168 3x desc_state_.read_ready = false;
169 3x }
170
171 3x if (svc_.scheduler().try_consume_inline_budget())
172 {
173 auto* socket_svc = svc_.stream_service();
174 if (socket_svc)
175 {
176 auto& impl =
177 static_cast<select_tcp_socket&>(*socket_svc->construct());
178 impl.set_socket(accepted);
179
180 impl.desc_state_.fd = accepted;
181 {
182 std::lock_guard lock(impl.desc_state_.mutex);
183 impl.desc_state_.read_op = nullptr;
184 impl.desc_state_.write_op = nullptr;
185 impl.desc_state_.connect_op = nullptr;
186 }
187 socket_svc->scheduler().register_descriptor(
188 accepted, &impl.desc_state_);
189
190 impl.set_endpoints(
191 local_endpoint_, from_sockaddr(peer_storage));
192
193 *ec = {};
194 if (impl_out)
195 *impl_out = &impl;
196 }
197 else
198 {
199 ::close(accepted);
200 *ec = make_err(ENOENT);
201 if (impl_out)
202 *impl_out = nullptr;
203 }
204 op.cont_op.cont.h = h;
205 return dispatch_coro(ex, op.cont_op.cont);
206 }
207
208 3x op.accepted_fd = accepted;
209 3x op.peer_storage = peer_storage;
210 3x op.peer_addrlen = addrlen;
211 3x op.complete(0, 0);
212 3x op.impl_ptr = shared_from_this();
213 3x svc_.post(&op);
214 3x return std::noop_coroutine();
215 }
216
217 3223x if (errno == EAGAIN || errno == EWOULDBLOCK)
218 {
219 3223x op.impl_ptr = shared_from_this();
220 3223x svc_.work_started();
221
222 3223x std::lock_guard lock(desc_state_.mutex);
223 3223x bool io_done = false;
224 3223x if (desc_state_.read_ready)
225 {
226 desc_state_.read_ready = false;
227 op.perform_io();
228 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
229 if (!io_done)
230 op.errn = 0;
231 }
232
233 3223x if (io_done || op.cancelled.load(std::memory_order_acquire))
234 {
235 svc_.post(&op);
236 svc_.work_finished();
237 }
238 else
239 {
240 3223x desc_state_.read_op = &op;
241 }
242 3223x return std::noop_coroutine();
243 3223x }
244
245 op.complete(errno, 0);
246 op.impl_ptr = shared_from_this();
247 svc_.post(&op);
248 return std::noop_coroutine();
249 }
250
251 inline void
252 2x select_tcp_acceptor::cancel() noexcept
253 {
254 2x do_cancel();
255 2x }
256
257 inline void
258 254x select_tcp_acceptor::close_socket() noexcept
259 {
260 254x do_close_socket();
261 254x }
262
263 229x inline select_tcp_acceptor_service::select_tcp_acceptor_service(
264 229x capy::execution_context& ctx)
265 229x : base_type(ctx)
266 {
267 229x auto* svc = ctx_.find_service<detail::tcp_service>();
268 229x stream_svc_ = svc ? dynamic_cast<select_tcp_service*>(svc) : nullptr;
269 229x }
270
271 458x inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
272
273 inline std::error_code
274 62x select_tcp_acceptor_service::open_acceptor_socket(
275 tcp_acceptor::implementation& impl, int family, int type, int protocol)
276 {
277 62x auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
278 62x select_impl->close_socket();
279
280 62x int fd = ::socket(family, type, protocol);
281 62x if (fd < 0)
282 return make_err(errno);
283
284 62x int flags = ::fcntl(fd, F_GETFL, 0);
285 62x if (flags == -1)
286 {
287 int errn = errno;
288 ::close(fd);
289 return make_err(errn);
290 }
291 62x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
292 {
293 int errn = errno;
294 ::close(fd);
295 return make_err(errn);
296 }
297 62x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
298 {
299 int errn = errno;
300 ::close(fd);
301 return make_err(errn);
302 }
303
304 62x if (fd >= FD_SETSIZE)
305 {
306 ::close(fd);
307 return make_err(EMFILE);
308 }
309
310 62x if (family == AF_INET6)
311 {
312 8x int val = 0; // dual-stack default
313 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
314 }
315
316 #ifdef SO_NOSIGPIPE
317 {
318 int nosig = 1;
319 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
320 }
321 #endif
322
323 62x select_impl->fd_ = fd;
324
325 // Set up descriptor state but do NOT register with reactor yet
326 // (registration happens in do_listen via reactor_acceptor base)
327 62x select_impl->desc_state_.fd = fd;
328 {
329 62x std::lock_guard lock(select_impl->desc_state_.mutex);
330 62x select_impl->desc_state_.read_op = nullptr;
331 62x }
332
333 62x return {};
334 }
335
336 inline std::error_code
337 61x select_tcp_acceptor_service::bind_acceptor(
338 tcp_acceptor::implementation& impl, endpoint ep)
339 {
340 61x return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
341 }
342
343 inline std::error_code
344 58x select_tcp_acceptor_service::listen_acceptor(
345 tcp_acceptor::implementation& impl, int backlog)
346 {
347 58x return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
348 }
349
350 } // namespace boost::corosio::detail
351
352 #endif // BOOST_COROSIO_HAS_SELECT
353
354 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
355