TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_local_stream_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_local_stream_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <memory>
28 : #include <mutex>
29 : #include <utility>
30 :
31 : #include <errno.h>
32 : #include <fcntl.h>
33 : #include <sys/select.h>
34 : #include <sys/socket.h>
35 : #include <sys/un.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /* select local stream acceptor service implementation.
41 :
42 : Inherits from local_stream_acceptor_service to enable runtime
43 : polymorphism. Uses key_type = local_stream_acceptor_service
44 : for service lookup.
45 : */
46 : class BOOST_COROSIO_DECL select_local_stream_acceptor_service final
47 : : public reactor_acceptor_service<
48 : select_local_stream_acceptor_service,
49 : local_stream_acceptor_service,
50 : select_scheduler,
51 : select_local_stream_acceptor,
52 : select_local_stream_service>
53 : {
54 : using base_type = reactor_acceptor_service<
55 : select_local_stream_acceptor_service,
56 : local_stream_acceptor_service,
57 : select_scheduler,
58 : select_local_stream_acceptor,
59 : select_local_stream_service>;
60 : friend base_type;
61 :
62 : public:
63 : explicit select_local_stream_acceptor_service(
64 : capy::execution_context& ctx);
65 : ~select_local_stream_acceptor_service() override;
66 :
67 : std::error_code open_acceptor_socket(
68 : local_stream_acceptor::implementation& impl,
69 : int family,
70 : int type,
71 : int protocol) override;
72 : std::error_code bind_acceptor(
73 : local_stream_acceptor::implementation& impl,
74 : corosio::local_endpoint ep) override;
75 : std::error_code listen_acceptor(
76 : local_stream_acceptor::implementation& impl,
77 : int backlog) override;
78 : };
79 :
80 : inline void
81 MIS 0 : select_local_accept_op::cancel() noexcept
82 : {
83 0 : if (acceptor_impl_)
84 0 : acceptor_impl_->cancel_single_op(*this);
85 : else
86 0 : request_cancel();
87 0 : }
88 :
89 : inline void
90 HIT 2 : select_local_accept_op::operator()()
91 : {
92 2 : complete_accept_op<select_local_stream_socket>(*this);
93 2 : }
94 :
95 6 : inline select_local_stream_acceptor::select_local_stream_acceptor(
96 6 : select_local_stream_acceptor_service& svc) noexcept
97 6 : : reactor_acceptor(svc)
98 : {
99 6 : }
100 :
101 : inline std::coroutine_handle<>
102 2 : select_local_stream_acceptor::accept(
103 : std::coroutine_handle<> h,
104 : capy::executor_ref ex,
105 : std::stop_token token,
106 : std::error_code* ec,
107 : io_object::implementation** impl_out)
108 : {
109 2 : auto& op = acc_;
110 2 : op.reset();
111 2 : op.h = h;
112 2 : op.ex = ex;
113 2 : op.ec_out = ec;
114 2 : op.impl_out = impl_out;
115 2 : op.fd = fd_;
116 2 : op.start(token, this);
117 :
118 2 : sockaddr_storage peer_storage{};
119 2 : socklen_t addrlen = sizeof(peer_storage);
120 : int accepted;
121 : do
122 : {
123 : accepted =
124 2 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
125 : }
126 2 : while (accepted < 0 && errno == EINTR);
127 :
128 2 : if (accepted >= 0)
129 : {
130 MIS 0 : if (accepted >= FD_SETSIZE)
131 : {
132 0 : ::close(accepted);
133 0 : op.complete(EINVAL, 0);
134 0 : op.impl_ptr = shared_from_this();
135 0 : svc_.post(&op);
136 0 : return std::noop_coroutine();
137 : }
138 :
139 0 : int flags = ::fcntl(accepted, F_GETFL, 0);
140 0 : if (flags == -1)
141 : {
142 0 : int err = errno;
143 0 : ::close(accepted);
144 0 : op.complete(err, 0);
145 0 : op.impl_ptr = shared_from_this();
146 0 : svc_.post(&op);
147 0 : return std::noop_coroutine();
148 : }
149 :
150 0 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
151 : {
152 0 : int err = errno;
153 0 : ::close(accepted);
154 0 : op.complete(err, 0);
155 0 : op.impl_ptr = shared_from_this();
156 0 : svc_.post(&op);
157 0 : return std::noop_coroutine();
158 : }
159 :
160 0 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
161 : {
162 0 : int err = errno;
163 0 : ::close(accepted);
164 0 : op.complete(err, 0);
165 0 : op.impl_ptr = shared_from_this();
166 0 : svc_.post(&op);
167 0 : return std::noop_coroutine();
168 : }
169 :
170 : {
171 0 : std::lock_guard lock(desc_state_.mutex);
172 0 : desc_state_.read_ready = false;
173 0 : }
174 :
175 0 : if (svc_.scheduler().try_consume_inline_budget())
176 : {
177 0 : auto* socket_svc = svc_.stream_service();
178 0 : if (socket_svc)
179 : {
180 : auto& impl =
181 : static_cast<select_local_stream_socket&>(
182 0 : *socket_svc->construct());
183 0 : impl.set_socket(accepted);
184 :
185 0 : impl.desc_state_.fd = accepted;
186 : {
187 0 : std::lock_guard lock(impl.desc_state_.mutex);
188 0 : impl.desc_state_.read_op = nullptr;
189 0 : impl.desc_state_.write_op = nullptr;
190 0 : impl.desc_state_.connect_op = nullptr;
191 0 : }
192 0 : socket_svc->scheduler().register_descriptor(
193 : accepted, &impl.desc_state_);
194 :
195 0 : impl.set_endpoints(
196 : local_endpoint_,
197 0 : from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
198 :
199 0 : *ec = {};
200 0 : if (impl_out)
201 0 : *impl_out = &impl;
202 : }
203 : else
204 : {
205 0 : ::close(accepted);
206 0 : *ec = make_err(ENOENT);
207 0 : if (impl_out)
208 0 : *impl_out = nullptr;
209 : }
210 0 : op.cont_op.cont.h = h;
211 0 : return dispatch_coro(ex, op.cont_op.cont);
212 : }
213 :
214 0 : op.accepted_fd = accepted;
215 0 : op.peer_storage = peer_storage;
216 0 : op.complete(0, 0);
217 0 : op.impl_ptr = shared_from_this();
218 0 : svc_.post(&op);
219 0 : return std::noop_coroutine();
220 : }
221 :
222 HIT 2 : if (errno == EAGAIN || errno == EWOULDBLOCK)
223 : {
224 2 : op.impl_ptr = shared_from_this();
225 2 : svc_.work_started();
226 :
227 2 : std::lock_guard lock(desc_state_.mutex);
228 2 : bool io_done = false;
229 2 : if (desc_state_.read_ready)
230 : {
231 MIS 0 : desc_state_.read_ready = false;
232 0 : op.perform_io();
233 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
234 0 : if (!io_done)
235 0 : op.errn = 0;
236 : }
237 :
238 HIT 2 : if (io_done || op.cancelled.load(std::memory_order_acquire))
239 : {
240 MIS 0 : svc_.post(&op);
241 0 : svc_.work_finished();
242 : }
243 : else
244 : {
245 HIT 2 : desc_state_.read_op = &op;
246 : }
247 2 : return std::noop_coroutine();
248 2 : }
249 :
250 MIS 0 : op.complete(errno, 0);
251 0 : op.impl_ptr = shared_from_this();
252 0 : svc_.post(&op);
253 0 : return std::noop_coroutine();
254 : }
255 :
256 : inline void
257 0 : select_local_stream_acceptor::cancel() noexcept
258 : {
259 0 : do_cancel();
260 0 : }
261 :
262 : inline void
263 HIT 24 : select_local_stream_acceptor::close_socket() noexcept
264 : {
265 24 : do_close_socket();
266 24 : }
267 :
268 : inline native_handle_type
269 MIS 0 : select_local_stream_acceptor::release_socket() noexcept
270 : {
271 0 : return this->do_release_socket();
272 : }
273 :
274 HIT 228 : inline select_local_stream_acceptor_service::
275 228 : select_local_stream_acceptor_service(capy::execution_context& ctx)
276 228 : : base_type(ctx)
277 : {
278 228 : auto* svc = ctx_.find_service<detail::local_stream_service>();
279 228 : stream_svc_ = svc
280 228 : ? dynamic_cast<select_local_stream_service*>(svc)
281 : : nullptr;
282 228 : assert(stream_svc_ &&
283 : "local_stream_service must be registered before acceptor service");
284 228 : }
285 :
286 456 : inline select_local_stream_acceptor_service::
287 456 : ~select_local_stream_acceptor_service() {}
288 :
289 : inline std::error_code
290 6 : select_local_stream_acceptor_service::open_acceptor_socket(
291 : local_stream_acceptor::implementation& impl,
292 : int family,
293 : int type,
294 : int protocol)
295 : {
296 6 : auto* select_impl = static_cast<select_local_stream_acceptor*>(&impl);
297 6 : select_impl->close_socket();
298 :
299 6 : int fd = ::socket(family, type, protocol);
300 6 : if (fd < 0)
301 MIS 0 : return make_err(errno);
302 :
303 HIT 6 : int flags = ::fcntl(fd, F_GETFL, 0);
304 6 : if (flags == -1)
305 : {
306 MIS 0 : int errn = errno;
307 0 : ::close(fd);
308 0 : return make_err(errn);
309 : }
310 HIT 6 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
311 : {
312 MIS 0 : int errn = errno;
313 0 : ::close(fd);
314 0 : return make_err(errn);
315 : }
316 HIT 6 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
317 : {
318 MIS 0 : int errn = errno;
319 0 : ::close(fd);
320 0 : return make_err(errn);
321 : }
322 :
323 HIT 6 : if (fd >= FD_SETSIZE)
324 : {
325 MIS 0 : ::close(fd);
326 0 : return make_err(EMFILE);
327 : }
328 :
329 HIT 6 : select_impl->fd_ = fd;
330 :
331 : // Set up descriptor state but do NOT register with select yet
332 : // (registration happens in do_listen via reactor_acceptor base)
333 6 : select_impl->desc_state_.fd = fd;
334 : {
335 6 : std::lock_guard lock(select_impl->desc_state_.mutex);
336 6 : select_impl->desc_state_.read_op = nullptr;
337 6 : }
338 :
339 6 : return {};
340 : }
341 :
342 : inline std::error_code
343 6 : select_local_stream_acceptor_service::bind_acceptor(
344 : local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
345 : {
346 6 : return static_cast<select_local_stream_acceptor*>(&impl)->do_bind(ep);
347 : }
348 :
349 : inline std::error_code
350 2 : select_local_stream_acceptor_service::listen_acceptor(
351 : local_stream_acceptor::implementation& impl, int backlog)
352 : {
353 2 : return static_cast<select_local_stream_acceptor*>(&impl)->do_listen(backlog);
354 : }
355 :
356 : } // namespace boost::corosio::detail
357 :
358 : #endif // BOOST_COROSIO_HAS_SELECT
359 :
360 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
|