fud/
lib.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::{HashMap, HashSet},
    io::ErrorKind,
    path::{Path, PathBuf},
    sync::Arc,
};

use sled_overlay::sled;
use smol::{
    channel,
    fs::{self, OpenOptions},
    lock::RwLock,
};
use tracing::{error, info, warn};

use darkfi::{
    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
    geode::{hash_to_string, Chunk, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
    net::P2pPtr,
    system::{ExecutorPtr, PublisherPtr, StoppableTask},
    util::{path::expand_path, time::Timestamp},
    Error, Result,
};
use darkfi_sdk::crypto::{schnorr::SchnorrSecret, SecretKey};
use darkfi_serial::{deserialize_async, serialize_async};

/// P2P protocols
pub mod proto;
use proto::FudAnnounce;

/// FudEvent
pub mod event;
use event::{notify_event, FudEvent};

/// Resource definition
pub mod resource;
use resource::{Resource, ResourceStatus, ResourceType};

/// Scrap definition
pub mod scrap;
use scrap::Scrap;

/// JSON-RPC related methods
pub mod rpc;

/// Background tasks
pub mod tasks;
use tasks::start_task;

/// Bitcoin
pub mod bitcoin;

/// PoW
pub mod pow;
use pow::{FudPow, VerifiableNodeData};

/// Equi-X
pub mod equix;

/// Settings and args
pub mod settings;
use settings::Args;

/// Utils
pub mod util;
use util::{create_all_files, get_all_files, FileSelection};

/// Download methods
mod download;
use download::{fetch_chunks, fetch_metadata};

/// [`DhtHandler`] implementation and fud-specific DHT structs
pub mod dht;
use dht::FudSeeder;

use crate::{dht::FudNode, pow::PowSettings};

const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";

#[derive(Clone, Debug)]
pub struct FudState {
    /// Our own [`VerifiableNodeData`]
    node_data: VerifiableNodeData,
    /// Our secret key (the public key is in `node_data`)
    secret_key: SecretKey,
}

pub struct Fud {
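    /// Our node's [`FudState`] (node data and secret key), set by `start()`
    /// once the PoW has generated our node data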
    state: Arc<RwLock<Option<FudState>>>,
    /// The Geode instance
    geode: Geode,
    /// Default download directory
    downloads_path: PathBuf,
    /// Chunk transfer timeout in seconds
    chunk_timeout: u64,
    /// The [`FudPow`] instance
    pub pow: Arc<RwLock<FudPow>>,
    /// The DHT instance
    dht: Arc<Dht<Fud>>,
    /// Resources (current status of all downloads/seeds)
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
    /// Chunked storages (represent files/directories)
    chunked_storages: Arc<RwLock<HashMap<blake3::Hash, ChunkedStorage>>>,
    /// Sled tree containing "resource hash -> path on the filesystem"
    path_tree: sled::Tree,
    /// Sled tree containing "resource hash -> file selection". If the file
    /// selection is all files of the resource (or if the resource is not a
    /// directory), the resource does not store its file selection in the tree.
    file_selection_tree: sled::Tree,
    /// Sled tree containing scraps: chunks holding data the user did not
    /// want to save to files. They also hold data the user did want,
    /// otherwise we would not have downloaded the chunk at all.
    /// We save scraps to be able to verify integrity even when part of the
    /// chunk is not saved to the filesystem in the downloaded files.
    /// "chunk/scrap hash -> chunk content"
    scrap_tree: sled::Tree,
    /// Get requests sender
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    /// Get requests receiver
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
    /// Put requests sender
    put_tx: channel::Sender<PathBuf>,
    /// Put requests receiver
    put_rx: channel::Receiver<PathBuf>,
    /// Lookup requests sender
    lookup_tx: channel::Sender<blake3::Hash>,
    /// Lookup requests receiver
    lookup_rx: channel::Receiver<blake3::Hash>,
    /// Verify node requests sender
    verify_node_tx: channel::Sender<FudNode>,
    /// Verify node requests receiver
    verify_node_rx: channel::Receiver<FudNode>,
    /// Currently active download tasks (running the `fud.fetch_resource()` method)
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently active put tasks (running the `fud.insert_resource()` method)
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
    /// Currently active lookup tasks (running the `fud.lookup_value()` method)
    lookup_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently active tasks (defined in `tasks`, started with the `start_task` macro)
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
    /// Used to send events to fud clients
    event_publisher: PublisherPtr<FudEvent>,
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Global multithreaded executor reference
    pub executor: ExecutorPtr,
}

impl Fud {
    pub async fn new(
        settings: Args,
        p2p: P2pPtr,
        sled_db: &sled::Db,
        event_publisher: PublisherPtr<FudEvent>,
        executor: ExecutorPtr,
    ) -> Result<Arc<Self>> {
        let dht_settings: DhtSettings = settings.dht.into();
        let net_settings_lock = p2p.settings();
        let mut net_settings = net_settings_lock.write().await;
        // We do not need any outbound slots
        net_settings.outbound_connections = 0;
        // GetAddrsMessage's `max` defaults to the DHT's `k`
        net_settings.getaddrs_max =
            Some(net_settings.getaddrs_max.unwrap_or(dht_settings.k.min(u32::MAX as usize) as u32));
        drop(net_settings);

        let basedir = expand_path(&settings.base_dir)?;
        let downloads_path = match settings.downloads_path {
            Some(downloads_path) => expand_path(&downloads_path)?,
            None => basedir.join("downloads"),
        };

        let pow_settings: PowSettings = settings.pow.into();
        let pow = FudPow::new(pow_settings.clone(), executor.clone());

        // Geode
        info!(target: "fud::new()", "Instantiating Geode instance");
        let geode = Geode::new(&basedir).await?;

        // DHT
        let dht: Arc<Dht<Fud>> =
            Arc::new(Dht::<Fud>::new(&dht_settings, p2p.clone(), executor.clone()).await);

        let (get_tx, get_rx) = smol::channel::unbounded();
        let (put_tx, put_rx) = smol::channel::unbounded();
        let (lookup_tx, lookup_rx) = smol::channel::unbounded();
        let (verify_node_tx, verify_node_rx) = smol::channel::unbounded();
        let fud = Arc::new(Self {
            state: Arc::new(RwLock::new(None)),
            geode,
            downloads_path,
            chunk_timeout: settings.chunk_timeout,
            pow: Arc::new(RwLock::new(pow)),
            dht: dht.clone(),
            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
            resources: Arc::new(RwLock::new(HashMap::new())),
            chunked_storages: Arc::new(RwLock::new(HashMap::new())),
            get_tx,
            get_rx,
            put_tx,
            put_rx,
            lookup_tx,
            lookup_rx,
            verify_node_tx,
            verify_node_rx,
            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
            put_tasks: Arc::new(RwLock::new(HashMap::new())),
            lookup_tasks: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            event_publisher,
            p2p,
            executor,
        });
        *dht.handler.write().await = Arc::downgrade(&fud);

        Ok(fud)
    }

    /// Run the PoW and generate a `VerifiableNodeData`, then start tasks
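    ///
    /// Minimal startup sketch (assumes `settings`, `p2p`, `sled_db`,
    /// `event_publisher` and `executor` are already set up elsewhere):
    /// ```ignore
    /// let fud = Fud::new(settings, p2p, &sled_db, event_publisher, executor).await?;
    /// fud.start().await?;
    /// ```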
    pub async fn start(self: &Arc<Self>) -> Result<()> {
        let mut pow = self.pow.write().await;
        if pow.settings.read().await.btc_enabled {
            pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes
        }
        let (node_data, secret_key) = pow.generate_node().await?;
        info!(target: "fud::start()", "Your node ID: {}", hash_to_string(&node_data.id()));
        let mut state = self.state.write().await;
        *state = Some(FudState { node_data, secret_key });
        drop(state);
        drop(pow);

        self.start_tasks().await;

        Ok(())
    }

    async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "events", tasks::handle_dht_events, tasks);
        start_task!(self, "DHT events", dht_tasks::events_task::<Fud>, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud>, tasks);
        start_task!(self, "DHT cleanup channels", dht_tasks::cleanup_channels_task::<Fud>, tasks);
        start_task!(self, "DHT add node", dht_tasks::add_node_task::<Fud>, tasks);
        start_task!(self, "DHT refinery", dht_tasks::dht_refinery_task::<Fud>, tasks);
        start_task!(
            self,
            "DHT disconnect inbounds",
            dht_tasks::disconnect_inbounds_task::<Fud>,
            tasks
        );
        start_task!(self, "lookup", tasks::lookup_task, tasks);
        start_task!(self, "verify node", tasks::verify_node_task, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }

    /// Verify our resources, add ourselves to the seeders (`dht.hash_table`)
    /// for the resources we already have, and announce our resources.
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            if result.is_err() {
                continue;
            }

            // Parse hash
            let (hash, path) = result.unwrap();
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            // Parse path
            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Get the file selection from sled, fall back on FileSelection::All
            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            // Add resource
            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete(None),
                    file_selection,
                ),
            );
        }
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await?;

        // Stop here if we have no external address
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Add our own node as a seeder for the resources we are seeding
        for resource in &resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let self_router_items = vec![seeder];
                self.add_value(&resource.hash, &self_router_items).await;
            }
        }

        info!(target: "fud::init()", "Announcing resources...");
        for resource in resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let seeders = vec![seeder];
                let _ = self
                    .dht
                    .announce(
                        &resource.hash,
                        &seeders.clone(),
                        &FudAnnounce { key: resource.hash, seeders },
                    )
                    .await;
            }
        }

        Ok(())
    }

    /// Get a copy of the current resources
    pub async fn resources(&self) -> HashMap<blake3::Hash, Resource> {
        let resources = self.resources.read().await;
        resources.clone()
    }

    /// Get resource path from hash using the sled db
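    ///
    /// Lookup sketch (hypothetical hash):
    /// ```ignore
    /// if let Some(path) = fud.hash_to_path(&hash)? {
    ///     println!("Resource is stored at {}", path.display());
    /// }
    /// ```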
    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
            return Ok(Some(path));
        }

        Ok(None)
    }

    /// Get resource hash from path using the sled db
    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();
        for path_item in self.path_tree.iter() {
            let (key, value) = path_item?;
            if value == path_bytes {
                let bytes: &[u8] = &key;
                if bytes.len() != 32 {
                    return Err(Error::Custom(format!(
                        "Expected a 32-byte BLAKE3 hash, got {} bytes",
                        bytes.len()
                    )));
                }

                let array: [u8; 32] = bytes.try_into().unwrap();
                return Ok(Some(array.into()))
            }
        }

        Ok(None)
    }

    /// Create a new [`dht::FudSeeder`] for our own node
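    ///
    /// Sketch mirroring the announce flow used in `init()` and `insert_resource()`:
    /// ```ignore
    /// let seeders = vec![fud.new_seeder(&hash).await?];
    /// let announce = FudAnnounce { key: hash, seeders: seeders.clone() };
    /// let _ = fud.dht.announce(&hash, &seeders, &announce).await;
    /// ```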
    pub async fn new_seeder(&self, key: &blake3::Hash) -> Result<FudSeeder> {
        let state = self.state.read().await;
        if state.is_none() {
            return Err(Error::Custom("Fud is not ready yet".to_string()))
        }
        let state_ = state.clone().unwrap();
        drop(state);
        let node = self.node().await?;

        Ok(FudSeeder {
            key: *key,
            node: node.clone(),
            sig: state_
                .secret_key
                .sign(&[key.as_bytes().to_vec(), serialize_async(&node).await].concat()),
            timestamp: Timestamp::current_time().inner(),
        })
    }

    /// Verify that resources are complete and uncorrupted.
    /// If a resource is incomplete or corrupted, its status is changed to Incomplete.
    /// If a resource is complete, its status is changed to Seeding.
    /// Takes an optional list of resource hashes.
    /// If no hash is given (None), it verifies all resources.
    /// Returns the list of verified, complete and uncorrupted seeding resources.
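    ///
    /// Sketch (hypothetical hash; pass `None` to re-verify everything):
    /// ```ignore
    /// let seeding = fud.verify_resources(Some(vec![hash])).await?;
    /// ```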
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        let mut resources_write = self.resources.write().await;

        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked, &resource.file_selection),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|chunk| chunk_hashes.contains(&chunk.hash) && chunk.available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };

        let mut seeding_resources: Vec<Resource> = vec![];
        for (_, mut resource) in resources_write.iter_mut() {
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete(_) => {}
                _ => continue,
            };

            // Make sure the resource is not corrupted or incomplete
            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete(None), None, 0, 0)
                        .await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete(None), None, 0, 0)
                        .await;
                    continue;
                }
            };
            let verify_res =
                self.verify_chunks(resource, &mut chunked, &resource.file_selection).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(&mut resource, ResourceStatus::Incomplete(None), None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            let mut chunked_storages = self.chunked_storages.write().await;
            chunked_storages.insert(resource.hash, chunked.clone());
            drop(chunked_storages);

            if !chunked.is_complete() {
                update_resource(
                    &mut resource,
                    ResourceStatus::Incomplete(None),
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            update_resource(
                &mut resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }

    /// Start downloading a file or directory from the network to `path`.
    /// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
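    ///
    /// Download sketch (hypothetical hash and path; `FileSelection::All`
    /// fetches every file in the resource):
    /// ```ignore
    /// fud.get(&hash, Path::new("/tmp/download"), FileSelection::All).await?;
    /// ```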
    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
        let fetch_tasks = self.fetch_tasks.read().await;
        if fetch_tasks.contains_key(hash) {
            return Err(Error::Custom(format!(
                "Resource {} is already being downloaded",
                hash_to_string(hash)
            )))
        }
        drop(fetch_tasks);

        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;

        Ok(())
    }

    /// Try to get the chunked file or directory from geode; if we don't have
    /// it, it is fetched from the network using `fetch_metadata()`.
    /// If we need to fetch from the network, a seeder lookup is started by
    /// sending the hash over `lookup_tx`.
    /// The seeder in the returned result is only defined if we fetched from
    /// the network.
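    ///
    /// Sketch (hypothetical hash and path):
    /// ```ignore
    /// let (chunked, seeder) = fud.get_metadata(&hash, &path).await?;
    /// // `seeder` is `Some(..)` only if the metadata came from the network
    /// ```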
    pub async fn get_metadata(
        &self,
        hash: &blake3::Hash,
        path: &Path,
    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
        match self.geode.get(hash, path).await {
            // We already know the metadata
            Ok(v) => Ok((v, None)),
            // The metadata in geode is invalid or corrupted
            Err(Error::GeodeNeedsGc) => todo!(),
            // If we could not find the metadata in geode, get it from the network
            Err(Error::GeodeFileNotFound) => {
                // Find nodes close to the file hash
                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
                let dht_sub = self.dht.subscribe().await;
                if let Err(e) = self.lookup_tx.send(*hash).await {
                    dht_sub.unsubscribe().await;
                    return Err(e.into())
                }

                // Fetch resource metadata
                let fetch_res = fetch_metadata(self, hash, path, &dht_sub).await;
                dht_sub.unsubscribe().await;
                let seeder = fetch_res?;
                Ok((self.geode.get(hash, path).await?, Some(seeder)))
            }
            Err(e) => Err(e),
        }
    }

    /// Returns the number of bytes downloaded and the total number of bytes
    /// for the files in the given `FileSelection` of the resource `hash`.
    pub async fn get_progress(
        &self,
        hash: &blake3::Hash,
        file_selection: &FileSelection,
    ) -> (u64, u64) {
        let mut resources = self.resources.write().await;
        let resource = resources.get_mut(hash);
        if resource.is_none() {
            return (0, 0)
        }
        let resource = resource.unwrap();
        match self.hash_to_path(&resource.hash) {
            Ok(Some(_)) => {}
            Ok(None) | Err(_) => {
                resource.status = ResourceStatus::Incomplete(None);
                resource.total_bytes_downloaded = 0;
                resource.target_bytes_downloaded = 0;
                notify_event!(self, ResourceUpdated, resource);
                return (0, 0)
            }
        }
        let mut chunked_storages = self.chunked_storages.write().await;
        let Some(chunked) = chunked_storages.get_mut(&resource.hash) else { return (0, 0) };

        let files_vec: Vec<PathBuf> = resource.get_selected_files(chunked, file_selection);

        let bytes_downloaded = {
            let mut bytes = 0;
            for chunk in chunked.iter() {
                if chunk.available {
                    bytes += resource.get_bytes_of_selection(
                        chunked,
                        file_selection,
                        &chunk.hash,
                        chunk.size,
                    ) as u64;
                }
            }
            bytes
        };
        let bytes_total = chunked.get_fileseq().subset_len(files_vec.into_iter().collect());

        (bytes_downloaded, bytes_total)
    }

    /// Download a file or directory from the network to `path`.
    /// Called when `get()` creates a new fetch task.
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        // Macro that acquires a write lock on `self.resources`, updates a
        // resource, and returns the resource (dropping the write lock)
        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)* // Apply the field assignments
                        resource.clone()
                    }
                    None => return Ok(()), // Resource was removed, abort
                };
                resource
            }};
        }
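
        // Example (as used below): `update_resource!(hash, { status = ResourceStatus::Verifying })`
        // acquires the lock, applies the field assignments, and returns a clone of the resource.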

        // Make sure we don't already have another resource on that path
        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        // Add path to the sled db
        self.path_tree.insert(hash_bytes, path_bytes)?;

        let mut resources_write = self.resources.write().await;
        let merged_files = if let Some(old_resource) = resources_write.get(hash) {
            old_resource.file_selection.merge(files)
        } else {
            files.clone()
        };

        // Add merged file selection to the sled db
        if let FileSelection::Set(selected_files) = &merged_files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            // Abort if the file selection cannot be inserted into sled
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        } else {
            // Abort if the file selection cannot be removed from sled
            if let Err(e) = self.file_selection_tree.remove(hash_bytes) {
                return Err(Error::SledError(e))
            }
        }

        // Add resource to `self.resources`
        let mut resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            merged_files.clone(),
        );
        resource.last_file_selection = files.clone();
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        // Subscribe to DHT events early for `fetch_chunks()`
        let dht_sub = self.dht.subscribe().await;

        // Send a DownloadStarted event
        notify_event!(self, DownloadStarted, resource);

        // Try to get the chunked file or directory from geode
        let metadata_result = self.get_metadata(hash, path).await;

        if let Err(e) = metadata_result {
            // Set resource status to `Incomplete` and send a `MetadataNotFound` event
            let resource = update_resource!(hash, {
                status = ResourceStatus::Incomplete(Some("Metadata not found".to_string()))
            });
            notify_event!(self, MetadataNotFound, resource);
            dht_sub.unsubscribe().await;
            return Err(e)
        }
        let (mut chunked, metadata_seeder) = metadata_result.unwrap();

        // Get a list of all file paths the user wants to fetch
        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => {
                // Resource was removed, abort
                dht_sub.unsubscribe().await;
                return Ok(())
            }
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked, files);
        drop(resources_read);

        // Create all files (and all necessary directories)
        if let Err(e) = create_all_files(&files_vec).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        // Set resource status to `Verifying` and send a `MetadataDownloadCompleted` event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Set of all chunks we need locally (including the ones we already have)
        let chunk_hashes = resource.get_chunks_of_selection(&chunked, files);

        // Write all scraps to make sure the data on the filesystem is correct
        if let Err(e) = self.write_scraps(&mut chunked, &chunk_hashes).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        // Mark locally available chunks as such
        let verify_res = self.verify_chunks(&resource, &mut chunked, files).await;

        // Insert the chunked storage to fud's cache
        let mut chunked_storages = self.chunked_storages.write().await;
        chunked_storages.insert(*hash, chunked.clone());
        drop(chunked_storages);

        if let Err(e) = verify_res {
            dht_sub.unsubscribe().await;
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        // Update `total_bytes_size` if the resource is a file
        if let ResourceType::File = resource.rtype {
            update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }
        // If `chunked` is a file that is bigger than all of its chunks
        // combined, truncate the file to the total chunk size.
        // This fixes two edge-cases: a file that exactly ends at the end of
        // a chunk, and a file with no chunk.
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await;
            if let Err(e) = fs_metadata {
                dht_sub.unsubscribe().await;
                return Err(e.into());
            }
            if fs_metadata.unwrap().len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        // Set of all chunks we need locally and their current availability
        let chunks: HashSet<Chunk> =
            chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).cloned().collect();

        // Set of the chunks we need to download
        let mut missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&c| !c.available).map(|c| c.hash).collect();

        // Update the resource with the chunks/bytes counts
        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,

            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
            // Set resource status to `Seeding` or `Incomplete`
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete(None),
                },
                target_chunks_downloaded = chunks.len() as u64,
                total_chunks_downloaded = chunked.local_chunks() as u64,
            });

            // Announce the resource if we have all chunks
            if chunked.is_complete() {
                if let Ok(seeder) = self.new_seeder(hash).await {
                    let seeders = vec![seeder];
                    let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
                    let _ = self.dht.announce(hash, &seeders, &self_announce).await;
                }
            }

            // Send a DownloadCompleted event
            notify_event!(self, DownloadCompleted, resource);

            Ok(())
        };

        // If we don't need to download any chunk
        if missing_chunks.is_empty() {
            dht_sub.unsubscribe().await;
            return download_completed(&chunked).await;
        }

        // Set resource status to `Downloading` and send a MetadataDownloadCompleted event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Start looking up seeders if we did not need to do it for the metadata
        if metadata_seeder.is_none() {
            if let Err(e) = self.lookup_tx.send(*hash).await {
                dht_sub.unsubscribe().await;
                return Err(e.into())
            }
        }

        // Fetch missing chunks from seeders
        let _ =
            fetch_chunks(self, hash, &mut chunked, &dht_sub, metadata_seeder, &mut missing_chunks)
                .await;

        // We don't need the DHT events sub anymore
        dht_sub.unsubscribe().await;

        // Get chunked file from geode
        let mut chunked = self.geode.get(hash, path).await?;

        // Set resource status to `Verifying` and send FudEvent::ResourceUpdated
        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        // Verify all chunks
        self.verify_chunks(&resource, &mut chunked, &resource.last_file_selection).await?;

        let is_complete =
            chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).all(|c| c.available);

        // We fetched all chunks, but the resource is not complete
        // (some chunks were missing from all seeders)
        if !is_complete {
            // Set resource status to `Incomplete`
            let resource = update_resource!(hash, {
                status = ResourceStatus::Incomplete(Some("Missing chunks".to_string()))
            });

            // Send a MissingChunks event
            notify_event!(self, MissingChunks, resource);

            return Ok(());
        }

        download_completed(&chunked).await
    }

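    /// Write stored scraps back to the filesystem so that the on-disk data for
    /// the given chunks is consistent. Fully written scraps are removed from
    /// sled; partially written ones are updated with the hash of the data
    /// actually on disk (`hash_written`).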
    async fn write_scraps(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hashes: &HashSet<blake3::Hash>,
    ) -> Result<()> {
        // Get all scraps
        let mut scraps = HashMap::new();
        // TODO: This can be improved to not loop over all chunks
        for chunk_hash in chunk_hashes {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();

            // Add the scrap to the HashMap
            scraps.insert(chunk_hash, scrap);
        }

        // Write all scraps
        if !scraps.is_empty() {
            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
        }
        for (scrap_hash, mut scrap) in scraps {
            let len = scrap.chunk.len();
            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
            if let Err(e) = write_res {
                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            let (_, chunk_bytes_written) = write_res.unwrap();

            // If the whole scrap was written, we can remove it from sled
            if chunk_bytes_written == len {
                self.scrap_tree.remove(scrap_hash.as_bytes())?;
                continue;
            }
            // Otherwise update the scrap in sled
            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
            if let Err(e) = chunk_res {
                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
            if let Err(e) =
                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
            {
                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
            }
        }

        Ok(())
    }

    /// Iterate over chunks and find which chunks are available locally,
    /// either on the filesystem (using `geode::verify_chunk()`) or in scraps.
    /// Returns the sizes in bytes of locally available data: the total bytes
    /// downloaded, and the bytes downloaded within the targeted file selection.
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
        file_selection: &FileSelection,
    ) -> Result<(u64, u64)> {
        let chunks = chunked.get_chunks().clone();
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        // Gather all available chunks
        for (chunk_index, chunk) in chunks.iter().enumerate() {
            // Read the chunk using the `FileSequence`
            let chunk_data =
                match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            // Perform chunk consistency check
            if self.geode.verify_chunk(&chunk.hash, &chunk_data) {
                chunked.get_chunk_mut(chunk_index).available = true;
                chunked.get_chunk_mut(chunk_index).size = chunk_data.len();
                bytes.insert(
                    chunk.hash,
                    (
                        chunk_data.len(),
                        resource.get_bytes_of_selection(
                            chunked,
                            file_selection,
                            &chunk.hash,
                            chunk_data.len(),
                        ),
                    ),
                );
            } else {
                chunked.get_chunk_mut(chunk_index).available = false;
            }
        }

        // Look for the chunks that are not on the filesystem
        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, c)| !c.available).collect();

        // Look for scraps
        for (chunk_index, chunk) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk.hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            if blake3::hash(&scrap.chunk) != chunk.hash {
                continue;
            }

            // Check if the scrap is still written on the filesystem
            let scrap_chunk =
                self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            // The scrap is not available if the chunk on the disk changed
            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            // Mark the chunk as available
            chunked.get_chunk_mut(chunk_index).available = true;
            chunked.get_chunk_mut(chunk_index).size = scrap.chunk.len();

            // Update the sums of locally available data
            bytes.insert(
                chunk.hash,
                (
                    scrap.chunk.len(),
                    resource.get_bytes_of_selection(
                        chunked,
                        file_selection,
                        &chunk.hash,
                        scrap.chunk.len(),
                    ),
                ),
            );
        }

        // If the resource is a file: make the `FileSequence`'s file the
        // exact file size if we know the last chunk's size. This is not
        // needed for directories.
        let is_dir = chunked.is_dir();
        if let Some(last_chunk) = chunked.iter_mut().last() {
            if !is_dir && last_chunk.available {
                if let Some((last_chunk_size, _)) = bytes.get(&last_chunk.hash) {
                    last_chunk.size = *last_chunk_size;
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }

    /// Add a resource from the file system.
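    ///
    /// Seeding sketch (hypothetical path):
    /// ```ignore
    /// fud.put(Path::new("/home/user/file.txt")).await?;
    /// ```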
    pub async fn put(&self, path: &Path) -> Result<()> {
        let put_tasks = self.put_tasks.read().await;
        if put_tasks.contains_key(path) {
            return Err(Error::Custom(format!(
                "Resource on path {} is already being added",
                path.to_string_lossy()
            )))
        }
        drop(put_tasks);

        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }

    /// Insert a file or directory from the file system.
    /// Called when `put()` creates a new put task.
    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
        let self_node = self.node().await?;

        if self_node.addresses.is_empty() {
            return Err(Error::Custom(
                "Cannot put resource, you don't have any external address".to_string(),
            ))
        }

        let metadata = fs::metadata(path).await?;

        // Get the list of files and the resource type (file or directory)
        let (files, resource_type) = if metadata.is_file() {
            (vec![(path.clone(), metadata.len())], ResourceType::File)
        } else if metadata.is_dir() {
            let mut files = get_all_files(path).await?;
            self.geode.sort_files(&mut files);
            (files, ResourceType::Directory)
        } else {
            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
        };

        // Read the file or directory and create the chunks
        let stream = FileSequence::new(&files, false);
        let total_size = stream.len();
        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;

        // Get the relative file paths included in the metadata and hash of directories
        let relative_files = if let ResourceType::Directory = resource_type {
            // [(absolute file path, file size)] -> [(relative file path, file size)]
            let relative_files = files
                .into_iter()
                .map(|(file_path, size)| match file_path.strip_prefix(path) {
                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
                })
                .collect::<Result<Vec<_>>>()?;

            // Add the files metadata to the hasher to complete the resource hash
            self.geode.hash_files_metadata(&mut hasher, &relative_files);

            relative_files
        } else {
            vec![]
        };

        // Finalize the resource hash
        let hash = hasher.finalize();

        // Create the metadata file in geode
        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
            error!(target: "fud::insert_resource()", "Failed inserting {path:?} to geode: {e}");
            return Err(e)
        }

        // Add path to the sled db
        if let Err(e) =
            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
        {
            error!(target: "fud::insert_resource()", "Failed inserting new resource into sled: {e}");
            return Err(e.into())
        }

        // Add resource
        let mut resources_write = self.resources.write().await;
        resources_write.insert(
            hash,
            Resource {
                hash,
                rtype: resource_type,
                path: path.to_path_buf(),
                status: ResourceStatus::Seeding,
                file_selection: FileSelection::All,
                last_file_selection: FileSelection::All,
                total_chunks_count: chunk_hashes.len() as u64,
                target_chunks_count: chunk_hashes.len() as u64,
                total_chunks_downloaded: chunk_hashes.len() as u64,
                target_chunks_downloaded: chunk_hashes.len() as u64,
                total_bytes_size: total_size,
                target_bytes_size: total_size,
                total_bytes_downloaded: total_size,
                target_bytes_downloaded: total_size,
                speeds: vec![],
            },
        );
        drop(resources_write);

        // Announce the new resource
        if let Ok(seeder) = self.new_seeder(&hash).await {
            let seeders = vec![seeder];
            let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() };
            let _ = self.dht.announce(&hash, &seeders, &fud_announce).await;
        }

        // Send InsertCompleted event
        notify_event!(self, InsertCompleted, {
            hash,
            path: path.to_path_buf()
        });

        Ok(())
    }

    /// Removes:
    /// - a resource
    /// - its metadata in geode
    /// - its path in the sled path tree
    /// - its file selection in the sled file selection tree
    /// - and any related scrap in the sled scrap tree,
    ///
    /// then sends a `ResourceRemoved` fud event.
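    ///
    /// Sketch (hypothetical hash):
    /// ```ignore
    /// fud.remove(&hash).await; // Errors are ignored; a `ResourceRemoved` event is sent
    /// ```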
    pub async fn remove(&self, hash: &blake3::Hash) {
        // Remove the resource
        let mut resources_write = self.resources.write().await;
        resources_write.remove(hash);
        drop(resources_write);

        // Remove the scraps in sled
        if let Ok(Some(path)) = self.hash_to_path(hash) {
            let chunked = self.geode.get(hash, &path).await;

            if let Ok(chunked) = chunked {
                for chunk in chunked.iter() {
                    let _ = self.scrap_tree.remove(chunk.hash.as_bytes());
                }
            }
        }

        // Remove the metadata in geode
        let hash_str = hash_to_string(hash);
        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;

        // Remove the path in sled
        let _ = self.path_tree.remove(hash.as_bytes());

        // Remove the file selection in sled
        let _ = self.file_selection_tree.remove(hash.as_bytes());

        // Send a `ResourceRemoved` event
        notify_event!(self, ResourceRemoved, { hash: *hash });
    }

    /// Remove seeders that are older than `expiry_secs`
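    ///
    /// Sketch: drop seeders announced more than an hour ago:
    /// ```ignore
    /// fud.prune_seeders(3600).await;
    /// ```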
    pub async fn prune_seeders(&self, expiry_secs: u32) {
        let expiry_timestamp =
            Timestamp::current_time().inner().saturating_sub(expiry_secs as u64);
        let mut seeders_write = self.dht.hash_table.write().await;

        let keys: Vec<_> = seeders_write.keys().cloned().collect();

        for key in keys {
            let items = seeders_write.get_mut(&key).unwrap();
            items.retain(|item| item.timestamp > expiry_timestamp);
            if items.is_empty() {
                seeders_write.remove(&key);
            }
        }
    }

    /// Stop all tasks.
    pub async fn stop(&self) {
        info!("Stopping fetch tasks...");
        // Create a clone of fetch_tasks because `task.stop()` needs a write lock
        let fetch_tasks = self.fetch_tasks.read().await;
        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(fetch_tasks);

        for task in cloned_fetch_tasks.values() {
            task.stop().await;
        }

        info!("Stopping put tasks...");
        let put_tasks = self.put_tasks.read().await;
        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
        drop(put_tasks);

        for task in cloned_put_tasks.values() {
            task.stop().await;
        }

        info!("Stopping lookup tasks...");
        let lookup_tasks = self.lookup_tasks.read().await;
        let cloned_lookup_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(lookup_tasks);

        for task in cloned_lookup_tasks.values() {
            task.stop().await;
        }

        // Stop all other tasks
        let mut tasks = self.tasks.write().await;
        for (name, task) in tasks.clone() {
            info!("Stopping {name} task...");
            task.stop().await;
        }
        *tasks = HashMap::new();
    }
}