1use std::{
20 fmt::{self, Debug, Formatter},
21 fs::remove_dir_all,
22 io::{self, ErrorKind},
23 pin::Pin,
24 sync::Arc,
25 time::Duration,
26};
27
28use arti_client::{
29 config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto, TorClientConfigBuilder},
30 DataStream, StreamPrefs, TorClient,
31};
32use async_trait::async_trait;
33use futures::{
34 future::{select, Either},
35 pin_mut,
36 stream::StreamExt,
37 Stream,
38};
39use smol::{
40 lock::{Mutex as AsyncMutex, OnceCell},
41 Timer,
42};
43use tor_cell::relaycell::msg::Connected;
44use tor_error::ErrorReport;
45use tor_hsservice::{HsNickname, RendRequest, RunningOnionService};
46use tor_proto::client::stream::IncomingStreamRequest;
47use tor_rtcompat::PreferredRuntime;
48use tracing::{debug, error, warn};
49use url::Url;
50
51use super::{PtListener, PtStream};
52use crate::util::{encoding::base32, logger::verbose, path::expand_path};
53
54static TOR_CLIENT: OnceCell<TorClient<PreferredRuntime>> = OnceCell::new();
56
57#[derive(Clone)]
59pub struct TorDialer {
60 client: TorClient<PreferredRuntime>,
61}
62
63impl Debug for TorDialer {
64 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
65 writeln!(f, "TorDialer {{ TorClient }}")
66 }
67}
68
69impl TorDialer {
70 pub(crate) async fn new(datastore: Option<String>) -> io::Result<Self> {
72 let client = match TOR_CLIENT
75 .get_or_try_init(|| async {
76 debug!(target: "net::tor::TorDialer", "Bootstrapping...");
77 if let Some(datadir) = &datastore {
78 let datadir = expand_path(datadir).unwrap();
79 let arti_data = datadir.join("arti-data");
80 let arti_cache = datadir.join("arti-cache");
81
82 if arti_data.exists() {
85 remove_dir_all(&arti_data).unwrap();
86 }
87 if arti_cache.exists() {
88 remove_dir_all(&arti_cache).unwrap();
89 }
90
91 let config = TorClientConfigBuilder::from_directories(arti_data, arti_cache)
92 .build()
93 .unwrap();
94
95 TorClient::create_bootstrapped(config).await
96 } else {
97 TorClient::builder().create_bootstrapped().await
98 }
99 })
100 .await
101 {
102 Ok(client) => client.isolated_client(),
103 Err(e) => {
104 warn!(target: "net::tor::TorDialer", "{}", e.report());
105 return Err(io::Error::other("Internal Tor error, see logged warning"));
106 }
107 };
108
109 Ok(Self { client })
110 }
111
112 pub(crate) async fn do_dial(
114 &self,
115 host: &str,
116 port: u16,
117 conn_timeout: Option<Duration>,
118 ) -> io::Result<DataStream> {
119 debug!(target: "net::tor::do_dial", "Dialing {host}:{port} with Tor...");
120
121 let mut stream_prefs = StreamPrefs::new();
122 stream_prefs.connect_to_onion_services(BoolOrAuto::Explicit(true));
123
124 let connect = self.client.connect_with_prefs((host, port), &stream_prefs);
127
128 match conn_timeout {
129 Some(t) => {
130 let timeout = Timer::after(t);
131 pin_mut!(timeout);
132 pin_mut!(connect);
133
134 match select(connect, timeout).await {
135 Either::Left((Ok(stream), _)) => Ok(stream),
136
137 Either::Left((Err(e), _)) => {
138 warn!(target: "net::tor::do_dial", "{}", e.report());
139 Err(io::Error::other("Internal Tor error, see logged warning"))
140 }
141
142 Either::Right((_, _)) => Err(io::ErrorKind::TimedOut.into()),
143 }
144 }
145
146 None => {
147 match connect.await {
148 Ok(stream) => Ok(stream),
149 Err(e) => {
150 warn!(target: "net::tor::do_dial", "{}", e.report());
155 Err(io::Error::other("Internal Tor error, see logged warning"))
156 }
157 }
158 }
159 }
160 }
161}
162
163#[derive(Clone, Debug)]
165pub struct TorListener {
166 datastore: Option<String>,
167 pub endpoint: Arc<OnceCell<Url>>,
168}
169
170impl TorListener {
171 pub async fn new(datastore: Option<String>) -> io::Result<Self> {
173 Ok(Self { datastore, endpoint: Arc::new(OnceCell::new()) })
174 }
175
176 pub(crate) async fn do_listen(&self, port: u16) -> io::Result<TorListenerIntern> {
178 let client = match TOR_CLIENT
181 .get_or_try_init(|| async {
182 debug!(target: "net::tor::do_listen", "Bootstrapping...");
183 if let Some(datadir) = &self.datastore {
184 let datadir = expand_path(datadir).unwrap();
185 let arti_data = datadir.join("arti-data");
186 let arti_cache = datadir.join("arti-cache");
187
188 if arti_data.exists() {
191 remove_dir_all(&arti_data).unwrap();
192 }
193 if arti_cache.exists() {
194 remove_dir_all(&arti_cache).unwrap();
195 }
196
197 let config = TorClientConfigBuilder::from_directories(arti_data, arti_cache)
198 .build()
199 .unwrap();
200
201 TorClient::create_bootstrapped(config).await
202 } else {
203 TorClient::builder().create_bootstrapped().await
204 }
205 })
206 .await
207 {
208 Ok(client) => client.isolated_client(),
209 Err(e) => {
210 warn!(target: "net::tor::do_listen", "{}", e.report());
211 return Err(io::Error::other("Internal Tor error, see logged warning"));
212 }
213 };
214
215 let hs_nick = HsNickname::new("darkfi_tor".to_string()).unwrap();
216
217 let hs_config = match OnionServiceConfigBuilder::default().nickname(hs_nick).build() {
218 Ok(v) => v,
219 Err(e) => {
220 error!(
221 target: "net::tor::do_listen",
222 "[P2P] Failed to create OnionServiceConfig: {e}"
223 );
224 return Err(io::Error::other("Internal Tor error"));
225 }
226 };
227
228 let (onion_service, rendreq_stream) = match client.launch_onion_service(hs_config) {
229 Ok(Some(v)) => v,
230 Ok(None) => {
231 error!(
232 target: "net::tor::do_listen",
233 "[P2P] Onion service disabled in config",
234 );
235 return Err(io::Error::other("Internal Tor error"));
236 }
237 Err(e) => {
238 error!(
239 target: "net::tor::do_listen",
240 "[P2P] Failed to launch Onion Service: {e}"
241 );
242 return Err(io::Error::other("Internal Tor error"));
243 }
244 };
245
246 let onion_id =
247 base32::encode(false, onion_service.onion_address().unwrap().as_ref()).to_lowercase();
248
249 verbose!(
250 target: "net::tor::do_listen",
251 "[P2P] Established Tor listener on tor://{}:{port}", onion_id,
252 );
253
254 let endpoint = Url::parse(&format!("tor://{onion_id}:{port}")).unwrap();
255 self.endpoint.set(endpoint).await.expect("fatal endpoint already set for TorListener");
256
257 Ok(TorListenerIntern {
258 port,
259 _onion_service: onion_service,
260 rendreq_stream: AsyncMutex::new(Box::pin(rendreq_stream)),
261 })
262 }
263}
264
265pub struct TorListenerIntern {
267 port: u16,
268 _onion_service: Arc<RunningOnionService>,
269 rendreq_stream: AsyncMutex<Pin<Box<dyn Stream<Item = RendRequest> + Send>>>,
271}
272
273unsafe impl Sync for TorListenerIntern {}
274
275#[async_trait]
276impl PtListener for TorListenerIntern {
277 async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)> {
278 let mut rendreq_stream = self.rendreq_stream.lock().await;
279
280 let Some(rendrequest) = rendreq_stream.next().await else {
281 return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
282 };
283
284 drop(rendreq_stream);
285
286 let mut streamreq_stream = match rendrequest.accept().await {
287 Ok(v) => v,
288 Err(e) => {
289 error!(
290 target: "net::tor::PtListener::next",
291 "[P2P] Failed accepting Tor RendRequest: {e}"
292 );
293 return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
294 }
295 };
296
297 let Some(streamrequest) = streamreq_stream.next().await else {
298 return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
299 };
300
301 match streamrequest.request() {
303 IncomingStreamRequest::Begin(begin) => {
304 if begin.port() != self.port {
305 return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted"));
306 }
307 }
308 &_ => return Err(io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted")),
309 }
310
311 let stream = match streamrequest.accept(Connected::new_empty()).await {
312 Ok(v) => v,
313 Err(e) => {
314 error!(
315 target: "net::tor::PtListener::next",
316 "[P2P] Failed accepting Tor StreamRequest: {e}"
317 );
318 return Err(io::Error::other("Internal Tor error"));
319 }
320 };
321
322 Ok((Box::new(stream), Url::parse(&format!("tor://127.0.0.1:{}", self.port)).unwrap()))
323 }
324}