1use std::{
20 sync::{atomic::Ordering, Arc},
21 time::Duration,
22};
23
24use futures::{
25 future::{select, Either},
26 pin_mut,
27};
28use smol::lock::RwLock as AsyncRwLock;
29use tracing::warn;
30use url::Url;
31
32use super::{
33 channel::{Channel, ChannelPtr},
34 hosts::HostColor,
35 session::SessionWeakPtr,
36 settings::Settings,
37 transport::Dialer,
38};
39use crate::{net::hosts::HostContainer, system::CondVar, Error, Result};
40
41pub struct Connector {
43 settings: Arc<AsyncRwLock<Settings>>,
45 pub session: SessionWeakPtr,
47 stop_signal: CondVar,
49}
50
51impl Connector {
52 pub fn new(settings: Arc<AsyncRwLock<Settings>>, session: SessionWeakPtr) -> Self {
54 Self { settings, session, stop_signal: CondVar::new() }
55 }
56
57 pub async fn connect(&self, url: &Url) -> Result<(Url, ChannelPtr)> {
59 let hosts = self.session.upgrade().unwrap().p2p().hosts();
60 if hosts.container.contains(HostColor::Black as usize, url) || hosts.block_all_ports(url) {
61 warn!(target: "net::connector::connect", "Peer {url} is blacklisted");
62 return Err(Error::ConnectFailed(format!("[{url}]: Peer is blacklisted")));
63 }
64
65 let settings = self.settings.read().await;
66 let datastore = settings.p2p_datastore.clone();
67 let i2p_socks5_proxy = settings.i2p_socks5_proxy.clone();
68
69 let (endpoint, mixed_transport) = if let Some(mixed_host) = HostContainer::mix_host(
70 url,
71 &settings.active_profiles,
72 &settings.mixed_profiles,
73 &settings.tor_socks5_proxy,
74 &settings.nym_socks5_proxy,
75 )
76 .first()
77 {
78 (mixed_host.clone(), true)
79 } else {
80 (url.clone(), false)
81 };
82
83 let outbound_connect_timeout = settings.outbound_connect_timeout(endpoint.scheme());
84 drop(settings);
85
86 let dialer = match Dialer::new(endpoint.clone(), datastore, Some(i2p_socks5_proxy)).await {
87 Ok(dialer) => dialer,
88 Err(err) => return Err(Error::ConnectFailed(format!("[{endpoint}]: {err}"))),
89 };
90 let timeout = Duration::from_secs(outbound_connect_timeout);
91
92 let stop_fut = async {
93 self.stop_signal.wait().await;
94 };
95 let dial_fut = async { dialer.dial(Some(timeout)).await };
96
97 pin_mut!(stop_fut);
98 pin_mut!(dial_fut);
99
100 match select(dial_fut, stop_fut).await {
101 Either::Left((Ok(ptstream), _)) => {
102 let channel = Channel::new(
103 ptstream,
104 Some(endpoint.clone()),
105 url.clone(),
106 self.session.clone(),
107 mixed_transport,
108 )
109 .await;
110 Ok((endpoint, channel))
111 }
112
113 Either::Left((Err(e), _)) => {
114 if e.raw_os_error() == Some(libc::ENETUNREACH) {
116 self.session
117 .upgrade()
118 .unwrap()
119 .p2p()
120 .hosts()
121 .ipv6_available
122 .store(false, Ordering::SeqCst);
123 }
124 Err(Error::ConnectFailed(format!("[{endpoint}]: {e}")))
125 }
126
127 Either::Right((_, _)) => Err(Error::ConnectorStopped(format!("[{endpoint}]"))),
128 }
129 }
130
131 pub(crate) fn stop(&self) {
132 self.stop_signal.notify()
133 }
134}