darkfid/proto/
protocol_sync.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use tracing::{debug, error};
23
24use darkfi::{
25    blockchain::{BlockInfo, Header, HeaderHash},
26    impl_p2p_message,
27    net::{
28        metering::MeteringConfiguration,
29        protocol::protocol_generic::{
30            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
31        },
32        session::SESSION_DEFAULT,
33        Message, P2pPtr,
34    },
35    system::ExecutorPtr,
36    util::time::NanoTimestamp,
37    validator::{consensus::Proposal, ValidatorPtr},
38    Error, Result,
39};
40use darkfi_serial::{SerialDecodable, SerialEncodable};
41
42// Constant defining max elements we send in vectors during syncing.
43pub const BATCH: usize = 20;
44
45// TODO: Fine tune
46// Protocol metering configuration.
47// Since all messages are synchronous(request -> response) we will define
48// strict rules to prevent spamming.
49// Each message score will be 1, with a threshold of 20 and expiry time of 5.
50// Check ../tests/metering.rs for each message max bytes definition.
51const PROTOCOL_SYNC_METERING_CONFIGURATION: MeteringConfiguration = MeteringConfiguration {
52    threshold: 20,
53    sleep_step: 500,
54    expiry_time: NanoTimestamp::from_secs(5),
55};
56
57/// Structure represening a request to ask a node for their current
58/// canonical(confirmed) tip block hash, if they are synced. We also
59/// include our own tip, so they can verify we follow the same sequence.
60#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
61pub struct TipRequest {
62    /// Canonical(confirmed) tip block hash
63    pub tip: HeaderHash,
64}
65
66impl_p2p_message!(TipRequest, "tiprequest", 32, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
67
68/// Structure representing the response to `TipRequest`,
69/// containing a boolean flag to indicate if we are synced,
70/// and our canonical(confirmed) tip block height and hash.
71#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
72pub struct TipResponse {
73    /// Flag indicating the node is synced
74    pub synced: bool,
75    /// Canonical(confirmed) tip block height
76    pub height: Option<u32>,
77    /// Canonical(confirmed) tip block hash
78    pub hash: Option<HeaderHash>,
79}
80
81impl_p2p_message!(TipResponse, "tipresponse", 39, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
82
83/// Structure represening a request to ask a node for up to `BATCH` headers before
84/// the provided header height.
85#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
86pub struct HeaderSyncRequest {
87    /// Header height
88    pub height: u32,
89}
90
91impl_p2p_message!(
92    HeaderSyncRequest,
93    "headersyncrequest",
94    4,
95    1,
96    PROTOCOL_SYNC_METERING_CONFIGURATION
97);
98
99/// Structure representing the response to `HeaderSyncRequest`,
100/// containing up to `BATCH` headers before the requested block height.
101#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
102pub struct HeaderSyncResponse {
103    /// Response headers
104    pub headers: Vec<Header>,
105}
106
107impl_p2p_message!(
108    HeaderSyncResponse,
109    "headersyncresponse",
110    72121, // We leave some headroom for merge mining data
111    1,
112    PROTOCOL_SYNC_METERING_CONFIGURATION
113);
114
115/// Structure represening a request to ask a node for up to`BATCH` blocks
116/// of provided headers.
117#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
118pub struct SyncRequest {
119    /// Header hashes
120    pub headers: Vec<HeaderHash>,
121}
122
123impl_p2p_message!(SyncRequest, "syncrequest", 641, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
124
125/// Structure representing the response to `SyncRequest`,
126/// containing up to `BATCH` blocks after the requested block height.
127#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
128pub struct SyncResponse {
129    /// Response blocks
130    pub blocks: Vec<BlockInfo>,
131}
132
133impl_p2p_message!(SyncResponse, "syncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
134
135/// Structure represening a request to ask a node a fork sequence.
136/// If we include a specific fork tip, they have to return its sequence,
137/// otherwise they respond with their best fork sequence.
138/// We also include our own canonical(confirmed) tip, so they can verify
139/// we follow the same sequence.
140#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
141pub struct ForkSyncRequest {
142    /// Canonical(confirmed) tip block hash
143    pub tip: HeaderHash,
144    /// Optional fork tip block hash
145    pub fork_tip: Option<HeaderHash>,
146}
147
148impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 65, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
149
150/// Structure representing the response to `ForkSyncRequest`,
151/// containing the requested fork sequence, up to `BATCH` proposals.
152#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
153pub struct ForkSyncResponse {
154    /// Response fork proposals
155    pub proposals: Vec<Proposal>,
156}
157
158impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
159
160/// Structure represening a request to ask a node a fork header for the
161/// requested height. The fork is identified by the provided header hash.
162#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
163pub struct ForkHeaderHashRequest {
164    /// Header height
165    pub height: u32,
166    /// Block header hash to identify the fork
167    pub fork_header: HeaderHash,
168}
169
170impl_p2p_message!(
171    ForkHeaderHashRequest,
172    "forkheaderhashrequest",
173    36,
174    1,
175    PROTOCOL_SYNC_METERING_CONFIGURATION
176);
177
178/// Structure representing the response to `ForkHeaderHashRequest`,
179/// containing the requested fork header hash, if it was found.
180#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
181pub struct ForkHeaderHashResponse {
182    /// Response fork block header hash
183    pub fork_header: Option<HeaderHash>,
184}
185
186impl_p2p_message!(
187    ForkHeaderHashResponse,
188    "forkheaderhashresponse",
189    33,
190    1,
191    PROTOCOL_SYNC_METERING_CONFIGURATION
192);
193
194/// Structure represening a request to ask a node for up to `BATCH`
195/// fork headers for provided header hashes.  The fork is identified
196/// by the provided header hash.
197#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
198pub struct ForkHeadersRequest {
199    /// Header hashes
200    pub headers: Vec<HeaderHash>,
201    /// Block header hash to identify the fork
202    pub fork_header: HeaderHash,
203}
204
205impl_p2p_message!(
206    ForkHeadersRequest,
207    "forkheadersrequest",
208    673,
209    1,
210    PROTOCOL_SYNC_METERING_CONFIGURATION
211);
212
213/// Structure representing the response to `ForkHeadersRequest`,
214/// containing up to `BATCH` fork headers.
215#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
216pub struct ForkHeadersResponse {
217    /// Response headers
218    pub headers: Vec<Header>,
219}
220
221impl_p2p_message!(
222    ForkHeadersResponse,
223    "forkheadersresponse",
224    72121, // We leave some headroom for merge mining data
225    1,
226    PROTOCOL_SYNC_METERING_CONFIGURATION
227);
228
229/// Structure represening a request to ask a node for up to `BATCH`
230/// fork proposals for provided header hashes.  The fork is identified
231/// by the provided header hash.
232#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
233pub struct ForkProposalsRequest {
234    /// Header hashes
235    pub headers: Vec<HeaderHash>,
236    /// Block header hash to identify the fork
237    pub fork_header: HeaderHash,
238}
239
240impl_p2p_message!(
241    ForkProposalsRequest,
242    "forkproposalsrequest",
243    673,
244    1,
245    PROTOCOL_SYNC_METERING_CONFIGURATION
246);
247
248/// Structure representing the response to `ForkProposalsRequest`,
249/// containing up to `BATCH` fork headers.
250#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
251pub struct ForkProposalsResponse {
252    /// Response proposals
253    pub proposals: Vec<Proposal>,
254}
255
256impl_p2p_message!(
257    ForkProposalsResponse,
258    "forkproposalsresponse",
259    0,
260    1,
261    PROTOCOL_SYNC_METERING_CONFIGURATION
262);
263
264/// Atomic pointer to the `ProtocolSync` handler.
265pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
266
267/// Handler managing all `ProtocolSync` messages, over generic P2P protocols.
268pub struct ProtocolSyncHandler {
269    /// The generic handler for `TipRequest` messages.
270    tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
271    /// The generic handler for `HeaderSyncRequest` messages.
272    header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
273    /// The generic handler for `SyncRequest` messages.
274    sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
275    /// The generic handler for `ForkSyncRequest` messages.
276    fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
277    /// The generic handler for `ForkHeaderHashRequest` messages.
278    fork_header_hash_handler:
279        ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
280    /// The generic handler for `ForkHeadersRequest` messages.
281    fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
282    /// The generic handler for `ForkProposalsRequest` messages.
283    fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
284}
285
286impl ProtocolSyncHandler {
287    /// Initialize the generic prototocol handlers for all `ProtocolSync` messages
288    /// and register them to the provided P2P network, using the default session flag.
289    pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
290        debug!(
291            target: "darkfid::proto::protocol_sync::init",
292            "Adding all sync protocols to the protocol registry"
293        );
294
295        let tip_handler =
296            ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
297        let header_handler =
298            ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
299        let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
300        let fork_sync_handler =
301            ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
302        let fork_header_hash_handler =
303            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
304        let fork_headers_handler =
305            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
306        let fork_proposals_handler =
307            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;
308
309        Arc::new(Self {
310            tip_handler,
311            header_handler,
312            sync_handler,
313            fork_sync_handler,
314            fork_header_hash_handler,
315            fork_headers_handler,
316            fork_proposals_handler,
317        })
318    }
319
320    /// Start all `ProtocolSync` background tasks.
321    pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
322        debug!(
323            target: "darkfid::proto::protocol_sync::start",
324            "Starting sync protocols handlers tasks..."
325        );
326
327        self.tip_handler.task.clone().start(
328            handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
329            |res| async move {
330                match res {
331                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
332                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
333                }
334            },
335            Error::DetachedTaskStopped,
336            executor.clone(),
337        );
338
339        self.header_handler.task.clone().start(
340            handle_receive_header_request(self.header_handler.clone(), validator.clone()),
341            |res| async move {
342                match res {
343                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
344                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
345                }
346            },
347            Error::DetachedTaskStopped,
348            executor.clone(),
349        );
350
351        self.sync_handler.task.clone().start(
352            handle_receive_request(self.sync_handler.clone(), validator.clone()),
353            |res| async move {
354                match res {
355                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
356                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
357                }
358            },
359            Error::DetachedTaskStopped,
360            executor.clone(),
361        );
362
363        self.fork_sync_handler.task.clone().start(
364            handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
365            |res| async move {
366                match res {
367                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
368                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
369                }
370            },
371            Error::DetachedTaskStopped,
372            executor.clone(),
373        );
374
375        self.fork_header_hash_handler.task.clone().start(
376            handle_receive_fork_header_hash_request(self.fork_header_hash_handler.clone(), validator.clone()),
377            |res| async move {
378                match res {
379                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
380                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
381                }
382            },
383            Error::DetachedTaskStopped,
384            executor.clone(),
385        );
386
387        self.fork_headers_handler.task.clone().start(
388            handle_receive_fork_headers_request(self.fork_headers_handler.clone(), validator.clone()),
389            |res| async move {
390                match res {
391                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
392                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
393                }
394            },
395            Error::DetachedTaskStopped,
396            executor.clone(),
397        );
398
399        self.fork_proposals_handler.task.clone().start(
400            handle_receive_fork_proposals_request(self.fork_proposals_handler.clone(), validator.clone()),
401            |res| async move {
402                match res {
403                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
404                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
405                }
406            },
407            Error::DetachedTaskStopped,
408            executor.clone(),
409        );
410
411        debug!(
412            target: "darkfid::proto::protocol_sync::start",
413            "Sync protocols handlers tasks started!"
414        );
415
416        Ok(())
417    }
418
419    /// Stop all `ProtocolSync` background tasks.
420    pub async fn stop(&self) {
421        debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
422        self.tip_handler.task.stop().await;
423        self.header_handler.task.stop().await;
424        self.sync_handler.task.stop().await;
425        self.fork_sync_handler.task.stop().await;
426        self.fork_header_hash_handler.task.stop().await;
427        self.fork_headers_handler.task.stop().await;
428        self.fork_proposals_handler.task.stop().await;
429        debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
430    }
431}
432
433/// Background handler function for ProtocolSyncTip.
434async fn handle_receive_tip_request(
435    handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
436    validator: ValidatorPtr,
437) -> Result<()> {
438    debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
439    loop {
440        // Wait for a new tip request message
441        let (channel, request) = match handler.receiver.recv().await {
442            Ok(r) => r,
443            Err(e) => {
444                debug!(
445                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
446                    "recv fail: {e}"
447                );
448                continue
449            }
450        };
451
452        debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");
453
454        // Check if node has finished syncing its blockchain
455        let validator = validator.read().await;
456        if !validator.synced {
457            debug!(
458                target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
459                "Node still syncing blockchain"
460            );
461            handler
462                .send_action(
463                    channel,
464                    ProtocolGenericAction::Response(TipResponse {
465                        synced: false,
466                        height: None,
467                        hash: None,
468                    }),
469                )
470                .await;
471            continue
472        }
473
474        // Check we follow the same sequence
475        match validator.blockchain.blocks.contains(&request.tip) {
476            Ok(contains) => {
477                if !contains {
478                    debug!(
479                        target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
480                        "Node doesn't follow request sequence"
481                    );
482                    handler
483                        .send_action(
484                            channel,
485                            ProtocolGenericAction::Response(TipResponse {
486                                synced: true,
487                                height: None,
488                                hash: None,
489                            }),
490                        )
491                        .await;
492                    continue
493                }
494            }
495            Err(e) => {
496                error!(
497                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
498                    "block_store.contains fail: {e}"
499                );
500                handler.send_action(channel, ProtocolGenericAction::Skip).await;
501                continue
502            }
503        }
504
505        // Grab our current tip and return it
506        let tip = match validator.blockchain.last() {
507            Ok(v) => v,
508            Err(e) => {
509                error!(
510                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
511                    "blockchain.last fail: {e}"
512                );
513                handler.send_action(channel, ProtocolGenericAction::Skip).await;
514                continue
515            }
516        };
517
518        // Send response
519        handler
520            .send_action(
521                channel,
522                ProtocolGenericAction::Response(TipResponse {
523                    synced: true,
524                    height: Some(tip.0),
525                    hash: Some(tip.1),
526                }),
527            )
528            .await;
529    }
530}
531
532/// Background handler function for ProtocolSyncHeader.
533async fn handle_receive_header_request(
534    handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
535    validator: ValidatorPtr,
536) -> Result<()> {
537    debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
538    loop {
539        // Wait for a new header request message
540        let (channel, request) = match handler.receiver.recv().await {
541            Ok(r) => r,
542            Err(e) => {
543                debug!(
544                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
545                    "recv fail: {e}"
546                );
547                continue
548            }
549        };
550
551        // Check if node has finished syncing its blockchain
552        let validator = validator.read().await;
553        if !validator.synced {
554            debug!(
555                target: "darkfid::proto::protocol_sync::handle_receive_header_request",
556                "Node still syncing blockchain, skipping..."
557            );
558            handler.send_action(channel, ProtocolGenericAction::Skip).await;
559            continue
560        }
561
562        debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");
563
564        // Grab the corresponding headers
565        let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
566            Ok(v) => v,
567            Err(e) => {
568                error!(
569                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
570                    "get_headers_before fail: {e}"
571                );
572                handler.send_action(channel, ProtocolGenericAction::Skip).await;
573                continue
574            }
575        };
576
577        // Send response
578        handler
579            .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
580            .await;
581    }
582}
583
584/// Background handler function for ProtocolSync.
585async fn handle_receive_request(
586    handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
587    validator: ValidatorPtr,
588) -> Result<()> {
589    debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
590    loop {
591        // Wait for a new sync request message
592        let (channel, request) = match handler.receiver.recv().await {
593            Ok(r) => r,
594            Err(e) => {
595                debug!(
596                    target: "darkfid::proto::protocol_sync::handle_receive_request",
597                    "recv fail: {e}"
598                );
599                continue
600            }
601        };
602
603        // Check if node has finished syncing its blockchain
604        let validator = validator.read().await;
605        if !validator.synced {
606            debug!(
607                target: "darkfid::proto::protocol_sync::handle_receive_request",
608                "Node still syncing blockchain, skipping..."
609            );
610            handler.send_action(channel, ProtocolGenericAction::Skip).await;
611            continue
612        }
613
614        // Check if request exists the configured limit
615        if request.headers.len() > BATCH {
616            debug!(
617                target: "darkfid::proto::protocol_sync::handle_receive_request",
618                "Node requested more blocks than allowed."
619            );
620            handler.send_action(channel, ProtocolGenericAction::Skip).await;
621            continue
622        }
623
624        debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
625
626        // Grab the corresponding blocks
627        let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
628            Ok(v) => v,
629            Err(e) => {
630                error!(
631                    target: "darkfid::proto::protocol_sync::handle_receive_request",
632                    "get_blocks_after fail: {e}"
633                );
634                handler.send_action(channel, ProtocolGenericAction::Skip).await;
635                continue
636            }
637        };
638
639        // Send response
640        handler
641            .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
642            .await;
643    }
644}
645
646/// Background handler function for ProtocolSyncFork.
647async fn handle_receive_fork_request(
648    handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
649    validator: ValidatorPtr,
650) -> Result<()> {
651    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
652    loop {
653        // Wait for a new fork sync request message
654        let (channel, request) = match handler.receiver.recv().await {
655            Ok(r) => r,
656            Err(e) => {
657                debug!(
658                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
659                    "recv fail: {e}"
660                );
661                continue
662            }
663        };
664
665        // Check if node has finished syncing its blockchain
666        let validator = validator.read().await;
667        if !validator.synced {
668            debug!(
669                target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
670                "Node still syncing blockchain, skipping..."
671            );
672            handler.send_action(channel, ProtocolGenericAction::Skip).await;
673            continue
674        }
675
676        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
677
678        // Retrieve proposals sequence
679        let proposals = match validator
680            .consensus
681            .get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
682            .await
683        {
684            Ok(p) => p,
685            Err(e) => {
686                debug!(
687                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
688                    "Getting fork proposals failed: {e}"
689                );
690                handler.send_action(channel, ProtocolGenericAction::Skip).await;
691                continue
692            }
693        };
694
695        // Send response
696        handler
697            .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
698            .await;
699    }
700}
701
702/// Background handler function for ProtocolSyncForkHeaderHash.
703async fn handle_receive_fork_header_hash_request(
704    handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
705    validator: ValidatorPtr,
706) -> Result<()> {
707    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
708    loop {
709        // Wait for a new fork header hash request message
710        let (channel, request) = match handler.receiver.recv().await {
711            Ok(r) => r,
712            Err(e) => {
713                debug!(
714                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
715                    "recv fail: {e}"
716                );
717                continue
718            }
719        };
720
721        // Check if node has finished syncing its blockchain
722        let validator = validator.read().await;
723        if !validator.synced {
724            debug!(
725                target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
726                "Node still syncing blockchain, skipping..."
727            );
728            handler.send_action(channel, ProtocolGenericAction::Skip).await;
729            continue
730        }
731
732        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");
733
734        // Retrieve fork header
735        let fork_header = match validator
736            .consensus
737            .get_fork_header_hash(request.height, &request.fork_header)
738            .await
739        {
740            Ok(h) => h,
741            Err(e) => {
742                debug!(
743                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
744                    "Getting fork header hash failed: {e}"
745                );
746                handler.send_action(channel, ProtocolGenericAction::Skip).await;
747                continue
748            }
749        };
750
751        // Send response if header was found
752        if fork_header.is_some() {
753            handler
754                .send_action(
755                    channel,
756                    ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
757                )
758                .await;
759            continue
760        }
761
762        // If header wasn't found in a fork, check canonical
763        if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
764            debug!(
765                target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
766                "Getting fork header hash failed: {e}"
767            );
768            handler.send_action(channel, ProtocolGenericAction::Skip).await;
769            continue
770        };
771
772        let response = match validator.blockchain.blocks.get_order(&[request.height], false) {
773            Ok(h) => ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header: h[0] }),
774            Err(e) => {
775                debug!(
776                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
777                    "Getting fork header hash failed: {e}"
778                );
779                ProtocolGenericAction::Skip
780            }
781        };
782
783        // Send response
784        handler.send_action(channel, response).await;
785    }
786}
787
788/// Background handler function for ProtocolSyncForkHeaders.
789async fn handle_receive_fork_headers_request(
790    handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
791    validator: ValidatorPtr,
792) -> Result<()> {
793    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
794    loop {
795        // Wait for a new fork header hash request message
796        let (channel, request) = match handler.receiver.recv().await {
797            Ok(r) => r,
798            Err(e) => {
799                debug!(
800                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
801                    "recv fail: {e}"
802                );
803                continue
804            }
805        };
806
807        // Check if node has finished syncing its blockchain
808        let validator = validator.read().await;
809        if !validator.synced {
810            debug!(
811                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
812                "Node still syncing blockchain, skipping..."
813            );
814            handler.send_action(channel, ProtocolGenericAction::Skip).await;
815            continue
816        }
817
818        // Check if request exists the configured limit
819        if request.headers.len() > BATCH {
820            debug!(
821                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
822                "Node requested more headers than allowed."
823            );
824            handler.send_action(channel, ProtocolGenericAction::Skip).await;
825            continue
826        }
827
828        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");
829
830        // Retrieve fork headers
831        let headers = match validator
832            .consensus
833            .get_fork_headers(&request.headers, &request.fork_header)
834            .await
835        {
836            Ok(h) => h,
837            Err(e) => {
838                debug!(
839                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
840                    "Getting fork headers failed: {e}"
841                );
842                handler.send_action(channel, ProtocolGenericAction::Skip).await;
843                continue
844            }
845        };
846
847        // Send response if headers were found
848        if !headers.is_empty() {
849            handler
850                .send_action(
851                    channel,
852                    ProtocolGenericAction::Response(ForkHeadersResponse { headers }),
853                )
854                .await;
855            continue
856        }
857
858        // If headers weren't found in a fork, check canonical
859        if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
860            debug!(
861                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
862                "Getting fork header hash failed: {e}"
863            );
864            handler.send_action(channel, ProtocolGenericAction::Skip).await;
865            continue
866        };
867
868        let response = match validator.blockchain.headers.get(&request.headers, true) {
869            Ok(h) => ProtocolGenericAction::Response(ForkHeadersResponse {
870                headers: h.iter().map(|x| x.clone().unwrap()).collect(),
871            }),
872            Err(e) => {
873                debug!(
874                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
875                    "Getting fork headers failed: {e}"
876                );
877                ProtocolGenericAction::Skip
878            }
879        };
880
881        // Send response
882        handler.send_action(channel, response).await;
883    }
884}
885
886/// Background handler function for ProtocolSyncForkProposals.
887async fn handle_receive_fork_proposals_request(
888    handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
889    validator: ValidatorPtr,
890) -> Result<()> {
891    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
892    loop {
893        // Wait for a new fork header hash request message
894        let (channel, request) = match handler.receiver.recv().await {
895            Ok(r) => r,
896            Err(e) => {
897                debug!(
898                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
899                    "recv fail: {e}"
900                );
901                continue
902            }
903        };
904
905        // Check if node has finished syncing its blockchain
906        let validator = validator.read().await;
907        if !validator.synced {
908            debug!(
909                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
910                "Node still syncing blockchain, skipping..."
911            );
912            handler.send_action(channel, ProtocolGenericAction::Skip).await;
913            continue
914        }
915
916        // Check if request exists the configured limit
917        if request.headers.len() > BATCH {
918            debug!(
919                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
920                "Node requested more proposals than allowed."
921            );
922            handler.send_action(channel, ProtocolGenericAction::Skip).await;
923            continue
924        }
925
926        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");
927
928        // Retrieve fork proposals
929        let proposals = match validator
930            .consensus
931            .get_fork_proposals(&request.headers, &request.fork_header)
932            .await
933        {
934            Ok(p) => p,
935            Err(e) => {
936                debug!(
937                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
938                    "Getting fork proposals failed: {e}"
939                );
940                handler.send_action(channel, ProtocolGenericAction::Skip).await;
941                continue
942            }
943        };
944
945        // Send response if proposals were found
946        if !proposals.is_empty() {
947            handler
948                .send_action(
949                    channel,
950                    ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
951                )
952                .await;
953            continue
954        }
955
956        // If proposals weren't found in a fork, check canonical
957        if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
958            debug!(
959                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
960                "Getting fork header hash failed: {e}"
961            );
962            handler.send_action(channel, ProtocolGenericAction::Skip).await;
963            continue
964        };
965
966        let response = match validator.blockchain.get_blocks_by_hash(&request.headers) {
967            Ok(blocks) => {
968                let mut proposals = Vec::with_capacity(blocks.len());
969                for block in blocks {
970                    proposals.push(Proposal::new(block));
971                }
972                ProtocolGenericAction::Response(ForkProposalsResponse { proposals })
973            }
974            Err(e) => {
975                debug!(
976                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
977                    "Getting fork proposals failed: {e}"
978                );
979                ProtocolGenericAction::Skip
980            }
981        };
982
983        // Send response
984        handler.send_action(channel, response).await;
985    }
986}