TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/except.hpp>
15 : #include <boost/corosio/io/io_object.hpp>
16 : #include <boost/capy/io_result.hpp>
17 : #include <boost/corosio/local_endpoint.hpp>
18 : #include <boost/corosio/local_stream.hpp>
19 : #include <boost/corosio/local_stream_socket.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/io_env.hpp>
23 : #include <boost/capy/concept/executor.hpp>
24 :
25 : #include <system_error>
26 :
27 : #include <cassert>
28 : #include <concepts>
29 : #include <coroutine>
30 : #include <cstddef>
31 : #include <stop_token>
32 : #include <type_traits>
33 :
34 : namespace boost::corosio {
35 :
36 : /* local_stream_acceptor: an asynchronous Unix stream acceptor for coroutine I/O.
37 :
38 : This class provides asynchronous Unix domain stream accept
39 : operations that return awaitable types. The acceptor binds
40 : to a local endpoint (filesystem path) and listens for
41 : incoming connections.
42 :
43 : The library does NOT automatically unlink the socket path
44 : on close. Callers are responsible for removing the socket
45 : file before bind (for example by passing bind_option::unlink_existing to bind()) or after close.
46 :
47 : Thread Safety:
48 : Distinct objects: Safe.
49 : Shared objects: Unsafe. An acceptor must not have
50 : concurrent accept operations.
51 : */
52 : /** Options for local_stream_acceptor::bind(); passed as its second argument (defaults to bind_option::none). */
53 : enum class bind_option
54 : {
// Default used by bind() when no option is passed.
55 : none,
56 : /// Unlink the socket path before binding (ignored for abstract paths).
57 : unlink_existing
58 : };
59 :
// Coroutine-facing acceptor for Unix domain stream sockets. Operations are
// forwarded to a backend `implementation` object obtained from the handle h_
// (h_ is not declared in this class; presumably inherited from io_object).
// NOTE(review): std::move is used below, but <utility> is not among the headers
// this file includes directly; presumably it arrives transitively — confirm.
60 : class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
61 : {
// Awaitable returned by the no-argument accept() overload. On resumption it
// yields a capy::io_result<local_stream_socket> holding the accepted socket.
62 : struct move_accept_awaitable
63 : {
64 : local_stream_acceptor& acc_; // acceptor the operation is issued on
65 : std::stop_token token_; // default-constructed until await_suspend copies env->stop_token
66 : mutable std::error_code ec_; // its address is handed to implementation::accept()
67 : mutable io_object::implementation* peer_impl_ = nullptr; // its address is handed to implementation::accept()
68 :
// Binds the awaitable to `acc`; nothing is started here.
69 HIT 2 : explicit move_accept_awaitable(
70 : local_stream_acceptor& acc) noexcept
71 2 : : acc_(acc)
72 : {
73 2 : }
74 :
// Reports whether a stop was requested on token_. token_ is only assigned in
// await_suspend, so before suspension this tests a default-constructed
// std::stop_token (which has no stop state and reports false).
75 2 : bool await_ready() const noexcept
76 : {
77 2 : return token_.stop_requested();
78 : }
79 :
// Builds the result after resumption:
//   - stop requested on token_   -> operation_canceled plus a default-constructed socket
//   - ec_ set, or peer_impl_ null -> ec_ plus a default-constructed socket
//   - otherwise                  -> a socket constructed on acc_.ctx_ that is given peer_impl_
// NOTE(review): the stop check comes first and never looks at peer_impl_; confirm the
// backend cannot leave a live implementation in peer_impl_ when stop was requested.
80 2 : capy::io_result<local_stream_socket> await_resume() const noexcept
81 : {
82 2 : if (token_.stop_requested())
83 MIS 0 : return {make_error_code(std::errc::operation_canceled),
84 0 : local_stream_socket()};
85 :
86 HIT 2 : if (ec_ || !peer_impl_)
87 MIS 0 : return {ec_, local_stream_socket()};
88 :
89 HIT 2 : local_stream_socket peer(acc_.ctx_);
90 2 : reset_peer_impl(peer, peer_impl_);
91 2 : return {ec_, std::move(peer)};
92 2 : }
93 :
// Copies env->stop_token into token_, then calls implementation::accept() with the
// coroutine handle, env->executor, the token and the addresses of ec_ and peer_impl_.
// The handle returned by the backend is returned unchanged.
94 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
95 : -> std::coroutine_handle<>
96 : {
97 2 : token_ = env->stop_token;
98 6 : return acc_.get().accept(
99 6 : h, env->executor, token_, &ec_, &peer_impl_);
100 : }
101 : };
102 :
// Awaitable returned by accept(local_stream_socket&). On resumption it stores the
// accepted implementation into the caller-supplied socket and yields capy::io_result<>.
103 : struct accept_awaitable
104 : {
105 : local_stream_acceptor& acc_; // acceptor the operation is issued on
106 : local_stream_socket& peer_; // caller's socket that receives the accepted connection
107 : std::stop_token token_; // default-constructed until await_suspend copies env->stop_token
108 : mutable std::error_code ec_; // its address is handed to implementation::accept()
109 : mutable io_object::implementation* peer_impl_ = nullptr; // its address is handed to implementation::accept()
110 :
// Binds the awaitable to `acc` and to the destination socket `peer`; nothing is started here.
111 2 : accept_awaitable(
112 : local_stream_acceptor& acc, local_stream_socket& peer) noexcept
113 2 : : acc_(acc)
114 2 : , peer_(peer)
115 : {
116 2 : }
117 :
// Reports whether a stop was requested on token_; token_ is still
// default-constructed until await_suspend has run.
118 2 : bool await_ready() const noexcept
119 : {
120 2 : return token_.stop_requested();
121 : }
122 :
// Builds the result after resumption: operation_canceled if a stop was requested on
// token_; otherwise ec_, after resetting peer_.h_ to peer_impl_ when ec_ is clear and
// peer_impl_ is non-null. peer_ is left untouched on the cancel and error paths.
123 2 : capy::io_result<> await_resume() const noexcept
124 : {
125 2 : if (token_.stop_requested())
126 MIS 0 : return {make_error_code(std::errc::operation_canceled)};
127 :
128 HIT 2 : if (!ec_ && peer_impl_)
129 2 : peer_.h_.reset(peer_impl_);
130 2 : return {ec_};
131 : }
132 :
// Copies env->stop_token into token_, then calls implementation::accept() with the
// coroutine handle, env->executor, the token and the addresses of ec_ and peer_impl_.
133 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
134 : -> std::coroutine_handle<>
135 : {
136 2 : token_ = env->stop_token;
137 6 : return acc_.get().accept(
138 6 : h, env->executor, token_, &ec_, &peer_impl_);
139 : }
140 : };
141 :
142 : public:
// Destructor; declared here, defined outside this header.
143 : ~local_stream_acceptor() override;
144 :
// Construct an acceptor from an execution context; defined outside this header.
145 : explicit local_stream_acceptor(capy::execution_context& ctx);
146 :
// Construct from an executor by delegating to the execution_context constructor
// with ex.context(). Constrained to types that satisfy capy::Executor and are not
// local_stream_acceptor itself.
147 : template<class Ex>
148 : requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
149 : capy::Executor<Ex>
150 : explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
151 : {
152 : }
153 :
// Move constructor: delegates to the protected (ctx, acceptor&&) constructor, so the
// new object refers to other's execution context and move-constructs the io_object base.
154 : local_stream_acceptor(local_stream_acceptor&& other) noexcept
155 : : local_stream_acceptor(other.ctx_, std::move(other))
156 : {
157 : }
158 :
// Move assignment: asserts both objects refer to the same execution_context (ctx_ is a
// reference and is not rebound), then, unless self-assigning, calls close() and
// move-assigns the io_object base.
// NOTE(review): this operator is noexcept while close() is not declared noexcept; an
// exception escaping close() here would terminate the program — confirm close() cannot throw.
159 : local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
160 : {
161 : assert(&ctx_ == &other.ctx_ &&
162 : "move-assign requires the same execution_context");
163 : if (this != &other)
164 : {
165 : close();
166 : io_object::operator=(std::move(other));
167 : }
168 : return *this;
169 : }
170 :
// Copying is disabled; the type is move-only.
171 : local_stream_acceptor(local_stream_acceptor const&) = delete;
172 : local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
173 :
174 : /** Create the acceptor socket.
175 :
176 : @param proto The protocol. Defaults to local_stream{}.
177 :
178 : @throws std::system_error on failure.
179 : */
180 : void open(local_stream proto = {});
181 :
182 : /** Bind to a local endpoint.
183 :
184 : @param ep The local endpoint (path) to bind to.
185 : @param opt Bind options. Pass bind_option::unlink_existing
186 : to unlink the socket path before binding (ignored for
187 : abstract sockets and empty endpoints).
188 :
189 : @return An error code on failure, empty on success.
190 :
191 : @throws std::logic_error if the acceptor is not open.
192 : */
193 : [[nodiscard]] std::error_code
194 : bind(corosio::local_endpoint ep,
195 : bind_option opt = bind_option::none);
196 :
197 : /** Start listening for incoming connections.
198 :
199 : @param backlog The maximum pending connection queue length.
200 :
201 : @return An error code on failure, empty on success.
202 :
203 : @throws std::logic_error if the acceptor is not open.
204 : */
205 : [[nodiscard]] std::error_code listen(int backlog = 128);
206 :
207 : /// Close the acceptor.
208 : void close();
209 :
210 : /// Check if the acceptor has an open socket handle: true only when h_ is set and the backend reports is_open().
211 44 : bool is_open() const noexcept
212 : {
213 44 : return h_ && get().is_open();
214 : }
215 :
216 : /** Initiate an asynchronous accept operation.
217 :
218 : @param peer The socket to receive the accepted connection.
219 :
220 : @return An awaitable that completes with io_result<>.
221 :
222 : @throws std::logic_error if is_open() returns false (the only state this function checks before creating the awaitable).
223 : */
224 2 : auto accept(local_stream_socket& peer)
225 : {
226 2 : if (!is_open())
227 MIS 0 : detail::throw_logic_error("accept: acceptor not listening");
228 HIT 2 : return accept_awaitable(*this, peer);
229 : }
230 :
231 : /** Initiate an asynchronous accept, returning the socket.
232 :
233 : @return An awaitable that completes with
234 : io_result<local_stream_socket>.
235 :
236 : @throws std::logic_error if is_open() returns false (the only state this function checks before creating the awaitable).
237 : */
238 2 : auto accept()
239 : {
240 2 : if (!is_open())
241 MIS 0 : detail::throw_logic_error("accept: acceptor not listening");
242 HIT 2 : return move_accept_awaitable(*this);
243 : }
244 :
245 : /** Cancel pending asynchronous accept operations.
246 :
247 : Outstanding accept operations complete with
248 : @c capy::cond::canceled. Safe to call when no
249 : operations are pending (no-op).
250 : */
251 : void cancel();
252 :
253 : /** Release ownership of the native socket handle.
254 :
255 : Deregisters the acceptor from the reactor and cancels
256 : pending operations without closing the fd.
257 :
258 : @return The native handle, or -1 if not open.
259 :
260 : @throws std::logic_error if the acceptor is not open.
261 : */
// NOTE(review): the @return and @throws lines above disagree about the not-open case
// (return -1 vs. throw); the definition is not in this header — confirm which one holds.
262 : native_handle_type release();
263 :
264 : /** Return the local endpoint the acceptor is bound to.
265 :
266 : Returns a default-constructed (empty) endpoint if the
267 : acceptor is not open or not yet bound. Safe to call in
268 : any state.
269 : */
270 : corosio::local_endpoint local_endpoint() const noexcept;
271 :
// Set a socket option on the backend. Option must provide static level() and name()
// plus data() and size() members. Calls detail::throw_logic_error if !is_open(), and
// detail::throw_system_error if the backend returns a non-empty error code.
272 : template<class Option>
273 : void set_option(Option const& opt)
274 : {
275 : if (!is_open())
276 : detail::throw_logic_error("set_option: acceptor not open");
277 : std::error_code ec = get().set_option(
278 : Option::level(), Option::name(), opt.data(), opt.size());
279 : if (ec)
280 : detail::throw_system_error(ec, "local_stream_acceptor::set_option");
281 : }
282 :
// Read a socket option from the backend. A value-initialized Option supplies the buffer
// (data()) and its capacity (size()); the backend receives the size by pointer, and the
// option is resized to that value before being returned. Calls detail::throw_logic_error
// if !is_open(), and detail::throw_system_error if the backend returns a non-empty error code.
283 : template<class Option>
284 : Option get_option() const
285 : {
286 : if (!is_open())
287 : detail::throw_logic_error("get_option: acceptor not open");
288 : Option opt{};
289 : std::size_t sz = opt.size();
290 : std::error_code ec =
291 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
292 : if (ec)
293 : detail::throw_system_error(ec, "local_stream_acceptor::get_option");
294 : opt.resize(sz);
295 : return opt;
296 : }
297 :
298 : /** Define backend hooks for local stream acceptor operations. */
299 : struct implementation : io_object::implementation
300 : {
// Called from the awaitables' await_suspend with, in order: the awaiting coroutine's
// handle, the executor from the io_env, the stop token, the address of the awaitable's
// error code, and the address of its peer implementation pointer. The returned handle
// is what await_suspend returns.
301 : virtual std::coroutine_handle<> accept(
302 : std::coroutine_handle<>,
303 : capy::executor_ref,
304 : std::stop_token,
305 : std::error_code*,
306 : io_object::implementation**) = 0;
307 :
308 : virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
309 :
// Queried by local_stream_acceptor::is_open() once h_ is known to be set.
310 : virtual bool is_open() const noexcept = 0;
311 :
312 : virtual native_handle_type release_socket() noexcept = 0;
313 :
314 : virtual void cancel() noexcept = 0;
315 :
// Receives Option::level(), Option::name(), opt.data() and opt.size() from set_option().
316 : virtual std::error_code set_option(
317 : int level,
318 : int optname,
319 : void const* data,
320 : std::size_t size) noexcept = 0;
321 :
// Receives Option::level(), Option::name(), opt.data() and a pointer to the size from
// get_option(); the caller resizes the option to *size afterwards.
322 : virtual std::error_code
323 : get_option(int level, int optname, void* data, std::size_t* size)
324 : const noexcept = 0;
325 : };
326 :
327 : protected:
// Construct from an existing handle, moved into the io_object base, and bind ctx_ to `ctx`.
328 : local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
329 : : io_object(std::move(h))
330 : , ctx_(ctx)
331 : {
332 : }
333 :
// Move-construct the io_object base from `other` and bind ctx_ to `ctx`.
// The public move constructor delegates here with other.ctx_.
334 : local_stream_acceptor(
335 : capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
336 : : io_object(std::move(other))
337 : , ctx_(ctx)
338 : {
339 : }
340 :
// Reset peer's handle (peer.h_) to `impl`; does nothing when `impl` is null.
341 2 : static void reset_peer_impl(
342 : local_stream_socket& peer, io_object::implementation* impl) noexcept
343 : {
344 2 : if (impl)
345 2 : peer.h_.reset(impl);
346 2 : }
347 :
348 : private:
// Execution context this acceptor was constructed with; also used to construct
// the sockets produced by the no-argument accept().
349 : capy::execution_context& ctx_;
350 :
// Return the backend as local_stream_acceptor::implementation by static_cast of h_.get().
// The pointer is dereferenced without a null check, so callers need h_ to hold an
// implementation (is_open() tests h_ before calling this).
351 48 : inline implementation& get() const noexcept
352 : {
353 48 : return *static_cast<implementation*>(h_.get());
354 : }
355 : };
356 :
357 : } // namespace boost::corosio
358 :
359 : #endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
|