darkfi/net/protocol/
protocol_holepunch.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! NAT hole punching protocol (QUIC-only)
20//!
21//! 1. Peer A wants to connect to peer C (both behind NATs)
22//! 2. A finds mutual peer B connected to both
23//! 3. A sends `HolepunchRequest` to B
24//! 4. B sends `HolepunchConnect` to both A and C with synchronized timing
25//! 5. A and C simultaneously connect to each other's observed addresses
26
27use std::{
28    collections::{HashMap, HashSet},
29    net::IpAddr,
30    sync::{Arc, LazyLock},
31    time::{Duration, UNIX_EPOCH},
32};
33
34use async_trait::async_trait;
35use darkfi_serial::{SerialDecodable, SerialEncodable};
36use rand::{rngs::OsRng, Rng};
37use smol::{lock::Mutex as AsyncMutex, Executor};
38use tracing::{debug, info, warn};
39use url::Url;
40
41use crate::{
42    impl_p2p_message,
43    net::{
44        hosts::HostsPtr, metering::MeteringConfiguration, ChannelPtr, Message, MessageSubscription,
45        P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr,
46    },
47    system::{sleep, timeout::timeout},
48    util::time::NanoTimestamp,
49    Error, Result,
50};
51
/// Protocol name, handed to the jobs manager and returned by `ProtocolBase::name`.
const PROTO_NAME: &str = "ProtocolHolepunch";
/// The only URL scheme hole punching is attempted over.
const ALLOWED_SCHEME: &str = "quic";

/// Maximum time window, in milliseconds, for a connection instruction to be valid.
const CONNECT_VALIDITY_MS: u64 = 5_000;

/// Maximum clock skew tolerated between peers, in milliseconds.
const MAX_CLOCK_SKEW_MS: u64 = 2_000;

/// Delay, in milliseconds, between relaying a punch and the simultaneous
/// connection attempt both sides are told to make.
const COORDINATION_DELAY_MS: u64 = 500;

/// Maximum number of pending relayed holepunch requests per peer IP.
const MAX_PENDING_PER_PEER: usize = 5;

/// How long, in seconds, a seen nonce is remembered for replay protection.
const NONCE_EXPIRY_SECS: u64 = 60;
69
70pub const HOLEPUNCH_MAX_BYTES: u64 = 1024;
71pub const HOLEPUNCH_METERING: MeteringConfiguration = MeteringConfiguration {
72    threshold: 10,
73    sleep_step: 1000,
74    expiry_time: NanoTimestamp::from_secs(10),
75};
76
/// Request a peer to relay a holepunch.
///
/// Sent by the initiator to a relay peer that is connected to both the
/// initiator and the target (steps 2-3 of the module-level description).
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HolepunchRequest {
    /// Random per-attempt identifier. The relay rejects nonces it has seen
    /// recently and echoes this value in the resulting `HolepunchConnect`s.
    pub nonce: u64,
    /// Address of the peer the initiator wants to reach. The relay matches it
    /// against its channels' addresses and their advertised external addresses.
    pub target_addr: Url,
    /// The initiator's claimed QUIC external addresses. The relay checks them
    /// against what it observes before relaying; an empty list is accepted.
    pub our_addrs: Vec<Url>,
}

impl_p2p_message!(HolepunchRequest, "hpreq", HOLEPUNCH_MAX_BYTES, 1, HOLEPUNCH_METERING);
86
/// Instruction to attempt a holepunch connection.
///
/// Sent by the relay to both the requester and the target with the same
/// `nonce` and `connect_at`, so both sides dial each other at the same moment.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HolepunchConnect {
    /// Nonce of the `HolepunchRequest` that triggered this instruction.
    pub nonce: u64,
    /// Address of the other party as known to the relay. This is the request's
    /// `target_addr` for the requester, and the requester's channel address for
    /// the target.
    pub peer_addr: Url,
    /// The other party's address as observed by the relay, in QUIC form. This
    /// is the address the receiver dials.
    pub observed_addr: Url,
    /// Scheduled dial time, in milliseconds since the UNIX epoch.
    pub connect_at: u64,
}

impl_p2p_message!(HolepunchConnect, "hpconn", HOLEPUNCH_MAX_BYTES, 1, HOLEPUNCH_METERING);
97
/// Result of a holepunch attempt.
///
/// Within this module it is only sent by the relay, with `success: false`,
/// when the relay has no channel to the requested target.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HolepunchResult {
    /// Nonce of the `HolepunchRequest` this result answers.
    pub nonce: u64,
    /// Whether the attempt succeeded.
    pub success: bool,
    /// Human-readable failure reason, if any.
    pub error: Option<String>,
}

impl_p2p_message!(HolepunchResult, "hpres", HOLEPUNCH_MAX_BYTES, 1, HOLEPUNCH_METERING);
107
108/// Tracks nonces for active `initiate_punch()` calls.
109static INITIATOR_NONCES: LazyLock<AsyncMutex<HashSet<u64>>> =
110    LazyLock::new(|| AsyncMutex::new(HashSet::new()));
111
/// A nonce seen in an incoming `HolepunchRequest`, stamped with when it was seen.
struct UsedNonce {
    /// Seconds since the UNIX epoch at which the nonce was recorded.
    timestamp: u64,
    /// The nonce value itself.
    nonce: u64,
}
116
/// Per-channel instance of the QUIC NAT hole punching protocol.
///
/// It relays `HolepunchRequest`s that arrive on `channel`, and acts on
/// `HolepunchConnect` instructions that arrive on it.
pub struct ProtocolHolepunch {
    /// The channel this protocol instance runs on.
    channel: ChannelPtr,
    /// Incoming relay requests on `channel`.
    request_sub: MessageSubscription<HolepunchRequest>,
    /// Incoming connect instructions on `channel`.
    connect_sub: MessageSubscription<HolepunchConnect>,
    /// Subscription to `HolepunchResult`; held but never read in this module.
    _result_sub: MessageSubscription<HolepunchResult>,
    /// Hosts registry, used to find the channel to a relay target.
    hosts: HostsPtr,
    /// P2P instance, used for its executor and direct session.
    p2p: P2pPtr,
    /// Jobs manager that owns this protocol's background loops.
    jobsman: ProtocolJobsManagerPtr,
    /// Recently used nonces for replay protection.
    /// NOTE(review): scoped to this channel instance, so the replay window is per channel.
    used_nonces: AsyncMutex<Vec<UsedNonce>>,
    /// Pending requests per peer IP for ratelimiting.
    /// NOTE(review): this map lives in the per-channel instance and is only keyed by
    /// this channel's peer IP, so several channels from one IP are limited independently.
    pending_count: AsyncMutex<HashMap<IpAddr, usize>>,
}
130
131impl ProtocolHolepunch {
132    pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
133        let request_sub = channel
134            .subscribe_msg::<HolepunchRequest>()
135            .await
136            .expect("missing HolepunchRequest dispatcher");
137
138        let connect_sub = channel
139            .subscribe_msg::<HolepunchConnect>()
140            .await
141            .expect("missing HolepunchConnect dispatcher");
142
143        let result_sub = channel
144            .subscribe_msg::<HolepunchResult>()
145            .await
146            .expect("missing HolepunchResult dispatcher");
147
148        Arc::new(Self {
149            channel: channel.clone(),
150            request_sub,
151            connect_sub,
152            _result_sub: result_sub,
153            hosts: p2p.hosts(),
154            p2p,
155            jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
156            used_nonces: AsyncMutex::new(Vec::new()),
157            pending_count: AsyncMutex::new(HashMap::new()),
158        })
159    }
160
161    fn is_quic(addr: &Url) -> bool {
162        addr.scheme() == ALLOWED_SCHEME
163    }
164
165    fn get_ip(addr: &Url) -> Option<IpAddr> {
166        addr.host_str().and_then(|h| h.parse().ok())
167    }
168
169    fn validate_connect_time(connect_at: u64) -> bool {
170        let now = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
171        // Not too far in future
172        if connect_at > now + CONNECT_VALIDITY_MS {
173            return false
174        }
175        // Not already expired (with clock skew)
176        if now > connect_at + CONNECT_VALIDITY_MS + MAX_CLOCK_SKEW_MS {
177            return false
178        }
179
180        true
181    }
182
183    /// Get peer's observed addr from version message ensuring QUIC scheme
184    fn get_observed_addr(channel: &ChannelPtr) -> Option<Url> {
185        let version = channel.version.get()?;
186        let addr = version.resolve_recv_addr.clone().unwrap_or(version.connect_recv_addr.clone());
187
188        if Self::is_quic(&addr) {
189            return Some(addr)
190        }
191
192        // Try converting to QUIC scheme
193        // TODO: This needs to be added to Url crate
194        let mut quic_addr = addr;
195        quic_addr.set_scheme(ALLOWED_SCHEME).ok()?;
196        Some(quic_addr)
197    }
198
199    async fn check_nonce(&self, nonce: u64) -> bool {
200        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
201        let mut used = self.used_nonces.lock().await;
202
203        // Clean expired
204        used.retain(|n| now - n.timestamp < NONCE_EXPIRY_SECS);
205
206        // Check replay
207        if used.iter().any(|n| n.nonce == nonce) {
208            return false
209        }
210
211        used.push(UsedNonce { nonce, timestamp: now });
212        true
213    }
214
215    async fn cleanup_nonces(&self) {
216        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
217        let mut used = self.used_nonces.lock().await;
218        used.retain(|n| now - n.timestamp < NONCE_EXPIRY_SECS);
219    }
220
221    async fn check_rate_limit(&self, ip: IpAddr) -> bool {
222        let pending = self.pending_count.lock().await;
223        pending.get(&ip).copied().unwrap_or(0) < MAX_PENDING_PER_PEER
224    }
225
226    async fn inc_pending(&self, ip: IpAddr) {
227        let mut pending = self.pending_count.lock().await;
228        *pending.entry(ip).or_insert(0) += 1;
229    }
230
231    async fn dec_pending(&self, ip: IpAddr) {
232        let mut pending = self.pending_count.lock().await;
233        if let Some(count) = pending.get_mut(&ip) {
234            *count = count.saturating_sub(1);
235            if *count == 0 {
236                pending.remove(&ip);
237            }
238        }
239    }
240
241    fn verify_claimed_addrs(&self, claimed: &[Url]) -> bool {
242        if claimed.is_empty() {
243            return true
244        }
245
246        let observed_ip = Self::get_ip(self.channel.address());
247
248        // Check if any claimed IP matches observed
249        for addr in claimed {
250            if Self::get_ip(addr) == observed_ip {
251                return true
252            }
253        }
254
255        // Check against version message external addrs
256        if let Some(version) = self.channel.version.get() {
257            for ext in &version.ext_send_addr {
258                if claimed.contains(ext) {
259                    return true
260                }
261            }
262        }
263
264        false
265    }
266
267    fn find_target_channel(&self, target: &Url) -> Option<ChannelPtr> {
268        for channel in self.hosts.channels() {
269            if !Self::is_quic(channel.address()) {
270                continue
271            }
272
273            if let Some(version) = channel.version.get() {
274                if version.ext_send_addr.contains(target) {
275                    return Some(channel);
276                }
277            }
278
279            if channel.address() == target {
280                return Some(channel);
281            }
282        }
283
284        None
285    }
286
287    async fn handle_relay_requests(self: Arc<Self>) -> Result<()> {
288        // Only process on QUIC channels
289        if !Self::is_quic(self.channel.address()) {
290            // TODO: Make sure this does not hang around when chan is dropped.
291            loop {
292                let _ = self.request_sub.receive().await?;
293            }
294        }
295
296        loop {
297            let req = self.request_sub.receive().await?;
298
299            // Validate target scheme
300            if !Self::is_quic(&req.target_addr) {
301                continue
302            }
303
304            // Replay protection
305            if !self.check_nonce(req.nonce).await {
306                warn!(
307                    target: "net::protocol_holepunch::handle_relay_requests",
308                    "[QUIC-NAT-RELAY] Rejecting: nonce replay",
309                );
310                continue
311            }
312
313            // Address verification
314            if !self.verify_claimed_addrs(&req.our_addrs) {
315                warn!(
316                    target: "net::protocol_holepunch::handle_relay_requests",
317                    "[QUIC-NAT-RELAY] Rejecting: addr verification failed",
318                );
319                continue
320            }
321
322            // Rate limiting
323            let Some(peer_ip) = Self::get_ip(self.channel.address()) else { continue };
324            if !self.check_rate_limit(peer_ip).await {
325                warn!(
326                    target: "net::protocol_holepunch::handle_relay_requests",
327                    "[QUIC-NAT-RELAY] Rejecting: ratelimit for {}", peer_ip,
328                );
329                continue
330            }
331
332            // Find target channel
333            let Some(target_chan) = self.find_target_channel(&req.target_addr) else {
334                let _ = self
335                    .channel
336                    .send(&HolepunchResult {
337                        nonce: req.nonce,
338                        success: false,
339                        error: Some("not connected to target".into()),
340                    })
341                    .await;
342                continue
343            };
344
345            // Get observed addrs
346            let Some(requester_observed) = Self::get_observed_addr(&self.channel) else { continue };
347            let Some(target_observed) = Self::get_observed_addr(&target_chan) else { continue };
348
349            // Both must be QUIC
350            if !Self::is_quic(&requester_observed) || !Self::is_quic(&target_observed) {
351                continue
352            }
353
354            self.inc_pending(peer_ip).await;
355
356            let connect_at =
357                UNIX_EPOCH.elapsed().unwrap().as_millis() as u64 + COORDINATION_DELAY_MS;
358
359            // Send to requester
360            let to_requester = HolepunchConnect {
361                nonce: req.nonce,
362                peer_addr: req.target_addr.clone(),
363                observed_addr: target_observed,
364                connect_at,
365            };
366
367            if self.channel.send(&to_requester).await.is_err() {
368                self.dec_pending(peer_ip).await;
369                continue
370            }
371
372            // Send to target
373            let to_target = HolepunchConnect {
374                nonce: req.nonce,
375                peer_addr: self.channel.address().clone(),
376                observed_addr: requester_observed,
377                connect_at,
378            };
379
380            if target_chan.send(&to_target).await.is_err() {
381                self.dec_pending(peer_ip).await;
382                continue
383            }
384
385            info!(
386                target: "net::protocol_holepunch::handle_relay_requests",
387                "[QUIC-NAT-RELAY] Relayed punch {} <-> {}",
388                self.channel.display_address(),
389                target_chan.display_address(),
390            );
391
392            // Cleanup rate limit after delay
393            let self_ = self.clone();
394            self.p2p
395                .executor()
396                .spawn(async move {
397                    sleep(COORDINATION_DELAY_MS / 1000 + 1).await;
398                    self_.dec_pending(peer_ip).await;
399                })
400                .detach();
401        }
402    }
403
404    async fn handle_connect_instructions(self: Arc<Self>) -> Result<()> {
405        if !Self::is_quic(self.channel.address()) {
406            loop {
407                let _ = self.connect_sub.receive().await?;
408            }
409        }
410
411        loop {
412            let conn = self.connect_sub.receive().await?;
413
414            // Validate
415            if !Self::is_quic(&conn.observed_addr) || !Self::validate_connect_time(conn.connect_at)
416            {
417                continue
418            }
419
420            // Skip if an initiator is handling this nonce
421            if INITIATOR_NONCES.lock().await.contains(&conn.nonce) {
422                continue
423            }
424
425            // Spawn connection attempt
426            let p2p = self.p2p.clone();
427            let observed = conn.observed_addr.clone();
428            let connect_at = conn.connect_at;
429
430            p2p.executor()
431                .spawn(async move {
432                    // Wait until scheduled time
433                    let now = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
434                    if connect_at > now {
435                        smol::Timer::after(Duration::from_millis(connect_at - now)).await;
436                    }
437
438                    // Connect
439                    match p2p.session_direct().get_channel(&observed).await {
440                        Ok(chan) => {
441                            info!(
442                                target: "net::protocol_holepunch::handle_connect_instructions",
443                                "[QUIC-NAT-CONNECT] Punch succeeded: {}", chan.display_address(),
444                            );
445                        }
446                        Err(e) => {
447                            debug!(
448                                target: "net::protocol_holepunch::handle_connect_instructions",
449                                "[QUIC-NAT-CONNECT] Punch failed to {}: {}", observed, e,
450                            );
451                        }
452                    }
453                })
454                .detach();
455        }
456    }
457
458    async fn nonce_cleanup_loop(self: Arc<Self>) -> Result<()> {
459        loop {
460            sleep(NONCE_EXPIRY_SECS).await;
461            self.cleanup_nonces().await;
462        }
463    }
464
465    /// Initiate a holepunch to target via relay.
466    pub async fn initiate_punch(
467        p2p: P2pPtr,
468        target: &Url,
469        relay: &ChannelPtr,
470    ) -> Result<ChannelPtr> {
471        // Validate schemes
472        if !Self::is_quic(target) {
473            return Err(Error::UnsupportedTransport(format!(
474                "Target must be QUIC: {}",
475                target.scheme(),
476            )));
477        }
478
479        if !Self::is_quic(relay.address()) {
480            return Err(Error::UnsupportedTransport(format!(
481                "Relay must be QUIC: {}",
482                relay.address().scheme()
483            )));
484        }
485
486        // Register nonce
487        let nonce: u64 = OsRng.gen();
488        INITIATOR_NONCES.lock().await.insert(nonce);
489
490        // Execute with cleanup
491        let result = Self::do_initiate_punch(p2p, target, relay, nonce).await;
492        INITIATOR_NONCES.lock().await.remove(&nonce);
493        result
494    }
495
496    async fn do_initiate_punch(
497        p2p: P2pPtr,
498        target: &Url,
499        relay: &ChannelPtr,
500        nonce: u64,
501    ) -> Result<ChannelPtr> {
502        // Get our QUIC external addr
503        // TODO: Perhaps STUN, perhaps IP discovery through live peers
504        let our_addrs: Vec<Url> =
505            p2p.hosts().external_addrs().await.into_iter().filter(Self::is_quic).collect();
506
507        // Subscribe to receive connect instruction
508        let connect_sub =
509            relay.subscribe_msg::<HolepunchConnect>().await.map_err(|_| Error::ChannelStopped)?;
510
511        // Send request
512        relay.send(&HolepunchRequest { nonce, target_addr: target.clone(), our_addrs }).await?;
513
514        // Wait for our connect instruction
515        let conn = loop {
516            let msg = timeout(Duration::from_millis(CONNECT_VALIDITY_MS), connect_sub.receive())
517                .await
518                .map_err(|_| Error::ChannelTimeout)??;
519
520            // We're looking for our nonce specifically
521            if msg.nonce == nonce {
522                break msg;
523            }
524        };
525
526        // Validate response
527        if !Self::is_quic(&conn.observed_addr) {
528            return Err(Error::UnsupportedTransport("Response addr not QUIC".into()));
529        }
530        if !Self::validate_connect_time(conn.connect_at) {
531            return Err(Error::ChannelTimeout);
532        }
533
534        // Wait until scheduled time
535        let now = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
536        if conn.connect_at > now {
537            smol::Timer::after(Duration::from_millis(conn.connect_at - now)).await;
538        }
539
540        // Attempt punch
541        p2p.session_direct().get_channel(&conn.observed_addr).await
542    }
543}
544
545#[async_trait]
546impl ProtocolBase for ProtocolHolepunch {
547    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
548        debug!(target: "net::protocol_holepunch", "Starting on {}", self.channel.display_address());
549        self.jobsman.clone().start(ex.clone());
550        self.jobsman.clone().spawn(self.clone().handle_relay_requests(), ex.clone()).await;
551        self.jobsman.clone().spawn(self.clone().handle_connect_instructions(), ex.clone()).await;
552        self.jobsman.spawn(self.clone().nonce_cleanup_loop(), ex).await;
553        Ok(())
554    }
555
556    fn name(&self) -> &'static str {
557        PROTO_NAME
558    }
559}