1use std::sync::Arc;
20
21use async_trait::async_trait;
22use tracing::{debug, error};
23
24use darkfi::{
25 blockchain::{BlockInfo, Header, HeaderHash},
26 impl_p2p_message,
27 net::{
28 metering::MeteringConfiguration,
29 protocol::protocol_generic::{
30 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
31 },
32 session::SESSION_DEFAULT,
33 Message, P2pPtr,
34 },
35 system::ExecutorPtr,
36 util::time::NanoTimestamp,
37 validator::{consensus::Proposal, ValidatorPtr},
38 Error, Result,
39};
40use darkfi_serial::{SerialDecodable, SerialEncodable};
41
42pub const BATCH: usize = 20;
44
45const PROTOCOL_SYNC_METERING_CONFIGURATION: MeteringConfiguration = MeteringConfiguration {
52 threshold: 20,
53 sleep_step: 500,
54 expiry_time: NanoTimestamp::from_secs(5),
55};
56
57#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
61pub struct TipRequest {
62 pub tip: HeaderHash,
64}
65
66impl_p2p_message!(TipRequest, "tiprequest", 32, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
67
68#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
72pub struct TipResponse {
73 pub synced: bool,
75 pub height: Option<u32>,
77 pub hash: Option<HeaderHash>,
79}
80
81impl_p2p_message!(TipResponse, "tipresponse", 39, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
82
83#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
86pub struct HeaderSyncRequest {
87 pub height: u32,
89}
90
91impl_p2p_message!(
92 HeaderSyncRequest,
93 "headersyncrequest",
94 4,
95 1,
96 PROTOCOL_SYNC_METERING_CONFIGURATION
97);
98
99#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
102pub struct HeaderSyncResponse {
103 pub headers: Vec<Header>,
105}
106
107impl_p2p_message!(
108 HeaderSyncResponse,
109 "headersyncresponse",
110 72121, 1,
112 PROTOCOL_SYNC_METERING_CONFIGURATION
113);
114
115#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
118pub struct SyncRequest {
119 pub headers: Vec<HeaderHash>,
121}
122
123impl_p2p_message!(SyncRequest, "syncrequest", 641, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
124
125#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
128pub struct SyncResponse {
129 pub blocks: Vec<BlockInfo>,
131}
132
133impl_p2p_message!(SyncResponse, "syncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
134
135#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
141pub struct ForkSyncRequest {
142 pub tip: HeaderHash,
144 pub fork_tip: Option<HeaderHash>,
146}
147
148impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 65, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
149
150#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
153pub struct ForkSyncResponse {
154 pub proposals: Vec<Proposal>,
156}
157
158impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
159
160#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
163pub struct ForkHeaderHashRequest {
164 pub height: u32,
166 pub fork_header: HeaderHash,
168}
169
170impl_p2p_message!(
171 ForkHeaderHashRequest,
172 "forkheaderhashrequest",
173 36,
174 1,
175 PROTOCOL_SYNC_METERING_CONFIGURATION
176);
177
178#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
181pub struct ForkHeaderHashResponse {
182 pub fork_header: Option<HeaderHash>,
184}
185
186impl_p2p_message!(
187 ForkHeaderHashResponse,
188 "forkheaderhashresponse",
189 33,
190 1,
191 PROTOCOL_SYNC_METERING_CONFIGURATION
192);
193
194#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
198pub struct ForkHeadersRequest {
199 pub headers: Vec<HeaderHash>,
201 pub fork_header: HeaderHash,
203}
204
205impl_p2p_message!(
206 ForkHeadersRequest,
207 "forkheadersrequest",
208 673,
209 1,
210 PROTOCOL_SYNC_METERING_CONFIGURATION
211);
212
213#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
216pub struct ForkHeadersResponse {
217 pub headers: Vec<Header>,
219}
220
221impl_p2p_message!(
222 ForkHeadersResponse,
223 "forkheadersresponse",
224 72121, 1,
226 PROTOCOL_SYNC_METERING_CONFIGURATION
227);
228
229#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
233pub struct ForkProposalsRequest {
234 pub headers: Vec<HeaderHash>,
236 pub fork_header: HeaderHash,
238}
239
240impl_p2p_message!(
241 ForkProposalsRequest,
242 "forkproposalsrequest",
243 673,
244 1,
245 PROTOCOL_SYNC_METERING_CONFIGURATION
246);
247
248#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
251pub struct ForkProposalsResponse {
252 pub proposals: Vec<Proposal>,
254}
255
256impl_p2p_message!(
257 ForkProposalsResponse,
258 "forkproposalsresponse",
259 0,
260 1,
261 PROTOCOL_SYNC_METERING_CONFIGURATION
262);
263
264pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
266
267pub struct ProtocolSyncHandler {
269 tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
271 header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
273 sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
275 fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
277 fork_header_hash_handler:
279 ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
280 fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
282 fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
284}
285
286impl ProtocolSyncHandler {
287 pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
290 debug!(
291 target: "darkfid::proto::protocol_sync::init",
292 "Adding all sync protocols to the protocol registry"
293 );
294
295 let tip_handler =
296 ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
297 let header_handler =
298 ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
299 let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
300 let fork_sync_handler =
301 ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
302 let fork_header_hash_handler =
303 ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
304 let fork_headers_handler =
305 ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
306 let fork_proposals_handler =
307 ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;
308
309 Arc::new(Self {
310 tip_handler,
311 header_handler,
312 sync_handler,
313 fork_sync_handler,
314 fork_header_hash_handler,
315 fork_headers_handler,
316 fork_proposals_handler,
317 })
318 }
319
320 pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
322 debug!(
323 target: "darkfid::proto::protocol_sync::start",
324 "Starting sync protocols handlers tasks..."
325 );
326
327 self.tip_handler.task.clone().start(
328 handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
329 |res| async move {
330 match res {
331 Ok(()) | Err(Error::DetachedTaskStopped) => { }
332 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
333 }
334 },
335 Error::DetachedTaskStopped,
336 executor.clone(),
337 );
338
339 self.header_handler.task.clone().start(
340 handle_receive_header_request(self.header_handler.clone(), validator.clone()),
341 |res| async move {
342 match res {
343 Ok(()) | Err(Error::DetachedTaskStopped) => { }
344 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
345 }
346 },
347 Error::DetachedTaskStopped,
348 executor.clone(),
349 );
350
351 self.sync_handler.task.clone().start(
352 handle_receive_request(self.sync_handler.clone(), validator.clone()),
353 |res| async move {
354 match res {
355 Ok(()) | Err(Error::DetachedTaskStopped) => { }
356 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
357 }
358 },
359 Error::DetachedTaskStopped,
360 executor.clone(),
361 );
362
363 self.fork_sync_handler.task.clone().start(
364 handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
365 |res| async move {
366 match res {
367 Ok(()) | Err(Error::DetachedTaskStopped) => { }
368 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
369 }
370 },
371 Error::DetachedTaskStopped,
372 executor.clone(),
373 );
374
375 self.fork_header_hash_handler.task.clone().start(
376 handle_receive_fork_header_hash_request(self.fork_header_hash_handler.clone(), validator.clone()),
377 |res| async move {
378 match res {
379 Ok(()) | Err(Error::DetachedTaskStopped) => { }
380 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
381 }
382 },
383 Error::DetachedTaskStopped,
384 executor.clone(),
385 );
386
387 self.fork_headers_handler.task.clone().start(
388 handle_receive_fork_headers_request(self.fork_headers_handler.clone(), validator.clone()),
389 |res| async move {
390 match res {
391 Ok(()) | Err(Error::DetachedTaskStopped) => { }
392 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
393 }
394 },
395 Error::DetachedTaskStopped,
396 executor.clone(),
397 );
398
399 self.fork_proposals_handler.task.clone().start(
400 handle_receive_fork_proposals_request(self.fork_proposals_handler.clone(), validator.clone()),
401 |res| async move {
402 match res {
403 Ok(()) | Err(Error::DetachedTaskStopped) => { }
404 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
405 }
406 },
407 Error::DetachedTaskStopped,
408 executor.clone(),
409 );
410
411 debug!(
412 target: "darkfid::proto::protocol_sync::start",
413 "Sync protocols handlers tasks started!"
414 );
415
416 Ok(())
417 }
418
419 pub async fn stop(&self) {
421 debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
422 self.tip_handler.task.stop().await;
423 self.header_handler.task.stop().await;
424 self.sync_handler.task.stop().await;
425 self.fork_sync_handler.task.stop().await;
426 self.fork_header_hash_handler.task.stop().await;
427 self.fork_headers_handler.task.stop().await;
428 self.fork_proposals_handler.task.stop().await;
429 debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
430 }
431}
432
433async fn handle_receive_tip_request(
435 handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
436 validator: ValidatorPtr,
437) -> Result<()> {
438 debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
439 loop {
440 let (channel, request) = match handler.receiver.recv().await {
442 Ok(r) => r,
443 Err(e) => {
444 debug!(
445 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
446 "recv fail: {e}"
447 );
448 continue
449 }
450 };
451
452 debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");
453
454 let validator = validator.read().await;
456 if !validator.synced {
457 debug!(
458 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
459 "Node still syncing blockchain"
460 );
461 handler
462 .send_action(
463 channel,
464 ProtocolGenericAction::Response(TipResponse {
465 synced: false,
466 height: None,
467 hash: None,
468 }),
469 )
470 .await;
471 continue
472 }
473
474 match validator.blockchain.blocks.contains(&request.tip) {
476 Ok(contains) => {
477 if !contains {
478 debug!(
479 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
480 "Node doesn't follow request sequence"
481 );
482 handler
483 .send_action(
484 channel,
485 ProtocolGenericAction::Response(TipResponse {
486 synced: true,
487 height: None,
488 hash: None,
489 }),
490 )
491 .await;
492 continue
493 }
494 }
495 Err(e) => {
496 error!(
497 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
498 "block_store.contains fail: {e}"
499 );
500 handler.send_action(channel, ProtocolGenericAction::Skip).await;
501 continue
502 }
503 }
504
505 let tip = match validator.blockchain.last() {
507 Ok(v) => v,
508 Err(e) => {
509 error!(
510 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
511 "blockchain.last fail: {e}"
512 );
513 handler.send_action(channel, ProtocolGenericAction::Skip).await;
514 continue
515 }
516 };
517
518 handler
520 .send_action(
521 channel,
522 ProtocolGenericAction::Response(TipResponse {
523 synced: true,
524 height: Some(tip.0),
525 hash: Some(tip.1),
526 }),
527 )
528 .await;
529 }
530}
531
532async fn handle_receive_header_request(
534 handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
535 validator: ValidatorPtr,
536) -> Result<()> {
537 debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
538 loop {
539 let (channel, request) = match handler.receiver.recv().await {
541 Ok(r) => r,
542 Err(e) => {
543 debug!(
544 target: "darkfid::proto::protocol_sync::handle_receive_header_request",
545 "recv fail: {e}"
546 );
547 continue
548 }
549 };
550
551 let validator = validator.read().await;
553 if !validator.synced {
554 debug!(
555 target: "darkfid::proto::protocol_sync::handle_receive_header_request",
556 "Node still syncing blockchain, skipping..."
557 );
558 handler.send_action(channel, ProtocolGenericAction::Skip).await;
559 continue
560 }
561
562 debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");
563
564 let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
566 Ok(v) => v,
567 Err(e) => {
568 error!(
569 target: "darkfid::proto::protocol_sync::handle_receive_header_request",
570 "get_headers_before fail: {e}"
571 );
572 handler.send_action(channel, ProtocolGenericAction::Skip).await;
573 continue
574 }
575 };
576
577 handler
579 .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
580 .await;
581 }
582}
583
584async fn handle_receive_request(
586 handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
587 validator: ValidatorPtr,
588) -> Result<()> {
589 debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
590 loop {
591 let (channel, request) = match handler.receiver.recv().await {
593 Ok(r) => r,
594 Err(e) => {
595 debug!(
596 target: "darkfid::proto::protocol_sync::handle_receive_request",
597 "recv fail: {e}"
598 );
599 continue
600 }
601 };
602
603 let validator = validator.read().await;
605 if !validator.synced {
606 debug!(
607 target: "darkfid::proto::protocol_sync::handle_receive_request",
608 "Node still syncing blockchain, skipping..."
609 );
610 handler.send_action(channel, ProtocolGenericAction::Skip).await;
611 continue
612 }
613
614 if request.headers.len() > BATCH {
616 debug!(
617 target: "darkfid::proto::protocol_sync::handle_receive_request",
618 "Node requested more blocks than allowed."
619 );
620 handler.send_action(channel, ProtocolGenericAction::Skip).await;
621 continue
622 }
623
624 debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
625
626 let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
628 Ok(v) => v,
629 Err(e) => {
630 error!(
631 target: "darkfid::proto::protocol_sync::handle_receive_request",
632 "get_blocks_after fail: {e}"
633 );
634 handler.send_action(channel, ProtocolGenericAction::Skip).await;
635 continue
636 }
637 };
638
639 handler
641 .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
642 .await;
643 }
644}
645
646async fn handle_receive_fork_request(
648 handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
649 validator: ValidatorPtr,
650) -> Result<()> {
651 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
652 loop {
653 let (channel, request) = match handler.receiver.recv().await {
655 Ok(r) => r,
656 Err(e) => {
657 debug!(
658 target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
659 "recv fail: {e}"
660 );
661 continue
662 }
663 };
664
665 let validator = validator.read().await;
667 if !validator.synced {
668 debug!(
669 target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
670 "Node still syncing blockchain, skipping..."
671 );
672 handler.send_action(channel, ProtocolGenericAction::Skip).await;
673 continue
674 }
675
676 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
677
678 let proposals = match validator
680 .consensus
681 .get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
682 .await
683 {
684 Ok(p) => p,
685 Err(e) => {
686 debug!(
687 target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
688 "Getting fork proposals failed: {e}"
689 );
690 handler.send_action(channel, ProtocolGenericAction::Skip).await;
691 continue
692 }
693 };
694
695 handler
697 .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
698 .await;
699 }
700}
701
702async fn handle_receive_fork_header_hash_request(
704 handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
705 validator: ValidatorPtr,
706) -> Result<()> {
707 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
708 loop {
709 let (channel, request) = match handler.receiver.recv().await {
711 Ok(r) => r,
712 Err(e) => {
713 debug!(
714 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
715 "recv fail: {e}"
716 );
717 continue
718 }
719 };
720
721 let validator = validator.read().await;
723 if !validator.synced {
724 debug!(
725 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
726 "Node still syncing blockchain, skipping..."
727 );
728 handler.send_action(channel, ProtocolGenericAction::Skip).await;
729 continue
730 }
731
732 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");
733
734 let fork_header = match validator
736 .consensus
737 .get_fork_header_hash(request.height, &request.fork_header)
738 .await
739 {
740 Ok(h) => h,
741 Err(e) => {
742 debug!(
743 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
744 "Getting fork header hash failed: {e}"
745 );
746 handler.send_action(channel, ProtocolGenericAction::Skip).await;
747 continue
748 }
749 };
750
751 if fork_header.is_some() {
753 handler
754 .send_action(
755 channel,
756 ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
757 )
758 .await;
759 continue
760 }
761
762 if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
764 debug!(
765 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
766 "Getting fork header hash failed: {e}"
767 );
768 handler.send_action(channel, ProtocolGenericAction::Skip).await;
769 continue
770 };
771
772 let response = match validator.blockchain.blocks.get_order(&[request.height], false) {
773 Ok(h) => ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header: h[0] }),
774 Err(e) => {
775 debug!(
776 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
777 "Getting fork header hash failed: {e}"
778 );
779 ProtocolGenericAction::Skip
780 }
781 };
782
783 handler.send_action(channel, response).await;
785 }
786}
787
788async fn handle_receive_fork_headers_request(
790 handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
791 validator: ValidatorPtr,
792) -> Result<()> {
793 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
794 loop {
795 let (channel, request) = match handler.receiver.recv().await {
797 Ok(r) => r,
798 Err(e) => {
799 debug!(
800 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
801 "recv fail: {e}"
802 );
803 continue
804 }
805 };
806
807 let validator = validator.read().await;
809 if !validator.synced {
810 debug!(
811 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
812 "Node still syncing blockchain, skipping..."
813 );
814 handler.send_action(channel, ProtocolGenericAction::Skip).await;
815 continue
816 }
817
818 if request.headers.len() > BATCH {
820 debug!(
821 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
822 "Node requested more headers than allowed."
823 );
824 handler.send_action(channel, ProtocolGenericAction::Skip).await;
825 continue
826 }
827
828 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");
829
830 let headers = match validator
832 .consensus
833 .get_fork_headers(&request.headers, &request.fork_header)
834 .await
835 {
836 Ok(h) => h,
837 Err(e) => {
838 debug!(
839 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
840 "Getting fork headers failed: {e}"
841 );
842 handler.send_action(channel, ProtocolGenericAction::Skip).await;
843 continue
844 }
845 };
846
847 if !headers.is_empty() {
849 handler
850 .send_action(
851 channel,
852 ProtocolGenericAction::Response(ForkHeadersResponse { headers }),
853 )
854 .await;
855 continue
856 }
857
858 if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
860 debug!(
861 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
862 "Getting fork header hash failed: {e}"
863 );
864 handler.send_action(channel, ProtocolGenericAction::Skip).await;
865 continue
866 };
867
868 let response = match validator.blockchain.headers.get(&request.headers, true) {
869 Ok(h) => ProtocolGenericAction::Response(ForkHeadersResponse {
870 headers: h.iter().map(|x| x.clone().unwrap()).collect(),
871 }),
872 Err(e) => {
873 debug!(
874 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
875 "Getting fork headers failed: {e}"
876 );
877 ProtocolGenericAction::Skip
878 }
879 };
880
881 handler.send_action(channel, response).await;
883 }
884}
885
886async fn handle_receive_fork_proposals_request(
888 handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
889 validator: ValidatorPtr,
890) -> Result<()> {
891 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
892 loop {
893 let (channel, request) = match handler.receiver.recv().await {
895 Ok(r) => r,
896 Err(e) => {
897 debug!(
898 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
899 "recv fail: {e}"
900 );
901 continue
902 }
903 };
904
905 let validator = validator.read().await;
907 if !validator.synced {
908 debug!(
909 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
910 "Node still syncing blockchain, skipping..."
911 );
912 handler.send_action(channel, ProtocolGenericAction::Skip).await;
913 continue
914 }
915
916 if request.headers.len() > BATCH {
918 debug!(
919 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
920 "Node requested more proposals than allowed."
921 );
922 handler.send_action(channel, ProtocolGenericAction::Skip).await;
923 continue
924 }
925
926 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");
927
928 let proposals = match validator
930 .consensus
931 .get_fork_proposals(&request.headers, &request.fork_header)
932 .await
933 {
934 Ok(p) => p,
935 Err(e) => {
936 debug!(
937 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
938 "Getting fork proposals failed: {e}"
939 );
940 handler.send_action(channel, ProtocolGenericAction::Skip).await;
941 continue
942 }
943 };
944
945 if !proposals.is_empty() {
947 handler
948 .send_action(
949 channel,
950 ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
951 )
952 .await;
953 continue
954 }
955
956 if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
958 debug!(
959 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
960 "Getting fork header hash failed: {e}"
961 );
962 handler.send_action(channel, ProtocolGenericAction::Skip).await;
963 continue
964 };
965
966 let response = match validator.blockchain.get_blocks_by_hash(&request.headers) {
967 Ok(blocks) => {
968 let mut proposals = Vec::with_capacity(blocks.len());
969 for block in blocks {
970 proposals.push(Proposal::new(block));
971 }
972 ProtocolGenericAction::Response(ForkProposalsResponse { proposals })
973 }
974 Err(e) => {
975 debug!(
976 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
977 "Getting fork proposals failed: {e}"
978 );
979 ProtocolGenericAction::Skip
980 }
981 };
982
983 handler.send_action(channel, response).await;
985 }
986}