TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_local_stream_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_local_stream_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <cassert>
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <sys/socket.h>
34 : #include <sys/un.h>
35 : #include <unistd.h>
36 :
37 : namespace boost::corosio::detail {
38 :
39 : /* epoll local stream acceptor service implementation.
40 :
41 : Inherits from local_stream_acceptor_service to enable runtime
42 : polymorphism. Uses key_type = local_stream_acceptor_service
43 : for service lookup.
44 : */
// Epoll-backed acceptor service for local stream sockets.
// The arguments given to reactor_acceptor_service are, in order: this class,
// local_stream_acceptor_service, epoll_scheduler, epoll_local_stream_acceptor
// and epoll_local_stream_service.
45 : class BOOST_COROSIO_DECL epoll_local_stream_acceptor_service final
46 : : public reactor_acceptor_service<
47 : epoll_local_stream_acceptor_service,
48 : local_stream_acceptor_service,
49 : epoll_scheduler,
50 : epoll_local_stream_acceptor,
51 : epoll_local_stream_service>
52 : {
// Alias for the base instantiation. The friend declaration grants the base
// access to non-public members of this class.
53 : using base_type = reactor_acceptor_service<
54 : epoll_local_stream_acceptor_service,
55 : local_stream_acceptor_service,
56 : epoll_scheduler,
57 : epoll_local_stream_acceptor,
58 : epoll_local_stream_service>;
59 : friend base_type;
60 :
61 : public:
// Constructs the service for the given execution context.
62 : explicit epoll_local_stream_acceptor_service(capy::execution_context& ctx);
63 : ~epoll_local_stream_acceptor_service() override;
64 :
// Socket setup overrides. Each one operates on the acceptor implementation
// passed in and reports its outcome as a std::error_code.
65 : std::error_code open_acceptor_socket(
66 : local_stream_acceptor::implementation& impl,
67 : int family,
68 : int type,
69 : int protocol) override;
70 : std::error_code bind_acceptor(
71 : local_stream_acceptor::implementation& impl,
72 : corosio::local_endpoint ep) override;
73 : std::error_code listen_acceptor(
74 : local_stream_acceptor::implementation& impl,
75 : int backlog) override;
76 : };
77 :
// Cancels this accept operation.
// When an acceptor is recorded in acceptor_impl_, cancellation is routed
// through its cancel_single_op(); otherwise request_cancel() is called on
// the op itself.
78 : inline void
79 MIS 0 : epoll_local_accept_op::cancel() noexcept
80 : {
81 0 : if (acceptor_impl_)
82 0 : acceptor_impl_->cancel_single_op(*this);
83 : else
84 0 : request_cancel();
85 0 : }
86 :
// Invocation entry point of the accept op: forwards to complete_accept_op,
// instantiated for epoll_local_stream_socket.
87 : inline void
88 HIT 2 : epoll_local_accept_op::operator()()
89 : {
90 2 : complete_accept_op<epoll_local_stream_socket>(*this);
91 2 : }
92 :
// Constructs the acceptor; the owning service is forwarded to the
// reactor_acceptor initializer. No other work is done here.
93 6 : inline epoll_local_stream_acceptor::epoll_local_stream_acceptor(
94 6 : epoll_local_stream_acceptor_service& svc) noexcept
95 6 : : reactor_acceptor(svc)
96 : {
97 6 : }
98 :
// Starts an accept on this acceptor.
//
// h        - coroutine handle stored in the op; also used for inline resumption.
// ex       - executor stored in the op; used by dispatch_coro on the inline path.
// token    - stop token handed to op.start().
// ec       - optional out-parameter. Written directly here only on the inline
//            completion path; otherwise it is kept in the op as ec_out.
// impl_out - optional out-parameter. On the inline completion path it receives
//            the new socket implementation, or nullptr when no stream service
//            is available; otherwise it is kept in the op as impl_out.
//
// Returns the result of dispatch_coro() when the accept is finished inline,
// and std::noop_coroutine() in every other case (the op was posted to the
// service or parked on the descriptor).
99 : inline std::coroutine_handle<>
100 2 : epoll_local_stream_acceptor::accept(
101 : std::coroutine_handle<> h,
102 : capy::executor_ref ex,
103 : std::stop_token token,
104 : std::error_code* ec,
105 : io_object::implementation** impl_out)
106 : {
// Re-arm the acceptor's embedded accept op (acc_) for this call.
107 2 : auto& op = acc_;
108 2 : op.reset();
109 2 : op.h = h;
110 2 : op.ex = ex;
111 2 : op.ec_out = ec;
112 2 : op.impl_out = impl_out;
113 2 : op.fd = fd_;
114 2 : op.start(token, this);
115 :
// Speculative accept4 with SOCK_NONBLOCK | SOCK_CLOEXEC on the new fd;
// the call is repeated while it fails with EINTR.
116 2 : sockaddr_storage peer_storage{};
117 : socklen_t addrlen;
118 : int accepted;
119 : do
120 : {
121 2 : addrlen = sizeof(peer_storage);
122 2 : accepted = ::accept4(
123 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
124 : SOCK_NONBLOCK | SOCK_CLOEXEC);
125 : }
126 2 : while (accepted < 0 && errno == EINTR);
127 :
// A connection was accepted immediately.
128 2 : if (accepted >= 0)
129 : {
// Clear the read-ready flag under the descriptor mutex.
130 : {
131 MIS 0 : std::lock_guard lock(desc_state_.mutex);
132 0 : desc_state_.read_ready = false;
133 0 : }
134 :
// If the scheduler grants inline budget, finish here without posting the op.
135 0 : if (svc_.scheduler().try_consume_inline_budget())
136 : {
137 0 : auto* socket_svc = svc_.stream_service();
138 0 : if (socket_svc)
139 : {
// Construct the peer socket implementation and hand it the accepted fd.
140 : auto& impl =
141 : static_cast<epoll_local_stream_socket&>(
142 0 : *socket_svc->construct());
143 0 : impl.set_socket(accepted);
144 :
// Reset the new socket's pending-op slots under its descriptor mutex, then
// register the descriptor with the stream service's scheduler.
145 0 : impl.desc_state_.fd = accepted;
146 : {
147 0 : std::lock_guard lock(impl.desc_state_.mutex);
148 0 : impl.desc_state_.read_op = nullptr;
149 0 : impl.desc_state_.write_op = nullptr;
150 0 : impl.desc_state_.connect_op = nullptr;
151 0 : }
152 0 : socket_svc->scheduler().register_descriptor(
153 : accepted, &impl.desc_state_);
154 :
// Local endpoint is this acceptor's local_endpoint_; the peer endpoint is
// converted from the address accept4 filled in.
155 0 : impl.set_endpoints(
156 : local_endpoint_,
157 0 : from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
158 :
159 0 : if (ec)
160 0 : *ec = {};
161 0 : if (impl_out)
162 0 : *impl_out = &impl;
163 : }
164 : else
165 : {
// No stream service: close the accepted fd and report make_err(ENOENT).
166 0 : ::close(accepted);
167 0 : if (ec)
168 0 : *ec = make_err(ENOENT);
169 0 : if (impl_out)
170 0 : *impl_out = nullptr;
171 : }
// Resume the caller through its executor.
172 0 : op.cont_op.cont.h = h;
173 0 : return dispatch_coro(ex, op.cont_op.cont);
174 : }
175 :
// No inline budget: stash the accepted fd and peer address in the op, mark it
// complete with (0, 0) and post it to the service.
176 0 : op.accepted_fd = accepted;
177 0 : op.peer_storage = peer_storage;
178 0 : op.peer_addrlen = addrlen;
179 0 : op.complete(0, 0);
180 0 : op.impl_ptr = shared_from_this();
181 0 : svc_.post(&op);
182 0 : return std::noop_coroutine();
183 : }
184 :
// accept4 failed. errno is still the value it set: nothing on this path has
// run between the call and this check.
185 HIT 2 : if (errno == EAGAIN || errno == EWOULDBLOCK)
186 : {
// No connection pending yet. Store a shared reference to this acceptor in the
// op (presumably to keep it alive while the op is outstanding - confirm) and
// account for the outstanding work.
187 2 : op.impl_ptr = shared_from_this();
188 2 : svc_.work_started();
189 :
// Under the descriptor mutex: if readiness was flagged, consume the flag and
// retry the I/O now. io_done stays false when the retry reports
// EAGAIN/EWOULDBLOCK again, and the op's errn is cleared in that case.
190 2 : std::lock_guard lock(desc_state_.mutex);
191 2 : bool io_done = false;
192 2 : if (desc_state_.read_ready)
193 : {
194 MIS 0 : desc_state_.read_ready = false;
195 0 : op.perform_io();
196 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
197 0 : if (!io_done)
198 0 : op.errn = 0;
199 : }
200 :
// Finished or already cancelled: post the op and balance the work_started()
// above. Otherwise park the op as the descriptor's pending read_op.
201 HIT 2 : if (io_done || op.cancelled.load(std::memory_order_acquire))
202 : {
203 MIS 0 : svc_.post(&op);
204 0 : svc_.work_finished();
205 : }
206 : else
207 : {
208 HIT 2 : desc_state_.read_op = &op;
209 : }
210 2 : return std::noop_coroutine();
211 2 : }
212 :
// Any other accept4 failure: record errno in the op and post it.
213 MIS 0 : op.complete(errno, 0);
214 0 : op.impl_ptr = shared_from_this();
215 0 : svc_.post(&op);
216 : // Error path: the completion is posted to the scheduler queue, not resumed inline.
217 0 : return std::noop_coroutine();
218 : }
219 :
// Cancels outstanding work on this acceptor by forwarding to do_cancel().
220 : inline void
221 0 : epoll_local_stream_acceptor::cancel() noexcept
222 : {
223 0 : do_cancel();
224 0 : }
225 :
// Closes the acceptor's socket by forwarding to do_close_socket().
226 : inline void
227 HIT 24 : epoll_local_stream_acceptor::close_socket() noexcept
228 : {
229 24 : do_close_socket();
230 24 : }
231 :
// Forwards to do_release_socket() and returns the native handle it yields.
232 : inline native_handle_type
233 MIS 0 : epoll_local_stream_acceptor::release_socket() noexcept
234 : {
235 0 : return this->do_release_socket();
236 : }
237 :
// Constructs the service on top of base_type and resolves the companion
// stream service: detail::local_stream_service is looked up in the context
// and downcast to epoll_local_stream_service. stream_svc_ ends up nullptr
// when the lookup or the downcast yields nothing.
// The assert states the requirement that the stream service is registered
// before this acceptor service; it is only checked when asserts are enabled.
238 HIT 356 : inline epoll_local_stream_acceptor_service::
239 356 : epoll_local_stream_acceptor_service(capy::execution_context& ctx)
240 356 : : base_type(ctx)
241 : {
242 356 : auto* svc = ctx_.find_service<detail::local_stream_service>();
243 356 : stream_svc_ = svc
244 356 : ? dynamic_cast<epoll_local_stream_service*>(svc)
245 : : nullptr;
246 356 : assert(stream_svc_ &&
247 : "local_stream_service must be registered before acceptor service");
248 356 : }
249 :
// Destructor with an empty body; no explicit cleanup is performed here.
250 712 : inline epoll_local_stream_acceptor_service::
251 712 : ~epoll_local_stream_acceptor_service() {}
252 :
// Opens a new socket for the given acceptor implementation.
//
// impl     - acceptor implementation; downcast to epoll_local_stream_acceptor.
// family, type, protocol - passed to ::socket(); SOCK_NONBLOCK and
//            SOCK_CLOEXEC are OR-ed into type.
//
// close_socket() is called on the implementation before the new socket is
// created. Returns make_err(errno) when ::socket() fails, otherwise an empty
// error_code.
253 : inline std::error_code
254 6 : epoll_local_stream_acceptor_service::open_acceptor_socket(
255 : local_stream_acceptor::implementation& impl,
256 : int family,
257 : int type,
258 : int protocol)
259 : {
260 6 : auto* epoll_impl = static_cast<epoll_local_stream_acceptor*>(&impl);
261 6 : epoll_impl->close_socket();
262 :
263 6 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
264 6 : if (fd < 0)
265 MIS 0 : return make_err(errno);
266 :
267 HIT 6 : epoll_impl->fd_ = fd;
268 :
269 : // Set up descriptor state but do NOT register with epoll yet
270 6 : epoll_impl->desc_state_.fd = fd;
// The pending read_op slot is cleared under the descriptor mutex.
271 : {
272 6 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
273 6 : epoll_impl->desc_state_.read_op = nullptr;
274 6 : }
275 :
276 6 : return {};
277 : }
278 :
// Downcasts impl to epoll_local_stream_acceptor and forwards to do_bind(ep),
// returning its error_code unchanged.
279 : inline std::error_code
280 6 : epoll_local_stream_acceptor_service::bind_acceptor(
281 : local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
282 : {
283 6 : return static_cast<epoll_local_stream_acceptor*>(&impl)->do_bind(ep);
284 : }
285 :
// Downcasts impl to epoll_local_stream_acceptor and forwards to
// do_listen(backlog), returning its error_code unchanged.
286 : inline std::error_code
287 2 : epoll_local_stream_acceptor_service::listen_acceptor(
288 : local_stream_acceptor::implementation& impl, int backlog)
289 : {
290 2 : return static_cast<epoll_local_stream_acceptor*>(&impl)->do_listen(backlog);
291 : }
292 :
293 :
294 : } // namespace boost::corosio::detail
295 :
296 : #endif // BOOST_COROSIO_HAS_EPOLL
297 :
298 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
|