TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
15 : #include <boost/corosio/detail/dispatch_coro.hpp>
16 : #include <boost/capy/buffers.hpp>
17 :
18 : #include <coroutine>
19 :
20 : #include <errno.h>
21 : #include <sys/socket.h>
22 : #include <sys/uio.h>
23 :
24 : namespace boost::corosio::detail {
25 :
26 : /** CRTP base for reactor-backed stream socket implementations.
27 :
28 : Inherits shared data members and cancel/close/register logic
29 : from reactor_basic_socket. Adds the stream-specific remote
30 : endpoint, shutdown, and I/O dispatch (connect, read, write).
31 :
32 : @tparam Derived The concrete socket type (CRTP).
33 : @tparam Service The backend's socket service type.
34 : @tparam ConnOp The backend's connect op type.
35 : @tparam ReadOp The backend's read op type.
36 : @tparam WriteOp The backend's write op type.
37 : @tparam DescState The backend's descriptor_state type.
38 : @tparam ImplBase The public vtable base
39 : (tcp_socket::implementation or
40 : local_stream_socket::implementation).
41 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
42 : */
43 : template<
44 : class Derived,
45 : class Service,
46 : class ConnOp,
47 : class ReadOp,
48 : class WriteOp,
49 : class DescState,
50 : class ImplBase = tcp_socket::implementation,
51 : class Endpoint = endpoint>
52 : class reactor_stream_socket
53 : : public reactor_basic_socket<
54 : Derived,
55 : ImplBase,
56 : Service,
57 : DescState,
58 : Endpoint>
59 : {
60 : using base_type = reactor_basic_socket<
61 : Derived,
62 : ImplBase,
63 : Service,
64 : DescState,
65 : Endpoint>;
66 : friend base_type;
67 : friend Derived;
68 :
69 HIT 24466 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
70 :
71 : protected:
72 : Endpoint remote_endpoint_;
73 :
74 : public:
75 : /// Pending connect operation slot.
76 : ConnOp conn_;
77 :
78 : /// Pending read operation slot.
79 : ReadOp rd_;
80 :
81 : /// Pending write operation slot.
82 : WriteOp wr_;
83 :
84 24466 : ~reactor_stream_socket() override = default;
85 :
86 : /// Return the cached remote endpoint.
87 44 : Endpoint remote_endpoint() const noexcept override
88 : {
89 44 : return remote_endpoint_;
90 : }
91 :
92 : /** Shut down part or all of the full-duplex connection.
93 :
94 : Not an override — concrete backend classes forward their
95 : ImplBase-typed shutdown() here.
96 :
97 : @param what 0 = receive, 1 = send, 2 = both.
98 : */
99 6 : std::error_code do_shutdown(int what) noexcept
100 : {
101 : int how;
102 6 : switch (what)
103 : {
104 2 : case 0: // shutdown_receive
105 2 : how = SHUT_RD;
106 2 : break;
107 2 : case 1: // shutdown_send
108 2 : how = SHUT_WR;
109 2 : break;
110 2 : case 2: // shutdown_both
111 2 : how = SHUT_RDWR;
112 2 : break;
113 MIS 0 : default:
114 0 : return make_err(EINVAL);
115 : }
116 HIT 6 : if (::shutdown(this->fd_, how) != 0)
117 MIS 0 : return make_err(errno);
118 HIT 6 : return {};
119 : }
120 :
121 : /// Cache local and remote endpoints.
122 16228 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
123 : {
124 16228 : this->local_endpoint_ = std::move(local);
125 16228 : remote_endpoint_ = std::move(remote);
126 16228 : }
127 :
128 : /** Shared connect dispatch.
129 :
130 : Tries the connect syscall speculatively. On synchronous
131 : completion, returns via inline budget or posts through queue.
132 : On EINPROGRESS, registers with the reactor.
133 : */
134 : std::coroutine_handle<> do_connect(
135 : std::coroutine_handle<>,
136 : capy::executor_ref,
137 : Endpoint const&,
138 : std::stop_token const&,
139 : std::error_code*);
140 :
141 : /** Shared scatter-read dispatch.
142 :
143 : Tries readv() speculatively. On success or hard error,
144 : returns via inline budget or posts through queue.
145 : On EAGAIN, registers with the reactor.
146 : */
147 : std::coroutine_handle<> do_read_some(
148 : std::coroutine_handle<>,
149 : capy::executor_ref,
150 : buffer_param,
151 : std::stop_token const&,
152 : std::error_code*,
153 : std::size_t*);
154 :
155 : /** Shared gather-write dispatch.
156 :
157 : Tries the write via WriteOp::write_policy speculatively.
158 : On success or hard error, returns via inline budget or
159 : posts through queue. On EAGAIN, registers with the reactor.
160 : */
161 : std::coroutine_handle<> do_write_some(
162 : std::coroutine_handle<>,
163 : capy::executor_ref,
164 : buffer_param,
165 : std::stop_token const&,
166 : std::error_code*,
167 : std::size_t*);
168 :
169 : /** Close the socket and cancel pending operations.
170 :
171 : Extends the base do_close_socket() to also reset
172 : the remote endpoint.
173 : */
174 73402 : void do_close_socket() noexcept
175 : {
176 73402 : base_type::do_close_socket();
177 73402 : remote_endpoint_ = Endpoint{};
178 73402 : }
179 :
180 : private:
181 : // CRTP callbacks for reactor_basic_socket cancel/close
182 :
183 : template<class Op>
184 199 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
185 : {
186 199 : if (&op == static_cast<void*>(&conn_))
187 MIS 0 : return &this->desc_state_.connect_op;
188 HIT 199 : if (&op == static_cast<void*>(&rd_))
189 199 : return &this->desc_state_.read_op;
190 MIS 0 : if (&op == static_cast<void*>(&wr_))
191 0 : return &this->desc_state_.write_op;
192 0 : return nullptr;
193 : }
194 :
195 : template<class Op>
196 0 : bool* op_to_cancel_flag(Op& op) noexcept
197 : {
198 0 : if (&op == static_cast<void*>(&conn_))
199 0 : return &this->desc_state_.connect_cancel_pending;
200 0 : if (&op == static_cast<void*>(&rd_))
201 0 : return &this->desc_state_.read_cancel_pending;
202 0 : if (&op == static_cast<void*>(&wr_))
203 0 : return &this->desc_state_.write_cancel_pending;
204 0 : return nullptr;
205 : }
206 :
207 : template<class Fn>
208 HIT 73597 : void for_each_op(Fn fn) noexcept
209 : {
210 73597 : fn(conn_);
211 73597 : fn(rd_);
212 73597 : fn(wr_);
213 73597 : }
214 :
215 : template<class Fn>
216 73597 : void for_each_desc_entry(Fn fn) noexcept
217 : {
218 73597 : fn(conn_, this->desc_state_.connect_op);
219 73597 : fn(rd_, this->desc_state_.read_op);
220 73597 : fn(wr_, this->desc_state_.write_op);
221 73597 : }
222 : };
223 :
224 : template<
225 : class Derived,
226 : class Service,
227 : class ConnOp,
228 : class ReadOp,
229 : class WriteOp,
230 : class DescState,
231 : class ImplBase,
232 : class Endpoint>
233 : std::coroutine_handle<>
234 8119 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
235 : do_connect(
236 : std::coroutine_handle<> h,
237 : capy::executor_ref ex,
238 : Endpoint const& ep,
239 : std::stop_token const& token,
240 : std::error_code* ec)
241 : {
242 8119 : auto& op = conn_;
243 :
244 8119 : sockaddr_storage storage{};
245 8119 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
246 : int result =
247 8119 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
248 :
249 8119 : if (result == 0)
250 : {
251 4 : sockaddr_storage local_storage{};
252 4 : socklen_t local_len = sizeof(local_storage);
253 4 : if (::getsockname(
254 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
255 4 : &local_len) == 0)
256 MIS 0 : this->local_endpoint_ =
257 HIT 4 : from_sockaddr_as(local_storage, local_len, Endpoint{});
258 4 : remote_endpoint_ = ep;
259 : }
260 :
261 8119 : if (result == 0 || errno != EINPROGRESS)
262 : {
263 4 : int err = (result < 0) ? errno : 0;
264 4 : if (this->svc_.scheduler().try_consume_inline_budget())
265 : {
266 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
267 0 : op.cont_op.cont.h = h;
268 0 : return dispatch_coro(ex, op.cont_op.cont);
269 : }
270 HIT 4 : op.reset();
271 4 : op.h = h;
272 4 : op.ex = ex;
273 4 : op.ec_out = ec;
274 4 : op.fd = this->fd_;
275 4 : op.target_endpoint = ep;
276 4 : op.start(token, static_cast<Derived*>(this));
277 4 : op.impl_ptr = this->shared_from_this();
278 4 : op.complete(err, 0);
279 4 : this->svc_.post(&op);
280 4 : return std::noop_coroutine();
281 : }
282 :
283 : // EINPROGRESS — register with reactor
284 8115 : op.reset();
285 8115 : op.h = h;
286 8115 : op.ex = ex;
287 8115 : op.ec_out = ec;
288 8115 : op.fd = this->fd_;
289 8115 : op.target_endpoint = ep;
290 8115 : op.start(token, static_cast<Derived*>(this));
291 8115 : op.impl_ptr = this->shared_from_this();
292 :
293 8115 : this->register_op(
294 8115 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
295 8115 : this->desc_state_.connect_cancel_pending);
296 8115 : return std::noop_coroutine();
297 : }
298 :
299 : template<
300 : class Derived,
301 : class Service,
302 : class ConnOp,
303 : class ReadOp,
304 : class WriteOp,
305 : class DescState,
306 : class ImplBase,
307 : class Endpoint>
308 : std::coroutine_handle<>
309 353803 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
310 : do_read_some(
311 : std::coroutine_handle<> h,
312 : capy::executor_ref ex,
313 : buffer_param param,
314 : std::stop_token const& token,
315 : std::error_code* ec,
316 : std::size_t* bytes_out)
317 : {
318 353803 : auto& op = rd_;
319 353803 : op.reset();
320 :
321 353803 : capy::mutable_buffer bufs[ReadOp::max_buffers];
322 353803 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
323 :
324 353803 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
325 : {
326 2 : op.empty_buffer_read = true;
327 2 : op.h = h;
328 2 : op.ex = ex;
329 2 : op.ec_out = ec;
330 2 : op.bytes_out = bytes_out;
331 2 : op.start(token, static_cast<Derived*>(this));
332 2 : op.impl_ptr = this->shared_from_this();
333 2 : op.complete(0, 0);
334 2 : this->svc_.post(&op);
335 2 : return std::noop_coroutine();
336 : }
337 :
338 707602 : for (int i = 0; i < op.iovec_count; ++i)
339 : {
340 353801 : op.iovecs[i].iov_base = bufs[i].data();
341 353801 : op.iovecs[i].iov_len = bufs[i].size();
342 : }
343 :
344 : // Speculative read
345 : ssize_t n;
346 : do
347 : {
348 353801 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
349 : }
350 353801 : while (n < 0 && errno == EINTR);
351 :
352 353801 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
353 : {
354 353398 : int err = (n < 0) ? errno : 0;
355 353398 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
356 :
357 353398 : if (this->svc_.scheduler().try_consume_inline_budget())
358 : {
359 282752 : if (err)
360 MIS 0 : *ec = make_err(err);
361 HIT 282752 : else if (n == 0)
362 10 : *ec = capy::error::eof;
363 : else
364 282742 : *ec = {};
365 282752 : *bytes_out = bytes;
366 282752 : op.cont_op.cont.h = h;
367 282752 : return dispatch_coro(ex, op.cont_op.cont);
368 : }
369 70646 : op.h = h;
370 70646 : op.ex = ex;
371 70646 : op.ec_out = ec;
372 70646 : op.bytes_out = bytes_out;
373 70646 : op.start(token, static_cast<Derived*>(this));
374 70646 : op.impl_ptr = this->shared_from_this();
375 70646 : op.complete(err, bytes);
376 70646 : this->svc_.post(&op);
377 70646 : return std::noop_coroutine();
378 : }
379 :
380 : // EAGAIN — register with reactor
381 403 : op.h = h;
382 403 : op.ex = ex;
383 403 : op.ec_out = ec;
384 403 : op.bytes_out = bytes_out;
385 403 : op.fd = this->fd_;
386 403 : op.start(token, static_cast<Derived*>(this));
387 403 : op.impl_ptr = this->shared_from_this();
388 :
389 403 : this->register_op(
390 403 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
391 403 : this->desc_state_.read_cancel_pending);
392 403 : return std::noop_coroutine();
393 : }
394 :
395 : template<
396 : class Derived,
397 : class Service,
398 : class ConnOp,
399 : class ReadOp,
400 : class WriteOp,
401 : class DescState,
402 : class ImplBase,
403 : class Endpoint>
404 : std::coroutine_handle<>
405 353497 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
406 : do_write_some(
407 : std::coroutine_handle<> h,
408 : capy::executor_ref ex,
409 : buffer_param param,
410 : std::stop_token const& token,
411 : std::error_code* ec,
412 : std::size_t* bytes_out)
413 : {
414 353497 : auto& op = wr_;
415 353497 : op.reset();
416 :
417 353497 : capy::mutable_buffer bufs[WriteOp::max_buffers];
418 353497 : op.iovec_count =
419 353497 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
420 :
421 353497 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
422 : {
423 2 : op.h = h;
424 2 : op.ex = ex;
425 2 : op.ec_out = ec;
426 2 : op.bytes_out = bytes_out;
427 2 : op.start(token, static_cast<Derived*>(this));
428 2 : op.impl_ptr = this->shared_from_this();
429 2 : op.complete(0, 0);
430 2 : this->svc_.post(&op);
431 2 : return std::noop_coroutine();
432 : }
433 :
434 706990 : for (int i = 0; i < op.iovec_count; ++i)
435 : {
436 353495 : op.iovecs[i].iov_base = bufs[i].data();
437 353495 : op.iovecs[i].iov_len = bufs[i].size();
438 : }
439 :
440 : // Speculative write via backend-specific write policy
441 : ssize_t n =
442 353495 : WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
443 :
444 353495 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
445 : {
446 353495 : int err = (n < 0) ? errno : 0;
447 353495 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
448 :
449 353495 : if (this->svc_.scheduler().try_consume_inline_budget())
450 : {
451 282813 : *ec = err ? make_err(err) : std::error_code{};
452 282813 : *bytes_out = bytes;
453 282813 : op.cont_op.cont.h = h;
454 282813 : return dispatch_coro(ex, op.cont_op.cont);
455 : }
456 70682 : op.h = h;
457 70682 : op.ex = ex;
458 70682 : op.ec_out = ec;
459 70682 : op.bytes_out = bytes_out;
460 70682 : op.start(token, static_cast<Derived*>(this));
461 70682 : op.impl_ptr = this->shared_from_this();
462 70682 : op.complete(err, bytes);
463 70682 : this->svc_.post(&op);
464 70682 : return std::noop_coroutine();
465 : }
466 :
467 : // EAGAIN — register with reactor
468 MIS 0 : op.h = h;
469 0 : op.ex = ex;
470 0 : op.ec_out = ec;
471 0 : op.bytes_out = bytes_out;
472 0 : op.fd = this->fd_;
473 0 : op.start(token, static_cast<Derived*>(this));
474 0 : op.impl_ptr = this->shared_from_this();
475 :
476 0 : this->register_op(
477 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
478 0 : this->desc_state_.write_cancel_pending);
479 0 : return std::noop_coroutine();
480 : }
481 :
482 : } // namespace boost::corosio::detail
483 :
484 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|