darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32    task::{garbage_collect_task, sync_task},
33    DarkfiNodePtr,
34};
35
36/// Auxiliary structure representing node consensus init task configuration.
37#[derive(Clone)]
38pub struct ConsensusInitTaskConfig {
39    /// Skip syncing process and start node right away
40    pub skip_sync: bool,
41    /// Optional sync checkpoint height
42    pub checkpoint_height: Option<u32>,
43    /// Optional sync checkpoint hash
44    pub checkpoint: Option<String>,
45}
46
47/// Sync the node consensus state and start the corresponding task, based on node type.
48pub async fn consensus_init_task(
49    node: DarkfiNodePtr,
50    config: ConsensusInitTaskConfig,
51    ex: ExecutorPtr,
52) -> Result<()> {
53    // Check current canonical blockchain for curruption
54    // TODO: create a restore method reverting each block backwards
55    //       until its healthy again
56    node.validator.consensus.healthcheck().await?;
57
58    // Check if network genesis is in the future.
59    let current = Timestamp::current_time().inner();
60    let genesis = node.validator.consensus.module.read().await.genesis.inner();
61    if current < genesis {
62        let diff = genesis - current;
63        info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
64        sleep(diff).await;
65    }
66
67    // Generate a new fork to be able to extend
68    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
69    node.validator.consensus.generate_empty_fork().await?;
70
71    // Sync blockchain
72    let comms_timeout =
73        node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
74
75    let checkpoint = if !config.skip_sync {
76        // Parse configured checkpoint
77        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
78            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
79        }
80
81        let checkpoint = if let Some(height) = config.checkpoint_height {
82            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
83        } else {
84            None
85        };
86
87        loop {
88            match sync_task(&node, checkpoint).await {
89                Ok(_) => break,
90                Err(e) => {
91                    error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
92                    info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
93                    sleep(comms_timeout).await;
94                }
95            }
96        }
97        checkpoint
98    } else {
99        *node.validator.synced.write().await = true;
100        None
101    };
102
103    // Gracefully handle network disconnections
104    loop {
105        match listen_to_network(&node, &ex).await {
106            Ok(_) => return Ok(()),
107            Err(Error::NetworkNotConnected) => {
108                // Sync node again
109                *node.validator.synced.write().await = false;
110                node.validator.consensus.purge_forks().await?;
111                if !config.skip_sync {
112                    loop {
113                        match sync_task(&node, checkpoint).await {
114                            Ok(_) => break,
115                            Err(e) => {
116                                error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
117                                info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
118                                sleep(comms_timeout).await;
119                            }
120                        }
121                    }
122                } else {
123                    *node.validator.synced.write().await = true;
124                }
125            }
126            Err(e) => return Err(e),
127        }
128    }
129}
130
131/// Async task to start the consensus task, while monitoring for a network disconnections.
132async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
133    // Grab proposals subscriber and subscribe to it
134    let proposals_sub = node.subscribers.get("proposals").unwrap();
135    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
136
137    // Subscribe to the network disconnect subscriber
138    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
139
140    let result = smol::future::or(
141        monitor_network(&net_subscription),
142        consensus_task(node, &prop_subscription, ex),
143    )
144    .await;
145
146    // Terminate the subscriptions
147    prop_subscription.unsubscribe().await;
148    net_subscription.unsubscribe().await;
149
150    result
151}
152
153/// Async task to monitor network disconnections.
154async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
155    Err(subscription.receive().await)
156}
157
158/// Async task used for listening for new blocks and perform consensus.
159async fn consensus_task(
160    node: &DarkfiNodePtr,
161    subscription: &Subscription<JsonNotification>,
162    ex: &ExecutorPtr,
163) -> Result<()> {
164    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
165
166    // Grab blocks subscriber
167    let block_sub = node.subscribers.get("blocks").unwrap();
168
169    // Create the garbage collection task using a dummy task
170    let gc_task = StoppableTask::new();
171    gc_task.clone().start(
172        async { Ok(()) },
173        |_| async { /* Do nothing */ },
174        Error::GarbageCollectionTaskStopped,
175        ex.clone(),
176    );
177
178    loop {
179        subscription.receive().await;
180
181        // Check if we can confirm anything and broadcast them
182        let confirmed = match node.validator.confirmation().await {
183            Ok(f) => f,
184            Err(e) => {
185                error!(
186                    target: "darkfid::task::consensus_task",
187                    "Confirmation failed: {e}"
188                );
189                continue
190            }
191        };
192
193        if let Err(e) = node.registry.refresh(&node.validator).await {
194            error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
195        }
196
197        if confirmed.is_empty() {
198            continue
199        }
200
201        let mut notif_blocks = Vec::with_capacity(confirmed.len());
202        for block in confirmed {
203            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
204        }
205        block_sub.notify(JsonValue::Array(notif_blocks)).await;
206
207        // Invoke the detached garbage collection task
208        gc_task.clone().stop().await;
209        gc_task.clone().start(
210            garbage_collect_task(node.clone()),
211            |res| async {
212                match res {
213                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
214                    Err(e) => {
215                        error!(target: "darkfid", "Failed starting garbage collection task: {e}")
216                    }
217                }
218            },
219            Error::GarbageCollectionTaskStopped,
220            ex.clone(),
221        );
222    }
223}