darkfi/net/transport/
tor.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    fmt::{self, Debug, Formatter},
21    fs::remove_dir_all,
22    io::{self, ErrorKind},
23    pin::Pin,
24    sync::Arc,
25    time::Duration,
26};
27
28use arti_client::{
29    config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto, TorClientConfigBuilder},
30    DataStream, StreamPrefs, TorClient,
31};
32use async_trait::async_trait;
33use futures::{
34    future::{select, Either},
35    pin_mut,
36    stream::StreamExt,
37    Stream,
38};
39use smol::{
40    lock::{Mutex as AsyncMutex, OnceCell},
41    Timer,
42};
43use tor_cell::relaycell::msg::Connected;
44use tor_error::ErrorReport;
45use tor_hsservice::{HsNickname, RendRequest, RunningOnionService};
46use tor_proto::client::stream::IncomingStreamRequest;
47use tor_rtcompat::PreferredRuntime;
48use tracing::{debug, error, warn};
49use url::Url;
50
51use super::{PtListener, PtStream};
52use crate::util::{encoding::base32, logger::verbose, path::expand_path};
53
/// Process-wide [`TorClient`], lazily bootstrapped on first use and then reused.
///
/// Both `TorDialer::new` and `TorListener::do_listen` go through
/// `get_or_try_init` on this cell, so whichever of them runs first performs
/// the bootstrap. Each of them then works on an `isolated_client()` handle
/// derived from the shared instance rather than on the instance itself.
static TOR_CLIENT: OnceCell<TorClient<PreferredRuntime>> = OnceCell::new();
56
/// Tor Dialer implementation.
///
/// Wraps the [`TorClient`] handle that `do_dial` uses to open outbound
/// streams. `new` fills it with an `isolated_client()` derived from the
/// shared `TOR_CLIENT`.
#[derive(Clone)]
pub struct TorDialer {
    /// Client handle every dial made by this dialer goes through.
    client: TorClient<PreferredRuntime>,
}
62
63impl Debug for TorDialer {
64    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
65        writeln!(f, "TorDialer {{ TorClient }}")
66    }
67}
68
69impl TorDialer {
70    /// Instantiate a new [`TorDialer`] object
71    pub(crate) async fn new(datastore: Option<String>) -> io::Result<Self> {
72        // Initialize or fetch the static TOR_CLIENT that should be reused in
73        // the Tor dialer
74        let client = match TOR_CLIENT
75            .get_or_try_init(|| async {
76                debug!(target: "net::tor::TorDialer", "Bootstrapping...");
77                if let Some(datadir) = &datastore {
78                    let datadir = expand_path(datadir).unwrap();
79                    let arti_data = datadir.join("arti-data");
80                    let arti_cache = datadir.join("arti-cache");
81
82                    // Reset arti folders.
83                    // We unwrap here so we panic in case of errors.
84                    if arti_data.exists() {
85                        remove_dir_all(&arti_data).unwrap();
86                    }
87                    if arti_cache.exists() {
88                        remove_dir_all(&arti_cache).unwrap();
89                    }
90
91                    let config = TorClientConfigBuilder::from_directories(arti_data, arti_cache)
92                        .build()
93                        .unwrap();
94
95                    TorClient::create_bootstrapped(config).await
96                } else {
97                    TorClient::builder().create_bootstrapped().await
98                }
99            })
100            .await
101        {
102            Ok(client) => client.isolated_client(),
103            Err(e) => {
104                warn!(target: "net::tor::TorDialer", "{}", e.report());
105                return Err(io::Error::other("Internal Tor error, see logged warning"));
106            }
107        };
108
109        Ok(Self { client })
110    }
111
112    /// Internal dial function
113    pub(crate) async fn do_dial(
114        &self,
115        host: &str,
116        port: u16,
117        conn_timeout: Option<Duration>,
118    ) -> io::Result<DataStream> {
119        debug!(target: "net::tor::do_dial", "Dialing {host}:{port} with Tor...");
120
121        let mut stream_prefs = StreamPrefs::new();
122        stream_prefs.connect_to_onion_services(BoolOrAuto::Explicit(true));
123
124        // If a timeout is configured, run both the connect and timeout futures
125        // and return whatever finishes first. Otherwise, wait on the connect future.
126        let connect = self.client.connect_with_prefs((host, port), &stream_prefs);
127
128        match conn_timeout {
129            Some(t) => {
130                let timeout = Timer::after(t);
131                pin_mut!(timeout);
132                pin_mut!(connect);
133
134                match select(connect, timeout).await {
135                    Either::Left((Ok(stream), _)) => Ok(stream),
136
137                    Either::Left((Err(e), _)) => {
138                        warn!(target: "net::tor::do_dial", "{}", e.report());
139                        Err(io::Error::other("Internal Tor error, see logged warning"))
140                    }
141
142                    Either::Right((_, _)) => Err(io::ErrorKind::TimedOut.into()),
143                }
144            }
145
146            None => {
147                match connect.await {
148                    Ok(stream) => Ok(stream),
149                    Err(e) => {
150                        // Extract error reports (i.e. very detailed debugging)
151                        // from arti-client in order to help debug Tor connections.
152                        // https://docs.rs/arti-client/latest/arti_client/#reporting-arti-errors
153                        // https://gitlab.torproject.org/tpo/core/arti/-/issues/1086
154                        warn!(target: "net::tor::do_dial", "{}", e.report());
155                        Err(io::Error::other("Internal Tor error, see logged warning"))
156                    }
157                }
158            }
159        }
160    }
161}
162
/// Tor Listener implementation.
///
/// Holds what `do_listen` needs to launch an onion service and exposes the
/// resulting address through `endpoint`.
#[derive(Clone, Debug)]
pub struct TorListener {
    /// Optional base directory. If `do_listen` is the one bootstrapping the
    /// shared Tor client, Arti's `arti-data` and `arti-cache` directories are
    /// placed underneath it.
    datastore: Option<String>,
    /// Set by `do_listen` to the `tor://<onion>:<port>` URL of the launched
    /// onion service. Clones of the listener share the same cell.
    pub endpoint: Arc<OnceCell<Url>>,
}
169
170impl TorListener {
171    /// Instantiate a new [`TorListener`]
172    pub async fn new(datastore: Option<String>) -> io::Result<Self> {
173        Ok(Self { datastore, endpoint: Arc::new(OnceCell::new()) })
174    }
175
176    /// Internal listen function
177    pub(crate) async fn do_listen(&self, port: u16) -> io::Result<TorListenerIntern> {
178        // Initialize or fetch the static TOR_CLIENT that should be reused in
179        // the Tor dialer
180        let client = match TOR_CLIENT
181            .get_or_try_init(|| async {
182                debug!(target: "net::tor::do_listen", "Bootstrapping...");
183                if let Some(datadir) = &self.datastore {
184                    let datadir = expand_path(datadir).unwrap();
185                    let arti_data = datadir.join("arti-data");
186                    let arti_cache = datadir.join("arti-cache");
187
188                    // Reset arti folders.
189                    // We unwrap here so we panic in case of errors.
190                    if arti_data.exists() {
191                        remove_dir_all(&arti_data).unwrap();
192                    }
193                    if arti_cache.exists() {
194                        remove_dir_all(&arti_cache).unwrap();
195                    }
196
197                    let config = TorClientConfigBuilder::from_directories(arti_data, arti_cache)
198                        .build()
199                        .unwrap();
200
201                    TorClient::create_bootstrapped(config).await
202                } else {
203                    TorClient::builder().create_bootstrapped().await
204                }
205            })
206            .await
207        {
208            Ok(client) => client.isolated_client(),
209            Err(e) => {
210                warn!(target: "net::tor::do_listen", "{}", e.report());
211                return Err(io::Error::other("Internal Tor error, see logged warning"));
212            }
213        };
214
215        let hs_nick = HsNickname::new("darkfi_tor".to_string()).unwrap();
216
217        let hs_config = match OnionServiceConfigBuilder::default().nickname(hs_nick).build() {
218            Ok(v) => v,
219            Err(e) => {
220                error!(
221                    target: "net::tor::do_listen",
222                    "[P2P] Failed to create OnionServiceConfig: {e}"
223                );
224                return Err(io::Error::other("Internal Tor error"));
225            }
226        };
227
228        let (onion_service, rendreq_stream) = match client.launch_onion_service(hs_config) {
229            Ok(Some(v)) => v,
230            Ok(None) => {
231                error!(
232                    target: "net::tor::do_listen",
233                    "[P2P] Onion service disabled in config",
234                );
235                return Err(io::Error::other("Internal Tor error"));
236            }
237            Err(e) => {
238                error!(
239                    target: "net::tor::do_listen",
240                    "[P2P] Failed to launch Onion Service: {e}"
241                );
242                return Err(io::Error::other("Internal Tor error"));
243            }
244        };
245
246        let onion_id =
247            base32::encode(false, onion_service.onion_address().unwrap().as_ref()).to_lowercase();
248
249        verbose!(
250            target: "net::tor::do_listen",
251            "[P2P] Established Tor listener on tor://{}:{port}", onion_id,
252        );
253
254        let endpoint = Url::parse(&format!("tor://{onion_id}:{port}")).unwrap();
255        self.endpoint.set(endpoint).await.expect("fatal endpoint already set for TorListener");
256
257        Ok(TorListenerIntern {
258            port,
259            _onion_service: onion_service,
260            rendreq_stream: AsyncMutex::new(Box::pin(rendreq_stream)),
261        })
262    }
263}
264
/// Internal Tor Listener implementation, used with `PtListener`.
///
/// Built by `TorListener::do_listen` once the onion service has been launched.
pub struct TorListenerIntern {
    /// Port the onion service was announced on. `next` refuses stream
    /// requests that target any other port.
    port: u16,
    /// Handle to the running onion service. It is never read in this file.
    /// NOTE(review): presumably held so the service stays up for as long as
    /// the listener exists — confirm against the Arti documentation.
    _onion_service: Arc<RunningOnionService>,
    /// Stream of incoming rendezvous requests, kept behind an async mutex so
    /// that `next(&self)` can poll it through a shared reference.
    rendreq_stream: AsyncMutex<Pin<Box<dyn Stream<Item = RendRequest> + Send>>>,
}
272
// SAFETY (NOTE(review), to be confirmed): within this file the rendezvous
// stream is only ever reached through its `AsyncMutex`, and `port` and
// `_onion_service` are never mutated after construction. Whether
// `RunningOnionService` itself tolerates shared access from several threads
// cannot be established from this file — verify that, and drop this impl if
// the compiler can already derive `Sync` for the struct on its own.
unsafe impl Sync for TorListenerIntern {}
274
275#[async_trait]
276impl PtListener for TorListenerIntern {
277    async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)> {
278        let mut rendreq_stream = self.rendreq_stream.lock().await;
279
280        let Some(rendrequest) = rendreq_stream.next().await else {
281            return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
282        };
283
284        drop(rendreq_stream);
285
286        let mut streamreq_stream = match rendrequest.accept().await {
287            Ok(v) => v,
288            Err(e) => {
289                error!(
290                    target: "net::tor::PtListener::next",
291                    "[P2P] Failed accepting Tor RendRequest: {e}"
292                );
293                return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
294            }
295        };
296
297        let Some(streamrequest) = streamreq_stream.next().await else {
298            return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
299        };
300
301        // Validate port correctness
302        match streamrequest.request() {
303            IncomingStreamRequest::Begin(begin) => {
304                if begin.port() != self.port {
305                    return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
306                }
307            }
308            &_ => return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted")),
309        }
310
311        let stream = match streamrequest.accept(Connected::new_empty()).await {
312            Ok(v) => v,
313            Err(e) => {
314                error!(
315                    target: "net::tor::PtListener::next",
316                    "[P2P] Failed accepting Tor StreamRequest: {e}"
317                );
318                return Err(io::Error::other("Internal Tor error"));
319            }
320        };
321
322        Ok((Box::new(stream), Url::parse(&format!("tor://127.0.0.1:{}", self.port)).unwrap()))
323    }
324}