TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_LOCAL_DATAGRAM_SOCKET_HPP
11 : #define BOOST_COROSIO_LOCAL_DATAGRAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/platform.hpp>
15 : #include <boost/corosio/detail/except.hpp>
16 : #include <boost/corosio/detail/native_handle.hpp>
17 : #include <boost/corosio/detail/op_base.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/capy/io_result.hpp>
20 : #include <boost/corosio/detail/buffer_param.hpp>
21 : #include <boost/corosio/local_endpoint.hpp>
22 : #include <boost/corosio/local_datagram.hpp>
23 : #include <boost/corosio/message_flags.hpp>
24 : #include <boost/corosio/shutdown_type.hpp>
25 : #include <boost/capy/ex/executor_ref.hpp>
26 : #include <boost/capy/ex/execution_context.hpp>
27 : #include <boost/capy/ex/io_env.hpp>
28 : #include <boost/capy/concept/executor.hpp>
29 :
30 : #include <system_error>
31 :
32 : #include <concepts>
33 : #include <coroutine>
34 : #include <cstddef>
35 : #include <stop_token>
36 : #include <type_traits>
37 :
38 : namespace boost::corosio {
39 :
40 : /** An asynchronous Unix datagram socket for coroutine I/O.
41 :
42 : This class provides asynchronous Unix domain datagram socket
43 : operations that return awaitable types. Each operation
44 : participates in the affine awaitable protocol, ensuring
45 : coroutines resume on the correct executor.
46 :
47 : Supports two modes of operation:
48 :
49 : Connectionless mode: each send_to specifies a destination
50 : endpoint, and each recv_from captures the source endpoint.
51 : The socket must be opened (and optionally bound) before I/O.
52 :
53 : Connected mode: call connect() to set a default peer,
54 : then use send()/recv() without endpoint arguments.
55 : The kernel filters incoming datagrams to those from the
56 : connected peer.
57 :
58 : Thread Safety:
59 : Distinct objects: Safe.
60 : Shared objects: Unsafe. A socket must not have concurrent
61 : operations of the same type (e.g., two simultaneous
62 : recv_from). One send_to and one recv_from may be in flight
63 : simultaneously. Note that recv and recv_from share the
64 : same internal read slot, so they must not overlap; likewise
65 : send and send_to share the write slot.
66 : */
67 : class BOOST_COROSIO_DECL local_datagram_socket : public io_object
68 : {
69 : public:
70 : using shutdown_type = corosio::shutdown_type;
71 : using enum corosio::shutdown_type;
72 :
73 : /** Define backend hooks for local datagram socket operations.
74 :
75 : Platform backends (epoll, kqueue, select) derive from this
76 : to implement datagram I/O, connection, and option management.
77 : */
78 : struct implementation : io_object::implementation
79 : {
80 : /** Initiate an asynchronous send_to operation.
81 :
82 : @param h Coroutine handle to resume on completion.
83 : @param ex Executor for dispatching the completion.
84 : @param buf The buffer data to send.
85 : @param dest The destination endpoint.
86 : @param token Stop token for cancellation.
87 : @param ec Output error code.
88 : @param bytes_out Output bytes transferred.
89 :
90 : @return Coroutine handle to resume immediately.
91 : */
92 : virtual std::coroutine_handle<> send_to(
93 : std::coroutine_handle<> h,
94 : capy::executor_ref ex,
95 : buffer_param buf,
96 : corosio::local_endpoint dest,
97 : int flags,
98 : std::stop_token token,
99 : std::error_code* ec,
100 : std::size_t* bytes_out) = 0;
101 :
102 : /** Initiate an asynchronous recv_from operation.
103 :
104 : @param h Coroutine handle to resume on completion.
105 : @param ex Executor for dispatching the completion.
106 : @param buf The buffer to receive into.
107 : @param source Output endpoint for the sender's address.
108 : @param token Stop token for cancellation.
109 : @param ec Output error code.
110 : @param bytes_out Output bytes transferred.
111 :
112 : @return Coroutine handle to resume immediately.
113 : */
114 : virtual std::coroutine_handle<> recv_from(
115 : std::coroutine_handle<> h,
116 : capy::executor_ref ex,
117 : buffer_param buf,
118 : corosio::local_endpoint* source,
119 : int flags,
120 : std::stop_token token,
121 : std::error_code* ec,
122 : std::size_t* bytes_out) = 0;
123 :
124 : /** Initiate an asynchronous connect to set the default peer.
125 :
126 : @param h Coroutine handle to resume on completion.
127 : @param ex Executor for dispatching the completion.
128 : @param ep The remote endpoint to connect to.
129 : @param token Stop token for cancellation.
130 : @param ec Output error code.
131 :
132 : @return Coroutine handle to resume immediately.
133 : */
134 : virtual std::coroutine_handle<> connect(
135 : std::coroutine_handle<> h,
136 : capy::executor_ref ex,
137 : corosio::local_endpoint ep,
138 : std::stop_token token,
139 : std::error_code* ec) = 0;
140 :
141 : /** Initiate an asynchronous connected send operation.
142 :
143 : @param h Coroutine handle to resume on completion.
144 : @param ex Executor for dispatching the completion.
145 : @param buf The buffer data to send.
146 : @param token Stop token for cancellation.
147 : @param ec Output error code.
148 : @param bytes_out Output bytes transferred.
149 :
150 : @return Coroutine handle to resume immediately.
151 : */
152 : virtual std::coroutine_handle<> send(
153 : std::coroutine_handle<> h,
154 : capy::executor_ref ex,
155 : buffer_param buf,
156 : int flags,
157 : std::stop_token token,
158 : std::error_code* ec,
159 : std::size_t* bytes_out) = 0;
160 :
161 : /** Initiate an asynchronous connected recv operation.
162 :
163 : @param h Coroutine handle to resume on completion.
164 : @param ex Executor for dispatching the completion.
165 : @param buf The buffer to receive into.
166 : @param flags Message flags (e.g. MSG_PEEK).
167 : @param token Stop token for cancellation.
168 : @param ec Output error code.
169 : @param bytes_out Output bytes transferred.
170 :
171 : @return Coroutine handle to resume immediately.
172 : */
173 : virtual std::coroutine_handle<> recv(
174 : std::coroutine_handle<> h,
175 : capy::executor_ref ex,
176 : buffer_param buf,
177 : int flags,
178 : std::stop_token token,
179 : std::error_code* ec,
180 : std::size_t* bytes_out) = 0;
181 :
182 : /// Shut down part or all of the socket.
183 : virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
184 :
185 : /// Return the platform socket descriptor.
186 : virtual native_handle_type native_handle() const noexcept = 0;
187 :
188 : virtual native_handle_type release_socket() noexcept = 0;
189 :
190 : /** Request cancellation of pending asynchronous operations.
191 :
192 : All outstanding operations complete with operation_canceled
193 : error. Check ec == cond::canceled for portable comparison.
194 : */
195 : virtual void cancel() noexcept = 0;
196 :
197 : /** Set a socket option.
198 :
199 : @param level The protocol level (e.g. SOL_SOCKET).
200 : @param optname The option name.
201 : @param data Pointer to the option value.
202 : @param size Size of the option value in bytes.
203 : @return Error code on failure, empty on success.
204 : */
205 : virtual std::error_code set_option(
206 : int level,
207 : int optname,
208 : void const* data,
209 : std::size_t size) noexcept = 0;
210 :
211 : /** Get a socket option.
212 :
213 : @param level The protocol level (e.g. SOL_SOCKET).
214 : @param optname The option name.
215 : @param data Pointer to receive the option value.
216 : @param size On entry, the size of the buffer. On exit,
217 : the size of the option value.
218 : @return Error code on failure, empty on success.
219 : */
220 : virtual std::error_code
221 : get_option(int level, int optname, void* data, std::size_t* size)
222 : const noexcept = 0;
223 :
224 : /// Return the cached local endpoint.
225 : virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
226 :
227 : /// Return the cached remote endpoint (connected mode).
228 : virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
229 :
230 : /** Bind the socket to a local endpoint.
231 :
232 : @param ep The local endpoint to bind to.
233 : @return Error code on failure, empty on success.
234 : */
235 : virtual std::error_code
236 : bind(corosio::local_endpoint ep) noexcept = 0;
237 : };
238 :
239 : /** Represent the awaitable returned by @ref send_to.
240 :
241 : Captures the destination endpoint and buffer, then dispatches
242 : to the backend implementation on suspension.
243 : */
244 : struct send_to_awaitable
245 : : detail::bytes_op_base<send_to_awaitable>
246 : {
247 : local_datagram_socket& s_;
248 : buffer_param buf_;
249 : corosio::local_endpoint dest_;
250 : int flags_;
251 :
252 HIT 6 : send_to_awaitable(
253 : local_datagram_socket& s, buffer_param buf,
254 : corosio::local_endpoint dest, int flags = 0) noexcept
255 6 : : s_(s), buf_(buf), dest_(dest), flags_(flags) {}
256 :
257 6 : std::coroutine_handle<> dispatch(
258 : std::coroutine_handle<> h, capy::executor_ref ex) const
259 : {
260 12 : return s_.get().send_to(
261 12 : h, ex, buf_, dest_, flags_, token_, &ec_, &bytes_);
262 : }
263 : };
264 :
265 : struct recv_from_awaitable
266 : : detail::bytes_op_base<recv_from_awaitable>
267 : {
268 : local_datagram_socket& s_;
269 : buffer_param buf_;
270 : corosio::local_endpoint& source_;
271 : int flags_;
272 :
273 8 : recv_from_awaitable(
274 : local_datagram_socket& s, buffer_param buf,
275 : corosio::local_endpoint& source, int flags = 0) noexcept
276 8 : : s_(s), buf_(buf), source_(source), flags_(flags) {}
277 :
278 8 : std::coroutine_handle<> dispatch(
279 : std::coroutine_handle<> h, capy::executor_ref ex) const
280 : {
281 16 : return s_.get().recv_from(
282 16 : h, ex, buf_, &source_, flags_, token_, &ec_, &bytes_);
283 : }
284 : };
285 :
286 : struct connect_awaitable
287 : : detail::void_op_base<connect_awaitable>
288 : {
289 : local_datagram_socket& s_;
290 : corosio::local_endpoint endpoint_;
291 :
292 : connect_awaitable(
293 : local_datagram_socket& s,
294 : corosio::local_endpoint ep) noexcept
295 : : s_(s), endpoint_(ep) {}
296 :
297 : std::coroutine_handle<> dispatch(
298 : std::coroutine_handle<> h, capy::executor_ref ex) const
299 : {
300 : return s_.get().connect(
301 : h, ex, endpoint_, token_, &ec_);
302 : }
303 : };
304 :
305 : struct send_awaitable
306 : : detail::bytes_op_base<send_awaitable>
307 : {
308 : local_datagram_socket& s_;
309 : buffer_param buf_;
310 : int flags_;
311 :
312 8 : send_awaitable(
313 : local_datagram_socket& s, buffer_param buf,
314 : int flags = 0) noexcept
315 8 : : s_(s), buf_(buf), flags_(flags) {}
316 :
317 8 : std::coroutine_handle<> dispatch(
318 : std::coroutine_handle<> h, capy::executor_ref ex) const
319 : {
320 16 : return s_.get().send(
321 16 : h, ex, buf_, flags_, token_, &ec_, &bytes_);
322 : }
323 : };
324 :
325 : struct recv_awaitable
326 : : detail::bytes_op_base<recv_awaitable>
327 : {
328 : local_datagram_socket& s_;
329 : buffer_param buf_;
330 : int flags_;
331 :
332 10 : recv_awaitable(
333 : local_datagram_socket& s, buffer_param buf,
334 : int flags = 0) noexcept
335 10 : : s_(s), buf_(buf), flags_(flags) {}
336 :
337 10 : std::coroutine_handle<> dispatch(
338 : std::coroutine_handle<> h, capy::executor_ref ex) const
339 : {
340 20 : return s_.get().recv(
341 20 : h, ex, buf_, flags_, token_, &ec_, &bytes_);
342 : }
343 : };
344 :
345 : public:
346 : /** Destructor.
347 :
348 : Closes the socket if open, cancelling any pending operations.
349 : */
350 : ~local_datagram_socket() override;
351 :
352 : /** Construct a socket from an execution context.
353 :
354 : @param ctx The execution context that will own this socket.
355 : */
356 : explicit local_datagram_socket(capy::execution_context& ctx);
357 :
358 : /** Construct a socket from an executor.
359 :
360 : The socket is associated with the executor's context.
361 :
362 : @param ex The executor whose context will own the socket.
363 : */
364 : template<class Ex>
365 : requires(
366 : !std::same_as<std::remove_cvref_t<Ex>, local_datagram_socket>) &&
367 : capy::Executor<Ex>
368 : explicit local_datagram_socket(Ex const& ex)
369 : : local_datagram_socket(ex.context())
370 : {
371 : }
372 :
373 : /** Move constructor.
374 :
375 : Transfers ownership of the socket resources.
376 :
377 : @param other The socket to move from.
378 : */
379 14 : local_datagram_socket(local_datagram_socket&& other) noexcept
380 14 : : io_object(std::move(other))
381 : {
382 14 : }
383 :
384 : /** Move assignment operator.
385 :
386 : Closes any existing socket and transfers ownership.
387 :
388 : @param other The socket to move from.
389 : @return Reference to this socket.
390 : */
391 : local_datagram_socket& operator=(local_datagram_socket&& other) noexcept
392 : {
393 : if (this != &other)
394 : {
395 : close();
396 : io_object::operator=(std::move(other));
397 : }
398 : return *this;
399 : }
400 :
401 : local_datagram_socket(local_datagram_socket const&) = delete;
402 : local_datagram_socket& operator=(local_datagram_socket const&) = delete;
403 :
404 : /** Open the socket.
405 :
406 : Creates a Unix datagram socket and associates it with
407 : the platform reactor.
408 :
409 : @param proto The protocol. Defaults to local_datagram{}.
410 :
411 : @throws std::system_error on failure.
412 : */
413 : void open(local_datagram proto = {});
414 :
415 : /// Close the socket.
416 : void close();
417 :
418 : /// Check if the socket is open.
419 142 : bool is_open() const noexcept
420 : {
421 : #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
422 : return h_ && get().native_handle() != ~native_handle_type(0);
423 : #else
424 142 : return h_ && get().native_handle() >= 0;
425 : #endif
426 : }
427 :
428 : /** Bind the socket to a local endpoint.
429 :
430 : Associates the socket with a local address (filesystem path).
431 : Required before calling recv_from in connectionless mode.
432 :
433 : @param ep The local endpoint to bind to.
434 :
435 : @return Error code on failure, empty on success.
436 :
437 : @throws std::logic_error if the socket is not open.
438 : */
439 : std::error_code bind(corosio::local_endpoint ep);
440 :
441 : /** Initiate an asynchronous connect to set the default peer.
442 :
443 : If the socket is not already open, it is opened automatically.
444 :
445 : @param ep The remote endpoint to connect to.
446 :
447 : @return An awaitable that completes with io_result<>.
448 :
449 : @throws std::system_error if the socket needs to be opened
450 : and the open fails.
451 : */
452 : auto connect(corosio::local_endpoint ep)
453 : {
454 : if (!is_open())
455 : open();
456 : return connect_awaitable(*this, ep);
457 : }
458 :
459 : /** Send a datagram to the specified destination.
460 :
461 : @param buf The buffer containing data to send.
462 : @param dest The destination endpoint.
463 :
464 : @return An awaitable that completes with
465 : io_result<std::size_t>.
466 :
467 : @throws std::logic_error if the socket is not open.
468 : */
469 : template<capy::ConstBufferSequence Buffers>
470 6 : auto send_to(
471 : Buffers const& buf,
472 : corosio::local_endpoint dest,
473 : corosio::message_flags flags)
474 : {
475 6 : if (!is_open())
476 MIS 0 : detail::throw_logic_error("send_to: socket not open");
477 : return send_to_awaitable(
478 HIT 6 : *this, buf, dest, static_cast<int>(flags));
479 : }
480 :
481 : /// @overload
482 : template<capy::ConstBufferSequence Buffers>
483 6 : auto send_to(Buffers const& buf, corosio::local_endpoint dest)
484 : {
485 6 : return send_to(buf, dest, corosio::message_flags::none);
486 : }
487 :
488 : /** Receive a datagram and capture the sender's endpoint.
489 :
490 : @param buf The buffer to receive data into.
491 : @param source Reference to an endpoint that will be set to
492 : the sender's address on successful completion.
493 : @param flags Message flags (e.g. message_flags::peek).
494 :
495 : @return An awaitable that completes with
496 : io_result<std::size_t>.
497 :
498 : @throws std::logic_error if the socket is not open.
499 : */
500 : template<capy::MutableBufferSequence Buffers>
501 8 : auto recv_from(
502 : Buffers const& buf,
503 : corosio::local_endpoint& source,
504 : corosio::message_flags flags)
505 : {
506 8 : if (!is_open())
507 MIS 0 : detail::throw_logic_error("recv_from: socket not open");
508 : return recv_from_awaitable(
509 HIT 8 : *this, buf, source, static_cast<int>(flags));
510 : }
511 :
512 : /// @overload
513 : template<capy::MutableBufferSequence Buffers>
514 6 : auto recv_from(Buffers const& buf, corosio::local_endpoint& source)
515 : {
516 6 : return recv_from(buf, source, corosio::message_flags::none);
517 : }
518 :
519 : /** Send a datagram to the connected peer.
520 :
521 : @param buf The buffer containing data to send.
522 : @param flags Message flags.
523 :
524 : @return An awaitable that completes with
525 : io_result<std::size_t>.
526 :
527 : @throws std::logic_error if the socket is not open.
528 : */
529 : template<capy::ConstBufferSequence Buffers>
530 8 : auto send(Buffers const& buf, corosio::message_flags flags)
531 : {
532 8 : if (!is_open())
533 MIS 0 : detail::throw_logic_error("send: socket not open");
534 : return send_awaitable(
535 HIT 8 : *this, buf, static_cast<int>(flags));
536 : }
537 :
538 : /// @overload
539 : template<capy::ConstBufferSequence Buffers>
540 8 : auto send(Buffers const& buf)
541 : {
542 8 : return send(buf, corosio::message_flags::none);
543 : }
544 :
545 : /** Receive a datagram from the connected peer.
546 :
547 : @param buf The buffer to receive data into.
548 : @param flags Message flags (e.g. message_flags::peek).
549 :
550 : @return An awaitable that completes with
551 : io_result<std::size_t>.
552 :
553 : @throws std::logic_error if the socket is not open.
554 : */
555 : template<capy::MutableBufferSequence Buffers>
556 10 : auto recv(Buffers const& buf, corosio::message_flags flags)
557 : {
558 10 : if (!is_open())
559 MIS 0 : detail::throw_logic_error("recv: socket not open");
560 : return recv_awaitable(
561 HIT 10 : *this, buf, static_cast<int>(flags));
562 : }
563 :
564 : /// @overload
565 : template<capy::MutableBufferSequence Buffers>
566 8 : auto recv(Buffers const& buf)
567 : {
568 8 : return recv(buf, corosio::message_flags::none);
569 : }
570 :
571 : /** Cancel any pending asynchronous operations.
572 :
573 : All outstanding operations complete with
574 : errc::operation_canceled. Check ec == cond::canceled
575 : for portable comparison.
576 : */
577 : void cancel();
578 :
579 : /** Get the native socket handle.
580 :
581 : @return The native socket handle, or -1 if not open.
582 : */
583 : native_handle_type native_handle() const noexcept;
584 :
585 : /** Release ownership of the native socket handle.
586 :
587 : Deregisters the socket from the reactor and cancels pending
588 : operations without closing the fd. The caller takes ownership
589 : of the returned descriptor.
590 :
591 : @return The native handle, or -1 if not open.
592 :
593 : @throws std::logic_error if the socket is not open.
594 : */
595 : native_handle_type release();
596 :
597 : /** Query the number of bytes available for reading.
598 :
599 : @return The number of bytes that can be read without blocking.
600 :
601 : @throws std::logic_error if the socket is not open.
602 : @throws std::system_error on ioctl failure.
603 : */
604 : std::size_t available() const;
605 :
606 : /** Shut down part or all of the socket.
607 :
608 : @param what Which direction to shut down.
609 :
610 : @throws std::system_error on failure.
611 : */
612 : void shutdown(shutdown_type what);
613 :
614 : /** Shut down part or all of the socket (non-throwing).
615 :
616 : @param what Which direction to shut down.
617 : @param ec Set to the error code on failure.
618 : */
619 : void shutdown(shutdown_type what, std::error_code& ec) noexcept;
620 :
621 : /** Set a socket option.
622 :
623 : @param opt The option to set.
624 :
625 : @throws std::logic_error if the socket is not open.
626 : @throws std::system_error on failure.
627 : */
628 : template<class Option>
629 : void set_option(Option const& opt)
630 : {
631 : if (!is_open())
632 : detail::throw_logic_error("set_option: socket not open");
633 : std::error_code ec = get().set_option(
634 : Option::level(), Option::name(), opt.data(), opt.size());
635 : if (ec)
636 : detail::throw_system_error(
637 : ec, "local_datagram_socket::set_option");
638 : }
639 :
640 : /** Get a socket option.
641 :
642 : @return The current option value.
643 :
644 : @throws std::logic_error if the socket is not open.
645 : @throws std::system_error on failure.
646 : */
647 : template<class Option>
648 : Option get_option() const
649 : {
650 : if (!is_open())
651 : detail::throw_logic_error("get_option: socket not open");
652 : Option opt{};
653 : std::size_t sz = opt.size();
654 : std::error_code ec =
655 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
656 : if (ec)
657 : detail::throw_system_error(
658 : ec, "local_datagram_socket::get_option");
659 : opt.resize(sz);
660 : return opt;
661 : }
662 :
663 : /** Assign an existing file descriptor to this socket.
664 :
665 : The socket must not already be open. The fd is adopted
666 : and registered with the platform reactor.
667 :
668 : @param fd The file descriptor to adopt.
669 :
670 : @throws std::system_error on failure.
671 : */
672 : void assign(int fd);
673 :
674 : /** Get the local endpoint of the socket.
675 :
676 : @return The local endpoint, or a default endpoint if not bound.
677 : */
678 : corosio::local_endpoint local_endpoint() const noexcept;
679 :
680 : /** Get the remote endpoint of the socket.
681 :
682 : Returns the address of the connected peer.
683 :
684 : @return The remote endpoint, or a default endpoint if
685 : not connected.
686 : */
687 : corosio::local_endpoint remote_endpoint() const noexcept;
688 :
689 : protected:
690 : /// Default-construct (for derived types).
691 : local_datagram_socket() noexcept = default;
692 :
693 : /// Construct from a pre-built handle.
694 : explicit local_datagram_socket(handle h) noexcept
695 : : io_object(std::move(h))
696 : {
697 : }
698 :
699 : private:
700 : void open_for_family(int family, int type, int protocol);
701 :
702 158 : inline implementation& get() const noexcept
703 : {
704 158 : return *static_cast<implementation*>(h_.get());
705 : }
706 : };
707 :
708 : } // namespace boost::corosio
709 :
710 : #endif // BOOST_COROSIO_LOCAL_DATAGRAM_SOCKET_HPP
|