include/boost/corosio/native/detail/select/select_local_stream_service.hpp

67.0% Lines (63/94) 75.0% List of functions (12/16)
select_local_stream_service.hpp
f(x) Functions (16)
Function Calls Lines Blocks
boost::corosio::detail::select_local_stream_service::select_local_stream_service(boost::capy::execution_context&) :54 229x 100.0% 100.0% boost::corosio::detail::select_local_connect_op::cancel() :73 0 0.0% 0.0% boost::corosio::detail::select_local_read_op::cancel() :82 0 0.0% 0.0% boost::corosio::detail::select_local_write_op::cancel() :91 0 0.0% 0.0% boost::corosio::detail::select_local_stream_op::operator()() :100 3x 100.0% 100.0% boost::corosio::detail::select_local_connect_op::operator()() :106 2x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::select_local_stream_socket(boost::corosio::detail::select_local_stream_service&) :113 17x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::~select_local_stream_socket() :119 17x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::local_endpoint, std::stop_token, std::error_code*) :122 2x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :137 1x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :149 2x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::cancel() :165 0 0.0% 0.0% boost::corosio::detail::select_local_stream_socket::close_socket() :171 59x 100.0% 100.0% boost::corosio::detail::select_local_stream_socket::release_socket() :177 1x 100.0% 100.0% boost::corosio::detail::select_local_stream_service::open_socket(boost::corosio::local_stream_socket::implementation&, int, int, int) :185 4x 61.3% 68.0% boost::corosio::detail::select_local_stream_service::assign_socket(boost::corosio::local_stream_socket::implementation&, int) :246 8x 92.9% 75.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/local_stream_service.hpp>
19
20 #include <boost/corosio/native/detail/select/select_local_stream_socket.hpp>
21 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27 #include <mutex>
28
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <sys/select.h>
32 #include <sys/socket.h>
33 #include <sys/un.h>
34 #include <unistd.h>
35
36 /*
37 Each I/O op tries the syscall speculatively; only registers with
38 the reactor on EAGAIN. Fd is registered once at open time and
39 stays registered until close. The reactor only marks ready_events_;
40 actual I/O happens in invoke_deferred_io(). cancel() captures
41 shared_from_this() into op.impl_ptr to keep the impl alive.
42 */
43
44 namespace boost::corosio::detail {
45
46 class BOOST_COROSIO_DECL select_local_stream_service final
47 : public reactor_socket_service<
48 select_local_stream_service,
49 local_stream_service,
50 select_scheduler,
51 select_local_stream_socket>
52 {
53 public:
54 229x explicit select_local_stream_service(capy::execution_context& ctx)
55 229x : reactor_socket_service(ctx)
56 {
57 229x }
58
59 std::error_code open_socket(
60 local_stream_socket::implementation& impl,
61 int family,
62 int type,
63 int protocol) override;
64
65 std::error_code assign_socket(
66 local_stream_socket::implementation& impl,
67 native_handle_type fd) override;
68 };
69
70 // Op implementations
71
72 inline void
73 select_local_connect_op::cancel() noexcept
74 {
75 if (socket_impl_)
76 socket_impl_->cancel_single_op(*this);
77 else
78 request_cancel();
79 }
80
81 inline void
82 select_local_read_op::cancel() noexcept
83 {
84 if (socket_impl_)
85 socket_impl_->cancel_single_op(*this);
86 else
87 request_cancel();
88 }
89
90 inline void
91 select_local_write_op::cancel() noexcept
92 {
93 if (socket_impl_)
94 socket_impl_->cancel_single_op(*this);
95 else
96 request_cancel();
97 }
98
99 inline void
100 3x select_local_stream_op::operator()()
101 {
102 3x complete_io_op(*this);
103 3x }
104
105 inline void
106 2x select_local_connect_op::operator()()
107 {
108 2x complete_connect_op(*this);
109 2x }
110
111 // Socket implementations
112
113 17x inline select_local_stream_socket::select_local_stream_socket(
114 17x select_local_stream_service& svc) noexcept
115 17x : reactor_stream_socket(svc)
116 {
117 17x }
118
119 17x inline select_local_stream_socket::~select_local_stream_socket() = default;
120
121 inline std::coroutine_handle<>
122 2x select_local_stream_socket::connect(
123 std::coroutine_handle<> h,
124 capy::executor_ref ex,
125 corosio::local_endpoint ep,
126 std::stop_token token,
127 std::error_code* ec)
128 {
129 2x auto result = do_connect(h, ex, ep, token, ec);
130 // Rebuild fd_sets so select() watches for writability
131 2x if (result == std::noop_coroutine())
132 2x svc_.scheduler().notify_reactor();
133 2x return result;
134 }
135
136 inline std::coroutine_handle<>
137 1x select_local_stream_socket::read_some(
138 std::coroutine_handle<> h,
139 capy::executor_ref ex,
140 buffer_param param,
141 std::stop_token token,
142 std::error_code* ec,
143 std::size_t* bytes_out)
144 {
145 1x return do_read_some(h, ex, param, token, ec, bytes_out);
146 }
147
148 inline std::coroutine_handle<>
149 2x select_local_stream_socket::write_some(
150 std::coroutine_handle<> h,
151 capy::executor_ref ex,
152 buffer_param param,
153 std::stop_token token,
154 std::error_code* ec,
155 std::size_t* bytes_out)
156 {
157 2x auto result = do_write_some(h, ex, param, token, ec, bytes_out);
158 // Rebuild fd_sets so select() watches for writability
159 2x if (result == std::noop_coroutine())
160 2x svc_.scheduler().notify_reactor();
161 2x return result;
162 }
163
164 inline void
165 select_local_stream_socket::cancel() noexcept
166 {
167 do_cancel();
168 }
169
170 inline void
171 59x select_local_stream_socket::close_socket() noexcept
172 {
173 59x do_close_socket();
174 59x }
175
176 inline native_handle_type
177 1x select_local_stream_socket::release_socket() noexcept
178 {
179 1x return this->do_release_socket();
180 }
181
182 // Service implementations
183
184 inline std::error_code
185 4x select_local_stream_service::open_socket(
186 local_stream_socket::implementation& impl,
187 int family,
188 int type,
189 int protocol)
190 {
191 4x auto* select_impl = static_cast<select_local_stream_socket*>(&impl);
192 4x select_impl->close_socket();
193
194 4x int fd = ::socket(family, type, protocol);
195 4x if (fd < 0)
196 return make_err(errno);
197
198 4x int flags = ::fcntl(fd, F_GETFL, 0);
199 4x if (flags == -1)
200 {
201 int errn = errno;
202 ::close(fd);
203 return make_err(errn);
204 }
205 4x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
206 {
207 int errn = errno;
208 ::close(fd);
209 return make_err(errn);
210 }
211 4x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
212 {
213 int errn = errno;
214 ::close(fd);
215 return make_err(errn);
216 }
217
218 #ifdef SO_NOSIGPIPE
219 {
220 int one = 1;
221 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
222 }
223 #endif
224
225 4x if (fd >= FD_SETSIZE)
226 {
227 ::close(fd);
228 return make_err(EMFILE);
229 }
230
231 4x select_impl->fd_ = fd;
232
233 4x select_impl->desc_state_.fd = fd;
234 {
235 4x std::lock_guard lock(select_impl->desc_state_.mutex);
236 4x select_impl->desc_state_.read_op = nullptr;
237 4x select_impl->desc_state_.write_op = nullptr;
238 4x select_impl->desc_state_.connect_op = nullptr;
239 4x }
240 4x scheduler().register_descriptor(fd, &select_impl->desc_state_);
241
242 4x return {};
243 }
244
245 inline std::error_code
246 8x select_local_stream_service::assign_socket(
247 local_stream_socket::implementation& impl,
248 native_handle_type fd)
249 {
250 8x if (fd < 0 || fd >= FD_SETSIZE)
251 return make_err(fd < 0 ? EBADF : EMFILE);
252
253 8x auto* select_impl = static_cast<select_local_stream_socket*>(&impl);
254 8x select_impl->close_socket();
255
256 8x select_impl->fd_ = fd;
257
258 8x select_impl->desc_state_.fd = fd;
259 {
260 8x std::lock_guard lock(select_impl->desc_state_.mutex);
261 8x select_impl->desc_state_.read_op = nullptr;
262 8x select_impl->desc_state_.write_op = nullptr;
263 8x select_impl->desc_state_.connect_op = nullptr;
264 8x }
265 8x scheduler().register_descriptor(fd, &select_impl->desc_state_);
266
267 8x return {};
268 }
269
270 } // namespace boost::corosio::detail
271
272 #endif // BOOST_COROSIO_HAS_SELECT
273
274 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_SERVICE_HPP
275