include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

80.0% Lines (48/60) 86.7% List of functions (13/15)
epoll_tcp_service.hpp
f(x) Functions (15)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 356x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :115 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :124 99x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :133 0 0.0% 0.0% boost::corosio::detail::epoll_op::operator()() :142 73078x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :148 4003x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :153 12072x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :158 12072x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :161 4003x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :172 182433x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :184 182280x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :196 97x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :202 36191x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :208 4023x 94.4% 94.0% boost::corosio::detail::epoll_tcp_service::bind_socket(boost::corosio::tcp_socket::implementation&, boost::corosio::endpoint) :240 6x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
48 One-Shot Registration
49 ---------------------
50 We use one-shot epoll registration: each operation registers, waits for
51 one event, then unregisters. This simplifies the state machine since we
52 don't need to track whether an fd is currently registered or handle
53 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
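54 simplicity is worth it. NOTE(review): open_socket() in this header registers the descriptor with the scheduler once, at open time, and its comment describes that registration as edge-triggered; confirm whether this one-shot description still matches epoll_scheduler.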
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
99 356x explicit epoll_tcp_service(capy::execution_context& ctx)
100 356x : reactor_socket_service(ctx)
101 {
102 356x }
103
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109
110 std::error_code
111 bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
112 };
113
114 inline void
115 epoll_connect_op::cancel() noexcept
116 {
117 if (socket_impl_)
118 socket_impl_->cancel_single_op(*this);
119 else
120 request_cancel();
121 }
122
123 inline void
124 99x epoll_read_op::cancel() noexcept
125 {
126 99x if (socket_impl_)
127 99x socket_impl_->cancel_single_op(*this);
128 else
129 request_cancel();
130 99x }
131
132 inline void
133 epoll_write_op::cancel() noexcept
134 {
135 if (socket_impl_)
136 socket_impl_->cancel_single_op(*this);
137 else
138 request_cancel();
139 }
140
141 inline void
142 73078x epoll_op::operator()()
143 {
144 73078x complete_io_op(*this);
145 73078x }
146
147 inline void
148 4003x epoll_connect_op::operator()()
149 {
150 4003x complete_connect_op(*this);
151 4003x }
152
153 12072x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
154 12072x : reactor_stream_socket(svc)
155 {
156 12072x }
157
158 12072x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
159
160 inline std::coroutine_handle<>
161 4003x epoll_tcp_socket::connect(
162 std::coroutine_handle<> h,
163 capy::executor_ref ex,
164 endpoint ep,
165 std::stop_token token,
166 std::error_code* ec)
167 {
168 4003x return do_connect(h, ex, ep, token, ec);
169 }
170
171 inline std::coroutine_handle<>
172 182433x epoll_tcp_socket::read_some(
173 std::coroutine_handle<> h,
174 capy::executor_ref ex,
175 buffer_param param,
176 std::stop_token token,
177 std::error_code* ec,
178 std::size_t* bytes_out)
179 {
180 182433x return do_read_some(h, ex, param, token, ec, bytes_out);
181 }
182
183 inline std::coroutine_handle<>
184 182280x epoll_tcp_socket::write_some(
185 std::coroutine_handle<> h,
186 capy::executor_ref ex,
187 buffer_param param,
188 std::stop_token token,
189 std::error_code* ec,
190 std::size_t* bytes_out)
191 {
192 182280x return do_write_some(h, ex, param, token, ec, bytes_out);
193 }
194
195 inline void
196 97x epoll_tcp_socket::cancel() noexcept
197 {
198 97x do_cancel();
199 97x }
200
201 inline void
202 36191x epoll_tcp_socket::close_socket() noexcept
203 {
204 36191x do_close_socket();
205 36191x }
206
207 inline std::error_code
208 4023x epoll_tcp_service::open_socket(
209 tcp_socket::implementation& impl, int family, int type, int protocol)
210 {
211 4023x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
212 4023x epoll_impl->close_socket();
213
214 4023x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
215 4023x if (fd < 0)
216 return make_err(errno);
217
218 4023x if (family == AF_INET6)
219 {
220 6x int one = 1;
221 6x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
222 }
223
224 4023x epoll_impl->fd_ = fd;
225
226 // Register fd with epoll (edge-triggered mode)
227 4023x epoll_impl->desc_state_.fd = fd;
228 {
229 4023x std::lock_guard lock(epoll_impl->desc_state_.mutex);
230 4023x epoll_impl->desc_state_.read_op = nullptr;
231 4023x epoll_impl->desc_state_.write_op = nullptr;
232 4023x epoll_impl->desc_state_.connect_op = nullptr;
233 4023x }
234 4023x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
235
236 4023x return {};
237 }
238
239 inline std::error_code
240 6x epoll_tcp_service::bind_socket(
241 tcp_socket::implementation& impl, endpoint ep)
242 {
243 6x return static_cast<epoll_tcp_socket*>(&impl)->do_bind(ep);
244 }
245
246 } // namespace boost::corosio::detail
247
248 #endif // BOOST_COROSIO_HAS_EPOLL
249
250 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
251