TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/make_err.hpp>
18 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
19 :
20 : #include <memory>
21 : #include <mutex>
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
/** CRTP base for reactor-backed socket implementations.

    Extracts the shared data members, virtual overrides, and
    cancel/close/register logic that is identical across TCP
    (reactor_stream_socket) and UDP (reactor_datagram_socket).

    Derived classes provide CRTP callbacks that enumerate their
    specific op slots so cancel/close can iterate them generically.

    @tparam Derived The concrete socket type (CRTP).
    @tparam ImplBase The public vtable base (tcp_socket::implementation
        or udp_socket::implementation).
    @tparam Service The backend's service type.
    @tparam DescState The backend's descriptor_state type.
    @tparam Endpoint The endpoint type (endpoint or local_endpoint).
*/
template<
    class Derived,
    class ImplBase,
    class Service,
    class DescState,
    class Endpoint = endpoint>
class reactor_basic_socket
    : public ImplBase
    , public std::enable_shared_from_this<Derived>
    , public intrusive_list<Derived>::node
{
    friend Derived;

    // The concrete stream/datagram socket templates need access to the
    // private constructor and protected members below.
    template<class, class, class, class, class, class, class, class>
    friend class reactor_stream_socket;

    template<class, class, class, class, class, class, class, class, class, class>
    friend class reactor_datagram_socket;

    // Construction only via Derived (friend) — keeps the CRTP contract.
    explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}

protected:
    Service& svc_;            // owning backend service (never rebound)
    int fd_ = -1;             // file descriptor; -1 means "not open"
    Endpoint local_endpoint_; // cached by do_bind()/set_local_endpoint()

public:
    /// Per-descriptor state for persistent reactor registration.
    DescState desc_state_;

    ~reactor_basic_socket() override = default;

    /// Return the underlying file descriptor.
    native_handle_type native_handle() const noexcept override
    {
        return fd_;
    }

    /// Return the cached local endpoint.
    Endpoint local_endpoint() const noexcept override
    {
        return local_endpoint_;
    }

    /// Return true if the socket has an open file descriptor.
    bool is_open() const noexcept
    {
        return fd_ >= 0;
    }

    /** Set a socket option via ::setsockopt.

        @param level Protocol level (e.g. SOL_SOCKET).
        @param optname Option name within the level.
        @param data Pointer to the option value.
        @param size Size of the option value in bytes.
        @return Error code from errno on failure, empty on success.
    */
    std::error_code set_option(
        int level,
        int optname,
        void const* data,
        std::size_t size) noexcept override
    {
        if (::setsockopt(
                fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
            return make_err(errno);
        return {};
    }

    /** Get a socket option via ::getsockopt.

        @param size In: capacity of the buffer at data.
                    Out: actual length of the retrieved option.
        @return Error code from errno on failure, empty on success.
    */
    std::error_code
    get_option(int level, int optname, void* data, std::size_t* size)
        const noexcept override
    {
        socklen_t len = static_cast<socklen_t>(*size);
        if (::getsockopt(fd_, level, optname, data, &len) != 0)
            return make_err(errno);
        *size = static_cast<std::size_t>(len);
        return {};
    }

    /// Assign the file descriptor. Does not register it with the reactor.
    void set_socket(int fd) noexcept
    {
        fd_ = fd;
    }

    /// Cache the local endpoint.
    void set_local_endpoint(Endpoint ep) noexcept
    {
        local_endpoint_ = ep;
    }

    /** Bind the socket to a local endpoint.

        Calls ::bind() and caches the resulting local endpoint
        via getsockname(). The getsockname() call is best-effort:
        if it fails, bind still succeeds and the cached endpoint
        is left unchanged.

        @param ep The endpoint to bind to.
        @return Error code on failure, empty on success.
    */
    std::error_code do_bind(Endpoint const& ep) noexcept
    {
        sockaddr_storage storage{};
        socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
        if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
            return make_err(errno);

        // Re-query the kernel so an ephemeral port / wildcard address
        // is replaced with the concrete assignment.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ =
                from_sockaddr_as(local_storage, local_len, Endpoint{});

        return {};
    }

    /** Register an op with the reactor.

        Handles cached edge events and deferred cancellation.
        Called on the EAGAIN/EINPROGRESS path when speculative
        I/O failed.

        @param op The operation to park.
        @param desc_slot The descriptor_state slot the op parks in.
        @param ready_flag Cached edge-readiness for this direction.
        @param cancel_flag Deferred-cancel request for this direction.
    */
    template<class Op>
    void register_op(
        Op& op,
        reactor_op_base*& desc_slot,
        bool& ready_flag,
        bool& cancel_flag) noexcept;

    /** Cancel a single pending operation.

        Claims the operation from its descriptor_state slot under
        the mutex and posts it to the scheduler as cancelled.
        Derived must implement:
            op_to_desc_slot(Op&) -> reactor_op_base**
            op_to_cancel_flag(Op&) -> bool*
    */
    template<class Op>
    void cancel_single_op(Op& op) noexcept;

    /** Cancel all pending operations.

        Invoked by the derived class's cancel() override.
        Derived must implement:
            for_each_op(auto fn)
            for_each_desc_entry(auto fn)
    */
    void do_cancel() noexcept;

    /** Close the socket and cancel pending operations.

        Invoked by the derived class's close_socket(). The
        derived class may add backend-specific cleanup after
        calling this method.
        Derived must implement:
            for_each_op(auto fn)
            for_each_desc_entry(auto fn)
    */
    void do_close_socket() noexcept;

    /** Release the socket without closing the fd.

        Like do_close_socket() but does not call ::close().
        Returns the fd so the caller can take ownership.
    */
    native_handle_type do_release_socket() noexcept;
};
211 :
template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
template<class Op>
void
reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
    Op& op,
    reactor_op_base*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Account for the outstanding op; balanced by work_finished() either
    // here (immediate completion) or when the reactor completes it later.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge event arrived before this op registered. Consume the
        // cached readiness and retry the I/O once, under the lock.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0; // still not ready: clear errn so the op parks clean
    }

    if (cancel_flag)
    {
        // A cancel raced ahead of registration; latch it onto the op so it
        // completes as cancelled below instead of parking.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or cancelled) immediately: post to the scheduler and
        // release the work count taken above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op in its descriptor slot; the reactor claims it when
        // the descriptor becomes ready.
        desc_slot = &op;
    }
}
250 :
251 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
252 : template<class Op>
253 : void
254 201 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
255 : Op& op) noexcept
256 : {
257 201 : auto self = this->weak_from_this().lock();
258 201 : if (!self)
259 MIS 0 : return;
260 :
261 HIT 201 : op.request_cancel();
262 :
263 201 : auto* d = static_cast<Derived*>(this);
264 201 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
265 :
266 201 : if (desc_op_ptr)
267 : {
268 201 : reactor_op_base* claimed = nullptr;
269 : {
270 201 : std::lock_guard lock(desc_state_.mutex);
271 201 : if (*desc_op_ptr == &op)
272 201 : claimed = std::exchange(*desc_op_ptr, nullptr);
273 : else
274 : {
275 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
276 0 : if (cflag)
277 0 : *cflag = true;
278 : }
279 HIT 201 : }
280 201 : if (claimed)
281 : {
282 201 : op.impl_ptr = self;
283 201 : svc_.post(&op);
284 201 : svc_.work_finished();
285 : }
286 : }
287 201 : }
288 :
289 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
290 : void
291 197 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
292 : do_cancel() noexcept
293 : {
294 197 : auto self = this->weak_from_this().lock();
295 197 : if (!self)
296 MIS 0 : return;
297 :
298 HIT 197 : auto* d = static_cast<Derived*>(this);
299 :
300 796 : d->for_each_op([](auto& op) { op.request_cancel(); });
301 :
302 : // Claim ops under a single lock acquisition
303 : struct claimed_entry
304 : {
305 : reactor_op_base* op = nullptr;
306 : reactor_op_base* base = nullptr;
307 : };
308 : // Max 3 ops (conn, rd, wr)
309 197 : claimed_entry claimed[3];
310 197 : int count = 0;
311 :
312 : {
313 197 : std::lock_guard lock(desc_state_.mutex);
314 1395 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
315 599 : if (desc_slot == &op)
316 : {
317 105 : claimed[count].op = std::exchange(desc_slot, nullptr);
318 105 : claimed[count].base = &op;
319 105 : ++count;
320 : }
321 : });
322 197 : }
323 :
324 302 : for (int i = 0; i < count; ++i)
325 : {
326 105 : claimed[i].base->impl_ptr = self;
327 105 : svc_.post(claimed[i].base);
328 105 : svc_.work_finished();
329 : }
330 197 : }
331 :
template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
void
reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
    do_close_socket() noexcept
{
    // Pin the implementation while cancelling; if it is already being
    // destroyed, skip straight to closing the descriptor.
    auto self = this->weak_from_this().lock();
    if (self)
    {
        auto* d = static_cast<Derived*>(this);

        // Mark every op cancel-requested before claiming any of them.
        d->for_each_op([](auto& op) { op.request_cancel(); });

        struct claimed_entry
        {
            reactor_op_base* base = nullptr;
        };
        // Max 3 ops (conn, rd, wr)
        claimed_entry claimed[3];
        int count = 0;

        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally empty every slot (unlike do_cancel, close
            // claims whatever is parked without matching a specific op).
            d->for_each_desc_entry(
                [&](auto& /*op*/, reactor_op_base*& desc_slot) {
                    auto* c = std::exchange(desc_slot, nullptr);
                    if (c)
                    {
                        claimed[count].base = c;
                        ++count;
                    }
                });
            // Drop cached readiness and any deferred-cancel requests;
            // they are meaningless once the descriptor is closed.
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;

            // If the reactor still has this descriptor enqueued, hold a
            // shared_ptr in impl_ref_ so the implementation outlives that
            // in-flight pass.
            if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
                desc_state_.impl_ref_ = self;
        }

        // Post claimed ops as cancelled completions outside the lock.
        for (int i = 0; i < count; ++i)
        {
            claimed[i].base->impl_ptr = self;
            svc_.post(claimed[i].base);
            svc_.work_finished();
        }
    }

    if (fd_ >= 0)
    {
        // Deregister before close so the reactor never sees a stale fd.
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = Endpoint{};
}
393 :
394 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
395 : native_handle_type
396 2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
397 : do_release_socket() noexcept
398 : {
399 : // Cancel pending ops (same as do_close_socket)
400 2 : auto self = this->weak_from_this().lock();
401 2 : if (self)
402 : {
403 2 : auto* d = static_cast<Derived*>(this);
404 :
405 8 : d->for_each_op([](auto& op) { op.request_cancel(); });
406 :
407 : struct claimed_entry
408 : {
409 : reactor_op_base* base = nullptr;
410 : };
411 2 : claimed_entry claimed[3];
412 2 : int count = 0;
413 :
414 : {
415 2 : std::lock_guard lock(desc_state_.mutex);
416 2 : d->for_each_desc_entry(
417 12 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
418 6 : auto* c = std::exchange(desc_slot, nullptr);
419 6 : if (c)
420 : {
421 MIS 0 : claimed[count].base = c;
422 0 : ++count;
423 : }
424 : });
425 HIT 2 : desc_state_.read_ready = false;
426 2 : desc_state_.write_ready = false;
427 2 : desc_state_.read_cancel_pending = false;
428 2 : desc_state_.write_cancel_pending = false;
429 2 : desc_state_.connect_cancel_pending = false;
430 :
431 2 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
432 MIS 0 : desc_state_.impl_ref_ = self;
433 HIT 2 : }
434 :
435 2 : for (int i = 0; i < count; ++i)
436 : {
437 MIS 0 : claimed[i].base->impl_ptr = self;
438 0 : svc_.post(claimed[i].base);
439 0 : svc_.work_finished();
440 : }
441 : }
442 :
443 HIT 2 : native_handle_type released = fd_;
444 :
445 2 : if (fd_ >= 0)
446 : {
447 2 : if (desc_state_.registered_events != 0)
448 2 : svc_.scheduler().deregister_descriptor(fd_);
449 : // Do NOT close -- caller takes ownership
450 2 : fd_ = -1;
451 : }
452 :
453 2 : desc_state_.fd = -1;
454 2 : desc_state_.registered_events = 0;
455 :
456 2 : local_endpoint_ = Endpoint{};
457 :
458 4 : return released;
459 2 : }
460 :
461 : } // namespace boost::corosio::detail
462 :
463 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|