include/boost/corosio/native/detail/select/select_local_stream_acceptor_service.hpp

40.5% Lines (64/158) 75.0% List of functions (9/12)
select_local_stream_acceptor_service.hpp
f(x) Functions (12)
Function Calls Lines Blocks
boost::corosio::detail::select_local_accept_op::cancel() :82 0 0.0% 0.0% boost::corosio::detail::select_local_accept_op::operator()() :91 2x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor::select_local_stream_acceptor(boost::corosio::detail::select_local_stream_acceptor_service&) :96 6x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :103 2x 25.0% 20.0% boost::corosio::detail::select_local_stream_acceptor::cancel() :268 0 0.0% 0.0% boost::corosio::detail::select_local_stream_acceptor::close_socket() :274 24x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor::release_socket() :280 0 0.0% 0.0% boost::corosio::detail::select_local_stream_acceptor_service::select_local_stream_acceptor_service(boost::capy::execution_context&) :285 229x 100.0% 73.0% boost::corosio::detail::select_local_stream_acceptor_service::~select_local_stream_acceptor_service() :297 458x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor_service::open_acceptor_socket(boost::corosio::local_stream_acceptor::implementation&, int, int, int) :301 6x 57.1% 65.0% boost::corosio::detail::select_local_stream_acceptor_service::bind_acceptor(boost::corosio::local_stream_acceptor::implementation&, boost::corosio::local_endpoint) :354 6x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor_service::listen_acceptor(boost::corosio::local_stream_acceptor::implementation&, int) :361 2x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_local_stream_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_local_stream_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <cassert>
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <sys/select.h>
35 #include <sys/socket.h>
36 #include <sys/un.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /* select local stream acceptor service implementation.
42
43 Inherits from local_stream_acceptor_service to enable runtime
44 polymorphism. Uses key_type = local_stream_acceptor_service
45 for service lookup.
46 */
47 class BOOST_COROSIO_DECL select_local_stream_acceptor_service final
48 : public reactor_acceptor_service<
49 select_local_stream_acceptor_service,
50 local_stream_acceptor_service,
51 select_scheduler,
52 select_local_stream_acceptor,
53 select_local_stream_service>
54 {
55 using base_type = reactor_acceptor_service<
56 select_local_stream_acceptor_service,
57 local_stream_acceptor_service,
58 select_scheduler,
59 select_local_stream_acceptor,
60 select_local_stream_service>;
61 friend base_type;
62
63 public:
64 explicit select_local_stream_acceptor_service(
65 capy::execution_context& ctx);
66 ~select_local_stream_acceptor_service() override;
67
68 std::error_code open_acceptor_socket(
69 local_stream_acceptor::implementation& impl,
70 int family,
71 int type,
72 int protocol) override;
73 std::error_code bind_acceptor(
74 local_stream_acceptor::implementation& impl,
75 corosio::local_endpoint ep) override;
76 std::error_code listen_acceptor(
77 local_stream_acceptor::implementation& impl,
78 int backlog) override;
79 };
80
81 inline void
82 select_local_accept_op::cancel() noexcept
83 {
84 if (acceptor_impl_)
85 acceptor_impl_->cancel_single_op(*this);
86 else
87 request_cancel();
88 }
89
90 inline void
91 2x select_local_accept_op::operator()()
92 {
93 2x complete_accept_op<select_local_stream_socket>(*this);
94 2x }
95
96 6x inline select_local_stream_acceptor::select_local_stream_acceptor(
97 6x select_local_stream_acceptor_service& svc) noexcept
98 6x : reactor_acceptor(svc)
99 {
100 6x }
101
102 inline std::coroutine_handle<>
103 2x select_local_stream_acceptor::accept(
104 std::coroutine_handle<> h,
105 capy::executor_ref ex,
106 std::stop_token token,
107 std::error_code* ec,
108 io_object::implementation** impl_out)
109 {
110 2x auto& op = acc_;
111 2x op.reset();
112 2x op.h = h;
113 2x op.ex = ex;
114 2x op.ec_out = ec;
115 2x op.impl_out = impl_out;
116 2x op.fd = fd_;
117 2x op.start(token, this);
118
119 2x sockaddr_storage peer_storage{};
120 socklen_t addrlen;
121 int accepted;
122 do
123 {
124 2x addrlen = sizeof(peer_storage);
125 accepted =
126 2x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
127 }
128 2x while (accepted < 0 && errno == EINTR);
129
130 2x if (accepted >= 0)
131 {
132 if (accepted >= FD_SETSIZE)
133 {
134 ::close(accepted);
135 op.complete(EMFILE, 0);
136 op.impl_ptr = shared_from_this();
137 svc_.post(&op);
138 return std::noop_coroutine();
139 }
140
141 int flags = ::fcntl(accepted, F_GETFL, 0);
142 if (flags == -1)
143 {
144 int err = errno;
145 ::close(accepted);
146 op.complete(err, 0);
147 op.impl_ptr = shared_from_this();
148 svc_.post(&op);
149 return std::noop_coroutine();
150 }
151
152 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
153 {
154 int err = errno;
155 ::close(accepted);
156 op.complete(err, 0);
157 op.impl_ptr = shared_from_this();
158 svc_.post(&op);
159 return std::noop_coroutine();
160 }
161
162 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
163 {
164 int err = errno;
165 ::close(accepted);
166 op.complete(err, 0);
167 op.impl_ptr = shared_from_this();
168 svc_.post(&op);
169 return std::noop_coroutine();
170 }
171
172 #ifdef SO_NOSIGPIPE
173 {
174 int one = 1;
175 ::setsockopt(
176 accepted, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
177 }
178 #endif
179
180 {
181 std::lock_guard lock(desc_state_.mutex);
182 desc_state_.read_ready = false;
183 }
184
185 if (svc_.scheduler().try_consume_inline_budget())
186 {
187 auto* socket_svc = svc_.stream_service();
188 if (socket_svc)
189 {
190 auto& impl =
191 static_cast<select_local_stream_socket&>(
192 *socket_svc->construct());
193 impl.set_socket(accepted);
194
195 impl.desc_state_.fd = accepted;
196 {
197 std::lock_guard lock(impl.desc_state_.mutex);
198 impl.desc_state_.read_op = nullptr;
199 impl.desc_state_.write_op = nullptr;
200 impl.desc_state_.connect_op = nullptr;
201 }
202 socket_svc->scheduler().register_descriptor(
203 accepted, &impl.desc_state_);
204
205 impl.set_endpoints(
206 local_endpoint_,
207 from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
208
209 *ec = {};
210 if (impl_out)
211 *impl_out = &impl;
212 }
213 else
214 {
215 ::close(accepted);
216 *ec = make_err(ENOENT);
217 if (impl_out)
218 *impl_out = nullptr;
219 }
220 op.cont_op.cont.h = h;
221 return dispatch_coro(ex, op.cont_op.cont);
222 }
223
224 op.accepted_fd = accepted;
225 op.peer_storage = peer_storage;
226 op.peer_addrlen = addrlen;
227 op.complete(0, 0);
228 op.impl_ptr = shared_from_this();
229 svc_.post(&op);
230 return std::noop_coroutine();
231 }
232
233 2x if (errno == EAGAIN || errno == EWOULDBLOCK)
234 {
235 2x op.impl_ptr = shared_from_this();
236 2x svc_.work_started();
237
238 2x std::lock_guard lock(desc_state_.mutex);
239 2x bool io_done = false;
240 2x if (desc_state_.read_ready)
241 {
242 desc_state_.read_ready = false;
243 op.perform_io();
244 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
245 if (!io_done)
246 op.errn = 0;
247 }
248
249 2x if (io_done || op.cancelled.load(std::memory_order_acquire))
250 {
251 svc_.post(&op);
252 svc_.work_finished();
253 }
254 else
255 {
256 2x desc_state_.read_op = &op;
257 }
258 2x return std::noop_coroutine();
259 2x }
260
261 op.complete(errno, 0);
262 op.impl_ptr = shared_from_this();
263 svc_.post(&op);
264 return std::noop_coroutine();
265 }
266
267 inline void
268 select_local_stream_acceptor::cancel() noexcept
269 {
270 do_cancel();
271 }
272
273 inline void
274 24x select_local_stream_acceptor::close_socket() noexcept
275 {
276 24x do_close_socket();
277 24x }
278
279 inline native_handle_type
280 select_local_stream_acceptor::release_socket() noexcept
281 {
282 return this->do_release_socket();
283 }
284
285 229x inline select_local_stream_acceptor_service::
286 229x select_local_stream_acceptor_service(capy::execution_context& ctx)
287 229x : base_type(ctx)
288 {
289 229x auto* svc = ctx_.find_service<detail::local_stream_service>();
290 229x stream_svc_ = svc
291 229x ? dynamic_cast<select_local_stream_service*>(svc)
292 : nullptr;
293 229x assert(stream_svc_ &&
294 "local_stream_service must be registered before acceptor service");
295 229x }
296
297 458x inline select_local_stream_acceptor_service::
298 458x ~select_local_stream_acceptor_service() {}
299
300 inline std::error_code
301 6x select_local_stream_acceptor_service::open_acceptor_socket(
302 local_stream_acceptor::implementation& impl,
303 int family,
304 int type,
305 int protocol)
306 {
307 6x auto* select_impl = static_cast<select_local_stream_acceptor*>(&impl);
308 6x select_impl->close_socket();
309
310 6x int fd = ::socket(family, type, protocol);
311 6x if (fd < 0)
312 return make_err(errno);
313
314 6x int flags = ::fcntl(fd, F_GETFL, 0);
315 6x if (flags == -1)
316 {
317 int errn = errno;
318 ::close(fd);
319 return make_err(errn);
320 }
321 6x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
322 {
323 int errn = errno;
324 ::close(fd);
325 return make_err(errn);
326 }
327 6x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
328 {
329 int errn = errno;
330 ::close(fd);
331 return make_err(errn);
332 }
333
334 6x if (fd >= FD_SETSIZE)
335 {
336 ::close(fd);
337 return make_err(EMFILE);
338 }
339
340 6x select_impl->fd_ = fd;
341
342 // Set up descriptor state but do NOT register with select yet
343 // (registration happens in do_listen via reactor_acceptor base)
344 6x select_impl->desc_state_.fd = fd;
345 {
346 6x std::lock_guard lock(select_impl->desc_state_.mutex);
347 6x select_impl->desc_state_.read_op = nullptr;
348 6x }
349
350 6x return {};
351 }
352
353 inline std::error_code
354 6x select_local_stream_acceptor_service::bind_acceptor(
355 local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
356 {
357 6x return static_cast<select_local_stream_acceptor*>(&impl)->do_bind(ep);
358 }
359
360 inline std::error_code
361 2x select_local_stream_acceptor_service::listen_acceptor(
362 local_stream_acceptor::implementation& impl, int backlog)
363 {
364 2x return static_cast<select_local_stream_acceptor*>(&impl)->do_listen(backlog);
365 }
366
367 } // namespace boost::corosio::detail
368
369 #endif // BOOST_COROSIO_HAS_SELECT
370
371 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
372