1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use smol::{channel::Receiver, lock::RwLock};
25use tinyjson::JsonValue;
26use tracing::{debug, error, info};
27
28use darkfi::{
29 blockchain::BlockDifficulty,
30 net::{ChannelPtr, P2pPtr},
31 rpc::jsonrpc::JsonSubscriber,
32 util::{encoding::base64, time::Timestamp},
33 validator::{
34 consensus::{Fork, Proposal},
35 pow::PoWModule,
36 utils::{best_fork_index, header_rank},
37 verification::verify_fork_proposal,
38 ValidatorPtr,
39 },
40 Error, Result,
41};
42use darkfi_serial::serialize_async;
43
44use crate::proto::{
45 ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
46 ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
47 ProposalMessage, BATCH,
48};
49
50pub async fn handle_unknown_proposals(
52 receiver: Receiver<(Proposal, u32)>,
53 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
54 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
55 validator: ValidatorPtr,
56 p2p: P2pPtr,
57 proposals_sub: JsonSubscriber,
58 blocks_sub: JsonSubscriber,
59) -> Result<()> {
60 debug!(target: "darkfid::task::handle_unknown_proposal", "START");
61 loop {
62 let (proposal, channel) = match receiver.recv().await {
64 Ok(m) => m,
65 Err(e) => {
66 debug!(
67 target: "darkfid::task::handle_unknown_proposal",
68 "recv fail: {e}"
69 );
70 continue
71 }
72 };
73
74 let lock = unknown_proposals.read().await;
76 let contains_proposal = lock.contains(proposal.hash.inner());
77 drop(lock);
78 if !contains_proposal {
79 debug!(
80 target: "darkfid::task::handle_unknown_proposal",
81 "Proposal {} is not in our unknown proposals queue.",
82 proposal.hash,
83 );
84 continue
85 };
86
87 let mut lock = unknown_proposals_channels.write().await;
89 let channel_counter = if let Some((counter, timestamp)) = lock.get_mut(&channel) {
90 *counter += 1;
91 *timestamp = Timestamp::current_time().inner();
92 *counter
93 } else {
94 lock.insert(channel, (1, Timestamp::current_time().inner()));
95 1
96 };
97 drop(lock);
98
99 if handle_unknown_proposal(
101 &validator,
102 &p2p,
103 &proposals_sub,
104 &blocks_sub,
105 channel,
106 &proposal,
107 )
108 .await
109 {
110 if channel_counter > 5 {
112 if let Some(channel) = p2p.get_channel(channel) {
113 channel.ban().await;
114 }
115 unknown_proposals_channels.write().await.remove(&channel);
116 }
117 };
118
119 let mut lock = unknown_proposals.write().await;
121 lock.remove(proposal.hash.inner());
122 drop(lock);
123 }
124}
125
126async fn handle_unknown_proposal(
129 validator: &ValidatorPtr,
130 p2p: &P2pPtr,
131 proposals_sub: &JsonSubscriber,
132 blocks_sub: &JsonSubscriber,
133 channel: u32,
134 proposal: &Proposal,
135) -> bool {
136 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
138 let Some(channel) = p2p.get_channel(channel) else {
139 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
140 return false
141 };
142
143 let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
145 debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
146 return true
147 };
148
149 let last = match validator.blockchain.last() {
151 Ok(l) => l,
152 Err(e) => {
153 error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
154 return false
155 }
156 };
157 let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
158 if let Err(e) = channel.send(&request).await {
159 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
160 return true
161 };
162
163 let comms_timeout =
164 p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
165
166 let response = match response_sub.receive_with_timeout(comms_timeout).await {
168 Ok(r) => r,
169 Err(e) => {
170 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
171 return true
172 }
173 };
174 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
175
176 debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
178
179 if response.proposals.is_empty() {
181 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
182 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
183 }
184
185 if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
187 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
188 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
189 }
190
191 if response.proposals[0].block.header.previous != last.1 {
193 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
194 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
195 }
196
197 if response.proposals.last().unwrap().hash != proposal.hash {
199 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
200 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
201 }
202
203 for proposal in &response.proposals {
205 match validator.append_proposal(proposal).await {
207 Ok(()) => { }
208 Err(Error::ProposalAlreadyExists) => continue,
210 Err(e) => {
211 debug!(
212 target: "darkfid::task::handle_unknown_proposal",
213 "Error while appending response proposal: {e}"
214 );
215 break;
216 }
217 };
218
219 let message = ProposalMessage(proposal.clone());
221 p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
222
223 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
225 proposals_sub.notify(vec![enc_prop].into()).await;
226 }
227
228 false
229}
230
231async fn handle_reorg(
239 validator: &ValidatorPtr,
240 p2p: &P2pPtr,
241 proposals_sub: &JsonSubscriber,
242 blocks_sub: &JsonSubscriber,
243 channel: ChannelPtr,
244 proposal: &Proposal,
245) -> bool {
246 info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
247
248 if proposal.block.header.height == 0 {
250 debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
251 return true
252 }
253
254 let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
256 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
257 return true
258 };
259
260 let mut peer_header_hashes = vec![];
262
263 let mut previous_height = proposal.block.header.height;
265 let mut previous_hash = proposal.hash;
266 for height in (0..proposal.block.header.height).rev() {
267 let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
269 if let Err(e) = channel.send(&request).await {
270 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
271 return true
272 };
273
274 let comms_timeout =
275 p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
276 let response = match response_sub.receive_with_timeout(comms_timeout).await {
278 Ok(r) => r,
279 Err(e) => {
280 debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
281 return true
282 }
283 };
284 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
285
286 let Some(peer_header) = response.fork_header else {
288 debug!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
289 return true
290 };
291
292 let headers = match validator.blockchain.blocks.get_order(&[height], false) {
294 Ok(r) => r,
295 Err(e) => {
296 error!(target: "darkfid::task::handle_reorg", "Retrieving headers failed: {e}");
297 return false
298 }
299 };
300 match headers[0] {
301 Some(known_header) => {
302 if known_header == peer_header {
303 previous_height = height;
304 previous_hash = known_header;
305 break
306 }
307 peer_header_hashes.insert(0, peer_header);
309 }
310 None => peer_header_hashes.insert(0, peer_header),
311 }
312 }
313
314 if peer_header_hashes.is_empty() {
316 debug!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
317 return true
318 }
319
320 let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
322 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
323 return true
324 };
325
326 let last_common_height = previous_height;
328 let last_difficulty = match previous_height {
329 0 => {
330 let genesis_timestamp = match validator.blockchain.genesis_block() {
331 Ok(b) => b.header.timestamp,
332 Err(e) => {
333 error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
334 return false
335 }
336 };
337 BlockDifficulty::genesis(genesis_timestamp)
338 }
339 _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
340 Ok(d) => d[0].clone().unwrap(),
341 Err(e) => {
342 error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
343 return false
344 }
345 },
346 };
347
348 let module = match PoWModule::new(
350 validator.consensus.blockchain.clone(),
351 validator.consensus.module.read().await.target,
352 validator.consensus.module.read().await.fixed_difficulty.clone(),
353 Some(last_common_height + 1),
354 ) {
355 Ok(m) => m,
356 Err(e) => {
357 error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
358 return false
359 }
360 };
361
362 info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
364 let mut batch = Vec::with_capacity(BATCH);
365 let mut total_processed = 0;
366 let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
367 let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
368 let mut headers_module = module.clone();
369 for (index, hash) in peer_header_hashes.iter().enumerate() {
370 batch.push(*hash);
372
373 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
375 continue
376 }
377
378 let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
380 if let Err(e) = channel.send(&request).await {
381 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
382 return true
383 };
384
385 let comms_timeout =
386 p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
387 let response = match response_sub.receive_with_timeout(comms_timeout).await {
389 Ok(r) => r,
390 Err(e) => {
391 debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
392 return true
393 }
394 };
395 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
396
397 if response.headers.len() != batch.len() {
399 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
400 return true
401 }
402
403 for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
405 let peer_header_hash = peer_header.hash();
406 debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
407
408 if peer_header_hash != batch[peer_header_index] {
410 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
411 return true
412 }
413
414 if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
416 debug!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
417 return true
418 }
419
420 let (next_difficulty, target_distance_sq, hash_distance_sq) = match header_rank(
422 &headers_module,
423 peer_header,
424 ) {
425 Ok(tuple) => tuple,
426 Err(Error::PoWInvalidOutHash) => {
427 debug!(target: "darkfid::task::handle_reorg", "Invalid header hash detected");
428 return true
429 }
430 Err(e) => {
431 debug!(target: "darkfid::task::handle_reorg", "Computing header rank failed: {e}");
432 return false
433 }
434 };
435
436 targets_rank += target_distance_sq.clone();
438 hashes_rank += hash_distance_sq.clone();
439
440 if let Err(e) = headers_module.append(peer_header, &next_difficulty) {
442 debug!(target: "darkfid::task::handle_reorg", "Error while appending header to module: {e}");
443 return true
444 };
445
446 previous_height = peer_header.height;
448 previous_hash = peer_header_hash;
449 }
450
451 total_processed += response.headers.len();
452 info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
453
454 batch = Vec::with_capacity(BATCH);
456 }
457
458 let forks = validator.consensus.forks.read().await;
460 let index = match best_fork_index(&forks) {
461 Ok(i) => i,
462 Err(e) => {
463 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
464 return false
465 }
466 };
467 let best_fork = &forks[index];
468 if targets_rank < best_fork.targets_rank ||
469 (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
470 {
471 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
472 drop(forks);
473 return true
474 }
475 drop(forks);
476
477 let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
479 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
480 return true
481 };
482
483 let mut peer_fork =
485 match Fork::new(validator.consensus.blockchain.clone(), module.clone()).await {
486 Ok(f) => f,
487 Err(e) => {
488 error!(target: "darkfid::task::handle_reorg", "Generating peer fork failed: {e}");
489 return false
490 }
491 };
492 peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
493 peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
494
495 let inverse_diffs = match validator
497 .blockchain
498 .blocks
499 .get_state_inverse_diffs_after(last_common_height)
500 {
501 Ok(i) => i,
502 Err(e) => {
503 error!(target: "darkfid::task::handle_reorg", "Retrieving state inverse diffs failed: {e}");
504 peer_fork.purge_new_trees();
505 return false
506 }
507 };
508 for inverse_diff in inverse_diffs.iter().rev() {
509 if let Err(e) =
510 peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)
511 {
512 error!(target: "darkfid::task::handle_reorg", "Applying inverse diff failed: {e}");
513 peer_fork.purge_new_trees();
514 return false
515 }
516 }
517
518 if let Err(e) = peer_fork.compute_monotree() {
520 error!(target: "darkfid::task::handle_reorg", "Rebuilding peer fork monotree failed: {e}");
521 peer_fork.purge_new_trees();
522 return false
523 }
524
525 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
527 let mut batch = Vec::with_capacity(BATCH);
528 let mut total_processed = 0;
529 for (index, hash) in peer_header_hashes.iter().enumerate() {
530 batch.push(*hash);
532
533 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
535 continue
536 }
537
538 let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
540 if let Err(e) = channel.send(&request).await {
541 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
542 peer_fork.purge_new_trees();
543 return true
544 };
545
546 let comms_timeout =
547 p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
548
549 let response = match response_sub.receive_with_timeout(comms_timeout).await {
551 Ok(r) => r,
552 Err(e) => {
553 debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
554 peer_fork.purge_new_trees();
555 return true
556 }
557 };
558 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
559
560 if response.proposals.len() != batch.len() {
562 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
563 peer_fork.purge_new_trees();
564 return true
565 }
566
567 for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
569 info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
570
571 if peer_proposal.hash != batch[peer_proposal_index] {
573 error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
574 peer_fork.purge_new_trees();
575 return true
576 }
577
578 if let Err(e) =
580 verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await
581 {
582 error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
583 return true
584 }
585
586 if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
588 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
589 peer_fork.purge_new_trees();
590 return true
591 }
592 }
593
594 total_processed += response.proposals.len();
595 info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
596
597 batch = Vec::with_capacity(BATCH);
599 }
600
601 if let Err(e) = verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await {
603 error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
604 return true
605 }
606
607 if let Err(e) = peer_fork.append_proposal(proposal).await {
609 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
610 peer_fork.purge_new_trees();
611 return true
612 }
613
614 let mut forks = validator.consensus.forks.write().await;
616 let index = match best_fork_index(&forks) {
617 Ok(i) => i,
618 Err(e) => {
619 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
620 peer_fork.purge_new_trees();
621 return false
622 }
623 };
624 let best_fork = &forks[index];
625 if peer_fork.targets_rank < best_fork.targets_rank ||
626 (peer_fork.targets_rank == best_fork.targets_rank &&
627 peer_fork.hashes_rank <= best_fork.hashes_rank)
628 {
629 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
630 peer_fork.purge_new_trees();
631 drop(forks);
632 return true
633 }
634
635 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
637 *validator.consensus.module.write().await = module;
638 *forks = vec![peer_fork];
639 drop(forks);
640
641 let confirmed = match validator.confirmation().await {
643 Ok(f) => f,
644 Err(e) => {
645 error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
646 return false
647 }
648 };
649
650 if !confirmed.is_empty() {
651 let mut notif_blocks = Vec::with_capacity(confirmed.len());
652 for block in confirmed {
653 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
654 }
655 blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
656 }
657
658 let message = ProposalMessage(proposal.clone());
660 p2p.broadcast(&message).await;
661
662 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
664 proposals_sub.notify(vec![enc_prop].into()).await;
665
666 false
667}