TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/detail/local_stream_service.hpp>
19 :
20 : #include <boost/corosio/native/detail/select/select_local_stream_socket.hpp>
21 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
22 : #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23 :
24 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25 :
26 : #include <coroutine>
27 : #include <mutex>
28 :
29 : #include <errno.h>
30 : #include <fcntl.h>
31 : #include <sys/select.h>
32 : #include <sys/socket.h>
33 : #include <sys/un.h>
34 : #include <unistd.h>
35 :
36 : /*
37 : Each I/O op tries the syscall speculatively; only registers with
38 : the reactor on EAGAIN. Fd is registered once at open time and
39 : stays registered until close. The reactor only marks ready_events_;
40 : actual I/O happens in invoke_deferred_io(). cancel() captures
41 : shared_from_this() into op.impl_ptr to keep the impl alive.
42 : */
43 :
44 : namespace boost::corosio::detail {
45 :
46 : class BOOST_COROSIO_DECL select_local_stream_service final
47 : : public reactor_socket_service<
48 : select_local_stream_service,
49 : local_stream_service,
50 : select_scheduler,
51 : select_local_stream_socket>
52 : {
53 : public:
54 HIT 228 : explicit select_local_stream_service(capy::execution_context& ctx)
55 228 : : reactor_socket_service(ctx)
56 : {
57 228 : }
58 :
59 : std::error_code open_socket(
60 : local_stream_socket::implementation& impl,
61 : int family,
62 : int type,
63 : int protocol) override;
64 :
65 : std::error_code assign_socket(
66 : local_stream_socket::implementation& impl,
67 : int fd) override;
68 : };
69 :
70 : // Op implementations
71 :
72 : inline void
73 MIS 0 : select_local_connect_op::cancel() noexcept
74 : {
75 0 : if (socket_impl_)
76 0 : socket_impl_->cancel_single_op(*this);
77 : else
78 0 : request_cancel();
79 0 : }
80 :
81 : inline void
82 0 : select_local_read_op::cancel() noexcept
83 : {
84 0 : if (socket_impl_)
85 0 : socket_impl_->cancel_single_op(*this);
86 : else
87 0 : request_cancel();
88 0 : }
89 :
90 : inline void
91 0 : select_local_write_op::cancel() noexcept
92 : {
93 0 : if (socket_impl_)
94 0 : socket_impl_->cancel_single_op(*this);
95 : else
96 0 : request_cancel();
97 0 : }
98 :
99 : inline void
100 HIT 3 : select_local_stream_op::operator()()
101 : {
102 3 : complete_io_op(*this);
103 3 : }
104 :
105 : inline void
106 2 : select_local_connect_op::operator()()
107 : {
108 2 : complete_connect_op(*this);
109 2 : }
110 :
111 : // Socket implementations
112 :
113 17 : inline select_local_stream_socket::select_local_stream_socket(
114 17 : select_local_stream_service& svc) noexcept
115 17 : : reactor_stream_socket(svc)
116 : {
117 17 : }
118 :
119 17 : inline select_local_stream_socket::~select_local_stream_socket() = default;
120 :
121 : inline std::coroutine_handle<>
122 2 : select_local_stream_socket::connect(
123 : std::coroutine_handle<> h,
124 : capy::executor_ref ex,
125 : corosio::local_endpoint ep,
126 : std::stop_token token,
127 : std::error_code* ec)
128 : {
129 2 : auto result = do_connect(h, ex, ep, token, ec);
130 : // Rebuild fd_sets so select() watches for writability
131 2 : if (result == std::noop_coroutine())
132 2 : svc_.scheduler().notify_reactor();
133 2 : return result;
134 : }
135 :
136 : inline std::coroutine_handle<>
137 1 : select_local_stream_socket::read_some(
138 : std::coroutine_handle<> h,
139 : capy::executor_ref ex,
140 : buffer_param param,
141 : std::stop_token token,
142 : std::error_code* ec,
143 : std::size_t* bytes_out)
144 : {
145 1 : return do_read_some(h, ex, param, token, ec, bytes_out);
146 : }
147 :
148 : inline std::coroutine_handle<>
149 2 : select_local_stream_socket::write_some(
150 : std::coroutine_handle<> h,
151 : capy::executor_ref ex,
152 : buffer_param param,
153 : std::stop_token token,
154 : std::error_code* ec,
155 : std::size_t* bytes_out)
156 : {
157 2 : auto result = do_write_some(h, ex, param, token, ec, bytes_out);
158 : // Rebuild fd_sets so select() watches for writability
159 2 : if (result == std::noop_coroutine())
160 2 : svc_.scheduler().notify_reactor();
161 2 : return result;
162 : }
163 :
164 : inline void
165 MIS 0 : select_local_stream_socket::cancel() noexcept
166 : {
167 0 : do_cancel();
168 0 : }
169 :
170 : inline void
171 HIT 59 : select_local_stream_socket::close_socket() noexcept
172 : {
173 59 : do_close_socket();
174 59 : }
175 :
176 : inline native_handle_type
177 1 : select_local_stream_socket::release_socket() noexcept
178 : {
179 1 : return this->do_release_socket();
180 : }
181 :
182 : // Service implementations
183 :
184 : inline std::error_code
185 4 : select_local_stream_service::open_socket(
186 : local_stream_socket::implementation& impl,
187 : int family,
188 : int type,
189 : int protocol)
190 : {
191 4 : auto* select_impl = static_cast<select_local_stream_socket*>(&impl);
192 4 : select_impl->close_socket();
193 :
194 4 : int fd = ::socket(family, type, protocol);
195 4 : if (fd < 0)
196 MIS 0 : return make_err(errno);
197 :
198 HIT 4 : int flags = ::fcntl(fd, F_GETFL, 0);
199 4 : if (flags == -1)
200 : {
201 MIS 0 : int errn = errno;
202 0 : ::close(fd);
203 0 : return make_err(errn);
204 : }
205 HIT 4 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
206 : {
207 MIS 0 : int errn = errno;
208 0 : ::close(fd);
209 0 : return make_err(errn);
210 : }
211 HIT 4 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
212 : {
213 MIS 0 : int errn = errno;
214 0 : ::close(fd);
215 0 : return make_err(errn);
216 : }
217 :
218 HIT 4 : if (fd >= FD_SETSIZE)
219 : {
220 MIS 0 : ::close(fd);
221 0 : return make_err(EMFILE);
222 : }
223 :
224 HIT 4 : select_impl->fd_ = fd;
225 :
226 4 : select_impl->desc_state_.fd = fd;
227 : {
228 4 : std::lock_guard lock(select_impl->desc_state_.mutex);
229 4 : select_impl->desc_state_.read_op = nullptr;
230 4 : select_impl->desc_state_.write_op = nullptr;
231 4 : select_impl->desc_state_.connect_op = nullptr;
232 4 : }
233 4 : scheduler().register_descriptor(fd, &select_impl->desc_state_);
234 :
235 4 : return {};
236 : }
237 :
238 : inline std::error_code
239 8 : select_local_stream_service::assign_socket(
240 : local_stream_socket::implementation& impl,
241 : int fd)
242 : {
243 8 : if (fd < 0 || fd >= FD_SETSIZE)
244 MIS 0 : return make_err(fd < 0 ? EBADF : EMFILE);
245 :
246 HIT 8 : auto* select_impl = static_cast<select_local_stream_socket*>(&impl);
247 8 : select_impl->close_socket();
248 :
249 8 : select_impl->fd_ = fd;
250 :
251 8 : select_impl->desc_state_.fd = fd;
252 : {
253 8 : std::lock_guard lock(select_impl->desc_state_.mutex);
254 8 : select_impl->desc_state_.read_op = nullptr;
255 8 : select_impl->desc_state_.write_op = nullptr;
256 8 : select_impl->desc_state_.connect_op = nullptr;
257 8 : }
258 8 : scheduler().register_descriptor(fd, &select_impl->desc_state_);
259 :
260 8 : return {};
261 : }
262 :
263 : } // namespace boost::corosio::detail
264 :
265 : #endif // BOOST_COROSIO_HAS_SELECT
266 :
267 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_LOCAL_STREAM_SERVICE_HPP
|