darkfi/net/
acceptor.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    io::ErrorKind,
21    sync::{
22        atomic::{AtomicUsize, Ordering::SeqCst},
23        Arc,
24    },
25};
26
27use smol::Executor;
28use tracing::warn;
29use url::Url;
30
31use super::{
32    channel::{Channel, ChannelPtr},
33    hosts::HostColor,
34    session::SessionWeakPtr,
35    transport::{Listener, PtListener},
36};
37use crate::{
38    system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
39    util::logger::verbose,
40    Error, Result,
41};
42
43/// Atomic pointer to Acceptor
44pub type AcceptorPtr = Arc<Acceptor>;
45
46/// Create inbound socket connections
47pub struct Acceptor {
48    channel_publisher: PublisherPtr<Result<ChannelPtr>>,
49    task: StoppableTaskPtr,
50    session: SessionWeakPtr,
51    conn_count: AtomicUsize,
52}
53
54impl Acceptor {
55    /// Create new Acceptor object.
56    pub fn new(session: SessionWeakPtr) -> AcceptorPtr {
57        Arc::new(Self {
58            channel_publisher: Publisher::new(),
59            task: StoppableTask::new(),
60            session,
61            conn_count: AtomicUsize::new(0),
62        })
63    }
64
65    /// Start accepting inbound socket connections
66    pub async fn start(self: Arc<Self>, endpoint: Url, ex: Arc<Executor<'_>>) -> Result<()> {
67        let datastore =
68            self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone();
69
70        // Initialize listener
71        let listener = Listener::new(endpoint.clone(), datastore).await?;
72
73        // Open socket
74        let ptlistener = listener.listen().await?;
75
76        #[cfg(feature = "p2p-tor")]
77        if endpoint.scheme() == "tor" {
78            let onion_addr = listener.endpoint().await;
79            verbose!("[P2P] Adding {onion_addr} to external_addrs");
80            self.session
81                .upgrade()
82                .unwrap()
83                .p2p()
84                .settings()
85                .write()
86                .await
87                .external_addrs
88                .push(onion_addr);
89        }
90
91        self.accept(ptlistener, ex);
92        Ok(())
93    }
94
95    /// Stop accepting inbound socket connections
96    pub async fn stop(&self) {
97        // Send stop signal
98        self.task.stop().await;
99    }
100
101    /// Start receiving network messages.
102    pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> {
103        self.channel_publisher.clone().subscribe().await
104    }
105
106    /// Run the accept loop in a new thread and error if a connection problem occurs
107    fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: Arc<Executor<'_>>) {
108        let self_ = self.clone();
109        self.task.clone().start(
110            self.run_accept_loop(listener, ex.clone()),
111            |result| self_.handle_stop(result),
112            Error::NetworkServiceStopped,
113            ex,
114        );
115    }
116
117    /// Run the accept loop.
118    async fn run_accept_loop(
119        self: Arc<Self>,
120        listener: Box<dyn PtListener>,
121        ex: Arc<Executor<'_>>,
122    ) -> Result<()> {
123        // CondVar used to notify the loop to recheck if new connections can
124        // be accepted by the listener.
125        let cv = Arc::new(CondVar::new());
126        let hosts = self.session.upgrade().unwrap().p2p().hosts();
127
128        loop {
129            // Refuse new connections if we're up to the connection limit
130            let limit =
131                self.session.upgrade().unwrap().p2p().settings().read().await.inbound_connections;
132
133            if self.clone().conn_count.load(SeqCst) >= limit {
134                // This will get notified every time an inbound channel is stopped.
135                // These channels are the channels spawned below on listener.next().is_ok().
136                // After the notification, we reset the condvar and retry this loop to see
137                // if we can accept more connections, and if not - we'll be back here.
138                warn!(target: "net::acceptor::run_accept_loop", "Reached incoming conn limit, waiting...");
139                cv.wait().await;
140                cv.reset();
141                continue
142            }
143
144            // Now we wait for a new connection.
145            match listener.next().await {
146                Ok((stream, url)) => {
147                    // Check if we reject this peer
148                    if hosts.container.contains(HostColor::Black as usize, &url) ||
149                        hosts.block_all_ports(&url)
150                    {
151                        warn!(target: "net::acceptor::run_accept_loop", "Peer {url} is blacklisted");
152                        continue
153                    }
154
155                    // Create the new Channel.
156                    let session = self.session.clone();
157                    let channel = Channel::new(stream, None, url, session, false).await;
158
159                    // Increment the connection counter
160                    self.conn_count.fetch_add(1, SeqCst);
161
162                    // This task will subscribe on the new channel and decrement
163                    // the connection counter. Along with that, it will notify
164                    // the CondVar that might be waiting to allow new connections.
165                    let self_ = self.clone();
166                    let channel_ = channel.clone();
167                    let cv_ = cv.clone();
168                    ex.spawn(async move {
169                        let stop_sub = channel_.subscribe_stop().await?;
170                        stop_sub.receive().await;
171                        self_.conn_count.fetch_sub(1, SeqCst);
172                        cv_.notify();
173                        Ok::<(), crate::Error>(())
174                    })
175                    .detach();
176
177                    // Finally, notify any publishers about the new channel.
178                    self.channel_publisher.notify(Ok(channel)).await;
179                }
180
181                // As per accept(2) recommendation:
182                Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() {
183                    libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue,
184                    libc::ECONNRESET => {
185                        warn!(
186                            target: "net::acceptor::run_accept_loop",
187                            "[P2P] Connection reset by peer in accept_loop"
188                        );
189                        continue
190                    }
191                    libc::ETIMEDOUT => {
192                        warn!(
193                            target: "net::acceptor::run_accept_loop",
194                            "[P2P] Connection timed out in accept_loop"
195                        );
196                        continue
197                    }
198                    libc::EPIPE => {
199                        warn!(
200                            target: "net::acceptor::run_accept_loop",
201                            "[P2P] Broken pipe in accept_loop"
202                        );
203                        continue
204                    }
205                    x => {
206                        warn!(
207                            target: "net::acceptor::run_accept_loop",
208                            "[P2P] Unhandled OS Error: {e} {x}"
209                        );
210                        continue
211                    }
212                },
213
214                // In case a TLS handshake fails, we'll get this:
215                Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue,
216
217                // Handle ErrorKind::Other
218                Err(e) if e.kind() == ErrorKind::Other => {
219                    if let Some(inner) = std::error::Error::source(&e) {
220                        if let Some(inner) = inner.downcast_ref::<futures_rustls::rustls::Error>() {
221                            warn!(
222                                target: "net::acceptor::run_accept_loop",
223                                "[P2P] rustls listener error: {inner:?}"
224                            );
225                            continue
226                        }
227                    }
228
229                    warn!(
230                        target: "net::acceptor::run_accept_loop",
231                        "[P2P] Unhandled ErrorKind::Other error: {e:?}"
232                    );
233                    continue
234                }
235
236                // Errors we didn't handle above:
237                Err(e) => {
238                    warn!(
239                        target: "net::acceptor::run_accept_loop",
240                        "[P2P] Unhandled listener.next() error: {e}"
241                    );
242                    continue
243                }
244            }
245        }
246    }
247
248    /// Handles network errors. Panics if errors pass silently, otherwise broadcasts it
249    /// to all channel publishers.
250    async fn handle_stop(self: Arc<Self>, result: Result<()>) {
251        match result {
252            Ok(()) => panic!("Acceptor task should never complete without error status"),
253            Err(err) => self.channel_publisher.notify(Err(err)).await,
254        }
255    }
256}