fud/
tasks.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use tracing::{error, info, warn};
22
23use darkfi::{
24    dht::{event::DhtEvent, DhtHandler, DhtNode},
25    geode::hash_to_string,
26    system::{sleep, StoppableTask},
27    Error, Result,
28};
29
30use crate::{
31    event::{self, notify_event},
32    proto::FudAnnounce,
33    resource::ResourceStatus,
34    Fud, FudEvent, FudState,
35};
36
37/// Handle DHT events in fud.
38pub async fn handle_dht_events(fud: Arc<Fud>) -> Result<()> {
39    let sub = fud.dht().subscribe().await;
40    loop {
41        let event = sub.receive().await;
42
43        match event {
44            DhtEvent::ValueLookupCompleted { key, values, .. } => {
45                let mut seeders: Vec<_> = values.into_iter().flatten().collect();
46                seeders.dedup_by_key(|seeder| seeder.node.id());
47                notify_event!(fud, SeedersFound, {
48                    hash: key,
49                    seeders
50                });
51            }
52            DhtEvent::BootstrapCompleted => {
53                let _ = fud.init().await;
54                notify_event!(fud, Ready);
55            }
56            _ => {}
57        }
58    }
59}
60
61/// Triggered when calling the `fud.get()` method.
62/// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts
63/// it into the `fud.fetch_tasks` hashmap. When the task is stopped it's
64/// removed from the hashmap.
65pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
66    loop {
67        let (hash, path, files) = fud.get_rx.recv().await.unwrap();
68
69        // Create the new task
70        let mut fetch_tasks = fud.fetch_tasks.write().await;
71        let task = StoppableTask::new();
72        fetch_tasks.insert(hash, task.clone());
73        drop(fetch_tasks);
74
75        // Start the new task
76        let fud_1 = fud.clone();
77        let fud_2 = fud.clone();
78        task.start(
79            async move { fud_1.fetch_resource(&hash, &path, &files).await },
80            move |res| async move {
81                // Remove the task from the `fud.fetch_tasks` hashmap once it is
82                // stopped (error, manually, or just done).
83                let mut fetch_tasks = fud_2.fetch_tasks.write().await;
84                fetch_tasks.remove(&hash);
85
86                // If there is still a lookup task for this hash, stop it
87                let lookup_tasks = fud_2.lookup_tasks.read().await;
88                if let Some(lookup_task) = lookup_tasks.get(&hash) {
89                    lookup_task.stop().await;
90                }
91
92                match res {
93                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
94                    Err(e) => {
95                        error!(target: "fud::get_task()", "Error while fetching resource: {e}");
96
97                        // Set resource status to `Incomplete`
98                        let mut resources = fud_2.resources.write().await;
99                        if let Some(resource) = resources.get_mut(&hash) {
100                            resource.status = ResourceStatus::Incomplete(Some(e.to_string()));
101                        }
102                        drop(resources);
103
104                        // Send a DownloadError for any error that stopped the fetch task
105                        notify_event!(fud_2, DownloadError, {
106                            hash,
107                            error: e.to_string(),
108                        });
109                    }
110                }
111            },
112            Error::DetachedTaskStopped,
113            fud.executor.clone(),
114        );
115    }
116}
117
118/// Triggered when calling the `fud.put()` method.
119pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
120    loop {
121        let path = fud.put_rx.recv().await.unwrap();
122
123        // Create the new task
124        let mut put_tasks = fud.put_tasks.write().await;
125        let task = StoppableTask::new();
126        put_tasks.insert(path.clone(), task.clone());
127        drop(put_tasks);
128
129        // Start the new task
130        let fud_1 = fud.clone();
131        let fud_2 = fud.clone();
132        let path_ = path.clone();
133        task.start(
134            async move { fud_1.insert_resource(&path_).await },
135            move |res| async move {
136                // Remove the task from the `fud.put_tasks` hashmap once it is
137                // stopped (error, manually, or just done).
138                let mut put_tasks = fud_2.put_tasks.write().await;
139                put_tasks.remove(&path);
140                match res {
141                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
142                    Err(e) => {
143                        error!(target: "fud::put_task()", "Error while inserting resource: {e}");
144
145                        // Send a InsertError for any error that stopped the fetch task
146                        notify_event!(fud_2, InsertError, {
147                            path,
148                            error: e.to_string(),
149                        });
150                    }
151                }
152            },
153            Error::DetachedTaskStopped,
154            fud.executor.clone(),
155        );
156    }
157}
158
159/// Triggered when you need to lookup seeders for a resource.
160pub async fn lookup_task(fud: Arc<Fud>) -> Result<()> {
161    loop {
162        let key = fud.lookup_rx.recv().await.unwrap();
163
164        let mut lookup_tasks = fud.lookup_tasks.write().await;
165        let task = StoppableTask::new();
166        lookup_tasks.insert(key, task.clone());
167        drop(lookup_tasks);
168
169        let fud_1 = fud.clone();
170        let fud_2 = fud.clone();
171        task.start(
172            async move {
173                fud_1.dht.lookup_value(&key).await;
174                Ok(())
175            },
176            move |res| async move {
177                // Remove the task from the `fud.lookup_tasks` hashmap once it is
178                // stopped (error, manually, or just done).
179                let mut lookup_tasks = fud_2.lookup_tasks.write().await;
180                lookup_tasks.remove(&key);
181                match res {
182                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
183                    Err(e) => {
184                        error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}");
185                    }
186                }
187            },
188            Error::DetachedTaskStopped,
189            fud.executor.clone(),
190        );
191    }
192}
193
194/// After pinging an inbound connection, this task is triggered to make sure
195/// that you are able to reach at least one of the node's external address.
196/// [`Fud::ping()`] will take care of adding the node to our buckets.
197pub async fn verify_node_task(fud: Arc<Fud>) -> Result<()> {
198    loop {
199        let node = fud.verify_node_rx.recv().await.unwrap();
200        if let Ok((channel, _)) = fud.dht.create_channel_to_node(&node).await {
201            fud.dht.cleanup_channel(channel).await;
202        }
203    }
204}
205
206/// Background task that announces our files once every hour.
207/// Also removes seeders that did not announce for too long.
208pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
209    let interval = 3600; // TODO: Make a setting
210
211    loop {
212        sleep(interval).await;
213
214        info!(target: "fud::announce_seed_task()", "Verifying seeds...");
215        let seeding_resources = match fud.verify_resources(None).await {
216            Ok(resources) => resources,
217            Err(e) => {
218                error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
219                continue;
220            }
221        };
222
223        info!(target: "fud::announce_seed_task()", "Announcing files...");
224        for resource in seeding_resources {
225            if let Ok(seeder) = fud.new_seeder(&resource.hash).await {
226                let seeders = vec![seeder];
227                let _ = fud
228                    .dht
229                    .announce(
230                        &resource.hash,
231                        &seeders.clone(),
232                        &FudAnnounce { key: resource.hash, seeders },
233                    )
234                    .await;
235            }
236        }
237
238        info!(target: "fud::announce_seed_task()", "Pruning seeders...");
239        fud.prune_seeders(interval.try_into().unwrap()).await;
240    }
241}
242
243/// Background task that:
244/// 1. Updates the [`crate::bitcoin::BitcoinHashCache`]
245/// 2. Removes old nodes from the DHT
246/// 3. Removes old nodes from the seeders router
247/// 4. If the Bitcoin block hash we currently use in our `fud.node_data` is too old, we update it and reset our DHT
248pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
249    let interval = 600; // TODO: Make a setting
250
251    loop {
252        sleep(interval).await;
253
254        let mut pow = fud.pow.write().await;
255        if !pow.settings.read().await.btc_enabled {
256            continue
257        }
258
259        let btc = &mut pow.bitcoin_hash_cache;
260
261        if btc.update().await.is_err() {
262            continue
263        }
264
265        let state = fud.state.read().await;
266        if state.is_none() {
267            continue
268        }
269        let block = state.clone().unwrap().node_data.btc_block_hash;
270        drop(state);
271        let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
272            Some(i) => i < 6,
273            None => true,
274        };
275
276        if !needs_dht_reset {
277            // Removes nodes in the DHT with unknown BTC block hashes.
278            let dht = fud.dht();
279            let mut buckets = dht.buckets.write().await;
280            for bucket in buckets.iter_mut() {
281                for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
282                    // If this node's BTC block hash is unknown, remove it from the bucket
283                    if !btc.block_hashes.contains(&node.data.btc_block_hash) {
284                        bucket.nodes.remove(i);
285                        info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
286                    }
287                }
288            }
289            drop(buckets);
290
291            // Removes nodes in the seeders router with unknown BTC block hashes
292            let mut seeders_table = fud.dht.hash_table.write().await;
293            for (key, seeders) in seeders_table.iter_mut() {
294                for (i, seeder) in seeders.clone().iter().enumerate().rev() {
295                    if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
296                        seeders.remove(i);
297                        info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
298                    }
299                }
300            }
301
302            continue
303        }
304
305        info!(target: "fud::node_id_task()", "Creating a new node id...");
306        let (node_data, secret_key) = match pow.generate_node().await {
307            Ok(res) => res,
308            Err(e) => {
309                warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
310                continue
311            }
312        };
313        drop(pow);
314        info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
315
316        // Close all channels
317        let dht = fud.dht();
318        let mut channel_cache = dht.channel_cache.write().await;
319        for channel in dht.p2p.hosts().channels().clone() {
320            channel.stop().await;
321            channel_cache.remove(&channel.info.id);
322        }
323        drop(channel_cache);
324
325        // Reset the DHT: removes known nodes and seeders
326        dht.reset().await;
327
328        // Update our node data and our secret key
329        let mut state = fud.state.write().await;
330        *state = Some(FudState { node_data, secret_key });
331
332        // DHT will be bootstrapped on the next channel connection
333    }
334}
335
/// Start a fud background task and store its handle.
///
/// Arguments:
/// - `$fud`: expression for the fud handle; it is cloned for the task and its
///   `executor` field is cloned to run the task.
/// - `$task_name`: name used in the log lines and, via `to_string()`, as the
///   key inserted into `$tasks`.
/// - `$task_fn`: callable invoked with the cloned fud handle; its result is
///   awaited as the task body.
/// - `$tasks`: collection whose `insert(name, task)` receives the new task.
///
/// An error other than `Error::DetachedTaskStopped` returned by the task body
/// is logged.
macro_rules! start_task {
    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
        info!(target: "fud", "Starting {} task", $task_name);
        let handle = StoppableTask::new();
        let fud_for_task = $fud.clone();
        handle.clone().start(
            async move { $task_fn(fud_for_task).await },
            |res| async {
                match res {
                    // A clean exit or an explicit stop needs no report.
                    Err(Error::DetachedTaskStopped) | Ok(()) => {}
                    Err(e) => error!(target: "fud", "Failed starting {} task: {e}", $task_name),
                }
            },
            Error::DetachedTaskStopped,
            $fud.executor.clone(),
        );
        $tasks.insert($task_name.to_string(), handle);
    }};
}
355pub(crate) use start_task;