darkfi/dht/
tasks.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{sync::Arc, time::UNIX_EPOCH};
20use tracing::{error, info, warn};
21
22use crate::{
23    dht::{event::DhtEvent, ChannelCacheItem, DhtHandler, DhtNode, SESSION_MANUAL},
24    net::{
25        hosts::HostColor,
26        session::{SESSION_DIRECT, SESSION_INBOUND, SESSION_OUTBOUND},
27    },
28    system::sleep,
29    util::time::Timestamp,
30    Result,
31};
32
33/// Handle DHT events.
34pub async fn events_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
35    let dht = handler.dht();
36    let sub = dht.event_publisher.clone().subscribe().await;
37    loop {
38        let event = sub.receive().await;
39
40        match event {
41            // On [`DhtEvent::PingReceived`] set channel_cache.ping_received = true
42            DhtEvent::PingReceived { from, .. } => {
43                let channel_cache_lock = dht.channel_cache.clone();
44                let mut channel_cache = channel_cache_lock.write().await;
45                if let Some(cached) = channel_cache.get_mut(&from.info.id) {
46                    cached.ping_received = true;
47                }
48            }
49            // On [`DhtEvent::PingSent`] set channel_cache.ping_sent = true
50            DhtEvent::PingSent { to, .. } => {
51                let channel_cache_lock = dht.channel_cache.clone();
52                let mut channel_cache = channel_cache_lock.write().await;
53                if let Some(cached) = channel_cache.get_mut(&to.info.id) {
54                    cached.ping_sent = true;
55                }
56            }
57            _ => {}
58        }
59    }
60}
61
62/// Send a DHT ping request when there is a new channel, to know the node id of the new peer,
63/// Then fill the channel cache and the buckets
64pub async fn channel_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
65    let dht = handler.dht();
66    let p2p = dht.p2p.clone();
67    let channel_sub = p2p.hosts().subscribe_channel().await;
68    loop {
69        let res = channel_sub.receive().await;
70        if res.is_err() {
71            continue;
72        }
73        let channel = res.unwrap();
74
75        let channel_cache_lock = dht.channel_cache.clone();
76        let mut channel_cache = channel_cache_lock.write().await;
77
78        // Skip this channel if it's not new
79        if channel_cache.keys().any(|&k| k == channel.info.id) {
80            continue;
81        }
82
83        channel_cache.insert(
84            channel.info.id,
85            ChannelCacheItem {
86                node: None,
87                last_used: Timestamp::current_time(),
88                ping_received: false,
89                ping_sent: false,
90            },
91        );
92        drop(channel_cache);
93
94        // It's a manual connection
95        if channel.session_type_id() & SESSION_MANUAL != 0 {
96            let ping_res = dht.ping(channel.clone()).await;
97
98            if let Err(e) = ping_res {
99                warn!(target: "dht::channel_task", "Error while pinging manual connection (requesting node id) {}: {e}", channel.display_address());
100                continue;
101            }
102        }
103
104        // It's an outbound connection
105        if channel.session_type_id() & SESSION_OUTBOUND != 0 {
106            let _ = dht.ping(channel.clone()).await;
107            continue;
108        }
109
110        // It's a direct connection
111        if channel.session_type_id() & SESSION_DIRECT != 0 {
112            p2p.session_direct().inc_channel_usage(&channel, 1).await;
113            let _ = dht.ping(channel.clone()).await;
114            dht.cleanup_channel(channel).await;
115            continue;
116        }
117    }
118}
119
120/// Periodically send a DHT ping to known hosts. If the ping is successful, we
121/// move the host to the whitelist (updating the last seen field).
122///
123/// This is necessary to prevent unresponsive nodes staying on the whitelist,
124/// as the DHT does not require any outbound slot.
125pub async fn dht_refinery_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
126    let interval = 60; // TODO: Make a setting
127    let min_ping_interval = 10 * 60; // TODO: Make a setting
128    let dht = handler.dht();
129    let hosts = dht.p2p.hosts();
130
131    loop {
132        let mut hostlist = hosts.container.fetch_all(HostColor::Gold);
133        hostlist.extend(hosts.container.fetch_all(HostColor::White));
134
135        // Include the greylist only if the DHT is not bootstrapped yet
136        if !handler.dht().is_bootstrapped().await {
137            hostlist.extend(hosts.container.fetch_all(HostColor::Grey));
138        }
139
140        for entry in &hostlist {
141            let url = &entry.0;
142            let host_cache = dht.host_cache.read().await;
143            let last_ping = host_cache.get(url).map(|h| h.last_ping.inner());
144            if last_ping.is_some() &&
145                last_ping.unwrap() > Timestamp::current_time().inner() - min_ping_interval
146            {
147                continue
148            }
149            drop(host_cache);
150
151            let res = dht.create_channel(url).await;
152            if res.is_err() {
153                continue
154            }
155            let (channel, _) = res.unwrap();
156            dht.cleanup_channel(channel).await;
157
158            let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
159            if let Err(e) = hosts.whitelist_host(url, last_seen).await {
160                error!(target: "dht::tasks::whitelist_refinery_task", "Could not send {url} to the whitelist: {e}");
161            }
162            break
163        }
164
165        match hostlist.is_empty() {
166            true => sleep(5).await,
167            false => sleep(interval).await,
168        }
169    }
170}
171
172/// Add a node to the DHT buckets.
173/// If the bucket is already full, we ping the least recently seen node in the
174/// bucket: if successful it becomes the most recently seen node, if the ping
175/// fails we remove it and add the new node.
176/// [`crate::dht::Dht::update_node()`] increments a channel's usage count
177/// (in the direct session) and triggers this task. This task decrements the
178/// usage count using [`crate::dht::Dht::cleanup_channel()`].
179pub async fn add_node_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
180    let dht = handler.dht();
181    loop {
182        let (node, channel) = dht.add_node_rx.recv().await.unwrap();
183
184        let self_node = handler.node().await;
185        if self_node.is_err() {
186            continue;
187        }
188        let self_node = self_node.unwrap();
189
190        let bucket_index = dht.get_bucket_index(&self_node.id(), &node.id()).await;
191        let buckets_lock = dht.buckets.clone();
192        let mut buckets = buckets_lock.write().await;
193        let bucket = &mut buckets[bucket_index];
194
195        // Do not add ourselves to the buckets
196        if node.id() == self_node.id() {
197            dht.cleanup_channel(channel).await;
198            continue;
199        }
200
201        // Don't add this node if it has any external address that is the same as one of ours
202        let node_addresses = node.addresses();
203        if self_node.addresses().iter().any(|addr| node_addresses.contains(addr)) {
204            dht.cleanup_channel(channel).await;
205            continue;
206        }
207
208        // Do not add a node to the buckets if it does not have an address
209        if node.addresses().is_empty() {
210            dht.cleanup_channel(channel).await;
211            continue;
212        }
213
214        // We already have this node, move it to the tail of the bucket
215        if let Some(node_index) = bucket.nodes.iter().position(|n| n.id() == node.id()) {
216            bucket.nodes.remove(node_index);
217            bucket.nodes.push(node);
218            dht.cleanup_channel(channel).await;
219            continue;
220        }
221
222        // Bucket is full
223        if bucket.nodes.len() >= dht.settings.k {
224            // Ping the least recently seen node
225            if let Ok((channel2, node)) = dht.get_channel(&bucket.nodes[0]).await {
226                // Ping was successful, move the least recently seen node to the tail
227                let n = bucket.nodes.remove(0);
228                bucket.nodes.push(n);
229                drop(buckets);
230                dht.on_new_node(&node.clone(), channel2.clone()).await;
231                dht.cleanup_channel(channel2).await;
232                dht.cleanup_channel(channel).await;
233                continue;
234            }
235
236            // Ping was not successful, remove the least recently seen node and add the new node
237            bucket.nodes.remove(0);
238            bucket.nodes.push(node.clone());
239            drop(buckets);
240            dht.on_new_node(&node.clone(), channel.clone()).await;
241            dht.cleanup_channel(channel).await;
242            continue;
243        }
244
245        // Bucket is not full, just add the node
246        bucket.nodes.push(node.clone());
247        drop(buckets);
248        dht.on_new_node(&node.clone(), channel.clone()).await;
249        dht.cleanup_channel(channel).await;
250    }
251}
252
253/// Close inbound connections that are unused for too long.
254pub async fn disconnect_inbounds_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
255    let interval = 10; // TODO: Make a setting
256    let dht = handler.dht();
257
258    loop {
259        sleep(interval).await;
260
261        let min_last_used = Timestamp::current_time().inner() - dht.settings.inbound_timeout;
262
263        let channel_cache_lock = dht.channel_cache.clone();
264        let mut channel_cache = channel_cache_lock.write().await;
265
266        for (channel_id, cached) in channel_cache.clone() {
267            // Check that:
268            // The channel timed out,
269            if cached.last_used.inner() >= min_last_used {
270                continue;
271            }
272            // The channel exists,
273            let channel = dht.p2p.get_channel(channel_id);
274            if channel.is_none() {
275                channel_cache.remove(&channel_id);
276                continue;
277            }
278            let channel = channel.unwrap();
279            // And the channel is inbound.
280            if channel.session_type_id() & SESSION_INBOUND == 0 {
281                continue;
282            }
283
284            // Now we can stop it and remove it from the channel cache
285            info!(target: "dht::disconnect_inbounds_task", "Closing expired inbound channel [{}]", channel.display_address());
286            channel.stop().await;
287            channel_cache.remove(&channel.info.id);
288        }
289    }
290}
291
292/// Removes entries from [`crate::dht::Dht::channel_cache`] when a channel is
293/// stopped.
294pub async fn cleanup_channels_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
295    let interval = 60; // TODO: Make a setting
296    let dht = handler.dht();
297
298    loop {
299        sleep(interval).await;
300
301        let channel_cache_lock = dht.channel_cache.clone();
302        let mut channel_cache = channel_cache_lock.write().await;
303
304        for (channel_id, _) in channel_cache.clone() {
305            match dht.p2p.get_channel(channel_id) {
306                Some(channel) => {
307                    if channel.is_stopped() {
308                        channel_cache.remove(&channel_id);
309                    }
310                }
311                None => {
312                    channel_cache.remove(&channel_id);
313                }
314            }
315        }
316    }
317}