darkfi/net/
hosts.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! Host management for the P2P network.
20//!
21//! `Hosts` is the main interface managing the registry and container.
22//! Filters addresses before storing and publishes events on host/channel changes.
23//!
24//! `HostRegistry` maps peer addresses to their current `HostState`.
25//!
26//! `HostContainer` stores the hostlists (Grey, White, Gold, Black, Dark) behind a
27//! single lock for atomic cross-list operations.
28//!
29//! # Host Colors
30//!
31//! - `Grey`: Recently received hosts pending refinement.
32//! - `White`: Hosts that passed refinement successfully.
33//! - `Gold`: Hosts we've connected to in OutboundSession.
34//! - `Black`: Hostile hosts, blocked for the program duration.
35//! - `Dark`: Hosts with unsupported transports. Shared with peers but not used locally.
36//!   Cleared daily to avoid propagating stale entries.
37
38use parking_lot::{Mutex, RwLock};
39use rand::{prelude::IteratorRandom, rngs::OsRng, Rng};
40use smol::lock::RwLock as AsyncRwLock;
41use std::{
42    collections::HashMap,
43    fmt, fs,
44    fs::File,
45    net::{IpAddr, Ipv4Addr, Ipv6Addr},
46    sync::{
47        atomic::{AtomicBool, Ordering},
48        Arc,
49    },
50    time::{Instant, UNIX_EPOCH},
51};
52use tracing::{debug, error, warn};
53use url::{Host, Url};
54
55use super::{
56    session::{SESSION_REFINE, SESSION_SEED},
57    settings::Settings,
58    ChannelPtr,
59};
60use crate::{
61    system::{Publisher, PublisherPtr, Subscription},
62    util::{
63        file::{load_file, save_file},
64        logger::verbose,
65        most_frequent_or_any,
66        path::expand_path,
67        ringbuffer::RingBuffer,
68    },
69    Error, Result,
70};
71
/// Hostname spellings of the local host (`localhost` and its common alias).
pub const LOCAL_HOST_STRS: [&str; 2] = ["localhost", "localhost.localdomain"];

/// Maximum number of entries retained on the whitelist.
const WHITELIST_MAX_LEN: usize = 5_000;
/// Maximum number of entries retained on the greylist.
const GREYLIST_MAX_LEN: usize = 2_000;
/// Maximum number of entries retained on the darklist.
const DARKLIST_MAX_LEN: usize = 1_000;
/// Maximum number of entries retained on the blacklist.
const BLACKLIST_MAX_LEN: usize = 10_000;

/// Seconds a host may sit in the `Free` state before it is pruned from the
/// registry. One day is appropriate for long-running daemons.
const REGISTRY_PRUNE_AGE_SECS: u64 = 24 * 60 * 60;
82
83pub type HostsPtr = Arc<Hosts>;
84
85/// Mutually exclusive states for host lifecycle management.
86///
87/// ```text
88///                +------+
89///                | free |
90///                +------+
91///                   ^
92///                   |
93///                   v
94///                +------+      +---------+
95///       +------> | move | ---> | suspend |
96///       |        +------+      +---------+
97///       |           |               |        +--------+
98///       |           |               v        | insert |
99///  +---------+      |          +--------+    +--------+
100///  | connect |      |          | refine |        ^
101///  +---------+      |          +--------+        |
102///       |           v               |            v
103///       |     +-----------+         |         +------+
104///       +---> | connected | <-------+-------> | free |
105///             +-----------+                   +------+
106///                   ^
107///                   |
108///                   v
109///                +------+
110///                | free |
111///                +------+
112///
113/// ```
114#[derive(Clone, Debug)]
115pub(crate) enum HostState {
116    /// Being inserted into the hostlist.
117    Insert,
118    /// Being refined (greylist -> whitelist check).
119    Refine,
120    /// Being connected to in Outbound/Manual Session.
121    Connect,
122    /// Failed connection, awaiting refinement.
123    Suspend,
124    /// Successfully connected.
125    Connected(ChannelPtr),
126    /// Moving between hostlists.
127    Move,
128    /// Available for any operation. Contains timestamp when freed.
129    Free(u64),
130}
131
132impl HostState {
133    fn try_transition(&self, target: HostState) -> Result<HostState> {
134        use HostState::*;
135
136        let allowed = matches!(
137            (&target, self),
138            (Insert, Free(_)) |
139                (Refine, Free(_) | Suspend) |
140                (Connect, Free(_)) |
141                (Connected(_), Free(_) | Connect | Refine | Move) |
142                (Move, Free(_) | Connect | Refine | Connected(_)) |
143                (Suspend, Move) |
144                (Free(_), _)
145        );
146
147        if allowed {
148            Ok(target)
149        } else {
150            Err(Error::HostStateBlocked(self.to_string(), target.to_string()))
151        }
152    }
153}
154
155impl fmt::Display for HostState {
156    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157        match self {
158            HostState::Insert => write!(f, "Insert"),
159            HostState::Refine => write!(f, "Refine"),
160            HostState::Connect => write!(f, "Connect"),
161            HostState::Suspend => write!(f, "Suspend"),
162            HostState::Connected(_) => write!(f, "Connected"),
163            HostState::Move => write!(f, "Move"),
164            HostState::Free(_) => write!(f, "Free"),
165        }
166    }
167}
168
169#[repr(u8)]
170#[derive(Clone, Copy, Debug, PartialEq, Eq)]
171pub enum HostColor {
172    /// Intermediary nodes that are periodically probed and updated to White.
173    Grey = 0,
174    /// Recently seen hosts. Shared with other nodes.
175    White = 1,
176    /// Nodes to which we have already been able to establish a connection.
177    Gold = 2,
178    /// Hostile peers that can neither be connected to nor establish
179    /// connections to us for the duration of the program.
180    Black = 3,
181    /// Peers that do not match our accepted transports. We are blind to
182    /// these nodes (we do not use them) but we send them around the network
183    /// anyway to ensure all transports are propagated.
184    Dark = 4,
185}
186
187impl HostColor {
188    const ALL: [HostColor; 5] =
189        [HostColor::Grey, HostColor::White, HostColor::Gold, HostColor::Black, HostColor::Dark];
190
191    fn max_len(self) -> Option<usize> {
192        match self {
193            HostColor::Grey => Some(GREYLIST_MAX_LEN),
194            HostColor::White => Some(WHITELIST_MAX_LEN),
195            HostColor::Dark => Some(DARKLIST_MAX_LEN),
196            HostColor::Black => Some(BLACKLIST_MAX_LEN),
197            HostColor::Gold => None, // Limited by connection slots
198        }
199    }
200
201    fn name(self) -> &'static str {
202        match self {
203            HostColor::Grey => "grey",
204            HostColor::White => "white",
205            HostColor::Gold => "gold",
206            HostColor::Black => "black",
207            HostColor::Dark => "dark",
208        }
209    }
210
211    fn from_name(name: &str) -> Option<Self> {
212        match name {
213            "grey" => Some(HostColor::Grey),
214            "white" => Some(HostColor::White),
215            "gold" => Some(HostColor::Gold),
216            "black" => Some(HostColor::Black),
217            "dark" => Some(HostColor::Dark),
218            _ => None,
219        }
220    }
221}
222
223impl TryFrom<usize> for HostColor {
224    type Error = Error;
225
226    fn try_from(value: usize) -> Result<Self> {
227        HostColor::ALL.get(value).copied().ok_or(Error::InvalidHostColor)
228    }
229}
230
231/// Container for all hostlists. Uses a single lock for atomic cross-list operations.
232pub struct HostContainer {
233    pub(in crate::net) lists: RwLock<[Vec<(Url, u64)>; 5]>,
234}
235
236impl HostContainer {
237    fn new() -> Self {
238        Self { lists: RwLock::new([Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()]) }
239    }
240
241    /// Store or update an address on a hostlist.
242    pub fn store(&self, color: HostColor, addr: Url, last_seen: u64) {
243        let mut lists = self.lists.write();
244        let list = &mut lists[color as usize];
245
246        if let Some(entry) = list.iter_mut().find(|(u, _)| *u == addr) {
247            entry.1 = last_seen;
248        } else {
249            list.push((addr, last_seen));
250        }
251    }
252
253    /// Store, sort by last_seen (descending), and enforce max size.
254    pub fn store_and_trim(&self, color: HostColor, addr: Url, last_seen: u64) {
255        let mut lists = self.lists.write();
256        let list = &mut lists[color as usize];
257
258        if let Some(entry) = list.iter_mut().find(|(u, _)| *u == addr) {
259            entry.1 = last_seen;
260        } else {
261            list.push((addr, last_seen));
262        }
263
264        list.sort_by_key(|e| std::cmp::Reverse(e.1));
265
266        if let Some(max) = color.max_len() {
267            list.truncate(max);
268        }
269    }
270
271    /// Remove an address from a hostlist if it exists.
272    pub fn remove(&self, color: HostColor, addr: &Url) {
273        let mut lists = self.lists.write();
274        lists[color as usize].retain(|(u, _)| u != addr);
275    }
276
277    /// Check if an address exists in a hostlist.
278    pub fn contains(&self, color: HostColor, addr: &Url) -> bool {
279        self.lists.read()[color as usize].iter().any(|(u, _)| u == addr)
280    }
281
282    /// Check if an address exists in any of the specified hostlists.
283    pub fn contains_any(&self, colors: &[HostColor], addr: &Url) -> bool {
284        let lists = self.lists.read();
285        colors.iter().any(|&c| lists[c as usize].iter().any(|(u, _)| u == addr))
286    }
287
288    /// Check if any host with the given hostname exists in the specified lists.
289    pub fn contains_hostname(&self, colors: &[HostColor], hostname: &str) -> bool {
290        let lists = self.lists.read();
291        colors
292            .iter()
293            .any(|&c| lists[c as usize].iter().any(|(u, _)| u.host_str() == Some(hostname)))
294    }
295
296    /// Check if a hostlist is empty.
297    pub fn is_empty(&self, color: HostColor) -> bool {
298        self.lists.read()[color as usize].is_empty()
299    }
300
301    /// Update the last_seen field for an address.
302    pub fn update_last_seen(&self, color: HostColor, addr: &Url, last_seen: u64) {
303        let mut lists = self.lists.write();
304        if let Some(entry) = lists[color as usize].iter_mut().find(|(u, _)| u == addr) {
305            entry.1 = last_seen;
306        }
307    }
308
309    /// Get the last_seen field for an address.
310    pub fn get_last_seen(&self, color: HostColor, addr: &Url) -> Option<u64> {
311        self.lists.read()[color as usize].iter().find(|(u, _)| u == addr).map(|(_, ls)| *ls)
312    }
313
314    /// Return all hosts from a hostlist.
315    pub fn fetch_all(&self, color: HostColor) -> Vec<(Url, u64)> {
316        self.lists.read()[color as usize].clone()
317    }
318
319    /// Get the oldest entry (last in sorted list) from a hostlist.
320    pub fn fetch_last(&self, color: HostColor) -> Option<(Url, u64)> {
321        self.lists.read()[color as usize].last().cloned()
322    }
323
324    /// Get hosts matching the given transport schemes.
325    pub fn fetch_with_schemes(
326        &self,
327        color: HostColor,
328        schemes: &[String],
329        limit: Option<usize>,
330    ) -> Vec<(Url, u64)> {
331        let lists = self.lists.read();
332        lists[color as usize]
333            .iter()
334            .filter(|(addr, _)| schemes.contains(&addr.scheme().to_string()))
335            .take(limit.unwrap_or(usize::MAX))
336            .cloned()
337            .collect()
338    }
339
340    /// Get hosts NOT matching the given transport schemes.
341    pub fn fetch_excluding_schemes(
342        &self,
343        color: HostColor,
344        schemes: &[String],
345        limit: Option<usize>,
346    ) -> Vec<(Url, u64)> {
347        let lists = self.lists.read();
348        lists[color as usize]
349            .iter()
350            .filter(|(addr, _)| !schemes.contains(&addr.scheme().to_string()))
351            .take(limit.unwrap_or(usize::MAX))
352            .cloned()
353            .collect()
354    }
355
356    /// Get a random host matching the given schemes.
357    pub fn fetch_random_with_schemes(
358        &self,
359        color: HostColor,
360        schemes: &[String],
361    ) -> Option<(Url, u64)> {
362        let hosts = self.fetch_with_schemes(color, schemes, None);
363        if hosts.is_empty() {
364            return None
365        }
366        let idx = rand::thread_rng().gen_range(0..hosts.len());
367        Some(hosts[idx].clone())
368    }
369
370    /// Get up to n random hosts.
371    pub fn fetch_n_random(&self, color: HostColor, n: usize) -> Vec<(Url, u64)> {
372        if n == 0 {
373            return vec![]
374        }
375        let lists = self.lists.read();
376        lists[color as usize].iter().cloned().choose_multiple(&mut OsRng, n)
377    }
378
379    /// Get up to n random hosts matching the given schemes.
380    pub fn fetch_n_random_with_schemes(
381        &self,
382        color: HostColor,
383        schemes: &[String],
384        n: usize,
385    ) -> Vec<(Url, u64)> {
386        if n == 0 {
387            return vec![]
388        }
389        let hosts = self.fetch_with_schemes(color, schemes, None);
390        hosts.into_iter().choose_multiple(&mut OsRng, n)
391    }
392
393    /// Get up to n random hosts NOT matching the given schemes.
394    pub fn fetch_n_random_excluding_schemes(
395        &self,
396        color: HostColor,
397        schemes: &[String],
398        n: usize,
399    ) -> Vec<(Url, u64)> {
400        if n == 0 {
401            return vec![]
402        }
403        let hosts = self.fetch_excluding_schemes(color, schemes, None);
404        hosts.into_iter().choose_multiple(&mut OsRng, n)
405    }
406
407    /// Atomically move a host between lists.
408    pub fn move_host(&self, addr: &Url, last_seen: u64, dest: HostColor) -> Result<()> {
409        let mut lists = self.lists.write();
410
411        // Remove from source lists based on destination
412        match dest {
413            HostColor::Grey => {
414                lists[HostColor::Gold as usize].retain(|(u, _)| u != addr);
415                lists[HostColor::White as usize].retain(|(u, _)| u != addr);
416            }
417            HostColor::White => {
418                lists[HostColor::Grey as usize].retain(|(u, _)| u != addr);
419            }
420            HostColor::Gold => {
421                lists[HostColor::Grey as usize].retain(|(u, _)| u != addr);
422                lists[HostColor::White as usize].retain(|(u, _)| u != addr);
423            }
424            HostColor::Black => {
425                lists[HostColor::Grey as usize].retain(|(u, _)| u != addr);
426                lists[HostColor::White as usize].retain(|(u, _)| u != addr);
427                lists[HostColor::Gold as usize].retain(|(u, _)| u != addr);
428            }
429            HostColor::Dark => return Err(Error::InvalidHostColor),
430        }
431
432        // Add to destination
433        let dest_list = &mut lists[dest as usize];
434        if let Some(entry) = dest_list.iter_mut().find(|(u, _)| u == addr) {
435            entry.1 = last_seen;
436        } else {
437            dest_list.push((addr.clone(), last_seen));
438        }
439
440        // Sort and trim
441        dest_list.sort_by_key(|e| std::cmp::Reverse(e.1));
442        if let Some(max) = dest.max_len() {
443            dest_list.truncate(max);
444        }
445
446        Ok(())
447    }
448
449    /// Remove entries older than max_age seconds.
450    pub fn refresh(&self, color: HostColor, max_age: u64) {
451        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
452        let mut lists = self.lists.write();
453        let original_len = lists[color as usize].len();
454
455        lists[color as usize].retain(|(addr, last_seen)| {
456            // Keep if last_seen is in future (clock skew protection)
457            if now < *last_seen {
458                return true
459            }
460            let age = now - last_seen;
461            if age <= max_age {
462                return true
463            }
464            debug!(target: "net::hosts::refresh", "Removing {addr} (age: {age}s)");
465            false
466        });
467
468        let removed = original_len - lists[color as usize].len();
469        if removed > 0 {
470            debug!(target: "net::hosts::refresh", "Removed {removed} old entries from {:?}", color);
471        }
472    }
473
474    pub fn load_all(&self, path: &str) -> Result<()> {
475        let path = expand_path(path)?;
476
477        if !path.exists() {
478            if let Some(parent) = path.parent() {
479                fs::create_dir_all(parent)?;
480            }
481            File::create(path.clone())?;
482        }
483
484        let contents = match load_file(&path) {
485            Ok(c) => c,
486            Err(e) => {
487                warn!(target: "net::hosts::load_all", "[P2P] Failed retrieving saved hosts: {e}");
488                return Ok(())
489            }
490        };
491
492        let mut lists = self.lists.write();
493
494        for line in contents.lines() {
495            let parts: Vec<&str> = line.split('\t').collect();
496            if parts.len() < 3 {
497                continue;
498            }
499
500            let color = match HostColor::from_name(parts[0]) {
501                Some(c) => c,
502                None => continue,
503            };
504
505            let url = match Url::parse(parts[1]) {
506                Ok(u) => u,
507                Err(_) => continue,
508            };
509
510            let last_seen = match parts[2].parse::<u64>() {
511                Ok(t) => t,
512                Err(_) => continue,
513            };
514
515            let list = &mut lists[color as usize];
516            list.push((url, last_seen));
517            list.sort_by_key(|e| std::cmp::Reverse(e.1));
518
519            if let Some(max) = color.max_len() {
520                list.truncate(max);
521            }
522        }
523
524        // Refresh dark list (remove entries older than one day)
525        drop(lists);
526        self.refresh(HostColor::Dark, 86400);
527
528        Ok(())
529    }
530
531    pub fn save_all(&self, path: &str) -> Result<()> {
532        let path = expand_path(path)?;
533        let lists = self.lists.read();
534
535        let mut tsv = String::new();
536        for color in [HostColor::Dark, HostColor::Grey, HostColor::White, HostColor::Gold] {
537            for (url, last_seen) in &lists[color as usize] {
538                tsv.push_str(&format!("{}\t{}\t{}\n", color.name(), url, last_seen));
539            }
540        }
541
542        if !tsv.is_empty() {
543            verbose!(target: "net::hosts::save_all", "[P2P] Saving hosts to: {path:?}");
544            if let Err(e) = save_file(&path, &tsv) {
545                error!(target: "net::hosts::save_all", "[P2P] Failed saving hosts: {e}");
546            }
547        }
548
549        Ok(())
550    }
551
552    /// Perform transport mixing for a URL, returning alternative connection addresses.
553    pub fn mix_host(
554        addr: &Url,
555        transports: &[String],
556        mixed_transports: &[String],
557        tor_socks5_proxy: &Option<Url>,
558        nym_socks5_proxy: &Option<Url>,
559    ) -> Vec<Url> {
560        if !mixed_transports.contains(&addr.scheme().to_string()) {
561            return vec![]
562        }
563
564        let mut hosts = vec![];
565
566        let mix = |scheme: &str, target: &str, hosts: &mut Vec<Url>| {
567            if transports.contains(&scheme.to_string()) && addr.scheme() == target {
568                let mut url = addr.clone();
569                let _ = url.set_scheme(scheme);
570                hosts.push(url);
571            }
572        };
573
574        let mix_socks5 =
575            |scheme: &str, target: &str, proxies: &[&Option<Url>], hosts: &mut Vec<Url>| {
576                if transports.contains(&scheme.to_string()) && addr.scheme() == target {
577                    for proxy in proxies {
578                        if let Some(base) = proxy.as_ref() {
579                            let mut endpoint = base.clone();
580                            endpoint.set_path(&format!(
581                                "{}:{}",
582                                addr.host().unwrap(),
583                                addr.port().unwrap()
584                            ));
585                            let _ = endpoint.set_scheme(scheme);
586                            hosts.push(endpoint);
587                        }
588                    }
589                }
590            };
591
592        mix("tor", "tcp", &mut hosts);
593        mix("tor+tls", "tcp+tls", &mut hosts);
594        mix("nym", "tcp", &mut hosts);
595        mix("nym+tls", "tcp+tls", &mut hosts);
596
597        mix_socks5("socks5", "tcp", &[tor_socks5_proxy, nym_socks5_proxy], &mut hosts);
598        mix_socks5("socks5+tls", "tcp+tls", &[tor_socks5_proxy, nym_socks5_proxy], &mut hosts);
599        mix_socks5("socks5", "tor", &[tor_socks5_proxy], &mut hosts);
600        mix_socks5("socks5+tls", "tor+tls", &[tor_socks5_proxy], &mut hosts);
601
602        hosts
603    }
604}
605
606/// Main interface for host management.
607pub struct Hosts {
608    /// A registry that tracks hosts and their current state.
609    registry: Mutex<HashMap<Url, HostState>>,
610    /// Hostlists and associated methods
611    pub container: HostContainer,
612    /// Publisher listening for store updates
613    store_publisher: PublisherPtr<usize>,
614    /// Publisher for notifications of new channels
615    pub(crate) channel_publisher: PublisherPtr<Result<ChannelPtr>>,
616    /// Publisher listening for network disconnects
617    pub(crate) disconnect_publisher: PublisherPtr<Error>,
618    /// Keeps track of the last time a connection was made.
619    pub(crate) last_connection: Mutex<Instant>,
620    /// Marker for IPv6 availability
621    pub(crate) ipv6_available: AtomicBool,
622    /// Auto self discovered addresses. Used for filtering self connections.
623    auto_self_addrs: Mutex<RingBuffer<Ipv6Addr, 20>>,
624    /// Pointer to configured P2P settings
625    settings: Arc<AsyncRwLock<Settings>>,
626}
627
628impl Hosts {
629    /// Create a new hosts list
630    pub(crate) fn new(settings: Arc<AsyncRwLock<Settings>>) -> HostsPtr {
631        Arc::new(Self {
632            registry: Mutex::new(HashMap::new()),
633            container: HostContainer::new(),
634            store_publisher: Publisher::new(),
635            channel_publisher: Publisher::new(),
636            disconnect_publisher: Publisher::new(),
637            last_connection: Mutex::new(Instant::now()),
638            ipv6_available: AtomicBool::new(true),
639            auto_self_addrs: Mutex::new(RingBuffer::new()),
640            settings,
641        })
642    }
643
644    /// Try to register a host with a new state.
645    pub(crate) fn try_register(&self, addr: Url, new_state: HostState) -> Result<HostState> {
646        let mut registry = self.registry.lock();
647
648        let result = if let Some(current) = registry.get(&addr) {
649            current.try_transition(new_state)
650        } else {
651            Ok(new_state)
652        };
653
654        if let Ok(ref state) = result {
655            registry.insert(addr, state.clone());
656        }
657
658        result
659    }
660
661    /// Mark a host as Free.
662    pub(crate) fn unregister(&self, addr: &Url) -> Result<()> {
663        let age = UNIX_EPOCH.elapsed().unwrap().as_secs();
664        self.try_register(addr.clone(), HostState::Free(age))?;
665        debug!(target: "net::hosts::unregister", "Unregistered: {addr}");
666        Ok(())
667    }
668
669    /// Prune stale entries from the registry.
670    ///
671    /// Removes hosts that have been in `Free` state longer than `REGISTRY_PRUNE_AGE_SECS`.
672    /// This prevents unbounded growth of the registry over long-running sessions.
673    ///
674    /// Returns the number of entries pruned.
675    pub fn prune_registry(&self) -> usize {
676        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
677        let mut registry = self.registry.lock();
678        let before = registry.len();
679
680        registry.retain(|url, state| {
681            if let HostState::Free(age) = state {
682                let elapsed = now.saturating_sub(*age);
683                if elapsed > REGISTRY_PRUNE_AGE_SECS {
684                    debug!(
685                        target: "net::hosts::prune_registry",
686                        "Pruning stale entry {url} (idle for {elapsed}s)",
687                    );
688                    return false
689                }
690            }
691            true
692        });
693
694        let pruned = before - registry.len();
695        if pruned > 0 {
696            debug!(target: "net::hosts::prune_registry", "Pruned {pruned} stale entries");
697        }
698        pruned
699    }
700
701    /// Check if a host can be refined.
702    pub fn refinable(&self, addr: &Url) -> bool {
703        let registry = self.registry.lock();
704        match registry.get(addr) {
705            Some(state) => state.try_transition(HostState::Refine).is_ok(),
706            None => true,
707        }
708    }
709
710    /// Return all connected channels.
711    pub fn channels(&self) -> Vec<ChannelPtr> {
712        self.registry
713            .lock()
714            .values()
715            .filter_map(
716                |state| {
717                    if let HostState::Connected(c) = state {
718                        Some(c.clone())
719                    } else {
720                        None
721                    }
722                },
723            )
724            .collect()
725    }
726
727    /// Return connected peers (excluding seed and refinery connections).
728    pub fn peers(&self) -> Vec<ChannelPtr> {
729        self.registry
730            .lock()
731            .values()
732            .filter_map(|state| {
733                if let HostState::Connected(c) = state {
734                    if c.session_type_id() & (SESSION_SEED | SESSION_REFINE) == 0 {
735                        return Some(c.clone())
736                    }
737                }
738                None
739            })
740            .collect()
741    }
742
743    /// Get a channel by ID.
744    pub fn get_channel(&self, id: u32) -> Option<ChannelPtr> {
745        self.channels().into_iter().find(|c| c.info.id == id)
746    }
747
748    /// Get a random connected channel.
749    pub fn random_channel(&self) -> Option<ChannelPtr> {
750        let channels = self.channels();
751        if channels.is_empty() {
752            return None
753        }
754        let idx = rand::thread_rng().gen_range(0..channels.len());
755        Some(channels[idx].clone())
756    }
757
758    /// Return suspended hosts.
759    pub(crate) fn suspended(&self) -> Vec<Url> {
760        self.registry
761            .lock()
762            .iter()
763            .filter_map(
764                |(url, state)| {
765                    if matches!(state, HostState::Suspend) {
766                        Some(url.clone())
767                    } else {
768                        None
769                    }
770                },
771            )
772            .collect()
773    }
774
775    /// Register a channel as connected.
776    pub(crate) async fn register_channel(&self, channel: ChannelPtr) {
777        let address = channel.address().clone();
778
779        // Skip Tor-style inbound connections
780        if channel.p2p().settings().read().await.inbound_addrs.contains(&address) {
781            return
782        }
783
784        if let Err(e) = self.try_register(address, HostState::Connected(channel.clone())) {
785            warn!(target: "net::hosts::register_channel", "[P2P] Error registering channel: {e:?}");
786            return
787        }
788
789        self.channel_publisher.notify(Ok(channel)).await;
790        *self.last_connection.lock() = Instant::now();
791    }
792
793    /// Insert addresses into the greylist after filtering.
794    pub(crate) async fn insert(&self, color: HostColor, addrs: &[(Url, u64)]) {
795        let filtered = self.filter_addresses(addrs).await;
796        let mut count = 0;
797
798        for (addr, last_seen) in filtered {
799            if self.try_register(addr.clone(), HostState::Insert).is_err() {
800                continue;
801            }
802
803            self.container.store_and_trim(color, addr.clone(), last_seen);
804            let _ = self.unregister(&addr);
805            count += 1;
806        }
807
808        if count > 0 {
809            self.store_publisher.notify(count).await;
810        }
811    }
812
813    /// Find a connectable address from the given hosts.
814    pub(crate) async fn check_addrs(&self, hosts: Vec<(Url, u64)>) -> Option<(Url, u64)> {
815        let settings = self.settings.read().await;
816        let seeds = &settings.seeds;
817        let external = self.external_addrs().await;
818
819        for (host, last_seen) in hosts {
820            if seeds.contains(&host) || external.contains(&host) {
821                continue;
822            }
823
824            if self.try_register(host.clone(), HostState::Connect).is_ok() {
825                return Some((host, last_seen))
826            }
827        }
828
829        None
830    }
831
832    /// Move a host to the greylist.
833    pub async fn greylist_host(&self, addr: &Url, last_seen: u64) -> Result<()> {
834        self.move_host(addr, last_seen, HostColor::Grey).await?;
835        self.unregister(addr)
836    }
837
838    /// Move a host to the whitelist.
839    pub async fn whitelist_host(&self, addr: &Url, last_seen: u64) -> Result<()> {
840        self.move_host(addr, last_seen, HostColor::White).await?;
841        self.unregister(addr)
842    }
843
844    /// Move a host between lists (requires Move state).
845    pub(crate) async fn move_host(
846        &self,
847        addr: &Url,
848        last_seen: u64,
849        dest: HostColor,
850    ) -> Result<()> {
851        self.try_register(addr.clone(), HostState::Move)?;
852
853        if dest == HostColor::Black {
854            if addr.host_str().is_none() {
855                return Ok(())
856            }
857            if !self.settings.read().await.localnet && self.is_local_host(addr) {
858                return Ok(())
859            }
860        }
861
862        self.container.move_host(addr, last_seen, dest)
863    }
864
865    /// Get the last_seen for an address across all active lists.
866    pub fn fetch_last_seen(&self, addr: &Url) -> Option<u64> {
867        for color in [HostColor::Gold, HostColor::White, HostColor::Grey] {
868            if let Some(ls) = self.container.get_last_seen(color, addr) {
869                return Some(ls)
870            }
871        }
872        None
873    }
874
875    /// Check if we have an existing connection to a host (any port).
876    pub fn has_existing_connection(&self, url: &Url) -> bool {
877        let host_str = match url.host_str() {
878            Some(h) => h,
879            None => return false,
880        };
881        self.container.contains_hostname(&[HostColor::Gold, HostColor::White], host_str)
882    }
883
884    async fn filter_addresses(&self, addrs: &[(Url, u64)]) -> Vec<(Url, u64)> {
885        let settings = self.settings.read().await;
886        let external_addrs = self.external_addrs().await;
887        let mut result = vec![];
888
889        'addr_loop: for (addr, last_seen) in addrs {
890            // Validate format
891            if addr.host_str().is_none() || addr.port().is_none() || addr.cannot_be_a_base() {
892                continue;
893            }
894
895            // Skip configured seeds and peers
896            if settings.seeds.contains(addr) || settings.peers.contains(addr) {
897                continue;
898            }
899
900            // Skip blacklisted
901            if self.container.contains(HostColor::Black, addr) || self.block_all_ports(addr) {
902                continue;
903            }
904
905            let host = addr.host().unwrap();
906
907            // Skip our own addresses
908            if !settings.localnet {
909                for ext in &external_addrs {
910                    if host == ext.host().unwrap() {
911                        continue 'addr_loop;
912                    }
913                }
914            } else {
915                for ext in &settings.external_addrs {
916                    if addr.port() == ext.port() {
917                        continue 'addr_loop;
918                    }
919                }
920            }
921
922            // Skip local addresses in production
923            if !settings.localnet && self.is_local_host(addr) {
924                continue;
925            }
926
927            // Validate transport-specific formats
928            if !self.validate_transport(addr) {
929                continue;
930            }
931
932            // Store unsupported transports on dark list
933            if !settings.active_profiles.contains(&addr.scheme().to_string()) ||
934                (!self.ipv6_available.load(Ordering::SeqCst) && self.is_ipv6(addr))
935            {
936                self.container.store_and_trim(HostColor::Dark, addr.clone(), *last_seen);
937                self.container.refresh(HostColor::Dark, 86400);
938
939                if !settings.mixed_profiles.contains(&addr.scheme().to_string()) {
940                    continue;
941                }
942            }
943
944            // Skip if already in active lists
945            if self
946                .container
947                .contains_any(&[HostColor::Gold, HostColor::White, HostColor::Grey], addr)
948            {
949                continue;
950            }
951
952            result.push((addr.clone(), *last_seen));
953        }
954
955        result
956    }
957
958    fn validate_transport(&self, addr: &Url) -> bool {
959        match addr.scheme() {
960            "tcp" | "tcp+tls" => true,
961
962            #[cfg(feature = "p2p-tor")]
963            "tor" | "tor+tls" => {
964                use std::str::FromStr;
965                tor_hscrypto::pk::HsId::from_str(addr.host_str().unwrap()).is_ok()
966            }
967
968            #[cfg(feature = "p2p-nym")]
969            "nym" | "nym+tls" => false, // Temp skip
970
971            #[cfg(feature = "p2p-i2p")]
972            "i2p" | "i2p+tls" => Self::is_i2p_host(addr.host_str().unwrap()),
973
974            #[cfg(feature = "p2p-quic")]
975            "quic" => true,
976
977            _ => false,
978        }
979    }
980
981    pub(crate) async fn import_blacklist(&self) -> Result<()> {
982        let settings = self.settings.read().await;
983
984        for (hostname, schemes, ports) in &settings.blacklist {
985            let schemes =
986                if schemes.is_empty() { vec!["tcp+tls".to_string()] } else { schemes.clone() };
987
988            let ports = if ports.is_empty() { vec![0] } else { ports.clone() };
989
990            for scheme in &schemes {
991                for &port in &ports {
992                    let url_string = if port == 0 {
993                        format!("{scheme}://{hostname}")
994                    } else {
995                        format!("{scheme}://{hostname}:{port}")
996                    };
997
998                    if let Ok(url) = Url::parse(&url_string) {
999                        self.container.store_and_trim(HostColor::Black, url, 0);
1000                    }
1001                }
1002            }
1003        }
1004
1005        Ok(())
1006    }
1007
1008    /// Check if a host is blacklisted without a port (blocks all ports).
1009    pub(crate) fn block_all_ports(&self, url: &Url) -> bool {
1010        let host = match url.host() {
1011            Some(h) => h,
1012            None => return false,
1013        };
1014
1015        self.container.lists.read()[HostColor::Black as usize]
1016            .iter()
1017            .any(|(u, _)| u.host() == Some(host.clone()) && u.port().is_none())
1018    }
1019
1020    pub fn is_local_host(&self, url: &Url) -> bool {
1021        match url.host() {
1022            None => false,
1023            Some(Host::Ipv4(ip)) => !ip.unstable_is_global(),
1024            Some(Host::Ipv6(ip)) => !ip.unstable_is_global(),
1025            Some(Host::Domain(d)) => LOCAL_HOST_STRS.contains(&d),
1026        }
1027    }
1028
1029    pub fn is_ipv6(&self, url: &Url) -> bool {
1030        matches!(url.host(), Some(Host::Ipv6(_)))
1031    }
1032
1033    pub(crate) fn add_auto_addr(&self, addr: Ipv6Addr) {
1034        self.auto_self_addrs.lock().push(addr);
1035    }
1036
1037    pub fn guess_auto_addr(&self) -> Option<Ipv6Addr> {
1038        let mut addrs = self.auto_self_addrs.lock();
1039        most_frequent_or_any(addrs.make_contiguous())
1040    }
1041
1042    pub async fn external_addrs(&self) -> Vec<Url> {
1043        let mut addrs = self.settings.read().await.external_addrs.clone();
1044        for addr in &mut addrs {
1045            self.patch_port(addr);
1046            self.patch_auto_addr(addr);
1047        }
1048        addrs
1049    }
1050
1051    fn patch_auto_addr(&self, addr: &mut Url) {
1052        if addr.scheme() != "tcp" && addr.scheme() != "tcp+tls" {
1053            return
1054        }
1055
1056        if let Some(Host::Ipv6(ip)) = addr.host() {
1057            if ip.is_unspecified() {
1058                if let Some(auto) = self.guess_auto_addr() {
1059                    let _ = addr.set_ip_host(IpAddr::V6(auto));
1060                }
1061            }
1062        }
1063    }
1064
    /// Placeholder for resolving a configured port of `0` to a real port,
    /// presumably the one our inbound session listens on (see TODO).
    ///
    /// Currently a no-op: `_addr` is left unchanged.
    fn patch_port(&self, _addr: &mut Url) {
        // TODO: Lookup port from InboundSession when port is 0
    }
1068
1069    #[cfg(feature = "p2p-i2p")]
1070    fn is_i2p_host(host: &str) -> bool {
1071        if !host.ends_with(".i2p") {
1072            return false
1073        }
1074
1075        let name = host.trim_end_matches(".i2p");
1076
1077        if name.ends_with(".b32") {
1078            let b32 = name.trim_end_matches(".b32");
1079            let decoded = crate::util::encoding::base32::decode(b32);
1080            return decoded.is_some() && decoded.unwrap().len() == 32
1081        }
1082
1083        name.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '.')
1084    }
1085
1086    pub async fn subscribe_store(&self) -> Subscription<usize> {
1087        self.store_publisher.clone().subscribe().await
1088    }
1089
1090    pub async fn subscribe_channel(&self) -> Subscription<Result<ChannelPtr>> {
1091        self.channel_publisher.clone().subscribe().await
1092    }
1093
1094    pub async fn subscribe_disconnect(&self) -> Subscription<Error> {
1095        self.disconnect_publisher.clone().subscribe().await
1096    }
1097}
1098
// Copied from https://doc.rust-lang.org/stable/src/core/net/ip_addr.rs.html#839
/// Stable-toolchain copies of the IP address classification helpers that are
/// still unstable in `std` (`is_global`, `is_shared`, `is_benchmarking`,
/// `is_reserved`, `is_documentation`).
///
/// `is_local_host` uses `unstable_is_global` to decide whether an IP literal
/// is local.
trait UnstableFeatureIp {
    /// Whether the address appears to be globally reachable.
    fn unstable_is_global(&self) -> bool;
    /// Whether the address is in the shared address space (always `false` for IPv6 here).
    fn unstable_is_shared(&self) -> bool;
    /// Whether the address is in a range reserved for benchmarking.
    fn unstable_is_benchmarking(&self) -> bool;
    /// Whether the address is reserved for future use (always `false` for IPv6 here).
    fn unstable_is_reserved(&self) -> bool;
    /// Whether the address is in a range reserved for documentation.
    fn unstable_is_documentation(&self) -> bool;
}
1107
// Mirrors the std implementation line-for-line so it can be diffed against
// upstream (and dropped) once these methods are stabilised.
impl UnstableFeatureIp for Ipv4Addr {
    /// `false` for every special-purpose range listed below, `true` otherwise.
    #[inline]
    fn unstable_is_global(&self) -> bool {
        !(self.octets()[0] == 0 // "This network"
            || self.is_private()
            || self.unstable_is_shared()
            || self.is_loopback()
            || self.is_link_local()
            // addresses reserved for future protocols (`192.0.0.0/24`)
            // .9 and .10 are documented as globally reachable so they're excluded
            || (
                self.octets()[0] == 192 && self.octets()[1] == 0 && self.octets()[2] == 0
                && self.octets()[3] != 9 && self.octets()[3] != 10
            )
            || self.unstable_is_documentation()
            || self.unstable_is_benchmarking()
            || self.unstable_is_reserved()
            || self.is_broadcast())
    }

    /// Shared address space `100.64.0.0/10` (first octet 100, top two bits of
    /// the second octet `01`).
    #[inline]
    fn unstable_is_shared(&self) -> bool {
        self.octets()[0] == 100 && (self.octets()[1] & 0b1100_0000 == 0b0100_0000)
    }

    /// Benchmarking range `198.18.0.0/15` (second octet 18 or 19).
    #[inline]
    fn unstable_is_benchmarking(&self) -> bool {
        self.octets()[0] == 198 && (self.octets()[1] & 0xfe) == 18
    }

    /// Reserved range `240.0.0.0/4`, excluding the broadcast address.
    #[inline]
    fn unstable_is_reserved(&self) -> bool {
        self.octets()[0] & 240 == 240 && !self.is_broadcast()
    }

    /// Documentation ranges `192.0.2.0/24`, `198.51.100.0/24` and `203.0.113.0/24`.
    #[inline]
    fn unstable_is_documentation(&self) -> bool {
        matches!(self.octets(), [192, 0, 2, _] | [198, 51, 100, _] | [203, 0, 113, _])
    }
}
1148
// Mirrors the std implementation line-for-line so it can be diffed against
// upstream (and dropped) once these methods are stabilised.
impl UnstableFeatureIp for Ipv6Addr {
    /// `false` for every special-purpose block listed below, `true` otherwise.
    /// Within `2001::/23` a few sub-ranges are carved back out as global.
    fn unstable_is_global(&self) -> bool {
        !(self.is_unspecified()
            || self.is_loopback()
            // IPv4-mapped Address (`::ffff:0:0/96`)
            || matches!(self.segments(), [0, 0, 0, 0, 0, 0xffff, _, _])
            // IPv4-IPv6 Translat. (`64:ff9b:1::/48`)
            || matches!(self.segments(), [0x64, 0xff9b, 1, _, _, _, _, _])
            // Discard-Only Address Block (`100::/64`)
            || matches!(self.segments(), [0x100, 0, 0, 0, _, _, _, _])
            // IETF Protocol Assignments (`2001::/23`)
            || (matches!(self.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200)
                && !(
                    // Port Control Protocol Anycast (`2001:1::1`)
                    u128::from_be_bytes(self.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001
                    // Traversal Using Relays around NAT Anycast (`2001:1::2`)
                    || u128::from_be_bytes(self.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002
                    // AMT (`2001:3::/32`)
                    || matches!(self.segments(), [0x2001, 3, _, _, _, _, _, _])
                    // AS112-v6 (`2001:4:112::/48`)
                    || matches!(self.segments(), [0x2001, 4, 0x112, _, _, _, _, _])
                    // ORCHIDv2 (`2001:20::/28`)
                    // Drone Remote ID Protocol Entity Tags (DETs) Prefix (`2001:30::/28`)`
                    || matches!(self.segments(), [0x2001, b, _, _, _, _, _, _] if (0x20..=0x3F).contains(&b))
                ))
            // 6to4 (`2002::/16`) – it's not explicitly documented as globally reachable,
            // IANA says N/A.
            || matches!(self.segments(), [0x2002, _, _, _, _, _, _, _])
            || self.unstable_is_documentation()
            // Segment Routing (SRv6) SIDs (`5f00::/16`)
            || matches!(self.segments(), [0x5f00, ..])
            || self.is_unique_local()
            || self.is_unicast_link_local())
    }

    /// IPv6 has no shared address space; always `false`.
    #[inline]
    fn unstable_is_shared(&self) -> bool {
        // Noop for ipv6
        false
    }

    /// Benchmarking range `2001:2::/48`.
    #[inline]
    fn unstable_is_benchmarking(&self) -> bool {
        (self.segments()[0] == 0x2001) && (self.segments()[1] == 0x2) && (self.segments()[2] == 0)
    }

    /// Not tracked for IPv6; always `false`.
    #[inline]
    fn unstable_is_reserved(&self) -> bool {
        // Noop for ipv6
        false
    }

    /// Documentation ranges `2001:db8::/32` and `3fff::/20`.
    #[inline]
    fn unstable_is_documentation(&self) -> bool {
        matches!(self.segments(), [0x2001, 0xdb8, ..] | [0x3fff, 0..=0x0fff, ..])
    }
}
1206
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Hosts` with default settings.
    fn make_hosts() -> HostsPtr {
        let settings = Settings::default();
        Hosts::new(Arc::new(AsyncRwLock::new(settings)))
    }

    #[test]
    fn test_is_local_host() {
        let hosts = make_hosts();

        let local = vec![
            "tcp://localhost:1234",
            "tcp://127.0.0.1:1234",
            "tcp+tls://[::1]:1234",
            "tcp://192.168.10.65:1234",
        ];

        for url in local {
            assert!(hosts.is_local_host(&Url::parse(url).unwrap()), "{url} should be local");
        }

        let remote = vec![
            "https://dyne.org:443",
            "tcp://77.168.10.65:2222",
            "tcp://[2345:0425:2CA1::5673:23b5]:1234",
        ];

        for url in remote {
            assert!(!hosts.is_local_host(&Url::parse(url).unwrap()), "{url} should be remote");
        }
    }

    /// Pin the hand-copied `unstable_is_global` classification directly on
    /// IP values, independent of URL parsing.
    #[test]
    fn test_unstable_is_global() {
        let global_v4 = [Ipv4Addr::new(8, 8, 8, 8), Ipv4Addr::new(192, 0, 0, 9)];
        for ip in global_v4 {
            assert!(ip.unstable_is_global(), "{ip} should be global");
        }

        let non_global_v4 = [
            Ipv4Addr::new(0, 1, 2, 3),     // "this network"
            Ipv4Addr::new(10, 0, 0, 1),    // private
            Ipv4Addr::new(100, 64, 0, 1),  // shared address space
            Ipv4Addr::new(127, 0, 0, 1),   // loopback
            Ipv4Addr::new(169, 254, 1, 1), // link-local
            Ipv4Addr::new(192, 0, 0, 1),   // IETF protocol assignments
            Ipv4Addr::new(192, 0, 2, 1),   // documentation
            Ipv4Addr::new(198, 18, 0, 1),  // benchmarking
            Ipv4Addr::new(240, 0, 0, 1),   // reserved
            Ipv4Addr::BROADCAST,
        ];
        for ip in non_global_v4 {
            assert!(!ip.unstable_is_global(), "{ip} should not be global");
        }

        // `2001:1::1` (PCP anycast) is carved back out of `2001::/23`.
        let global_v6 = ["2606:4700:4700::1111", "2001:1::1"];
        for s in global_v6 {
            let ip: Ipv6Addr = s.parse().unwrap();
            assert!(ip.unstable_is_global(), "{ip} should be global");
        }

        let non_global_v6 = [
            "::",
            "::1",
            "::ffff:1.2.3.4",
            "100::1",
            "2001:2::1",
            "2001:db8::1",
            "2002::1",
            "fd00::1",
            "fe80::1",
        ];
        for s in non_global_v6 {
            let ip: Ipv6Addr = s.parse().unwrap();
            assert!(!ip.unstable_is_global(), "{ip} should not be global");
        }
    }

    #[test]
    fn test_container_operations() {
        let container = HostContainer::new();
        let url = Url::parse("tcp://test.com:1234").unwrap();
        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();

        // Store and retrieve
        container.store(HostColor::Grey, url.clone(), now);
        assert!(container.contains(HostColor::Grey, &url));
        assert!(!container.contains(HostColor::White, &url));

        // Move atomically
        container.move_host(&url, now, HostColor::White).unwrap();
        assert!(!container.contains(HostColor::Grey, &url));
        assert!(container.contains(HostColor::White, &url));

        // Remove
        container.remove(HostColor::White, &url);
        assert!(!container.contains(HostColor::White, &url));
    }

    #[test]
    fn test_contains_any() {
        let container = HostContainer::new();
        let url = Url::parse("tcp://test.com:1234").unwrap();
        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();

        container.store(HostColor::Gold, url.clone(), now);

        assert!(container.contains_any(&[HostColor::Grey, HostColor::Gold], &url));
        assert!(!container.contains_any(&[HostColor::Grey, HostColor::White], &url));
    }

    #[test]
    fn test_host_state_transitions() {
        let valid = [
            (HostState::Free(0), HostState::Insert),
            (HostState::Free(0), HostState::Refine),
            (HostState::Free(0), HostState::Connect),
            (HostState::Suspend, HostState::Refine),
            (HostState::Move, HostState::Suspend),
        ];

        for (from, to) in valid {
            assert!(from.try_transition(to).is_ok());
        }

        let invalid = [
            (HostState::Insert, HostState::Connect),
            (HostState::Refine, HostState::Insert),
            (HostState::Suspend, HostState::Connect),
        ];

        for (from, to) in invalid {
            assert!(from.try_transition(to).is_err());
        }
    }

    #[test]
    fn test_random_channel_empty() {
        let hosts = make_hosts();
        assert!(hosts.random_channel().is_none());
    }

    #[test]
    fn test_block_all_ports() {
        let hosts = make_hosts();

        let with_port = Url::parse("tcp+tls://example.com:333").unwrap();
        let without_port = Url::parse("tcp+tls://blocked.com").unwrap();

        hosts.container.store(HostColor::Black, with_port.clone(), 0);
        hosts.container.store(HostColor::Black, without_port.clone(), 0);

        // A port-less blacklist entry blocks every port on that host...
        let test_url = Url::parse("tcp+tls://blocked.com:9999").unwrap();
        assert!(hosts.block_all_ports(&test_url));

        // ...while an entry with a port only blocks that exact port.
        let test_url2 = Url::parse("tcp+tls://example.com:9999").unwrap();
        assert!(!hosts.block_all_ports(&test_url2));
    }

    #[test]
    fn test_refresh() {
        let container = HostContainer::new();
        let old_time = 1720000000u64;
        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();

        // Add old entries
        for i in 0..5 {
            let url = Url::parse(&format!("tcp://old{i}.com:123")).unwrap();
            container.store(HostColor::Dark, url, old_time);
        }

        // Add new entries
        for i in 0..5 {
            let url = Url::parse(&format!("tcp://new{i}.com:123")).unwrap();
            container.store(HostColor::Dark, url, now);
        }

        container.refresh(HostColor::Dark, 86400);

        let all = container.fetch_all(HostColor::Dark);
        assert_eq!(all.len(), 5);
        assert!(all.iter().all(|(_, ls)| *ls > old_time));
    }

    #[test]
    fn test_transport_mixing() {
        let hosts = HostContainer::mix_host(
            &Url::parse("tcp://dark.fi:28880").unwrap(),
            &["tor".to_string(), "tcp".to_string()],
            &["tcp".to_string()],
            &Url::parse("socks5://127.0.0.1:9050").ok(),
            &None,
        );

        assert_eq!(hosts.len(), 1);
        assert_eq!(hosts[0].scheme(), "tor");
    }

    #[test]
    fn test_prune_registry() {
        let hosts = make_hosts();
        let now = UNIX_EPOCH.elapsed().unwrap().as_secs();

        // Insert an entry that should be pruned (old Free)
        let old_url = Url::parse("tcp://old.example.com:123").unwrap();
        let old_age = now.saturating_sub(super::REGISTRY_PRUNE_AGE_SECS + 1000);
        hosts.registry.lock().insert(old_url.clone(), HostState::Free(old_age));

        // Insert an entry that should NOT be pruned (recent Free)
        let new_url = Url::parse("tcp://new.example.com:123").unwrap();
        hosts.registry.lock().insert(new_url.clone(), HostState::Free(now));

        // Insert an entry that should NOT be pruned (non-Free state)
        let active_url = Url::parse("tcp://active.example.com:123").unwrap();
        hosts.registry.lock().insert(active_url.clone(), HostState::Connect);

        assert_eq!(hosts.registry.lock().len(), 3);

        let pruned = hosts.prune_registry();
        assert_eq!(pruned, 1);

        let registry = hosts.registry.lock();
        assert_eq!(registry.len(), 2);
        assert!(!registry.contains_key(&old_url));
        assert!(registry.contains_key(&new_url));
        assert!(registry.contains_key(&active_url));
    }
}