darkfi/net/protocol/
protocol_address.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use smol::{lock::RwLock as AsyncRwLock, Executor};
21use std::{sync::Arc, time::UNIX_EPOCH};
22use tracing::debug;
23use url::Url;
24
25use super::{
26    super::{
27        channel::ChannelPtr,
28        hosts::{HostColor, HostsPtr},
29        message::{AddrsMessage, GetAddrsMessage},
30        message_publisher::MessageSubscription,
31        p2p::P2pPtr,
32        session::SESSION_OUTBOUND,
33        settings::Settings,
34    },
35    protocol_base::{ProtocolBase, ProtocolBasePtr},
36    protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
37};
38use crate::Result;
39
40/// Defines address and get-address messages.
41///
42/// On receiving GetAddr, nodes reply an AddrMessage containing nodes from
43/// their hostlist.  On receiving an AddrMessage, nodes enter the info into
44/// their greylists.
45///
46/// The node selection logic for creating an AddrMessage is as follows:
47///
48/// 1. First select nodes matching the requested transports from the
49///    anchorlist. These nodes have the highest guarantee of being reachable,
50///    so we prioritize them first.
51///
52/// 2. Then select nodes matching the requested transports from the
53///    whitelist.
54///
55/// 3. Next select whitelist nodes that don't match our transports. We do
56///    this so that nodes share and propagate nodes of different transports,
57///    even if they can't connect to them themselves.
58///
59/// 4. Finally, if there's still space available, fill the remaining vector
60///    space with darklist entries. This is necessary to propagate transports
61///    that neither this node nor the receiving node support.
62pub struct ProtocolAddress {
63    channel: ChannelPtr,
64    addrs_sub: MessageSubscription<AddrsMessage>,
65    get_addrs_sub: MessageSubscription<GetAddrsMessage>,
66    hosts: HostsPtr,
67    settings: Arc<AsyncRwLock<Settings>>,
68    jobsman: ProtocolJobsManagerPtr,
69}
70
71const PROTO_NAME: &str = "ProtocolAddress";
72
73/// A vector of all currently accepted transports and valid transport
74/// combinations.  Should be updated if and when new transports are
75/// added. Creates a upper bound on the number of transports a given peer
76/// can request.
77const TRANSPORT_COMBOS: [&str; 9] =
78    ["tor", "tls", "tcp", "nym", "i2p", "tor+tls", "nym+tls", "tcp+tls", "i2p+tls"];
79
80/// Strip query parameters from a URL before broadcasting.
81///
82/// This prevents leaking internal tracking identifiers (e.g., UPnP cookies)
83/// that could be used for fingerprinting nodes on the P2P network.
84fn strip_query_params(url: &Url) -> Url {
85    let mut stripped = url.clone();
86    stripped.set_query(None);
87    stripped
88}
89
90impl ProtocolAddress {
91    /// Creates a new address protocol. Makes an address, an external address
92    /// and a get-address subscription and adds them to the address protocol
93    /// instance.
94    pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
95        // Creates a subscription to address message
96        let addrs_sub =
97            channel.subscribe_msg::<AddrsMessage>().await.expect("Missing addrs dispatcher!");
98
99        // Creates a subscription to get-address message
100        let get_addrs_sub =
101            channel.subscribe_msg::<GetAddrsMessage>().await.expect("Missing getaddrs dispatcher!");
102
103        Arc::new(Self {
104            channel: channel.clone(),
105            addrs_sub,
106            get_addrs_sub,
107            hosts: p2p.hosts(),
108            jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
109            settings: p2p.settings(),
110        })
111    }
112
113    /// Handles receiving the address message. Loops to continually receive
114    /// address messages on the address subscription. Validates and adds the
115    /// received addresses to the greylist.
116    async fn handle_receive_addrs(self: Arc<Self>) -> Result<()> {
117        debug!(
118            target: "net::protocol_address::handle_receive_addrs",
119            "[START] address={}", self.channel.display_address(),
120        );
121
122        loop {
123            let addrs_msg = self.addrs_sub.receive().await?;
124            debug!(
125                target: "net::protocol_address::handle_receive_addrs",
126                "Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.display_address(),
127            );
128
129            debug!(
130                target: "net::protocol_address::handle_receive_addrs",
131                "Appending to greylist...",
132            );
133
134            self.hosts.insert(HostColor::Grey, &addrs_msg.addrs).await;
135        }
136    }
137
138    /// Handles receiving the get-address message. Continually receives
139    /// get-address messages on the get-address subscription. Then replies
140    /// with an address message.
141    async fn handle_receive_get_addrs(self: Arc<Self>) -> Result<()> {
142        debug!(
143            target: "net::protocol_address::handle_receive_get_addrs",
144            "[START] address={}", self.channel.display_address(),
145        );
146
147        loop {
148            let get_addrs_msg = self.get_addrs_sub.receive().await?;
149
150            debug!(
151                target: "net::protocol_address::handle_receive_get_addrs",
152                "Received GetAddrs({}) message from {}", get_addrs_msg.max, self.channel.display_address(),
153            );
154
155            // Filter out transports not meant to be shared like Socks5 and Socks5+tls
156            let requested_transports: Vec<String> = get_addrs_msg
157                .transports
158                .iter()
159                .filter(|tp| TRANSPORT_COMBOS.contains(&tp.as_str()))
160                .cloned()
161                .collect();
162
163            // First we grab address with the requested transports from the gold list
164            debug!(target: "net::protocol_address::handle_receive_get_addrs",
165            "Fetching gold entries with schemes");
166            let mut addrs = self.hosts.container.fetch_n_random_with_schemes(
167                HostColor::Gold,
168                &requested_transports,
169                get_addrs_msg.max as usize,
170            );
171
172            // Then we grab address with the requested transports from the whitelist
173            debug!(target: "net::protocol_address::handle_receive_get_addrs",
174            "Fetching whitelist entries with schemes");
175            addrs.append(&mut self.hosts.container.fetch_n_random_with_schemes(
176                HostColor::White,
177                &requested_transports,
178                get_addrs_msg.max as usize,
179            ));
180
181            // Next we grab addresses without the requested transports
182            // to fill a 2 * max length vector.
183
184            // Then we grab address without the requested transports from the gold list
185            debug!(target: "net::protocol_address::handle_receive_get_addrs",
186            "Fetching gold entries without schemes");
187            let remain = 2 * get_addrs_msg.max as usize - addrs.len();
188            addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
189                HostColor::Gold,
190                &requested_transports,
191                remain,
192            ));
193
194            // Then we grab address without the requested transports from the white list
195            debug!(target: "net::protocol_address::handle_receive_get_addrs",
196            "Fetching white entries without schemes");
197            let remain = 2 * get_addrs_msg.max as usize - addrs.len();
198            addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
199                HostColor::White,
200                &requested_transports,
201                remain,
202            ));
203
204            // If there's still space available, take from the Dark list.
205
206            /* NOTE: We share peers from our Dark list because to ensure
207            that non-compatiable transports are shared with other nodes
208            so that they propagate on the network even if they're not
209            popular transports. */
210
211            debug!(target: "net::protocol_address::handle_receive_get_addrs",
212            "Fetching dark entries");
213            let remain = 2 * get_addrs_msg.max as usize - addrs.len();
214            addrs.append(&mut self.hosts.container.fetch_n_random(HostColor::Dark, remain));
215
216            // Filter out transports not meant to be shared like Socks5 and Socks5+tls
217            addrs.retain(|addr| TRANSPORT_COMBOS.contains(&addr.0.scheme()));
218
219            debug!(
220                target: "net::protocol_address::handle_receive_get_addrs",
221                "Sending {} addresses to {}", addrs.len(), self.channel.display_address(),
222            );
223
224            let addrs_msg = AddrsMessage { addrs };
225            self.channel.send(&addrs_msg).await?;
226        }
227    }
228
229    /// Send our own external addresses over a channel. Set the
230    /// last_seen field to now.
231    async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
232        debug!(
233            target: "net::protocol_address::send_my_addrs",
234            "[START] channel address={}", self.channel.display_address(),
235        );
236
237        if self.channel.session_type_id() != SESSION_OUTBOUND {
238            debug!(
239                target: "net::protocol_address::send_my_addrs",
240                "Not an outbound session. Stopping",
241            );
242            return Ok(())
243        }
244
245        let external_addrs = self.channel.hosts().external_addrs().await;
246
247        if external_addrs.is_empty() {
248            debug!(
249                target: "net::protocol_address::send_my_addrs",
250                "External addr not configured. Stopping",
251            );
252            return Ok(())
253        }
254
255        let mut addrs = vec![];
256
257        for addr in external_addrs {
258            let stripped_addr = strip_query_params(&addr);
259            let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
260            addrs.push((stripped_addr, last_seen));
261        }
262
263        debug!(
264            target: "net::protocol_address::send_my_addrs",
265            "Broadcasting {} addresses", addrs.len(),
266        );
267
268        let ext_addr_msg = AddrsMessage { addrs };
269        self.channel.send(&ext_addr_msg).await?;
270
271        debug!(
272            target: "net::protocol_address::send_my_addrs",
273            "[END] channel address={}", self.channel.display_address(),
274        );
275
276        Ok(())
277    }
278}
279
280#[async_trait]
281impl ProtocolBase for ProtocolAddress {
282    /// Start the address protocol. If it's an outbound session and has an
283    /// external address, send our external address. Run receive address
284    /// and get address protocols on the protocol task manager. Then send
285    /// get-address msg.
286    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
287        debug!(
288            target: "net::protocol_address::start",
289            "START => address={}", self.channel.display_address(),
290        );
291
292        let settings = self.settings.read().await;
293        let outbound_connections = settings.outbound_connections;
294        let active_profiles = settings.active_profiles.clone();
295        let getaddrs_max = settings.getaddrs_max;
296        drop(settings);
297
298        self.jobsman.clone().start(ex.clone());
299
300        self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
301
302        self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), ex.clone()).await;
303
304        self.jobsman.spawn(self.clone().handle_receive_get_addrs(), ex).await;
305
306        // Send get_address message.
307        // We ask for a maximum of u8::MAX addresses from a single node
308        let get_addrs = GetAddrsMessage {
309            max: getaddrs_max.unwrap_or(outbound_connections.min(u32::MAX as usize) as u32),
310            transports: active_profiles,
311        };
312        self.channel.send(&get_addrs).await?;
313
314        debug!(
315            target: "net::protocol_address::start",
316            "END => address={}", self.channel.display_address(),
317        );
318
319        Ok(())
320    }
321    fn name(&self) -> &'static str {
322        PROTO_NAME
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use darkfi_serial::serialize;
329
330    use crate::net::message::GET_ADDRS_MAX_BYTES;
331
332    use super::{GetAddrsMessage, TRANSPORT_COMBOS};
333
334    // Helps to check if the MAX_BYTES for GetAddrs message is valid as new transports are added
335    #[test]
336    fn test_get_addrs_msg_size() {
337        let message = GetAddrsMessage {
338            max: u8::MAX as u32,
339            transports: TRANSPORT_COMBOS.iter().map(|x| x.to_string()).collect(),
340        };
341
342        assert_eq!(serialize(&message).len() as u64, GET_ADDRS_MAX_BYTES);
343    }
344}