1use std::{
20 io::ErrorKind,
21 sync::{
22 atomic::{AtomicUsize, Ordering::SeqCst},
23 Arc,
24 },
25};
26
27use tracing::warn;
28use url::Url;
29
30#[cfg(feature = "upnp-igd")]
31use smol::lock::Mutex as AsyncMutex;
32
33use super::{
34 channel::{Channel, ChannelPtr},
35 hosts::HostColor,
36 session::SessionWeakPtr,
37 transport::{Listener, PtListener},
38};
39
40#[cfg(feature = "upnp-igd")]
41use super::upnp::{setup_port_mappings, PortMapping};
42
43use crate::{
44 system::{
45 CondVar, ExecutorPtr, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr,
46 Subscription,
47 },
48 util::logger::verbose,
49 Error, Result,
50};
51
52pub type AcceptorPtr = Arc<Acceptor>;
54
55pub struct Acceptor {
57 channel_publisher: PublisherPtr<Result<ChannelPtr>>,
58 task: StoppableTaskPtr,
59 session: SessionWeakPtr,
60 conn_count: AtomicUsize,
61 #[cfg(feature = "upnp-igd")]
62 port_mappings: AsyncMutex<Vec<Arc<dyn PortMapping>>>,
63}
64
65impl Acceptor {
66 pub fn new(session: SessionWeakPtr) -> AcceptorPtr {
68 Arc::new(Self {
69 channel_publisher: Publisher::new(),
70 task: StoppableTask::new(),
71 session,
72 conn_count: AtomicUsize::new(0),
73 #[cfg(feature = "upnp-igd")]
74 port_mappings: AsyncMutex::new(Vec::new()),
75 })
76 }
77
78 pub async fn start(self: Arc<Self>, endpoint: Url, ex: ExecutorPtr) -> Result<()> {
80 let datastore =
81 self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone();
82
83 let listener = Listener::new(endpoint.clone(), datastore).await?;
85
86 let ptlistener = listener.listen().await?;
88
89 #[cfg(feature = "p2p-tor")]
90 if endpoint.scheme() == "tor" {
91 let onion_addr = listener.endpoint().await;
92 verbose!("[P2P] Adding {onion_addr} to external_addrs");
93 self.session
94 .upgrade()
95 .unwrap()
96 .p2p()
97 .settings()
98 .write()
99 .await
100 .external_addrs
101 .push(onion_addr);
102 }
103
104 #[cfg(feature = "upnp-igd")]
105 {
106 let actual_endpoint = listener.endpoint().await;
107 let settings = self.session.upgrade().unwrap().p2p().settings();
108 let mappings = setup_port_mappings(&actual_endpoint, settings, ex.clone());
109 self.port_mappings.lock().await.extend(mappings);
110 }
111
112 self.accept(ptlistener, ex);
113 Ok(())
114 }
115
116 pub async fn stop(&self) {
118 #[cfg(feature = "upnp-igd")]
120 {
121 let mappings = std::mem::take(&mut *self.port_mappings.lock().await);
122 for mapping in mappings {
123 mapping.stop();
124 }
125 }
126
127 self.task.stop().await;
129 }
130
131 pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> {
133 self.channel_publisher.clone().subscribe().await
134 }
135
136 fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: ExecutorPtr) {
138 let self_ = self.clone();
139 self.task.clone().start(
140 self.run_accept_loop(listener, ex.clone()),
141 |result| self_.handle_stop(result),
142 Error::NetworkServiceStopped,
143 ex,
144 );
145 }
146
147 async fn run_accept_loop(
149 self: Arc<Self>,
150 listener: Box<dyn PtListener>,
151 ex: ExecutorPtr,
152 ) -> Result<()> {
153 let cv = Arc::new(CondVar::new());
156 let hosts = self.session.upgrade().unwrap().p2p().hosts();
157
158 loop {
159 let limit =
161 self.session.upgrade().unwrap().p2p().settings().read().await.inbound_connections;
162
163 if self.clone().conn_count.load(SeqCst) >= limit {
164 warn!(target: "net::acceptor::run_accept_loop", "Reached incoming conn limit, waiting...");
169 cv.wait().await;
170 cv.reset();
171 continue
172 }
173
174 match listener.next().await {
176 Ok((stream, url)) => {
177 if hosts.container.contains(HostColor::Black, &url) ||
179 hosts.block_all_ports(&url)
180 {
181 warn!(target: "net::acceptor::run_accept_loop", "Peer {url} is blacklisted");
182 continue
183 }
184
185 let session = self.session.clone();
187 let channel = Channel::new(stream, None, url, session, false).await;
188
189 self.conn_count.fetch_add(1, SeqCst);
191
192 let self_ = self.clone();
196 let channel_ = channel.clone();
197 let cv_ = cv.clone();
198 ex.spawn(async move {
199 let stop_sub = channel_.subscribe_stop().await?;
200 stop_sub.receive().await;
201 self_.conn_count.fetch_sub(1, SeqCst);
202 cv_.notify();
203 Ok::<(), crate::Error>(())
204 })
205 .detach();
206
207 self.channel_publisher.notify(Ok(channel)).await;
209 }
210
211 Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() {
213 libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue,
214 libc::ECONNRESET => {
215 warn!(
216 target: "net::acceptor::run_accept_loop",
217 "[P2P] Connection reset by peer in accept_loop"
218 );
219 continue
220 }
221 libc::ETIMEDOUT => {
222 warn!(
223 target: "net::acceptor::run_accept_loop",
224 "[P2P] Connection timed out in accept_loop"
225 );
226 continue
227 }
228 libc::EPIPE => {
229 warn!(
230 target: "net::acceptor::run_accept_loop",
231 "[P2P] Broken pipe in accept_loop"
232 );
233 continue
234 }
235 x => {
236 warn!(
237 target: "net::acceptor::run_accept_loop",
238 "[P2P] Unhandled OS Error: {e} {x}"
239 );
240 continue
241 }
242 },
243
244 Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue,
246
247 Err(e) if e.kind() == ErrorKind::Other => {
249 if let Some(inner) = std::error::Error::source(&e) {
250 if let Some(inner) = inner.downcast_ref::<futures_rustls::rustls::Error>() {
251 warn!(
252 target: "net::acceptor::run_accept_loop",
253 "[P2P] rustls listener error: {inner:?}"
254 );
255 continue
256 }
257 }
258
259 warn!(
260 target: "net::acceptor::run_accept_loop",
261 "[P2P] Unhandled ErrorKind::Other error: {e:?}"
262 );
263 continue
264 }
265
266 Err(e) => {
268 warn!(
269 target: "net::acceptor::run_accept_loop",
270 "[P2P] Unhandled listener.next() error: {e}"
271 );
272 continue
273 }
274 }
275 }
276 }
277
278 async fn handle_stop(self: Arc<Self>, result: Result<()>) {
281 match result {
282 Ok(()) => panic!("Acceptor task should never complete without error status"),
283 Err(err) => self.channel_publisher.notify(Err(err)).await,
284 }
285 }
286}