include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp

80.3% Lines (504/628) 93.8% List of functions (30/32)
reactor_op_complete.hpp
f(x) Functions (32)
Function Calls Lines Blocks
void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_datagram_op>(boost::corosio::detail::epoll_datagram_op&) :39 8x 81.2% 64.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_local_datagram_op>(boost::corosio::detail::epoll_local_datagram_op&) :39 6x 81.2% 64.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_local_stream_op>(boost::corosio::detail::epoll_local_stream_op&) :39 3x 81.2% 68.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_op>(boost::corosio::detail::epoll_op&) :39 39997x 87.5% 75.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_recv_op>(boost::corosio::detail::epoll_recv_op&) :39 2x 87.5% 75.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_datagram_op>(boost::corosio::detail::select_datagram_op&) :39 8x 81.2% 64.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_local_datagram_op>(boost::corosio::detail::select_local_datagram_op&) :39 6x 81.2% 64.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_local_stream_op>(boost::corosio::detail::select_local_stream_op&) :39 3x 81.2% 68.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_op>(boost::corosio::detail::select_op&) :39 41276x 87.5% 75.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_recv_op>(boost::corosio::detail::select_recv_op&) :39 2x 87.5% 75.0% void boost::corosio::detail::complete_dgram_recv_op<boost::corosio::detail::epoll_local_dgram_recv_op>(boost::corosio::detail::epoll_local_dgram_recv_op&) :71 1x 85.7% 70.0% void boost::corosio::detail::complete_dgram_recv_op<boost::corosio::detail::select_local_dgram_recv_op>(boost::corosio::detail::select_local_dgram_recv_op&) :71 1x 85.7% 70.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_connect_op>(boost::corosio::detail::epoll_connect_op&) :102 3398x 95.8% 86.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_local_connect_op>(boost::corosio::detail::epoll_local_connect_op&) :102 2x 91.3% 79.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_local_dgram_connect_op>(boost::corosio::detail::epoll_local_dgram_connect_op&) :102 0 0.0% 0.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_udp_connect_op>(boost::corosio::detail::epoll_udp_connect_op&) :102 5x 91.7% 80.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_connect_op>(boost::corosio::detail::select_connect_op&) :102 3152x 95.8% 86.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_local_connect_op>(boost::corosio::detail::select_local_connect_op&) :102 2x 91.3% 79.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_local_dgram_connect_op>(boost::corosio::detail::select_local_dgram_connect_op&) :102 0 0.0% 0.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_udp_connect_op>(boost::corosio::detail::select_udp_connect_op&) :102 5x 91.7% 80.0% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::epoll_local_stream_socket, boost::corosio::detail::epoll_local_stream_acceptor>(boost::corosio::detail::epoll_local_stream_acceptor*, int&, sockaddr_storage const&, unsigned int, boost::corosio::io_object::implementation**, std::error_code*) :154 2x 90.0% 90.0% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>(boost::corosio::detail::epoll_tcp_acceptor*, int&, sockaddr_storage const&, unsigned int, boost::corosio::io_object::implementation**, std::error_code*) :154 3395x 90.0% 90.0% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::select_local_stream_socket, boost::corosio::detail::select_local_stream_acceptor>(boost::corosio::detail::select_local_stream_acceptor*, int&, sockaddr_storage const&, unsigned int, boost::corosio::io_object::implementation**, std::error_code*) :154 2x 90.0% 90.0% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>(boost::corosio::detail::select_tcp_acceptor*, int&, sockaddr_storage const&, unsigned int, boost::corosio::io_object::implementation**, std::error_code*) :154 3150x 90.0% 90.0% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::epoll_local_stream_socket, boost::corosio::detail::epoll_local_accept_op>(boost::corosio::detail::epoll_local_accept_op&) :206 2x 68.0% 66.0% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_accept_op>(boost::corosio::detail::epoll_accept_op&) :206 3404x 84.0% 82.0% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::select_local_stream_socket, boost::corosio::detail::select_local_accept_op>(boost::corosio::detail::select_local_accept_op&) :206 2x 68.0% 66.0% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_accept_op>(boost::corosio::detail::select_accept_op&) :206 3153x 84.0% 82.0% void boost::corosio::detail::complete_datagram_op<boost::corosio::detail::epoll_local_recv_from_op, boost::corosio::local_endpoint>(boost::corosio::detail::epoll_local_recv_from_op&, boost::corosio::local_endpoint*) :259 2x 88.9% 77.0% void boost::corosio::detail::complete_datagram_op<boost::corosio::detail::epoll_recv_from_op, boost::corosio::endpoint>(boost::corosio::detail::epoll_recv_from_op&, boost::corosio::endpoint*) :259 7x 94.4% 87.0% void boost::corosio::detail::complete_datagram_op<boost::corosio::detail::select_local_recv_from_op, boost::corosio::local_endpoint>(boost::corosio::detail::select_local_recv_from_op&, boost::corosio::local_endpoint*) :259 2x 88.9% 77.0% void boost::corosio::detail::complete_datagram_op<boost::corosio::detail::select_recv_from_op, boost::corosio::endpoint>(boost::corosio::detail::select_recv_from_op&, boost::corosio::endpoint*) :259 7x 94.4% 87.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12
13 #include <boost/corosio/detail/dispatch_coro.hpp>
14 #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 #include <boost/corosio/native/detail/make_err.hpp>
16 #include <boost/corosio/io/io_object.hpp>
17
18 #include <coroutine>
19 #include <mutex>
20 #include <utility>
21
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25
26 namespace boost::corosio::detail {
27
28 /** Complete a base read/write operation.
29
30 Translates the recorded errno and cancellation state into
31 an error_code, stores the byte count, then resumes the
32 caller via symmetric transfer.
33
34 @tparam Op The concrete operation type.
35 @param op The operation to complete.
36 */
37 template<typename Op>
38 void
39 81311x complete_io_op(Op& op)
40 {
41 81311x op.stop_cb.reset();
42 81311x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43
44 81311x if (op.cancelled.load(std::memory_order_acquire))
45 306x *op.ec_out = capy::error::canceled;
46 81005x else if (op.errn != 0)
47 *op.ec_out = make_err(op.errn);
48 81005x else if (op.is_read_operation() && op.bytes_transferred == 0)
49 *op.ec_out = capy::error::eof;
50 else
51 81005x *op.ec_out = {};
52
53 81311x *op.bytes_out = op.bytes_transferred;
54
55 81311x op.cont_op.cont.h = op.h;
56 81311x capy::executor_ref saved_ex(op.ex);
57 81311x auto prevent = std::move(op.impl_ptr);
58 81311x dispatch_coro(saved_ex, op.cont_op.cont).resume();
59 81311x }
60
61 /** Complete a datagram recv operation (connected mode).
62
63 Like complete_io_op but does not translate zero bytes into
64 EOF. Zero-length datagrams are valid and should be reported
65 as success with 0 bytes transferred.
66
67 @param op The operation to complete.
68 */
69 template<typename Op>
70 void
71 2x complete_dgram_recv_op(Op& op)
72 {
73 2x op.stop_cb.reset();
74 2x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
75
76 2x if (op.cancelled.load(std::memory_order_acquire))
77 *op.ec_out = capy::error::canceled;
78 2x else if (op.errn != 0)
79 *op.ec_out = make_err(op.errn);
80 else
81 2x *op.ec_out = {};
82
83 2x *op.bytes_out = op.bytes_transferred;
84
85 2x op.cont_op.cont.h = op.h;
86 2x capy::executor_ref saved_ex(op.ex);
87 2x auto prevent = std::move(op.impl_ptr);
88 2x dispatch_coro(saved_ex, op.cont_op.cont).resume();
89 2x }
90
91 /** Complete a connect operation with endpoint caching.
92
93 On success, queries the local endpoint via getsockname and
94 caches both endpoints in the socket impl. Then resumes the
95 caller via symmetric transfer.
96
97 @tparam Op The concrete connect operation type.
98 @param op The operation to complete.
99 */
100 template<typename Op>
101 void
102 6564x complete_connect_op(Op& op)
103 {
104 6564x op.stop_cb.reset();
105 6564x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
106
107 6564x bool success =
108 6564x (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
109
110 6564x if (success && op.socket_impl_)
111 {
112 using ep_type = decltype(op.target_endpoint);
113 6559x ep_type local_ep;
114 6559x sockaddr_storage local_storage{};
115 6559x socklen_t local_len = sizeof(local_storage);
116 6559x if (::getsockname(
117 op.fd, reinterpret_cast<sockaddr*>(&local_storage),
118 6559x &local_len) == 0)
119 6555x local_ep =
120 6559x from_sockaddr_as(local_storage, local_len, ep_type{});
121 6559x op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
122 }
123
124 6564x if (op.cancelled.load(std::memory_order_acquire))
125 *op.ec_out = capy::error::canceled;
126 6564x else if (op.errn != 0)
127 5x *op.ec_out = make_err(op.errn);
128 else
129 6559x *op.ec_out = {};
130
131 6564x op.cont_op.cont.h = op.h;
132 6564x capy::executor_ref saved_ex(op.ex);
133 6564x auto prevent = std::move(op.impl_ptr);
134 6564x dispatch_coro(saved_ex, op.cont_op.cont).resume();
135 6564x }
136
137 /** Construct and register a peer socket from an accepted fd.
138
139 Creates a new socket impl via the acceptor's associated
140 socket service, registers it with the scheduler, and caches
141 the local and remote endpoints.
142
143 @tparam SocketImpl The concrete socket implementation type.
144 @tparam AcceptorImpl The concrete acceptor implementation type.
145 @param acceptor_impl The acceptor that accepted the connection.
146 @param accepted_fd The accepted file descriptor (set to -1 on success).
147 @param peer_storage The peer address from accept().
148 @param impl_out Output pointer for the new socket impl.
149 @param ec_out Output pointer for any error.
150 @return True on success, false on failure.
151 */
152 template<typename SocketImpl, typename AcceptorImpl>
153 bool
154 6549x setup_accepted_socket(
155 AcceptorImpl* acceptor_impl,
156 int& accepted_fd,
157 sockaddr_storage const& peer_storage,
158 socklen_t peer_addrlen,
159 io_object::implementation** impl_out,
160 std::error_code* ec_out)
161 {
162 6549x auto* socket_svc = acceptor_impl->service().stream_service();
163 6549x if (!socket_svc)
164 {
165 *ec_out = make_err(ENOENT);
166 return false;
167 }
168
169 6549x auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
170 6549x impl.set_socket(accepted_fd);
171
172 6549x impl.desc_state_.fd = accepted_fd;
173 {
174 6549x std::lock_guard lock(impl.desc_state_.mutex);
175 6549x impl.desc_state_.read_op = nullptr;
176 6549x impl.desc_state_.write_op = nullptr;
177 6549x impl.desc_state_.connect_op = nullptr;
178 6549x }
179 6549x socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
180
181 using ep_type = decltype(acceptor_impl->local_endpoint());
182 6549x impl.set_endpoints(
183 acceptor_impl->local_endpoint(),
184 6549x from_sockaddr_as(
185 peer_storage,
186 peer_addrlen,
187 ep_type{}));
188
189 6549x if (impl_out)
190 6549x *impl_out = &impl;
191 6549x accepted_fd = -1;
192 6549x return true;
193 }
194
195 /** Complete an accept operation.
196
197 Sets up the peer socket on success, or closes the accepted
198 fd on failure. Then resumes the caller via symmetric transfer.
199
200 @tparam SocketImpl The concrete socket implementation type.
201 @tparam Op The concrete accept operation type.
202 @param op The operation to complete.
203 */
204 template<typename SocketImpl, typename Op>
205 void
206 6561x complete_accept_op(Op& op)
207 {
208 6561x op.stop_cb.reset();
209 6561x op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
210
211 6561x bool success =
212 6561x (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
213
214 6561x if (op.cancelled.load(std::memory_order_acquire))
215 12x *op.ec_out = capy::error::canceled;
216 6549x else if (op.errn != 0)
217 *op.ec_out = make_err(op.errn);
218 else
219 6549x *op.ec_out = {};
220
221 6561x if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
222 {
223 6549x if (!setup_accepted_socket<SocketImpl>(
224 6549x op.acceptor_impl_, op.accepted_fd, op.peer_storage,
225 op.peer_addrlen, op.impl_out, op.ec_out))
226 success = false;
227 }
228
229 6561x if (!success || !op.acceptor_impl_)
230 {
231 12x if (op.accepted_fd >= 0)
232 {
233 ::close(op.accepted_fd);
234 op.accepted_fd = -1;
235 }
236 12x if (op.impl_out)
237 12x *op.impl_out = nullptr;
238 }
239
240 6561x op.cont_op.cont.h = op.h;
241 6561x capy::executor_ref saved_ex(op.ex);
242 6561x auto prevent = std::move(op.impl_ptr);
243 6561x dispatch_coro(saved_ex, op.cont_op.cont).resume();
244 6561x }
245
246 /** Complete a datagram operation (send_to or recv_from).
247
248 For recv_from operations, writes the source endpoint from the
249 recorded sockaddr_storage into the caller's endpoint pointer.
250 Then resumes the caller via symmetric transfer.
251
252 @tparam Op The concrete datagram operation type.
253 @param op The operation to complete.
254 @param source_out Optional pointer to store source endpoint
255 (non-null for recv_from, null for send_to).
256 */
257 template<typename Op, typename Endpoint>
258 void
259 18x complete_datagram_op(Op& op, Endpoint* source_out)
260 {
261 18x op.stop_cb.reset();
262 18x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
263
264 18x if (op.cancelled.load(std::memory_order_acquire))
265 6x *op.ec_out = capy::error::canceled;
266 12x else if (op.errn != 0)
267 *op.ec_out = make_err(op.errn);
268 else
269 12x *op.ec_out = {};
270
271 18x *op.bytes_out = op.bytes_transferred;
272
273 28x if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
274 10x op.errn == 0)
275 20x *source_out = from_sockaddr_as(
276 10x op.source_storage,
277 op.source_addrlen,
278 Endpoint{});
279
280 18x op.cont_op.cont.h = op.h;
281 18x capy::executor_ref saved_ex(op.ex);
282 18x auto prevent = std::move(op.impl_ptr);
283 18x dispatch_coro(saved_ex, op.cont_op.cont).resume();
284 18x }
285
286 } // namespace boost::corosio::detail
287
288 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
289