darkfi/net/
acceptor.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    io::ErrorKind,
21    sync::{
22        atomic::{AtomicUsize, Ordering::SeqCst},
23        Arc,
24    },
25};
26
27use tracing::warn;
28use url::Url;
29
30#[cfg(feature = "upnp-igd")]
31use smol::lock::Mutex as AsyncMutex;
32
33use super::{
34    channel::{Channel, ChannelPtr},
35    hosts::HostColor,
36    session::SessionWeakPtr,
37    transport::{Listener, PtListener},
38};
39
40#[cfg(feature = "upnp-igd")]
41use super::upnp::{setup_port_mappings, PortMapping};
42
43use crate::{
44    system::{
45        CondVar, ExecutorPtr, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr,
46        Subscription,
47    },
48    util::logger::verbose,
49    Error, Result,
50};
51
52/// Atomic pointer to Acceptor
53pub type AcceptorPtr = Arc<Acceptor>;
54
55/// Create inbound socket connections
56pub struct Acceptor {
57    channel_publisher: PublisherPtr<Result<ChannelPtr>>,
58    task: StoppableTaskPtr,
59    session: SessionWeakPtr,
60    conn_count: AtomicUsize,
61    #[cfg(feature = "upnp-igd")]
62    port_mappings: AsyncMutex<Vec<Arc<dyn PortMapping>>>,
63}
64
65impl Acceptor {
66    /// Create new Acceptor object.
67    pub fn new(session: SessionWeakPtr) -> AcceptorPtr {
68        Arc::new(Self {
69            channel_publisher: Publisher::new(),
70            task: StoppableTask::new(),
71            session,
72            conn_count: AtomicUsize::new(0),
73            #[cfg(feature = "upnp-igd")]
74            port_mappings: AsyncMutex::new(Vec::new()),
75        })
76    }
77
78    /// Start accepting inbound socket connections
79    pub async fn start(self: Arc<Self>, endpoint: Url, ex: ExecutorPtr) -> Result<()> {
80        let datastore =
81            self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone();
82
83        // Initialize listener
84        let listener = Listener::new(endpoint.clone(), datastore).await?;
85
86        // Open socket
87        let ptlistener = listener.listen().await?;
88
89        #[cfg(feature = "p2p-tor")]
90        if endpoint.scheme() == "tor" {
91            let onion_addr = listener.endpoint().await;
92            verbose!("[P2P] Adding {onion_addr} to external_addrs");
93            self.session
94                .upgrade()
95                .unwrap()
96                .p2p()
97                .settings()
98                .write()
99                .await
100                .external_addrs
101                .push(onion_addr);
102        }
103
104        #[cfg(feature = "upnp-igd")]
105        {
106            let actual_endpoint = listener.endpoint().await;
107            let settings = self.session.upgrade().unwrap().p2p().settings();
108            let mappings = setup_port_mappings(&actual_endpoint, settings, ex.clone());
109            self.port_mappings.lock().await.extend(mappings);
110        }
111
112        self.accept(ptlistener, ex);
113        Ok(())
114    }
115
116    /// Stop accepting inbound socket connections
117    pub async fn stop(&self) {
118        // Stop all port mappings
119        #[cfg(feature = "upnp-igd")]
120        {
121            let mappings = std::mem::take(&mut *self.port_mappings.lock().await);
122            for mapping in mappings {
123                mapping.stop();
124            }
125        }
126
127        // Send stop signal
128        self.task.stop().await;
129    }
130
131    /// Start receiving network messages.
132    pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> {
133        self.channel_publisher.clone().subscribe().await
134    }
135
136    /// Run the accept loop in a new thread and error if a connection problem occurs
137    fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: ExecutorPtr) {
138        let self_ = self.clone();
139        self.task.clone().start(
140            self.run_accept_loop(listener, ex.clone()),
141            |result| self_.handle_stop(result),
142            Error::NetworkServiceStopped,
143            ex,
144        );
145    }
146
147    /// Run the accept loop.
148    async fn run_accept_loop(
149        self: Arc<Self>,
150        listener: Box<dyn PtListener>,
151        ex: ExecutorPtr,
152    ) -> Result<()> {
153        // CondVar used to notify the loop to recheck if new connections can
154        // be accepted by the listener.
155        let cv = Arc::new(CondVar::new());
156        let hosts = self.session.upgrade().unwrap().p2p().hosts();
157
158        loop {
159            // Refuse new connections if we're up to the connection limit
160            let limit =
161                self.session.upgrade().unwrap().p2p().settings().read().await.inbound_connections;
162
163            if self.clone().conn_count.load(SeqCst) >= limit {
164                // This will get notified every time an inbound channel is stopped.
165                // These channels are the channels spawned below on listener.next().is_ok().
166                // After the notification, we reset the condvar and retry this loop to see
167                // if we can accept more connections, and if not - we'll be back here.
168                warn!(target: "net::acceptor::run_accept_loop", "Reached incoming conn limit, waiting...");
169                cv.wait().await;
170                cv.reset();
171                continue
172            }
173
174            // Now we wait for a new connection.
175            match listener.next().await {
176                Ok((stream, url)) => {
177                    // Check if we reject this peer
178                    if hosts.container.contains(HostColor::Black, &url) ||
179                        hosts.block_all_ports(&url)
180                    {
181                        warn!(target: "net::acceptor::run_accept_loop", "Peer {url} is blacklisted");
182                        continue
183                    }
184
185                    // Create the new Channel.
186                    let session = self.session.clone();
187                    let channel = Channel::new(stream, None, url, session, false).await;
188
189                    // Increment the connection counter
190                    self.conn_count.fetch_add(1, SeqCst);
191
192                    // This task will subscribe on the new channel and decrement
193                    // the connection counter. Along with that, it will notify
194                    // the CondVar that might be waiting to allow new connections.
195                    let self_ = self.clone();
196                    let channel_ = channel.clone();
197                    let cv_ = cv.clone();
198                    ex.spawn(async move {
199                        let stop_sub = channel_.subscribe_stop().await?;
200                        stop_sub.receive().await;
201                        self_.conn_count.fetch_sub(1, SeqCst);
202                        cv_.notify();
203                        Ok::<(), crate::Error>(())
204                    })
205                    .detach();
206
207                    // Finally, notify any publishers about the new channel.
208                    self.channel_publisher.notify(Ok(channel)).await;
209                }
210
211                // As per accept(2) recommendation:
212                Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() {
213                    libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue,
214                    libc::ECONNRESET => {
215                        warn!(
216                            target: "net::acceptor::run_accept_loop",
217                            "[P2P] Connection reset by peer in accept_loop"
218                        );
219                        continue
220                    }
221                    libc::ETIMEDOUT => {
222                        warn!(
223                            target: "net::acceptor::run_accept_loop",
224                            "[P2P] Connection timed out in accept_loop"
225                        );
226                        continue
227                    }
228                    libc::EPIPE => {
229                        warn!(
230                            target: "net::acceptor::run_accept_loop",
231                            "[P2P] Broken pipe in accept_loop"
232                        );
233                        continue
234                    }
235                    x => {
236                        warn!(
237                            target: "net::acceptor::run_accept_loop",
238                            "[P2P] Unhandled OS Error: {e} {x}"
239                        );
240                        continue
241                    }
242                },
243
244                // In case a TLS handshake fails, we'll get this:
245                Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue,
246
247                // Handle ErrorKind::Other
248                Err(e) if e.kind() == ErrorKind::Other => {
249                    if let Some(inner) = std::error::Error::source(&e) {
250                        if let Some(inner) = inner.downcast_ref::<futures_rustls::rustls::Error>() {
251                            warn!(
252                                target: "net::acceptor::run_accept_loop",
253                                "[P2P] rustls listener error: {inner:?}"
254                            );
255                            continue
256                        }
257                    }
258
259                    warn!(
260                        target: "net::acceptor::run_accept_loop",
261                        "[P2P] Unhandled ErrorKind::Other error: {e:?}"
262                    );
263                    continue
264                }
265
266                // Errors we didn't handle above:
267                Err(e) => {
268                    warn!(
269                        target: "net::acceptor::run_accept_loop",
270                        "[P2P] Unhandled listener.next() error: {e}"
271                    );
272                    continue
273                }
274            }
275        }
276    }
277
278    /// Handles network errors. Panics if errors pass silently, otherwise broadcasts it
279    /// to all channel publishers.
280    async fn handle_stop(self: Arc<Self>, result: Result<()>) {
281        match result {
282            Ok(()) => panic!("Acceptor task should never complete without error status"),
283            Err(err) => self.channel_publisher.notify(Err(err)).await,
284        }
285    }
286}