darkfi/net/transport/
mod.rs1use std::{io, time::Duration};
20
21use async_trait::async_trait;
22use smol::io::{AsyncRead, AsyncWrite};
23use tracing::error;
24use url::Url;
25
26#[cfg(feature = "p2p-unix")]
27use std::io::ErrorKind;
28
29pub(crate) mod tls;
31
32#[cfg(feature = "p2p-socks5")]
34pub mod socks5;
35
36pub(crate) mod tcp;
38
39#[cfg(feature = "p2p-tor")]
40pub(crate) mod tor;
42
43#[cfg(feature = "p2p-nym")]
44pub(crate) mod nym;
46
47#[cfg(feature = "p2p-unix")]
49pub(crate) mod unix;
50
51#[derive(Debug, Clone)]
53pub enum DialerVariant {
54 Tcp(tcp::TcpDialer),
56
57 TcpTls(tcp::TcpDialer),
59
60 #[cfg(feature = "p2p-tor")]
61 Tor(tor::TorDialer),
63
64 #[cfg(feature = "p2p-tor")]
65 TorTls(tor::TorDialer),
67
68 #[cfg(feature = "p2p-nym")]
69 Nym(nym::NymDialer),
71
72 #[cfg(feature = "p2p-nym")]
73 NymTls(nym::NymDialer),
75
76 #[cfg(feature = "p2p-unix")]
78 Unix(unix::UnixDialer),
79
80 #[cfg(feature = "p2p-socks5")]
82 Socks5(socks5::Socks5Dialer),
83
84 #[cfg(feature = "p2p-socks5")]
86 Socks5Tls(socks5::Socks5Dialer),
87}
88
89#[derive(Debug, Clone)]
91pub enum ListenerVariant {
92 Tcp(tcp::TcpListener),
94
95 TcpTls(tcp::TcpListener),
97
98 #[cfg(feature = "p2p-tor")]
99 Tor(tor::TorListener),
101
102 #[cfg(feature = "p2p-unix")]
104 Unix(unix::UnixListener),
105}
106
107pub struct Dialer {
109 endpoint: Url,
111 variant: DialerVariant,
113}
114
115macro_rules! enforce_hostport {
116 ($endpoint:ident) => {
117 if $endpoint.host_str().is_none() || $endpoint.port().is_none() {
118 return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
119 }
120 };
121}
122
123#[cfg(feature = "p2p-unix")]
124macro_rules! enforce_abspath {
125 ($endpoint:ident) => {
126 if $endpoint.host_str().is_some() || $endpoint.port().is_some() {
127 return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
128 }
129
130 if $endpoint.to_file_path().is_err() {
131 return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
132 }
133 };
134}
135
136impl Dialer {
137 pub async fn new(
139 endpoint: Url,
140 datastore: Option<String>,
141 i2p_socks5_proxy: Option<Url>,
142 ) -> io::Result<Self> {
143 match endpoint.scheme().to_lowercase().as_str() {
144 "tcp" => {
145 enforce_hostport!(endpoint);
147 let variant = tcp::TcpDialer::new(None).await?;
148 let variant = DialerVariant::Tcp(variant);
149 Ok(Self { endpoint, variant })
150 }
151
152 "tcp+tls" => {
153 enforce_hostport!(endpoint);
155 let variant = tcp::TcpDialer::new(None).await?;
156 let variant = DialerVariant::TcpTls(variant);
157 Ok(Self { endpoint, variant })
158 }
159
160 #[cfg(feature = "p2p-tor")]
161 "tor" => {
162 enforce_hostport!(endpoint);
164 let variant = tor::TorDialer::new(datastore).await?;
165 let variant = DialerVariant::Tor(variant);
166 Ok(Self { endpoint, variant })
167 }
168
169 #[cfg(feature = "p2p-tor")]
170 "tor+tls" => {
171 enforce_hostport!(endpoint);
173 let variant = tor::TorDialer::new(datastore).await?;
174 let variant = DialerVariant::TorTls(variant);
175 Ok(Self { endpoint, variant })
176 }
177
178 #[cfg(feature = "p2p-nym")]
179 "nym" => {
180 enforce_hostport!(endpoint);
182 let variant = nym::NymDialer::new().await?;
183 let variant = DialerVariant::Nym(variant);
184 Ok(Self { endpoint, variant })
185 }
186
187 #[cfg(feature = "p2p-nym")]
188 "nym+tls" => {
189 enforce_hostport!(endpoint);
191 let variant = nym::NymDialer::new().await?;
192 let variant = DialerVariant::NymTls(variant);
193 Ok(Self { endpoint, variant })
194 }
195
196 #[cfg(feature = "p2p-unix")]
197 "unix" => {
198 enforce_abspath!(endpoint);
200 let variant = unix::UnixDialer::new().await?;
201 let variant = DialerVariant::Unix(variant);
202 Ok(Self { endpoint, variant })
203 }
204
205 #[cfg(feature = "p2p-socks5")]
206 "socks5" => {
207 enforce_hostport!(endpoint);
209 let variant = socks5::Socks5Dialer::new(&endpoint).await?;
210 let variant = DialerVariant::Socks5(variant);
211 Ok(Self { endpoint, variant })
212 }
213
214 #[cfg(feature = "p2p-socks5")]
215 "socks5+tls" => {
216 enforce_hostport!(endpoint);
218 let variant = socks5::Socks5Dialer::new(&endpoint).await?;
219 let variant = DialerVariant::Socks5Tls(variant);
220 Ok(Self { endpoint, variant })
221 }
222
223 #[cfg(feature = "p2p-i2p")]
224 "i2p" => {
225 enforce_hostport!(endpoint);
227 let mut url = i2p_socks5_proxy.unwrap();
228 url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
229 let variant = socks5::Socks5Dialer::new(&url).await?;
230 let variant = DialerVariant::Socks5(variant);
231 Ok(Self { endpoint, variant })
232 }
233
234 #[cfg(feature = "p2p-i2p")]
235 "i2p+tls" => {
236 enforce_hostport!(endpoint);
238 let mut url = i2p_socks5_proxy.unwrap();
239 url.set_path(&format!("{}:{}", endpoint.host().unwrap(), endpoint.port().unwrap()));
240 url.set_scheme("socks5+tls").unwrap();
241 let variant = socks5::Socks5Dialer::new(&url).await?;
242 let variant = DialerVariant::Socks5Tls(variant);
243 Ok(Self { endpoint, variant })
244 }
245
246 x => {
247 error!("[P2P] Requested unsupported transport: {x}");
248 Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
249 }
250 }
251 }
252
253 pub async fn dial(&self, timeout: Option<Duration>) -> io::Result<Box<dyn PtStream>> {
259 match &self.variant {
260 DialerVariant::Tcp(dialer) => {
261 let sockaddr = self.endpoint.socket_addrs(|| None)?;
263 let stream = dialer.do_dial(sockaddr[0], timeout).await?;
264 Ok(Box::new(stream))
265 }
266
267 DialerVariant::TcpTls(dialer) => {
268 let sockaddr = self.endpoint.socket_addrs(|| None)?;
269 let stream = dialer.do_dial(sockaddr[0], timeout).await?;
270 let tlsupgrade = tls::TlsUpgrade::new().await?;
271 let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
272 Ok(Box::new(stream))
273 }
274
275 #[cfg(feature = "p2p-tor")]
276 DialerVariant::Tor(dialer) => {
277 let host = self.endpoint.host_str().unwrap();
278 let port = self.endpoint.port().unwrap();
279 let stream = dialer.do_dial(host, port, timeout).await?;
280 Ok(Box::new(stream))
281 }
282
283 #[cfg(feature = "p2p-tor")]
284 DialerVariant::TorTls(dialer) => {
285 let host = self.endpoint.host_str().unwrap();
286 let port = self.endpoint.port().unwrap();
287 let stream = dialer.do_dial(host, port, timeout).await?;
288 let tlsupgrade = tls::TlsUpgrade::new().await?;
289 let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
290 Ok(Box::new(stream))
291 }
292
293 #[cfg(feature = "p2p-nym")]
294 DialerVariant::Nym(_dialer) => {
295 todo!();
296 }
297
298 #[cfg(feature = "p2p-nym")]
299 DialerVariant::NymTls(_dialer) => {
300 todo!();
301 }
302
303 #[cfg(feature = "p2p-unix")]
304 DialerVariant::Unix(dialer) => {
305 let path = match self.endpoint.to_file_path() {
306 Ok(v) => v,
307 Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
308 };
309 let stream = dialer.do_dial(path).await?;
310 Ok(Box::new(stream))
311 }
312
313 #[cfg(feature = "p2p-socks5")]
314 DialerVariant::Socks5(dialer) => {
315 let stream = dialer.do_dial().await?;
316 Ok(Box::new(stream))
317 }
318
319 #[cfg(feature = "p2p-socks5")]
320 DialerVariant::Socks5Tls(dialer) => {
321 let stream = dialer.do_dial().await?;
322 let tlsupgrade = tls::TlsUpgrade::new().await?;
323 let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
324 Ok(Box::new(stream))
325 }
326 }
327 }
328
329 pub fn endpoint(&self) -> &Url {
331 &self.endpoint
332 }
333}
334
335pub struct Listener {
337 endpoint: Url,
339 variant: ListenerVariant,
341}
342
343impl Listener {
344 pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
347 match endpoint.scheme().to_lowercase().as_str() {
348 "tcp" => {
349 enforce_hostport!(endpoint);
351 let variant = tcp::TcpListener::new(1024).await?;
352 let variant = ListenerVariant::Tcp(variant);
353 Ok(Self { endpoint, variant })
354 }
355
356 "tcp+tls" => {
357 enforce_hostport!(endpoint);
359 let variant = tcp::TcpListener::new(1024).await?;
360 let variant = ListenerVariant::TcpTls(variant);
361 Ok(Self { endpoint, variant })
362 }
363
364 #[cfg(feature = "p2p-tor")]
365 "tor" => {
366 enforce_hostport!(endpoint);
368 let variant = tor::TorListener::new(datastore).await?;
369 let variant = ListenerVariant::Tor(variant);
370 Ok(Self { endpoint, variant })
371 }
372
373 #[cfg(feature = "p2p-unix")]
374 "unix" => {
375 enforce_abspath!(endpoint);
376 let variant = unix::UnixListener::new().await?;
377 let variant = ListenerVariant::Unix(variant);
378 Ok(Self { endpoint, variant })
379 }
380
381 x => {
382 error!("[P2P] Requested unsupported transport: {x}");
383 Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
384 }
385 }
386 }
387
388 pub async fn listen(&self) -> io::Result<Box<dyn PtListener>> {
391 match &self.variant {
392 ListenerVariant::Tcp(listener) => {
393 let sockaddr = self.endpoint.socket_addrs(|| None)?;
394 let l = listener.do_listen(sockaddr[0]).await?;
395 Ok(Box::new(l))
396 }
397
398 ListenerVariant::TcpTls(listener) => {
399 let sockaddr = self.endpoint.socket_addrs(|| None)?;
400 let l = listener.do_listen(sockaddr[0]).await?;
401 let tlsupgrade = tls::TlsUpgrade::new().await?;
402 let l = tlsupgrade.upgrade_listener_tcp_tls(l).await?;
403 Ok(Box::new(l))
404 }
405
406 #[cfg(feature = "p2p-tor")]
407 ListenerVariant::Tor(listener) => {
408 let port = self.endpoint.port().unwrap();
409 let l = listener.do_listen(port).await?;
410 Ok(Box::new(l))
411 }
412
413 #[cfg(feature = "p2p-unix")]
414 ListenerVariant::Unix(listener) => {
415 let path = match self.endpoint.to_file_path() {
416 Ok(v) => v,
417 Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
418 };
419 let l = listener.do_listen(&path).await?;
420 Ok(Box::new(l))
421 }
422 }
423 }
424
425 pub async fn endpoint(&self) -> Url {
427 match &self.variant {
428 ListenerVariant::Tcp(listener) | ListenerVariant::TcpTls(listener) => {
429 let mut endpoint = self.endpoint.clone();
430
431 let port = self.endpoint.port().unwrap();
434
435 if port == 0 {
438 if let Some(actual_port) = listener.port.get() {
440 endpoint.set_port(Some(*actual_port)).unwrap();
441 }
442 }
443
444 endpoint
445 }
446 #[cfg(feature = "p2p-tor")]
447 ListenerVariant::Tor(listener) => listener.endpoint.get().unwrap().clone(),
448 #[allow(unreachable_patterns)]
449 _ => self.endpoint.clone(),
450 }
451 }
452}
453
454pub trait PtStream: AsyncRead + AsyncWrite + Unpin + Send {}
456
457impl PtStream for smol::net::TcpStream {}
458
459impl PtStream for futures_rustls::TlsStream<smol::net::TcpStream> {}
460
461#[cfg(feature = "p2p-tor")]
462impl PtStream for arti_client::DataStream {}
463
464#[cfg(feature = "p2p-tor")]
465impl PtStream for futures_rustls::TlsStream<arti_client::DataStream> {}
466
467#[cfg(feature = "p2p-unix")]
468impl PtStream for smol::net::unix::UnixStream {}
469
470#[async_trait]
472pub trait PtListener: Send + Unpin {
473 async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)>;
474}