include/boost/corosio/native/detail/select/select_tcp_acceptor_service.hpp

53.5% Lines (83/155) 90.9% List of functions (10/11)
select_tcp_acceptor_service.hpp
f(x) Functions (11)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <memory>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <netinet/in.h>
34 #include <sys/select.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /** select acceptor service implementation.
41
42 Inherits from tcp_acceptor_service to enable runtime polymorphism.
43 Uses key_type = tcp_acceptor_service for service lookup.
44 */
45 class BOOST_COROSIO_DECL select_tcp_acceptor_service final
46 : public reactor_acceptor_service<
47 select_tcp_acceptor_service,
48 tcp_acceptor_service,
49 select_scheduler,
50 select_tcp_acceptor,
51 select_tcp_service>
52 {
53 using base_type = reactor_acceptor_service<
54 select_tcp_acceptor_service,
55 tcp_acceptor_service,
56 select_scheduler,
57 select_tcp_acceptor,
58 select_tcp_service>;
59 friend base_type;
60
61 public:
62 explicit select_tcp_acceptor_service(capy::execution_context& ctx);
63 ~select_tcp_acceptor_service() override;
64
65 std::error_code open_acceptor_socket(
66 tcp_acceptor::implementation& impl,
67 int family,
68 int type,
69 int protocol) override;
70 std::error_code
71 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 std::error_code
73 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74 };
75
76 inline void
77 select_accept_op::cancel() noexcept
78 {
79 if (acceptor_impl_)
80 acceptor_impl_->cancel_single_op(*this);
81 else
82 request_cancel();
83 }
84
85 inline void
86 3153x select_accept_op::operator()()
87 {
88 3153x complete_accept_op<select_tcp_socket>(*this);
89 3153x }
90
91 65x inline select_tcp_acceptor::select_tcp_acceptor(
92 65x select_tcp_acceptor_service& svc) noexcept
93 65x : reactor_acceptor(svc)
94 {
95 65x }
96
97 inline std::coroutine_handle<>
98 3153x select_tcp_acceptor::accept(
99 std::coroutine_handle<> h,
100 capy::executor_ref ex,
101 std::stop_token token,
102 std::error_code* ec,
103 io_object::implementation** impl_out)
104 {
105 3153x auto& op = acc_;
106 3153x op.reset();
107 3153x op.h = h;
108 3153x op.ex = ex;
109 3153x op.ec_out = ec;
110 3153x op.impl_out = impl_out;
111 3153x op.fd = fd_;
112 3153x op.start(token, this);
113
114 3153x sockaddr_storage peer_storage{};
115 socklen_t addrlen;
116 int accepted;
117 do
118 {
119 3153x addrlen = sizeof(peer_storage);
120 accepted =
121 3153x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
122 }
123 3153x while (accepted < 0 && errno == EINTR);
124
125 3153x if (accepted >= 0)
126 {
127 3x if (accepted >= FD_SETSIZE)
128 {
129 ::close(accepted);
130 op.complete(EINVAL, 0);
131 op.impl_ptr = shared_from_this();
132 svc_.post(&op);
133 return std::noop_coroutine();
134 }
135
136 3x int flags = ::fcntl(accepted, F_GETFL, 0);
137 3x if (flags == -1)
138 {
139 int err = errno;
140 ::close(accepted);
141 op.complete(err, 0);
142 op.impl_ptr = shared_from_this();
143 svc_.post(&op);
144 return std::noop_coroutine();
145 }
146
147 3x if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
148 {
149 int err = errno;
150 ::close(accepted);
151 op.complete(err, 0);
152 op.impl_ptr = shared_from_this();
153 svc_.post(&op);
154 return std::noop_coroutine();
155 }
156
157 3x if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
158 {
159 int err = errno;
160 ::close(accepted);
161 op.complete(err, 0);
162 op.impl_ptr = shared_from_this();
163 svc_.post(&op);
164 return std::noop_coroutine();
165 }
166
167 #ifdef SO_NOSIGPIPE
168 {
169 int one = 1;
170 ::setsockopt(
171 accepted, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
172 }
173 #endif
174
175 {
176 3x std::lock_guard lock(desc_state_.mutex);
177 3x desc_state_.read_ready = false;
178 3x }
179
180 3x if (svc_.scheduler().try_consume_inline_budget())
181 {
182 auto* socket_svc = svc_.stream_service();
183 if (socket_svc)
184 {
185 auto& impl =
186 static_cast<select_tcp_socket&>(*socket_svc->construct());
187 impl.set_socket(accepted);
188
189 impl.desc_state_.fd = accepted;
190 {
191 std::lock_guard lock(impl.desc_state_.mutex);
192 impl.desc_state_.read_op = nullptr;
193 impl.desc_state_.write_op = nullptr;
194 impl.desc_state_.connect_op = nullptr;
195 }
196 socket_svc->scheduler().register_descriptor(
197 accepted, &impl.desc_state_);
198
199 impl.set_endpoints(
200 local_endpoint_, from_sockaddr(peer_storage));
201
202 *ec = {};
203 if (impl_out)
204 *impl_out = &impl;
205 }
206 else
207 {
208 ::close(accepted);
209 *ec = make_err(ENOENT);
210 if (impl_out)
211 *impl_out = nullptr;
212 }
213 op.cont_op.cont.h = h;
214 return dispatch_coro(ex, op.cont_op.cont);
215 }
216
217 3x op.accepted_fd = accepted;
218 3x op.peer_storage = peer_storage;
219 3x op.peer_addrlen = addrlen;
220 3x op.complete(0, 0);
221 3x op.impl_ptr = shared_from_this();
222 3x svc_.post(&op);
223 3x return std::noop_coroutine();
224 }
225
226 3150x if (errno == EAGAIN || errno == EWOULDBLOCK)
227 {
228 3150x op.impl_ptr = shared_from_this();
229 3150x svc_.work_started();
230
231 3150x std::lock_guard lock(desc_state_.mutex);
232 3150x bool io_done = false;
233 3150x if (desc_state_.read_ready)
234 {
235 desc_state_.read_ready = false;
236 op.perform_io();
237 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
238 if (!io_done)
239 op.errn = 0;
240 }
241
242 3150x if (io_done || op.cancelled.load(std::memory_order_acquire))
243 {
244 svc_.post(&op);
245 svc_.work_finished();
246 }
247 else
248 {
249 3150x desc_state_.read_op = &op;
250 }
251 3150x return std::noop_coroutine();
252 3150x }
253
254 op.complete(errno, 0);
255 op.impl_ptr = shared_from_this();
256 svc_.post(&op);
257 return std::noop_coroutine();
258 }
259
260 inline void
261 2x select_tcp_acceptor::cancel() noexcept
262 {
263 2x do_cancel();
264 2x }
265
266 inline void
267 254x select_tcp_acceptor::close_socket() noexcept
268 {
269 254x do_close_socket();
270 254x }
271
272 229x inline select_tcp_acceptor_service::select_tcp_acceptor_service(
273 229x capy::execution_context& ctx)
274 229x : base_type(ctx)
275 {
276 229x auto* svc = ctx_.find_service<detail::tcp_service>();
277 229x stream_svc_ = svc ? dynamic_cast<select_tcp_service*>(svc) : nullptr;
278 229x }
279
280 458x inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
281
282 inline std::error_code
283 62x select_tcp_acceptor_service::open_acceptor_socket(
284 tcp_acceptor::implementation& impl, int family, int type, int protocol)
285 {
286 62x auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
287 62x select_impl->close_socket();
288
289 62x int fd = ::socket(family, type, protocol);
290 62x if (fd < 0)
291 return make_err(errno);
292
293 62x int flags = ::fcntl(fd, F_GETFL, 0);
294 62x if (flags == -1)
295 {
296 int errn = errno;
297 ::close(fd);
298 return make_err(errn);
299 }
300 62x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
301 {
302 int errn = errno;
303 ::close(fd);
304 return make_err(errn);
305 }
306 62x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
307 {
308 int errn = errno;
309 ::close(fd);
310 return make_err(errn);
311 }
312
313 62x if (fd >= FD_SETSIZE)
314 {
315 ::close(fd);
316 return make_err(EMFILE);
317 }
318
319 62x if (family == AF_INET6)
320 {
321 8x int val = 0; // dual-stack default
322 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
323 }
324
325 #ifdef SO_NOSIGPIPE
326 {
327 int nosig = 1;
328 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
329 }
330 #endif
331
332 62x select_impl->fd_ = fd;
333
334 // Set up descriptor state but do NOT register with reactor yet
335 // (registration happens in do_listen via reactor_acceptor base)
336 62x select_impl->desc_state_.fd = fd;
337 {
338 62x std::lock_guard lock(select_impl->desc_state_.mutex);
339 62x select_impl->desc_state_.read_op = nullptr;
340 62x }
341
342 62x return {};
343 }
344
345 inline std::error_code
346 61x select_tcp_acceptor_service::bind_acceptor(
347 tcp_acceptor::implementation& impl, endpoint ep)
348 {
349 61x return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
350 }
351
352 inline std::error_code
353 58x select_tcp_acceptor_service::listen_acceptor(
354 tcp_acceptor::implementation& impl, int backlog)
355 {
356 58x return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
357 }
358
359 } // namespace boost::corosio::detail
360
361 #endif // BOOST_COROSIO_HAS_SELECT
362
363 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
364