darkfi/net/transport/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{io, time::Duration};
20
21use async_trait::async_trait;
22use smol::io::{AsyncRead, AsyncWrite};
23use tracing::error;
24use url::Url;
25
26#[cfg(feature = "p2p-unix")]
27use std::io::ErrorKind;
28
29/// TLS upgrade mechanism
30pub(crate) mod tls;
31
32/// SOCKS5 proxy client
33#[cfg(feature = "p2p-socks5")]
34pub mod socks5;
35
36/// TCP transport
37pub(crate) mod tcp;
38
39#[cfg(feature = "p2p-tor")]
40/// Tor transport
41pub(crate) mod tor;
42
43#[cfg(feature = "p2p-nym")]
44/// Nym transport
45pub(crate) mod nym;
46
47/// Unix socket transport
48#[cfg(feature = "p2p-unix")]
49pub(crate) mod unix;
50
51/// QUIC transport
52#[cfg(feature = "p2p-quic")]
53pub(crate) mod quic;
54
55/// Dialer variants
56#[derive(Debug, Clone)]
57pub enum DialerVariant {
58    /// Plain TCP
59    Tcp(tcp::TcpDialer),
60
61    /// TCP with TLS
62    TcpTls(tcp::TcpDialer),
63
64    #[cfg(feature = "p2p-tor")]
65    /// Tor
66    Tor(tor::TorDialer),
67
68    #[cfg(feature = "p2p-tor")]
69    /// Tor with TLS
70    TorTls(tor::TorDialer),
71
72    #[cfg(feature = "p2p-nym")]
73    /// Nym
74    Nym(nym::NymDialer),
75
76    #[cfg(feature = "p2p-nym")]
77    /// Nym with TLS
78    NymTls(nym::NymDialer),
79
80    /// Unix socket
81    #[cfg(feature = "p2p-unix")]
82    Unix(unix::UnixDialer),
83
84    /// SOCKS5 proxy
85    #[cfg(feature = "p2p-socks5")]
86    Socks5(socks5::Socks5Dialer),
87
88    /// SOCKS5 proxy with TLS
89    #[cfg(feature = "p2p-socks5")]
90    Socks5Tls(socks5::Socks5Dialer),
91
92    /// QUIC (with built-in TLS)
93    Quic(quic::QuicDialer),
94}
95
96/// Listener variants
97#[derive(Debug, Clone)]
98pub enum ListenerVariant {
99    /// Plain TCP
100    Tcp(tcp::TcpListener),
101
102    /// TCP with TLS
103    TcpTls(tcp::TcpListener),
104
105    #[cfg(feature = "p2p-tor")]
106    /// Tor
107    Tor(tor::TorListener),
108
109    #[cfg(feature = "p2p-unix")]
110    /// Unix socket
111    Unix(unix::UnixListener),
112
113    #[cfg(feature = "p2p-quic")]
114    /// QUIC (with built-in TLS)
115    Quic(quic::QuicListener),
116}
117
118/// A dialer that is able to transparently operate over arbitrary transports.
119pub struct Dialer {
120    /// The endpoint to connect to
121    endpoint: Url,
122    /// The dialer variant (transport protocol)
123    variant: DialerVariant,
124}
125
126macro_rules! enforce_hostport {
127    ($endpoint:ident) => {
128        if $endpoint.host_str().is_none() || $endpoint.port().is_none() {
129            return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
130        }
131    };
132}
133
134#[cfg(feature = "p2p-unix")]
135macro_rules! enforce_abspath {
136    ($endpoint:ident) => {
137        if $endpoint.host_str().is_some() || $endpoint.port().is_some() {
138            return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
139        }
140
141        if $endpoint.to_file_path().is_err() {
142            return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
143        }
144    };
145}
146
147impl Dialer {
148    /// Instantiate a new [`Dialer`] with the given [`Url`] and datastore path.
149    pub async fn new(
150        endpoint: Url,
151        datastore: Option<String>,
152        i2p_socks5_proxy: Option<Url>,
153    ) -> io::Result<Self> {
154        match endpoint.scheme().to_lowercase().as_str() {
155            "tcp" => {
156                // Build a TCP dialer
157                enforce_hostport!(endpoint);
158                let variant = tcp::TcpDialer::new(None).await?;
159                let variant = DialerVariant::Tcp(variant);
160                Ok(Self { endpoint, variant })
161            }
162
163            "tcp+tls" => {
164                // Build a TCP dialer wrapped with TLS
165                enforce_hostport!(endpoint);
166                let variant = tcp::TcpDialer::new(None).await?;
167                let variant = DialerVariant::TcpTls(variant);
168                Ok(Self { endpoint, variant })
169            }
170
171            #[cfg(feature = "p2p-tor")]
172            "tor" => {
173                // Build a Tor dialer
174                enforce_hostport!(endpoint);
175                let variant = tor::TorDialer::new(datastore).await?;
176                let variant = DialerVariant::Tor(variant);
177                Ok(Self { endpoint, variant })
178            }
179
180            #[cfg(feature = "p2p-tor")]
181            "tor+tls" => {
182                // Build a Tor dialer wrapped with TLS
183                enforce_hostport!(endpoint);
184                let variant = tor::TorDialer::new(datastore).await?;
185                let variant = DialerVariant::TorTls(variant);
186                Ok(Self { endpoint, variant })
187            }
188
189            #[cfg(feature = "p2p-nym")]
190            "nym" => {
191                // Build a Nym dialer
192                enforce_hostport!(endpoint);
193                let variant = nym::NymDialer::new().await?;
194                let variant = DialerVariant::Nym(variant);
195                Ok(Self { endpoint, variant })
196            }
197
198            #[cfg(feature = "p2p-nym")]
199            "nym+tls" => {
200                // Build a Nym dialer wrapped with TLS
201                enforce_hostport!(endpoint);
202                let variant = nym::NymDialer::new().await?;
203                let variant = DialerVariant::NymTls(variant);
204                Ok(Self { endpoint, variant })
205            }
206
207            #[cfg(feature = "p2p-unix")]
208            "unix" => {
209                // Build a Unix socket dialer
210                enforce_abspath!(endpoint);
211                let variant = unix::UnixDialer::new().await?;
212                let variant = DialerVariant::Unix(variant);
213                Ok(Self { endpoint, variant })
214            }
215
216            #[cfg(feature = "p2p-socks5")]
217            "socks5" => {
218                // Build a SOCKS5 dialer
219                enforce_hostport!(endpoint);
220                let variant = socks5::Socks5Dialer::new(&endpoint).await?;
221                let variant = DialerVariant::Socks5(variant);
222                Ok(Self { endpoint, variant })
223            }
224
225            #[cfg(feature = "p2p-socks5")]
226            "socks5+tls" => {
227                // Build a SOCKS5 dialer with TLS encapsulation
228                enforce_hostport!(endpoint);
229                let variant = socks5::Socks5Dialer::new(&endpoint).await?;
230                let variant = DialerVariant::Socks5Tls(variant);
231                Ok(Self { endpoint, variant })
232            }
233
234            #[cfg(feature = "p2p-i2p")]
235            "i2p" => {
236                // Build a Socks5 Dialer for I2p
237                enforce_hostport!(endpoint);
238                let mut url = i2p_socks5_proxy.unwrap();
239                url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
240                let variant = socks5::Socks5Dialer::new(&url).await?;
241                let variant = DialerVariant::Socks5(variant);
242                Ok(Self { endpoint, variant })
243            }
244
245            #[cfg(feature = "p2p-i2p")]
246            "i2p+tls" => {
247                // Build a SOCKS5 dialer with TLS encapsulation for I2p
248                enforce_hostport!(endpoint);
249                let mut url = i2p_socks5_proxy.unwrap();
250                url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
251                url.set_scheme("socks5+tls").unwrap();
252                let variant = socks5::Socks5Dialer::new(&url).await?;
253                let variant = DialerVariant::Socks5Tls(variant);
254                Ok(Self { endpoint, variant })
255            }
256
257            #[cfg(feature = "p2p-quic")]
258            "quic" => {
259                // Build a QUIC dialer (TLS is built-in)
260                enforce_hostport!(endpoint);
261                let variant = quic::QuicDialer::new().await?;
262                let variant = DialerVariant::Quic(variant);
263                Ok(Self { endpoint, variant })
264            }
265
266            x => {
267                error!("[P2P] Requested unsupported transport: {x}");
268                Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
269            }
270        }
271    }
272
273    /// Dial an instantiated [`Dialer`]. This creates a connection and returns a stream.
274    /// The Tor-based Dialer variants can panic: this is intended. There exists validation
275    /// for hosts and ports in other parts of the codebase. A panic occurring here
276    /// likely indicates a configuration issue on the part of the user. It is preferable
277    /// in this case that the user is alerted to this problem via a panic.
278    pub async fn dial(&self, timeout: Option<Duration>) -> io::Result<Box<dyn PtStream>> {
279        match &self.variant {
280            DialerVariant::Tcp(dialer) => {
281                // NOTE: sockaddr here is an array, can contain both ipv4 and ipv6
282                let sockaddr = self.endpoint.socket_addrs(|| None)?;
283                let stream = dialer.do_dial(sockaddr[0], timeout).await?;
284                Ok(Box::new(stream))
285            }
286
287            DialerVariant::TcpTls(dialer) => {
288                let sockaddr = self.endpoint.socket_addrs(|| None)?;
289                let stream = dialer.do_dial(sockaddr[0], timeout).await?;
290                let tlsupgrade = tls::TlsUpgrade::new().await?;
291                let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
292                Ok(Box::new(stream))
293            }
294
295            #[cfg(feature = "p2p-tor")]
296            DialerVariant::Tor(dialer) => {
297                let host = self.endpoint.host_str().unwrap();
298                let port = self.endpoint.port().unwrap();
299                let stream = dialer.do_dial(host, port, timeout).await?;
300                Ok(Box::new(stream))
301            }
302
303            #[cfg(feature = "p2p-tor")]
304            DialerVariant::TorTls(dialer) => {
305                let host = self.endpoint.host_str().unwrap();
306                let port = self.endpoint.port().unwrap();
307                let stream = dialer.do_dial(host, port, timeout).await?;
308                let tlsupgrade = tls::TlsUpgrade::new().await?;
309                let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
310                Ok(Box::new(stream))
311            }
312
313            #[cfg(feature = "p2p-nym")]
314            DialerVariant::Nym(_dialer) => {
315                todo!();
316            }
317
318            #[cfg(feature = "p2p-nym")]
319            DialerVariant::NymTls(_dialer) => {
320                todo!();
321            }
322
323            #[cfg(feature = "p2p-unix")]
324            DialerVariant::Unix(dialer) => {
325                let path = match self.endpoint.to_file_path() {
326                    Ok(v) => v,
327                    Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
328                };
329                let stream = dialer.do_dial(path).await?;
330                Ok(Box::new(stream))
331            }
332
333            #[cfg(feature = "p2p-socks5")]
334            DialerVariant::Socks5(dialer) => {
335                let stream = dialer.do_dial().await?;
336                Ok(Box::new(stream))
337            }
338
339            #[cfg(feature = "p2p-socks5")]
340            DialerVariant::Socks5Tls(dialer) => {
341                let stream = dialer.do_dial().await?;
342                let tlsupgrade = tls::TlsUpgrade::new().await?;
343                let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
344                Ok(Box::new(stream))
345            }
346
347            #[cfg(feature = "p2p-quic")]
348            DialerVariant::Quic(dialer) => {
349                let sockaddr = self.endpoint.socket_addrs(|| None)?;
350                let stream = dialer.do_dial(sockaddr[0], timeout).await?;
351                Ok(Box::new(stream))
352            }
353        }
354    }
355
356    /// Return a reference to the `Dialer` endpoint
357    pub fn endpoint(&self) -> &Url {
358        &self.endpoint
359    }
360}
361
362/// A listener that is able to transparently listen over arbitrary transports.
363pub struct Listener {
364    /// The address to open the listener on
365    endpoint: Url,
366    /// The listener variant (transport protocol)
367    variant: ListenerVariant,
368}
369
370impl Listener {
371    /// Instantiate a new [`Listener`] with the given [`Url`] and datastore path.
372    /// Must contain a scheme, host string, and a port.
373    pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
374        match endpoint.scheme().to_lowercase().as_str() {
375            "tcp" => {
376                // Build a TCP listener
377                enforce_hostport!(endpoint);
378                let variant = tcp::TcpListener::new(1024).await?;
379                let variant = ListenerVariant::Tcp(variant);
380                Ok(Self { endpoint, variant })
381            }
382
383            "tcp+tls" => {
384                // Build a TCP listener wrapped with TLS
385                enforce_hostport!(endpoint);
386                let variant = tcp::TcpListener::new(1024).await?;
387                let variant = ListenerVariant::TcpTls(variant);
388                Ok(Self { endpoint, variant })
389            }
390
391            #[cfg(feature = "p2p-tor")]
392            "tor" => {
393                // Build a Tor Hidden Service listener
394                enforce_hostport!(endpoint);
395                let variant = tor::TorListener::new(datastore).await?;
396                let variant = ListenerVariant::Tor(variant);
397                Ok(Self { endpoint, variant })
398            }
399
400            #[cfg(feature = "p2p-unix")]
401            "unix" => {
402                enforce_abspath!(endpoint);
403                let variant = unix::UnixListener::new().await?;
404                let variant = ListenerVariant::Unix(variant);
405                Ok(Self { endpoint, variant })
406            }
407
408            #[cfg(feature = "p2p-quic")]
409            "quic" => {
410                enforce_hostport!(endpoint);
411                let variant = quic::QuicListener::new().await?;
412                let variant = ListenerVariant::Quic(variant);
413                Ok(Self { endpoint, variant })
414            }
415
416            x => {
417                error!("[P2P] Requested unsupported transport: {x}");
418                Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
419            }
420        }
421    }
422
423    /// Listen on an instantiated [`Listener`].
424    /// This will open a socket and return the listener.
425    pub async fn listen(&self) -> io::Result<Box<dyn PtListener>> {
426        match &self.variant {
427            ListenerVariant::Tcp(listener) => {
428                let sockaddr = self.endpoint.socket_addrs(|| None)?;
429                let l = listener.do_listen(sockaddr[0]).await?;
430                Ok(Box::new(l))
431            }
432
433            ListenerVariant::TcpTls(listener) => {
434                let sockaddr = self.endpoint.socket_addrs(|| None)?;
435                let l = listener.do_listen(sockaddr[0]).await?;
436                let tlsupgrade = tls::TlsUpgrade::new().await?;
437                let l = tlsupgrade.upgrade_listener_tcp_tls(l).await?;
438                Ok(Box::new(l))
439            }
440
441            #[cfg(feature = "p2p-tor")]
442            ListenerVariant::Tor(listener) => {
443                let port = self.endpoint.port().unwrap();
444                let l = listener.do_listen(port).await?;
445                Ok(Box::new(l))
446            }
447
448            #[cfg(feature = "p2p-unix")]
449            ListenerVariant::Unix(listener) => {
450                let path = match self.endpoint.to_file_path() {
451                    Ok(v) => v,
452                    Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
453                };
454                let l = listener.do_listen(&path).await?;
455                Ok(Box::new(l))
456            }
457
458            #[cfg(feature = "p2p-quic")]
459            ListenerVariant::Quic(listener) => {
460                let sockaddr = self.endpoint.socket_addrs(|| None)?;
461                let l = listener.do_listen(sockaddr[0]).await?;
462                Ok(Box::new(l))
463            }
464        }
465    }
466
467    /// Should only be called after `listen()` in order to behave correctly.
468    pub async fn endpoint(&self) -> Url {
469        match &self.variant {
470            ListenerVariant::Tcp(listener) | ListenerVariant::TcpTls(listener) => {
471                let mut endpoint = self.endpoint.clone();
472
473                // Endpoint *must* always have a port set.
474                // This is enforced by the enforce_hostport!() macro in Listener::new().
475                let port = self.endpoint.port().unwrap();
476
477                // `port == 0` means we got the OS to assign a random listen port to us.
478                // Get the port from the listener and modify the endpoint.
479                if port == 0 {
480                    // Was `.listen()` called yet? Otherwise do nothing
481                    if let Some(actual_port) = listener.port.get() {
482                        endpoint.set_port(Some(*actual_port)).unwrap();
483                    }
484                }
485
486                endpoint
487            }
488
489            #[cfg(feature = "p2p-tor")]
490            ListenerVariant::Tor(listener) => listener.endpoint.get().unwrap().clone(),
491
492            #[cfg(feature = "p2p-quic")]
493            ListenerVariant::Quic(listener) => {
494                let mut endpoint = self.endpoint.clone();
495                let port = self.endpoint.port().unwrap();
496
497                if port == 0 {
498                    if let Some(actual_port) = listener.port.get() {
499                        endpoint.set_port(Some(*actual_port)).unwrap();
500                    }
501                }
502
503                endpoint
504            }
505
506            #[allow(unreachable_patterns)]
507            _ => self.endpoint.clone(),
508        }
509    }
510}
511
512/// Wrapper trait for async streams
513pub trait PtStream: AsyncRead + AsyncWrite + Unpin + Send {}
514
515impl PtStream for smol::net::TcpStream {}
516
517impl PtStream for futures_rustls::TlsStream<smol::net::TcpStream> {}
518
519#[cfg(feature = "p2p-tor")]
520impl PtStream for arti_client::DataStream {}
521
522#[cfg(feature = "p2p-tor")]
523impl PtStream for futures_rustls::TlsStream<arti_client::DataStream> {}
524
525#[cfg(feature = "p2p-unix")]
526impl PtStream for smol::net::unix::UnixStream {}
527
528#[cfg(feature = "p2p-quic")]
529impl PtStream for quic::QuicStream {}
530
531/// Wrapper trait for async listeners
532#[async_trait]
533pub trait PtListener: Send + Unpin {
534    async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)>;
535}