TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/local_stream_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_local_stream_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_local_stream_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_acceptor_service.hpp>
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>
30 :
31 : #include <errno.h>
32 : #include <sys/socket.h>
33 : #include <sys/un.h>
34 : #include <unistd.h>
35 :
36 : namespace boost::corosio::detail {
37 :
38 : /* epoll local stream acceptor service implementation.
39 :
40 : Inherits from local_stream_acceptor_service to enable runtime
41 : polymorphism. Uses key_type = local_stream_acceptor_service
42 : for service lookup.
43 : */
// Acceptor service for local (AF_UNIX-style) stream sockets on the epoll
// backend. All shared machinery comes from the reactor_acceptor_service base,
// which is parameterised on this class, the abstract service it implements,
// the epoll scheduler, the acceptor implementation type and the matching
// stream-socket service.
44 : class BOOST_COROSIO_DECL epoll_local_stream_acceptor_service final
45 : : public reactor_acceptor_service<
46 : epoll_local_stream_acceptor_service,
47 : local_stream_acceptor_service,
48 : epoll_scheduler,
49 : epoll_local_stream_acceptor,
50 : epoll_local_stream_service>
51 : {
// Shorthand for the exact base instantiation above; the base is befriended
// so that it can reach this class's non-public members.
52 : using base_type = reactor_acceptor_service<
53 : epoll_local_stream_acceptor_service,
54 : local_stream_acceptor_service,
55 : epoll_scheduler,
56 : epoll_local_stream_acceptor,
57 : epoll_local_stream_service>;
58 : friend base_type;
59 :
60 : public:
// Constructs the base from `ctx`, then looks up the local_stream_service
// registered in the context and asserts that it is the epoll implementation.
61 : explicit epoll_local_stream_acceptor_service(capy::execution_context& ctx);
62 : ~epoll_local_stream_acceptor_service() override;
63 :
// Closes any socket `impl` currently holds, then creates a new non-blocking,
// close-on-exec socket for (family, type, protocol) and stores its descriptor
// in `impl`. The descriptor is not registered with epoll here. Returns the
// errno-derived error when ::socket() fails, otherwise a default error_code.
64 : std::error_code open_acceptor_socket(
65 : local_stream_acceptor::implementation& impl,
66 : int family,
67 : int type,
68 : int protocol) override;
// Forwards to the acceptor implementation's do_bind(ep).
69 : std::error_code bind_acceptor(
70 : local_stream_acceptor::implementation& impl,
71 : corosio::local_endpoint ep) override;
// Forwards to the acceptor implementation's do_listen(backlog).
72 : std::error_code listen_acceptor(
73 : local_stream_acceptor::implementation& impl,
74 : int backlog) override;
75 : };
76 :
77 : inline void
78 MIS 0 : epoll_local_accept_op::cancel() noexcept
79 : {
80 0 : if (acceptor_impl_)
81 0 : acceptor_impl_->cancel_single_op(*this);
82 : else
83 0 : request_cancel();
84 0 : }
85 :
86 : inline void
87 HIT 2 : epoll_local_accept_op::operator()()
88 : {
89 2 : complete_accept_op<epoll_local_stream_socket>(*this);
90 2 : }
91 :
92 6 : inline epoll_local_stream_acceptor::epoll_local_stream_acceptor(
93 6 : epoll_local_stream_acceptor_service& svc) noexcept
94 6 : : reactor_acceptor(svc)
95 : {
96 6 : }
97 :
// Starts an asynchronous accept on this acceptor.
//
// Parameters:
//   h        - coroutine to resume once the accept finishes.
//   ex       - executor used to resume `h`.
//   token    - stop token handed to the op's start().
//   ec       - optional out-parameter receiving the result code.
//   impl_out - optional out-parameter receiving the accepted socket's
//              implementation (nullptr on the inline failure path).
//
// Returns the coroutine handle produced by dispatch_coro() when the accept
// completes inline, otherwise std::noop_coroutine().
//
// An immediate ::accept4() is attempted first; the outcome selects one of
// four paths:
//   1. success with inline budget available: the socket is built and the
//      results are written right here;
//   2. success without inline budget: the accepted fd is stored in the op
//      and the op is posted to the service;
//   3. EAGAIN / EWOULDBLOCK: the op is parked on the descriptor state, or
//      posted if a readiness retry finished it or it was cancelled;
//   4. any other error: the op is completed with errno and posted.
98 : inline std::coroutine_handle<>
99 2 : epoll_local_stream_acceptor::accept(
100 : std::coroutine_handle<> h,
101 : capy::executor_ref ex,
102 : std::stop_token token,
103 : std::error_code* ec,
104 : io_object::implementation** impl_out)
105 : {
// The acceptor owns a single reusable accept op (acc_); reset it and load
// it with the caller's coroutine, executor, output slots and listening fd.
106 2 : auto& op = acc_;
107 2 : op.reset();
108 2 : op.h = h;
109 2 : op.ex = ex;
110 2 : op.ec_out = ec;
111 2 : op.impl_out = impl_out;
112 2 : op.fd = fd_;
// NOTE(review): start() presumably arms stop-token cancellation and links
// the op back to this acceptor; confirm in epoll_local_accept_op.
113 2 : op.start(token, this);
114 :
// Speculative accept, retried while the call is interrupted by a signal.
// The new socket is created non-blocking and close-on-exec.
115 2 : sockaddr_storage peer_storage{};
116 2 : socklen_t addrlen = sizeof(peer_storage);
117 : int accepted;
118 : do
119 : {
120 2 : accepted = ::accept4(
121 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
122 : SOCK_NONBLOCK | SOCK_CLOEXEC);
123 : }
124 2 : while (accepted < 0 && errno == EINTR);
125 :
126 2 : if (accepted >= 0)
127 : {
// A connection was consumed, so clear the cached readiness flag under
// the descriptor mutex.
128 : {
129 MIS 0 : std::lock_guard lock(desc_state_.mutex);
130 0 : desc_state_.read_ready = false;
131 0 : }
132 :
// Path 1: the scheduler granted inline budget, so finish the accept here
// instead of going through the scheduler queue.
133 0 : if (svc_.scheduler().try_consume_inline_budget())
134 : {
135 0 : auto* socket_svc = svc_.stream_service();
136 0 : if (socket_svc)
137 : {
// Create the socket implementation for the accepted fd.
138 : auto& impl =
139 : static_cast<epoll_local_stream_socket&>(
140 0 : *socket_svc->construct());
141 0 : impl.set_socket(accepted);
142 :
// Initialise the new socket's descriptor state (no pending ops) before
// registering the fd with the stream service's scheduler.
143 0 : impl.desc_state_.fd = accepted;
144 : {
145 0 : std::lock_guard lock(impl.desc_state_.mutex);
146 0 : impl.desc_state_.read_op = nullptr;
147 0 : impl.desc_state_.write_op = nullptr;
148 0 : impl.desc_state_.connect_op = nullptr;
149 0 : }
150 0 : socket_svc->scheduler().register_descriptor(
151 : accepted, &impl.desc_state_);
152 :
// Local endpoint is the acceptor's own; the remote endpoint is decoded
// from the peer address returned by accept4().
153 0 : impl.set_endpoints(
154 : local_endpoint_,
155 0 : from_sockaddr_as(peer_storage, addrlen, corosio::local_endpoint{}));
156 :
157 0 : if (ec)
158 0 : *ec = {};
159 0 : if (impl_out)
160 0 : *impl_out = &impl;
161 : }
162 : else
163 : {
// No stream service to own the connection: close the fd so it does not
// leak and report ENOENT to the caller.
164 0 : ::close(accepted);
165 0 : if (ec)
166 0 : *ec = make_err(ENOENT);
167 0 : if (impl_out)
168 0 : *impl_out = nullptr;
169 : }
// Resume the caller through its executor using the op's continuation slot.
170 0 : op.cont_op.cont.h = h;
171 0 : return dispatch_coro(ex, op.cont_op.cont);
172 : }
173 :
// Path 2: no inline budget. Stash the accepted fd and peer address in the
// op, mark it complete, pin this acceptor via impl_ptr, and post the op.
174 0 : op.accepted_fd = accepted;
175 0 : op.peer_storage = peer_storage;
176 0 : op.complete(0, 0);
177 0 : op.impl_ptr = shared_from_this();
178 0 : svc_.post(&op);
179 0 : return std::noop_coroutine();
180 : }
181 :
// Path 3: nothing to accept yet. errno still holds accept4()'s result
// because no other call has been made since the loop above.
182 HIT 2 : if (errno == EAGAIN || errno == EWOULDBLOCK)
183 : {
184 2 : op.impl_ptr = shared_from_this();
185 2 : svc_.work_started();
186 :
// The lock is held until the function returns, covering the readiness
// check, the optional retry and the parking of the op.
187 2 : std::lock_guard lock(desc_state_.mutex);
188 2 : bool io_done = false;
189 2 : if (desc_state_.read_ready)
190 : {
// Readiness was recorded in the meantime: consume the flag and retry the
// I/O now. A retry that again reports EAGAIN/EWOULDBLOCK is not treated
// as finished, and its error is cleared.
191 MIS 0 : desc_state_.read_ready = false;
192 0 : op.perform_io();
193 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
194 0 : if (!io_done)
195 0 : op.errn = 0;
196 : }
197 :
198 HIT 2 : if (io_done || op.cancelled.load(std::memory_order_acquire))
199 : {
// Finished by the retry, or already cancelled: post the op and balance
// the work_started() above.
200 MIS 0 : svc_.post(&op);
201 0 : svc_.work_finished();
202 : }
203 : else
204 : {
// Park the op as the descriptor's pending read.
// NOTE(review): presumably the reactor completes it when the listening
// fd becomes readable; confirm in the scheduler.
205 HIT 2 : desc_state_.read_op = &op;
206 : }
207 2 : return std::noop_coroutine();
208 2 : }
209 :
// Path 4: hard failure from accept4(). Record errno in the op and post it.
210 MIS 0 : op.complete(errno, 0);
211 0 : op.impl_ptr = shared_from_this();
212 0 : svc_.post(&op);
213 : // completion is always posted to scheduler queue, never inline.
214 0 : return std::noop_coroutine();
215 : }
216 :
217 : inline void
218 0 : epoll_local_stream_acceptor::cancel() noexcept
219 : {
220 0 : do_cancel();
221 0 : }
222 :
223 : inline void
224 HIT 24 : epoll_local_stream_acceptor::close_socket() noexcept
225 : {
226 24 : do_close_socket();
227 24 : }
228 :
229 : inline native_handle_type
230 MIS 0 : epoll_local_stream_acceptor::release_socket() noexcept
231 : {
232 0 : return this->do_release_socket();
233 : }
234 :
235 HIT 355 : inline epoll_local_stream_acceptor_service::
236 355 : epoll_local_stream_acceptor_service(capy::execution_context& ctx)
237 355 : : base_type(ctx)
238 : {
239 355 : auto* svc = ctx_.find_service<detail::local_stream_service>();
240 355 : stream_svc_ = svc
241 355 : ? dynamic_cast<epoll_local_stream_service*>(svc)
242 : : nullptr;
243 355 : assert(stream_svc_ &&
244 : "local_stream_service must be registered before acceptor service");
245 355 : }
246 :
247 710 : inline epoll_local_stream_acceptor_service::
248 710 : ~epoll_local_stream_acceptor_service() {}
249 :
250 : inline std::error_code
251 6 : epoll_local_stream_acceptor_service::open_acceptor_socket(
252 : local_stream_acceptor::implementation& impl,
253 : int family,
254 : int type,
255 : int protocol)
256 : {
257 6 : auto* epoll_impl = static_cast<epoll_local_stream_acceptor*>(&impl);
258 6 : epoll_impl->close_socket();
259 :
260 6 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
261 6 : if (fd < 0)
262 MIS 0 : return make_err(errno);
263 :
264 HIT 6 : epoll_impl->fd_ = fd;
265 :
266 : // Set up descriptor state but do NOT register with epoll yet
267 6 : epoll_impl->desc_state_.fd = fd;
268 : {
269 6 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
270 6 : epoll_impl->desc_state_.read_op = nullptr;
271 6 : }
272 :
273 6 : return {};
274 : }
275 :
276 : inline std::error_code
277 6 : epoll_local_stream_acceptor_service::bind_acceptor(
278 : local_stream_acceptor::implementation& impl, corosio::local_endpoint ep)
279 : {
280 6 : return static_cast<epoll_local_stream_acceptor*>(&impl)->do_bind(ep);
281 : }
282 :
283 : inline std::error_code
284 2 : epoll_local_stream_acceptor_service::listen_acceptor(
285 : local_stream_acceptor::implementation& impl, int backlog)
286 : {
287 2 : return static_cast<epoll_local_stream_acceptor*>(&impl)->do_listen(backlog);
288 : }
289 :
290 :
291 : } // namespace boost::corosio::detail
292 :
293 : #endif // BOOST_COROSIO_HAS_EPOLL
294 :
295 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_LOCAL_STREAM_ACCEPTOR_SERVICE_HPP
|