darkfi/net/transport/
mod.rs1use std::{io, time::Duration};
20
21use async_trait::async_trait;
22use smol::io::{AsyncRead, AsyncWrite};
23use tracing::error;
24use url::Url;
25
26#[cfg(feature = "p2p-unix")]
27use std::io::ErrorKind;
28
29pub(crate) mod tls;
31
32#[cfg(feature = "p2p-socks5")]
34pub mod socks5;
35
36pub(crate) mod tcp;
38
39#[cfg(feature = "p2p-tor")]
40pub(crate) mod tor;
42
43#[cfg(feature = "p2p-nym")]
44pub(crate) mod nym;
46
47#[cfg(feature = "p2p-unix")]
49pub(crate) mod unix;
50
51#[cfg(feature = "p2p-quic")]
53pub(crate) mod quic;
54
55#[derive(Debug, Clone)]
57pub enum DialerVariant {
58 Tcp(tcp::TcpDialer),
60
61 TcpTls(tcp::TcpDialer),
63
64 #[cfg(feature = "p2p-tor")]
65 Tor(tor::TorDialer),
67
68 #[cfg(feature = "p2p-tor")]
69 TorTls(tor::TorDialer),
71
72 #[cfg(feature = "p2p-nym")]
73 Nym(nym::NymDialer),
75
76 #[cfg(feature = "p2p-nym")]
77 NymTls(nym::NymDialer),
79
80 #[cfg(feature = "p2p-unix")]
82 Unix(unix::UnixDialer),
83
84 #[cfg(feature = "p2p-socks5")]
86 Socks5(socks5::Socks5Dialer),
87
88 #[cfg(feature = "p2p-socks5")]
90 Socks5Tls(socks5::Socks5Dialer),
91
92 Quic(quic::QuicDialer),
94}
95
96#[derive(Debug, Clone)]
98pub enum ListenerVariant {
99 Tcp(tcp::TcpListener),
101
102 TcpTls(tcp::TcpListener),
104
105 #[cfg(feature = "p2p-tor")]
106 Tor(tor::TorListener),
108
109 #[cfg(feature = "p2p-unix")]
110 Unix(unix::UnixListener),
112
113 #[cfg(feature = "p2p-quic")]
114 Quic(quic::QuicListener),
116}
117
118pub struct Dialer {
120 endpoint: Url,
122 variant: DialerVariant,
124}
125
126macro_rules! enforce_hostport {
127 ($endpoint:ident) => {
128 if $endpoint.host_str().is_none() || $endpoint.port().is_none() {
129 return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
130 }
131 };
132}
133
134#[cfg(feature = "p2p-unix")]
135macro_rules! enforce_abspath {
136 ($endpoint:ident) => {
137 if $endpoint.host_str().is_some() || $endpoint.port().is_some() {
138 return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
139 }
140
141 if $endpoint.to_file_path().is_err() {
142 return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
143 }
144 };
145}
146
147impl Dialer {
148 pub async fn new(
150 endpoint: Url,
151 datastore: Option<String>,
152 i2p_socks5_proxy: Option<Url>,
153 ) -> io::Result<Self> {
154 match endpoint.scheme().to_lowercase().as_str() {
155 "tcp" => {
156 enforce_hostport!(endpoint);
158 let variant = tcp::TcpDialer::new(None).await?;
159 let variant = DialerVariant::Tcp(variant);
160 Ok(Self { endpoint, variant })
161 }
162
163 "tcp+tls" => {
164 enforce_hostport!(endpoint);
166 let variant = tcp::TcpDialer::new(None).await?;
167 let variant = DialerVariant::TcpTls(variant);
168 Ok(Self { endpoint, variant })
169 }
170
171 #[cfg(feature = "p2p-tor")]
172 "tor" => {
173 enforce_hostport!(endpoint);
175 let variant = tor::TorDialer::new(datastore).await?;
176 let variant = DialerVariant::Tor(variant);
177 Ok(Self { endpoint, variant })
178 }
179
180 #[cfg(feature = "p2p-tor")]
181 "tor+tls" => {
182 enforce_hostport!(endpoint);
184 let variant = tor::TorDialer::new(datastore).await?;
185 let variant = DialerVariant::TorTls(variant);
186 Ok(Self { endpoint, variant })
187 }
188
189 #[cfg(feature = "p2p-nym")]
190 "nym" => {
191 enforce_hostport!(endpoint);
193 let variant = nym::NymDialer::new().await?;
194 let variant = DialerVariant::Nym(variant);
195 Ok(Self { endpoint, variant })
196 }
197
198 #[cfg(feature = "p2p-nym")]
199 "nym+tls" => {
200 enforce_hostport!(endpoint);
202 let variant = nym::NymDialer::new().await?;
203 let variant = DialerVariant::NymTls(variant);
204 Ok(Self { endpoint, variant })
205 }
206
207 #[cfg(feature = "p2p-unix")]
208 "unix" => {
209 enforce_abspath!(endpoint);
211 let variant = unix::UnixDialer::new().await?;
212 let variant = DialerVariant::Unix(variant);
213 Ok(Self { endpoint, variant })
214 }
215
216 #[cfg(feature = "p2p-socks5")]
217 "socks5" => {
218 enforce_hostport!(endpoint);
220 let variant = socks5::Socks5Dialer::new(&endpoint).await?;
221 let variant = DialerVariant::Socks5(variant);
222 Ok(Self { endpoint, variant })
223 }
224
225 #[cfg(feature = "p2p-socks5")]
226 "socks5+tls" => {
227 enforce_hostport!(endpoint);
229 let variant = socks5::Socks5Dialer::new(&endpoint).await?;
230 let variant = DialerVariant::Socks5Tls(variant);
231 Ok(Self { endpoint, variant })
232 }
233
234 #[cfg(feature = "p2p-i2p")]
235 "i2p" => {
236 enforce_hostport!(endpoint);
238 let mut url = i2p_socks5_proxy.unwrap();
239 url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
240 let variant = socks5::Socks5Dialer::new(&url).await?;
241 let variant = DialerVariant::Socks5(variant);
242 Ok(Self { endpoint, variant })
243 }
244
245 #[cfg(feature = "p2p-i2p")]
246 "i2p+tls" => {
247 enforce_hostport!(endpoint);
249 let mut url = i2p_socks5_proxy.unwrap();
250 url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
251 url.set_scheme("socks5+tls").unwrap();
252 let variant = socks5::Socks5Dialer::new(&url).await?;
253 let variant = DialerVariant::Socks5Tls(variant);
254 Ok(Self { endpoint, variant })
255 }
256
257 #[cfg(feature = "p2p-quic")]
258 "quic" => {
259 enforce_hostport!(endpoint);
261 let variant = quic::QuicDialer::new().await?;
262 let variant = DialerVariant::Quic(variant);
263 Ok(Self { endpoint, variant })
264 }
265
266 x => {
267 error!("[P2P] Requested unsupported transport: {x}");
268 Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
269 }
270 }
271 }
272
273 pub async fn dial(&self, timeout: Option<Duration>) -> io::Result<Box<dyn PtStream>> {
279 match &self.variant {
280 DialerVariant::Tcp(dialer) => {
281 let sockaddr = self.endpoint.socket_addrs(|| None)?;
283 let stream = dialer.do_dial(sockaddr[0], timeout).await?;
284 Ok(Box::new(stream))
285 }
286
287 DialerVariant::TcpTls(dialer) => {
288 let sockaddr = self.endpoint.socket_addrs(|| None)?;
289 let stream = dialer.do_dial(sockaddr[0], timeout).await?;
290 let tlsupgrade = tls::TlsUpgrade::new().await?;
291 let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
292 Ok(Box::new(stream))
293 }
294
295 #[cfg(feature = "p2p-tor")]
296 DialerVariant::Tor(dialer) => {
297 let host = self.endpoint.host_str().unwrap();
298 let port = self.endpoint.port().unwrap();
299 let stream = dialer.do_dial(host, port, timeout).await?;
300 Ok(Box::new(stream))
301 }
302
303 #[cfg(feature = "p2p-tor")]
304 DialerVariant::TorTls(dialer) => {
305 let host = self.endpoint.host_str().unwrap();
306 let port = self.endpoint.port().unwrap();
307 let stream = dialer.do_dial(host, port, timeout).await?;
308 let tlsupgrade = tls::TlsUpgrade::new().await?;
309 let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
310 Ok(Box::new(stream))
311 }
312
313 #[cfg(feature = "p2p-nym")]
314 DialerVariant::Nym(_dialer) => {
315 todo!();
316 }
317
318 #[cfg(feature = "p2p-nym")]
319 DialerVariant::NymTls(_dialer) => {
320 todo!();
321 }
322
323 #[cfg(feature = "p2p-unix")]
324 DialerVariant::Unix(dialer) => {
325 let path = match self.endpoint.to_file_path() {
326 Ok(v) => v,
327 Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
328 };
329 let stream = dialer.do_dial(path).await?;
330 Ok(Box::new(stream))
331 }
332
333 #[cfg(feature = "p2p-socks5")]
334 DialerVariant::Socks5(dialer) => {
335 let stream = dialer.do_dial().await?;
336 Ok(Box::new(stream))
337 }
338
339 #[cfg(feature = "p2p-socks5")]
340 DialerVariant::Socks5Tls(dialer) => {
341 let stream = dialer.do_dial().await?;
342 let tlsupgrade = tls::TlsUpgrade::new().await?;
343 let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
344 Ok(Box::new(stream))
345 }
346
347 #[cfg(feature = "p2p-quic")]
348 DialerVariant::Quic(dialer) => {
349 let sockaddr = self.endpoint.socket_addrs(|| None)?;
350 let stream = dialer.do_dial(sockaddr[0], timeout).await?;
351 Ok(Box::new(stream))
352 }
353 }
354 }
355
356 pub fn endpoint(&self) -> &Url {
358 &self.endpoint
359 }
360}
361
362pub struct Listener {
364 endpoint: Url,
366 variant: ListenerVariant,
368}
369
370impl Listener {
371 pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
374 match endpoint.scheme().to_lowercase().as_str() {
375 "tcp" => {
376 enforce_hostport!(endpoint);
378 let variant = tcp::TcpListener::new(1024).await?;
379 let variant = ListenerVariant::Tcp(variant);
380 Ok(Self { endpoint, variant })
381 }
382
383 "tcp+tls" => {
384 enforce_hostport!(endpoint);
386 let variant = tcp::TcpListener::new(1024).await?;
387 let variant = ListenerVariant::TcpTls(variant);
388 Ok(Self { endpoint, variant })
389 }
390
391 #[cfg(feature = "p2p-tor")]
392 "tor" => {
393 enforce_hostport!(endpoint);
395 let variant = tor::TorListener::new(datastore).await?;
396 let variant = ListenerVariant::Tor(variant);
397 Ok(Self { endpoint, variant })
398 }
399
400 #[cfg(feature = "p2p-unix")]
401 "unix" => {
402 enforce_abspath!(endpoint);
403 let variant = unix::UnixListener::new().await?;
404 let variant = ListenerVariant::Unix(variant);
405 Ok(Self { endpoint, variant })
406 }
407
408 #[cfg(feature = "p2p-quic")]
409 "quic" => {
410 enforce_hostport!(endpoint);
411 let variant = quic::QuicListener::new().await?;
412 let variant = ListenerVariant::Quic(variant);
413 Ok(Self { endpoint, variant })
414 }
415
416 x => {
417 error!("[P2P] Requested unsupported transport: {x}");
418 Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
419 }
420 }
421 }
422
423 pub async fn listen(&self) -> io::Result<Box<dyn PtListener>> {
426 match &self.variant {
427 ListenerVariant::Tcp(listener) => {
428 let sockaddr = self.endpoint.socket_addrs(|| None)?;
429 let l = listener.do_listen(sockaddr[0]).await?;
430 Ok(Box::new(l))
431 }
432
433 ListenerVariant::TcpTls(listener) => {
434 let sockaddr = self.endpoint.socket_addrs(|| None)?;
435 let l = listener.do_listen(sockaddr[0]).await?;
436 let tlsupgrade = tls::TlsUpgrade::new().await?;
437 let l = tlsupgrade.upgrade_listener_tcp_tls(l).await?;
438 Ok(Box::new(l))
439 }
440
441 #[cfg(feature = "p2p-tor")]
442 ListenerVariant::Tor(listener) => {
443 let port = self.endpoint.port().unwrap();
444 let l = listener.do_listen(port).await?;
445 Ok(Box::new(l))
446 }
447
448 #[cfg(feature = "p2p-unix")]
449 ListenerVariant::Unix(listener) => {
450 let path = match self.endpoint.to_file_path() {
451 Ok(v) => v,
452 Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
453 };
454 let l = listener.do_listen(&path).await?;
455 Ok(Box::new(l))
456 }
457
458 #[cfg(feature = "p2p-quic")]
459 ListenerVariant::Quic(listener) => {
460 let sockaddr = self.endpoint.socket_addrs(|| None)?;
461 let l = listener.do_listen(sockaddr[0]).await?;
462 Ok(Box::new(l))
463 }
464 }
465 }
466
467 pub async fn endpoint(&self) -> Url {
469 match &self.variant {
470 ListenerVariant::Tcp(listener) | ListenerVariant::TcpTls(listener) => {
471 let mut endpoint = self.endpoint.clone();
472
473 let port = self.endpoint.port().unwrap();
476
477 if port == 0 {
480 if let Some(actual_port) = listener.port.get() {
482 endpoint.set_port(Some(*actual_port)).unwrap();
483 }
484 }
485
486 endpoint
487 }
488
489 #[cfg(feature = "p2p-tor")]
490 ListenerVariant::Tor(listener) => listener.endpoint.get().unwrap().clone(),
491
492 #[cfg(feature = "p2p-quic")]
493 ListenerVariant::Quic(listener) => {
494 let mut endpoint = self.endpoint.clone();
495 let port = self.endpoint.port().unwrap();
496
497 if port == 0 {
498 if let Some(actual_port) = listener.port.get() {
499 endpoint.set_port(Some(*actual_port)).unwrap();
500 }
501 }
502
503 endpoint
504 }
505
506 #[allow(unreachable_patterns)]
507 _ => self.endpoint.clone(),
508 }
509 }
510}
511
512pub trait PtStream: AsyncRead + AsyncWrite + Unpin + Send {}
514
515impl PtStream for smol::net::TcpStream {}
516
517impl PtStream for futures_rustls::TlsStream<smol::net::TcpStream> {}
518
519#[cfg(feature = "p2p-tor")]
520impl PtStream for arti_client::DataStream {}
521
522#[cfg(feature = "p2p-tor")]
523impl PtStream for futures_rustls::TlsStream<arti_client::DataStream> {}
524
525#[cfg(feature = "p2p-unix")]
526impl PtStream for smol::net::unix::UnixStream {}
527
528#[cfg(feature = "p2p-quic")]
529impl PtStream for quic::QuicStream {}
530
531#[async_trait]
533pub trait PtListener: Send + Unpin {
534 async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)>;
535}