1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use num_bigint::BigUint;
25use smol::{channel::Receiver, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error, info};
28
29use darkfi::{
30 blockchain::{BlockDifficulty, BlockchainOverlay, HeaderHash},
31 net::ChannelPtr,
32 util::{encoding::base64, time::Timestamp},
33 validator::{
34 consensus::{Fork, Proposal},
35 pow::PoWModule,
36 utils::{best_fork_index, header_rank},
37 verification::verify_fork_proposal,
38 Validator, ValidatorPtr,
39 },
40 Error::{Custom, DatabaseError, PoWInvalidOutHash, ProposalAlreadyExists},
41 Result,
42};
43use darkfi_serial::serialize_async;
44
45use crate::{
46 proto::{
47 ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
48 ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
49 ProposalMessage, BATCH,
50 },
51 DarkfiNodePtr,
52};
53
54pub async fn handle_unknown_proposals(
56 receiver: Receiver<(Proposal, u32)>,
57 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
58 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
59 node: DarkfiNodePtr,
60) -> Result<()> {
61 debug!(target: "darkfid::task::handle_unknown_proposal", "START");
62 loop {
63 let (proposal, channel) = match receiver.recv().await {
65 Ok(m) => m,
66 Err(e) => {
67 debug!(
68 target: "darkfid::task::handle_unknown_proposal",
69 "recv fail: {e}"
70 );
71 continue
72 }
73 };
74
75 let lock = unknown_proposals.read().await;
77 let contains_proposal = lock.contains(proposal.hash.inner());
78 drop(lock);
79 if !contains_proposal {
80 debug!(
81 target: "darkfid::task::handle_unknown_proposal",
82 "Proposal {} is not in our unknown proposals queue.",
83 proposal.hash,
84 );
85 continue
86 };
87
88 let mut lock = unknown_proposals_channels.write().await;
90 let channel_counter = if let Some((counter, timestamp)) = lock.get_mut(&channel) {
91 *counter += 1;
92 *timestamp = Timestamp::current_time().inner();
93 *counter
94 } else {
95 lock.insert(channel, (1, Timestamp::current_time().inner()));
96 1
97 };
98 drop(lock);
99
100 if handle_unknown_proposal(&node, channel, &proposal).await {
102 if channel_counter > 5 {
104 if let Some(channel) = node.p2p_handler.p2p.get_channel(channel) {
105 channel.ban().await;
106 }
107 unknown_proposals_channels.write().await.remove(&channel);
108 }
109 };
110
111 let mut lock = unknown_proposals.write().await;
113 lock.remove(proposal.hash.inner());
114 drop(lock);
115 }
116}
117
118async fn handle_unknown_proposal(node: &DarkfiNodePtr, channel: u32, proposal: &Proposal) -> bool {
121 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
123 let Some(channel) = node.p2p_handler.p2p.get_channel(channel) else {
124 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
125 return false
126 };
127
128 let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
130 debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
131 return true
132 };
133
134 let last = match node.validator.read().await.blockchain.last() {
136 Ok(l) => l,
137 Err(e) => {
138 error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
139 return false
140 }
141 };
142 let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
143 if let Err(e) = channel.send(&request).await {
144 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
145 return true
146 };
147
148 let comms_timeout = node
149 .p2p_handler
150 .p2p
151 .settings()
152 .read_arc()
153 .await
154 .outbound_connect_timeout(channel.address().scheme());
155
156 let response = match response_sub.receive_with_timeout(comms_timeout).await {
158 Ok(r) => r,
159 Err(e) => {
160 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
161 return true
162 }
163 };
164 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
165
166 debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
168
169 if response.proposals.is_empty() {
171 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
172 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
173 }
174
175 if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
177 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
178 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
179 }
180
181 if response.proposals[0].block.header.previous != last.1 {
183 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
184 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
185 }
186
187 if response.proposals.last().unwrap().hash != proposal.hash {
189 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
190 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
191 }
192
193 for proposal in &response.proposals {
195 match node.validator.write().await.append_proposal(proposal).await {
197 Ok(()) => { }
198 Err(ProposalAlreadyExists) => continue,
200 Err(e) => {
201 debug!(
202 target: "darkfid::task::handle_unknown_proposal",
203 "Error while appending response proposal: {e}"
204 );
205 break;
206 }
207 };
208
209 let message = ProposalMessage(proposal.clone());
211 node.p2p_handler.p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
212
213 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
215 node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;
216 }
217
218 false
219}
220
/// Check whether the given proposal defines a higher-ranking fork than
/// our current best one and, if so, reorg to the peer's sequence.
///
/// Returns `true` when the peer's sequence is rejected (genesis,
/// invalid or lower-ranking) or a communication error occurred, so the
/// caller can count it towards banning, and `false` on internal errors
/// or after a successful reorg.
async fn handle_reorg(
    node: &DarkfiNodePtr,
    channel: &(&ChannelPtr, &u64),
    proposal: &Proposal,
) -> bool {
    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {:?}", proposal.hash, proposal.block.header.height, channel.0);

    // A genesis proposal can never trigger a reorg
    if proposal.block.header.height == 0 {
        debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
        return true
    }

    // Find the last block we have in common with the peer's fork, and
    // grab the peer's header hashes after that point.
    let (last_common_height, last_common_hash, peer_header_hashes) =
        match retrieve_peer_header_hashes(&node.validator, channel, proposal).await {
            Ok(t) => t,
            // Database errors are our own failures, not the peer's
            Err(DatabaseError(e)) => {
                error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers hashes: {e}");
                return false
            }
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers hashes failed: {e}");
                return true
            }
        };

    // Create a PoW module anchored right after the last common block,
    // used to rank the peer's sequence from that point on.
    let validator = node.validator.read().await;
    let module = match PoWModule::new(
        validator.consensus.blockchain.clone(),
        validator.consensus.module.target,
        validator.consensus.module.fixed_difficulty.clone(),
        Some(last_common_height + 1),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
            return false
        }
    };

    // Grab the difficulty at the last common block, seeding the rank
    // computation. Height 0 uses the genesis difficulty.
    let last_difficulty = match last_common_height {
        0 => {
            let genesis_timestamp = match validator.blockchain.genesis_block() {
                Ok(b) => b.header.timestamp,
                Err(e) => {
                    error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
                    return false
                }
            };
            BlockDifficulty::genesis(genesis_timestamp)
        }
        _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
            Ok(d) => d[0].clone().unwrap(),
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
                return false
            }
        },
    };
    drop(validator);

    // Retrieve and verify the peer's headers sequence, computing its
    // cumulative (targets, hashes) ranks.
    let (targets_rank, hashes_rank) = match retrieve_peer_headers_sequence_ranking(
        (&last_common_height, &last_common_hash, &module, &last_difficulty),
        channel,
        proposal,
        &peer_header_hashes,
    )
    .await
    {
        Ok(p) => p,
        // Database errors are our own failures, not the peer's
        Err(DatabaseError(e)) => {
            error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers: {e}");
            return false
        }
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers failed: {e}");
            return true
        }
    };

    // First rank check, under the read lock: bail out early, before
    // requesting full proposals, if the peer's sequence ranks lower
    // than our current best fork.
    let validator = node.validator.read().await;
    let index = match best_fork_index(&validator.consensus.forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &validator.consensus.forks[index];
    if targets_rank < best_fork.targets_rank ||
        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
        return true
    }
    drop(validator);

    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
    let peer_proposals = match retrieve_peer_proposals_sequence(
        channel,
        proposal,
        &peer_header_hashes,
    )
    .await
    {
        Ok(p) => p,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Retrieving peer proposals failed: {e}");
            return true
        }
    };

    // Re-check the ranking under the write lock: the lock was released
    // while fetching proposals, so our best fork may have changed.
    let mut validator = node.validator.write().await;

    let index = match best_fork_index(&validator.consensus.forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &validator.consensus.forks[index];
    if targets_rank < best_fork.targets_rank ||
        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
        return true
    }

    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, generating its fork...");
    let mut peer_fork = match generate_peer_fork(
        &validator,
        (&last_common_height, &module, &last_difficulty),
        &peer_proposals,
    )
    .await
    {
        Ok(f) => f,
        // Database errors are our own failures, not the peer's
        Err(DatabaseError(e)) => {
            error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer fork: {e}");
            return false
        }
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Generatating peer fork failed: {e}");
            return true
        }
    };

    // Rewind canonical to the last common block
    info!(target: "darkfid::task::handle_reorg", "Executing reorg...");
    if let Err(e) = validator.blockchain.reset_to_height(last_common_height) {
        error!(target: "darkfid::task::handle_reorg", "Applying full inverse diff failed: {e}");
        return false
    };

    // Rebuild the fork's overlay and diffs on top of the freshly reset
    // canonical state, replacing the ones generated before the reset.
    let overlay = match BlockchainOverlay::new(&validator.blockchain) {
        Ok(o) => o,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Generating a new blockchain overlay failed: {e}");
            return false
        }
    };
    let mut diffs = Vec::with_capacity(peer_fork.diffs.len());
    for diff in peer_fork.diffs {
        let overlay = overlay.lock().unwrap();
        let mut overlay = overlay.overlay.lock().unwrap();
        if let Err(e) = overlay.add_diff(&diff) {
            error!(target: "darkfid::task::handle_reorg", "Applying peer fork diff failed: {e}");
            return false
        }
        // NOTE(review): presumably this recomputes each diff relative
        // to the already accumulated ones -- confirm against the
        // overlay's `diff` semantics.
        match overlay.diff(&diffs) {
            Ok(diff) => diffs.push(diff),
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Generate clean state inverse diff failed: {e}");
                return false
            }
        }
    }
    peer_fork.overlay = overlay;
    peer_fork.diffs = diffs;

    // Replace our consensus state with the peer's fork
    validator.consensus.module = module;
    validator.consensus.forks = vec![peer_fork];

    // Run confirmation over the new fork
    let confirmed = match validator.confirmation().await {
        Ok(f) => f,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
            return false
        }
    };

    // Refresh the mining block templates over the new state
    if let Err(e) = node.registry.state.write().await.refresh(&validator).await {
        error!(target: "darkfid::task::handle_reorg", "Failed refreshing mining block templates: {e}")
    }

    // Notify "blocks" JSON-RPC subscribers about newly confirmed blocks
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        node.subscribers.get("blocks").unwrap().notify(JsonValue::Array(notif_blocks)).await;
    }

    // Relay the proposal to the network
    let message = ProposalMessage(proposal.clone());
    node.p2p_handler.p2p.broadcast(&message).await;

    // Notify "proposals" JSON-RPC subscribers
    let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
    node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;

    false
}
464
465async fn retrieve_peer_header_hashes(
468 validator: &ValidatorPtr,
470 channel: &(&ChannelPtr, &u64),
472 proposal: &Proposal,
474) -> Result<(u32, HeaderHash, Vec<HeaderHash>)> {
475 let response_sub = channel.0.subscribe_msg::<ForkHeaderHashResponse>().await?;
477
478 let mut peer_header_hashes = vec![];
480
481 let mut previous_height = proposal.block.header.height;
483 let mut previous_hash = proposal.hash;
484 for height in (0..proposal.block.header.height).rev() {
485 let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
487 channel.0.send(&request).await?;
488
489 let response = response_sub.receive_with_timeout(*channel.1).await?;
491 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
492
493 let Some(peer_header) = response.fork_header else {
495 return Err(Custom(String::from("Peer responded with an empty header")))
496 };
497
498 let headers = match validator.read().await.blockchain.blocks.get_order(&[height], false) {
500 Ok(h) => h,
501 Err(e) => return Err(DatabaseError(format!("Retrieving headers failed: {e}"))),
502 };
503 match headers[0] {
504 Some(known_header) => {
505 if known_header == peer_header {
506 previous_height = height;
507 previous_hash = known_header;
508 break
509 }
510 peer_header_hashes.insert(0, peer_header);
512 }
513 None => peer_header_hashes.insert(0, peer_header),
514 }
515 }
516
517 Ok((previous_height, previous_hash, peer_header_hashes))
518}
519
520async fn retrieve_peer_headers_sequence_ranking(
524 last_common_info: (&u32, &HeaderHash, &PoWModule, &BlockDifficulty),
526 channel: &(&ChannelPtr, &u64),
528 proposal: &Proposal,
530 header_hashes: &[HeaderHash],
532) -> Result<(BigUint, BigUint)> {
533 let response_sub = channel.0.subscribe_msg::<ForkHeadersResponse>().await?;
535
536 info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", header_hashes.len());
538 let mut previous_height = *last_common_info.0;
539 let mut previous_hash = *last_common_info.1;
540 let mut module = last_common_info.2.clone();
541 let mut targets_rank = last_common_info.3.ranks.targets_rank.clone();
542 let mut hashes_rank = last_common_info.3.ranks.hashes_rank.clone();
543 let mut batch = Vec::with_capacity(BATCH);
544 let mut total_processed = 0;
545 for (index, hash) in header_hashes.iter().enumerate() {
546 batch.push(*hash);
548
549 if batch.len() < BATCH && index != header_hashes.len() - 1 {
551 continue
552 }
553
554 let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
556 channel.0.send(&request).await?;
557
558 let response = response_sub.receive_with_timeout(*channel.1).await?;
560 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
561
562 if response.headers.len() != batch.len() {
564 return Err(Custom(String::from(
565 "Peer responded with a different headers sequence length",
566 )))
567 }
568
569 for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
571 let peer_header_hash = peer_header.hash();
572 debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
573
574 if peer_header_hash != batch[peer_header_index] {
576 return Err(Custom(format!(
577 "Peer responded with a differend header: {} - {peer_header_hash}",
578 batch[peer_header_index]
579 )))
580 }
581
582 if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
584 return Err(Custom(String::from("Invalid header sequence detected")))
585 }
586
587 let (next_difficulty, target_distance_sq, hash_distance_sq) =
589 match header_rank(&mut module, peer_header) {
590 Ok(tuple) => tuple,
591 Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
592 Err(e) => {
593 return Err(DatabaseError(format!("Computing header rank failed: {e}")))
594 }
595 };
596
597 targets_rank += target_distance_sq.clone();
599 hashes_rank += hash_distance_sq.clone();
600
601 module.append(peer_header, &next_difficulty)?;
603
604 previous_height = peer_header.height;
606 previous_hash = peer_header_hash;
607 }
608
609 total_processed += response.headers.len();
610 info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", header_hashes.len());
611
612 batch = Vec::with_capacity(BATCH);
614 }
615
616 if proposal.block.header.previous != previous_hash ||
618 proposal.block.header.height != previous_height + 1
619 {
620 return Err(Custom(String::from("Invalid header sequence detected")))
621 }
622
623 let (_, target_distance_sq, hash_distance_sq) =
625 match header_rank(&mut module, &proposal.block.header) {
626 Ok(tuple) => tuple,
627 Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
628 Err(e) => return Err(DatabaseError(format!("Computing header rank failed: {e}"))),
629 };
630
631 targets_rank += target_distance_sq.clone();
633 hashes_rank += hash_distance_sq.clone();
634
635 Ok((targets_rank, hashes_rank))
636}
637
638async fn retrieve_peer_proposals_sequence(
641 channel: &(&ChannelPtr, &u64),
643 proposal: &Proposal,
645 header_hashes: &[HeaderHash],
647) -> Result<Vec<Proposal>> {
648 let response_sub = channel.0.subscribe_msg::<ForkProposalsResponse>().await?;
650
651 let mut batch = Vec::with_capacity(BATCH);
653 let mut proposals = Vec::with_capacity(header_hashes.len());
654 for (index, hash) in header_hashes.iter().enumerate() {
655 batch.push(*hash);
657
658 if batch.len() < BATCH && index != header_hashes.len() - 1 {
660 continue
661 }
662
663 let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
665 channel.0.send(&request).await?;
666
667 let response = response_sub.receive_with_timeout(*channel.1).await?;
669 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
670
671 if response.proposals.len() != batch.len() {
673 return Err(Custom(String::from(
674 "Peer responded with a different proposals sequence length",
675 )))
676 }
677
678 for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
680 if peer_proposal.hash != batch[peer_proposal_index] {
682 return Err(Custom(format!(
683 "Peer responded with a differend proposal: {} - {}",
684 batch[peer_proposal_index], peer_proposal.hash
685 )))
686 }
687 proposals.push(peer_proposal.clone());
688 }
689
690 info!(target: "darkfid::task::handle_reorg", "Proposals received: {}/{}", proposals.len(), header_hashes.len());
691
692 batch = Vec::with_capacity(BATCH);
694 }
695
696 proposals.push(proposal.clone());
698
699 Ok(proposals)
700}
701
/// Generate a `Fork` holding the peer's proposals sequence, anchored on
/// the last common block.
async fn generate_peer_fork(
    validator: &Validator,
    last_common_info: (&u32, &PoWModule, &BlockDifficulty),
    proposals: &[Proposal],
) -> Result<Fork> {
    // Create a fork over canonical with the module anchored after the
    // last common block, and seed its ranks from that block's difficulty.
    let mut fork =
        match Fork::new(validator.consensus.blockchain.clone(), last_common_info.1.clone()).await {
            Ok(f) => f,
            Err(e) => return Err(DatabaseError(format!("Generating peer fork failed: {e}"))),
        };
    fork.targets_rank = last_common_info.2.ranks.targets_rank.clone();
    fork.hashes_rank = last_common_info.2.ranks.hashes_rank.clone();

    // Rewind the fork's overlay to the last common block by applying,
    // in reverse order, the inverse diff of every canonical block after it.
    let inverse_diffs =
        match validator.blockchain.blocks.get_state_inverse_diffs_after(*last_common_info.0) {
            Ok(i) => i,
            Err(e) => {
                return Err(DatabaseError(format!("Retrieving state inverse diffs failed: {e}")))
            }
        };
    for inverse_diff in inverse_diffs.iter().rev() {
        let result = fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff);
        if let Err(e) = result {
            return Err(DatabaseError(format!("Applying state inverse diff failed: {e}")))
        }
    }

    // Store the rewound state as the fork's first diff, so proposal
    // verification below operates on top of it.
    // NOTE(review): presumably `append_proposal` pushes its own diff
    // after this one -- confirm against `Fork::append_proposal`.
    let diff = fork.overlay.lock().unwrap().overlay.lock().unwrap().diff(&[]);
    let diff = match diff {
        Ok(d) => d,
        Err(e) => {
            return Err(DatabaseError(format!("Generate full state inverse diff failed: {e}")))
        }
    };
    fork.diffs = vec![diff];

    // Verify and append each peer proposal to the fork
    for proposal in proposals {
        info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", proposal.hash, proposal.block.header.height);

        verify_fork_proposal(&mut fork, proposal, validator.verify_fees).await?;

        fork.append_proposal(proposal).await?;
    }

    // Drop the rewind diff, as it doesn't correspond to a proposal
    fork.diffs.remove(0);

    Ok(fork)
}