darkfid/task/
garbage_collect.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::collections::HashMap;
20
21use darkfi::{
22    blockchain::parse_record, tx::Transaction, validator::verification::verify_transaction,
23    zk::VerifyingKey, Result,
24};
25use darkfi_sdk::{crypto::MerkleTree, tx::TransactionHash};
26use smol::channel::Receiver;
27use tracing::{debug, error, info};
28
29use crate::DarkfiNodePtr;
30
31/// Auxiliary macro to check if channel receiver is empty so we can
32/// abort current iteration.
33macro_rules! trigger_queue_check {
34    ($receiver:ident, $label:tt) => {
35        if !$receiver.is_empty() {
36            continue $label
37         }
38     };
39}
40
41/// Async task used for purging unreferenced trees and erroneous
42/// pending transactions from the nodes mempool.
43pub async fn garbage_collect_task(receiver: Receiver<()>, node: DarkfiNodePtr) -> Result<()> {
44    info!(target: "darkfid::task::garbage_collect_task", "Starting garbage collection task...");
45
46    'outer: loop {
47        // Wait for a new trigger
48        if let Err(e) = receiver.recv().await {
49            error!(target: "darkfid::task::garbage_collect_task", "recv fail: {e}");
50            continue
51        };
52
53        // Purge all unreferenced contract trees from the database
54        trigger_queue_check!(receiver, 'outer);
55        debug!(target: "darkfid::task::garbage_collect_task", "Starting garbage collection iteration...");
56        if let Err(e) = node
57            .validator
58            .read()
59            .await
60            .consensus
61            .purge_unreferenced_trees(&mut node.registry.state.read().await.new_trees())
62            .await
63        {
64            error!(target: "darkfid::task::garbage_collect_task", "Purging unreferenced contract trees from the database failed: {e}");
65            continue
66        }
67        debug!(target: "darkfid::task::garbage_collect_task", "Unreferenced trees purged successfully, retrieving pending transactions...");
68
69        // Check if our mempool is empty
70        trigger_queue_check!(receiver, 'outer);
71        let validator = node.validator.read().await;
72        if validator.blockchain.transactions.pending.is_empty() {
73            debug!(target: "darkfid::task::garbage_collect_task", "No pending transactions to process");
74            continue
75        }
76
77        // Grab validator current best fork and an iterator over its
78        // pending transactions so we don't hold the validator lock.
79        let pending = validator.blockchain.transactions.pending.iter();
80        let fork = match validator.best_current_fork().await {
81            Ok(f) => f,
82            Err(e) => {
83                error!(target: "darkfid::task::garbage_collect_task", "Retrieving validator current best fork failed: {e}");
84                continue
85            }
86        };
87        let verify_fees = validator.verify_fees;
88        drop(validator);
89
90        // Transactions Merkle tree
91        trigger_queue_check!(receiver, 'outer);
92        let mut tree = MerkleTree::new(1);
93
94        // Map of ZK proof verifying keys for the current transactions
95        // batch.
96        let mut vks: HashMap<[u8; 32], HashMap<String, VerifyingKey>> = HashMap::new();
97
98        // Grab forks' next block height
99        let next_block_height = match fork.get_next_block_height() {
100            Ok(h) => h,
101            Err(e) => {
102                error!(
103                   target: "darkfid::task::garbage_collect_task",
104                   "Next fork block height retrieval failed: {e}"
105                );
106                continue
107            }
108        };
109
110        // Iterate over all pending transactions
111        for record in pending {
112            trigger_queue_check!(receiver, 'outer);
113            let record = match record {
114                Ok(r) => r,
115                Err(e) => {
116                    error!(target: "darkfid::task::garbage_collect_task", "Failed retrieving pending tx: {e}");
117                    continue 'outer
118                }
119            };
120            let (tx_hash, tx) = match parse_record::<TransactionHash, Transaction>(record) {
121                Ok((h, t)) => (h, t),
122                Err(e) => {
123                    error!(target: "darkfid::task::garbage_collect_task", "Failed parsing pending tx: {e}");
124                    continue
125                }
126            };
127
128            // If the transaction has already been proposed, remove it
129            trigger_queue_check!(receiver, 'outer);
130            debug!(target: "darkfid::task::garbage_collect_task", "Checking transaction: {tx_hash}");
131            if fork.overlay.lock().unwrap().transactions.contains(&tx_hash)? {
132                debug!(target: "darkfid::task::garbage_collect_task", "Transaction {tx_hash} has already been proposed, removing...");
133                if let Err(e) = fork.blockchain.remove_pending_txs_hashes(&[tx_hash]) {
134                    error!(target: "darkfid::task::garbage_collect_task", "Failed removing pending tx: {e}");
135                };
136                continue
137            }
138
139            // Update the verifying keys map
140            trigger_queue_check!(receiver, 'outer);
141            for call in &tx.calls {
142                vks.entry(call.data.contract_id.to_bytes()).or_default();
143            }
144
145            // Verify the transaction against current state
146            trigger_queue_check!(receiver, 'outer);
147            fork.overlay.lock().unwrap().checkpoint();
148            let result = verify_transaction(
149                &fork.overlay,
150                next_block_height,
151                fork.module.target,
152                &tx,
153                &mut tree,
154                &mut vks,
155                verify_fees,
156            )
157            .await;
158            fork.overlay.lock().unwrap().revert_to_checkpoint();
159            if let Err(e) = result {
160                debug!(target: "darkfid::task::garbage_collect_task", "Pending transaction {tx_hash} verification failed: {e}");
161                if let Err(e) = fork.blockchain.remove_pending_txs_hashes(&[tx_hash]) {
162                    error!(target: "darkfid::task::garbage_collect_task", "Failed removing pending tx: {e}");
163                };
164                continue
165            }
166            debug!(target: "darkfid::task::garbage_collect_task", "Pending transaction {tx_hash} verification successfully.");
167        }
168    }
169}