darkfi/net/protocol/
protocol_address.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use smol::{lock::RwLock as AsyncRwLock, Executor};
21use std::{sync::Arc, time::UNIX_EPOCH};
22use tracing::debug;
23
24use super::{
25    super::{
26        channel::ChannelPtr,
27        hosts::{HostColor, HostsPtr},
28        message::{AddrsMessage, GetAddrsMessage},
29        message_publisher::MessageSubscription,
30        p2p::P2pPtr,
31        session::SESSION_OUTBOUND,
32        settings::Settings,
33    },
34    protocol_base::{ProtocolBase, ProtocolBasePtr},
35    protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
36};
37use crate::Result;
38
39/// Defines address and get-address messages.
40///
41/// On receiving GetAddr, nodes reply an AddrMessage containing nodes from
42/// their hostlist.  On receiving an AddrMessage, nodes enter the info into
43/// their greylists.
44///
45/// The node selection logic for creating an AddrMessage is as follows:
46///
47/// 1. First select nodes matching the requested transports from the
48///    anchorlist. These nodes have the highest guarantee of being reachable,
49///    so we prioritize them first.
50///
51/// 2. Then select nodes matching the requested transports from the
52///    whitelist.
53///
54/// 3. Next select whitelist nodes that don't match our transports. We do
55///    this so that nodes share and propagate nodes of different transports,
56///    even if they can't connect to them themselves.
57///
58/// 4. Finally, if there's still space available, fill the remaining vector
59///    space with darklist entries. This is necessary to propagate transports
60///    that neither this node nor the receiving node support.
61pub struct ProtocolAddress {
62    channel: ChannelPtr,
63    addrs_sub: MessageSubscription<AddrsMessage>,
64    get_addrs_sub: MessageSubscription<GetAddrsMessage>,
65    hosts: HostsPtr,
66    settings: Arc<AsyncRwLock<Settings>>,
67    jobsman: ProtocolJobsManagerPtr,
68}
69
/// Name under which this protocol registers with its jobs manager and
/// which it reports as its protocol name.
const PROTO_NAME: &str = "ProtocolAddress";

/// All currently accepted transports and valid transport combinations.
///
/// Requested transports and outgoing addresses are both filtered against
/// this array, so it must be updated if and when new transports are
/// added. Its length also places an upper bound on the number of
/// transports a given peer can request.
const TRANSPORT_COMBOS: [&str; 9] = [
    // Single-scheme entries
    "tor",
    "tls",
    "tcp",
    "nym",
    "i2p",
    // Entries combined with TLS
    "tor+tls",
    "nym+tls",
    "tcp+tls",
    "i2p+tls",
];
78
79impl ProtocolAddress {
80    /// Creates a new address protocol. Makes an address, an external address
81    /// and a get-address subscription and adds them to the address protocol
82    /// instance.
83    pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
84        // Creates a subscription to address message
85        let addrs_sub =
86            channel.subscribe_msg::<AddrsMessage>().await.expect("Missing addrs dispatcher!");
87
88        // Creates a subscription to get-address message
89        let get_addrs_sub =
90            channel.subscribe_msg::<GetAddrsMessage>().await.expect("Missing getaddrs dispatcher!");
91
92        Arc::new(Self {
93            channel: channel.clone(),
94            addrs_sub,
95            get_addrs_sub,
96            hosts: p2p.hosts(),
97            jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
98            settings: p2p.settings(),
99        })
100    }
101
102    /// Handles receiving the address message. Loops to continually receive
103    /// address messages on the address subscription. Validates and adds the
104    /// received addresses to the greylist.
105    async fn handle_receive_addrs(self: Arc<Self>) -> Result<()> {
106        debug!(
107            target: "net::protocol_address::handle_receive_addrs",
108            "[START] address={}", self.channel.display_address(),
109        );
110
111        loop {
112            let addrs_msg = self.addrs_sub.receive().await?;
113            debug!(
114                target: "net::protocol_address::handle_receive_addrs",
115                "Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.display_address(),
116            );
117
118            debug!(
119                target: "net::protocol_address::handle_receive_addrs",
120                "Appending to greylist...",
121            );
122
123            self.hosts.insert(HostColor::Grey, &addrs_msg.addrs).await;
124        }
125    }
126
127    /// Handles receiving the get-address message. Continually receives
128    /// get-address messages on the get-address subscription. Then replies
129    /// with an address message.
130    async fn handle_receive_get_addrs(self: Arc<Self>) -> Result<()> {
131        debug!(
132            target: "net::protocol_address::handle_receive_get_addrs",
133            "[START] address={}", self.channel.display_address(),
134        );
135
136        loop {
137            let get_addrs_msg = self.get_addrs_sub.receive().await?;
138
139            debug!(
140                target: "net::protocol_address::handle_receive_get_addrs",
141                "Received GetAddrs({}) message from {}", get_addrs_msg.max, self.channel.display_address(),
142            );
143
144            // Filter out transports not meant to be shared like Socks5 and Socks5+tls
145            let requested_transports: Vec<String> = get_addrs_msg
146                .transports
147                .iter()
148                .filter(|tp| TRANSPORT_COMBOS.contains(&tp.as_str()))
149                .cloned()
150                .collect();
151
152            // First we grab address with the requested transports from the gold list
153            debug!(target: "net::protocol_address::handle_receive_get_addrs",
154            "Fetching gold entries with schemes");
155            let mut addrs = self.hosts.container.fetch_n_random_with_schemes(
156                HostColor::Gold,
157                &requested_transports,
158                get_addrs_msg.max,
159            );
160
161            // Then we grab address with the requested transports from the whitelist
162            debug!(target: "net::protocol_address::handle_receive_get_addrs",
163            "Fetching whitelist entries with schemes");
164            addrs.append(&mut self.hosts.container.fetch_n_random_with_schemes(
165                HostColor::White,
166                &requested_transports,
167                get_addrs_msg.max,
168            ));
169
170            // Next we grab addresses without the requested transports
171            // to fill a 2 * max length vector.
172
173            // Then we grab address without the requested transports from the gold list
174            debug!(target: "net::protocol_address::handle_receive_get_addrs",
175            "Fetching gold entries without schemes");
176            let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
177            addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
178                HostColor::Gold,
179                &requested_transports,
180                remain,
181            ));
182
183            // Then we grab address without the requested transports from the white list
184            debug!(target: "net::protocol_address::handle_receive_get_addrs",
185            "Fetching white entries without schemes");
186            let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
187            addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
188                HostColor::White,
189                &requested_transports,
190                remain,
191            ));
192
193            // If there's still space available, take from the Dark list.
194
195            /* NOTE: We share peers from our Dark list because to ensure
196            that non-compatiable transports are shared with other nodes
197            so that they propagate on the network even if they're not
198            popular transports. */
199
200            debug!(target: "net::protocol_address::handle_receive_get_addrs",
201            "Fetching dark entries");
202            let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
203            addrs.append(&mut self.hosts.container.fetch_n_random(HostColor::Dark, remain));
204
205            // Filter out transports not meant to be shared like Socks5 and Socks5+tls
206            addrs.retain(|addr| TRANSPORT_COMBOS.contains(&addr.0.scheme()));
207
208            debug!(
209                target: "net::protocol_address::handle_receive_get_addrs",
210                "Sending {} addresses to {}", addrs.len(), self.channel.display_address(),
211            );
212
213            let addrs_msg = AddrsMessage { addrs };
214            self.channel.send(&addrs_msg).await?;
215        }
216    }
217
218    /// Send our own external addresses over a channel. Set the
219    /// last_seen field to now.
220    async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
221        debug!(
222            target: "net::protocol_address::send_my_addrs",
223            "[START] channel address={}", self.channel.display_address(),
224        );
225
226        if self.channel.session_type_id() != SESSION_OUTBOUND {
227            debug!(
228                target: "net::protocol_address::send_my_addrs",
229                "Not an outbound session. Stopping",
230            );
231            return Ok(())
232        }
233
234        let external_addrs = self.channel.hosts().external_addrs().await;
235
236        if external_addrs.is_empty() {
237            debug!(
238                target: "net::protocol_address::send_my_addrs",
239                "External addr not configured. Stopping",
240            );
241            return Ok(())
242        }
243
244        let mut addrs = vec![];
245
246        for addr in external_addrs {
247            let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
248            addrs.push((addr, last_seen));
249        }
250
251        debug!(
252            target: "net::protocol_address::send_my_addrs",
253            "Broadcasting {} addresses", addrs.len(),
254        );
255
256        let ext_addr_msg = AddrsMessage { addrs };
257        self.channel.send(&ext_addr_msg).await?;
258
259        debug!(
260            target: "net::protocol_address::send_my_addrs",
261            "[END] channel address={}", self.channel.display_address(),
262        );
263
264        Ok(())
265    }
266}
267
268#[async_trait]
269impl ProtocolBase for ProtocolAddress {
270    /// Start the address protocol. If it's an outbound session and has an
271    /// external address, send our external address. Run receive address
272    /// and get address protocols on the protocol task manager. Then send
273    /// get-address msg.
274    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
275        debug!(
276            target: "net::protocol_address::start",
277            "START => address={}", self.channel.display_address(),
278        );
279
280        let settings = self.settings.read().await;
281        let outbound_connections = settings.outbound_connections;
282        let active_profiles = settings.active_profiles.clone();
283        let getaddrs_max = settings.getaddrs_max;
284        drop(settings);
285
286        self.jobsman.clone().start(ex.clone());
287
288        self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
289
290        self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), ex.clone()).await;
291
292        self.jobsman.spawn(self.clone().handle_receive_get_addrs(), ex).await;
293
294        // Send get_address message.
295        // We ask for a maximum of u8::MAX addresses from a single node
296        let get_addrs = GetAddrsMessage {
297            max: getaddrs_max.unwrap_or(outbound_connections.min(u32::MAX as usize) as u32),
298            transports: active_profiles,
299        };
300        self.channel.send(&get_addrs).await?;
301
302        debug!(
303            target: "net::protocol_address::start",
304            "END => address={}", self.channel.display_address(),
305        );
306
307        Ok(())
308    }
309    fn name(&self) -> &'static str {
310        PROTO_NAME
311    }
312}
313
#[cfg(test)]
mod tests {
    use darkfi_serial::serialize;

    use crate::net::message::GET_ADDRS_MAX_BYTES;

    use super::{GetAddrsMessage, TRANSPORT_COMBOS};

    /// Checks that `GET_ADDRS_MAX_BYTES` still equals the serialized size
    /// of a get-address message requesting every entry of
    /// `TRANSPORT_COMBOS`, so the constant gets revisited as new
    /// transports are added.
    #[test]
    fn test_get_addrs_msg_size() {
        let transports: Vec<String> =
            TRANSPORT_COMBOS.iter().map(|scheme| (*scheme).to_owned()).collect();
        let message = GetAddrsMessage { max: u32::from(u8::MAX), transports };

        let encoded_len = serialize(&message).len() as u64;

        assert_eq!(encoded_len, GET_ADDRS_MAX_BYTES);
    }
}