darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_serial::serialize_async;
29use smol::channel::Sender;
30use tracing::{error, info};
31
32use crate::{task::sync_task, DarkfiNodePtr};
33
/// Auxiliary structure representing node consensus init task configuration.
///
/// `checkpoint_height` and `checkpoint` are meant to be configured together:
/// the init task rejects a height that comes without a hash.
#[derive(Clone, Debug)]
pub struct ConsensusInitTaskConfig {
    /// Skip syncing process and start node right away
    pub skip_sync: bool,
    /// Optional sync checkpoint height
    pub checkpoint_height: Option<u32>,
    /// Optional sync checkpoint hash, in its string representation
    pub checkpoint: Option<String>,
}
44
45/// Sync the node consensus state and start the corresponding task, based on node type.
46pub async fn consensus_init_task(
47    node: DarkfiNodePtr,
48    config: ConsensusInitTaskConfig,
49    sender: Sender<()>,
50) -> Result<()> {
51    // Check current canonical blockchain for curruption
52    // TODO: create a restore method reverting each block backwards
53    //       until its healthy again
54    let mut validator = node.validator.write().await;
55    validator.consensus.healthcheck().await?;
56
57    // Check if network genesis is in the future.
58    let current = Timestamp::current_time().inner();
59    let genesis = validator.consensus.module.genesis.inner();
60    if current < genesis {
61        let diff = genesis - current;
62        info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
63        sleep(diff).await;
64    }
65
66    // Generate a new fork to be able to extend
67    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
68    validator.consensus.generate_empty_fork().await?;
69    drop(validator);
70
71    // Sync blockchain
72    let comms_timeout =
73        node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
74
75    let checkpoint = if !config.skip_sync {
76        // Parse configured checkpoint
77        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
78            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
79        }
80
81        let checkpoint = if let Some(height) = config.checkpoint_height {
82            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
83        } else {
84            None
85        };
86
87        loop {
88            match sync_task(&node, checkpoint).await {
89                Ok(_) => break,
90                Err(e) => {
91                    error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
92                    info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
93                    sleep(comms_timeout).await;
94                }
95            }
96        }
97        checkpoint
98    } else {
99        node.validator.write().await.synced = true;
100        None
101    };
102
103    // Gracefully handle network disconnections
104    loop {
105        match listen_to_network(&node, &sender).await {
106            Ok(_) => return Ok(()),
107            Err(Error::NetworkNotConnected) => {
108                // Sync node again
109                node.validator.write().await.synced = false;
110                if !config.skip_sync {
111                    loop {
112                        match sync_task(&node, checkpoint).await {
113                            Ok(_) => break,
114                            Err(e) => {
115                                error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
116                                info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
117                                sleep(comms_timeout).await;
118                            }
119                        }
120                    }
121                } else {
122                    node.validator.write().await.synced = true;
123                }
124            }
125            Err(e) => return Err(e),
126        }
127    }
128}
129
130/// Async task to start the consensus task, while monitoring for a network disconnections.
131async fn listen_to_network(node: &DarkfiNodePtr, sender: &Sender<()>) -> Result<()> {
132    // Grab proposals subscriber and subscribe to it
133    let proposals_sub = node.subscribers.get("proposals").unwrap();
134    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
135
136    // Subscribe to the network disconnect subscriber
137    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
138
139    let result = smol::future::or(
140        monitor_network(&net_subscription),
141        consensus_task(node, &prop_subscription, sender),
142    )
143    .await;
144
145    // Terminate the subscriptions
146    prop_subscription.unsubscribe().await;
147    net_subscription.unsubscribe().await;
148
149    result
150}
151
152/// Async task to monitor network disconnections.
153async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
154    Err(subscription.receive().await)
155}
156
157/// Async task used for listening for new blocks and perform consensus.
158async fn consensus_task(
159    node: &DarkfiNodePtr,
160    subscription: &Subscription<JsonNotification>,
161    sender: &Sender<()>,
162) -> Result<()> {
163    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
164
165    // Grab blocks subscriber
166    let block_sub = node.subscribers.get("blocks").unwrap();
167
168    loop {
169        // Wait for a new proposal
170        subscription.receive().await;
171
172        // Check if we can confirm anything and broadcast them
173        let mut validator = node.validator.write().await;
174        let confirmed = match validator.confirmation().await {
175            Ok(f) => f,
176            Err(e) => {
177                error!(
178                    target: "darkfid::task::consensus_task",
179                    "Confirmation failed: {e}"
180                );
181                continue
182            }
183        };
184
185        // Refresh mining registry
186        if let Err(e) = node.registry.state.write().await.refresh(&validator).await {
187            error!(target: "darkfid::task::consensus_task", "Failed refreshing mining block templates: {e}")
188        }
189
190        // Notify the garbage collection task
191        if let Err(e) = sender.send(()).await {
192            error!(
193                target: "darkfid::task::consensus_task",
194                "Garbage collection channel send fail: {e}"
195            );
196        };
197
198        // Check if something was confirmed
199        if confirmed.is_empty() {
200            continue
201        }
202
203        // Broadcast confirmed blocks to subscribers
204        let mut notif_blocks = Vec::with_capacity(confirmed.len());
205        for block in confirmed {
206            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
207        }
208        block_sub.notify(JsonValue::Array(notif_blocks)).await;
209    }
210}