1use std::{
28 collections::{HashMap, HashSet},
29 net::IpAddr,
30 sync::{Arc, LazyLock},
31 time::{Duration, UNIX_EPOCH},
32};
33
34use async_trait::async_trait;
35use darkfi_serial::{SerialDecodable, SerialEncodable};
36use rand::{rngs::OsRng, Rng};
37use smol::{lock::Mutex as AsyncMutex, Executor};
38use tracing::{debug, info, warn};
39use url::Url;
40
41use crate::{
42 impl_p2p_message,
43 net::{
44 hosts::HostsPtr, metering::MeteringConfiguration, ChannelPtr, Message, MessageSubscription,
45 P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr,
46 },
47 system::{sleep, timeout::timeout},
48 util::time::NanoTimestamp,
49 Error, Result,
50};
51
/// Name of this protocol; returned by `ProtocolBase::name` and passed to the
/// jobs manager when the protocol is created.
const PROTO_NAME: &str = "ProtocolHolepunch";
/// The only URL scheme accepted for holepunch targets, relays and observed
/// addresses.
const ALLOWED_SCHEME: &str = "quic";

/// How far in the future (milliseconds) a `connect_at` timestamp may lie and
/// still be accepted. Also used as the timeout while an initiator waits for
/// the relay's `HolepunchConnect` message.
const CONNECT_VALIDITY_MS: u64 = 5000;

/// Extra tolerance (milliseconds) granted on top of `CONNECT_VALIDITY_MS`
/// when checking whether a `connect_at` timestamp is already too old.
const MAX_CLOCK_SKEW_MS: u64 = 2000;

/// Delay (milliseconds) the relay adds to its current time to compute the
/// `connect_at` timestamp sent to both peers.
const COORDINATION_DELAY_MS: u64 = 500;

/// Maximum number of pending relayed punches tracked per IP address before
/// further requests are rejected.
const MAX_PENDING_PER_PEER: usize = 5;

/// How long (seconds) a seen nonce is remembered for replay detection; also
/// the interval of the periodic nonce cleanup loop.
const NONCE_EXPIRY_SECS: u64 = 60;

/// Size limit passed to `impl_p2p_message!` for every holepunch message.
// NOTE(review): presumably the maximum serialized message size in bytes — confirm
// against the macro definition.
pub const HOLEPUNCH_MAX_BYTES: u64 = 1024;
/// Metering configuration shared by all holepunch message types.
// NOTE(review): the exact semantics of `threshold`, `sleep_step` and
// `expiry_time` are defined by `MeteringConfiguration` — confirm there.
pub const HOLEPUNCH_METERING: MeteringConfiguration = MeteringConfiguration {
    threshold: 10,
    sleep_step: 1000,
    expiry_time: NanoTimestamp::from_secs(10),
};
76
/// Request sent by an initiator to a relay asking it to coordinate a
/// holepunch towards `target_addr`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HolepunchRequest {
    /// Random identifier for this punch attempt; the relay rejects nonces it
    /// has already seen recently.
    pub nonce: u64,
    /// Address of the peer the initiator wants to reach.
    pub target_addr: Url,
    /// Addresses the initiator claims as its own; the relay verifies them
    /// against what it knows about the requesting channel.
    pub our_addrs: Vec<Url>,
}

// Registers the struct as the "hpreq" P2P message.
// NOTE(review): the meaning of the literal `1` argument is defined by
// `impl_p2p_message!` — confirm there.
impl_p2p_message!(HolepunchRequest, "hpreq", HOLEPUNCH_MAX_BYTES, 1, HOLEPUNCH_METERING);
86
/// Instruction sent by a relay to both sides of a punch telling them where
/// and when to connect.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HolepunchConnect {
    /// Nonce copied from the originating `HolepunchRequest`.
    pub nonce: u64,
    /// Address identifying the other peer (the requested target for the
    /// initiator, the requester's channel address for the target).
    pub peer_addr: Url,
    /// Address of the other peer as observed by the relay; this is the
    /// address the receiver actually connects to.
    pub observed_addr: Url,
    /// Time at which to start connecting, in milliseconds since the Unix epoch.
    pub connect_at: u64,
}

// Registers the struct as the "hpconn" P2P message.
// NOTE(review): the meaning of the literal `1` argument is defined by
// `impl_p2p_message!` — confirm there.
impl_p2p_message!(HolepunchConnect, "hpconn", HOLEPUNCH_MAX_BYTES, 1, HOLEPUNCH_METERING);
97
/// Outcome report for a punch attempt; the relay sends it with
/// `success: false` when it is not connected to the requested target.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HolepunchResult {
    /// Nonce of the punch attempt this result refers to.
    pub nonce: u64,
    /// Whether the attempt succeeded.
    pub success: bool,
    /// Optional human-readable failure reason.
    pub error: Option<String>,
}

// Registers the struct as the "hpres" P2P message.
// NOTE(review): the meaning of the literal `1` argument is defined by
// `impl_p2p_message!` — confirm there.
impl_p2p_message!(HolepunchResult, "hpres", HOLEPUNCH_MAX_BYTES, 1, HOLEPUNCH_METERING);
107
/// Nonces of punches this process is currently initiating.
///
/// `initiate_punch` inserts a nonce before sending the request and removes it
/// once the attempt returns; `handle_connect_instructions` skips any
/// `HolepunchConnect` whose nonce is in this set, leaving it to the initiator.
static INITIATOR_NONCES: LazyLock<AsyncMutex<HashSet<u64>>> =
    LazyLock::new(|| AsyncMutex::new(HashSet::new()));
111
/// A nonce already seen by the relay, kept for replay detection.
struct UsedNonce {
    /// The nonce value taken from a `HolepunchRequest`.
    nonce: u64,
    /// When the nonce was first seen, in seconds since the Unix epoch.
    timestamp: u64,
}
116
/// Per-channel protocol that relays holepunch requests between peers and
/// acts on connect instructions received from relays.
pub struct ProtocolHolepunch {
    /// Channel this protocol instance is attached to.
    channel: ChannelPtr,
    /// Incoming `HolepunchRequest` messages on this channel.
    request_sub: MessageSubscription<HolepunchRequest>,
    /// Incoming `HolepunchConnect` messages on this channel.
    connect_sub: MessageSubscription<HolepunchConnect>,
    /// Subscription to `HolepunchResult`; created in `init` but not read here.
    _result_sub: MessageSubscription<HolepunchResult>,
    /// Hosts registry, used to look up the channel of a punch target.
    hosts: HostsPtr,
    /// Handle to the P2P instance (executor and direct session access).
    p2p: P2pPtr,
    /// Jobs manager running this protocol's background loops.
    jobsman: ProtocolJobsManagerPtr,
    /// Nonces recently seen in relay requests, with the time they were seen.
    used_nonces: AsyncMutex<Vec<UsedNonce>>,
    /// Number of in-flight relayed punches per requester IP.
    pending_count: AsyncMutex<HashMap<IpAddr, usize>>,
}
130
131impl ProtocolHolepunch {
132 pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
133 let request_sub = channel
134 .subscribe_msg::<HolepunchRequest>()
135 .await
136 .expect("missing HolepunchRequest dispatcher");
137
138 let connect_sub = channel
139 .subscribe_msg::<HolepunchConnect>()
140 .await
141 .expect("missing HolepunchConnect dispatcher");
142
143 let result_sub = channel
144 .subscribe_msg::<HolepunchResult>()
145 .await
146 .expect("missing HolepunchResult dispatcher");
147
148 Arc::new(Self {
149 channel: channel.clone(),
150 request_sub,
151 connect_sub,
152 _result_sub: result_sub,
153 hosts: p2p.hosts(),
154 p2p,
155 jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
156 used_nonces: AsyncMutex::new(Vec::new()),
157 pending_count: AsyncMutex::new(HashMap::new()),
158 })
159 }
160
161 fn is_quic(addr: &Url) -> bool {
162 addr.scheme() == ALLOWED_SCHEME
163 }
164
165 fn get_ip(addr: &Url) -> Option<IpAddr> {
166 addr.host_str().and_then(|h| h.parse().ok())
167 }
168
169 fn validate_connect_time(connect_at: u64) -> bool {
170 let now = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
171 if connect_at > now + CONNECT_VALIDITY_MS {
173 return false
174 }
175 if now > connect_at + CONNECT_VALIDITY_MS + MAX_CLOCK_SKEW_MS {
177 return false
178 }
179
180 true
181 }
182
183 fn get_observed_addr(channel: &ChannelPtr) -> Option<Url> {
185 let version = channel.version.get()?;
186 let addr = version.resolve_recv_addr.clone().unwrap_or(version.connect_recv_addr.clone());
187
188 if Self::is_quic(&addr) {
189 return Some(addr)
190 }
191
192 let mut quic_addr = addr;
195 quic_addr.set_scheme(ALLOWED_SCHEME).ok()?;
196 Some(quic_addr)
197 }
198
199 async fn check_nonce(&self, nonce: u64) -> bool {
200 let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
201 let mut used = self.used_nonces.lock().await;
202
203 used.retain(|n| now - n.timestamp < NONCE_EXPIRY_SECS);
205
206 if used.iter().any(|n| n.nonce == nonce) {
208 return false
209 }
210
211 used.push(UsedNonce { nonce, timestamp: now });
212 true
213 }
214
215 async fn cleanup_nonces(&self) {
216 let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
217 let mut used = self.used_nonces.lock().await;
218 used.retain(|n| now - n.timestamp < NONCE_EXPIRY_SECS);
219 }
220
221 async fn check_rate_limit(&self, ip: IpAddr) -> bool {
222 let pending = self.pending_count.lock().await;
223 pending.get(&ip).copied().unwrap_or(0) < MAX_PENDING_PER_PEER
224 }
225
226 async fn inc_pending(&self, ip: IpAddr) {
227 let mut pending = self.pending_count.lock().await;
228 *pending.entry(ip).or_insert(0) += 1;
229 }
230
231 async fn dec_pending(&self, ip: IpAddr) {
232 let mut pending = self.pending_count.lock().await;
233 if let Some(count) = pending.get_mut(&ip) {
234 *count = count.saturating_sub(1);
235 if *count == 0 {
236 pending.remove(&ip);
237 }
238 }
239 }
240
241 fn verify_claimed_addrs(&self, claimed: &[Url]) -> bool {
242 if claimed.is_empty() {
243 return true
244 }
245
246 let observed_ip = Self::get_ip(self.channel.address());
247
248 for addr in claimed {
250 if Self::get_ip(addr) == observed_ip {
251 return true
252 }
253 }
254
255 if let Some(version) = self.channel.version.get() {
257 for ext in &version.ext_send_addr {
258 if claimed.contains(ext) {
259 return true
260 }
261 }
262 }
263
264 false
265 }
266
267 fn find_target_channel(&self, target: &Url) -> Option<ChannelPtr> {
268 for channel in self.hosts.channels() {
269 if !Self::is_quic(channel.address()) {
270 continue
271 }
272
273 if let Some(version) = channel.version.get() {
274 if version.ext_send_addr.contains(target) {
275 return Some(channel);
276 }
277 }
278
279 if channel.address() == target {
280 return Some(channel);
281 }
282 }
283
284 None
285 }
286
    /// Relay loop: receives `HolepunchRequest`s on this channel and, when a
    /// request passes all checks, sends a `HolepunchConnect` to both the
    /// requester and the target so they connect to each other at the same time.
    ///
    /// Runs until receiving from the subscription fails, in which case that
    /// error is returned.
    async fn handle_relay_requests(self: Arc<Self>) -> Result<()> {
        // On a non-QUIC channel requests are still received, but discarded.
        if !Self::is_quic(self.channel.address()) {
            loop {
                let _ = self.request_sub.receive().await?;
            }
        }

        loop {
            let req = self.request_sub.receive().await?;

            // Targets that are not QUIC are silently ignored.
            if !Self::is_quic(&req.target_addr) {
                continue
            }

            // Note: this records the nonce as used even if a later check
            // rejects the request.
            if !self.check_nonce(req.nonce).await {
                warn!(
                    target: "net::protocol_holepunch::handle_relay_requests",
                    "[QUIC-NAT-RELAY] Rejecting: nonce replay",
                );
                continue
            }

            if !self.verify_claimed_addrs(&req.our_addrs) {
                warn!(
                    target: "net::protocol_holepunch::handle_relay_requests",
                    "[QUIC-NAT-RELAY] Rejecting: addr verification failed",
                );
                continue
            }

            // Rate limiting is keyed by the requester's IP; channels whose
            // address has no IP are skipped.
            let Some(peer_ip) = Self::get_ip(self.channel.address()) else { continue };
            if !self.check_rate_limit(peer_ip).await {
                warn!(
                    target: "net::protocol_holepunch::handle_relay_requests",
                    "[QUIC-NAT-RELAY] Rejecting: ratelimit for {}", peer_ip,
                );
                continue
            }

            // Without a channel to the target, tell the requester; a failure
            // to send that notice is ignored.
            let Some(target_chan) = self.find_target_channel(&req.target_addr) else {
                let _ = self
                    .channel
                    .send(&HolepunchResult {
                        nonce: req.nonce,
                        success: false,
                        error: Some("not connected to target".into()),
                    })
                    .await;
                continue
            };

            let Some(requester_observed) = Self::get_observed_addr(&self.channel) else { continue };
            let Some(target_observed) = Self::get_observed_addr(&target_chan) else { continue };

            if !Self::is_quic(&requester_observed) || !Self::is_quic(&target_observed) {
                continue
            }

            // From here on the request counts against the requester's limit;
            // every exit path below must decrement again.
            self.inc_pending(peer_ip).await;

            // Both peers receive the same timestamp (ms since the Unix epoch).
            let connect_at =
                UNIX_EPOCH.elapsed().unwrap().as_millis() as u64 + COORDINATION_DELAY_MS;

            let to_requester = HolepunchConnect {
                nonce: req.nonce,
                peer_addr: req.target_addr.clone(),
                observed_addr: target_observed,
                connect_at,
            };

            if self.channel.send(&to_requester).await.is_err() {
                self.dec_pending(peer_ip).await;
                continue
            }

            let to_target = HolepunchConnect {
                nonce: req.nonce,
                peer_addr: self.channel.address().clone(),
                observed_addr: requester_observed,
                connect_at,
            };

            if target_chan.send(&to_target).await.is_err() {
                self.dec_pending(peer_ip).await;
                continue
            }

            info!(
                target: "net::protocol_holepunch::handle_relay_requests",
                "[QUIC-NAT-RELAY] Relayed punch {} <-> {}",
                self.channel.display_address(),
                target_chan.display_address(),
            );

            // Release the pending slot from a detached task after a delay.
            // The integer division truncates, so with the current constant the
            // argument evaluates to 1.
            // NOTE(review): presumably `sleep` takes seconds — confirm.
            let self_ = self.clone();
            self.p2p
                .executor()
                .spawn(async move {
                    sleep(COORDINATION_DELAY_MS / 1000 + 1).await;
                    self_.dec_pending(peer_ip).await;
                })
                .detach();
        }
    }
403
404 async fn handle_connect_instructions(self: Arc<Self>) -> Result<()> {
405 if !Self::is_quic(self.channel.address()) {
406 loop {
407 let _ = self.connect_sub.receive().await?;
408 }
409 }
410
411 loop {
412 let conn = self.connect_sub.receive().await?;
413
414 if !Self::is_quic(&conn.observed_addr) || !Self::validate_connect_time(conn.connect_at)
416 {
417 continue
418 }
419
420 if INITIATOR_NONCES.lock().await.contains(&conn.nonce) {
422 continue
423 }
424
425 let p2p = self.p2p.clone();
427 let observed = conn.observed_addr.clone();
428 let connect_at = conn.connect_at;
429
430 p2p.executor()
431 .spawn(async move {
432 let now = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
434 if connect_at > now {
435 smol::Timer::after(Duration::from_millis(connect_at - now)).await;
436 }
437
438 match p2p.session_direct().get_channel(&observed).await {
440 Ok(chan) => {
441 info!(
442 target: "net::protocol_holepunch::handle_connect_instructions",
443 "[QUIC-NAT-CONNECT] Punch succeeded: {}", chan.display_address(),
444 );
445 }
446 Err(e) => {
447 debug!(
448 target: "net::protocol_holepunch::handle_connect_instructions",
449 "[QUIC-NAT-CONNECT] Punch failed to {}: {}", observed, e,
450 );
451 }
452 }
453 })
454 .detach();
455 }
456 }
457
458 async fn nonce_cleanup_loop(self: Arc<Self>) -> Result<()> {
459 loop {
460 sleep(NONCE_EXPIRY_SECS).await;
461 self.cleanup_nonces().await;
462 }
463 }
464
465 pub async fn initiate_punch(
467 p2p: P2pPtr,
468 target: &Url,
469 relay: &ChannelPtr,
470 ) -> Result<ChannelPtr> {
471 if !Self::is_quic(target) {
473 return Err(Error::UnsupportedTransport(format!(
474 "Target must be QUIC: {}",
475 target.scheme(),
476 )));
477 }
478
479 if !Self::is_quic(relay.address()) {
480 return Err(Error::UnsupportedTransport(format!(
481 "Relay must be QUIC: {}",
482 relay.address().scheme()
483 )));
484 }
485
486 let nonce: u64 = OsRng.gen();
488 INITIATOR_NONCES.lock().await.insert(nonce);
489
490 let result = Self::do_initiate_punch(p2p, target, relay, nonce).await;
492 INITIATOR_NONCES.lock().await.remove(&nonce);
493 result
494 }
495
496 async fn do_initiate_punch(
497 p2p: P2pPtr,
498 target: &Url,
499 relay: &ChannelPtr,
500 nonce: u64,
501 ) -> Result<ChannelPtr> {
502 let our_addrs: Vec<Url> =
505 p2p.hosts().external_addrs().await.into_iter().filter(Self::is_quic).collect();
506
507 let connect_sub =
509 relay.subscribe_msg::<HolepunchConnect>().await.map_err(|_| Error::ChannelStopped)?;
510
511 relay.send(&HolepunchRequest { nonce, target_addr: target.clone(), our_addrs }).await?;
513
514 let conn = loop {
516 let msg = timeout(Duration::from_millis(CONNECT_VALIDITY_MS), connect_sub.receive())
517 .await
518 .map_err(|_| Error::ChannelTimeout)??;
519
520 if msg.nonce == nonce {
522 break msg;
523 }
524 };
525
526 if !Self::is_quic(&conn.observed_addr) {
528 return Err(Error::UnsupportedTransport("Response addr not QUIC".into()));
529 }
530 if !Self::validate_connect_time(conn.connect_at) {
531 return Err(Error::ChannelTimeout);
532 }
533
534 let now = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
536 if conn.connect_at > now {
537 smol::Timer::after(Duration::from_millis(conn.connect_at - now)).await;
538 }
539
540 p2p.session_direct().get_channel(&conn.observed_addr).await
542 }
543}
544
545#[async_trait]
546impl ProtocolBase for ProtocolHolepunch {
547 async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
548 debug!(target: "net::protocol_holepunch", "Starting on {}", self.channel.display_address());
549 self.jobsman.clone().start(ex.clone());
550 self.jobsman.clone().spawn(self.clone().handle_relay_requests(), ex.clone()).await;
551 self.jobsman.clone().spawn(self.clone().handle_connect_instructions(), ex.clone()).await;
552 self.jobsman.spawn(self.clone().nonce_cleanup_loop(), ex).await;
553 Ok(())
554 }
555
556 fn name(&self) -> &'static str {
557 PROTO_NAME
558 }
559}