darkfid/task/
consensus.rs1use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_serial::serialize_async;
29use smol::channel::Sender;
30use tracing::{error, info};
31
32use crate::{task::sync_task, DarkfiNodePtr};
33
/// Configuration consumed by `consensus_init_task`.
#[derive(Clone, Debug)]
pub struct ConsensusInitTaskConfig {
    /// When `true`, the sync task is never run and the validator is flagged as synced
    /// right away.
    pub skip_sync: bool,
    /// Optional checkpoint height handed to the sync task. When set, `checkpoint` must be
    /// set as well, otherwise the init task returns an error.
    pub checkpoint_height: Option<u32>,
    /// Optional checkpoint header hash in its string form; parsed with `HeaderHash::from_str`.
    /// Ignored when `checkpoint_height` is `None`.
    pub checkpoint: Option<String>,
}
44
45pub async fn consensus_init_task(
47 node: DarkfiNodePtr,
48 config: ConsensusInitTaskConfig,
49 sender: Sender<()>,
50) -> Result<()> {
51 let mut validator = node.validator.write().await;
55 validator.consensus.healthcheck().await?;
56
57 let current = Timestamp::current_time().inner();
59 let genesis = validator.consensus.module.genesis.inner();
60 if current < genesis {
61 let diff = genesis - current;
62 info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
63 sleep(diff).await;
64 }
65
66 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
68 validator.consensus.generate_empty_fork().await?;
69 drop(validator);
70
71 let comms_timeout =
73 node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
74
75 let checkpoint = if !config.skip_sync {
76 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
78 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
79 }
80
81 let checkpoint = if let Some(height) = config.checkpoint_height {
82 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
83 } else {
84 None
85 };
86
87 loop {
88 match sync_task(&node, checkpoint).await {
89 Ok(_) => break,
90 Err(e) => {
91 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
92 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
93 sleep(comms_timeout).await;
94 }
95 }
96 }
97 checkpoint
98 } else {
99 node.validator.write().await.synced = true;
100 None
101 };
102
103 loop {
105 match listen_to_network(&node, &sender).await {
106 Ok(_) => return Ok(()),
107 Err(Error::NetworkNotConnected) => {
108 node.validator.write().await.synced = false;
110 if !config.skip_sync {
111 loop {
112 match sync_task(&node, checkpoint).await {
113 Ok(_) => break,
114 Err(e) => {
115 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
116 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
117 sleep(comms_timeout).await;
118 }
119 }
120 }
121 } else {
122 node.validator.write().await.synced = true;
123 }
124 }
125 Err(e) => return Err(e),
126 }
127 }
128}
129
130async fn listen_to_network(node: &DarkfiNodePtr, sender: &Sender<()>) -> Result<()> {
132 let proposals_sub = node.subscribers.get("proposals").unwrap();
134 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
135
136 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
138
139 let result = smol::future::or(
140 monitor_network(&net_subscription),
141 consensus_task(node, &prop_subscription, sender),
142 )
143 .await;
144
145 prop_subscription.unsubscribe().await;
147 net_subscription.unsubscribe().await;
148
149 result
150}
151
152async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
154 Err(subscription.receive().await)
155}
156
157async fn consensus_task(
159 node: &DarkfiNodePtr,
160 subscription: &Subscription<JsonNotification>,
161 sender: &Sender<()>,
162) -> Result<()> {
163 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
164
165 let block_sub = node.subscribers.get("blocks").unwrap();
167
168 loop {
169 subscription.receive().await;
171
172 let mut validator = node.validator.write().await;
174 let confirmed = match validator.confirmation().await {
175 Ok(f) => f,
176 Err(e) => {
177 error!(
178 target: "darkfid::task::consensus_task",
179 "Confirmation failed: {e}"
180 );
181 continue
182 }
183 };
184
185 if let Err(e) = node.registry.state.write().await.refresh(&validator).await {
187 error!(target: "darkfid::task::consensus_task", "Failed refreshing mining block templates: {e}")
188 }
189
190 if let Err(e) = sender.send(()).await {
192 error!(
193 target: "darkfid::task::consensus_task",
194 "Garbage collection channel send fail: {e}"
195 );
196 };
197
198 if confirmed.is_empty() {
200 continue
201 }
202
203 let mut notif_blocks = Vec::with_capacity(confirmed.len());
205 for block in confirmed {
206 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
207 }
208 block_sub.notify(JsonValue::Array(notif_blocks)).await;
209 }
210}