1 +
//
 
2 +
// Copyright (c) 2026 Michael Vandeberg
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
 
11 +
#define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/config.hpp>
 
14 +
#include <boost/corosio/detail/platform.hpp>
 
15 +
#include <boost/corosio/detail/except.hpp>
 
16 +
#include <boost/corosio/detail/native_handle.hpp>
 
17 +
#include <boost/corosio/detail/op_base.hpp>
 
18 +
#include <boost/corosio/io/io_stream.hpp>
 
19 +
#include <boost/capy/io_result.hpp>
 
20 +
#include <boost/corosio/detail/buffer_param.hpp>
 
21 +
#include <boost/corosio/local_endpoint.hpp>
 
22 +
#include <boost/corosio/local_stream.hpp>
 
23 +
#include <boost/corosio/shutdown_type.hpp>
 
24 +
#include <boost/capy/ex/executor_ref.hpp>
 
25 +
#include <boost/capy/ex/execution_context.hpp>
 
26 +
#include <boost/capy/ex/io_env.hpp>
 
27 +
#include <boost/capy/concept/executor.hpp>
 
28 +

 
29 +
#include <system_error>
 
30 +

 
31 +
#include <concepts>
 
32 +
#include <coroutine>
 
33 +
#include <cstddef>
 
34 +
#include <stop_token>
 
35 +
#include <type_traits>
 
36 +

 
37 +
namespace boost::corosio {
 
38 +

 
39 +
/** An asynchronous Unix stream socket for coroutine I/O.
 
40 +

 
41 +
    This class provides asynchronous Unix domain stream socket
 
42 +
    operations that return awaitable types. Each operation
 
43 +
    participates in the affine awaitable protocol, ensuring
 
44 +
    coroutines resume on the correct executor.
 
45 +

 
46 +
    The socket must be opened before performing I/O operations.
 
47 +
    Operations support cancellation through `std::stop_token` via
 
48 +
    the affine protocol, or explicitly through the `cancel()`
 
49 +
    member function.
 
50 +

 
51 +
    @par Thread Safety
 
52 +
    Distinct objects: Safe.@n
 
53 +
    Shared objects: Unsafe. A socket must not have concurrent
 
54 +
    operations of the same type (e.g., two simultaneous reads).
 
55 +
    One read and one write may be in flight simultaneously.
 
56 +

 
57 +
    @par Semantics
 
58 +
    Wraps the platform Unix domain socket stack. Operations
 
59 +
    dispatch to OS socket APIs via the io_context backend
 
60 +
    (epoll, kqueue, select, or IOCP). Satisfies @ref capy::Stream.
 
61 +

 
62 +
    @par Example
 
63 +
    @code
 
64 +
    io_context ioc;
 
65 +
    local_stream_socket s(ioc);
 
66 +
    s.open();
 
67 +

 
68 +
    auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
 
69 +
    if (ec)
 
70 +
        co_return;
 
71 +

 
72 +
    char buf[1024];
 
73 +
    auto [read_ec, n] = co_await s.read_some(
 
74 +
        capy::mutable_buffer(buf, sizeof(buf)));
 
75 +
    @endcode
 
76 +
*/
 
77 +
class BOOST_COROSIO_DECL local_stream_socket : public io_stream
 
78 +
{
 
79 +
public:
 
80 +
    using shutdown_type = corosio::shutdown_type;
 
81 +
    using enum corosio::shutdown_type;
 
82 +

 
83 +
    /** Define backend hooks for local stream socket operations.
 
84 +

 
85 +
        Platform backends (epoll, kqueue, select) derive from this
 
86 +
        to implement socket I/O, connection, and option management.
 
87 +
    */
 
88 +
    struct implementation : io_stream::implementation
 
89 +
    {
 
90 +
        /** Initiate an asynchronous connect to the given endpoint.
 
91 +

 
92 +
            @param h Coroutine handle to resume on completion.
 
93 +
            @param ex Executor for dispatching the completion.
 
94 +
            @param ep The local endpoint (path) to connect to.
 
95 +
            @param token Stop token for cancellation.
 
96 +
            @param ec Output error code.
 
97 +

 
98 +
            @return Coroutine handle to resume immediately.
 
99 +
        */
 
100 +
        virtual std::coroutine_handle<> connect(
 
101 +
            std::coroutine_handle<> h,
 
102 +
            capy::executor_ref ex,
 
103 +
            corosio::local_endpoint ep,
 
104 +
            std::stop_token token,
 
105 +
            std::error_code* ec) = 0;
 
106 +

 
107 +
        /** Shut down the socket for the given direction(s).
 
108 +

 
109 +
            @param what The shutdown direction.
 
110 +

 
111 +
            @return Error code on failure, empty on success.
 
112 +
        */
 
113 +
        virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
 
114 +

 
115 +
        /// Return the platform socket descriptor.
 
116 +
        virtual native_handle_type native_handle() const noexcept = 0;
 
117 +

 
118 +
        /** Release ownership of the native socket handle.
 
119 +

 
120 +
            Deregisters the socket from the reactor without closing
 
121 +
            the descriptor. The caller takes ownership.
 
122 +

 
123 +
            @return The native handle.
 
124 +
        */
 
125 +
        virtual native_handle_type release_socket() noexcept = 0;
 
126 +

 
127 +
        /** Request cancellation of pending asynchronous operations.
 
128 +

 
129 +
            All outstanding operations complete with operation_canceled error.
 
130 +
            Check `ec == cond::canceled` for portable comparison.
 
131 +
        */
 
132 +
        virtual void cancel() noexcept = 0;
 
133 +

 
134 +
        /** Set a socket option.
 
135 +

 
136 +
            @param level The protocol level (e.g. `SOL_SOCKET`).
 
137 +
            @param optname The option name (e.g. `SO_KEEPALIVE`).
 
138 +
            @param data Pointer to the option value.
 
139 +
            @param size Size of the option value in bytes.
 
140 +
            @return Error code on failure, empty on success.
 
141 +
        */
 
142 +
        virtual std::error_code set_option(
 
143 +
            int level,
 
144 +
            int optname,
 
145 +
            void const* data,
 
146 +
            std::size_t size) noexcept = 0;
 
147 +

 
148 +
        /** Get a socket option.
 
149 +

 
150 +
            @param level The protocol level (e.g. `SOL_SOCKET`).
 
151 +
            @param optname The option name (e.g. `SO_KEEPALIVE`).
 
152 +
            @param data Pointer to receive the option value.
 
153 +
            @param size On entry, the size of the buffer. On exit,
 
154 +
                the size of the option value.
 
155 +
            @return Error code on failure, empty on success.
 
156 +
        */
 
157 +
        virtual std::error_code
 
158 +
        get_option(int level, int optname, void* data, std::size_t* size)
 
159 +
            const noexcept = 0;
 
160 +

 
161 +
        /// Return the cached local endpoint.
 
162 +
        virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
 
163 +

 
164 +
        /// Return the cached remote endpoint.
 
165 +
        virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
 
166 +
    };
 
167 +

 
168 +
    /// Represent the awaitable returned by @ref connect.
 
169 +
    struct connect_awaitable
 
170 +
        : detail::void_op_base<connect_awaitable>
 
171 +
    {
 
172 +
        local_stream_socket& s_;
 
173 +
        corosio::local_endpoint endpoint_;
 
174 +

 
175 +
        connect_awaitable(
 
176 +
            local_stream_socket& s, corosio::local_endpoint ep) noexcept
 
177 +
            : s_(s), endpoint_(ep) {}
 
178 +

 
179 +
        std::coroutine_handle<> dispatch(
 
180 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
 
181 +
        {
 
182 +
            return s_.get().connect(h, ex, endpoint_, token_, &ec_);
 
183 +
        }
 
184 +
    };
 
185 +

 
186 +
public:
 
187 +
    /** Destructor.
 
188 +

 
189 +
        Closes the socket if open, cancelling any pending operations.
 
190 +
    */
 
191 +
    ~local_stream_socket() override;
 
192 +

 
193 +
    /** Construct a socket from an execution context.
 
194 +

 
195 +
        @param ctx The execution context that will own this socket.
 
196 +
    */
 
197 +
    explicit local_stream_socket(capy::execution_context& ctx);
 
198 +

 
199 +
    /** Construct a socket from an executor.
 
200 +

 
201 +
        The socket is associated with the executor's context.
 
202 +

 
203 +
        @param ex The executor whose context will own the socket.
 
204 +
    */
 
205 +
    template<class Ex>
 
206 +
        requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
 
207 +
        capy::Executor<Ex>
 
208 +
    explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
 
209 +
    {
 
210 +
    }
 
211 +

 
212 +
    /** Move constructor.
 
213 +

 
214 +
        Transfers ownership of the socket resources.
 
215 +

 
216 +
        @param other The socket to move from.
 
217 +

 
218 +
        @pre No awaitables returned by @p other's methods exist.
 
219 +
        @pre The execution context associated with @p other must
 
220 +
            outlive this socket.
 
221 +
    */
 
222 +
    local_stream_socket(local_stream_socket&& other) noexcept
 
223 +
        : io_object(std::move(other))
 
224 +
    {
 
225 +
    }
 
226 +

 
227 +
    /** Move assignment operator.
 
228 +

 
229 +
        Closes any existing socket and transfers ownership.
 
230 +

 
231 +
        @param other The socket to move from.
 
232 +

 
233 +
        @pre No awaitables returned by either `*this` or @p other's
 
234 +
            methods exist.
 
235 +
        @pre The execution context associated with @p other must
 
236 +
            outlive this socket.
 
237 +

 
238 +
        @return Reference to this socket.
 
239 +
    */
 
240 +
    local_stream_socket& operator=(local_stream_socket&& other) noexcept
 
241 +
    {
 
242 +
        if (this != &other)
 
243 +
        {
 
244 +
            close();
 
245 +
            io_object::operator=(std::move(other));
 
246 +
        }
 
247 +
        return *this;
 
248 +
    }
 
249 +

 
250 +
    local_stream_socket(local_stream_socket const&)            = delete;
 
251 +
    local_stream_socket& operator=(local_stream_socket const&) = delete;
 
252 +

 
253 +
    /** Open the socket.
 
254 +

 
255 +
        Creates a Unix stream socket and associates it with
 
256 +
        the platform reactor.
 
257 +

 
258 +
        @param proto The protocol. Defaults to local_stream{}.
 
259 +

 
260 +
        @throws std::system_error on failure.
 
261 +
    */
 
262 +
    void open(local_stream proto = {});
 
263 +

 
264 +
    /** Close the socket.
 
265 +

 
266 +
        Releases socket resources. Any pending operations complete
 
267 +
        with `errc::operation_canceled`.
 
268 +
    */
 
269 +
    void close();
 
270 +

 
271 +
    /** Check if the socket is open.
 
272 +

 
273 +
        @return `true` if the socket is open and ready for operations.
 
274 +
    */
 
275 +
    bool is_open() const noexcept
 
276 +
    {
 
277 +
#if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
 
278 +
        return h_ && get().native_handle() != ~native_handle_type(0);
 
279 +
#else
 
280 +
        return h_ && get().native_handle() >= 0;
 
281 +
#endif
 
282 +
    }
 
283 +

 
284 +
    /** Initiate an asynchronous connect operation.
 
285 +

 
286 +
        If the socket is not already open, it is opened automatically.
 
287 +

 
288 +
        @param ep The local endpoint (path) to connect to.
 
289 +

 
290 +
        @return An awaitable that completes with io_result<>.
 
291 +

 
292 +
        @throws std::system_error if the socket needs to be opened
 
293 +
            and the open fails.
 
294 +
    */
 
295 +
    auto connect(corosio::local_endpoint ep)
 
296 +
    {
 
297 +
        if (!is_open())
 
298 +
            open();
 
299 +
        return connect_awaitable(*this, ep);
 
300 +
    }
 
301 +

 
302 +
    /** Cancel any pending asynchronous operations.
 
303 +

 
304 +
        All outstanding operations complete with `errc::operation_canceled`.
 
305 +
        Check `ec == cond::canceled` for portable comparison.
 
306 +
    */
 
307 +
    void cancel();
 
308 +

 
309 +
    /** Get the native socket handle.
 
310 +

 
311 +
        Returns the underlying platform-specific socket descriptor.
 
312 +
        On POSIX systems this is an `int` file descriptor.
 
313 +

 
314 +
        @return The native socket handle, or an invalid sentinel
 
315 +
            if not open.
 
316 +
    */
 
317 +
    native_handle_type native_handle() const noexcept;
 
318 +

 
319 +
    /** Query the number of bytes available for reading.
 
320 +

 
321 +
        @return The number of bytes that can be read without blocking.
 
322 +

 
323 +
        @throws std::logic_error if the socket is not open.
 
324 +
        @throws std::system_error on ioctl failure.
 
325 +
    */
 
326 +
    std::size_t available() const;
 
327 +

 
328 +
    /** Release ownership of the native socket handle.
 
329 +

 
330 +
        Deregisters the socket from the backend and cancels pending
 
331 +
        operations without closing the descriptor. The caller takes
 
332 +
        ownership of the returned handle.
 
333 +

 
334 +
        @return The native handle.
 
335 +

 
336 +
        @throws std::logic_error if the socket is not open.
 
337 +

 
338 +
        @post is_open() == false
 
339 +
    */
 
340 +
    native_handle_type release();
 
341 +

 
342 +
    /** Disable sends or receives on the socket.
 
343 +

 
344 +
        Unix stream connections are full-duplex: each direction
 
345 +
        (send and receive) operates independently. This function
 
346 +
        allows you to close one or both directions without
 
347 +
        destroying the socket.
 
348 +

 
349 +
        @param what Determines what operations will no longer
 
350 +
            be allowed.
 
351 +

 
352 +
        @throws std::system_error on failure.
 
353 +
    */
 
354 +
    void shutdown(shutdown_type what);
 
355 +

 
356 +
    /** Shut down part or all of the socket (non-throwing).
 
357 +

 
358 +
        @param what Which direction to shut down.
 
359 +
        @param ec Set to the error code on failure.
 
360 +
    */
 
361 +
    void shutdown(shutdown_type what, std::error_code& ec) noexcept;
 
362 +

 
363 +
    /** Set a socket option.
 
364 +

 
365 +
        Applies a type-safe socket option to the underlying socket.
 
366 +
        The option type encodes the protocol level and option name.
 
367 +

 
368 +
        @param opt The option to set.
 
369 +

 
370 +
        @throws std::logic_error if the socket is not open.
 
371 +
        @throws std::system_error on failure.
 
372 +
    */
 
373 +
    template<class Option>
 
374 +
    void set_option(Option const& opt)
 
375 +
    {
 
376 +
        if (!is_open())
 
377 +
            detail::throw_logic_error("set_option: socket not open");
 
378 +
        std::error_code ec = get().set_option(
 
379 +
            Option::level(), Option::name(), opt.data(), opt.size());
 
380 +
        if (ec)
 
381 +
            detail::throw_system_error(ec, "local_stream_socket::set_option");
 
382 +
    }
 
383 +

 
384 +
    /** Get a socket option.
 
385 +

 
386 +
        Retrieves the current value of a type-safe socket option.
 
387 +

 
388 +
        @return The current option value.
 
389 +

 
390 +
        @throws std::logic_error if the socket is not open.
 
391 +
        @throws std::system_error on failure.
 
392 +
    */
 
393 +
    template<class Option>
 
394 +
    Option get_option() const
 
395 +
    {
 
396 +
        if (!is_open())
 
397 +
            detail::throw_logic_error("get_option: socket not open");
 
398 +
        Option opt{};
 
399 +
        std::size_t sz = opt.size();
 
400 +
        std::error_code ec =
 
401 +
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
 
402 +
        if (ec)
 
403 +
            detail::throw_system_error(ec, "local_stream_socket::get_option");
 
404 +
        opt.resize(sz);
 
405 +
        return opt;
 
406 +
    }
 
407 +

 
408 +
    /** Assign an existing file descriptor to this socket.
 
409 +

 
410 +
        The socket must not already be open. The fd is adopted
 
411 +
        and registered with the platform reactor. Used by
 
412 +
        make_local_stream_pair() to wrap socketpair() fds.
 
413 +

 
414 +
        @param fd The file descriptor to adopt. Must be a valid,
 
415 +
            open, non-blocking Unix stream socket.
 
416 +

 
417 +
        @throws std::system_error on failure.
 
418 +
    */
 
419 +
    void assign(native_handle_type fd);
 
420 +

 
421 +
    /** Get the local endpoint of the socket.
 
422 +

 
423 +
        Returns the local address (path) to which the socket is bound.
 
424 +
        The endpoint is cached when the connection is established.
 
425 +

 
426 +
        @return The local endpoint, or a default endpoint if the socket
 
427 +
            is not connected.
 
428 +
    */
 
429 +
    corosio::local_endpoint local_endpoint() const noexcept;
 
430 +

 
431 +
    /** Get the remote endpoint of the socket.
 
432 +

 
433 +
        Returns the remote address (path) to which the socket is connected.
 
434 +
        The endpoint is cached when the connection is established.
 
435 +

 
436 +
        @return The remote endpoint, or a default endpoint if the socket
 
437 +
            is not connected.
 
438 +
    */
 
439 +
    corosio::local_endpoint remote_endpoint() const noexcept;
 
440 +

 
441 +
protected:
 
442 +
    local_stream_socket() noexcept = default;
 
443 +

 
444 +
    explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
 
445 +

 
446 +
private:
 
447 +
    friend class local_stream_acceptor;
 
448 +

 
449 +
    void open_for_family(int family, int type, int protocol);
 
450 +

 
451 +
    inline implementation& get() const noexcept
 
452 +
    {
 
453 +
        return *static_cast<implementation*>(h_.get());
 
454 +
    }
 
455 +
};
 
456 +

 
457 +
} // namespace boost::corosio
 
458 +

 
459 +
#endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP