1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_UDP_SOCKET_HPP
10  
#ifndef BOOST_COROSIO_UDP_SOCKET_HPP
11  
#define BOOST_COROSIO_UDP_SOCKET_HPP
11  
#define BOOST_COROSIO_UDP_SOCKET_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
13  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/detail/platform.hpp>
14  
#include <boost/corosio/detail/platform.hpp>
15  
#include <boost/corosio/detail/except.hpp>
15  
#include <boost/corosio/detail/except.hpp>
16  
#include <boost/corosio/detail/native_handle.hpp>
16  
#include <boost/corosio/detail/native_handle.hpp>
 
17 +
#include <boost/corosio/detail/op_base.hpp>
17  
#include <boost/corosio/io/io_object.hpp>
18  
#include <boost/corosio/io/io_object.hpp>
18  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/corosio/detail/buffer_param.hpp>
20  
#include <boost/corosio/detail/buffer_param.hpp>
20  
#include <boost/corosio/endpoint.hpp>
21  
#include <boost/corosio/endpoint.hpp>
 
22 +
#include <boost/corosio/message_flags.hpp>
21  
#include <boost/corosio/udp.hpp>
23  
#include <boost/corosio/udp.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
24  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <boost/capy/ex/execution_context.hpp>
25  
#include <boost/capy/ex/execution_context.hpp>
24  
#include <boost/capy/ex/io_env.hpp>
26  
#include <boost/capy/ex/io_env.hpp>
25  
#include <boost/capy/concept/executor.hpp>
27  
#include <boost/capy/concept/executor.hpp>
26  

28  

27  
#include <system_error>
29  
#include <system_error>
28  

30  

29  
#include <concepts>
31  
#include <concepts>
30  
#include <coroutine>
32  
#include <coroutine>
31  
#include <cstddef>
33  
#include <cstddef>
32  
#include <stop_token>
34  
#include <stop_token>
33  
#include <type_traits>
35  
#include <type_traits>
34  

36  

35  
namespace boost::corosio {
37  
namespace boost::corosio {
36  

38  

37  
/** An asynchronous UDP socket for coroutine I/O.
39  
/** An asynchronous UDP socket for coroutine I/O.
38  

40  

39  
    This class provides asynchronous UDP datagram operations that
41  
    This class provides asynchronous UDP datagram operations that
40  
    return awaitable types. Each operation participates in the affine
42  
    return awaitable types. Each operation participates in the affine
41  
    awaitable protocol, ensuring coroutines resume on the correct
43  
    awaitable protocol, ensuring coroutines resume on the correct
42  
    executor.
44  
    executor.
43  

45  

44  
    Supports two modes of operation:
46  
    Supports two modes of operation:
45  

47  

46  
    **Connectionless mode**: each `send_to` specifies a destination
48  
    **Connectionless mode**: each `send_to` specifies a destination
47  
    endpoint, and each `recv_from` captures the source endpoint.
49  
    endpoint, and each `recv_from` captures the source endpoint.
48  
    The socket must be opened (and optionally bound) before I/O.
50  
    The socket must be opened (and optionally bound) before I/O.
49  

51  

50  
    **Connected mode**: call `connect()` to set a default peer,
52  
    **Connected mode**: call `connect()` to set a default peer,
51  
    then use `send()`/`recv()` without endpoint arguments.
53  
    then use `send()`/`recv()` without endpoint arguments.
52  
    The kernel filters incoming datagrams to those from the
54  
    The kernel filters incoming datagrams to those from the
53  
    connected peer.
55  
    connected peer.
54  

56  

55  
    @par Thread Safety
57  
    @par Thread Safety
56  
    Distinct objects: Safe.@n
58  
    Distinct objects: Safe.@n
57  
    Shared objects: Unsafe. A socket must not have concurrent
59  
    Shared objects: Unsafe. A socket must not have concurrent
58  
    operations of the same type (e.g., two simultaneous recv_from).
60  
    operations of the same type (e.g., two simultaneous recv_from).
59  
    One send_to and one recv_from may be in flight simultaneously.
61  
    One send_to and one recv_from may be in flight simultaneously.
60  

62  

61  
    @par Example
63  
    @par Example
62  
    @code
64  
    @code
63  
    // Connectionless mode
65  
    // Connectionless mode
64  
    io_context ioc;
66  
    io_context ioc;
65  
    udp_socket sock( ioc );
67  
    udp_socket sock( ioc );
66  
    sock.open( udp::v4() );
68  
    sock.open( udp::v4() );
67  
    sock.bind( endpoint( ipv4_address::any(), 9000 ) );
69  
    sock.bind( endpoint( ipv4_address::any(), 9000 ) );
68  

70  

69  
    char buf[1024];
71  
    char buf[1024];
70  
    endpoint sender;
72  
    endpoint sender;
71  
    auto [ec, n] = co_await sock.recv_from(
73  
    auto [ec, n] = co_await sock.recv_from(
72  
        capy::mutable_buffer( buf, sizeof( buf ) ), sender );
74  
        capy::mutable_buffer( buf, sizeof( buf ) ), sender );
73  
    if ( !ec )
75  
    if ( !ec )
74  
        co_await sock.send_to(
76  
        co_await sock.send_to(
75  
            capy::const_buffer( buf, n ), sender );
77  
            capy::const_buffer( buf, n ), sender );
76  

78  

77  
    // Connected mode
79  
    // Connected mode
78  
    udp_socket csock( ioc );
80  
    udp_socket csock( ioc );
79  
    auto [cec] = co_await csock.connect(
81  
    auto [cec] = co_await csock.connect(
80  
        endpoint( ipv4_address::loopback(), 9000 ) );
82  
        endpoint( ipv4_address::loopback(), 9000 ) );
81  
    if ( !cec )
83  
    if ( !cec )
82  
        co_await csock.send(
84  
        co_await csock.send(
83  
            capy::const_buffer( buf, n ) );
85  
            capy::const_buffer( buf, n ) );
84  
    @endcode
86  
    @endcode
85  
*/
87  
*/
86  
class BOOST_COROSIO_DECL udp_socket : public io_object
88  
class BOOST_COROSIO_DECL udp_socket : public io_object
87  
{
89  
{
88  
public:
90  
public:
89  
    /** Define backend hooks for UDP socket operations.
91  
    /** Define backend hooks for UDP socket operations.
90  

92  

91  
        Platform backends (epoll, kqueue, select) derive from
93  
        Platform backends (epoll, kqueue, select) derive from
92  
        this to implement datagram I/O and option management.
94  
        this to implement datagram I/O and option management.
93  
    */
95  
    */
94  
    struct implementation : io_object::implementation
96  
    struct implementation : io_object::implementation
95  
    {
97  
    {
96  
        /** Initiate an asynchronous send_to operation.
98  
        /** Initiate an asynchronous send_to operation.
97  

99  

98  
            @param h Coroutine handle to resume on completion.
100  
            @param h Coroutine handle to resume on completion.
99  
            @param ex Executor for dispatching the completion.
101  
            @param ex Executor for dispatching the completion.
100  
            @param buf The buffer data to send.
102  
            @param buf The buffer data to send.
101  
            @param dest The destination endpoint.
103  
            @param dest The destination endpoint.
 
104 +
            @param flags Platform message flags (e.g. `MSG_DONTWAIT`).
102  
            @param token Stop token for cancellation.
105  
            @param token Stop token for cancellation.
103  
            @param ec Output error code.
106  
            @param ec Output error code.
104  
            @param bytes_out Output bytes transferred.
107  
            @param bytes_out Output bytes transferred.
105  

108  

106  
            @return Coroutine handle to resume immediately.
109  
            @return Coroutine handle to resume immediately.
107  
        */
110  
        */
108  
        virtual std::coroutine_handle<> send_to(
111  
        virtual std::coroutine_handle<> send_to(
109  
            std::coroutine_handle<> h,
112  
            std::coroutine_handle<> h,
110  
            capy::executor_ref ex,
113  
            capy::executor_ref ex,
111  
            buffer_param buf,
114  
            buffer_param buf,
112  
            endpoint dest,
115  
            endpoint dest,
 
116 +
            int flags,
113  
            std::stop_token token,
117  
            std::stop_token token,
114  
            std::error_code* ec,
118  
            std::error_code* ec,
115  
            std::size_t* bytes_out) = 0;
119  
            std::size_t* bytes_out) = 0;
116  

120  

117  
        /** Initiate an asynchronous recv_from operation.
121  
        /** Initiate an asynchronous recv_from operation.
118  

122  

119  
            @param h Coroutine handle to resume on completion.
123  
            @param h Coroutine handle to resume on completion.
120  
            @param ex Executor for dispatching the completion.
124  
            @param ex Executor for dispatching the completion.
121  
            @param buf The buffer to receive into.
125  
            @param buf The buffer to receive into.
122  
            @param source Output endpoint for the sender's address.
126  
            @param source Output endpoint for the sender's address.
 
127 +
            @param flags Platform message flags (e.g. `MSG_PEEK`).
123  
            @param token Stop token for cancellation.
128  
            @param token Stop token for cancellation.
124  
            @param ec Output error code.
129  
            @param ec Output error code.
125  
            @param bytes_out Output bytes transferred.
130  
            @param bytes_out Output bytes transferred.
126  

131  

127  
            @return Coroutine handle to resume immediately.
132  
            @return Coroutine handle to resume immediately.
128  
        */
133  
        */
129  
        virtual std::coroutine_handle<> recv_from(
134  
        virtual std::coroutine_handle<> recv_from(
130  
            std::coroutine_handle<> h,
135  
            std::coroutine_handle<> h,
131  
            capy::executor_ref ex,
136  
            capy::executor_ref ex,
132  
            buffer_param buf,
137  
            buffer_param buf,
133  
            endpoint* source,
138  
            endpoint* source,
 
139 +
            int flags,
134  
            std::stop_token token,
140  
            std::stop_token token,
135  
            std::error_code* ec,
141  
            std::error_code* ec,
136  
            std::size_t* bytes_out) = 0;
142  
            std::size_t* bytes_out) = 0;
137  

143  

138  
        /// Return the platform socket descriptor.
144  
        /// Return the platform socket descriptor.
139  
        virtual native_handle_type native_handle() const noexcept = 0;
145  
        virtual native_handle_type native_handle() const noexcept = 0;
140  

146  

141  
        /** Request cancellation of pending asynchronous operations.
147  
        /** Request cancellation of pending asynchronous operations.
142  

148  

143  
            All outstanding operations complete with operation_canceled
149  
            All outstanding operations complete with operation_canceled
144  
            error. Check `ec == cond::canceled` for portable comparison.
150  
            error. Check `ec == cond::canceled` for portable comparison.
145  
        */
151  
        */
146  
        virtual void cancel() noexcept = 0;
152  
        virtual void cancel() noexcept = 0;
147  

153  

148  
        /** Set a socket option.
154  
        /** Set a socket option.
149  

155  

150  
            @param level The protocol level (e.g. `SOL_SOCKET`).
156  
            @param level The protocol level (e.g. `SOL_SOCKET`).
151  
            @param optname The option name.
157  
            @param optname The option name.
152  
            @param data Pointer to the option value.
158  
            @param data Pointer to the option value.
153  
            @param size Size of the option value in bytes.
159  
            @param size Size of the option value in bytes.
154  
            @return Error code on failure, empty on success.
160  
            @return Error code on failure, empty on success.
155  
        */
161  
        */
156  
        virtual std::error_code set_option(
162  
        virtual std::error_code set_option(
157  
            int level,
163  
            int level,
158  
            int optname,
164  
            int optname,
159  
            void const* data,
165  
            void const* data,
160  
            std::size_t size) noexcept = 0;
166  
            std::size_t size) noexcept = 0;
161  

167  

162  
        /** Get a socket option.
168  
        /** Get a socket option.
163  

169  

164  
            @param level The protocol level (e.g. `SOL_SOCKET`).
170  
            @param level The protocol level (e.g. `SOL_SOCKET`).
165  
            @param optname The option name.
171  
            @param optname The option name.
166  
            @param data Pointer to receive the option value.
172  
            @param data Pointer to receive the option value.
167  
            @param size On entry, the size of the buffer. On exit,
173  
            @param size On entry, the size of the buffer. On exit,
168  
                the size of the option value.
174  
                the size of the option value.
169  
            @return Error code on failure, empty on success.
175  
            @return Error code on failure, empty on success.
170  
        */
176  
        */
171  
        virtual std::error_code
177  
        virtual std::error_code
172  
        get_option(int level, int optname, void* data, std::size_t* size)
178  
        get_option(int level, int optname, void* data, std::size_t* size)
173  
            const noexcept = 0;
179  
            const noexcept = 0;
174  

180  

175  
        /// Return the cached local endpoint.
181  
        /// Return the cached local endpoint.
176  
        virtual endpoint local_endpoint() const noexcept = 0;
182  
        virtual endpoint local_endpoint() const noexcept = 0;
177  

183  

178  
        /// Return the cached remote endpoint (connected mode).
184  
        /// Return the cached remote endpoint (connected mode).
179  
        virtual endpoint remote_endpoint() const noexcept = 0;
185  
        virtual endpoint remote_endpoint() const noexcept = 0;
180  

186  

181  
        /** Initiate an asynchronous connect to set the default peer.
187  
        /** Initiate an asynchronous connect to set the default peer.
182  

188  

183  
            @param h Coroutine handle to resume on completion.
189  
            @param h Coroutine handle to resume on completion.
184  
            @param ex Executor for dispatching the completion.
190  
            @param ex Executor for dispatching the completion.
185  
            @param ep The remote endpoint to connect to.
191  
            @param ep The remote endpoint to connect to.
186  
            @param token Stop token for cancellation.
192  
            @param token Stop token for cancellation.
187  
            @param ec Output error code.
193  
            @param ec Output error code.
188  

194  

189  
            @return Coroutine handle to resume immediately.
195  
            @return Coroutine handle to resume immediately.
190  
        */
196  
        */
191  
        virtual std::coroutine_handle<> connect(
197  
        virtual std::coroutine_handle<> connect(
192  
            std::coroutine_handle<> h,
198  
            std::coroutine_handle<> h,
193  
            capy::executor_ref ex,
199  
            capy::executor_ref ex,
194  
            endpoint ep,
200  
            endpoint ep,
195  
            std::stop_token token,
201  
            std::stop_token token,
196  
            std::error_code* ec) = 0;
202  
            std::error_code* ec) = 0;
197  

203  

198  
        /** Initiate an asynchronous connected send operation.
204  
        /** Initiate an asynchronous connected send operation.
199  

205  

200  
            @param h Coroutine handle to resume on completion.
206  
            @param h Coroutine handle to resume on completion.
201  
            @param ex Executor for dispatching the completion.
207  
            @param ex Executor for dispatching the completion.
202  
            @param buf The buffer data to send.
208  
            @param buf The buffer data to send.
 
209 +
            @param flags Platform message flags (e.g. `MSG_DONTWAIT`).
203  
            @param token Stop token for cancellation.
210  
            @param token Stop token for cancellation.
204  
            @param ec Output error code.
211  
            @param ec Output error code.
205  
            @param bytes_out Output bytes transferred.
212  
            @param bytes_out Output bytes transferred.
206  

213  

207  
            @return Coroutine handle to resume immediately.
214  
            @return Coroutine handle to resume immediately.
208  
        */
215  
        */
209  
        virtual std::coroutine_handle<> send(
216  
        virtual std::coroutine_handle<> send(
210  
            std::coroutine_handle<> h,
217  
            std::coroutine_handle<> h,
211  
            capy::executor_ref ex,
218  
            capy::executor_ref ex,
212  
            buffer_param buf,
219  
            buffer_param buf,
 
220 +
            int flags,
213  
            std::stop_token token,
221  
            std::stop_token token,
214  
            std::error_code* ec,
222  
            std::error_code* ec,
215  
            std::size_t* bytes_out) = 0;
223  
            std::size_t* bytes_out) = 0;
216  

224  

217  
        /** Initiate an asynchronous connected recv operation.
225  
        /** Initiate an asynchronous connected recv operation.
218  

226  

219  
            @param h Coroutine handle to resume on completion.
227  
            @param h Coroutine handle to resume on completion.
220  
            @param ex Executor for dispatching the completion.
228  
            @param ex Executor for dispatching the completion.
221  
            @param buf The buffer to receive into.
229  
            @param buf The buffer to receive into.
 
230 +
            @param flags Platform message flags (e.g. `MSG_PEEK`).
222  
            @param token Stop token for cancellation.
231  
            @param token Stop token for cancellation.
223  
            @param ec Output error code.
232  
            @param ec Output error code.
224  
            @param bytes_out Output bytes transferred.
233  
            @param bytes_out Output bytes transferred.
225  

234  

226  
            @return Coroutine handle to resume immediately.
235  
            @return Coroutine handle to resume immediately.
227  
        */
236  
        */
228  
        virtual std::coroutine_handle<> recv(
237  
        virtual std::coroutine_handle<> recv(
229  
            std::coroutine_handle<> h,
238  
            std::coroutine_handle<> h,
230  
            capy::executor_ref ex,
239  
            capy::executor_ref ex,
231  
            buffer_param buf,
240  
            buffer_param buf,
 
241 +
            int flags,
232  
            std::stop_token token,
242  
            std::stop_token token,
233  
            std::error_code* ec,
243  
            std::error_code* ec,
234  
            std::size_t* bytes_out) = 0;
244  
            std::size_t* bytes_out) = 0;
235  
    };
245  
    };
236  

246  

237  
    /** Represent the awaitable returned by @ref send_to.
247  
    /** Represent the awaitable returned by @ref send_to.
238  

248  

239  
        Captures the destination endpoint and buffer, then dispatches
249  
        Captures the destination endpoint and buffer, then dispatches
240  
        to the backend implementation on suspension.
250  
        to the backend implementation on suspension.
241  
    */
251  
    */
242  
    struct send_to_awaitable
252  
    struct send_to_awaitable
 
253 +
        : detail::bytes_op_base<send_to_awaitable>
243  
    {
254  
    {
244  
        udp_socket& s_;
255  
        udp_socket& s_;
245  
        buffer_param buf_;
256  
        buffer_param buf_;
246  
        endpoint dest_;
257  
        endpoint dest_;
247 -
        std::stop_token token_;
258 +
        int flags_;
248 -
        mutable std::error_code ec_;
 
249 -
        mutable std::size_t bytes_ = 0;
 
250  

259  

251  
        send_to_awaitable(
260  
        send_to_awaitable(
252 -
            udp_socket& s, buffer_param buf, endpoint dest) noexcept
261 +
            udp_socket& s, buffer_param buf,
253 -
            : s_(s)
262 +
            endpoint dest, int flags = 0) noexcept
254 -
            , buf_(buf)
263 +
            : s_(s), buf_(buf), dest_(dest), flags_(flags) {}
255 -
            , dest_(dest)
 
256 -
        {
 
257 -
        }
 
258 -

 
259 -
        bool await_ready() const noexcept
 
260 -
        {
 
261 -
            return token_.stop_requested();
 
262 -
        }
 
263 -

 
264 -
        capy::io_result<std::size_t> await_resume() const noexcept
 
265 -
        {
 
266 -
            if (token_.stop_requested())
 
267 -
                return {make_error_code(std::errc::operation_canceled), 0};
 
268 -
            return {ec_, bytes_};
 
269 -
        }
 
270  

264  

271 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
265 +
        std::coroutine_handle<> dispatch(
272 -
            -> std::coroutine_handle<>
266 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
273 -
            token_ = env->stop_token;
 
274  
        {
267  
        {
275  
            return s_.get().send_to(
268  
            return s_.get().send_to(
276 -
                h, env->executor, buf_, dest_, token_, &ec_, &bytes_);
269 +
                h, ex, buf_, dest_, flags_, token_, &ec_, &bytes_);
277  
        }
270  
        }
278  
    };
271  
    };
279  

272  

280  
    /** Represent the awaitable returned by @ref recv_from.
273  
    /** Represent the awaitable returned by @ref recv_from.
281  

274  

282 -
        Captures the receive buffer and source endpoint reference,
275 +
        Captures the source endpoint reference and buffer, then
283 -
        then dispatches to the backend implementation on suspension.
276 +
        dispatches to the backend implementation on suspension.
284  
    */
277  
    */
285  
    struct recv_from_awaitable
278  
    struct recv_from_awaitable
 
279 +
        : detail::bytes_op_base<recv_from_awaitable>
286  
    {
280  
    {
287  
        udp_socket& s_;
281  
        udp_socket& s_;
288  
        buffer_param buf_;
282  
        buffer_param buf_;
289  
        endpoint& source_;
283  
        endpoint& source_;
290 -
        std::stop_token token_;
284 +
        int flags_;
291 -
        mutable std::error_code ec_;
 
292 -
        mutable std::size_t bytes_ = 0;
 
293  

285  

294  
        recv_from_awaitable(
286  
        recv_from_awaitable(
295 -
            udp_socket& s, buffer_param buf, endpoint& source) noexcept
287 +
            udp_socket& s, buffer_param buf,
296 -
            : s_(s)
288 +
            endpoint& source, int flags = 0) noexcept
297 -
            , buf_(buf)
289 +
            : s_(s), buf_(buf), source_(source), flags_(flags) {}
298 -
            , source_(source)
 
299 -
        {
 
300 -
        }
 
301 -

 
302 -
        bool await_ready() const noexcept
 
303 -
        {
 
304 -
            return token_.stop_requested();
 
305 -
        }
 
306 -

 
307 -
        capy::io_result<std::size_t> await_resume() const noexcept
 
308 -
        {
 
309 -
            if (token_.stop_requested())
 
310 -
                return {make_error_code(std::errc::operation_canceled), 0};
 
311 -
            return {ec_, bytes_};
 
312 -
        }
 
313  

290  

314 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
291 +
        std::coroutine_handle<> dispatch(
315 -
            -> std::coroutine_handle<>
292 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
316 -
            token_ = env->stop_token;
 
317  
        {
293  
        {
318  
            return s_.get().recv_from(
294  
            return s_.get().recv_from(
319 -
                h, env->executor, buf_, &source_, token_, &ec_, &bytes_);
295 +
                h, ex, buf_, &source_, flags_, token_, &ec_, &bytes_);
320  
        }
296  
        }
321  
    };
297  
    };
322  

298  

323 -
    /** Represent the awaitable returned by @ref connect.
299 +
    /// Represent the awaitable returned by @ref connect.
324 -

 
325 -
        Captures the target endpoint, then dispatches to the backend
 
326 -
        implementation on suspension.
 
327 -
    */
 
328  
    struct connect_awaitable
300  
    struct connect_awaitable
 
301 +
        : detail::void_op_base<connect_awaitable>
329  
    {
302  
    {
330  
        udp_socket& s_;
303  
        udp_socket& s_;
331 -
        std::stop_token token_;
 
332 -
        mutable std::error_code ec_;
 
333  
        endpoint endpoint_;
304  
        endpoint endpoint_;
334  

305  

335  
        connect_awaitable(udp_socket& s, endpoint ep) noexcept
306  
        connect_awaitable(udp_socket& s, endpoint ep) noexcept
336 -
            : s_(s)
307 +
            : s_(s), endpoint_(ep) {}
337 -
            , endpoint_(ep)
 
338 -
        {
 
339 -
        }
 
340 -

 
341 -
        bool await_ready() const noexcept
 
342 -
        {
 
343 -
            return token_.stop_requested();
 
344 -
        }
 
345 -

 
346 -
        capy::io_result<> await_resume() const noexcept
 
347 -
        {
 
348 -
            if (token_.stop_requested())
 
349 -
                return {make_error_code(std::errc::operation_canceled)};
 
350 -
            return {ec_};
 
351 -
        }
 
352  

308  

353 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
309 +
        std::coroutine_handle<> dispatch(
354 -
            -> std::coroutine_handle<>
310 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
355  
        {
311  
        {
356 -
            token_ = env->stop_token;
312 +
            return s_.get().connect(h, ex, endpoint_, token_, &ec_);
357 -
            return s_.get().connect(h, env->executor, endpoint_, token_, &ec_);
 
358  
        }
313  
        }
359  
    };
314  
    };
360  

315  

361 -
    /** Represent the awaitable returned by @ref send.
316 +
    /// Represent the awaitable returned by @ref send.
362 -

 
363 -
        Captures the buffer, then dispatches to the backend
 
364 -
        implementation on suspension. No endpoint argument
 
365 -
        (uses the connected peer).
 
366 -
    */
 
367  
    struct send_awaitable
317  
    struct send_awaitable
 
318 +
        : detail::bytes_op_base<send_awaitable>
368  
    {
319  
    {
369  
        udp_socket& s_;
320  
        udp_socket& s_;
370  
        buffer_param buf_;
321  
        buffer_param buf_;
371 -
        std::stop_token token_;
322 +
        int flags_;
372 -
        mutable std::error_code ec_;
 
373 -
        mutable std::size_t bytes_ = 0;
 
374 -

 
375 -
        send_awaitable(udp_socket& s, buffer_param buf) noexcept
 
376 -
            : s_(s)
 
377 -
            , buf_(buf)
 
378 -
        {
 
379 -
        }
 
380 -

 
381 -
        bool await_ready() const noexcept
 
382 -
        {
 
383 -
            return token_.stop_requested();
 
384 -
        }
 
385  

323  

386 -
        capy::io_result<std::size_t> await_resume() const noexcept
324 +
        send_awaitable(
387 -
        {
325 +
            udp_socket& s, buffer_param buf,
388 -
            if (token_.stop_requested())
326 +
            int flags = 0) noexcept
389 -
                return {make_error_code(std::errc::operation_canceled), 0};
327 +
            : s_(s), buf_(buf), flags_(flags) {}
390 -
            return {ec_, bytes_};
 
391 -
        }
 
392  

328  

393 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
329 +
        std::coroutine_handle<> dispatch(
394 -
            -> std::coroutine_handle<>
330 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
395  
        {
331  
        {
396 -
            token_ = env->stop_token;
332 +
            return s_.get().send(
397 -
            return s_.get().send(h, env->executor, buf_, token_, &ec_, &bytes_);
333 +
                h, ex, buf_, flags_, token_, &ec_, &bytes_);
398  
        }
334  
        }
399  
    };
335  
    };
400  

336  

401 -
    /** Represent the awaitable returned by @ref recv.
337 +
    /// Represent the awaitable returned by @ref recv.
402 -

 
403 -
        Captures the receive buffer, then dispatches to the backend
 
404 -
        implementation on suspension. No source endpoint (connected
 
405 -
        mode filters at the kernel level).
 
406 -
    */
 
407  
    struct recv_awaitable
338  
    struct recv_awaitable
 
339 +
        : detail::bytes_op_base<recv_awaitable>
408  
    {
340  
    {
409  
        udp_socket& s_;
341  
        udp_socket& s_;
410  
        buffer_param buf_;
342  
        buffer_param buf_;
411 -
        std::stop_token token_;
343 +
        int flags_;
412 -
        mutable std::error_code ec_;
 
413 -
        mutable std::size_t bytes_ = 0;
 
414 -

 
415 -
        recv_awaitable(udp_socket& s, buffer_param buf) noexcept
 
416 -
            : s_(s)
 
417 -
            , buf_(buf)
 
418 -
        {
 
419 -
        }
 
420 -

 
421 -
        bool await_ready() const noexcept
 
422 -
        {
 
423 -
            return token_.stop_requested();
 
424 -
        }
 
425  

344  

426 -
        capy::io_result<std::size_t> await_resume() const noexcept
345 +
        recv_awaitable(
427 -
        {
346 +
            udp_socket& s, buffer_param buf,
428 -
            if (token_.stop_requested())
347 +
            int flags = 0) noexcept
429 -
                return {make_error_code(std::errc::operation_canceled), 0};
348 +
            : s_(s), buf_(buf), flags_(flags) {}
430 -
            return {ec_, bytes_};
 
431 -
        }
 
432  

349  

433 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
350 +
        std::coroutine_handle<> dispatch(
434 -
            -> std::coroutine_handle<>
351 +
            std::coroutine_handle<> h, capy::executor_ref ex) const
435  
        {
352  
        {
436 -
            token_ = env->stop_token;
353 +
            return s_.get().recv(
437 -
            return s_.get().recv(h, env->executor, buf_, token_, &ec_, &bytes_);
354 +
                h, ex, buf_, flags_, token_, &ec_, &bytes_);
438  
        }
355  
        }
439  
    };
356  
    };
440  

357  

441  
public:
358  
public:
442  
    /** Destructor.
359  
    /** Destructor.
443  

360  

444  
        Closes the socket if open, cancelling any pending operations.
361  
        Closes the socket if open, cancelling any pending operations.
445  
    */
362  
    */
446  
    ~udp_socket() override;
363  
    ~udp_socket() override;
447  

364  

448  
    /** Construct a socket from an execution context.
365  
    /** Construct a socket from an execution context.
449  

366  

450  
        @param ctx The execution context that will own this socket.
367  
        @param ctx The execution context that will own this socket.
451  
    */
368  
    */
452  
    explicit udp_socket(capy::execution_context& ctx);
369  
    explicit udp_socket(capy::execution_context& ctx);
453  

370  

454  
    /** Construct a socket from an executor.
371  
    /** Construct a socket from an executor.
455  

372  

456  
        The socket is associated with the executor's context.
373  
        The socket is associated with the executor's context.
457  

374  

458  
        @param ex The executor whose context will own the socket.
375  
        @param ex The executor whose context will own the socket.
459  
    */
376  
    */
460  
    template<class Ex>
377  
    template<class Ex>
461  
        requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
378  
        requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
462  
        capy::Executor<Ex>
379  
        capy::Executor<Ex>
463  
    explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
380  
    explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
464  
    {
381  
    {
465  
    }
382  
    }
466  

383  

467  
    /** Move constructor.
384  
    /** Move constructor.
468  

385  

469  
        Transfers ownership of the socket resources.
386  
        Transfers ownership of the socket resources.
470  

387  

471  
        @param other The socket to move from.
388  
        @param other The socket to move from.
472  
    */
389  
    */
473  
    udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
390  
    udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
474  

391  

475  
    /** Move assignment operator.
392  
    /** Move assignment operator.
476  

393  

477  
        Closes any existing socket and transfers ownership.
394  
        Closes any existing socket and transfers ownership.
478  

395  

479  
        @param other The socket to move from.
396  
        @param other The socket to move from.
480  
        @return Reference to this socket.
397  
        @return Reference to this socket.
481  
    */
398  
    */
482  
    udp_socket& operator=(udp_socket&& other) noexcept
399  
    udp_socket& operator=(udp_socket&& other) noexcept
483  
    {
400  
    {
484  
        if (this != &other)
401  
        if (this != &other)
485  
        {
402  
        {
486  
            close();
403  
            close();
487  
            h_ = std::move(other.h_);
404  
            h_ = std::move(other.h_);
488  
        }
405  
        }
489  
        return *this;
406  
        return *this;
490  
    }
407  
    }
491  

408  

492  
    udp_socket(udp_socket const&)            = delete;
409  
    udp_socket(udp_socket const&)            = delete;
493  
    udp_socket& operator=(udp_socket const&) = delete;
410  
    udp_socket& operator=(udp_socket const&) = delete;
494  

411  

495  
    /** Open the socket.
412  
    /** Open the socket.
496  

413  

497  
        Creates a UDP socket and associates it with the platform
414  
        Creates a UDP socket and associates it with the platform
498  
        reactor.
415  
        reactor.
499  

416  

500  
        @param proto The protocol (IPv4 or IPv6). Defaults to
417  
        @param proto The protocol (IPv4 or IPv6). Defaults to
501  
            `udp::v4()`.
418  
            `udp::v4()`.
502  

419  

503  
        @throws std::system_error on failure.
420  
        @throws std::system_error on failure.
504  
    */
421  
    */
505  
    void open(udp proto = udp::v4());
422  
    void open(udp proto = udp::v4());
506  

423  

507  
    /** Close the socket.
424  
    /** Close the socket.
508  

425  

509  
        Releases socket resources. Any pending operations complete
426  
        Releases socket resources. Any pending operations complete
510  
        with `errc::operation_canceled`.
427  
        with `errc::operation_canceled`.
511  
    */
428  
    */
512  
    void close();
429  
    void close();
513  

430  

514  
    /** Check if the socket is open.
431  
    /** Check if the socket is open.
515  

432  

516  
        @return `true` if the socket is open and ready for operations.
433  
        @return `true` if the socket is open and ready for operations.
517  
    */
434  
    */
518  
    bool is_open() const noexcept
435  
    bool is_open() const noexcept
519  
    {
436  
    {
520  
#if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
437  
#if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
521  
        return h_ && get().native_handle() != ~native_handle_type(0);
438  
        return h_ && get().native_handle() != ~native_handle_type(0);
522  
#else
439  
#else
523  
        return h_ && get().native_handle() >= 0;
440  
        return h_ && get().native_handle() >= 0;
524  
#endif
441  
#endif
525  
    }
442  
    }
526  

443  

527  
    /** Bind the socket to a local endpoint.
444  
    /** Bind the socket to a local endpoint.
528  

445  

529  
        Associates the socket with a local address and port.
446  
        Associates the socket with a local address and port.
530  
        Required before calling `recv_from`.
447  
        Required before calling `recv_from`.
531  

448  

532  
        @param ep The local endpoint to bind to.
449  
        @param ep The local endpoint to bind to.
533  

450  

534  
        @return Error code on failure, empty on success.
451  
        @return Error code on failure, empty on success.
535  

452  

536  
        @throws std::logic_error if the socket is not open.
453  
        @throws std::logic_error if the socket is not open.
537  
    */
454  
    */
538  
    [[nodiscard]] std::error_code bind(endpoint ep);
455  
    [[nodiscard]] std::error_code bind(endpoint ep);
539  

456  

540  
    /** Cancel any pending asynchronous operations.
457  
    /** Cancel any pending asynchronous operations.
541  

458  

542  
        All outstanding operations complete with
459  
        All outstanding operations complete with
543  
        `errc::operation_canceled`. Check `ec == cond::canceled`
460  
        `errc::operation_canceled`. Check `ec == cond::canceled`
544  
        for portable comparison.
461  
        for portable comparison.
545  
    */
462  
    */
546  
    void cancel();
463  
    void cancel();
547  

464  

548  
    /** Get the native socket handle.
465  
    /** Get the native socket handle.
549  

466  

550  
        @return The native socket handle, or -1 if not open.
467  
        @return The native socket handle, or -1 if not open.
551  
    */
468  
    */
552  
    native_handle_type native_handle() const noexcept;
469  
    native_handle_type native_handle() const noexcept;
553  

470  

554  
    /** Set a socket option.
471  
    /** Set a socket option.
555  

472  

556  
        @param opt The option to set.
473  
        @param opt The option to set.
557  

474  

558  
        @throws std::logic_error if the socket is not open.
475  
        @throws std::logic_error if the socket is not open.
559  
        @throws std::system_error on failure.
476  
        @throws std::system_error on failure.
560  
    */
477  
    */
561  
    template<class Option>
478  
    template<class Option>
562  
    void set_option(Option const& opt)
479  
    void set_option(Option const& opt)
563  
    {
480  
    {
564  
        if (!is_open())
481  
        if (!is_open())
565  
            detail::throw_logic_error("set_option: socket not open");
482  
            detail::throw_logic_error("set_option: socket not open");
566  
        std::error_code ec = get().set_option(
483  
        std::error_code ec = get().set_option(
567  
            Option::level(), Option::name(), opt.data(), opt.size());
484  
            Option::level(), Option::name(), opt.data(), opt.size());
568  
        if (ec)
485  
        if (ec)
569  
            detail::throw_system_error(ec, "udp_socket::set_option");
486  
            detail::throw_system_error(ec, "udp_socket::set_option");
570  
    }
487  
    }
571  

488  

572  
    /** Get a socket option.
489  
    /** Get a socket option.
573  

490  

574  
        @return The current option value.
491  
        @return The current option value.
575  

492  

576  
        @throws std::logic_error if the socket is not open.
493  
        @throws std::logic_error if the socket is not open.
577  
        @throws std::system_error on failure.
494  
        @throws std::system_error on failure.
578  
    */
495  
    */
579  
    template<class Option>
496  
    template<class Option>
580  
    Option get_option() const
497  
    Option get_option() const
581  
    {
498  
    {
582  
        if (!is_open())
499  
        if (!is_open())
583  
            detail::throw_logic_error("get_option: socket not open");
500  
            detail::throw_logic_error("get_option: socket not open");
584  
        Option opt{};
501  
        Option opt{};
585  
        std::size_t sz = opt.size();
502  
        std::size_t sz = opt.size();
586  
        std::error_code ec =
503  
        std::error_code ec =
587  
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
504  
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
588  
        if (ec)
505  
        if (ec)
589  
            detail::throw_system_error(ec, "udp_socket::get_option");
506  
            detail::throw_system_error(ec, "udp_socket::get_option");
590  
        opt.resize(sz);
507  
        opt.resize(sz);
591  
        return opt;
508  
        return opt;
592  
    }
509  
    }
593  

510  

594  
    /** Get the local endpoint of the socket.
511  
    /** Get the local endpoint of the socket.
595  

512  

596  
        @return The local endpoint, or a default endpoint if not bound.
513  
        @return The local endpoint, or a default endpoint if not bound.
597  
    */
514  
    */
598  
    endpoint local_endpoint() const noexcept;
515  
    endpoint local_endpoint() const noexcept;
599  

516  

600  
    /** Send a datagram to the specified destination.
517  
    /** Send a datagram to the specified destination.
601  

518  

602  
        @param buf The buffer containing data to send.
519  
        @param buf The buffer containing data to send.
603  
        @param dest The destination endpoint.
520  
        @param dest The destination endpoint.
 
521 +
        @param flags Message flags (e.g. message_flags::dont_route).
604  

522  

605  
        @return An awaitable that completes with
523  
        @return An awaitable that completes with
606  
            `io_result<std::size_t>`.
524  
            `io_result<std::size_t>`.
607  

525  

608  
        @throws std::logic_error if the socket is not open.
526  
        @throws std::logic_error if the socket is not open.
609  
    */
527  
    */
610  
    template<capy::ConstBufferSequence Buffers>
528  
    template<capy::ConstBufferSequence Buffers>
611 -
    auto send_to(Buffers const& buf, endpoint dest)
529 +
    auto send_to(
 
530 +
        Buffers const& buf,
 
531 +
        endpoint dest,
 
532 +
        corosio::message_flags flags)
612  
    {
533  
    {
613  
        if (!is_open())
534  
        if (!is_open())
614  
            detail::throw_logic_error("send_to: socket not open");
535  
            detail::throw_logic_error("send_to: socket not open");
615 -
        return send_to_awaitable(*this, buf, dest);
536 +
        return send_to_awaitable(
 
537 +
            *this, buf, dest, static_cast<int>(flags));
 
538 +
    }
 
539 +

 
540 +
    /// @overload
 
541 +
    template<capy::ConstBufferSequence Buffers>
 
542 +
    auto send_to(Buffers const& buf, endpoint dest)
 
543 +
    {
 
544 +
        return send_to(buf, dest, corosio::message_flags::none);
616  
    }
545  
    }
617  

546  

618  
    /** Receive a datagram and capture the sender's endpoint.
547  
    /** Receive a datagram and capture the sender's endpoint.
619  

548  

620  
        @param buf The buffer to receive data into.
549  
        @param buf The buffer to receive data into.
621  
        @param source Reference to an endpoint that will be set to
550  
        @param source Reference to an endpoint that will be set to
622  
            the sender's address on successful completion.
551  
            the sender's address on successful completion.
 
552 +
        @param flags Message flags (e.g. message_flags::peek).
623  

553  

624  
        @return An awaitable that completes with
554  
        @return An awaitable that completes with
625  
            `io_result<std::size_t>`.
555  
            `io_result<std::size_t>`.
626  

556  

627  
        @throws std::logic_error if the socket is not open.
557  
        @throws std::logic_error if the socket is not open.
628  
    */
558  
    */
629  
    template<capy::MutableBufferSequence Buffers>
559  
    template<capy::MutableBufferSequence Buffers>
630 -
    auto recv_from(Buffers const& buf, endpoint& source)
560 +
    auto recv_from(
 
561 +
        Buffers const& buf,
 
562 +
        endpoint& source,
 
563 +
        corosio::message_flags flags)
631  
    {
564  
    {
632  
        if (!is_open())
565  
        if (!is_open())
633  
            detail::throw_logic_error("recv_from: socket not open");
566  
            detail::throw_logic_error("recv_from: socket not open");
634 -
        return recv_from_awaitable(*this, buf, source);
567 +
        return recv_from_awaitable(
 
568 +
            *this, buf, source, static_cast<int>(flags));
 
569 +
    }
 
570 +

 
571 +
    /// @overload
 
572 +
    template<capy::MutableBufferSequence Buffers>
 
573 +
    auto recv_from(Buffers const& buf, endpoint& source)
 
574 +
    {
 
575 +
        return recv_from(buf, source, corosio::message_flags::none);
635  
    }
576  
    }
636  

577  

637  
    /** Initiate an asynchronous connect to set the default peer.
578  
    /** Initiate an asynchronous connect to set the default peer.
638  

579  

639  
        If the socket is not already open, it is opened automatically
580  
        If the socket is not already open, it is opened automatically
640  
        using the address family of @p ep.
581  
        using the address family of @p ep.
641  

582  

642  
        @param ep The remote endpoint to connect to.
583  
        @param ep The remote endpoint to connect to.
643  

584  

644  
        @return An awaitable that completes with `io_result<>`.
585  
        @return An awaitable that completes with `io_result<>`.
645  

586  

646  
        @throws std::system_error if the socket needs to be opened
587  
        @throws std::system_error if the socket needs to be opened
647  
            and the open fails.
588  
            and the open fails.
648  
    */
589  
    */
649  
    auto connect(endpoint ep)
590  
    auto connect(endpoint ep)
650  
    {
591  
    {
651  
        if (!is_open())
592  
        if (!is_open())
652  
            open(ep.is_v6() ? udp::v6() : udp::v4());
593  
            open(ep.is_v6() ? udp::v6() : udp::v4());
653  
        return connect_awaitable(*this, ep);
594  
        return connect_awaitable(*this, ep);
654  
    }
595  
    }
655  

596  

656  
    /** Send a datagram to the connected peer.
597  
    /** Send a datagram to the connected peer.
657  

598  

658  
        @param buf The buffer containing data to send.
599  
        @param buf The buffer containing data to send.
 
600 +
        @param flags Message flags.
659  

601  

660  
        @return An awaitable that completes with
602  
        @return An awaitable that completes with
661  
            `io_result<std::size_t>`.
603  
            `io_result<std::size_t>`.
662  

604  

663  
        @throws std::logic_error if the socket is not open.
605  
        @throws std::logic_error if the socket is not open.
664  
    */
606  
    */
665  
    template<capy::ConstBufferSequence Buffers>
607  
    template<capy::ConstBufferSequence Buffers>
666 -
    auto send(Buffers const& buf)
608 +
    auto send(Buffers const& buf, corosio::message_flags flags)
667  
    {
609  
    {
668  
        if (!is_open())
610  
        if (!is_open())
669  
            detail::throw_logic_error("send: socket not open");
611  
            detail::throw_logic_error("send: socket not open");
670 -
        return send_awaitable(*this, buf);
612 +
        return send_awaitable(
 
613 +
            *this, buf, static_cast<int>(flags));
 
614 +
    }
 
615 +

 
616 +
    /// @overload
 
617 +
    template<capy::ConstBufferSequence Buffers>
 
618 +
    auto send(Buffers const& buf)
 
619 +
    {
 
620 +
        return send(buf, corosio::message_flags::none);
671  
    }
621  
    }
672  

622  

673  
    /** Receive a datagram from the connected peer.
623  
    /** Receive a datagram from the connected peer.
674  

624  

675  
        @param buf The buffer to receive data into.
625  
        @param buf The buffer to receive data into.
 
626 +
        @param flags Message flags (e.g. message_flags::peek).
676  

627  

677  
        @return An awaitable that completes with
628  
        @return An awaitable that completes with
678  
            `io_result<std::size_t>`.
629  
            `io_result<std::size_t>`.
679  

630  

680  
        @throws std::logic_error if the socket is not open.
631  
        @throws std::logic_error if the socket is not open.
681  
    */
632  
    */
682  
    template<capy::MutableBufferSequence Buffers>
633  
    template<capy::MutableBufferSequence Buffers>
683 -
    auto recv(Buffers const& buf)
634 +
    auto recv(Buffers const& buf, corosio::message_flags flags)
684  
    {
635  
    {
685  
        if (!is_open())
636  
        if (!is_open())
686  
            detail::throw_logic_error("recv: socket not open");
637  
            detail::throw_logic_error("recv: socket not open");
687 -
        return recv_awaitable(*this, buf);
638 +
        return recv_awaitable(
 
639 +
            *this, buf, static_cast<int>(flags));
 
640 +
    }
 
641 +

 
642 +
    /// @overload
 
643 +
    template<capy::MutableBufferSequence Buffers>
 
644 +
    auto recv(Buffers const& buf)
 
645 +
    {
 
646 +
        return recv(buf, corosio::message_flags::none);
688  
    }
647  
    }
689  

648  

690  
    /** Get the remote endpoint of the socket.
649  
    /** Get the remote endpoint of the socket.
691  

650  

692  
        Returns the address and port of the connected peer.
651  
        Returns the address and port of the connected peer.
693  

652  

694  
        @return The remote endpoint, or a default endpoint if
653  
        @return The remote endpoint, or a default endpoint if
695  
            not connected.
654  
            not connected.
696  
    */
655  
    */
697  
    endpoint remote_endpoint() const noexcept;
656  
    endpoint remote_endpoint() const noexcept;
698  

657  

699  
protected:
658  
protected:
700  
    /// Construct from a pre-built handle (for native_udp_socket).
659  
    /// Construct from a pre-built handle (for native_udp_socket).
701  
    explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
660  
    explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
702  
    {
661  
    {
703  
    }
662  
    }
704  

663  

705  
private:
664  
private:
706  
    /// Open the socket for the given protocol triple.
665  
    /// Open the socket for the given protocol triple.
707  
    void open_for_family(int family, int type, int protocol);
666  
    void open_for_family(int family, int type, int protocol);
708  

667  

709  
    inline implementation& get() const noexcept
668  
    inline implementation& get() const noexcept
710  
    {
669  
    {
711  
        return *static_cast<implementation*>(h_.get());
670  
        return *static_cast<implementation*>(h_.get());
712  
    }
671  
    }
713  
};
672  
};
714  

673  

715  
} // namespace boost::corosio
674  
} // namespace boost::corosio
716  

675  

717  
#endif // BOOST_COROSIO_UDP_SOCKET_HPP
676  
#endif // BOOST_COROSIO_UDP_SOCKET_HPP