1use async_trait::async_trait;
20use smol::Executor;
21use std::{path::StripPrefixError, sync::Arc};
22use tracing::{debug, error, info, warn};
23
24use darkfi::{
25 dht::{event::DhtEvent, DhtHandler},
26 geode::hash_to_string,
27 impl_p2p_message,
28 net::{
29 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30 session::SESSION_INBOUND,
31 ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
32 ProtocolJobsManager, ProtocolJobsManagerPtr,
33 },
34 Error, Result,
35};
36use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
37use darkfi_serial::{SerialDecodable, SerialEncodable};
38
39use crate::{
40 dht::{FudNode, FudSeeder},
41 Fud,
42};
43
44pub trait ResourceMessage {
47 fn resource_hash(&self) -> blake3::Hash;
48}
49macro_rules! impl_resource_msg {
50 ($msg:ty, $field:ident) => {
51 impl ResourceMessage for $msg {
52 fn resource_hash(&self) -> blake3::Hash {
53 self.$field
54 }
55 }
56 };
57 ($msg:ty) => {
58 impl_resource_msg!($msg, resource);
59 };
60}
61
62#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
64pub struct FudFileReply {
65 pub resource: blake3::Hash,
66 pub chunk_hashes: Vec<blake3::Hash>,
67}
68impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
69impl_resource_msg!(FudFileReply);
70
71#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
73pub struct FudDirectoryReply {
74 pub resource: blake3::Hash,
75 pub chunk_hashes: Vec<blake3::Hash>,
76 pub files: Vec<(String, u64)>, }
78impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
79impl_resource_msg!(FudDirectoryReply);
80
81#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
83pub struct FudAnnounce {
84 pub key: blake3::Hash,
85 pub seeders: Vec<FudSeeder>,
86}
87impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
88
89#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
91pub struct FudChunkReply {
92 pub resource: blake3::Hash,
93 pub chunk: Vec<u8>,
95}
96impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
97impl_resource_msg!(FudChunkReply);
98
99#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
101pub struct FudMetadataNotFound {
102 pub resource: blake3::Hash,
103}
104impl_p2p_message!(FudMetadataNotFound, "FudMetadataNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
105impl_resource_msg!(FudMetadataNotFound);
106
107#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
109pub struct FudChunkNotFound {
110 pub resource: blake3::Hash,
111 pub chunk: blake3::Hash,
112}
113impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
114impl_resource_msg!(FudChunkNotFound);
115
116#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
118pub struct FudPingRequest {
119 pub random: u64,
120}
121impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
122
123#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
125pub struct FudPingReply {
126 pub node: FudNode,
127 pub random: u64,
128 pub sig: Signature,
130}
131impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
132
133#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
135pub struct FudMetadataRequest {
136 pub resource: blake3::Hash,
137}
138impl_p2p_message!(FudMetadataRequest, "FudMetadataRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
139impl_resource_msg!(FudMetadataRequest);
140
141#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
143pub struct FudChunkRequest {
144 pub resource: blake3::Hash,
145 pub chunk: blake3::Hash,
146}
147impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
148impl_resource_msg!(FudChunkRequest);
149
150#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
152pub struct FudNodesRequest {
153 pub key: blake3::Hash,
154}
155impl_p2p_message!(FudNodesRequest, "FudNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
156
157#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
159pub struct FudNodesReply {
160 pub key: blake3::Hash,
161 pub nodes: Vec<FudNode>,
162}
163impl_p2p_message!(FudNodesReply, "FudNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
164impl_resource_msg!(FudNodesReply, key);
165
166#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
168pub struct FudSeedersRequest {
169 pub key: blake3::Hash,
170}
171impl_p2p_message!(FudSeedersRequest, "FudSeedersRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
172
173#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
175pub struct FudSeedersReply {
176 pub key: blake3::Hash,
177 pub seeders: Vec<FudSeeder>,
178 pub nodes: Vec<FudNode>,
179}
180impl_p2p_message!(FudSeedersReply, "FudSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
181impl_resource_msg!(FudSeedersReply, key);
182
183pub struct ProtocolFud {
185 channel: ChannelPtr,
186 ping_request_sub: MessageSubscription<FudPingRequest>,
187 find_metadata_request_sub: MessageSubscription<FudMetadataRequest>,
188 find_chunk_request_sub: MessageSubscription<FudChunkRequest>,
189 find_nodes_request_sub: MessageSubscription<FudNodesRequest>,
190 find_seeders_request_sub: MessageSubscription<FudSeedersRequest>,
191 announce_sub: MessageSubscription<FudAnnounce>,
192 fud: Arc<Fud>,
193 jobsman: ProtocolJobsManagerPtr,
194}
195
196impl ProtocolFud {
197 pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
198 debug!(
199 target: "fud::proto::ProtocolFud::init()",
200 "Adding ProtocolFud to the protocol registry"
201 );
202
203 let msg_subsystem = channel.message_subsystem();
204 msg_subsystem.add_dispatch::<FudPingRequest>().await;
205 msg_subsystem.add_dispatch::<FudPingReply>().await;
206 msg_subsystem.add_dispatch::<FudMetadataRequest>().await;
207 msg_subsystem.add_dispatch::<FudChunkRequest>().await;
208 msg_subsystem.add_dispatch::<FudChunkReply>().await;
209 msg_subsystem.add_dispatch::<FudChunkNotFound>().await;
210 msg_subsystem.add_dispatch::<FudFileReply>().await;
211 msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
212 msg_subsystem.add_dispatch::<FudMetadataNotFound>().await;
213 msg_subsystem.add_dispatch::<FudNodesRequest>().await;
214 msg_subsystem.add_dispatch::<FudNodesReply>().await;
215 msg_subsystem.add_dispatch::<FudSeedersRequest>().await;
216 msg_subsystem.add_dispatch::<FudSeedersReply>().await;
217 msg_subsystem.add_dispatch::<FudAnnounce>().await;
218
219 let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
220 let find_metadata_request_sub = channel.subscribe_msg::<FudMetadataRequest>().await?;
221 let find_chunk_request_sub = channel.subscribe_msg::<FudChunkRequest>().await?;
222 let find_nodes_request_sub = channel.subscribe_msg::<FudNodesRequest>().await?;
223 let find_seeders_request_sub = channel.subscribe_msg::<FudSeedersRequest>().await?;
224 let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
225
226 Ok(Arc::new(Self {
227 channel: channel.clone(),
228 ping_request_sub,
229 find_metadata_request_sub,
230 find_chunk_request_sub,
231 find_nodes_request_sub,
232 find_seeders_request_sub,
233 announce_sub,
234 fud,
235 jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
236 }))
237 }
238
239 async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
240 debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
241
242 loop {
243 let ping_req = match self.ping_request_sub.receive().await {
244 Ok(v) => v,
245 Err(Error::ChannelStopped) => continue,
246 Err(_) => continue,
247 };
248 info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST from {}", self.channel.display_address());
249 self.fud.dht.update_channel(self.channel.info.id).await;
250
251 let self_node = self.fud.node().await;
252 if self_node.is_err() {
253 self.channel.stop().await;
254 continue
255 }
256 let state = self.fud.state.read().await;
257 if state.is_none() {
258 self.channel.stop().await;
259 continue
260 }
261
262 let reply = FudPingReply {
263 node: self_node.unwrap(),
264 random: ping_req.random,
265 sig: state.clone().unwrap().secret_key.sign(&ping_req.random.to_be_bytes()),
266 };
267 drop(state);
268
269 if let Err(e) = self.channel.send(&reply).await {
270 self.fud
271 .dht
272 .event_publisher
273 .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Err(e) })
274 .await;
275 continue;
276 }
277 self.fud
278 .dht
279 .event_publisher
280 .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Ok(()) })
281 .await;
282
283 if self.channel.session_type_id() & SESSION_INBOUND != 0 {
285 let _ = self.fud.ping(self.channel.clone()).await;
286 }
287 }
288 }
289
290 async fn handle_fud_metadata_request(self: Arc<Self>) -> Result<()> {
291 debug!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "START");
292
293 loop {
294 let request = match self.find_metadata_request_sub.receive().await {
295 Ok(v) => v,
296 Err(Error::ChannelStopped) => continue,
297 Err(_) => continue,
298 };
299 info!(target: "fud::ProtocolFud::handle_fud_request()", "Received METADATA REQUEST for {}", hash_to_string(&request.resource));
300 self.fud.dht.update_channel(self.channel.info.id).await;
301
302 let notfound = async || {
303 let reply = FudMetadataNotFound { resource: request.resource };
304 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "We do not have the metadata of {}", hash_to_string(&request.resource));
305 let _ = self.channel.send(&reply).await;
306 };
307
308 let path = self.fud.hash_to_path(&request.resource).ok().flatten();
309 if path.is_none() {
310 notfound().await;
311 continue
312 }
313 let path = path.unwrap();
314
315 let chunked_file = self.fud.geode.get(&request.resource, &path).await.ok();
316 if chunked_file.is_none() {
317 notfound().await;
318 continue
319 }
320 let mut chunked_file = chunked_file.unwrap();
321
322 if chunked_file.len() == 1 && !chunked_file.is_dir() {
324 let chunk_hash = chunked_file.get_chunks()[0].hash;
325 let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
326 if let Ok(chunk) = chunk {
327 if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.resource {
328 notfound().await;
330 continue
331 }
332 let reply = FudChunkReply { resource: request.resource, chunk };
333 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
334 let _ = self.channel.send(&reply).await;
335 continue
336 }
337 }
339
340 match chunked_file.is_dir() {
342 false => {
343 let reply = FudFileReply {
344 resource: request.resource,
345 chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(),
346 };
347 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.resource));
348 let _ = self.channel.send(&reply).await;
349 }
350 true => {
351 let files = chunked_file
352 .get_files()
353 .iter()
354 .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
355 Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
356 Err(e) => Err(e),
357 })
358 .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
359 if let Err(e) = files {
360 error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
361 notfound().await;
362 continue
363 }
364 let reply = FudDirectoryReply {
365 resource: request.resource,
366 chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(),
367 files: files.unwrap(),
368 };
369 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.resource));
370 let _ = self.channel.send(&reply).await;
371 }
372 };
373 }
374 }
375
376 async fn handle_fud_chunk_request(self: Arc<Self>) -> Result<()> {
377 debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START");
378
379 loop {
380 let request = match self.find_chunk_request_sub.receive().await {
381 Ok(v) => v,
382 Err(Error::ChannelStopped) => continue,
383 Err(_) => continue,
384 };
385 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Received CHUNK REQUEST for {}", hash_to_string(&request.resource));
386 self.fud.dht.update_channel(self.channel.info.id).await;
387
388 let notfound = async || {
389 let reply = FudChunkNotFound { resource: request.resource, chunk: request.chunk };
390 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "We do not have chunk {} of resource {}", hash_to_string(&request.resource), hash_to_string(&request.chunk));
391 let _ = self.channel.send(&reply).await;
392 };
393
394 let path = self.fud.hash_to_path(&request.resource).ok().flatten();
395 if path.is_none() {
396 notfound().await;
397 continue
398 }
399 let path = path.unwrap();
400
401 let chunked = self.fud.geode.get(&request.resource, &path).await;
402 if chunked.is_err() {
403 notfound().await;
404 continue
405 }
406
407 let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.chunk).await;
408 if let Ok(chunk) = chunk {
409 if !self.fud.geode.verify_chunk(&request.chunk, &chunk) {
410 notfound().await;
412 continue
413 }
414 let reply = FudChunkReply { resource: request.resource, chunk };
415 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.chunk));
416 let _ = self.channel.send(&reply).await;
417 continue
418 }
419
420 notfound().await;
421 }
422 }
423
424 async fn handle_fud_nodes_request(self: Arc<Self>) -> Result<()> {
425 debug!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "START");
426
427 loop {
428 let request = match self.find_nodes_request_sub.receive().await {
429 Ok(v) => v,
430 Err(Error::ChannelStopped) => continue,
431 Err(_) => continue,
432 };
433 info!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
434 self.fud.dht.update_channel(self.channel.info.id).await;
435
436 let reply = FudNodesReply {
437 key: request.key,
438 nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
439 };
440 match self.channel.send(&reply).await {
441 Ok(()) => continue,
442 Err(_e) => continue,
443 }
444 }
445 }
446
447 async fn handle_fud_seeders_request(self: Arc<Self>) -> Result<()> {
448 debug!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "START");
449
450 loop {
451 let request = match self.find_seeders_request_sub.receive().await {
452 Ok(v) => v,
453 Err(Error::ChannelStopped) => continue,
454 Err(_) => continue,
455 };
456 info!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "Received FIND SEEDERS for {} from {:?}", hash_to_string(&request.key), self.channel);
457 self.fud.dht.update_channel(self.channel.info.id).await;
458
459 let router = self.fud.dht.hash_table.read().await;
460 let peers = router.get(&request.key);
461
462 match peers {
463 Some(seeders) => {
464 let _ = self
465 .channel
466 .send(&FudSeedersReply {
467 key: request.key,
468 seeders: seeders.to_vec(),
469 nodes: self
470 .fud
471 .dht()
472 .find_neighbors(&request.key, self.fud.dht().settings.k)
473 .await,
474 })
475 .await;
476 }
477 None => {
478 let _ = self
479 .channel
480 .send(&FudSeedersReply {
481 key: request.key,
482 seeders: vec![],
483 nodes: self
484 .fud
485 .dht()
486 .find_neighbors(&request.key, self.fud.dht().settings.k)
487 .await,
488 })
489 .await;
490 }
491 };
492 }
493 }
494
495 async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
496 debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
497
498 loop {
499 let request = match self.announce_sub.receive().await {
500 Ok(v) => v,
501 Err(Error::ChannelStopped) => continue,
502 Err(_) => continue,
503 };
504 info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
505 self.fud.dht.update_channel(self.channel.info.id).await;
506
507 let mut seeders = vec![];
508
509 for seeder in request.seeders.clone() {
510 if seeder.node.addresses.is_empty() {
511 continue
512 }
513 if let Err(e) = self.fud.pow.write().await.verify_node(&seeder.node.data).await {
514 warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid PoW: {e}");
515 continue
516 }
517 if !seeder.verify_signature().await {
518 warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid signature");
519 continue
520 }
521
522 seeders.push(seeder);
525 }
526
527 self.fud.add_value(&request.key, &seeders).await;
528 }
529 }
530}
531
532#[async_trait]
533impl ProtocolBase for ProtocolFud {
534 async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
535 debug!(target: "fud::ProtocolFud::start()", "START");
536 self.jobsman.clone().start(executor.clone());
537 self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
538 self.jobsman
539 .clone()
540 .spawn(self.clone().handle_fud_metadata_request(), executor.clone())
541 .await;
542 self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await;
543 self.jobsman.clone().spawn(self.clone().handle_fud_nodes_request(), executor.clone()).await;
544 self.jobsman
545 .clone()
546 .spawn(self.clone().handle_fud_seeders_request(), executor.clone())
547 .await;
548 self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
549 debug!(target: "fud::ProtocolFud::start()", "END");
550 Ok(())
551 }
552
553 fn name(&self) -> &'static str {
554 "ProtocolFud"
555 }
556}