// fud/download.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::HashSet,
    path::{Path, PathBuf},
    time::Instant,
};

use futures::{future::FutureExt, pin_mut, select};
use rand::{
    prelude::{IteratorRandom, SliceRandom},
    rngs::OsRng,
};
use tracing::{error, info, warn};

use darkfi::{
    dht::{event::DhtEvent, DhtHandler, DhtNode},
    geode::{hash_to_string, ChunkedStorage},
    net::ChannelPtr,
    system::Subscription,
    Error, Result,
};
use darkfi_serial::serialize_async;

use crate::{
    event::{self, notify_event, FudEvent},
    proto::{
        FudChunkNotFound, FudChunkReply, FudChunkRequest, FudDirectoryReply, FudFileReply,
        FudMetadataNotFound, FudMetadataRequest,
    },
    util::{create_all_files, receive_resource_msg},
    Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
};

51type FudDhtEvent = DhtEvent<<Fud as DhtHandler>::Node, <Fud as DhtHandler>::Value>;
52
/// Receive seeders from a DHT events subscription, and execute an async
/// expression for each deduplicated seeder once (seeder order is random).
/// It will keep going until the expression returns `Ok(())`, or there are
/// no more seeders.
/// It has an optional `favored_seeder` argument that will be tried first if
/// specified.
macro_rules! seeders_loop {
    ($key:expr, $fud:expr, $dht_sub:expr, $favored_seeder:expr, $code:expr) => {
        // Node ids we already queried, so every seeder is tried at most once
        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
        let mut is_done = false;

        // Try favored seeder first, if one was provided
        let favored_seeder: Option<FudSeeder> = $favored_seeder;
        if let Some(seeder) = favored_seeder {
            queried_seeders.insert(seeder.node.id());
            if $code(seeder).await.is_ok() {
                is_done = true;
            }
        }

        // Try other seeders using the DHT subscription
        while !is_done {
            let event = $dht_sub.receive().await;
            if event.key() != Some($key) {
                continue // Ignore this event if it's not about the right key
            }
            if let DhtEvent::ValueLookupCompleted { .. } = event {
                break // Lookup is done
            }
            if !matches!(event, DhtEvent::ValueFound { .. }) {
                continue // Ignore this event as it's not a ValueFound
            }
            let seeders = event.into_value().unwrap();
            // Shuffle so the query load is spread randomly across seeders
            let mut shuffled_seeders = {
                let mut vec: Vec<_> = seeders.iter().cloned().collect();
                vec.shuffle(&mut OsRng);
                vec
            };
            // Loop over seeders
            while let Some(seeder) = shuffled_seeders.pop() {
                // Only use a seeder once (O(1) set lookup)
                if queried_seeders.contains(&seeder.node.id()) {
                    continue
                }
                queried_seeders.insert(seeder.node.id());

                if $code(seeder).await.is_err() {
                    continue
                }

                is_done = true;
                break
            }
        }
    };
    // Variant without a favored seeder; `$fud` is kept for call-site uniformity
    ($key:expr, $fud:expr, $dht_sub:expr, $code:expr) => {
        seeders_loop!($key, $fud, $dht_sub, None, $code)
    };
}

/// Control-flow decision returned by [`fetch_chunk`] to its caller's loop.
enum ChunkFetchControl {
    /// Keep requesting chunks from the current seeder
    NextChunk,
    /// Give up on the current seeder and move to the next one
    NextSeeder,
    /// Stop fetching entirely (e.g. the resource was removed)
    Abort,
}

119struct ChunkFetchContext<'a> {
120    fud: &'a Fud,
121    hash: &'a blake3::Hash,
122    chunked: &'a mut ChunkedStorage,
123    chunks: &'a mut HashSet<blake3::Hash>,
124}
125
126/// Fetch `chunks` for `chunked` (file or directory) from seeders in `seeders_sub`.
127pub async fn fetch_chunks(
128    fud: &Fud,
129    hash: &blake3::Hash,
130    chunked: &mut ChunkedStorage,
131    dht_sub: &Subscription<FudDhtEvent>,
132    favored_seeder: Option<FudSeeder>,
133    chunks: &mut HashSet<blake3::Hash>,
134) -> Result<()> {
135    let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };
136
137    seeders_loop!(hash, fud, dht_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
138        let (channel, _) = match fud.dht.get_channel(&seeder.node).await {
139            Ok(channel) => channel,
140            Err(e) => {
141                warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
142                return Err(e)
143            }
144        };
145        let mut chunks_to_query = ctx.chunks.clone();
146        info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
147
148        loop {
149            // Loop over chunks
150            match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
151                ChunkFetchControl::NextChunk => continue,
152                ChunkFetchControl::NextSeeder => break,
153                ChunkFetchControl::Abort => {
154                    fud.dht.cleanup_channel(channel).await;
155                    return Ok(())
156                }
157            };
158        }
159
160        fud.dht.cleanup_channel(channel).await;
161
162        // Stop when there are no missing chunks
163        if ctx.chunks.is_empty() {
164            return Ok(())
165        }
166
167        Err(().into())
168    });
169
170    Ok(())
171}
172
173/// Fetch a single chunk and return what should be done next
174async fn fetch_chunk(
175    ctx: &mut ChunkFetchContext<'_>,
176    channel: &ChannelPtr,
177    seeder: &FudSeeder,
178    chunks_to_query: &mut HashSet<blake3::Hash>,
179) -> ChunkFetchControl {
180    // Select a chunk to request
181    let mut chunk = None;
182    if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
183        chunk = Some(*random_chunk);
184    }
185
186    if chunk.is_none() {
187        // No more chunks to request from this seeder
188        return ChunkFetchControl::NextSeeder;
189    }
190
191    let chunk_hash = chunk.unwrap();
192    chunks_to_query.remove(&chunk_hash);
193
194    let start_time = Instant::now();
195    let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
196    let msg_subscriber_notfound = channel.subscribe_msg::<FudChunkNotFound>().await.unwrap();
197
198    let send_res = channel.send(&FudChunkRequest { resource: *ctx.hash, chunk: chunk_hash }).await;
199    if let Err(e) = send_res {
200        warn!(target: "fud::download::fetch_chunk()", "Error while sending FudChunkRequest: {e}");
201        return ChunkFetchControl::NextSeeder;
202    }
203
204    let chunk_recv =
205        receive_resource_msg(&msg_subscriber_chunk, *ctx.hash, ctx.fud.chunk_timeout).fuse();
206    let notfound_recv =
207        receive_resource_msg(&msg_subscriber_notfound, *ctx.hash, ctx.fud.chunk_timeout).fuse();
208
209    pin_mut!(chunk_recv, notfound_recv);
210
211    // Wait for a FudChunkReply or FudNotFound
212    select! {
213        chunk_reply = chunk_recv => {
214            msg_subscriber_chunk.unsubscribe().await;
215            msg_subscriber_notfound.unsubscribe().await;
216            if let Err(e) = chunk_reply {
217                warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
218                return ChunkFetchControl::NextSeeder;
219            }
220            let reply = chunk_reply.unwrap();
221            handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
222        }
223        notfound_reply = notfound_recv => {
224            msg_subscriber_chunk.unsubscribe().await;
225            msg_subscriber_notfound.unsubscribe().await;
226            if let Err(e) = notfound_reply {
227                warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
228                return ChunkFetchControl::NextSeeder;
229            }
230            info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
231            notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
232            ChunkFetchControl::NextChunk
233        }
234    }
235}
236
237/// Processes an incoming chunk
238async fn handle_chunk_reply(
239    ctx: &mut ChunkFetchContext<'_>,
240    chunk_hash: &blake3::Hash,
241    reply: &FudChunkReply,
242    seeder: &FudSeeder,
243    start_time: &Instant,
244) -> ChunkFetchControl {
245    let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
246    if let Err(e) = write_res {
247        error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
248        return ChunkFetchControl::NextChunk;
249    }
250    let (inserted_hash, bytes_written) = write_res.unwrap();
251    if inserted_hash != *chunk_hash {
252        warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
253        return ChunkFetchControl::NextChunk;
254    }
255
256    info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));
257
258    // If we did not write the whole chunk to the filesystem,
259    // save the chunk in the scraps.
260    if bytes_written < reply.chunk.len() {
261        info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
262        let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
263        if let Err(e) = chunk_written {
264            error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
265            return ChunkFetchControl::NextChunk;
266        }
267        let scrap = Scrap {
268            chunk: reply.chunk.clone(),
269            hash_written: blake3::hash(&chunk_written.unwrap()),
270        };
271        if let Err(e) =
272            ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
273        {
274            error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
275            return ChunkFetchControl::NextChunk;
276        }
277    }
278
279    // Update the resource
280    let mut resources_write = ctx.fud.resources.write().await;
281    let resource = resources_write.get_mut(ctx.hash);
282    if resource.is_none() {
283        return ChunkFetchControl::Abort // Resource was removed
284    }
285    let resource = resource.unwrap();
286    resource.status = ResourceStatus::Downloading;
287    resource.total_chunks_downloaded += 1;
288    resource.target_chunks_downloaded += 1;
289
290    resource.total_bytes_downloaded += reply.chunk.len() as u64;
291    resource.target_bytes_downloaded +=
292        resource.get_selected_bytes(ctx.chunked, chunk_hash, reply.chunk.len()) as u64;
293    resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
294    if resource.speeds.len() > 12 {
295        resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last few speeds
296    }
297
298    // If we just fetched the last chunk of a file, compute
299    // `total_bytes_size` (and `target_bytes_size`) again,
300    // as `geode.write_chunk()` updated the FileSequence
301    // to the exact file size.
302    if let Some(last_chunk) = ctx.chunked.iter().last() {
303        if matches!(resource.rtype, ResourceType::File) && last_chunk.hash == *chunk_hash {
304            resource.total_bytes_size = ctx.chunked.get_fileseq().len();
305            resource.target_bytes_size = resource.total_bytes_size;
306        }
307    }
308    let resource = resource.clone();
309    drop(resources_write);
310
311    if let Some(chunk_index) = ctx.chunked.get_chunk_index(chunk_hash) {
312        ctx.chunked.get_chunk_mut(chunk_index).available = true;
313        let mut chunked_storages = ctx.fud.chunked_storages.write().await;
314        chunked_storages.insert(*ctx.hash, ctx.chunked.clone());
315        drop(chunked_storages);
316    }
317
318    notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
319    ctx.chunks.remove(chunk_hash);
320    ChunkFetchControl::NextChunk
321}
322
323enum MetadataFetchReply {
324    Directory(FudDirectoryReply),
325    File(FudFileReply),
326    Chunk(FudChunkReply),
327}
328
329/// Fetch a single resource metadata from seeders received from `seeders_sub`.
330/// If the resource is a file smaller than a single chunk then seeder can send the
331/// chunk directly, and we will create the file from it on path `path`.
332/// 1. Wait for seeders from the subscription
333/// 2. Request the metadata from the seeders
334/// 3. Insert metadata to geode using the reply
335pub async fn fetch_metadata(
336    fud: &Fud,
337    hash: &blake3::Hash,
338    path: &Path,
339    dht_sub: &Subscription<FudDhtEvent>,
340) -> Result<FudSeeder> {
341    let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;
342
343    seeders_loop!(hash, fud, dht_sub, async |seeder: FudSeeder| -> Result<()> {
344        let (channel, _) = fud.dht.get_channel(&seeder.node).await?;
345        let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
346        let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
347        let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
348        let msg_subscriber_notfound = channel.subscribe_msg::<FudMetadataNotFound>().await.unwrap();
349
350        let send_res = channel.send(&FudMetadataRequest { resource: *hash }).await;
351        if let Err(e) = send_res {
352            warn!(target: "fud::download::fetch_metadata()", "Error while sending FudMetadataRequest: {e}");
353            msg_subscriber_chunk.unsubscribe().await;
354            msg_subscriber_file.unsubscribe().await;
355            msg_subscriber_dir.unsubscribe().await;
356            msg_subscriber_notfound.unsubscribe().await;
357            fud.dht.cleanup_channel(channel).await;
358            return Err(e)
359        }
360
361        let chunk_recv =
362            receive_resource_msg(&msg_subscriber_chunk, *hash, fud.chunk_timeout).fuse();
363        let file_recv = receive_resource_msg(&msg_subscriber_file, *hash, fud.chunk_timeout).fuse();
364        let dir_recv = receive_resource_msg(&msg_subscriber_dir, *hash, fud.chunk_timeout).fuse();
365        let notfound_recv =
366            receive_resource_msg(&msg_subscriber_notfound, *hash, fud.chunk_timeout).fuse();
367
368        pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
369
370        let cleanup = async || {
371            msg_subscriber_chunk.unsubscribe().await;
372            msg_subscriber_file.unsubscribe().await;
373            msg_subscriber_dir.unsubscribe().await;
374            msg_subscriber_notfound.unsubscribe().await;
375            fud.dht.cleanup_channel(channel).await;
376        };
377
378        // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
379        select! {
380            // Received a chunk while requesting metadata, this is allowed to
381            // optimize fetching files smaller than a single chunk
382            chunk_reply = chunk_recv => {
383                cleanup().await;
384                if let Err(e) = chunk_reply {
385                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
386                    return Err(e)
387                }
388                let reply = chunk_reply.unwrap();
389                let chunk_hash = blake3::hash(&reply.chunk);
390                // Check that this is the only chunk in the file
391                if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
392                    warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
393                    return Err(().into())
394                }
395                info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
396                result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
397                Ok(())
398            }
399            file_reply = file_recv => {
400                cleanup().await;
401                if let Err(e) = file_reply {
402                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
403                    return Err(e)
404                }
405                let reply = file_reply.unwrap();
406                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
407                    warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
408                    return Err(().into())
409                }
410                info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
411                result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
412                Ok(())
413            }
414            dir_reply = dir_recv => {
415                cleanup().await;
416                if let Err(e) = dir_reply {
417                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
418                    return Err(e)
419                }
420                let reply = dir_reply.unwrap();
421
422                // Convert all file paths from String to PathBuf
423                let files: Vec<_> = reply.files.clone().into_iter()
424                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
425                    .collect();
426
427                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
428                    warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
429                    return Err(().into())
430                }
431                info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
432                result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
433                Ok(())
434            }
435            notfound_reply = notfound_recv => {
436                cleanup().await;
437                if let Err(e) = notfound_reply {
438                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
439                    return Err(e)
440                }
441                info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
442                Err(().into())
443            }
444        }
445    });
446
447    // We did not find the resource
448    if result.is_none() {
449        return Err(Error::Custom("Metadata not found".to_string()))
450    }
451
452    // Insert metadata to geode using the reply
453    // At this point the reply content is already verified
454    let (seeder, reply) = result.unwrap();
455    match reply {
456        MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes, .. }) => {
457            // Convert all file paths from String to PathBuf
458            let mut files: Vec<_> =
459                files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();
460
461            fud.geode.sort_files(&mut files);
462            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
463                error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
464                return Err(e)
465            }
466        }
467        MetadataFetchReply::File(FudFileReply { chunk_hashes, .. }) => {
468            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
469                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
470                return Err(e)
471            }
472        }
473        // Looked for a file but got a chunk: the entire file fits in a single chunk
474        MetadataFetchReply::Chunk(FudChunkReply { chunk, .. }) => {
475            info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
476            let chunk_hash = blake3::hash(&chunk);
477            if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
478                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
479                return Err(e)
480            }
481            create_all_files(&[path.to_path_buf()]).await?;
482            let mut chunked_file = ChunkedStorage::new(
483                &[chunk_hash],
484                &[(path.to_path_buf(), chunk.len() as u64)],
485                false,
486            );
487            if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
488                error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
489                return Err(e)
490            };
491        }
492    };
493
494    Ok(seeder)
495}