1use std::sync::Arc;
20
21use tracing::{error, info, warn};
22
23use darkfi::{
24 dht::{event::DhtEvent, DhtHandler, DhtNode},
25 geode::hash_to_string,
26 system::{sleep, StoppableTask},
27 Error, Result,
28};
29
30use crate::{
31 event::{self, notify_event},
32 proto::FudAnnounce,
33 resource::ResourceStatus,
34 Fud, FudEvent, FudState,
35};
36
37pub async fn handle_dht_events(fud: Arc<Fud>) -> Result<()> {
39 let sub = fud.dht().subscribe().await;
40 loop {
41 let event = sub.receive().await;
42
43 match event {
44 DhtEvent::ValueLookupCompleted { key, values, .. } => {
45 let mut seeders: Vec<_> = values.into_iter().flatten().collect();
46 seeders.dedup_by_key(|seeder| seeder.node.id());
47 notify_event!(fud, SeedersFound, {
48 hash: key,
49 seeders
50 });
51 }
52 DhtEvent::BootstrapCompleted => {
53 let _ = fud.init().await;
54 notify_event!(fud, Ready);
55 }
56 _ => {}
57 }
58 }
59}
60
61pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
66 loop {
67 let (hash, path, files) = fud.get_rx.recv().await.unwrap();
68
69 let mut fetch_tasks = fud.fetch_tasks.write().await;
71 let task = StoppableTask::new();
72 fetch_tasks.insert(hash, task.clone());
73 drop(fetch_tasks);
74
75 let fud_1 = fud.clone();
77 let fud_2 = fud.clone();
78 task.start(
79 async move { fud_1.fetch_resource(&hash, &path, &files).await },
80 move |res| async move {
81 let mut fetch_tasks = fud_2.fetch_tasks.write().await;
84 fetch_tasks.remove(&hash);
85
86 let lookup_tasks = fud_2.lookup_tasks.read().await;
88 if let Some(lookup_task) = lookup_tasks.get(&hash) {
89 lookup_task.stop().await;
90 }
91
92 match res {
93 Ok(()) | Err(Error::DetachedTaskStopped) => { }
94 Err(e) => {
95 error!(target: "fud::get_task()", "Error while fetching resource: {e}");
96
97 let mut resources = fud_2.resources.write().await;
99 if let Some(resource) = resources.get_mut(&hash) {
100 resource.status = ResourceStatus::Incomplete(Some(e.to_string()));
101 }
102 drop(resources);
103
104 notify_event!(fud_2, DownloadError, {
106 hash,
107 error: e.to_string(),
108 });
109 }
110 }
111 },
112 Error::DetachedTaskStopped,
113 fud.executor.clone(),
114 );
115 }
116}
117
118pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
120 loop {
121 let path = fud.put_rx.recv().await.unwrap();
122
123 let mut put_tasks = fud.put_tasks.write().await;
125 let task = StoppableTask::new();
126 put_tasks.insert(path.clone(), task.clone());
127 drop(put_tasks);
128
129 let fud_1 = fud.clone();
131 let fud_2 = fud.clone();
132 let path_ = path.clone();
133 task.start(
134 async move { fud_1.insert_resource(&path_).await },
135 move |res| async move {
136 let mut put_tasks = fud_2.put_tasks.write().await;
139 put_tasks.remove(&path);
140 match res {
141 Ok(()) | Err(Error::DetachedTaskStopped) => { }
142 Err(e) => {
143 error!(target: "fud::put_task()", "Error while inserting resource: {e}");
144
145 notify_event!(fud_2, InsertError, {
147 path,
148 error: e.to_string(),
149 });
150 }
151 }
152 },
153 Error::DetachedTaskStopped,
154 fud.executor.clone(),
155 );
156 }
157}
158
159pub async fn lookup_task(fud: Arc<Fud>) -> Result<()> {
161 loop {
162 let key = fud.lookup_rx.recv().await.unwrap();
163
164 let mut lookup_tasks = fud.lookup_tasks.write().await;
165 let task = StoppableTask::new();
166 lookup_tasks.insert(key, task.clone());
167 drop(lookup_tasks);
168
169 let fud_1 = fud.clone();
170 let fud_2 = fud.clone();
171 task.start(
172 async move {
173 fud_1.dht.lookup_value(&key).await;
174 Ok(())
175 },
176 move |res| async move {
177 let mut lookup_tasks = fud_2.lookup_tasks.write().await;
180 lookup_tasks.remove(&key);
181 match res {
182 Ok(()) | Err(Error::DetachedTaskStopped) => { }
183 Err(e) => {
184 error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}");
185 }
186 }
187 },
188 Error::DetachedTaskStopped,
189 fud.executor.clone(),
190 );
191 }
192}
193
194pub async fn verify_node_task(fud: Arc<Fud>) -> Result<()> {
198 loop {
199 let node = fud.verify_node_rx.recv().await.unwrap();
200 if let Ok((channel, _)) = fud.dht.create_channel_to_node(&node).await {
201 fud.dht.cleanup_channel(channel).await;
202 }
203 }
204}
205
206pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
209 let interval = 3600; loop {
212 sleep(interval).await;
213
214 info!(target: "fud::announce_seed_task()", "Verifying seeds...");
215 let seeding_resources = match fud.verify_resources(None).await {
216 Ok(resources) => resources,
217 Err(e) => {
218 error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
219 continue;
220 }
221 };
222
223 info!(target: "fud::announce_seed_task()", "Announcing files...");
224 for resource in seeding_resources {
225 if let Ok(seeder) = fud.new_seeder(&resource.hash).await {
226 let seeders = vec![seeder];
227 let _ = fud
228 .dht
229 .announce(
230 &resource.hash,
231 &seeders.clone(),
232 &FudAnnounce { key: resource.hash, seeders },
233 )
234 .await;
235 }
236 }
237
238 info!(target: "fud::announce_seed_task()", "Pruning seeders...");
239 fud.prune_seeders(interval.try_into().unwrap()).await;
240 }
241}
242
243pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
249 let interval = 600; loop {
252 sleep(interval).await;
253
254 let mut pow = fud.pow.write().await;
255 if !pow.settings.read().await.btc_enabled {
256 continue
257 }
258
259 let btc = &mut pow.bitcoin_hash_cache;
260
261 if btc.update().await.is_err() {
262 continue
263 }
264
265 let state = fud.state.read().await;
266 if state.is_none() {
267 continue
268 }
269 let block = state.clone().unwrap().node_data.btc_block_hash;
270 drop(state);
271 let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
272 Some(i) => i < 6,
273 None => true,
274 };
275
276 if !needs_dht_reset {
277 let dht = fud.dht();
279 let mut buckets = dht.buckets.write().await;
280 for bucket in buckets.iter_mut() {
281 for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
282 if !btc.block_hashes.contains(&node.data.btc_block_hash) {
284 bucket.nodes.remove(i);
285 info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
286 }
287 }
288 }
289 drop(buckets);
290
291 let mut seeders_table = fud.dht.hash_table.write().await;
293 for (key, seeders) in seeders_table.iter_mut() {
294 for (i, seeder) in seeders.clone().iter().enumerate().rev() {
295 if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
296 seeders.remove(i);
297 info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
298 }
299 }
300 }
301
302 continue
303 }
304
305 info!(target: "fud::node_id_task()", "Creating a new node id...");
306 let (node_data, secret_key) = match pow.generate_node().await {
307 Ok(res) => res,
308 Err(e) => {
309 warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
310 continue
311 }
312 };
313 drop(pow);
314 info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
315
316 let dht = fud.dht();
318 let mut channel_cache = dht.channel_cache.write().await;
319 for channel in dht.p2p.hosts().channels().clone() {
320 channel.stop().await;
321 channel_cache.remove(&channel.info.id);
322 }
323 drop(channel_cache);
324
325 dht.reset().await;
327
328 let mut state = fud.state.write().await;
330 *state = Some(FudState { node_data, secret_key });
331
332 }
334}
335
/// Starts a named background task and records it in a task map.
///
/// Arguments:
/// * `$fud`: value providing `.clone()` and `.executor`; the clone is passed
///   to `$task_fn`.
/// * `$task_name`: name used in the log messages and, via `to_string()`, as
///   the key in `$tasks`.
/// * `$task_fn`: async function called with the cloned `$fud`.
/// * `$tasks`: map the new `StoppableTask` is inserted into.
///
/// An error returned by the task other than `Error::DetachedTaskStopped` is
/// logged.
macro_rules! start_task {
    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
        info!(target: "fud", "Starting {} task", $task_name);
        let handle = StoppableTask::new();
        let task_fud = $fud.clone();
        handle.clone().start(
            async move { $task_fn(task_fud).await },
            |res| async {
                match res {
                    Err(e) if !matches!(e, Error::DetachedTaskStopped) => {
                        error!(target: "fud", "Failed starting {} task: {e}", $task_name)
                    }
                    // Finished normally or stopped on purpose.
                    _ => {}
                }
            },
            Error::DetachedTaskStopped,
            $fud.executor.clone(),
        );
        $tasks.insert($task_name.to_string(), handle);
    }};
}
pub(crate) use start_task;