1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use num_bigint::BigUint;
25use smol::{channel::Receiver, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error, info};
28
29use darkfi::{
30 blockchain::{BlockDifficulty, HeaderHash},
31 net::ChannelPtr,
32 util::{encoding::base64, time::Timestamp},
33 validator::{
34 consensus::{Fork, Proposal},
35 pow::PoWModule,
36 utils::{best_fork_index, header_rank},
37 verification::verify_fork_proposal,
38 ValidatorPtr,
39 },
40 Error::{Custom, DatabaseError, PoWInvalidOutHash, ProposalAlreadyExists},
41 Result,
42};
43use darkfi_serial::serialize_async;
44
45use crate::{
46 proto::{
47 ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
48 ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
49 ProposalMessage, BATCH,
50 },
51 DarkfiNodePtr,
52};
53
54pub async fn handle_unknown_proposals(
56 receiver: Receiver<(Proposal, u32)>,
57 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
58 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
59 node: DarkfiNodePtr,
60) -> Result<()> {
61 debug!(target: "darkfid::task::handle_unknown_proposal", "START");
62 loop {
63 let (proposal, channel) = match receiver.recv().await {
65 Ok(m) => m,
66 Err(e) => {
67 debug!(
68 target: "darkfid::task::handle_unknown_proposal",
69 "recv fail: {e}"
70 );
71 continue
72 }
73 };
74
75 let lock = unknown_proposals.read().await;
77 let contains_proposal = lock.contains(proposal.hash.inner());
78 drop(lock);
79 if !contains_proposal {
80 debug!(
81 target: "darkfid::task::handle_unknown_proposal",
82 "Proposal {} is not in our unknown proposals queue.",
83 proposal.hash,
84 );
85 continue
86 };
87
88 let mut lock = unknown_proposals_channels.write().await;
90 let channel_counter = if let Some((counter, timestamp)) = lock.get_mut(&channel) {
91 *counter += 1;
92 *timestamp = Timestamp::current_time().inner();
93 *counter
94 } else {
95 lock.insert(channel, (1, Timestamp::current_time().inner()));
96 1
97 };
98 drop(lock);
99
100 if handle_unknown_proposal(&node, channel, &proposal).await {
102 if channel_counter > 5 {
104 if let Some(channel) = node.p2p_handler.p2p.get_channel(channel) {
105 channel.ban().await;
106 }
107 unknown_proposals_channels.write().await.remove(&channel);
108 }
109 };
110
111 let mut lock = unknown_proposals.write().await;
113 lock.remove(proposal.hash.inner());
114 drop(lock);
115 }
116}
117
118async fn handle_unknown_proposal(node: &DarkfiNodePtr, channel: u32, proposal: &Proposal) -> bool {
121 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
123 let Some(channel) = node.p2p_handler.p2p.get_channel(channel) else {
124 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
125 return false
126 };
127
128 let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
130 debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
131 return true
132 };
133
134 let last = match node.validator.blockchain.last() {
136 Ok(l) => l,
137 Err(e) => {
138 error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
139 return false
140 }
141 };
142 let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
143 if let Err(e) = channel.send(&request).await {
144 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
145 return true
146 };
147
148 let comms_timeout = node
149 .p2p_handler
150 .p2p
151 .settings()
152 .read_arc()
153 .await
154 .outbound_connect_timeout(channel.address().scheme());
155
156 let response = match response_sub.receive_with_timeout(comms_timeout).await {
158 Ok(r) => r,
159 Err(e) => {
160 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
161 return true
162 }
163 };
164 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
165
166 debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
168
169 if response.proposals.is_empty() {
171 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
172 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
173 }
174
175 if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
177 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
178 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
179 }
180
181 if response.proposals[0].block.header.previous != last.1 {
183 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
184 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
185 }
186
187 if response.proposals.last().unwrap().hash != proposal.hash {
189 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
190 return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
191 }
192
193 for proposal in &response.proposals {
195 match node.validator.append_proposal(proposal).await {
197 Ok(()) => { }
198 Err(ProposalAlreadyExists) => continue,
200 Err(e) => {
201 debug!(
202 target: "darkfid::task::handle_unknown_proposal",
203 "Error while appending response proposal: {e}"
204 );
205 break;
206 }
207 };
208
209 let message = ProposalMessage(proposal.clone());
211 node.p2p_handler.p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
212
213 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
215 node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;
216 }
217
218 false
219}
220
221async fn handle_reorg(
233 node: &DarkfiNodePtr,
235 channel: &(&ChannelPtr, &u64),
237 proposal: &Proposal,
239) -> bool {
240 info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {:?}", proposal.hash, proposal.block.header.height, channel.0);
241
242 if proposal.block.header.height == 0 {
244 debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
245 return true
246 }
247
248 let (last_common_height, last_common_hash, peer_header_hashes) =
251 match retrieve_peer_header_hashes(&node.validator, channel, proposal).await {
252 Ok(t) => t,
253 Err(DatabaseError(e)) => {
254 error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers hashes: {e}");
255 return false
256 }
257 Err(e) => {
258 error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers hashes failed: {e}");
259 return true
260 }
261 };
262
263 let module = match PoWModule::new(
265 node.validator.consensus.blockchain.clone(),
266 node.validator.consensus.module.read().await.target,
267 node.validator.consensus.module.read().await.fixed_difficulty.clone(),
268 Some(last_common_height + 1),
269 ) {
270 Ok(m) => m,
271 Err(e) => {
272 error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
273 return false
274 }
275 };
276
277 let last_difficulty = match last_common_height {
279 0 => {
280 let genesis_timestamp = match node.validator.blockchain.genesis_block() {
281 Ok(b) => b.header.timestamp,
282 Err(e) => {
283 error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
284 return false
285 }
286 };
287 BlockDifficulty::genesis(genesis_timestamp)
288 }
289 _ => match node.validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
290 Ok(d) => d[0].clone().unwrap(),
291 Err(e) => {
292 error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
293 return false
294 }
295 },
296 };
297
298 let (targets_rank, hashes_rank) = match retrieve_peer_headers_sequence_ranking(
300 (&last_common_height, &last_common_hash, &module, &last_difficulty),
301 channel,
302 proposal,
303 &peer_header_hashes,
304 )
305 .await
306 {
307 Ok(p) => p,
308 Err(DatabaseError(e)) => {
309 error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers: {e}");
310 return false
311 }
312 Err(e) => {
313 error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers failed: {e}");
314 return true
315 }
316 };
317
318 let append_lock = node.validator.consensus.append_lock.write().await;
321
322 let mut forks = node.validator.consensus.forks.write().await;
324 let index = match best_fork_index(&forks) {
325 Ok(i) => i,
326 Err(e) => {
327 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
328 return false
329 }
330 };
331 let best_fork = &forks[index];
332 if targets_rank < best_fork.targets_rank ||
333 (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
334 {
335 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
336 return true
337 }
338
339 let peer_fork = match retrieve_peer_fork(
341 &node.validator,
342 (&last_common_height, &module, &last_difficulty),
343 channel,
344 proposal,
345 &peer_header_hashes,
346 )
347 .await
348 {
349 Ok(p) => p,
350 Err(DatabaseError(e)) => {
351 error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer fork: {e}");
352 return false
353 }
354 Err(e) => {
355 error!(target: "darkfid::task::handle_reorg", "Retrieving peer fork failed: {e}");
356 return true
357 }
358 };
359
360 if peer_fork.targets_rank < best_fork.targets_rank ||
362 (peer_fork.targets_rank == best_fork.targets_rank &&
363 peer_fork.hashes_rank <= best_fork.hashes_rank)
364 {
365 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
366 return true
367 }
368
369 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
371 if let Err(e) = node.validator.blockchain.reset_to_height(last_common_height) {
372 error!(target: "darkfid::task::handle_reorg", "Applying full inverse diff failed: {e}");
373 return false
374 };
375 *node.validator.consensus.module.write().await = module;
376 *forks = vec![peer_fork];
377 drop(forks);
378 drop(append_lock);
379
380 let confirmed = match node.validator.confirmation().await {
382 Ok(f) => f,
383 Err(e) => {
384 error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
385 return false
386 }
387 };
388
389 if let Err(e) = node.registry.refresh(&node.validator).await {
391 error!(target: "darkfid::task::handle_reorg", "Failed refreshing mining block templates: {e}")
392 }
393
394 if !confirmed.is_empty() {
395 let mut notif_blocks = Vec::with_capacity(confirmed.len());
396 for block in confirmed {
397 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
398 }
399 node.subscribers.get("blocks").unwrap().notify(JsonValue::Array(notif_blocks)).await;
400 }
401
402 let message = ProposalMessage(proposal.clone());
404 node.p2p_handler.p2p.broadcast(&message).await;
405
406 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
408 node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;
409
410 false
411}
412
413async fn retrieve_peer_header_hashes(
416 validator: &ValidatorPtr,
418 channel: &(&ChannelPtr, &u64),
420 proposal: &Proposal,
422) -> Result<(u32, HeaderHash, Vec<HeaderHash>)> {
423 let response_sub = channel.0.subscribe_msg::<ForkHeaderHashResponse>().await?;
425
426 let mut peer_header_hashes = vec![];
428
429 let mut previous_height = proposal.block.header.height;
431 let mut previous_hash = proposal.hash;
432 for height in (0..proposal.block.header.height).rev() {
433 let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
435 channel.0.send(&request).await?;
436
437 let response = response_sub.receive_with_timeout(*channel.1).await?;
439 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
440
441 let Some(peer_header) = response.fork_header else {
443 return Err(Custom(String::from("Peer responded with an empty header")))
444 };
445
446 let headers = match validator.blockchain.blocks.get_order(&[height], false) {
448 Ok(h) => h,
449 Err(e) => return Err(DatabaseError(format!("Retrieving headers failed: {e}"))),
450 };
451 match headers[0] {
452 Some(known_header) => {
453 if known_header == peer_header {
454 previous_height = height;
455 previous_hash = known_header;
456 break
457 }
458 peer_header_hashes.insert(0, peer_header);
460 }
461 None => peer_header_hashes.insert(0, peer_header),
462 }
463 }
464
465 Ok((previous_height, previous_hash, peer_header_hashes))
466}
467
468async fn retrieve_peer_headers_sequence_ranking(
472 last_common_info: (&u32, &HeaderHash, &PoWModule, &BlockDifficulty),
474 channel: &(&ChannelPtr, &u64),
476 proposal: &Proposal,
478 header_hashes: &[HeaderHash],
480) -> Result<(BigUint, BigUint)> {
481 let response_sub = channel.0.subscribe_msg::<ForkHeadersResponse>().await?;
483
484 info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", header_hashes.len());
486 let mut previous_height = *last_common_info.0;
487 let mut previous_hash = *last_common_info.1;
488 let mut module = last_common_info.2.clone();
489 let mut targets_rank = last_common_info.3.ranks.targets_rank.clone();
490 let mut hashes_rank = last_common_info.3.ranks.hashes_rank.clone();
491 let mut batch = Vec::with_capacity(BATCH);
492 let mut total_processed = 0;
493 for (index, hash) in header_hashes.iter().enumerate() {
494 batch.push(*hash);
496
497 if batch.len() < BATCH && index != header_hashes.len() - 1 {
499 continue
500 }
501
502 let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
504 channel.0.send(&request).await?;
505
506 let response = response_sub.receive_with_timeout(*channel.1).await?;
508 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
509
510 if response.headers.len() != batch.len() {
512 return Err(Custom(String::from(
513 "Peer responded with a different headers sequence length",
514 )))
515 }
516
517 for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
519 let peer_header_hash = peer_header.hash();
520 debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
521
522 if peer_header_hash != batch[peer_header_index] {
524 return Err(Custom(format!(
525 "Peer responded with a differend header: {} - {peer_header_hash}",
526 batch[peer_header_index]
527 )))
528 }
529
530 if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
532 return Err(Custom(String::from("Invalid header sequence detected")))
533 }
534
535 let (next_difficulty, target_distance_sq, hash_distance_sq) =
537 match header_rank(&module, peer_header) {
538 Ok(tuple) => tuple,
539 Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
540 Err(e) => {
541 return Err(DatabaseError(format!("Computing header rank failed: {e}")))
542 }
543 };
544
545 targets_rank += target_distance_sq.clone();
547 hashes_rank += hash_distance_sq.clone();
548
549 module.append(peer_header, &next_difficulty)?;
551
552 previous_height = peer_header.height;
554 previous_hash = peer_header_hash;
555 }
556
557 total_processed += response.headers.len();
558 info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", header_hashes.len());
559
560 batch = Vec::with_capacity(BATCH);
562 }
563
564 if proposal.block.header.previous != previous_hash ||
566 proposal.block.header.height != previous_height + 1
567 {
568 return Err(Custom(String::from("Invalid header sequence detected")))
569 }
570
571 let (_, target_distance_sq, hash_distance_sq) =
573 match header_rank(&module, &proposal.block.header) {
574 Ok(tuple) => tuple,
575 Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
576 Err(e) => return Err(DatabaseError(format!("Computing header rank failed: {e}"))),
577 };
578
579 targets_rank += target_distance_sq.clone();
581 hashes_rank += hash_distance_sq.clone();
582
583 Ok((targets_rank, hashes_rank))
584}
585
586async fn retrieve_peer_fork(
589 validator: &ValidatorPtr,
591 last_common_info: (&u32, &PoWModule, &BlockDifficulty),
593 channel: &(&ChannelPtr, &u64),
595 proposal: &Proposal,
597 header_hashes: &[HeaderHash],
599) -> Result<Fork> {
600 let response_sub = channel.0.subscribe_msg::<ForkProposalsResponse>().await?;
602
603 let mut peer_fork =
605 match Fork::new(validator.consensus.blockchain.clone(), last_common_info.1.clone()).await {
606 Ok(f) => f,
607 Err(e) => return Err(DatabaseError(format!("Generating peer fork failed: {e}"))),
608 };
609 peer_fork.targets_rank = last_common_info.2.ranks.targets_rank.clone();
610 peer_fork.hashes_rank = last_common_info.2.ranks.hashes_rank.clone();
611
612 let inverse_diffs =
614 match validator.blockchain.blocks.get_state_inverse_diffs_after(*last_common_info.0) {
615 Ok(i) => i,
616 Err(e) => {
617 return Err(DatabaseError(format!("Retrieving state inverse diffs failed: {e}")))
618 }
619 };
620 for inverse_diff in inverse_diffs.iter().rev() {
621 let result =
622 peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff);
623 if let Err(e) = result {
624 return Err(DatabaseError(format!("Applying state inverse diff failed: {e}")))
625 }
626 }
627
628 let diff = peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().diff(&[]);
632 let diff = match diff {
633 Ok(d) => d,
634 Err(e) => {
635 return Err(DatabaseError(format!("Generate full state inverse diff failed: {e}")))
636 }
637 };
638 peer_fork.diffs = vec![diff];
639
640 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", header_hashes.len());
642 let mut batch = Vec::with_capacity(BATCH);
643 let mut total_processed = 0;
644 for (index, hash) in header_hashes.iter().enumerate() {
645 batch.push(*hash);
647
648 if batch.len() < BATCH && index != header_hashes.len() - 1 {
650 continue
651 }
652
653 let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
655 channel.0.send(&request).await?;
656
657 let response = response_sub.receive_with_timeout(*channel.1).await?;
659 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
660
661 if response.proposals.len() != batch.len() {
663 return Err(Custom(String::from(
664 "Peer responded with a different proposals sequence length",
665 )))
666 }
667
668 for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
670 info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
671
672 if peer_proposal.hash != batch[peer_proposal_index] {
674 return Err(Custom(format!(
675 "Peer responded with a differend proposal: {} - {}",
676 batch[peer_proposal_index], peer_proposal.hash
677 )))
678 }
679
680 verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await?;
682
683 peer_fork.append_proposal(peer_proposal).await?;
685 }
686
687 total_processed += response.proposals.len();
688 info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", header_hashes.len());
689
690 batch = Vec::with_capacity(BATCH);
692 }
693
694 verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await?;
696
697 peer_fork.append_proposal(proposal).await?;
699
700 peer_fork.diffs.remove(0);
702
703 Ok(peer_fork)
704}