fud/
util.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::HashSet,
21    path::{Path, PathBuf},
22    sync::Arc,
23    time::Instant,
24};
25
26use smol::{
27    fs::{self, File},
28    stream::StreamExt,
29};
30use tinyjson::JsonValue;
31
32pub use darkfi::geode::hash_to_string;
33use darkfi::{
34    net::{Message, MessageSubscription},
35    rpc::util::json_map,
36    Error, Result,
37};
38
39use crate::proto::ResourceMessage;
40
41pub async fn get_all_files(dir: &Path) -> Result<Vec<(PathBuf, u64)>> {
42    let mut files = Vec::new();
43
44    let mut entries = fs::read_dir(dir).await.unwrap();
45
46    while let Some(entry) = entries.try_next().await.unwrap() {
47        let path = entry.path();
48
49        if path.is_dir() {
50            files.append(&mut Box::pin(get_all_files(&path)).await?);
51        } else {
52            let metadata = fs::metadata(&path).await?;
53            let file_size = metadata.len();
54            files.push((path, file_size));
55        }
56    }
57
58    Ok(files)
59}
60
61pub async fn create_all_files(files: &[PathBuf]) -> Result<()> {
62    for file_path in files.iter() {
63        if !file_path.exists() {
64            if let Some(dir) = file_path.parent() {
65                fs::create_dir_all(dir).await?;
66            }
67            File::create(&file_path).await?;
68        }
69    }
70
71    Ok(())
72}
73
/// An enum to represent a set of files, where you can use `All` if you want
/// all files without having to specify all of them.
#[derive(Clone, Debug)]
pub enum FileSelection {
    /// Every file is selected.
    All,
    /// Only the listed paths are selected.
    Set(HashSet<PathBuf>),
}

impl FileSelection {
    /// Returns a copy of the underlying set, or `None` for
    /// [`FileSelection::All`].
    pub fn get_set(&self) -> Option<HashSet<PathBuf>> {
        match self {
            FileSelection::Set(set) => Some(set.clone()),
            FileSelection::All => None,
        }
    }

    /// Returns `true` if this selection is [`FileSelection::All`].
    pub fn is_all(&self) -> bool {
        matches!(self, FileSelection::All)
    }

    /// Returns `true` if every file selected by `self` is also selected by
    /// `other`.
    ///
    /// [`FileSelection::All`] is only a subset of [`FileSelection::All`],
    /// while any [`FileSelection::Set`] is a subset of [`FileSelection::All`].
    pub fn is_subset(&self, other: &Self) -> bool {
        match (self, other) {
            (FileSelection::All, FileSelection::All) => true,
            (FileSelection::All, FileSelection::Set(_)) => false,
            (FileSelection::Set(_), FileSelection::All) => true,
            (FileSelection::Set(self_set), FileSelection::Set(other_set)) => {
                self_set.is_subset(other_set)
            }
        }
    }

    /// Returns `true` if the two selections have no file in common.
    ///
    /// As soon as either side is [`FileSelection::All`] the result is
    /// `false`, including when the other side is an empty set.
    pub fn is_disjoint(&self, other: &Self) -> bool {
        match (self, other) {
            (FileSelection::Set(self_set), FileSelection::Set(other_set)) => {
                self_set.is_disjoint(other_set)
            }
            (FileSelection::All, _) | (_, FileSelection::All) => false,
        }
    }

    /// Merges two file selections: if any is [`FileSelection::All`] then
    /// output is [`FileSelection::All`], if they are both
    /// [`FileSelection::Set`] then the sets are merged.
    pub fn merge(&self, other: &Self) -> Self {
        match (self, other) {
            // Borrow both sets and clone each path exactly once into the union
            (FileSelection::Set(self_set), FileSelection::Set(other_set)) => {
                FileSelection::Set(self_set.union(other_set).cloned().collect())
            }
            (FileSelection::All, _) | (_, FileSelection::All) => FileSelection::All,
        }
    }
}
126
127impl FromIterator<PathBuf> for FileSelection {
128    fn from_iter<I: IntoIterator<Item = PathBuf>>(iter: I) -> Self {
129        let paths: HashSet<PathBuf> = iter.into_iter().collect();
130        FileSelection::Set(paths)
131    }
132}
133
134impl From<FileSelection> for JsonValue {
135    fn from(fs: FileSelection) -> JsonValue {
136        json_map([
137            (
138                "type",
139                JsonValue::String(
140                    match fs {
141                        FileSelection::All => "all",
142                        FileSelection::Set(_) => "set",
143                    }
144                    .to_string(),
145                ),
146            ),
147            (
148                "set",
149                JsonValue::Array(match fs {
150                    FileSelection::All => vec![],
151                    FileSelection::Set(set) => set
152                        .into_iter()
153                        .map(|path| JsonValue::String(path.to_string_lossy().to_string()))
154                        .collect(),
155                }),
156            ),
157        ])
158    }
159}
160
161impl From<JsonValue> for FileSelection {
162    fn from(value: JsonValue) -> Self {
163        match value["type"].get::<String>().unwrap().as_str() {
164            "set" => {
165                let set = value["set"]
166                    .get::<Vec<JsonValue>>()
167                    .unwrap()
168                    .iter()
169                    .map(|path| PathBuf::from(path.get::<String>().unwrap()))
170                    .collect::<HashSet<PathBuf>>();
171                FileSelection::Set(set)
172            }
173            _ => FileSelection::All,
174        }
175    }
176}
177
178/// Wait for a [`crate::proto::ResourceMessage`] on `msg_subscriber` with a timeout.
179/// If we receive a message with the wrong resource hash, it's skipped.
180pub async fn receive_resource_msg<M: Message + ResourceMessage + std::fmt::Debug>(
181    msg_subscriber: &MessageSubscription<M>,
182    resource_hash: blake3::Hash,
183    timeout_seconds: u64,
184) -> Result<Arc<M>> {
185    let start = Instant::now();
186    loop {
187        let elapsed = start.elapsed().as_secs();
188        if elapsed >= timeout_seconds {
189            return Err(Error::ConnectTimeout);
190        }
191        let remaining_timeout = timeout_seconds - elapsed;
192
193        let reply = msg_subscriber.receive_with_timeout(remaining_timeout).await?;
194        // Done if it's the right resource hash
195        if reply.resource_hash() == resource_hash {
196            return Ok(reply)
197        }
198    }
199}