darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32    task::{garbage_collect::garbage_collect_task, sync_task},
33    DarkfiNodePtr,
34};
35
36/// Auxiliary structure representing node consensus init task configuration.
37#[derive(Clone)]
38pub struct ConsensusInitTaskConfig {
39    /// Skip syncing process and start node right away
40    pub skip_sync: bool,
41    /// Optional sync checkpoint height
42    pub checkpoint_height: Option<u32>,
43    /// Optional sync checkpoint hash
44    pub checkpoint: Option<String>,
45}
46
47/// Sync the node consensus state and start the corresponding task, based on node type.
48pub async fn consensus_init_task(
49    node: DarkfiNodePtr,
50    config: ConsensusInitTaskConfig,
51    ex: ExecutorPtr,
52) -> Result<()> {
53    // Check current canonical blockchain for curruption
54    // TODO: create a restore method reverting each block backwards
55    //       until its healthy again
56    let mut validator = node.validator.write().await;
57    validator.consensus.healthcheck().await?;
58
59    // Check if network genesis is in the future.
60    let current = Timestamp::current_time().inner();
61    let genesis = validator.consensus.module.genesis.inner();
62    if current < genesis {
63        let diff = genesis - current;
64        info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
65        sleep(diff).await;
66    }
67
68    // Generate a new fork to be able to extend
69    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
70    validator.consensus.generate_empty_fork().await?;
71    drop(validator);
72
73    // Sync blockchain
74    let comms_timeout =
75        node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
76
77    let checkpoint = if !config.skip_sync {
78        // Parse configured checkpoint
79        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
80            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
81        }
82
83        let checkpoint = if let Some(height) = config.checkpoint_height {
84            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
85        } else {
86            None
87        };
88
89        loop {
90            match sync_task(&node, checkpoint).await {
91                Ok(_) => break,
92                Err(e) => {
93                    error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
94                    info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
95                    sleep(comms_timeout).await;
96                }
97            }
98        }
99        checkpoint
100    } else {
101        node.validator.write().await.synced = true;
102        None
103    };
104
105    // Gracefully handle network disconnections
106    loop {
107        match listen_to_network(&node, &ex).await {
108            Ok(_) => return Ok(()),
109            Err(Error::NetworkNotConnected) => {
110                // Sync node again
111                node.validator.write().await.synced = false;
112                if !config.skip_sync {
113                    loop {
114                        match sync_task(&node, checkpoint).await {
115                            Ok(_) => break,
116                            Err(e) => {
117                                error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
118                                info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
119                                sleep(comms_timeout).await;
120                            }
121                        }
122                    }
123                } else {
124                    node.validator.write().await.synced = true;
125                }
126            }
127            Err(e) => return Err(e),
128        }
129    }
130}
131
132/// Async task to start the consensus task, while monitoring for a network disconnections.
133async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
134    // Grab proposals subscriber and subscribe to it
135    let proposals_sub = node.subscribers.get("proposals").unwrap();
136    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
137
138    // Subscribe to the network disconnect subscriber
139    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
140
141    let result = smol::future::or(
142        monitor_network(&net_subscription),
143        consensus_task(node, &prop_subscription, ex),
144    )
145    .await;
146
147    // Terminate the subscriptions
148    prop_subscription.unsubscribe().await;
149    net_subscription.unsubscribe().await;
150
151    result
152}
153
154/// Async task to monitor network disconnections.
155async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
156    Err(subscription.receive().await)
157}
158
159/// Async task used for listening for new blocks and perform consensus.
160async fn consensus_task(
161    node: &DarkfiNodePtr,
162    subscription: &Subscription<JsonNotification>,
163    ex: &ExecutorPtr,
164) -> Result<()> {
165    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
166
167    // Grab blocks subscriber
168    let block_sub = node.subscribers.get("blocks").unwrap();
169
170    // Create the garbage collection task using a dummy task
171    let gc_task = StoppableTask::new();
172    gc_task.clone().start(
173        async { Ok(()) },
174        |_| async { /* Do nothing */ },
175        Error::GarbageCollectionTaskStopped,
176        ex.clone(),
177    );
178
179    loop {
180        subscription.receive().await;
181
182        // Check if we can confirm anything and broadcast them
183        let mut validator = node.validator.write().await;
184        let confirmed = match validator.confirmation().await {
185            Ok(f) => f,
186            Err(e) => {
187                error!(
188                    target: "darkfid::task::consensus_task",
189                    "Confirmation failed: {e}"
190                );
191                continue
192            }
193        };
194
195        // Refresh mining registry
196        let mut registry = node.registry.state.write().await;
197        if let Err(e) = registry.refresh(&validator).await {
198            error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
199        }
200
201        if confirmed.is_empty() {
202            continue
203        }
204
205        // Purge all unreferenced contract trees from the database
206        if let Err(e) =
207            validator.consensus.purge_unreferenced_trees(&mut registry.new_trees()).await
208        {
209            error!(target: "darkfid::task::garbage_collect::purge_unreferenced_trees", "Purging unreferenced contract trees from the database failed: {e}");
210        }
211
212        let mut notif_blocks = Vec::with_capacity(confirmed.len());
213        for block in confirmed {
214            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
215        }
216        block_sub.notify(JsonValue::Array(notif_blocks)).await;
217
218        // Invoke the detached garbage collection task
219        gc_task.clone().stop().await;
220        gc_task.clone().start(
221            garbage_collect_task(node.clone()),
222            |res| async {
223                match res {
224                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
225                    Err(e) => {
226                        error!(target: "darkfid", "Failed starting garbage collection task: {e}")
227                    }
228                }
229            },
230            Error::GarbageCollectionTaskStopped,
231            ex.clone(),
232        );
233    }
234}