include/boost/corosio/native/detail/select/select_local_stream_acceptor_service.hpp

40.5% Lines (64/158) 75.0% List of functions (9/12)
select_local_stream_acceptor_service.hpp
f(x) Functions (12)
Function Calls Lines Blocks
boost::corosio::detail::select_local_accept_op::cancel() :81 0 0.0% 0.0% boost::corosio::detail::select_local_accept_op::operator()() :90 2x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor::select_local_stream_acceptor(boost::corosio::detail::select_local_stream_acceptor_service&) :95 6x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :102 2x 25.0% 20.0% boost::corosio::detail::select_local_stream_acceptor::cancel() :258 0 0.0% 0.0% boost::corosio::detail::select_local_stream_acceptor::close_socket() :264 24x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor::release_socket() :270 0 0.0% 0.0% boost::corosio::detail::select_local_stream_acceptor_service::select_local_stream_acceptor_service(boost::capy::execution_context&) :275 229x 100.0% 73.0% boost::corosio::detail::select_local_stream_acceptor_service::~select_local_stream_acceptor_service() :287 458x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor_service::open_acceptor_socket(boost::corosio::local_stream_acceptor::implementation&, int, int, int) :291 6x 57.1% 65.0% boost::corosio::detail::select_local_stream_acceptor_service::bind_acceptor(boost::corosio::local_stream_acceptor::implementation&, boost::corosio::local_endpoint) :344 6x 100.0% 100.0% boost::corosio::detail::select_local_stream_acceptor_service::listen_acceptor(boost::corosio::local_stream_acceptor::implementation&, int) :351 2x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_local_stream_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_local_stream_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <memory>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <sys/select.h>
34 #include <sys/socket.h>
35 #include <sys/un.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /* select local stream acceptor service implementation.
41
42 Inherits from local_stream_acceptor_service to enable runtime
43 polymorphism. Uses key_type = local_stream_acceptor_service
44 for service lookup.
45 */
46 class BOOST_COROSIO_DECL select_local_stream_acceptor_service final
47 : public reactor_acceptor_service<
48 select_local_stream_acceptor_service,
49 local_stream_acceptor_service,
50 select_scheduler,
51 select_local_stream_acceptor,
52 select_local_stream_service>
53 {
54 using base_type = reactor_acceptor_service<
55 select_local_stream_acceptor_service,
56 local_stream_acceptor_service,
57 select_scheduler,
58 select_local_stream_acceptor,
59 select_local_stream_service>;
60 friend base_type;
61
62 public:
63 explicit select_local_stream_acceptor_service(
64 capy::execution_context& ctx);
65 ~select_local_stream_acceptor_service() override;
66
67 std::error_code open_acceptor_socket(
68 local_stream_acceptor::implementation& impl,
69 int family,
70 int type,
71 int protocol) override;
72 std::error_code bind_acceptor(
73 local_stream_acceptor::implementation& impl,
74 corosio::local_endpoint ep) override;
75 std::error_code listen_acceptor(
76 local_stream_acceptor::implementation& impl,
77 int backlog) override;
78 };
79
80 inline void
81 select_local_accept_op::cancel() noexcept
82 {
83 if (acceptor_impl_)
84 acceptor_impl_->cancel_single_op(*this);
85 else
86 request_cancel();
87 }
88
89 inline void
90 2x select_local_accept_op::operator()()
91 {
92 2x complete_accept_op<select_local_stream_socket>(*this);
93 2x }
94
95 6x inline select_local_stream_acceptor::select_local_stream_acceptor(
96 6x select_local_stream_acceptor_service& svc) noexcept
97 6x : reactor_acceptor(svc)
98 {
99 6x }
100
101 inline std::coroutine_handle<>
102 2x select_local_stream_acceptor::accept(
103 std::coroutine_handle<> h,
104 capy::executor_ref ex,
105 std::stop_token token,
106 std::error_code* ec,
107 io_object::implementation** impl_out)
108 {
109 2x auto& op = acc_;
110 2x op.reset();
111 2x op.h = h;
112 2x op.ex = ex;
113 2x op.ec_out = ec;
114 2x op.impl_out = impl_out;
115 2x op.fd = fd_;
116 2x op.start(token, this);
117
118 2x sockaddr_storage peer_storage{};
119 2x socklen_t addrlen = sizeof(peer_storage);
120 int accepted;
121 do
122 {
123 accepted =
124 2x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
125 }
126 2x while (accepted < 0 && errno == EINTR);
127
128 2x if (accepted >= 0)
129 {
130 if (accepted >= FD_SETSIZE)
131 {
132 ::close(accepted);
133 op.complete(EINVAL, 0);
134 op.impl_ptr = shared_from_this();
135 svc_.post(&op);
136 return std::noop_coroutine();
137 }
138
139 int flags = ::fcntl(accepted, F_GETFL, 0);
140 if (flags == -1)
141 {
142 int err = errno;
143 ::close(accepted);
144 op.complete(err, 0);
145 op.impl_ptr = shared_from_this();
146 svc_.post(&op);
147 return std::noop_coroutine();
148 }
149
150 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
151 {
152 int err = errno;
153 ::close(accepted);
154 op.complete(err, 0);
155 op.impl_ptr = shared_from_this();
156 svc_.post(&op);
157 return std::noop_coroutine();
158 }
159
160 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
161 {
162 int err = errno;
163 ::close(accepted);
164 op.complete(err, 0);
165 op.impl_ptr = shared_from_this();
166 svc_.post(&op);
167 return std::noop_coroutine();
168 }
169
170 {
171 std::lock_guard lock(desc_state_.mutex);
172 desc_state_.read_ready = false;
173 }
174
175 if (svc_.scheduler().try_consume_inline_budget())
176 {
177 auto* socket_svc = svc_.stream_service();
178 if (socket_svc)
179 {
180 auto& impl =
181 static_cast<select_local_stream_socket&>(
182 *socket_svc->construct());
183 impl.set_socket(accepted);
184
185 impl.desc_state_.fd = accepted;
186 {
187 std::lock_guard lock(impl.desc_state_.mutex);
188 impl.desc_state_.read_op = nullptr;
189 impl.desc_state_.write_op = nullptr;
190 impl.desc_state_.connect_op = nullptr;
191 }
192 socket_svc->scheduler().register_descriptor(
193 accepted, &impl.desc_state_);
194
195 impl.set_endpoints(
196 local_endpoint_,
197 from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
198
199 *ec = {};
200 if (impl_out)
201 *impl_out = &impl;
202 }
203 else
204 {
205 ::close(accepted);
206 *ec = make_err(ENOENT);
207 if (impl_out)
208 *impl_out = nullptr;
209 }
210 op.cont_op.cont.h = h;
211 return dispatch_coro(ex, op.cont_op.cont);
212 }
213
214 op.accepted_fd = accepted;
215 op.peer_storage = peer_storage;
216 op.peer_addrlen = addrlen;
217 op.complete(0, 0);
218 op.impl_ptr = shared_from_this();
219 svc_.post(&op);
220 return std::noop_coroutine();
221 }
222
223 2x if (errno == EAGAIN || errno == EWOULDBLOCK)
224 {
225 2x op.impl_ptr = shared_from_this();
226 2x svc_.work_started();
227
228 2x std::lock_guard lock(desc_state_.mutex);
229 2x bool io_done = false;
230 2x if (desc_state_.read_ready)
231 {
232 desc_state_.read_ready = false;
233 op.perform_io();
234 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
235 if (!io_done)
236 op.errn = 0;
237 }
238
239 2x if (io_done || op.cancelled.load(std::memory_order_acquire))
240 {
241 svc_.post(&op);
242 svc_.work_finished();
243 }
244 else
245 {
246 2x desc_state_.read_op = &op;
247 }
248 2x return std::noop_coroutine();
249 2x }
250
251 op.complete(errno, 0);
252 op.impl_ptr = shared_from_this();
253 svc_.post(&op);
254 return std::noop_coroutine();
255 }
256
257 inline void
258 select_local_stream_acceptor::cancel() noexcept
259 {
260 do_cancel();
261 }
262
263 inline void
264 24x select_local_stream_acceptor::close_socket() noexcept
265 {
266 24x do_close_socket();
267 24x }
268
269 inline native_handle_type
270 select_local_stream_acceptor::release_socket() noexcept
271 {
272 return this->do_release_socket();
273 }
274
275 229x inline select_local_stream_acceptor_service::
276 229x select_local_stream_acceptor_service(capy::execution_context& ctx)
277 229x : base_type(ctx)
278 {
279 229x auto* svc = ctx_.find_service<detail::local_stream_service>();
280 229x stream_svc_ = svc
281 229x ? dynamic_cast<select_local_stream_service*>(svc)
282 : nullptr;
283 229x assert(stream_svc_ &&
284 "local_stream_service must be registered before acceptor service");
285 229x }
286
287 458x inline select_local_stream_acceptor_service::
288 458x ~select_local_stream_acceptor_service() {}
289
290 inline std::error_code
291 6x select_local_stream_acceptor_service::open_acceptor_socket(
292 local_stream_acceptor::implementation& impl,
293 int family,
294 int type,
295 int protocol)
296 {
297 6x auto* select_impl = static_cast<select_local_stream_acceptor*>(&impl);
298 6x select_impl->close_socket();
299
300 6x int fd = ::socket(family, type, protocol);
301 6x if (fd < 0)
302 return make_err(errno);
303
304 6x int flags = ::fcntl(fd, F_GETFL, 0);
305 6x if (flags == -1)
306 {
307 int errn = errno;
308 ::close(fd);
309 return make_err(errn);
310 }
311 6x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
312 {
313 int errn = errno;
314 ::close(fd);
315 return make_err(errn);
316 }
317 6x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
318 {
319 int errn = errno;
320 ::close(fd);
321 return make_err(errn);
322 }
323
324 6x if (fd >= FD_SETSIZE)
325 {
326 ::close(fd);
327 return make_err(EMFILE);
328 }
329
330 6x select_impl->fd_ = fd;
331
332 // Set up descriptor state but do NOT register with select yet
333 // (registration happens in do_listen via reactor_acceptor base)
334 6x select_impl->desc_state_.fd = fd;
335 {
336 6x std::lock_guard lock(select_impl->desc_state_.mutex);
337 6x select_impl->desc_state_.read_op = nullptr;
338 6x }
339
340 6x return {};
341 }
342
343 inline std::error_code
344 6x select_local_stream_acceptor_service::bind_acceptor(
345 local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
346 {
347 6x return static_cast<select_local_stream_acceptor*>(&impl)->do_bind(ep);
348 }
349
350 inline std::error_code
351 2x select_local_stream_acceptor_service::listen_acceptor(
352 local_stream_acceptor::implementation& impl, int backlog)
353 {
354 2x return static_cast<select_local_stream_acceptor*>(&impl)->do_listen(backlog);
355 }
356
357 } // namespace boost::corosio::detail
358
359 #endif // BOOST_COROSIO_HAS_SELECT
360
361 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
362