TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_local_stream_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_local_stream_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <cassert>
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <sys/select.h>
35 : #include <sys/socket.h>
36 : #include <sys/un.h>
37 : #include <unistd.h>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /* select local stream acceptor service implementation.
42 :
43 : Inherits from local_stream_acceptor_service to enable runtime
44 : polymorphism. Uses key_type = local_stream_acceptor_service
45 : for service lookup.
46 : */
47 : class BOOST_COROSIO_DECL select_local_stream_acceptor_service final
48 : : public reactor_acceptor_service<
49 : select_local_stream_acceptor_service,
50 : local_stream_acceptor_service,
51 : select_scheduler,
52 : select_local_stream_acceptor,
53 : select_local_stream_service>
54 : {
55 : using base_type = reactor_acceptor_service<
56 : select_local_stream_acceptor_service,
57 : local_stream_acceptor_service,
58 : select_scheduler,
59 : select_local_stream_acceptor,
60 : select_local_stream_service>;
61 : friend base_type;
62 :
63 : public:
64 : explicit select_local_stream_acceptor_service(
65 : capy::execution_context& ctx);
66 : ~select_local_stream_acceptor_service() override;
67 :
68 : std::error_code open_acceptor_socket(
69 : local_stream_acceptor::implementation& impl,
70 : int family,
71 : int type,
72 : int protocol) override;
73 : std::error_code bind_acceptor(
74 : local_stream_acceptor::implementation& impl,
75 : corosio::local_endpoint ep) override;
76 : std::error_code listen_acceptor(
77 : local_stream_acceptor::implementation& impl,
78 : int backlog) override;
79 : };
80 :
81 : inline void
82 MIS 0 : select_local_accept_op::cancel() noexcept
83 : {
84 0 : if (acceptor_impl_)
85 0 : acceptor_impl_->cancel_single_op(*this);
86 : else
87 0 : request_cancel();
88 0 : }
89 :
90 : inline void
91 HIT 2 : select_local_accept_op::operator()()
92 : {
93 2 : complete_accept_op<select_local_stream_socket>(*this);
94 2 : }
95 :
96 6 : inline select_local_stream_acceptor::select_local_stream_acceptor(
97 6 : select_local_stream_acceptor_service& svc) noexcept
98 6 : : reactor_acceptor(svc)
99 : {
100 6 : }
101 :
102 : inline std::coroutine_handle<>
103 2 : select_local_stream_acceptor::accept(
104 : std::coroutine_handle<> h,
105 : capy::executor_ref ex,
106 : std::stop_token token,
107 : std::error_code* ec,
108 : io_object::implementation** impl_out)
109 : {
110 2 : auto& op = acc_;
111 2 : op.reset();
112 2 : op.h = h;
113 2 : op.ex = ex;
114 2 : op.ec_out = ec;
115 2 : op.impl_out = impl_out;
116 2 : op.fd = fd_;
117 2 : op.start(token, this);
118 :
119 2 : sockaddr_storage peer_storage{};
120 : socklen_t addrlen;
121 : int accepted;
122 : do
123 : {
124 2 : addrlen = sizeof(peer_storage);
125 : accepted =
126 2 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
127 : }
128 2 : while (accepted < 0 && errno == EINTR);
129 :
130 2 : if (accepted >= 0)
131 : {
132 MIS 0 : if (accepted >= FD_SETSIZE)
133 : {
134 0 : ::close(accepted);
135 0 : op.complete(EMFILE, 0);
136 0 : op.impl_ptr = shared_from_this();
137 0 : svc_.post(&op);
138 0 : return std::noop_coroutine();
139 : }
140 :
141 0 : int flags = ::fcntl(accepted, F_GETFL, 0);
142 0 : if (flags == -1)
143 : {
144 0 : int err = errno;
145 0 : ::close(accepted);
146 0 : op.complete(err, 0);
147 0 : op.impl_ptr = shared_from_this();
148 0 : svc_.post(&op);
149 0 : return std::noop_coroutine();
150 : }
151 :
152 0 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
153 : {
154 0 : int err = errno;
155 0 : ::close(accepted);
156 0 : op.complete(err, 0);
157 0 : op.impl_ptr = shared_from_this();
158 0 : svc_.post(&op);
159 0 : return std::noop_coroutine();
160 : }
161 :
162 0 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
163 : {
164 0 : int err = errno;
165 0 : ::close(accepted);
166 0 : op.complete(err, 0);
167 0 : op.impl_ptr = shared_from_this();
168 0 : svc_.post(&op);
169 0 : return std::noop_coroutine();
170 : }
171 :
172 : #ifdef SO_NOSIGPIPE
173 : {
174 : int one = 1;
175 : ::setsockopt(
176 : accepted, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
177 : }
178 : #endif
179 :
180 : {
181 0 : std::lock_guard lock(desc_state_.mutex);
182 0 : desc_state_.read_ready = false;
183 0 : }
184 :
185 0 : if (svc_.scheduler().try_consume_inline_budget())
186 : {
187 0 : auto* socket_svc = svc_.stream_service();
188 0 : if (socket_svc)
189 : {
190 : auto& impl =
191 : static_cast<select_local_stream_socket&>(
192 0 : *socket_svc->construct());
193 0 : impl.set_socket(accepted);
194 :
195 0 : impl.desc_state_.fd = accepted;
196 : {
197 0 : std::lock_guard lock(impl.desc_state_.mutex);
198 0 : impl.desc_state_.read_op = nullptr;
199 0 : impl.desc_state_.write_op = nullptr;
200 0 : impl.desc_state_.connect_op = nullptr;
201 0 : }
202 0 : socket_svc->scheduler().register_descriptor(
203 : accepted, &impl.desc_state_);
204 :
205 0 : impl.set_endpoints(
206 : local_endpoint_,
207 0 : from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
208 :
209 0 : *ec = {};
210 0 : if (impl_out)
211 0 : *impl_out = &impl;
212 : }
213 : else
214 : {
215 0 : ::close(accepted);
216 0 : *ec = make_err(ENOENT);
217 0 : if (impl_out)
218 0 : *impl_out = nullptr;
219 : }
220 0 : op.cont_op.cont.h = h;
221 0 : return dispatch_coro(ex, op.cont_op.cont);
222 : }
223 :
224 0 : op.accepted_fd = accepted;
225 0 : op.peer_storage = peer_storage;
226 0 : op.peer_addrlen = addrlen;
227 0 : op.complete(0, 0);
228 0 : op.impl_ptr = shared_from_this();
229 0 : svc_.post(&op);
230 0 : return std::noop_coroutine();
231 : }
232 :
233 HIT 2 : if (errno == EAGAIN || errno == EWOULDBLOCK)
234 : {
235 2 : op.impl_ptr = shared_from_this();
236 2 : svc_.work_started();
237 :
238 2 : std::lock_guard lock(desc_state_.mutex);
239 2 : bool io_done = false;
240 2 : if (desc_state_.read_ready)
241 : {
242 MIS 0 : desc_state_.read_ready = false;
243 0 : op.perform_io();
244 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
245 0 : if (!io_done)
246 0 : op.errn = 0;
247 : }
248 :
249 HIT 2 : if (io_done || op.cancelled.load(std::memory_order_acquire))
250 : {
251 MIS 0 : svc_.post(&op);
252 0 : svc_.work_finished();
253 : }
254 : else
255 : {
256 HIT 2 : desc_state_.read_op = &op;
257 : }
258 2 : return std::noop_coroutine();
259 2 : }
260 :
261 MIS 0 : op.complete(errno, 0);
262 0 : op.impl_ptr = shared_from_this();
263 0 : svc_.post(&op);
264 0 : return std::noop_coroutine();
265 : }
266 :
267 : inline void
268 0 : select_local_stream_acceptor::cancel() noexcept
269 : {
270 0 : do_cancel();
271 0 : }
272 :
273 : inline void
274 HIT 24 : select_local_stream_acceptor::close_socket() noexcept
275 : {
276 24 : do_close_socket();
277 24 : }
278 :
279 : inline native_handle_type
280 MIS 0 : select_local_stream_acceptor::release_socket() noexcept
281 : {
282 0 : return this->do_release_socket();
283 : }
284 :
285 HIT 229 : inline select_local_stream_acceptor_service::
286 229 : select_local_stream_acceptor_service(capy::execution_context& ctx)
287 229 : : base_type(ctx)
288 : {
289 229 : auto* svc = ctx_.find_service<detail::local_stream_service>();
290 229 : stream_svc_ = svc
291 229 : ? dynamic_cast<select_local_stream_service*>(svc)
292 : : nullptr;
293 229 : assert(stream_svc_ &&
294 : "local_stream_service must be registered before acceptor service");
295 229 : }
296 :
297 458 : inline select_local_stream_acceptor_service::
298 458 : ~select_local_stream_acceptor_service() {}
299 :
300 : inline std::error_code
301 6 : select_local_stream_acceptor_service::open_acceptor_socket(
302 : local_stream_acceptor::implementation& impl,
303 : int family,
304 : int type,
305 : int protocol)
306 : {
307 6 : auto* select_impl = static_cast<select_local_stream_acceptor*>(&impl);
308 6 : select_impl->close_socket();
309 :
310 6 : int fd = ::socket(family, type, protocol);
311 6 : if (fd < 0)
312 MIS 0 : return make_err(errno);
313 :
314 HIT 6 : int flags = ::fcntl(fd, F_GETFL, 0);
315 6 : if (flags == -1)
316 : {
317 MIS 0 : int errn = errno;
318 0 : ::close(fd);
319 0 : return make_err(errn);
320 : }
321 HIT 6 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
322 : {
323 MIS 0 : int errn = errno;
324 0 : ::close(fd);
325 0 : return make_err(errn);
326 : }
327 HIT 6 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
328 : {
329 MIS 0 : int errn = errno;
330 0 : ::close(fd);
331 0 : return make_err(errn);
332 : }
333 :
334 HIT 6 : if (fd >= FD_SETSIZE)
335 : {
336 MIS 0 : ::close(fd);
337 0 : return make_err(EMFILE);
338 : }
339 :
340 HIT 6 : select_impl->fd_ = fd;
341 :
342 : // Set up descriptor state but do NOT register with select yet
343 : // (registration happens in do_listen via reactor_acceptor base)
344 6 : select_impl->desc_state_.fd = fd;
345 : {
346 6 : std::lock_guard lock(select_impl->desc_state_.mutex);
347 6 : select_impl->desc_state_.read_op = nullptr;
348 6 : }
349 :
350 6 : return {};
351 : }
352 :
353 : inline std::error_code
354 6 : select_local_stream_acceptor_service::bind_acceptor(
355 : local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
356 : {
357 6 : return static_cast<select_local_stream_acceptor*>(&impl)->do_bind(ep);
358 : }
359 :
360 : inline std::error_code
361 2 : select_local_stream_acceptor_service::listen_acceptor(
362 : local_stream_acceptor::implementation& impl, int backlog)
363 : {
364 2 : return static_cast<select_local_stream_acceptor*>(&impl)->do_listen(backlog);
365 : }
366 :
367 : } // namespace boost::corosio::detail
368 :
369 : #endif // BOOST_COROSIO_HAS_SELECT
370 :
371 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
|