darkfi/net/transport/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{io, time::Duration};
20
21use async_trait::async_trait;
22use smol::io::{AsyncRead, AsyncWrite};
23use tracing::error;
24use url::Url;
25
26#[cfg(feature = "p2p-unix")]
27use std::io::ErrorKind;
28
/// TLS upgrade mechanism
pub(crate) mod tls;

/// SOCKS5 proxy client
///
/// Only compiled with the `p2p-socks5` feature. Unlike the other transport
/// modules, this one is exported publicly (`pub`, not `pub(crate)`).
#[cfg(feature = "p2p-socks5")]
pub mod socks5;

/// TCP transport
pub(crate) mod tcp;

#[cfg(feature = "p2p-tor")]
/// Tor transport (only compiled with the `p2p-tor` feature)
pub(crate) mod tor;

#[cfg(feature = "p2p-nym")]
/// Nym transport (only compiled with the `p2p-nym` feature)
pub(crate) mod nym;

/// Unix socket transport (only compiled with the `p2p-unix` feature)
#[cfg(feature = "p2p-unix")]
pub(crate) mod unix;
50
/// Dialer variants
///
/// One variant per supported outbound transport. The `*Tls` variants hold
/// the same dialer type as their plain counterpart; the TLS upgrade itself
/// is performed on the resulting stream in `Dialer::dial`.
///
/// There is no dedicated I2P variant: `Dialer::new` maps the `i2p` and
/// `i2p+tls` schemes onto `Socks5` and `Socks5Tls` respectively.
#[derive(Debug, Clone)]
pub enum DialerVariant {
    /// Plain TCP
    Tcp(tcp::TcpDialer),

    /// TCP with TLS
    TcpTls(tcp::TcpDialer),

    #[cfg(feature = "p2p-tor")]
    /// Tor
    Tor(tor::TorDialer),

    #[cfg(feature = "p2p-tor")]
    /// Tor with TLS
    TorTls(tor::TorDialer),

    #[cfg(feature = "p2p-nym")]
    /// Nym (dialing is not implemented yet: `Dialer::dial` hits `todo!()`)
    Nym(nym::NymDialer),

    #[cfg(feature = "p2p-nym")]
    /// Nym with TLS (dialing is not implemented yet: `Dialer::dial` hits `todo!()`)
    NymTls(nym::NymDialer),

    /// Unix socket
    #[cfg(feature = "p2p-unix")]
    Unix(unix::UnixDialer),

    /// SOCKS5 proxy
    #[cfg(feature = "p2p-socks5")]
    Socks5(socks5::Socks5Dialer),

    /// SOCKS5 proxy with TLS
    #[cfg(feature = "p2p-socks5")]
    Socks5Tls(socks5::Socks5Dialer),
}
88
/// Listener variants
///
/// One variant per supported inbound transport. `TcpTls` holds the same
/// listener type as `Tcp`; the TLS upgrade is applied to the bound listener
/// in `Listener::listen`. Tor and Unix listeners have no TLS counterpart.
#[derive(Debug, Clone)]
pub enum ListenerVariant {
    /// Plain TCP
    Tcp(tcp::TcpListener),

    /// TCP with TLS
    TcpTls(tcp::TcpListener),

    #[cfg(feature = "p2p-tor")]
    /// Tor
    Tor(tor::TorListener),

    /// Unix socket
    #[cfg(feature = "p2p-unix")]
    Unix(unix::UnixListener),
}
106
/// A dialer that is able to transparently operate over arbitrary transports.
///
/// Built with `Dialer::new`, which picks the [`DialerVariant`] from the
/// endpoint's URL scheme. `Dialer::dial` then opens the actual connection.
pub struct Dialer {
    /// The endpoint to connect to
    endpoint: Url,
    /// The dialer variant (transport protocol)
    variant: DialerVariant,
}
114
/// Bail out of the enclosing function with an `ENETUNREACH` I/O error
/// unless the given URL-like binding has both a host and a port.
///
/// Expands to an early `return Err(..)`, so it can only be used inside a
/// function returning `io::Result<_>`.
macro_rules! enforce_hostport {
    ($endpoint:ident) => {
        match ($endpoint.host_str(), $endpoint.port()) {
            // Both components present: nothing to do.
            (Some(_), Some(_)) => {}
            // Host and/or port missing: reject the endpoint.
            _ => return Err(io::Error::from_raw_os_error(libc::ENETUNREACH)),
        }
    };
}
122
123#[cfg(feature = "p2p-unix")]
/// Bail out of the enclosing function with an `ENETUNREACH` I/O error
/// unless the given URL-like binding is a bare file path: it must have no
/// host, no port, and must convert successfully via `to_file_path()`.
///
/// Expands to an early `return Err(..)`, so it can only be used inside a
/// function returning `io::Result<_>`.
macro_rules! enforce_abspath {
    ($endpoint:ident) => {
        // Checked left to right: authority components first, then the path.
        if $endpoint.host_str().is_some() ||
            $endpoint.port().is_some() ||
            $endpoint.to_file_path().is_err()
        {
            return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
        }
    };
}
135
136impl Dialer {
137    /// Instantiate a new [`Dialer`] with the given [`Url`] and datastore path.
138    pub async fn new(
139        endpoint: Url,
140        datastore: Option<String>,
141        i2p_socks5_proxy: Option<Url>,
142    ) -> io::Result<Self> {
143        match endpoint.scheme().to_lowercase().as_str() {
144            "tcp" => {
145                // Build a TCP dialer
146                enforce_hostport!(endpoint);
147                let variant = tcp::TcpDialer::new(None).await?;
148                let variant = DialerVariant::Tcp(variant);
149                Ok(Self { endpoint, variant })
150            }
151
152            "tcp+tls" => {
153                // Build a TCP dialer wrapped with TLS
154                enforce_hostport!(endpoint);
155                let variant = tcp::TcpDialer::new(None).await?;
156                let variant = DialerVariant::TcpTls(variant);
157                Ok(Self { endpoint, variant })
158            }
159
160            #[cfg(feature = "p2p-tor")]
161            "tor" => {
162                // Build a Tor dialer
163                enforce_hostport!(endpoint);
164                let variant = tor::TorDialer::new(datastore).await?;
165                let variant = DialerVariant::Tor(variant);
166                Ok(Self { endpoint, variant })
167            }
168
169            #[cfg(feature = "p2p-tor")]
170            "tor+tls" => {
171                // Build a Tor dialer wrapped with TLS
172                enforce_hostport!(endpoint);
173                let variant = tor::TorDialer::new(datastore).await?;
174                let variant = DialerVariant::TorTls(variant);
175                Ok(Self { endpoint, variant })
176            }
177
178            #[cfg(feature = "p2p-nym")]
179            "nym" => {
180                // Build a Nym dialer
181                enforce_hostport!(endpoint);
182                let variant = nym::NymDialer::new().await?;
183                let variant = DialerVariant::Nym(variant);
184                Ok(Self { endpoint, variant })
185            }
186
187            #[cfg(feature = "p2p-nym")]
188            "nym+tls" => {
189                // Build a Nym dialer wrapped with TLS
190                enforce_hostport!(endpoint);
191                let variant = nym::NymDialer::new().await?;
192                let variant = DialerVariant::NymTls(variant);
193                Ok(Self { endpoint, variant })
194            }
195
196            #[cfg(feature = "p2p-unix")]
197            "unix" => {
198                // Build a Unix socket dialer
199                enforce_abspath!(endpoint);
200                let variant = unix::UnixDialer::new().await?;
201                let variant = DialerVariant::Unix(variant);
202                Ok(Self { endpoint, variant })
203            }
204
205            #[cfg(feature = "p2p-socks5")]
206            "socks5" => {
207                // Build a SOCKS5 dialer
208                enforce_hostport!(endpoint);
209                let variant = socks5::Socks5Dialer::new(&endpoint).await?;
210                let variant = DialerVariant::Socks5(variant);
211                Ok(Self { endpoint, variant })
212            }
213
214            #[cfg(feature = "p2p-socks5")]
215            "socks5+tls" => {
216                // Build a SOCKS5 dialer with TLS encapsulation
217                enforce_hostport!(endpoint);
218                let variant = socks5::Socks5Dialer::new(&endpoint).await?;
219                let variant = DialerVariant::Socks5Tls(variant);
220                Ok(Self { endpoint, variant })
221            }
222
223            #[cfg(feature = "p2p-i2p")]
224            "i2p" => {
225                // Build a Socks5 Dialer for I2p
226                enforce_hostport!(endpoint);
227                let mut url = i2p_socks5_proxy.unwrap();
228                url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
229                let variant = socks5::Socks5Dialer::new(&url).await?;
230                let variant = DialerVariant::Socks5(variant);
231                Ok(Self { endpoint, variant })
232            }
233
234            #[cfg(feature = "p2p-i2p")]
235            "i2p+tls" => {
236                // Build a SOCKS5 dialer with TLS encapsulation for I2p
237                enforce_hostport!(endpoint);
238                let mut url = i2p_socks5_proxy.unwrap();
239                url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
240                url.set_scheme("socks5+tls").unwrap();
241                let variant = socks5::Socks5Dialer::new(&url).await?;
242                let variant = DialerVariant::Socks5Tls(variant);
243                Ok(Self { endpoint, variant })
244            }
245
246            x => {
247                error!("[P2P] Requested unsupported transport: {x}");
248                Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
249            }
250        }
251    }
252
253    /// Dial an instantiated [`Dialer`]. This creates a connection and returns a stream.
254    /// The Tor-based Dialer variants can panic: this is intended. There exists validation
255    /// for hosts and ports in other parts of the codebase. A panic occurring here
256    /// likely indicates a configuration issue on the part of the user. It is preferable
257    /// in this case that the user is alerted to this problem via a panic.
258    pub async fn dial(&self, timeout: Option<Duration>) -> io::Result<Box<dyn PtStream>> {
259        match &self.variant {
260            DialerVariant::Tcp(dialer) => {
261                // NOTE: sockaddr here is an array, can contain both ipv4 and ipv6
262                let sockaddr = self.endpoint.socket_addrs(|| None)?;
263                let stream = dialer.do_dial(sockaddr[0], timeout).await?;
264                Ok(Box::new(stream))
265            }
266
267            DialerVariant::TcpTls(dialer) => {
268                let sockaddr = self.endpoint.socket_addrs(|| None)?;
269                let stream = dialer.do_dial(sockaddr[0], timeout).await?;
270                let tlsupgrade = tls::TlsUpgrade::new().await?;
271                let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
272                Ok(Box::new(stream))
273            }
274
275            #[cfg(feature = "p2p-tor")]
276            DialerVariant::Tor(dialer) => {
277                let host = self.endpoint.host_str().unwrap();
278                let port = self.endpoint.port().unwrap();
279                let stream = dialer.do_dial(host, port, timeout).await?;
280                Ok(Box::new(stream))
281            }
282
283            #[cfg(feature = "p2p-tor")]
284            DialerVariant::TorTls(dialer) => {
285                let host = self.endpoint.host_str().unwrap();
286                let port = self.endpoint.port().unwrap();
287                let stream = dialer.do_dial(host, port, timeout).await?;
288                let tlsupgrade = tls::TlsUpgrade::new().await?;
289                let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
290                Ok(Box::new(stream))
291            }
292
293            #[cfg(feature = "p2p-nym")]
294            DialerVariant::Nym(_dialer) => {
295                todo!();
296            }
297
298            #[cfg(feature = "p2p-nym")]
299            DialerVariant::NymTls(_dialer) => {
300                todo!();
301            }
302
303            #[cfg(feature = "p2p-unix")]
304            DialerVariant::Unix(dialer) => {
305                let path = match self.endpoint.to_file_path() {
306                    Ok(v) => v,
307                    Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
308                };
309                let stream = dialer.do_dial(path).await?;
310                Ok(Box::new(stream))
311            }
312
313            #[cfg(feature = "p2p-socks5")]
314            DialerVariant::Socks5(dialer) => {
315                let stream = dialer.do_dial().await?;
316                Ok(Box::new(stream))
317            }
318
319            #[cfg(feature = "p2p-socks5")]
320            DialerVariant::Socks5Tls(dialer) => {
321                let stream = dialer.do_dial().await?;
322                let tlsupgrade = tls::TlsUpgrade::new().await?;
323                let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
324                Ok(Box::new(stream))
325            }
326        }
327    }
328
    /// Return a reference to the `Dialer` endpoint
    ///
    /// This is the [`Url`] that was passed to `Dialer::new`, unmodified.
    pub fn endpoint(&self) -> &Url {
        &self.endpoint
    }
333}
334
/// A listener that is able to transparently listen over arbitrary transports.
///
/// Built with `Listener::new`, which picks the [`ListenerVariant`] from the
/// endpoint's URL scheme. `Listener::listen` then opens the actual socket.
pub struct Listener {
    /// The address to open the listener on
    endpoint: Url,
    /// The listener variant (transport protocol)
    variant: ListenerVariant,
}
342
343impl Listener {
344    /// Instantiate a new [`Listener`] with the given [`Url`] and datastore path.
345    /// Must contain a scheme, host string, and a port.
346    pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
347        match endpoint.scheme().to_lowercase().as_str() {
348            "tcp" => {
349                // Build a TCP listener
350                enforce_hostport!(endpoint);
351                let variant = tcp::TcpListener::new(1024).await?;
352                let variant = ListenerVariant::Tcp(variant);
353                Ok(Self { endpoint, variant })
354            }
355
356            "tcp+tls" => {
357                // Build a TCP listener wrapped with TLS
358                enforce_hostport!(endpoint);
359                let variant = tcp::TcpListener::new(1024).await?;
360                let variant = ListenerVariant::TcpTls(variant);
361                Ok(Self { endpoint, variant })
362            }
363
364            #[cfg(feature = "p2p-tor")]
365            "tor" => {
366                // Build a Tor Hidden Service listener
367                enforce_hostport!(endpoint);
368                let variant = tor::TorListener::new(datastore).await?;
369                let variant = ListenerVariant::Tor(variant);
370                Ok(Self { endpoint, variant })
371            }
372
373            #[cfg(feature = "p2p-unix")]
374            "unix" => {
375                enforce_abspath!(endpoint);
376                let variant = unix::UnixListener::new().await?;
377                let variant = ListenerVariant::Unix(variant);
378                Ok(Self { endpoint, variant })
379            }
380
381            x => {
382                error!("[P2P] Requested unsupported transport: {x}");
383                Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
384            }
385        }
386    }
387
388    /// Listen on an instantiated [`Listener`].
389    /// This will open a socket and return the listener.
390    pub async fn listen(&self) -> io::Result<Box<dyn PtListener>> {
391        match &self.variant {
392            ListenerVariant::Tcp(listener) => {
393                let sockaddr = self.endpoint.socket_addrs(|| None)?;
394                let l = listener.do_listen(sockaddr[0]).await?;
395                Ok(Box::new(l))
396            }
397
398            ListenerVariant::TcpTls(listener) => {
399                let sockaddr = self.endpoint.socket_addrs(|| None)?;
400                let l = listener.do_listen(sockaddr[0]).await?;
401                let tlsupgrade = tls::TlsUpgrade::new().await?;
402                let l = tlsupgrade.upgrade_listener_tcp_tls(l).await?;
403                Ok(Box::new(l))
404            }
405
406            #[cfg(feature = "p2p-tor")]
407            ListenerVariant::Tor(listener) => {
408                let port = self.endpoint.port().unwrap();
409                let l = listener.do_listen(port).await?;
410                Ok(Box::new(l))
411            }
412
413            #[cfg(feature = "p2p-unix")]
414            ListenerVariant::Unix(listener) => {
415                let path = match self.endpoint.to_file_path() {
416                    Ok(v) => v,
417                    Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
418                };
419                let l = listener.do_listen(&path).await?;
420                Ok(Box::new(l))
421            }
422        }
423    }
424
425    /// Should only be called after `listen()` in order to behave correctly.
426    pub async fn endpoint(&self) -> Url {
427        match &self.variant {
428            ListenerVariant::Tcp(listener) | ListenerVariant::TcpTls(listener) => {
429                let mut endpoint = self.endpoint.clone();
430
431                // Endpoint *must* always have a port set.
432                // This is enforced by the enforce_hostport!() macro in Listener::new().
433                let port = self.endpoint.port().unwrap();
434
435                // `port == 0` means we got the OS to assign a random listen port to us.
436                // Get the port from the listener and modify the endpoint.
437                if port == 0 {
438                    // Was `.listen()` called yet? Otherwise do nothing
439                    if let Some(actual_port) = listener.port.get() {
440                        endpoint.set_port(Some(*actual_port)).unwrap();
441                    }
442                }
443
444                endpoint
445            }
446            #[cfg(feature = "p2p-tor")]
447            ListenerVariant::Tor(listener) => listener.endpoint.get().unwrap().clone(),
448            #[allow(unreachable_patterns)]
449            _ => self.endpoint.clone(),
450        }
451    }
452}
453
/// Wrapper trait for async streams
///
/// A marker trait with no methods of its own: it bundles the bounds
/// (`AsyncRead + AsyncWrite + Unpin + Send`) that every transport stream
/// must satisfy so that `Dialer::dial` can return a `Box<dyn PtStream>`
/// regardless of the transport in use.
pub trait PtStream: AsyncRead + AsyncWrite + Unpin + Send {}

// Plain TCP stream
impl PtStream for smol::net::TcpStream {}

// TLS over TCP
impl PtStream for futures_rustls::TlsStream<smol::net::TcpStream> {}

// Tor stream
#[cfg(feature = "p2p-tor")]
impl PtStream for arti_client::DataStream {}

// TLS over Tor
#[cfg(feature = "p2p-tor")]
impl PtStream for futures_rustls::TlsStream<arti_client::DataStream> {}

// Unix socket stream
#[cfg(feature = "p2p-unix")]
impl PtStream for smol::net::unix::UnixStream {}
469
/// Wrapper trait for async listeners
///
/// Implemented by the transport-specific listeners so that
/// `Listener::listen` can return a `Box<dyn PtListener>`.
#[async_trait]
pub trait PtListener: Send + Unpin {
    /// Wait for the next incoming connection.
    ///
    /// Returns the connection as a boxed [`PtStream`] together with a
    /// [`Url`] (presumably the remote peer's address — confirm against the
    /// implementations).
    async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)>;
}