darkfi/net/
connector.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    sync::{atomic::Ordering, Arc},
21    time::Duration,
22};
23
24use futures::{
25    future::{select, Either},
26    pin_mut,
27};
28use smol::lock::RwLock as AsyncRwLock;
29use tracing::warn;
30use url::Url;
31
32use super::{
33    channel::{Channel, ChannelPtr},
34    hosts::HostColor,
35    session::SessionWeakPtr,
36    settings::Settings,
37    transport::Dialer,
38};
39use crate::{net::hosts::HostContainer, system::CondVar, Error, Result};
40
41/// Create outbound socket connections
42pub struct Connector {
43    /// P2P settings
44    settings: Arc<AsyncRwLock<Settings>>,
45    /// Weak pointer to the session
46    pub session: SessionWeakPtr,
47    /// Stop signal that aborts the connector if received.
48    stop_signal: CondVar,
49}
50
51impl Connector {
52    /// Create a new connector with given network settings
53    pub fn new(settings: Arc<AsyncRwLock<Settings>>, session: SessionWeakPtr) -> Self {
54        Self { settings, session, stop_signal: CondVar::new() }
55    }
56
57    /// Establish an outbound connection
58    pub async fn connect(&self, url: &Url) -> Result<(Url, ChannelPtr)> {
59        let hosts = self.session.upgrade().unwrap().p2p().hosts();
60        if hosts.container.contains(HostColor::Black as usize, url) || hosts.block_all_ports(url) {
61            warn!(target: "net::connector::connect", "Peer {url} is blacklisted");
62            return Err(Error::ConnectFailed(format!("[{url}]: Peer is blacklisted")));
63        }
64
65        let settings = self.settings.read().await;
66        let datastore = settings.p2p_datastore.clone();
67        let i2p_socks5_proxy = settings.i2p_socks5_proxy.clone();
68
69        let (endpoint, mixed_transport) = if let Some(mixed_host) = HostContainer::mix_host(
70            url,
71            &settings.active_profiles,
72            &settings.mixed_profiles,
73            &settings.tor_socks5_proxy,
74            &settings.nym_socks5_proxy,
75        )
76        .first()
77        {
78            (mixed_host.clone(), true)
79        } else {
80            (url.clone(), false)
81        };
82
83        let outbound_connect_timeout = settings.outbound_connect_timeout(endpoint.scheme());
84        drop(settings);
85
86        let dialer = match Dialer::new(endpoint.clone(), datastore, Some(i2p_socks5_proxy)).await {
87            Ok(dialer) => dialer,
88            Err(err) => return Err(Error::ConnectFailed(format!("[{endpoint}]: {err}"))),
89        };
90        let timeout = Duration::from_secs(outbound_connect_timeout);
91
92        let stop_fut = async {
93            self.stop_signal.wait().await;
94        };
95        let dial_fut = async { dialer.dial(Some(timeout)).await };
96
97        pin_mut!(stop_fut);
98        pin_mut!(dial_fut);
99
100        match select(dial_fut, stop_fut).await {
101            Either::Left((Ok(ptstream), _)) => {
102                let channel = Channel::new(
103                    ptstream,
104                    Some(endpoint.clone()),
105                    url.clone(),
106                    self.session.clone(),
107                    mixed_transport,
108                )
109                .await;
110                Ok((endpoint, channel))
111            }
112
113            Either::Left((Err(e), _)) => {
114                // If we get ENETUNREACH, we don't have IPv6 connectivity so note it down.
115                if e.raw_os_error() == Some(libc::ENETUNREACH) {
116                    self.session
117                        .upgrade()
118                        .unwrap()
119                        .p2p()
120                        .hosts()
121                        .ipv6_available
122                        .store(false, Ordering::SeqCst);
123                }
124                Err(Error::ConnectFailed(format!("[{endpoint}]: {e}")))
125            }
126
127            Either::Right((_, _)) => Err(Error::ConnectorStopped(format!("[{endpoint}]"))),
128        }
129    }
130
131    pub(crate) fn stop(&self) {
132        self.stop_signal.notify()
133    }
134}