use std::{
    collections::{HashMap, HashSet},
    io::ErrorKind,
    path::{Path, PathBuf},
    sync::Arc,
};

use sled_overlay::sled;
use smol::{
    channel,
    fs::{self, OpenOptions},
    lock::RwLock,
};
use tracing::{error, info, warn};

use darkfi::{
    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
    geode::{hash_to_string, Chunk, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
    net::P2pPtr,
    system::{ExecutorPtr, PublisherPtr, StoppableTask},
    util::{path::expand_path, time::Timestamp},
    Error, Result,
};
use darkfi_sdk::crypto::{schnorr::SchnorrSecret, SecretKey};
use darkfi_serial::{deserialize_async, serialize_async};

pub mod proto;
use proto::FudAnnounce;

pub mod event;
use event::{notify_event, FudEvent};

pub mod resource;
use resource::{Resource, ResourceStatus, ResourceType};

pub mod scrap;
use scrap::Scrap;

pub mod rpc;

pub mod tasks;
use tasks::start_task;

pub mod bitcoin;

pub mod pow;
use pow::{FudPow, VerifiableNodeData};

pub mod equix;

pub mod settings;
use settings::Args;

pub mod util;
use util::{create_all_files, get_all_files, FileSelection};

mod download;
use download::{fetch_chunks, fetch_metadata};

pub mod dht;
use dht::FudSeeder;

use crate::{dht::FudNode, pow::PowSettings};

const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";

/// Runtime identity of the local fud node, created by [`Fud::start`]
#[derive(Clone, Debug)]
pub struct FudState {
    /// PoW-backed node data that other peers can verify (provides the node ID)
    node_data: VerifiableNodeData,
    /// Secret key used to sign our seeder announcements
    secret_key: SecretKey,
}

pub struct Fud {
    /// Node identity, set once [`Fud::start`] has generated it
    state: Arc<RwLock<Option<FudState>>>,
    /// Geode storage backend
    geode: Geode,
    /// Default directory where downloaded resources are saved
    downloads_path: PathBuf,
    /// Timeout used when fetching a single chunk
    chunk_timeout: u64,
    /// Proof-of-work engine used to generate the node identity
    pub pow: Arc<RwLock<FudPow>>,
    /// DHT instance
    dht: Arc<Dht<Fud>>,
    /// All resources known to this node, indexed by hash
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
    /// Chunked storages of known resources, indexed by hash
    chunked_storages: Arc<RwLock<HashMap<blake3::Hash, ChunkedStorage>>>,
    /// Sled tree mapping resource hashes to filesystem paths
    path_tree: sled::Tree,
    /// Sled tree mapping resource hashes to their file selections
    file_selection_tree: sled::Tree,
    /// Sled tree storing scraps (partially written chunks)
    scrap_tree: sled::Tree,
    /// Channel pair used to queue downloads
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
    /// Channel pair used to queue puts
    put_tx: channel::Sender<PathBuf>,
    put_rx: channel::Receiver<PathBuf>,
    /// Channel pair used to queue seeder lookups
    lookup_tx: channel::Sender<blake3::Hash>,
    lookup_rx: channel::Receiver<blake3::Hash>,
    /// Channel pair used to queue node verifications
    verify_node_tx: channel::Sender<FudNode>,
    verify_node_rx: channel::Receiver<FudNode>,
    /// Running download tasks, indexed by resource hash
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Running put tasks, indexed by path
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
    /// Running lookup tasks, indexed by resource hash
    lookup_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Long-lived background tasks, indexed by name
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
    /// Publisher used to emit [`FudEvent`]s
    event_publisher: PublisherPtr<FudEvent>,
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Global executor
    pub executor: ExecutorPtr,
}

impl Fud {
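    /// Instantiate a new fud node: open the sled trees, create the Geode
    /// storage and the DHT instance, and wire the DHT handler back to the
    /// returned `Fud`. A minimal lifecycle sketch (hypothetical setup values,
    /// assuming an async context):
    ///
    /// ```ignore
    /// let fud = Fud::new(settings, p2p, &sled_db, event_publisher, executor).await?;
    /// fud.start().await?; // Generate the node identity and spawn the tasks
    /// // ... fud.get(...) / fud.put(...) ...
    /// fud.stop().await;
    /// ```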
    pub async fn new(
        settings: Args,
        p2p: P2pPtr,
        sled_db: &sled::Db,
        event_publisher: PublisherPtr<FudEvent>,
        executor: ExecutorPtr,
    ) -> Result<Arc<Self>> {
        let dht_settings: DhtSettings = settings.dht.into();
        let net_settings_lock = p2p.settings();
        let mut net_settings = net_settings_lock.write().await;
        net_settings.outbound_connections = 0;
        net_settings.getaddrs_max =
            Some(net_settings.getaddrs_max.unwrap_or(dht_settings.k.min(u32::MAX as usize) as u32));
        drop(net_settings);

        let basedir = expand_path(&settings.base_dir)?;
        let downloads_path = match settings.downloads_path {
            Some(downloads_path) => expand_path(&downloads_path)?,
            None => basedir.join("downloads"),
        };

        let pow_settings: PowSettings = settings.pow.into();
        let pow = FudPow::new(pow_settings.clone(), executor.clone());

        info!(target: "fud::new()", "Instantiating Geode instance");
        let geode = Geode::new(&basedir).await?;

        let dht: Arc<Dht<Fud>> =
            Arc::new(Dht::<Fud>::new(&dht_settings, p2p.clone(), executor.clone()).await);

        let (get_tx, get_rx) = smol::channel::unbounded();
        let (put_tx, put_rx) = smol::channel::unbounded();
        let (lookup_tx, lookup_rx) = smol::channel::unbounded();
        let (verify_node_tx, verify_node_rx) = smol::channel::unbounded();
        let fud = Arc::new(Self {
            state: Arc::new(RwLock::new(None)),
            geode,
            downloads_path,
            chunk_timeout: settings.chunk_timeout,
            pow: Arc::new(RwLock::new(pow)),
            dht: dht.clone(),
            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
            resources: Arc::new(RwLock::new(HashMap::new())),
            chunked_storages: Arc::new(RwLock::new(HashMap::new())),
            get_tx,
            get_rx,
            put_tx,
            put_rx,
            lookup_tx,
            lookup_rx,
            verify_node_tx,
            verify_node_rx,
            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
            put_tasks: Arc::new(RwLock::new(HashMap::new())),
            lookup_tasks: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            event_publisher,
            p2p,
            executor,
        });
        *dht.handler.write().await = Arc::downgrade(&fud);

        Ok(fud)
    }

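    /// Start the node: update the Bitcoin hash cache if BTC-backed PoW is
    /// enabled, generate the PoW node identity (node data + secret key), and
    /// spawn the background tasks.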
    pub async fn start(self: &Arc<Self>) -> Result<()> {
        let mut pow = self.pow.write().await;
        if pow.settings.read().await.btc_enabled {
            pow.bitcoin_hash_cache.update().await?;
        }
        let (node_data, secret_key) = pow.generate_node().await?;
        info!(target: "fud::start()", "Your node ID: {}", hash_to_string(&node_data.id()));
        let mut state = self.state.write().await;
        *state = Some(FudState { node_data, secret_key });
        drop(state);
        drop(pow);

        self.start_tasks().await;

        Ok(())
    }

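    /// Spawn all long-lived background tasks and register them in
    /// `self.tasks` so they can be stopped later.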
    async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "events", tasks::handle_dht_events, tasks);
        start_task!(self, "DHT events", dht_tasks::events_task::<Fud>, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud>, tasks);
        start_task!(self, "DHT cleanup channels", dht_tasks::cleanup_channels_task::<Fud>, tasks);
        start_task!(self, "DHT add node", dht_tasks::add_node_task::<Fud>, tasks);
        start_task!(self, "DHT refinery", dht_tasks::dht_refinery_task::<Fud>, tasks);
        start_task!(
            self,
            "DHT disconnect inbounds",
            dht_tasks::disconnect_inbounds_task::<Fud>,
            tasks
        );
        start_task!(self, "lookup", tasks::lookup_task, tasks);
        start_task!(self, "verify node", tasks::verify_node_task, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }

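    /// Load known resources from sled and verify them on disk; then, if we
    /// have an external address, insert and announce ourselves as a seeder
    /// for every complete resource.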
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            if result.is_err() {
                continue;
            }

            let (hash, path) = result.unwrap();
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete(None),
                    file_selection,
                ),
            );
        }
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await?;

        // Without an external address we cannot seed anything
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Add ourselves to our own seeders router
        for resource in &resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let self_router_items = vec![seeder];
                self.add_value(&resource.hash, &self_router_items).await;
            }
        }

        info!(target: "fud::init()", "Announcing resources...");
        for resource in resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let seeders = vec![seeder];
                let announce = FudAnnounce { key: resource.hash, seeders: seeders.clone() };
                let _ = self.dht.announce(&resource.hash, &seeders, &announce).await;
            }
        }

        Ok(())
    }

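    /// Return a snapshot of all currently known resources.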
    pub async fn resources(&self) -> HashMap<blake3::Hash, Resource> {
        let resources = self.resources.read().await;
        resources.clone()
    }

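    /// Look up the filesystem path of a resource in the `path_tree` sled tree
    /// by its hash.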
    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
            return Ok(Some(path));
        }

        Ok(None)
    }

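    /// Reverse of [`Fud::hash_to_path`]: scan the `path_tree` sled tree for
    /// `path` and return the matching resource hash, if any.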
    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();
        for path_item in self.path_tree.iter() {
            let (key, value) = path_item?;
            if value == path_bytes {
                let bytes: &[u8] = &key;
                if bytes.len() != 32 {
                    return Err(Error::Custom(format!(
                        "Expected a 32-byte BLAKE3 hash, got {} bytes",
                        bytes.len()
                    )));
                }

                let array: [u8; 32] = bytes.try_into().unwrap();
                return Ok(Some(array.into()))
            }
        }

        Ok(None)
    }

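    /// Create a signed [`FudSeeder`] for the given key, or fail if the node
    /// identity has not been generated yet. The Schnorr signature covers the
    /// key bytes concatenated with the serialized node, roughly (pseudocode):
    ///
    /// ```ignore
    /// sig = sign(secret_key, key_bytes || serialize(node))
    /// ```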
    pub async fn new_seeder(&self, key: &blake3::Hash) -> Result<FudSeeder> {
        let state = self.state.read().await;
        if state.is_none() {
            return Err(Error::Custom("Fud is not ready yet".to_string()))
        }
        let state_ = state.clone().unwrap();
        drop(state);
        let node = self.node().await?;

        Ok(FudSeeder {
            key: *key,
            node: node.clone(),
            sig: state_
                .secret_key
                .sign(&[key.as_bytes().to_vec(), serialize_async(&node).await].concat()),
            timestamp: Timestamp::current_time().inner(),
        })
    }

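    /// Verify the chunks of the given resources on disk (all seeding or
    /// incomplete resources if `hashes` is `None`), update their status and
    /// counters, and return the resources that end up in `Seeding` state.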
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        let mut resources_write = self.resources.write().await;

        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked, &resource.file_selection),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|chunk| chunk_hashes.contains(&chunk.hash) && chunk.available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };

        let mut seeding_resources: Vec<Resource> = vec![];
        for resource in resources_write.values_mut() {
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete(_) => {}
                _ => continue,
            };

            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(resource, ResourceStatus::Incomplete(None), None, 0, 0).await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(resource, ResourceStatus::Incomplete(None), None, 0, 0).await;
                    continue;
                }
            };
            let verify_res =
                self.verify_chunks(resource, &mut chunked, &resource.file_selection).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(resource, ResourceStatus::Incomplete(None), None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            let mut chunked_storages = self.chunked_storages.write().await;
            chunked_storages.insert(resource.hash, chunked.clone());
            drop(chunked_storages);

            if !chunked.is_complete() {
                update_resource(
                    resource,
                    ResourceStatus::Incomplete(None),
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            update_resource(
                resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }

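    /// Queue a download of resource `hash` into `path`. The request is sent
    /// over the `get` channel and handled by the `get` task; errors if the
    /// resource is already being downloaded.
    ///
    /// A minimal usage sketch (hypothetical hash and path):
    ///
    /// ```ignore
    /// let hash = blake3::Hash::from_hex("...")?;
    /// fud.get(&hash, Path::new("/downloads/file"), FileSelection::All).await?;
    /// ```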
    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
        let fetch_tasks = self.fetch_tasks.read().await;
        if fetch_tasks.contains_key(hash) {
            return Err(Error::Custom(format!(
                "Resource {} is already being downloaded",
                hash_to_string(hash)
            )))
        }
        drop(fetch_tasks);

        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;

        Ok(())
    }

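    /// Get the [`ChunkedStorage`] of a resource from Geode, fetching the
    /// metadata from the network if it is not known locally. Also returns the
    /// seeder the metadata was fetched from, if any.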
    pub async fn get_metadata(
        &self,
        hash: &blake3::Hash,
        path: &Path,
    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
        match self.geode.get(hash, path).await {
            Ok(v) => Ok((v, None)),
            Err(Error::GeodeNeedsGc) => todo!(),
            Err(Error::GeodeFileNotFound) => {
                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
                let dht_sub = self.dht.subscribe().await;
                if let Err(e) = self.lookup_tx.send(*hash).await {
                    dht_sub.unsubscribe().await;
                    return Err(e.into())
                }

                let fetch_res = fetch_metadata(self, hash, path, &dht_sub).await;
                dht_sub.unsubscribe().await;
                let seeder = fetch_res?;
                Ok((self.geode.get(hash, path).await?, Some(seeder)))
            }
            Err(e) => Err(e),
        }
    }

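    /// Compute the download progress of a resource restricted to
    /// `file_selection`, returned as a `(bytes_downloaded, bytes_total)`
    /// pair. Returns `(0, 0)` if the resource or its chunked storage is
    /// unknown.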
    pub async fn get_progress(
        &self,
        hash: &blake3::Hash,
        file_selection: &FileSelection,
    ) -> (u64, u64) {
        let mut resources = self.resources.write().await;
        let resource = resources.get_mut(hash);
        if resource.is_none() {
            return (0, 0)
        }
        let resource = resource.unwrap();
        match self.hash_to_path(&resource.hash) {
            Ok(Some(_)) => {}
            Ok(None) | Err(_) => {
                resource.status = ResourceStatus::Incomplete(None);
                resource.total_bytes_downloaded = 0;
                resource.target_bytes_downloaded = 0;
                notify_event!(self, ResourceUpdated, resource);
                return (0, 0)
            }
        }
        let mut chunked_storages = self.chunked_storages.write().await;
        let Some(chunked) = chunked_storages.get_mut(&resource.hash) else { return (0, 0) };

        let files_vec: Vec<PathBuf> = resource.get_selected_files(chunked, file_selection);

        let bytes_downloaded = {
            let mut bytes = 0;
            for chunk in chunked.iter() {
                if chunk.available {
                    bytes += resource.get_bytes_of_selection(
                        chunked,
                        file_selection,
                        &chunk.hash,
                        chunk.size,
                    ) as u64;
                }
            }
            bytes
        };
        let bytes_total = chunked.get_fileseq().subset_len(files_vec.into_iter().collect());

        (bytes_downloaded, bytes_total)
    }

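    /// Download a resource end to end: fetch its metadata, rewrite pending
    /// scraps, verify what is already on disk, fetch the missing chunks from
    /// seeders, then re-verify the result. Emits `FudEvent`s at each stage;
    /// this is the heavy lifting behind [`Fud::get`].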
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)*
                        resource.clone()
                    }
                    None => return Ok(()),
                };
                resource
            }};
        }

        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        self.path_tree.insert(hash_bytes, path_bytes)?;

        let mut resources_write = self.resources.write().await;
        let merged_files = if let Some(old_resource) = resources_write.get(hash) {
            old_resource.file_selection.merge(files)
        } else {
            files.clone()
        };

        if let FileSelection::Set(selected_files) = &merged_files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        } else if let Err(e) = self.file_selection_tree.remove(hash_bytes) {
            return Err(Error::SledError(e))
        }

        let mut resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            merged_files.clone(),
        );
        resource.last_file_selection = files.clone();
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        let dht_sub = self.dht.subscribe().await;

        notify_event!(self, DownloadStarted, resource);

        let metadata_result = self.get_metadata(hash, path).await;

        if let Err(e) = metadata_result {
            let resource = update_resource!(hash, {
                status = ResourceStatus::Incomplete(Some("Metadata not found".to_string()))
            });
            notify_event!(self, MetadataNotFound, resource);
            dht_sub.unsubscribe().await;
            return Err(e)
        }
        let (mut chunked, metadata_seeder) = metadata_result.unwrap();

        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => {
                dht_sub.unsubscribe().await;
                return Ok(())
            }
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked, files);
        drop(resources_read);

        if let Err(e) = create_all_files(&files_vec).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        let chunk_hashes = resource.get_chunks_of_selection(&chunked, files);

        if let Err(e) = self.write_scraps(&mut chunked, &chunk_hashes).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        let verify_res = self.verify_chunks(&resource, &mut chunked, files).await;

        let mut chunked_storages = self.chunked_storages.write().await;
        chunked_storages.insert(*hash, chunked.clone());
        drop(chunked_storages);

        if let Err(e) = verify_res {
            dht_sub.unsubscribe().await;
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        if let ResourceType::File = resource.rtype {
            let resource =
                update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }

        // If the file on disk is larger than the maximum possible size of the
        // resource, truncate it to that size
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await;
            if let Err(e) = fs_metadata {
                dht_sub.unsubscribe().await;
                return Err(e.into());
            }
            if fs_metadata.unwrap().len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        let chunks: HashSet<Chunk> =
            chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).cloned().collect();

        let mut missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&c| !c.available).map(|c| c.hash).collect();

        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,
            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete(None),
                },
                target_chunks_downloaded = chunks.len() as u64,
                total_chunks_downloaded = chunked.local_chunks() as u64,
            });

            if chunked.is_complete() {
                if let Ok(seeder) = self.new_seeder(hash).await {
                    let seeders = vec![seeder];
                    let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
                    let _ = self.dht.announce(hash, &seeders, &self_announce).await;
                }
            }

            notify_event!(self, DownloadCompleted, resource);

            Ok(())
        };

        if missing_chunks.is_empty() {
            dht_sub.unsubscribe().await;
            return download_completed(&chunked).await;
        }

        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        if metadata_seeder.is_none() {
            if let Err(e) = self.lookup_tx.send(*hash).await {
                dht_sub.unsubscribe().await;
                return Err(e.into())
            }
        }

        let _ =
            fetch_chunks(self, hash, &mut chunked, &dht_sub, metadata_seeder, &mut missing_chunks)
                .await;

        dht_sub.unsubscribe().await;

        let mut chunked = self.geode.get(hash, path).await?;

        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        self.verify_chunks(&resource, &mut chunked, &resource.last_file_selection).await?;

        let is_complete =
            chunked.iter().filter(|c| chunk_hashes.contains(&c.hash)).all(|c| c.available);

        if !is_complete {
            let resource = update_resource!(hash, {
                status = ResourceStatus::Incomplete(Some("Missing chunks".to_string()))
            });

            notify_event!(self, MissingChunks, resource);

            return Ok(());
        }

        download_completed(&chunked).await
    }

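    /// Try to rewrite stored scraps (chunks that previously could not be
    /// fully written to disk) for the given chunk hashes. A scrap that is now
    /// fully written is removed from the scrap tree; otherwise it is updated
    /// with the hash of what was actually written.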
    async fn write_scraps(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hashes: &HashSet<blake3::Hash>,
    ) -> Result<()> {
        let mut scraps = HashMap::new();
        for chunk_hash in chunk_hashes {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();

            scraps.insert(chunk_hash, scrap);
        }

        if !scraps.is_empty() {
            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
        }
        for (scrap_hash, mut scrap) in scraps {
            let len = scrap.chunk.len();
            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
            if let Err(e) = write_res {
                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            let (_, chunk_bytes_written) = write_res.unwrap();

            if chunk_bytes_written == len {
                self.scrap_tree.remove(scrap_hash.as_bytes())?;
                continue;
            }
            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
            if let Err(e) = chunk_res {
                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
            if let Err(e) =
                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
            {
                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
            }
        }

        Ok(())
    }

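    /// Verify all chunks of `chunked` on disk, marking each chunk available
    /// or not. Chunks missing from the filesystem are also checked against
    /// the scrap tree. Returns `(total_bytes_downloaded,
    /// target_bytes_downloaded)`, where the target only counts bytes
    /// belonging to `file_selection`.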
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
        file_selection: &FileSelection,
    ) -> Result<(u64, u64)> {
        let chunks = chunked.get_chunks().clone();
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        for (chunk_index, chunk) in chunks.iter().enumerate() {
            let chunk_data =
                match self.geode.read_chunk(chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            if self.geode.verify_chunk(&chunk.hash, &chunk_data) {
                chunked.get_chunk_mut(chunk_index).available = true;
                chunked.get_chunk_mut(chunk_index).size = chunk_data.len();
                bytes.insert(
                    chunk.hash,
                    (
                        chunk_data.len(),
                        resource.get_bytes_of_selection(
                            chunked,
                            file_selection,
                            &chunk.hash,
                            chunk_data.len(),
                        ),
                    ),
                );
            } else {
                chunked.get_chunk_mut(chunk_index).available = false;
            }
        }

        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, c)| !c.available).collect();

        for (chunk_index, chunk) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk.hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            if blake3::hash(&scrap.chunk) != chunk.hash {
                continue;
            }

            let scrap_chunk =
                self.geode.read_chunk(chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            chunked.get_chunk_mut(chunk_index).available = true;
            chunked.get_chunk_mut(chunk_index).size = scrap.chunk.len();

            bytes.insert(
                chunk.hash,
                (
                    scrap.chunk.len(),
                    resource.get_bytes_of_selection(
                        chunked,
                        file_selection,
                        &chunk.hash,
                        scrap.chunk.len(),
                    ),
                ),
            );
        }

        // For single files we can compute the exact file size once the last
        // chunk is available, since only the last chunk can be partial
        let is_dir = chunked.is_dir();
        if let Some(last_chunk) = chunked.iter_mut().last() {
            if !is_dir && last_chunk.available {
                if let Some((last_chunk_size, _)) = bytes.get(&last_chunk.hash) {
                    last_chunk.size = *last_chunk_size;
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }

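    /// Queue a put (seed) of the file or directory at `path`. The request is
    /// sent over the `put` channel and handled by the `put` task.
    ///
    /// A minimal usage sketch (hypothetical path):
    ///
    /// ```ignore
    /// fud.put(Path::new("/data/to_share")).await?;
    /// ```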
    pub async fn put(&self, path: &Path) -> Result<()> {
        let put_tasks = self.put_tasks.read().await;
        if put_tasks.contains_key(path) {
            return Err(Error::Custom(format!(
                "Path {} is already being put",
                path.to_string_lossy()
            )))
        }
        drop(put_tasks);

        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }

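    /// Chunk and insert the file or directory at `path` into Geode, record it
    /// in sled and in the resource list as `Seeding`, then announce it on the
    /// DHT. Fails if we do not have any external address.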
    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
        let self_node = self.node().await?;

        if self_node.addresses.is_empty() {
            return Err(Error::Custom(
                "Cannot put resource, you don't have any external address".to_string(),
            ))
        }

        let metadata = fs::metadata(path).await?;

        let (files, resource_type) = if metadata.is_file() {
            (vec![(path.clone(), metadata.len())], ResourceType::File)
        } else if metadata.is_dir() {
            let mut files = get_all_files(path).await?;
            self.geode.sort_files(&mut files);
            (files, ResourceType::Directory)
        } else {
            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
        };

        let stream = FileSequence::new(&files, false);
        let total_size = stream.len();
        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;

        let relative_files = if let ResourceType::Directory = resource_type {
            let relative_files = files
                .into_iter()
                .map(|(file_path, size)| match file_path.strip_prefix(path) {
                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
                })
                .collect::<Result<Vec<_>>>()?;

            self.geode.hash_files_metadata(&mut hasher, &relative_files);

            relative_files
        } else {
            vec![]
        };

        let hash = hasher.finalize();

        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
            error!(target: "fud::insert_resource()", "Failed inserting {path:?} to geode: {e}");
            return Err(e)
        }

        if let Err(e) =
            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
        {
            error!(target: "fud::insert_resource()", "Failed inserting new resource into sled: {e}");
            return Err(e.into())
        }

        let mut resources_write = self.resources.write().await;
        resources_write.insert(
            hash,
            Resource {
                hash,
                rtype: resource_type,
                path: path.to_path_buf(),
                status: ResourceStatus::Seeding,
                file_selection: FileSelection::All,
                last_file_selection: FileSelection::All,
                total_chunks_count: chunk_hashes.len() as u64,
                target_chunks_count: chunk_hashes.len() as u64,
                total_chunks_downloaded: chunk_hashes.len() as u64,
                target_chunks_downloaded: chunk_hashes.len() as u64,
                total_bytes_size: total_size,
                target_bytes_size: total_size,
                total_bytes_downloaded: total_size,
                target_bytes_downloaded: total_size,
                speeds: vec![],
            },
        );
        drop(resources_write);

        if let Ok(seeder) = self.new_seeder(&hash).await {
            let seeders = vec![seeder];
            let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() };
            let _ = self.dht.announce(&hash, &seeders, &fud_announce).await;
        }

        notify_event!(self, InsertCompleted, {
            hash,
            path: path.to_path_buf()
        });

        Ok(())
    }

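    /// Remove a resource: drop it from the resource list, delete its scraps,
    /// Geode metadata and sled entries, then emit a `ResourceRemoved` event.
    /// The downloaded content itself is left on the filesystem.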
    pub async fn remove(&self, hash: &blake3::Hash) {
        let mut resources_write = self.resources.write().await;
        resources_write.remove(hash);
        drop(resources_write);

        // Remove any scraps belonging to this resource
        if let Ok(Some(path)) = self.hash_to_path(hash) {
            let chunked = self.geode.get(hash, &path).await;

            if let Ok(chunked) = chunked {
                for chunk in chunked.iter() {
                    let _ = self.scrap_tree.remove(chunk.hash.as_bytes());
                }
            }
        }

        // Remove the Geode metadata and the sled entries
        let hash_str = hash_to_string(hash);
        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;

        let _ = self.path_tree.remove(hash.as_bytes());

        let _ = self.file_selection_tree.remove(hash.as_bytes());

        notify_event!(self, ResourceRemoved, { hash: *hash });
    }

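    /// Remove all seeders older than `expiry_secs` seconds from the DHT hash
    /// table, dropping keys that are left without any seeder.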
    pub async fn prune_seeders(&self, expiry_secs: u32) {
        let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
        let mut seeders_write = self.dht.hash_table.write().await;

        // Keep only fresh seeders, dropping keys that end up empty
        seeders_write.retain(|_, items| {
            items.retain(|item| item.timestamp > expiry_timestamp);
            !items.is_empty()
        });
    }

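    /// Stop all fetch, put and lookup tasks, then all named background tasks.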
    pub async fn stop(&self) {
        info!("Stopping fetch tasks...");
        let fetch_tasks = self.fetch_tasks.read().await;
        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(fetch_tasks);

        for task in cloned_fetch_tasks.values() {
            task.stop().await;
        }

        info!("Stopping put tasks...");
        let put_tasks = self.put_tasks.read().await;
        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
        drop(put_tasks);

        for task in cloned_put_tasks.values() {
            task.stop().await;
        }

        info!("Stopping lookup tasks...");
        let lookup_tasks = self.lookup_tasks.read().await;
        let cloned_lookup_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(lookup_tasks);

        for task in cloned_lookup_tasks.values() {
            task.stop().await;
        }

        let mut tasks = self.tasks.write().await;
        for (name, task) in tasks.clone() {
            info!("Stopping {name} task...");
            task.stop().await;
        }
        *tasks = HashMap::new();
    }
}