darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32    task::{
33        garbage_collect::{garbage_collect_task, purge_unreferenced_trees},
34        sync_task,
35    },
36    DarkfiNodePtr,
37};
38
39/// Auxiliary structure representing node consensus init task configuration.
40#[derive(Clone)]
41pub struct ConsensusInitTaskConfig {
42    /// Skip syncing process and start node right away
43    pub skip_sync: bool,
44    /// Optional sync checkpoint height
45    pub checkpoint_height: Option<u32>,
46    /// Optional sync checkpoint hash
47    pub checkpoint: Option<String>,
48}
49
50/// Sync the node consensus state and start the corresponding task, based on node type.
51pub async fn consensus_init_task(
52    node: DarkfiNodePtr,
53    config: ConsensusInitTaskConfig,
54    ex: ExecutorPtr,
55) -> Result<()> {
56    // Check current canonical blockchain for curruption
57    // TODO: create a restore method reverting each block backwards
58    //       until its healthy again
59    node.validator.consensus.healthcheck().await?;
60
61    // Check if network genesis is in the future.
62    let current = Timestamp::current_time().inner();
63    let genesis = node.validator.consensus.module.read().await.genesis.inner();
64    if current < genesis {
65        let diff = genesis - current;
66        info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
67        sleep(diff).await;
68    }
69
70    // Generate a new fork to be able to extend
71    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
72    node.validator.consensus.generate_empty_fork().await?;
73
74    // Sync blockchain
75    let comms_timeout =
76        node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
77
78    let checkpoint = if !config.skip_sync {
79        // Parse configured checkpoint
80        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
81            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
82        }
83
84        let checkpoint = if let Some(height) = config.checkpoint_height {
85            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
86        } else {
87            None
88        };
89
90        loop {
91            match sync_task(&node, checkpoint).await {
92                Ok(_) => break,
93                Err(e) => {
94                    error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
95                    info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
96                    sleep(comms_timeout).await;
97                }
98            }
99        }
100        checkpoint
101    } else {
102        *node.validator.synced.write().await = true;
103        None
104    };
105
106    // Gracefully handle network disconnections
107    loop {
108        match listen_to_network(&node, &ex).await {
109            Ok(_) => return Ok(()),
110            Err(Error::NetworkNotConnected) => {
111                // Sync node again
112                *node.validator.synced.write().await = false;
113                if !config.skip_sync {
114                    loop {
115                        match sync_task(&node, checkpoint).await {
116                            Ok(_) => break,
117                            Err(e) => {
118                                error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
119                                info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
120                                sleep(comms_timeout).await;
121                            }
122                        }
123                    }
124                } else {
125                    *node.validator.synced.write().await = true;
126                }
127            }
128            Err(e) => return Err(e),
129        }
130    }
131}
132
133/// Async task to start the consensus task, while monitoring for a network disconnections.
134async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
135    // Grab proposals subscriber and subscribe to it
136    let proposals_sub = node.subscribers.get("proposals").unwrap();
137    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
138
139    // Subscribe to the network disconnect subscriber
140    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
141
142    let result = smol::future::or(
143        monitor_network(&net_subscription),
144        consensus_task(node, &prop_subscription, ex),
145    )
146    .await;
147
148    // Terminate the subscriptions
149    prop_subscription.unsubscribe().await;
150    net_subscription.unsubscribe().await;
151
152    result
153}
154
155/// Async task to monitor network disconnections.
156async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
157    Err(subscription.receive().await)
158}
159
160/// Async task used for listening for new blocks and perform consensus.
161async fn consensus_task(
162    node: &DarkfiNodePtr,
163    subscription: &Subscription<JsonNotification>,
164    ex: &ExecutorPtr,
165) -> Result<()> {
166    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
167
168    // Grab blocks subscriber
169    let block_sub = node.subscribers.get("blocks").unwrap();
170
171    // Create the garbage collection task using a dummy task
172    let gc_task = StoppableTask::new();
173    gc_task.clone().start(
174        async { Ok(()) },
175        |_| async { /* Do nothing */ },
176        Error::GarbageCollectionTaskStopped,
177        ex.clone(),
178    );
179
180    loop {
181        subscription.receive().await;
182
183        // Check if we can confirm anything and broadcast them
184        let confirmed = match node.validator.confirmation().await {
185            Ok(f) => f,
186            Err(e) => {
187                error!(
188                    target: "darkfid::task::consensus_task",
189                    "Confirmation failed: {e}"
190                );
191                continue
192            }
193        };
194
195        // Refresh mining registry
196        if let Err(e) = node.registry.refresh(&node.validator).await {
197            error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
198        }
199
200        if confirmed.is_empty() {
201            continue
202        }
203
204        // Grab the append lock so no other proposal gets processed
205        // while the node is purging all unreferenced contract trees
206        // from the database.
207        let append_lock = node.validator.consensus.append_lock.write().await;
208        purge_unreferenced_trees(node).await;
209        drop(append_lock);
210
211        let mut notif_blocks = Vec::with_capacity(confirmed.len());
212        for block in confirmed {
213            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
214        }
215        block_sub.notify(JsonValue::Array(notif_blocks)).await;
216
217        // Invoke the detached garbage collection task
218        gc_task.clone().stop().await;
219        gc_task.clone().start(
220            garbage_collect_task(node.clone()),
221            |res| async {
222                match res {
223                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
224                    Err(e) => {
225                        error!(target: "darkfid", "Failed starting garbage collection task: {e}")
226                    }
227                }
228            },
229            Error::GarbageCollectionTaskStopped,
230            ex.clone(),
231        );
232    }
233}