1 +
//
 
2 +
// Copyright (c) 2026 Michael Vandeberg
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
 
11 +
#define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/config.hpp>
 
14 +
#include <boost/corosio/detail/except.hpp>
 
15 +
#include <boost/corosio/io/io_object.hpp>
 
16 +
#include <boost/capy/io_result.hpp>
 
17 +
#include <boost/corosio/local_endpoint.hpp>
 
18 +
#include <boost/corosio/local_stream.hpp>
 
19 +
#include <boost/corosio/local_stream_socket.hpp>
 
20 +
#include <boost/capy/ex/executor_ref.hpp>
 
21 +
#include <boost/capy/ex/execution_context.hpp>
 
22 +
#include <boost/capy/ex/io_env.hpp>
 
23 +
#include <boost/capy/concept/executor.hpp>
 
24 +

 
25 +
#include <system_error>
 
26 +

 
27 +
#include <cassert>
 
28 +
#include <concepts>
 
29 +
#include <coroutine>
 
30 +
#include <cstddef>
 
31 +
#include <stop_token>
 
32 +
#include <type_traits>
 
33 +

 
34 +
namespace boost::corosio {
 
35 +

 
36 +
/** Flags accepted by @ref local_stream_acceptor::bind().

    Selects what filesystem cleanup, if any, is performed
    ahead of binding to a Unix domain socket path.
*/
enum class bind_option
{
    /// No cleanup requested.
    none,

    /// Unlink the socket path before binding (ignored for abstract paths).
    unlink_existing
};
 
47 +

 
48 +
/** An asynchronous Unix domain stream acceptor for coroutine I/O.
 
49 +

 
50 +
    This class provides asynchronous Unix domain stream accept
 
51 +
    operations that return awaitable types. The acceptor binds
 
52 +
    to a local endpoint (filesystem path or abstract name) and
 
53 +
    listens for incoming connections.
 
54 +

 
55 +
    The library does NOT automatically unlink the socket path
 
56 +
    on close. Callers are responsible for removing the socket
 
57 +
    file before bind (via @ref bind_option::unlink_existing) or
 
58 +
    after close.
 
59 +

 
60 +
    @par Thread Safety
 
61 +
    Distinct objects: Safe.@n
 
62 +
    Shared objects: Unsafe. An acceptor must not have concurrent
 
63 +
    accept operations.
 
64 +

 
65 +
    @par Example
 
66 +
    @code
 
67 +
    io_context ioc;
 
68 +
    local_stream_acceptor acc(ioc);
 
69 +
    acc.open();
 
70 +
    acc.bind(local_endpoint("/tmp/my.sock"),
 
71 +
             bind_option::unlink_existing);
 
72 +
    acc.listen();
 
73 +
    auto [ec, peer] = co_await acc.accept();
 
74 +
    @endcode
 
75 +
*/
 
76 +
class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
 
77 +
{
 
78 +
    struct move_accept_awaitable
 
79 +
    {
 
80 +
        local_stream_acceptor& acc_;
 
81 +
        std::stop_token token_;
 
82 +
        mutable std::error_code ec_;
 
83 +
        mutable io_object::implementation* peer_impl_ = nullptr;
 
84 +

 
85 +
        explicit move_accept_awaitable(
 
86 +
            local_stream_acceptor& acc) noexcept
 
87 +
            : acc_(acc)
 
88 +
        {
 
89 +
        }
 
90 +

 
91 +
        bool await_ready() const noexcept
 
92 +
        {
 
93 +
            return token_.stop_requested();
 
94 +
        }
 
95 +

 
96 +
        capy::io_result<local_stream_socket> await_resume() const noexcept
 
97 +
        {
 
98 +
            if (token_.stop_requested())
 
99 +
                return {make_error_code(std::errc::operation_canceled),
 
100 +
                        local_stream_socket()};
 
101 +

 
102 +
            if (ec_ || !peer_impl_)
 
103 +
                return {ec_, local_stream_socket()};
 
104 +

 
105 +
            local_stream_socket peer(acc_.ctx_);
 
106 +
            reset_peer_impl(peer, peer_impl_);
 
107 +
            return {ec_, std::move(peer)};
 
108 +
        }
 
109 +

 
110 +
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
111 +
            -> std::coroutine_handle<>
 
112 +
        {
 
113 +
            token_ = env->stop_token;
 
114 +
            return acc_.get().accept(
 
115 +
                h, env->executor, token_, &ec_, &peer_impl_);
 
116 +
        }
 
117 +
    };
 
118 +

 
119 +
    struct accept_awaitable
 
120 +
    {
 
121 +
        local_stream_acceptor& acc_;
 
122 +
        local_stream_socket& peer_;
 
123 +
        std::stop_token token_;
 
124 +
        mutable std::error_code ec_;
 
125 +
        mutable io_object::implementation* peer_impl_ = nullptr;
 
126 +

 
127 +
        accept_awaitable(
 
128 +
            local_stream_acceptor& acc, local_stream_socket& peer) noexcept
 
129 +
            : acc_(acc)
 
130 +
            , peer_(peer)
 
131 +
        {
 
132 +
        }
 
133 +

 
134 +
        bool await_ready() const noexcept
 
135 +
        {
 
136 +
            return token_.stop_requested();
 
137 +
        }
 
138 +

 
139 +
        capy::io_result<> await_resume() const noexcept
 
140 +
        {
 
141 +
            if (token_.stop_requested())
 
142 +
                return {make_error_code(std::errc::operation_canceled)};
 
143 +

 
144 +
            if (!ec_ && peer_impl_)
 
145 +
                peer_.h_.reset(peer_impl_);
 
146 +
            return {ec_};
 
147 +
        }
 
148 +

 
149 +
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
150 +
            -> std::coroutine_handle<>
 
151 +
        {
 
152 +
            token_ = env->stop_token;
 
153 +
            return acc_.get().accept(
 
154 +
                h, env->executor, token_, &ec_, &peer_impl_);
 
155 +
        }
 
156 +
    };
 
157 +

 
158 +
public:
 
159 +
    /** Destructor.
 
160 +

 
161 +
        Closes the acceptor if open, cancelling any pending operations.
 
162 +
    */
 
163 +
    ~local_stream_acceptor() override;
 
164 +

 
165 +
    /** Construct an acceptor from an execution context.
 
166 +

 
167 +
        @param ctx The execution context that will own this acceptor.
 
168 +
    */
 
169 +
    explicit local_stream_acceptor(capy::execution_context& ctx);
 
170 +

 
171 +
    /** Construct an acceptor from an executor.
 
172 +

 
173 +
        The acceptor is associated with the executor's context.
 
174 +

 
175 +
        @param ex The executor whose context will own the acceptor.
 
176 +

 
177 +
        @tparam Ex A type satisfying @ref capy::Executor. Must not
 
178 +
            be `local_stream_acceptor` itself (disables implicit
 
179 +
            conversion from move).
 
180 +
    */
 
181 +
    template<class Ex>
 
182 +
        requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
 
183 +
        capy::Executor<Ex>
 
184 +
    explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
 
185 +
    {
 
186 +
    }
 
187 +

 
188 +
    /** Move constructor.
 
189 +

 
190 +
        Transfers ownership of the acceptor resources.
 
191 +

 
192 +
        @param other The acceptor to move from.
 
193 +

 
194 +
        @pre No awaitables returned by @p other's methods exist.
 
195 +
        @pre The execution context associated with @p other must
 
196 +
            outlive this acceptor.
 
197 +
    */
 
198 +
    local_stream_acceptor(local_stream_acceptor&& other) noexcept
 
199 +
        : local_stream_acceptor(other.ctx_, std::move(other))
 
200 +
    {
 
201 +
    }
 
202 +

 
203 +
    /** Move assignment operator.
 
204 +

 
205 +
        Closes any existing acceptor and transfers ownership.
 
206 +
        Both acceptors must share the same execution context.
 
207 +

 
208 +
        @param other The acceptor to move from.
 
209 +

 
210 +
        @return Reference to this acceptor.
 
211 +

 
212 +
        @pre `&ctx_ == &other.ctx_` (same execution context).
 
213 +
        @pre No awaitables returned by either `*this` or @p other's
 
214 +
            methods exist.
 
215 +
    */
 
216 +
    local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
 
217 +
    {
 
218 +
        assert(&ctx_ == &other.ctx_ &&
 
219 +
            "move-assign requires the same execution_context");
 
220 +
        if (this != &other)
 
221 +
        {
 
222 +
            close();
 
223 +
            io_object::operator=(std::move(other));
 
224 +
        }
 
225 +
        return *this;
 
226 +
    }
 
227 +

 
228 +
    local_stream_acceptor(local_stream_acceptor const&)            = delete;
 
229 +
    local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
 
230 +

 
231 +
    /** Create the acceptor socket.
 
232 +

 
233 +
        @param proto The protocol. Defaults to local_stream{}.
 
234 +

 
235 +
        @throws std::system_error on failure.
 
236 +
    */
 
237 +
    void open(local_stream proto = {});
 
238 +

 
239 +
    /** Bind to a local endpoint.
 
240 +

 
241 +
        @param ep The local endpoint (path) to bind to.
 
242 +
        @param opt Bind options. Pass bind_option::unlink_existing
 
243 +
            to unlink the socket path before binding (ignored for
 
244 +
            abstract sockets and empty endpoints).
 
245 +

 
246 +
        @return An error code on failure, empty on success.
 
247 +

 
248 +
        @throws std::logic_error if the acceptor is not open.
 
249 +
    */
 
250 +
    [[nodiscard]] std::error_code
 
251 +
    bind(corosio::local_endpoint ep,
 
252 +
         bind_option opt = bind_option::none);
 
253 +

 
254 +
    /** Start listening for incoming connections.
 
255 +

 
256 +
        @param backlog The maximum pending connection queue length.
 
257 +

 
258 +
        @return An error code on failure, empty on success.
 
259 +

 
260 +
        @throws std::logic_error if the acceptor is not open.
 
261 +
    */
 
262 +
    [[nodiscard]] std::error_code listen(int backlog = 128);
 
263 +

 
264 +
    /** Close the acceptor.
 
265 +

 
266 +
        Cancels any pending accept operations and releases the
 
267 +
        underlying socket. Has no effect if the acceptor is not
 
268 +
        open.
 
269 +

 
270 +
        @post is_open() == false
 
271 +
    */
 
272 +
    void close();
 
273 +

 
274 +
    /// Check if the acceptor has an open socket handle.
 
275 +
    bool is_open() const noexcept
 
276 +
    {
 
277 +
        return h_ && get().is_open();
 
278 +
    }
 
279 +

 
280 +
    /** Initiate an asynchronous accept into an existing socket.
 
281 +

 
282 +
        Completes when a new connection is available. On success
 
283 +
        @p peer is reset to the accepted connection. Only one
 
284 +
        accept may be in flight at a time.
 
285 +

 
286 +
        @param peer The socket to receive the accepted connection.
 
287 +

 
288 +
        @par Cancellation
 
289 +
        Supports cancellation via stop_token or cancel().
 
290 +
        On cancellation, yields `capy::cond::canceled` and
 
291 +
        @p peer is not modified.
 
292 +

 
293 +
        @return An awaitable that completes with io_result<>.
 
294 +

 
295 +
        @throws std::logic_error if the acceptor is not open.
 
296 +
    */
 
297 +
    auto accept(local_stream_socket& peer)
 
298 +
    {
 
299 +
        if (!is_open())
 
300 +
            detail::throw_logic_error("accept: acceptor not listening");
 
301 +
        return accept_awaitable(*this, peer);
 
302 +
    }
 
303 +

 
304 +
    /** Initiate an asynchronous accept, returning the socket.
 
305 +

 
306 +
        Completes when a new connection is available. Only one
 
307 +
        accept may be in flight at a time.
 
308 +

 
309 +
        @par Cancellation
 
310 +
        Supports cancellation via stop_token or cancel().
 
311 +
        On cancellation, yields `capy::cond::canceled` with
 
312 +
        a default-constructed socket.
 
313 +

 
314 +
        @return An awaitable that completes with
 
315 +
            io_result<local_stream_socket>.
 
316 +

 
317 +
        @throws std::logic_error if the acceptor is not open.
 
318 +
    */
 
319 +
    auto accept()
 
320 +
    {
 
321 +
        if (!is_open())
 
322 +
            detail::throw_logic_error("accept: acceptor not listening");
 
323 +
        return move_accept_awaitable(*this);
 
324 +
    }
 
325 +

 
326 +
    /** Cancel pending asynchronous accept operations.
 
327 +

 
328 +
        Outstanding accept operations complete with
 
329 +
        @c capy::cond::canceled. Safe to call when no
 
330 +
        operations are pending (no-op).
 
331 +
    */
 
332 +
    void cancel();
 
333 +

 
334 +
    /** Release ownership of the native socket handle.
 
335 +

 
336 +
        Deregisters the acceptor from the reactor and cancels
 
337 +
        pending operations without closing the descriptor. The
 
338 +
        caller takes ownership of the returned handle.
 
339 +

 
340 +
        @return The native handle.
 
341 +

 
342 +
        @throws std::logic_error if the acceptor is not open.
 
343 +

 
344 +
        @post is_open() == false
 
345 +
    */
 
346 +
    native_handle_type release();
 
347 +

 
348 +
    /** Return the local endpoint the acceptor is bound to.
 
349 +

 
350 +
        Returns a default-constructed (empty) endpoint if the
 
351 +
        acceptor is not open or not yet bound. Safe to call in
 
352 +
        any state.
 
353 +
    */
 
354 +
    corosio::local_endpoint local_endpoint() const noexcept;
 
355 +

 
356 +
    /** Set a socket option on the acceptor.
 
357 +

 
358 +
        Applies a type-safe socket option to the underlying socket.
 
359 +
        The option type encodes the protocol level and option name.
 
360 +

 
361 +
        @param opt The option to set.
 
362 +

 
363 +
        @tparam Option A socket option type providing static
 
364 +
            `level()` and `name()` members, and `data()` / `size()`
 
365 +
            accessors.
 
366 +

 
367 +
        @throws std::logic_error if the acceptor is not open.
 
368 +
        @throws std::system_error on failure.
 
369 +
    */
 
370 +
    template<class Option>
 
371 +
    void set_option(Option const& opt)
 
372 +
    {
 
373 +
        if (!is_open())
 
374 +
            detail::throw_logic_error("set_option: acceptor not open");
 
375 +
        std::error_code ec = get().set_option(
 
376 +
            Option::level(), Option::name(), opt.data(), opt.size());
 
377 +
        if (ec)
 
378 +
            detail::throw_system_error(ec, "local_stream_acceptor::set_option");
 
379 +
    }
 
380 +

 
381 +
    /** Get a socket option from the acceptor.
 
382 +

 
383 +
        Retrieves the current value of a type-safe socket option.
 
384 +

 
385 +
        @return The current option value.
 
386 +

 
387 +
        @tparam Option A socket option type providing static
 
388 +
            `level()` and `name()` members, and `data()` / `size()`
 
389 +
            / `resize()` members.
 
390 +

 
391 +
        @throws std::logic_error if the acceptor is not open.
 
392 +
        @throws std::system_error on failure.
 
393 +
    */
 
394 +
    template<class Option>
 
395 +
    Option get_option() const
 
396 +
    {
 
397 +
        if (!is_open())
 
398 +
            detail::throw_logic_error("get_option: acceptor not open");
 
399 +
        Option opt{};
 
400 +
        std::size_t sz = opt.size();
 
401 +
        std::error_code ec =
 
402 +
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
 
403 +
        if (ec)
 
404 +
            detail::throw_system_error(ec, "local_stream_acceptor::get_option");
 
405 +
        opt.resize(sz);
 
406 +
        return opt;
 
407 +
    }
 
408 +

 
409 +
    /** Backend hooks for local stream acceptor operations.
 
410 +

 
411 +
        Platform backends derive from this to implement
 
412 +
        accept, option, and lifecycle management.
 
413 +
    */
 
414 +
    struct implementation : io_object::implementation
 
415 +
    {
 
416 +
        /** Initiate an asynchronous accept.
 
417 +

 
418 +
            On completion the backend sets @p *ec and, on
 
419 +
            success, stores a pointer to the new socket
 
420 +
            implementation in @p *impl_out.
 
421 +

 
422 +
            @param h Coroutine handle to resume.
 
423 +
            @param ex Executor for dispatching the completion.
 
424 +
            @param token Stop token for cancellation.
 
425 +
            @param ec Output error code.
 
426 +
            @param impl_out Output pointer for the accepted socket.
 
427 +
            @return Coroutine handle to resume immediately.
 
428 +
        */
 
429 +
        virtual std::coroutine_handle<> accept(
 
430 +
            std::coroutine_handle<>,
 
431 +
            capy::executor_ref,
 
432 +
            std::stop_token,
 
433 +
            std::error_code*,
 
434 +
            io_object::implementation**) = 0;
 
435 +

 
436 +
        /// Return the cached local endpoint.
 
437 +
        virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
 
438 +

 
439 +
        /// Return whether the underlying socket is open.
 
440 +
        virtual bool is_open() const noexcept = 0;
 
441 +

 
442 +
        /// Release and return the native handle without closing.
 
443 +
        virtual native_handle_type release_socket() noexcept = 0;
 
444 +

 
445 +
        /// Cancel pending accept operations.
 
446 +
        virtual void cancel() noexcept = 0;
 
447 +

 
448 +
        /// Set a raw socket option.
 
449 +
        virtual std::error_code set_option(
 
450 +
            int level,
 
451 +
            int optname,
 
452 +
            void const* data,
 
453 +
            std::size_t size) noexcept = 0;
 
454 +

 
455 +
        /// Get a raw socket option.
 
456 +
        virtual std::error_code
 
457 +
        get_option(int level, int optname, void* data, std::size_t* size)
 
458 +
            const noexcept = 0;
 
459 +
    };
 
460 +

 
461 +
protected:
 
462 +
    local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
 
463 +
        : io_object(std::move(h))
 
464 +
        , ctx_(ctx)
 
465 +
    {
 
466 +
    }
 
467 +

 
468 +
    local_stream_acceptor(
 
469 +
        capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
 
470 +
        : io_object(std::move(other))
 
471 +
        , ctx_(ctx)
 
472 +
    {
 
473 +
    }
 
474 +

 
475 +
    static void reset_peer_impl(
 
476 +
        local_stream_socket& peer, io_object::implementation* impl) noexcept
 
477 +
    {
 
478 +
        if (impl)
 
479 +
            peer.h_.reset(impl);
 
480 +
    }
 
481 +

 
482 +
private:
 
483 +
    capy::execution_context& ctx_;
 
484 +

 
485 +
    inline implementation& get() const noexcept
 
486 +
    {
 
487 +
        return *static_cast<implementation*>(h_.get());
 
488 +
    }
 
489 +
};
 
490 +

 
491 +
} // namespace boost::corosio
 
492 +

 
493 +
#endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP