1use std::{
20 io::ErrorKind,
21 sync::{
22 atomic::{AtomicUsize, Ordering::SeqCst},
23 Arc,
24 },
25};
26
27use smol::Executor;
28use tracing::warn;
29use url::Url;
30
31use super::{
32 channel::{Channel, ChannelPtr},
33 hosts::HostColor,
34 session::SessionWeakPtr,
35 transport::{Listener, PtListener},
36};
37use crate::{
38 system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
39 util::logger::verbose,
40 Error, Result,
41};
42
43pub type AcceptorPtr = Arc<Acceptor>;
45
46pub struct Acceptor {
48 channel_publisher: PublisherPtr<Result<ChannelPtr>>,
49 task: StoppableTaskPtr,
50 session: SessionWeakPtr,
51 conn_count: AtomicUsize,
52}
53
54impl Acceptor {
55 pub fn new(session: SessionWeakPtr) -> AcceptorPtr {
57 Arc::new(Self {
58 channel_publisher: Publisher::new(),
59 task: StoppableTask::new(),
60 session,
61 conn_count: AtomicUsize::new(0),
62 })
63 }
64
65 pub async fn start(self: Arc<Self>, endpoint: Url, ex: Arc<Executor<'_>>) -> Result<()> {
67 let datastore =
68 self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone();
69
70 let listener = Listener::new(endpoint.clone(), datastore).await?;
72
73 let ptlistener = listener.listen().await?;
75
76 #[cfg(feature = "p2p-tor")]
77 if endpoint.scheme() == "tor" {
78 let onion_addr = listener.endpoint().await;
79 verbose!("[P2P] Adding {onion_addr} to external_addrs");
80 self.session
81 .upgrade()
82 .unwrap()
83 .p2p()
84 .settings()
85 .write()
86 .await
87 .external_addrs
88 .push(onion_addr);
89 }
90
91 self.accept(ptlistener, ex);
92 Ok(())
93 }
94
95 pub async fn stop(&self) {
97 self.task.stop().await;
99 }
100
101 pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> {
103 self.channel_publisher.clone().subscribe().await
104 }
105
106 fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: Arc<Executor<'_>>) {
108 let self_ = self.clone();
109 self.task.clone().start(
110 self.run_accept_loop(listener, ex.clone()),
111 |result| self_.handle_stop(result),
112 Error::NetworkServiceStopped,
113 ex,
114 );
115 }
116
117 async fn run_accept_loop(
119 self: Arc<Self>,
120 listener: Box<dyn PtListener>,
121 ex: Arc<Executor<'_>>,
122 ) -> Result<()> {
123 let cv = Arc::new(CondVar::new());
126 let hosts = self.session.upgrade().unwrap().p2p().hosts();
127
128 loop {
129 let limit =
131 self.session.upgrade().unwrap().p2p().settings().read().await.inbound_connections;
132
133 if self.clone().conn_count.load(SeqCst) >= limit {
134 warn!(target: "net::acceptor::run_accept_loop", "Reached incoming conn limit, waiting...");
139 cv.wait().await;
140 cv.reset();
141 continue
142 }
143
144 match listener.next().await {
146 Ok((stream, url)) => {
147 if hosts.container.contains(HostColor::Black as usize, &url) ||
149 hosts.block_all_ports(&url)
150 {
151 warn!(target: "net::acceptor::run_accept_loop", "Peer {url} is blacklisted");
152 continue
153 }
154
155 let session = self.session.clone();
157 let channel = Channel::new(stream, None, url, session, false).await;
158
159 self.conn_count.fetch_add(1, SeqCst);
161
162 let self_ = self.clone();
166 let channel_ = channel.clone();
167 let cv_ = cv.clone();
168 ex.spawn(async move {
169 let stop_sub = channel_.subscribe_stop().await?;
170 stop_sub.receive().await;
171 self_.conn_count.fetch_sub(1, SeqCst);
172 cv_.notify();
173 Ok::<(), crate::Error>(())
174 })
175 .detach();
176
177 self.channel_publisher.notify(Ok(channel)).await;
179 }
180
181 Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() {
183 libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue,
184 libc::ECONNRESET => {
185 warn!(
186 target: "net::acceptor::run_accept_loop",
187 "[P2P] Connection reset by peer in accept_loop"
188 );
189 continue
190 }
191 libc::ETIMEDOUT => {
192 warn!(
193 target: "net::acceptor::run_accept_loop",
194 "[P2P] Connection timed out in accept_loop"
195 );
196 continue
197 }
198 libc::EPIPE => {
199 warn!(
200 target: "net::acceptor::run_accept_loop",
201 "[P2P] Broken pipe in accept_loop"
202 );
203 continue
204 }
205 x => {
206 warn!(
207 target: "net::acceptor::run_accept_loop",
208 "[P2P] Unhandled OS Error: {e} {x}"
209 );
210 continue
211 }
212 },
213
214 Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue,
216
217 Err(e) if e.kind() == ErrorKind::Other => {
219 if let Some(inner) = std::error::Error::source(&e) {
220 if let Some(inner) = inner.downcast_ref::<futures_rustls::rustls::Error>() {
221 warn!(
222 target: "net::acceptor::run_accept_loop",
223 "[P2P] rustls listener error: {inner:?}"
224 );
225 continue
226 }
227 }
228
229 warn!(
230 target: "net::acceptor::run_accept_loop",
231 "[P2P] Unhandled ErrorKind::Other error: {e:?}"
232 );
233 continue
234 }
235
236 Err(e) => {
238 warn!(
239 target: "net::acceptor::run_accept_loop",
240 "[P2P] Unhandled listener.next() error: {e}"
241 );
242 continue
243 }
244 }
245 }
246 }
247
248 async fn handle_stop(self: Arc<Self>, result: Result<()>) {
251 match result {
252 Ok(()) => panic!("Acceptor task should never complete without error status"),
253 Err(err) => self.channel_publisher.notify(Err(err)).await,
254 }
255 }
256}