1use std::collections::{BTreeSet, HashMap};
20
21use darkfi_sdk::{crypto::MerkleTree, tx::TransactionHash};
22use darkfi_serial::{async_trait, deserialize, SerialDecodable, SerialEncodable};
23use num_bigint::BigUint;
24use sled_overlay::{database::SledDbOverlayStateDiff, sled::IVec};
25use tracing::{debug, info, warn};
26
27use crate::{
28 blockchain::{
29 block_store::{BlockDifficulty, BlockRanks},
30 parse_record, BlockInfo, Blockchain, BlockchainOverlay, BlockchainOverlayPtr, Header,
31 HeaderHash,
32 },
33 runtime::vm_runtime::GAS_LIMIT,
34 tx::{Transaction, MAX_TX_CALLS},
35 validator::{
36 pow::{PoWModule, RANDOMX_KEY_CHANGE_DELAY, RANDOMX_KEY_CHANGING_HEIGHT},
37 utils::{best_fork_index, block_rank, find_extended_fork_index, worst_fork_index},
38 verification::{verify_proposal, verify_transaction},
39 },
40 zk::VerifyingKey,
41 Error, Result,
42};
43
44pub const BLOCK_GAS_LIMIT: u64 = GAS_LIMIT * MAX_TX_CALLS as u64 * 50;
46
47pub struct Consensus {
50 pub blockchain: Blockchain,
52 pub confirmation_threshold: usize,
54 pub forks: Vec<Fork>,
56 max_forks: usize,
58 pub module: PoWModule,
60}
61
62impl Consensus {
63 pub fn new(
65 blockchain: Blockchain,
66 confirmation_threshold: usize,
67 max_forks: usize,
68 pow_target: u32,
69 pow_fixed_difficulty: Option<BigUint>,
70 ) -> Result<Self> {
71 let max_forks = if max_forks == 0 { 1 } else { max_forks };
72 let module = PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty, None)?;
73
74 Ok(Self { blockchain, confirmation_threshold, forks: vec![], max_forks, module })
75 }
76
77 pub async fn generate_empty_fork(&mut self) -> Result<()> {
81 debug!(target: "validator::consensus::generate_empty_fork", "Generating new empty fork...");
82 for fork in &self.forks {
84 if fork.proposals.is_empty() {
85 debug!(target: "validator::consensus::generate_empty_fork", "An empty fork already exists.");
86 return Ok(())
87 }
88 }
89 let fork = Fork::new(self.blockchain.clone(), self.module.clone()).await?;
90 self.push_fork(fork);
91 debug!(target: "validator::consensus::generate_empty_fork", "Fork generated!");
92
93 Ok(())
94 }
95
96 fn push_fork(&mut self, fork: Fork) {
101 if self.forks.len() < self.max_forks {
103 self.forks.push(fork);
104 return
105 }
106
107 let index = worst_fork_index(&self.forks).unwrap();
112
113 if fork.targets_rank < self.forks[index].targets_rank {
115 return
116 }
117
118 if fork.targets_rank == self.forks[index].targets_rank &&
120 fork.hashes_rank <= self.forks[index].hashes_rank
121 {
122 return
123 }
124
125 self.forks[index] = fork;
127 }
128
129 pub async fn append_proposal(
133 &mut self,
134 proposal: &Proposal,
135 is_new: bool,
136 verify_fees: bool,
137 ) -> Result<()> {
138 debug!(target: "validator::consensus::append_proposal", "Appending proposal {}", proposal.hash);
139
140 for fork in &self.forks {
142 for p in fork.proposals.iter().rev() {
143 if p == &proposal.hash {
144 debug!(target: "validator::consensus::append_proposal", "Proposal {} already exists", proposal.hash);
145 return Err(Error::ProposalAlreadyExists)
146 }
147 }
148 }
149 if let Ok(canonical_headers) =
151 self.blockchain.blocks.get_order(&[proposal.block.header.height], true)
152 {
153 if canonical_headers[0].unwrap() == proposal.hash {
154 debug!(target: "validator::consensus::append_proposal", "Proposal {} already exists", proposal.hash);
155 return Err(Error::ProposalAlreadyExists)
156 }
157 }
158
159 let (mut fork, index) = verify_proposal(self, proposal, is_new, verify_fees).await?;
161
162 fork.append_proposal(proposal).await?;
164
165 match index {
168 Some(i)
169 if i < self.forks.len() &&
170 self.forks[i].proposals == fork.proposals[..fork.proposals.len() - 1] =>
171 {
172 self.forks[i] = fork
173 }
174 _ => self.push_fork(fork),
175 }
176
177 if !proposal.block.txs.is_empty() {
179 self.blockchain.remove_pending_txs(&proposal.block.txs)?;
180 }
181
182 info!(target: "validator::consensus::append_proposal", "Appended proposal {} - {}", proposal.hash, proposal.block.header.height);
183
184 Ok(())
185 }
186
187 pub async fn find_extended_fork(&self, proposal: &Proposal) -> Result<(Fork, Option<usize>)> {
194 let found = find_extended_fork_index(&self.forks, proposal);
196 if found.is_err() {
197 if let Err(Error::ProposalAlreadyExists) = found {
198 return Err(Error::ProposalAlreadyExists)
199 }
200
201 let (last_height, last_block) = self.blockchain.last()?;
203 if proposal.block.header.previous != last_block ||
204 proposal.block.header.height <= last_height
205 {
206 return Err(Error::ExtendedChainIndexNotFound)
207 }
208
209 for (f_index, fork) in self.forks.iter().enumerate() {
211 if fork.proposals.is_empty() {
212 return Ok((self.forks[f_index].full_clone()?, Some(f_index)))
213 }
214 }
215
216 let fork = Fork::new(self.blockchain.clone(), self.module.clone()).await?;
218 return Ok((fork, None))
219 }
220
221 let (f_index, p_index) = found.unwrap();
222 let original_fork = &self.forks[f_index];
223 if p_index == (original_fork.proposals.len() - 1) {
225 return Ok((original_fork.full_clone()?, Some(f_index)))
226 }
227
228 let mut fork = Fork::new(self.blockchain.clone(), self.module.clone()).await?;
230 fork.proposals = original_fork.proposals[..p_index + 1].to_vec();
231 fork.diffs = original_fork.diffs[..p_index + 1].to_vec();
232
233 let blocks = &original_fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?;
235 for (index, block) in blocks.iter().enumerate() {
236 fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(&fork.diffs[index])?;
238
239 let (next_difficulty, target_distance_sq, hash_distance_sq) =
241 block_rank(&mut fork.module, block)?;
242
243 fork.module.append(&block.header, &next_difficulty)?;
245
246 fork.targets_rank += target_distance_sq;
248 fork.hashes_rank += hash_distance_sq;
249 }
250
251 Ok((fork, None))
252 }
253
254 pub async fn confirmation(&self) -> Result<Option<usize>> {
264 debug!(target: "validator::consensus::confirmation", "Started confirmation check");
265
266 let index = best_fork_index(&self.forks)?;
268
269 if self.forks[index].proposals.len() < self.confirmation_threshold {
271 debug!(target: "validator::consensus::confirmation", "Nothing to confirm yet, best fork size: {}", self.forks[index].proposals.len());
272 return Ok(None)
273 }
274
275 for (f_index, fork) in self.forks.iter().enumerate() {
277 if f_index == index {
279 continue
280 }
281
282 if fork.targets_rank != self.forks[index].targets_rank {
284 continue
285 }
286
287 if fork.hashes_rank == self.forks[index].hashes_rank {
289 debug!(target: "validator::consensus::confirmation", "Competing best forks found");
290 return Ok(None)
291 }
292 }
293
294 Ok(Some(index))
295 }
296
297 fn find_fork_by_header(&self, fork_header: &HeaderHash) -> Option<usize> {
300 for (index, fork) in self.forks.iter().enumerate() {
301 for p in fork.proposals.iter().rev() {
302 if p == fork_header {
303 return Some(index)
304 }
305 }
306 }
307 None
308 }
309
310 pub async fn get_fork_header_hash(
313 &self,
314 height: u32,
315 fork_header: &HeaderHash,
316 ) -> Result<Option<HeaderHash>> {
317 let Some(index) = self.find_fork_by_header(fork_header) else { return Ok(None) };
319
320 let header =
322 self.forks[index].overlay.lock().unwrap().blocks.get_order(&[height], false)?[0];
323
324 Ok(header)
325 }
326
327 pub async fn get_fork_headers(
331 &self,
332 headers: &[HeaderHash],
333 fork_header: &HeaderHash,
334 ) -> Result<Vec<Header>> {
335 let Some(index) = self.find_fork_by_header(fork_header) else { return Ok(vec![]) };
337
338 let headers = self.forks[index].overlay.lock().unwrap().get_headers_by_hash(headers)?;
340
341 Ok(headers)
342 }
343
344 pub async fn get_fork_proposals(
348 &self,
349 headers: &[HeaderHash],
350 fork_header: &HeaderHash,
351 ) -> Result<Vec<Proposal>> {
352 let Some(index) = self.find_fork_by_header(fork_header) else { return Ok(vec![]) };
354
355 let blocks = self.forks[index].overlay.lock().unwrap().get_blocks_by_hash(headers)?;
357 let mut proposals = Vec::with_capacity(blocks.len());
358 for block in blocks {
359 proposals.push(Proposal::new(block));
360 }
361
362 Ok(proposals)
363 }
364
365 pub async fn get_fork_proposals_after(
371 &self,
372 tip: HeaderHash,
373 fork_tip: Option<HeaderHash>,
374 limit: u32,
375 ) -> Result<Vec<Proposal>> {
376 let mut proposals = vec![];
378
379 let index = match fork_tip {
381 Some(fork_tip) => {
382 let Some(found) = self.find_fork_by_header(&fork_tip) else { return Ok(proposals) };
383 found
384 }
385 None => best_fork_index(&self.forks)?,
386 };
387
388 let Ok(existing_tips) =
390 self.forks[index].overlay.lock().unwrap().get_blocks_by_hash(&[tip])
391 else {
392 return Ok(proposals)
393 };
394
395 let last_block_height = self.forks[index].overlay.lock().unwrap().last()?.0;
397 if last_block_height.saturating_sub(existing_tips[0].header.height) >= limit {
398 return Ok(proposals)
399 }
400
401 let headers = self.blockchain.blocks.get_all_after(existing_tips[0].header.height)?;
403 let blocks = self.blockchain.get_blocks_by_hash(&headers)?;
404 for block in blocks {
405 proposals.push(Proposal::new(block));
406 }
407 let blocks = self.forks[index]
408 .overlay
409 .lock()
410 .unwrap()
411 .get_blocks_by_hash(&self.forks[index].proposals)?;
412 for block in blocks {
413 if block.header.height > existing_tips[0].header.height {
415 proposals.push(Proposal::new(block));
416 }
417 }
418
419 Ok(proposals)
420 }
421
422 pub async fn current_mining_randomx_key(&self) -> Result<HeaderHash> {
426 let (next_block_height, rx_keys) = if self.forks.is_empty() {
429 let (next_block_height, _) = self.blockchain.last()?;
430 (next_block_height + 1, self.module.darkfi_rx_keys)
431 } else {
432 let index = best_fork_index(&self.forks)?;
434 let fork = &self.forks[index];
435 let last = fork.last_proposal()?;
436 (last.block.header.height + 1, fork.module.darkfi_rx_keys)
437 };
438
439 if next_block_height > RANDOMX_KEY_CHANGING_HEIGHT &&
442 next_block_height % RANDOMX_KEY_CHANGING_HEIGHT == RANDOMX_KEY_CHANGE_DELAY
443 {
444 Ok(rx_keys.1.ok_or_else(|| Error::ParseFailed("darkfi_rx_keys.1 unwrap() error"))?)
445 } else {
446 Ok(rx_keys.0)
447 }
448 }
449
450 pub async fn best_current_fork(&self) -> Result<Fork> {
452 let index = best_fork_index(&self.forks)?;
453 self.forks[index].full_clone()
454 }
455
456 pub async fn best_fork_last_header(&self) -> Result<(u32, HeaderHash)> {
459 if self.forks.is_empty() {
461 return self.blockchain.last()
462 }
463
464 let index = best_fork_index(&self.forks)?;
466 let fork = &self.forks[index];
467
468 let last = fork.last_proposal()?;
470 Ok((last.block.header.height, last.hash))
471 }
472
473 pub async fn reset_forks(
483 &mut self,
484 prefix: &[HeaderHash],
485 confirmed_fork_index: &usize,
486 confirmed_txs: &[Transaction],
487 ) -> Result<()> {
488 let excess = prefix.len();
493 let prefix_last_index = excess - 1;
494 let prefix_last = prefix.last().unwrap();
495 let mut keep = vec![true; self.forks.len()];
496 let confirmed_txs_hashes: Vec<TransactionHash> =
497 confirmed_txs.iter().map(|tx| tx.hash()).collect();
498 for (index, fork) in self.forks.iter_mut().enumerate() {
499 if &index == confirmed_fork_index {
500 continue
501 }
502
503 if fork.proposals.is_empty() ||
509 prefix_last_index >= fork.proposals.len() ||
510 &fork.proposals[prefix_last_index] != prefix_last
511 {
512 keep[index] = false;
513 continue
514 }
515
516 let rest_proposals = fork.proposals.split_off(excess);
518 let rest_diffs = fork.diffs.split_off(excess);
519 let mut diffs = fork.diffs.clone();
520 fork.proposals = rest_proposals;
521 fork.diffs = rest_diffs;
522 for diff in diffs.iter_mut() {
523 fork.overlay.lock().unwrap().overlay.lock().unwrap().remove_diff(diff);
524 }
525 }
526
527 let mut iter = keep.iter();
529 self.forks.retain(|_| *iter.next().unwrap());
530
531 self.blockchain.remove_pending_txs_hashes(&confirmed_txs_hashes)?;
534
535 Ok(())
536 }
537
538 pub async fn purge_forks(&mut self) -> Result<()> {
541 debug!(target: "validator::consensus::purge_forks", "Purging current forks...");
542 self.forks = vec![Fork::new(self.blockchain.clone(), self.module.clone()).await?];
543 debug!(target: "validator::consensus::purge_forks", "Forks purged!");
544
545 Ok(())
546 }
547
548 pub async fn reset_pow_module(&mut self) -> Result<()> {
550 debug!(target: "validator::consensus::reset_pow_module", "Resetting PoW module...");
551 self.module = PoWModule::new(
552 self.blockchain.clone(),
553 self.module.target,
554 self.module.fixed_difficulty.clone(),
555 None,
556 )?;
557 debug!(target: "validator::consensus::reset_pow_module", "PoW module reset successfully!");
558
559 Ok(())
560 }
561
562 pub async fn healthcheck(&self) -> Result<()> {
565 let state_root = self.blockchain.contracts.get_state_monotree_root()?;
567
568 let last_block_state_root = self.blockchain.last_header()?.state_root;
570 if state_root != last_block_state_root {
571 return Err(Error::ContractsStatesRootError(
572 blake3::Hash::from_bytes(state_root).to_string(),
573 blake3::Hash::from_bytes(last_block_state_root).to_string(),
574 ));
575 }
576
577 for fork in &self.forks {
579 fork.healthcheck()?;
580 }
581
582 Ok(())
583 }
584
585 pub async fn purge_unreferenced_trees(
588 &self,
589 referenced_trees: &mut BTreeSet<IVec>,
590 ) -> Result<()> {
591 if self.forks.is_empty() {
593 let fork = Fork::new(self.blockchain.clone(), self.module.clone()).await?;
596 fork.referenced_trees(referenced_trees);
597 } else {
598 for fork in &self.forks {
600 fork.referenced_trees(referenced_trees);
601 }
602 }
603
604 let current_trees = self.blockchain.sled_db.tree_names();
606
607 for tree in current_trees {
610 if referenced_trees.contains(&tree) {
612 continue
613 }
614
615 let Ok(tree) = deserialize::<[u8; 32]>(&tree) else { continue };
617
618 debug!(target: "validator::consensus::purge_unreferenced_trees", "Dropping unreferenced tree: {}", blake3::Hash::from(tree));
620 self.blockchain.sled_db.drop_tree(tree)?;
621 }
622
623 Ok(())
624 }
625}
626
627#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
629pub struct Proposal {
630 pub hash: HeaderHash,
632 pub block: BlockInfo,
634}
635
636impl Proposal {
637 pub fn new(block: BlockInfo) -> Self {
638 let hash = block.hash();
639 Self { hash, block }
640 }
641}
642
643impl From<Proposal> for BlockInfo {
644 fn from(proposal: Proposal) -> BlockInfo {
645 proposal.block
646 }
647}
648
649#[derive(Clone)]
656pub struct Fork {
657 pub blockchain: Blockchain,
659 pub overlay: BlockchainOverlayPtr,
661 pub module: PoWModule,
663 pub proposals: Vec<HeaderHash>,
665 pub diffs: Vec<SledDbOverlayStateDiff>,
667 pub targets_rank: BigUint,
669 pub hashes_rank: BigUint,
671}
672
673impl Fork {
674 pub async fn new(blockchain: Blockchain, module: PoWModule) -> Result<Self> {
675 let overlay = BlockchainOverlay::new(&blockchain)?;
676 let last_difficulty = blockchain.last_block_difficulty()?;
678 let targets_rank = last_difficulty.ranks.targets_rank;
679 let hashes_rank = last_difficulty.ranks.hashes_rank;
680 Ok(Self {
681 blockchain,
682 overlay,
683 module,
684 proposals: vec![],
685 diffs: vec![],
686 targets_rank,
687 hashes_rank,
688 })
689 }
690
691 pub async fn append_proposal(&mut self, proposal: &Proposal) -> Result<()> {
694 let (next_difficulty, target_distance_sq, hash_distance_sq) =
696 block_rank(&mut self.module, &proposal.block)?;
697
698 self.targets_rank += target_distance_sq.clone();
700 self.hashes_rank += hash_distance_sq.clone();
701
702 let cumulative_difficulty =
704 self.module.cumulative_difficulty.clone() + next_difficulty.clone();
705 let ranks = BlockRanks::new(
706 target_distance_sq,
707 self.targets_rank.clone(),
708 hash_distance_sq,
709 self.hashes_rank.clone(),
710 );
711 let block_difficulty = BlockDifficulty::new(
712 proposal.block.header.height,
713 proposal.block.header.timestamp,
714 next_difficulty,
715 cumulative_difficulty,
716 ranks,
717 );
718 self.module.append_difficulty(&self.overlay, &proposal.block.header, block_difficulty)?;
719
720 self.proposals.push(proposal.hash);
722
723 self.diffs.push(self.overlay.lock().unwrap().overlay.lock().unwrap().diff(&self.diffs)?);
725
726 Ok(())
727 }
728
729 pub fn last_proposal(&self) -> Result<Proposal> {
731 let block = if let Some(last) = self.proposals.last() {
732 self.overlay.lock().unwrap().get_blocks_by_hash(&[*last])?[0].clone()
733 } else {
734 self.overlay.lock().unwrap().last_block()?
735 };
736
737 Ok(Proposal::new(block))
738 }
739
740 pub fn get_next_block_height(&self) -> Result<u32> {
742 let proposal = self.last_proposal()?;
743 Ok(proposal.block.header.height + 1)
744 }
745
746 pub async fn unproposed_txs(
753 &mut self,
754 verifying_block_height: u32,
755 verify_fees: bool,
756 ) -> Result<(Vec<Transaction>, u64, u64)> {
757 if self.blockchain.transactions.pending.is_empty() {
759 return Ok((vec![], 0, 0))
760 }
761
762 let mut tree = MerkleTree::new(1);
764
765 let mut total_gas_used = 0_u64;
767 let mut total_gas_paid = 0_u64;
768
769 let mut vks: HashMap<[u8; 32], HashMap<String, VerifyingKey>> = HashMap::new();
772
773 let mut unproposed_txs = vec![];
775 let mut erroneous_txs = vec![];
776 for record in self.blockchain.transactions.pending.iter() {
777 let (tx_hash, tx) = parse_record::<TransactionHash, Transaction>(record?)?;
779
780 if self.overlay.lock().unwrap().transactions.contains(&tx_hash)? {
782 continue
783 }
784
785 for call in &tx.calls {
787 vks.entry(call.data.contract_id.to_bytes()).or_default();
788 }
789
790 self.overlay.lock().unwrap().checkpoint();
792 let gas_data = match verify_transaction(
793 &self.overlay,
794 verifying_block_height,
795 self.module.target,
796 &tx,
797 &mut tree,
798 &mut vks,
799 verify_fees,
800 )
801 .await
802 {
803 Ok(gas_values) => gas_values,
804 Err(e) => {
805 debug!(target: "validator::consensus::unproposed_txs", "Transaction verification failed: {e}");
806 self.overlay.lock().unwrap().revert_to_checkpoint();
807 erroneous_txs.push(tx_hash);
808 continue
809 }
810 };
811
812 let tx_gas_used = gas_data.total_gas_used();
814
815 let accumulated_gas_usage = total_gas_used.saturating_add(tx_gas_used);
817
818 if accumulated_gas_usage > BLOCK_GAS_LIMIT {
821 warn!(
822 target: "validator::consensus::unproposed_txs",
823 "Retrieving transaction {tx_hash} would exceed configured unproposed transaction gas limit: {accumulated_gas_usage} - {BLOCK_GAS_LIMIT}"
824 );
825 self.overlay.lock().unwrap().revert_to_checkpoint();
826 break
827 }
828
829 total_gas_used = total_gas_used.saturating_add(tx_gas_used);
831 total_gas_paid = total_gas_paid.saturating_add(gas_data.paid);
832
833 unproposed_txs.push(tx);
835 }
836
837 self.blockchain.remove_pending_txs_hashes(&erroneous_txs)?;
839
840 Ok((unproposed_txs, total_gas_used, total_gas_paid))
841 }
842
843 pub fn full_clone(&self) -> Result<Self> {
848 let blockchain = self.blockchain.clone();
849 let overlay = self.overlay.lock().unwrap().full_clone()?;
850 let module = self.module.clone();
851 let proposals = self.proposals.clone();
852 let diffs = self.diffs.clone();
853 let targets_rank = self.targets_rank.clone();
854 let hashes_rank = self.hashes_rank.clone();
855
856 Ok(Self { blockchain, overlay, module, proposals, diffs, targets_rank, hashes_rank })
857 }
858
859 pub fn healthcheck(&self) -> Result<()> {
866 let state_root = self.overlay.lock().unwrap().contracts.get_state_monotree_root()?;
868
869 let last_block_state_root = self.last_proposal()?.block.header.state_root;
871 if state_root != last_block_state_root {
872 return Err(Error::ContractsStatesRootError(
873 blake3::Hash::from_bytes(state_root).to_string(),
874 blake3::Hash::from_bytes(last_block_state_root).to_string(),
875 ));
876 }
877
878 Ok(())
879 }
880
881 pub fn referenced_trees(&self, trees: &mut BTreeSet<IVec>) {
884 let fork_overlay = self.overlay.lock().unwrap();
886 let overlay = fork_overlay.overlay.lock().unwrap();
887
888 for initial_tree in &overlay.state.initial_tree_names {
890 trees.insert(initial_tree.clone());
891 }
892
893 for new_tree in &overlay.state.new_tree_names {
895 trees.insert(new_tree.clone());
896 }
897
898 for dropped_tree in overlay.state.dropped_trees.keys() {
900 trees.insert(dropped_tree.clone());
901 }
902
903 for protected_tree in &overlay.state.protected_tree_names {
905 trees.insert(protected_tree.clone());
906 }
907 }
908}