fud/
proto.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use smol::Executor;
21use std::{path::StripPrefixError, sync::Arc};
22use tracing::{debug, error, info, warn};
23
24use darkfi::{
25    dht::{event::DhtEvent, DhtHandler},
26    geode::hash_to_string,
27    impl_p2p_message,
28    net::{
29        metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30        session::SESSION_INBOUND,
31        ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
32        ProtocolJobsManager, ProtocolJobsManagerPtr,
33    },
34    Error, Result,
35};
36use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
37use darkfi_serial::{SerialDecodable, SerialEncodable};
38
39use crate::{
40    dht::{FudNode, FudSeeder},
41    Fud,
42};
43
/// Trait for resource-specific messages.
/// Adds a method to get the resource's hash from the message.
///
/// The message types in this file implement it through `impl_resource_msg!`.
pub trait ResourceMessage {
    /// Returns the hash carried by the message. For implementations generated
    /// by `impl_resource_msg!` this is a copy of the message's `resource` field,
    /// or of the field named in the macro invocation.
    fn resource_hash(&self) -> blake3::Hash;
}
// Implements `ResourceMessage` for a message type.
//
// `impl_resource_msg!(Type)` reads the hash from the `resource` field;
// `impl_resource_msg!(Type, field)` reads it from the named field instead.
macro_rules! impl_resource_msg {
    // Short form: delegate to the explicit form with the default field name.
    ($msg:ty) => {
        impl_resource_msg!($msg, resource);
    };
    // Explicit form: generate the impl that returns `self.<field>`.
    ($msg:ty, $field:ident) => {
        impl ResourceMessage for $msg {
            fn resource_hash(&self) -> blake3::Hash {
                self.$field
            }
        }
    };
}
61
/// Message representing a file reply from the network.
///
/// `ProtocolFud::handle_fud_metadata_request` sends it in answer to a
/// `FudMetadataRequest` when the requested resource is not a directory.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileReply {
    /// Hash of the resource; the handler copies it from the request.
    pub resource: blake3::Hash,
    /// Hashes of the file's chunks, collected from the storage's `get_chunks()`.
    pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudFileReply);
70
/// Message representing a directory reply from the network.
///
/// `ProtocolFud::handle_fud_metadata_request` sends it in answer to a
/// `FudMetadataRequest` when the requested resource is a directory.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudDirectoryReply {
    /// Hash of the resource; the handler copies it from the request.
    pub resource: blake3::Hash,
    /// Hashes of the directory's chunks, collected from the storage's `get_chunks()`.
    pub chunk_hashes: Vec<blake3::Hash>,
    /// Vec of (file path, file size). The handler fills in paths with the
    /// resource's local path prefix stripped.
    pub files: Vec<(String, u64)>,
}
impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudDirectoryReply);
80
/// Message representing a node announcing a key on the network.
///
/// On reception, `ProtocolFud::handle_fud_announce` keeps only the seeders
/// that have at least one address, a valid PoW and a valid signature, and
/// passes them to `Fud::add_value` for `key`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudAnnounce {
    /// Key being announced.
    pub key: blake3::Hash,
    /// Seeders announced for `key`.
    pub seeders: Vec<FudSeeder>,
}
impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
88
/// Message representing a chunk reply from the network.
///
/// Sent by `ProtocolFud::handle_fud_chunk_request`, and also by
/// `ProtocolFud::handle_fud_metadata_request` when the requested file
/// consists of a single chunk that is available locally.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkReply {
    /// Hash of the resource the chunk belongs to; copied from the request.
    pub resource: blake3::Hash,
    // TODO: This should be a chunk-sized array, but then we need padding?
    /// Content of the chunk.
    pub chunk: Vec<u8>,
}
impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudChunkReply);
98
/// Message representing a reply when a metadata is not found.
///
/// `ProtocolFud::handle_fud_metadata_request` sends it when the resource has
/// no known local path, cannot be loaded from the local storage, fails the
/// single-chunk hash check, or has file paths that cannot be made relative.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudMetadataNotFound {
    /// Hash of the resource that was requested.
    pub resource: blake3::Hash,
}
impl_p2p_message!(FudMetadataNotFound, "FudMetadataNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudMetadataNotFound);
106
/// Message representing a reply when a chunk is not found.
///
/// `ProtocolFud::handle_fud_chunk_request` sends it when the resource or the
/// chunk cannot be loaded locally, or when the loaded chunk fails verification.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkNotFound {
    /// Hash of the resource that was requested.
    pub resource: blake3::Hash,
    /// Hash of the chunk that was requested.
    pub chunk: blake3::Hash,
}
impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudChunkNotFound);
115
/// Message representing a ping request on the network.
///
/// Answered by `ProtocolFud::handle_fud_ping_request` with a `FudPingReply`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingRequest {
    /// Value the replying node echoes back and signs.
    pub random: u64,
}
impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
122
/// Message representing a ping reply on the network.
///
/// Built by `ProtocolFud::handle_fud_ping_request` in answer to a `FudPingRequest`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingReply {
    /// The replying node, as returned by its `Fud::node()`.
    pub node: FudNode,
    /// The `random` value copied from the ping request.
    pub random: u64,
    /// Signature of the random u64 from the ping request
    /// (the handler signs its big-endian bytes with the node's secret key).
    pub sig: Signature,
}
impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
132
/// Message representing a find file/directory request from the network.
///
/// `ProtocolFud::handle_fud_metadata_request` answers it with a `FudFileReply`,
/// a `FudDirectoryReply`, a `FudChunkReply` (single-chunk file) or a
/// `FudMetadataNotFound`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudMetadataRequest {
    /// Hash of the resource whose metadata is requested.
    pub resource: blake3::Hash,
}
impl_p2p_message!(FudMetadataRequest, "FudMetadataRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudMetadataRequest);
140
/// Message representing a find chunk request from the network.
///
/// `ProtocolFud::handle_fud_chunk_request` answers it with a `FudChunkReply`
/// or a `FudChunkNotFound`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkRequest {
    /// Hash of the resource the chunk belongs to.
    pub resource: blake3::Hash,
    /// Hash of the requested chunk.
    pub chunk: blake3::Hash,
}
impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudChunkRequest);
149
/// Message representing a find nodes request on the network.
///
/// Answered by `ProtocolFud::handle_fud_nodes_request` with a `FudNodesReply`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNodesRequest {
    /// Key passed to the DHT's `find_neighbors` by the answering node.
    pub key: blake3::Hash,
}
impl_p2p_message!(FudNodesRequest, "FudNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
156
/// Message representing a find nodes reply on the network.
///
/// `ResourceMessage::resource_hash` returns `key` for this message.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNodesReply {
    /// Key copied from the `FudNodesRequest`.
    pub key: blake3::Hash,
    /// Result of `find_neighbors(&key, settings.k)` on the replying node's DHT.
    pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudNodesReply, "FudNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudNodesReply, key);
165
/// Message representing a find seeders request on the network.
///
/// Answered by `ProtocolFud::handle_fud_seeders_request` with a `FudSeedersReply`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudSeedersRequest {
    /// Key looked up in the answering node's DHT hash table.
    pub key: blake3::Hash,
}
impl_p2p_message!(FudSeedersRequest, "FudSeedersRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
172
/// Message representing a find seeders reply on the network.
///
/// `ResourceMessage::resource_hash` returns `key` for this message.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudSeedersReply {
    /// Key copied from the `FudSeedersRequest`.
    pub key: blake3::Hash,
    /// Seeders stored for `key` in the replying node's DHT hash table;
    /// empty when the table has no entry for the key.
    pub seeders: Vec<FudSeeder>,
    /// Result of `find_neighbors(&key, settings.k)` on the replying node's DHT.
    pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudSeedersReply, "FudSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudSeedersReply, key);
182
/// P2P protocol implementation for fud.
///
/// One instance is created per channel by `ProtocolFud::init`. Each
/// subscription below is consumed by its own handler task, spawned in `start`.
pub struct ProtocolFud {
    /// Channel this protocol instance receives from and replies on.
    channel: ChannelPtr,
    /// Subscription to incoming `FudPingRequest` messages.
    ping_request_sub: MessageSubscription<FudPingRequest>,
    /// Subscription to incoming `FudMetadataRequest` messages.
    find_metadata_request_sub: MessageSubscription<FudMetadataRequest>,
    /// Subscription to incoming `FudChunkRequest` messages.
    find_chunk_request_sub: MessageSubscription<FudChunkRequest>,
    /// Subscription to incoming `FudNodesRequest` messages.
    find_nodes_request_sub: MessageSubscription<FudNodesRequest>,
    /// Subscription to incoming `FudSeedersRequest` messages.
    find_seeders_request_sub: MessageSubscription<FudSeedersRequest>,
    /// Subscription to incoming `FudAnnounce` messages.
    announce_sub: MessageSubscription<FudAnnounce>,
    /// Shared fud instance the handlers query (DHT, geode, state, PoW).
    fud: Arc<Fud>,
    /// Jobs manager used to spawn the handler tasks.
    jobsman: ProtocolJobsManagerPtr,
}
195
196impl ProtocolFud {
197    pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
198        debug!(
199            target: "fud::proto::ProtocolFud::init()",
200            "Adding ProtocolFud to the protocol registry"
201        );
202
203        let msg_subsystem = channel.message_subsystem();
204        msg_subsystem.add_dispatch::<FudPingRequest>().await;
205        msg_subsystem.add_dispatch::<FudPingReply>().await;
206        msg_subsystem.add_dispatch::<FudMetadataRequest>().await;
207        msg_subsystem.add_dispatch::<FudChunkRequest>().await;
208        msg_subsystem.add_dispatch::<FudChunkReply>().await;
209        msg_subsystem.add_dispatch::<FudChunkNotFound>().await;
210        msg_subsystem.add_dispatch::<FudFileReply>().await;
211        msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
212        msg_subsystem.add_dispatch::<FudMetadataNotFound>().await;
213        msg_subsystem.add_dispatch::<FudNodesRequest>().await;
214        msg_subsystem.add_dispatch::<FudNodesReply>().await;
215        msg_subsystem.add_dispatch::<FudSeedersRequest>().await;
216        msg_subsystem.add_dispatch::<FudSeedersReply>().await;
217        msg_subsystem.add_dispatch::<FudAnnounce>().await;
218
219        let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
220        let find_metadata_request_sub = channel.subscribe_msg::<FudMetadataRequest>().await?;
221        let find_chunk_request_sub = channel.subscribe_msg::<FudChunkRequest>().await?;
222        let find_nodes_request_sub = channel.subscribe_msg::<FudNodesRequest>().await?;
223        let find_seeders_request_sub = channel.subscribe_msg::<FudSeedersRequest>().await?;
224        let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
225
226        Ok(Arc::new(Self {
227            channel: channel.clone(),
228            ping_request_sub,
229            find_metadata_request_sub,
230            find_chunk_request_sub,
231            find_nodes_request_sub,
232            find_seeders_request_sub,
233            announce_sub,
234            fud,
235            jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
236        }))
237    }
238
239    async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
240        debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
241
242        loop {
243            let ping_req = match self.ping_request_sub.receive().await {
244                Ok(v) => v,
245                Err(Error::ChannelStopped) => continue,
246                Err(_) => continue,
247            };
248            info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST from {}", self.channel.display_address());
249            self.fud.dht.update_channel(self.channel.info.id).await;
250
251            let self_node = self.fud.node().await;
252            if self_node.is_err() {
253                self.channel.stop().await;
254                continue
255            }
256            let state = self.fud.state.read().await;
257            if state.is_none() {
258                self.channel.stop().await;
259                continue
260            }
261
262            let reply = FudPingReply {
263                node: self_node.unwrap(),
264                random: ping_req.random,
265                sig: state.clone().unwrap().secret_key.sign(&ping_req.random.to_be_bytes()),
266            };
267            drop(state);
268
269            if let Err(e) = self.channel.send(&reply).await {
270                self.fud
271                    .dht
272                    .event_publisher
273                    .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Err(e) })
274                    .await;
275                continue;
276            }
277            self.fud
278                .dht
279                .event_publisher
280                .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Ok(()) })
281                .await;
282
283            // Ping the peer if this is an inbound connection
284            if self.channel.session_type_id() & SESSION_INBOUND != 0 {
285                let _ = self.fud.ping(self.channel.clone()).await;
286            }
287        }
288    }
289
290    async fn handle_fud_metadata_request(self: Arc<Self>) -> Result<()> {
291        debug!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "START");
292
293        loop {
294            let request = match self.find_metadata_request_sub.receive().await {
295                Ok(v) => v,
296                Err(Error::ChannelStopped) => continue,
297                Err(_) => continue,
298            };
299            info!(target: "fud::ProtocolFud::handle_fud_request()", "Received METADATA REQUEST for {}", hash_to_string(&request.resource));
300            self.fud.dht.update_channel(self.channel.info.id).await;
301
302            let notfound = async || {
303                let reply = FudMetadataNotFound { resource: request.resource };
304                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "We do not have the metadata of {}", hash_to_string(&request.resource));
305                let _ = self.channel.send(&reply).await;
306            };
307
308            let path = self.fud.hash_to_path(&request.resource).ok().flatten();
309            if path.is_none() {
310                notfound().await;
311                continue
312            }
313            let path = path.unwrap();
314
315            let chunked_file = self.fud.geode.get(&request.resource, &path).await.ok();
316            if chunked_file.is_none() {
317                notfound().await;
318                continue
319            }
320            let mut chunked_file = chunked_file.unwrap();
321
322            // If it's a file with a single chunk, just reply with the chunk
323            if chunked_file.len() == 1 && !chunked_file.is_dir() {
324                let chunk_hash = chunked_file.get_chunks()[0].hash;
325                let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
326                if let Ok(chunk) = chunk {
327                    if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.resource {
328                        // TODO: Run geode GC
329                        notfound().await;
330                        continue
331                    }
332                    let reply = FudChunkReply { resource: request.resource, chunk };
333                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
334                    let _ = self.channel.send(&reply).await;
335                    continue
336                }
337                // We don't have the chunk, but we can still reply with the metadata
338            }
339
340            // Reply with the metadata
341            match chunked_file.is_dir() {
342                false => {
343                    let reply = FudFileReply {
344                        resource: request.resource,
345                        chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(),
346                    };
347                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.resource));
348                    let _ = self.channel.send(&reply).await;
349                }
350                true => {
351                    let files = chunked_file
352                        .get_files()
353                        .iter()
354                        .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
355                            Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
356                            Err(e) => Err(e),
357                        })
358                        .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
359                    if let Err(e) = files {
360                        error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
361                        notfound().await;
362                        continue
363                    }
364                    let reply = FudDirectoryReply {
365                        resource: request.resource,
366                        chunk_hashes: chunked_file.get_chunks().iter().map(|c| c.hash).collect(),
367                        files: files.unwrap(),
368                    };
369                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.resource));
370                    let _ = self.channel.send(&reply).await;
371                }
372            };
373        }
374    }
375
376    async fn handle_fud_chunk_request(self: Arc<Self>) -> Result<()> {
377        debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START");
378
379        loop {
380            let request = match self.find_chunk_request_sub.receive().await {
381                Ok(v) => v,
382                Err(Error::ChannelStopped) => continue,
383                Err(_) => continue,
384            };
385            info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Received CHUNK REQUEST for {}", hash_to_string(&request.resource));
386            self.fud.dht.update_channel(self.channel.info.id).await;
387
388            let notfound = async || {
389                let reply = FudChunkNotFound { resource: request.resource, chunk: request.chunk };
390                info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "We do not have chunk {} of resource {}", hash_to_string(&request.resource), hash_to_string(&request.chunk));
391                let _ = self.channel.send(&reply).await;
392            };
393
394            let path = self.fud.hash_to_path(&request.resource).ok().flatten();
395            if path.is_none() {
396                notfound().await;
397                continue
398            }
399            let path = path.unwrap();
400
401            let chunked = self.fud.geode.get(&request.resource, &path).await;
402            if chunked.is_err() {
403                notfound().await;
404                continue
405            }
406
407            let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.chunk).await;
408            if let Ok(chunk) = chunk {
409                if !self.fud.geode.verify_chunk(&request.chunk, &chunk) {
410                    // TODO: Run geode GC
411                    notfound().await;
412                    continue
413                }
414                let reply = FudChunkReply { resource: request.resource, chunk };
415                info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.chunk));
416                let _ = self.channel.send(&reply).await;
417                continue
418            }
419
420            notfound().await;
421        }
422    }
423
424    async fn handle_fud_nodes_request(self: Arc<Self>) -> Result<()> {
425        debug!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "START");
426
427        loop {
428            let request = match self.find_nodes_request_sub.receive().await {
429                Ok(v) => v,
430                Err(Error::ChannelStopped) => continue,
431                Err(_) => continue,
432            };
433            info!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
434            self.fud.dht.update_channel(self.channel.info.id).await;
435
436            let reply = FudNodesReply {
437                key: request.key,
438                nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
439            };
440            match self.channel.send(&reply).await {
441                Ok(()) => continue,
442                Err(_e) => continue,
443            }
444        }
445    }
446
447    async fn handle_fud_seeders_request(self: Arc<Self>) -> Result<()> {
448        debug!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "START");
449
450        loop {
451            let request = match self.find_seeders_request_sub.receive().await {
452                Ok(v) => v,
453                Err(Error::ChannelStopped) => continue,
454                Err(_) => continue,
455            };
456            info!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "Received FIND SEEDERS for {} from {:?}", hash_to_string(&request.key), self.channel);
457            self.fud.dht.update_channel(self.channel.info.id).await;
458
459            let router = self.fud.dht.hash_table.read().await;
460            let peers = router.get(&request.key);
461
462            match peers {
463                Some(seeders) => {
464                    let _ = self
465                        .channel
466                        .send(&FudSeedersReply {
467                            key: request.key,
468                            seeders: seeders.to_vec(),
469                            nodes: self
470                                .fud
471                                .dht()
472                                .find_neighbors(&request.key, self.fud.dht().settings.k)
473                                .await,
474                        })
475                        .await;
476                }
477                None => {
478                    let _ = self
479                        .channel
480                        .send(&FudSeedersReply {
481                            key: request.key,
482                            seeders: vec![],
483                            nodes: self
484                                .fud
485                                .dht()
486                                .find_neighbors(&request.key, self.fud.dht().settings.k)
487                                .await,
488                        })
489                        .await;
490                }
491            };
492        }
493    }
494
495    async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
496        debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
497
498        loop {
499            let request = match self.announce_sub.receive().await {
500                Ok(v) => v,
501                Err(Error::ChannelStopped) => continue,
502                Err(_) => continue,
503            };
504            info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
505            self.fud.dht.update_channel(self.channel.info.id).await;
506
507            let mut seeders = vec![];
508
509            for seeder in request.seeders.clone() {
510                if seeder.node.addresses.is_empty() {
511                    continue
512                }
513                if let Err(e) = self.fud.pow.write().await.verify_node(&seeder.node.data).await {
514                    warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid PoW: {e}");
515                    continue
516                }
517                if !seeder.verify_signature().await {
518                    warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid signature");
519                    continue
520                }
521
522                // TODO: Limit the number of addresses
523                // TODO: Verify each address
524                seeders.push(seeder);
525            }
526
527            self.fud.add_value(&request.key, &seeders).await;
528        }
529    }
530}
531
532#[async_trait]
533impl ProtocolBase for ProtocolFud {
534    async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
535        debug!(target: "fud::ProtocolFud::start()", "START");
536        self.jobsman.clone().start(executor.clone());
537        self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
538        self.jobsman
539            .clone()
540            .spawn(self.clone().handle_fud_metadata_request(), executor.clone())
541            .await;
542        self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await;
543        self.jobsman.clone().spawn(self.clone().handle_fud_nodes_request(), executor.clone()).await;
544        self.jobsman
545            .clone()
546            .spawn(self.clone().handle_fud_seeders_request(), executor.clone())
547            .await;
548        self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
549        debug!(target: "fud::ProtocolFud::start()", "END");
550        Ok(())
551    }
552
553    fn name(&self) -> &'static str {
554        "ProtocolFud"
555    }
556}