include/boost/corosio/local_stream_socket.hpp

87.5% Lines (14/16) 85.7% List of functions (6/7)
local_stream_socket.hpp
f(x) Functions (7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/platform.hpp>
15 #include <boost/corosio/detail/except.hpp>
16 #include <boost/corosio/detail/native_handle.hpp>
17 #include <boost/corosio/detail/op_base.hpp>
18 #include <boost/corosio/io/io_stream.hpp>
19 #include <boost/capy/io_result.hpp>
20 #include <boost/corosio/detail/buffer_param.hpp>
21 #include <boost/corosio/local_endpoint.hpp>
22 #include <boost/corosio/local_stream.hpp>
23 #include <boost/corosio/shutdown_type.hpp>
24 #include <boost/capy/ex/executor_ref.hpp>
25 #include <boost/capy/ex/execution_context.hpp>
26 #include <boost/capy/ex/io_env.hpp>
27 #include <boost/capy/concept/executor.hpp>
28
29 #include <system_error>
30
31 #include <concepts>
32 #include <coroutine>
33 #include <cstddef>
34 #include <stop_token>
35 #include <type_traits>
36
37 namespace boost::corosio {
38
39 /* An asynchronous Unix stream socket for coroutine I/O.
40
41 This class provides asynchronous Unix domain stream socket
42 operations that return awaitable types. Each operation
43 participates in the affine awaitable protocol, ensuring
44 coroutines resume on the correct executor.
45
46 The socket must be opened before performing I/O operations.
47 Operations support cancellation through std::stop_token via
48 the affine protocol, or explicitly through cancel().
49
50 Thread Safety:
51 Distinct objects: Safe.
52 Shared objects: Unsafe. A socket must not have concurrent
53 operations of the same type. One read and one write may
54 be in flight simultaneously.
55
56 Satisfies capy::Stream.
57 */
58 class BOOST_COROSIO_DECL local_stream_socket : public io_stream
59 {
60 public:
61 using shutdown_type = corosio::shutdown_type;
62 using enum corosio::shutdown_type;
63
64 /** Define backend hooks for local stream socket operations.
65
66 Platform backends (epoll, kqueue, select) derive from this
67 to implement socket I/O, connection, and option management.
68 */
69 struct implementation : io_stream::implementation
70 {
71 virtual std::coroutine_handle<> connect(
72 std::coroutine_handle<> h,
73 capy::executor_ref ex,
74 corosio::local_endpoint ep,
75 std::stop_token token,
76 std::error_code* ec) = 0;
77
78 virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
79
80 virtual native_handle_type native_handle() const noexcept = 0;
81
82 virtual native_handle_type release_socket() noexcept = 0;
83
84 virtual void cancel() noexcept = 0;
85
86 virtual std::error_code set_option(
87 int level,
88 int optname,
89 void const* data,
90 std::size_t size) noexcept = 0;
91
92 virtual std::error_code
93 get_option(int level, int optname, void* data, std::size_t* size)
94 const noexcept = 0;
95
96 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
97
98 virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
99 };
100
101 /// Represent the awaitable returned by connect.
102 struct connect_awaitable
103 : detail::void_op_base<connect_awaitable>
104 {
105 local_stream_socket& s_;
106 corosio::local_endpoint endpoint_;
107
108 4x connect_awaitable(
109 local_stream_socket& s, corosio::local_endpoint ep) noexcept
110 4x : s_(s), endpoint_(ep) {}
111
112 4x std::coroutine_handle<> dispatch(
113 std::coroutine_handle<> h, capy::executor_ref ex) const
114 {
115 4x return s_.get().connect(h, ex, endpoint_, token_, &ec_);
116 }
117 };
118
119 public:
120 ~local_stream_socket() override;
121
122 explicit local_stream_socket(capy::execution_context& ctx);
123
124 template<class Ex>
125 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
126 capy::Executor<Ex>
127 explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
128 {
129 }
130
131 22x local_stream_socket(local_stream_socket&& other) noexcept
132 22x : io_object(std::move(other))
133 {
134 22x }
135
136 local_stream_socket& operator=(local_stream_socket&& other) noexcept
137 {
138 if (this != &other)
139 {
140 close();
141 io_object::operator=(std::move(other));
142 }
143 return *this;
144 }
145
146 local_stream_socket(local_stream_socket const&) = delete;
147 local_stream_socket& operator=(local_stream_socket const&) = delete;
148
149 /** Open the socket.
150
151 Creates a Unix stream socket and associates it with
152 the platform reactor.
153
154 @param proto The protocol. Defaults to local_stream{}.
155
156 @throws std::system_error on failure.
157 */
158 void open(local_stream proto = {});
159
160 /// Close the socket.
161 void close();
162
163 /// Check if the socket is open.
164 118x bool is_open() const noexcept
165 {
166 #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
167 return h_ && get().native_handle() != ~native_handle_type(0);
168 #else
169 118x return h_ && get().native_handle() >= 0;
170 #endif
171 }
172
173 /** Initiate an asynchronous connect operation.
174
175 If the socket is not already open, it is opened automatically.
176
177 @param ep The local endpoint (path) to connect to.
178
179 @return An awaitable that completes with io_result<>.
180
181 @throws std::system_error if the socket needs to be opened
182 and the open fails.
183 */
184 4x auto connect(corosio::local_endpoint ep)
185 {
186 4x if (!is_open())
187 open();
188 4x return connect_awaitable(*this, ep);
189 }
190
191 void cancel();
192
193 native_handle_type native_handle() const noexcept;
194
195 /** Query the number of bytes available for reading.
196
197 @return The number of bytes that can be read without blocking.
198
199 @throws std::logic_error if the socket is not open.
200 @throws std::system_error on ioctl failure.
201 */
202 std::size_t available() const;
203
204 /** Release ownership of the native socket handle.
205
206 Deregisters the socket from the reactor and cancels pending
207 operations without closing the fd. The caller takes ownership
208 of the returned descriptor.
209
210 @return The native handle, or -1 if not open.
211
212 @throws std::logic_error if the socket is not open.
213 */
214 native_handle_type release();
215
216 void shutdown(shutdown_type what);
217
218 /** Shut down part or all of the socket (non-throwing).
219
220 @param what Which direction to shut down.
221 @param ec Set to the error code on failure.
222 */
223 void shutdown(shutdown_type what, std::error_code& ec) noexcept;
224
225 template<class Option>
226 void set_option(Option const& opt)
227 {
228 if (!is_open())
229 detail::throw_logic_error("set_option: socket not open");
230 std::error_code ec = get().set_option(
231 Option::level(), Option::name(), opt.data(), opt.size());
232 if (ec)
233 detail::throw_system_error(ec, "local_stream_socket::set_option");
234 }
235
236 template<class Option>
237 Option get_option() const
238 {
239 if (!is_open())
240 detail::throw_logic_error("get_option: socket not open");
241 Option opt{};
242 std::size_t sz = opt.size();
243 std::error_code ec =
244 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
245 if (ec)
246 detail::throw_system_error(ec, "local_stream_socket::get_option");
247 opt.resize(sz);
248 return opt;
249 }
250
251 /** Assign an existing file descriptor to this socket.
252
253 The socket must not already be open. The fd is adopted
254 and registered with the platform reactor. Used by
255 make_local_stream_pair() to wrap socketpair() fds.
256
257 @param fd The file descriptor to adopt. Must be a valid,
258 open, non-blocking Unix stream socket.
259
260 @throws std::system_error on failure.
261 */
262 void assign(int fd);
263
264 corosio::local_endpoint local_endpoint() const noexcept;
265
266 corosio::local_endpoint remote_endpoint() const noexcept;
267
268 protected:
269 local_stream_socket() noexcept = default;
270
271 explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
272
273 private:
274 friend class local_stream_acceptor;
275
276 void open_for_family(int family, int type, int protocol);
277
278 104x inline implementation& get() const noexcept
279 {
280 104x return *static_cast<implementation*>(h_.get());
281 }
282 };
283
284 } // namespace boost::corosio
285
286 #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
287