darkfid/task/
consensus.rs1use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32 task::{garbage_collect_task, sync_task},
33 DarkfiNodePtr,
34};
35
36#[derive(Clone)]
38pub struct ConsensusInitTaskConfig {
39 pub skip_sync: bool,
41 pub checkpoint_height: Option<u32>,
43 pub checkpoint: Option<String>,
45}
46
47pub async fn consensus_init_task(
49 node: DarkfiNodePtr,
50 config: ConsensusInitTaskConfig,
51 ex: ExecutorPtr,
52) -> Result<()> {
53 node.validator.consensus.healthcheck().await?;
57
58 let current = Timestamp::current_time().inner();
60 let genesis = node.validator.consensus.module.read().await.genesis.inner();
61 if current < genesis {
62 let diff = genesis - current;
63 info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
64 sleep(diff).await;
65 }
66
67 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
69 node.validator.consensus.generate_empty_fork().await?;
70
71 let comms_timeout =
73 node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
74
75 let checkpoint = if !config.skip_sync {
76 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
78 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
79 }
80
81 let checkpoint = if let Some(height) = config.checkpoint_height {
82 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
83 } else {
84 None
85 };
86
87 loop {
88 match sync_task(&node, checkpoint).await {
89 Ok(_) => break,
90 Err(e) => {
91 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
92 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
93 sleep(comms_timeout).await;
94 }
95 }
96 }
97 checkpoint
98 } else {
99 *node.validator.synced.write().await = true;
100 None
101 };
102
103 loop {
105 match listen_to_network(&node, &ex).await {
106 Ok(_) => return Ok(()),
107 Err(Error::NetworkNotConnected) => {
108 *node.validator.synced.write().await = false;
110 node.validator.consensus.purge_forks().await?;
111 if !config.skip_sync {
112 loop {
113 match sync_task(&node, checkpoint).await {
114 Ok(_) => break,
115 Err(e) => {
116 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
117 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
118 sleep(comms_timeout).await;
119 }
120 }
121 }
122 } else {
123 *node.validator.synced.write().await = true;
124 }
125 }
126 Err(e) => return Err(e),
127 }
128 }
129}
130
131async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
133 let proposals_sub = node.subscribers.get("proposals").unwrap();
135 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
136
137 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
139
140 let result = smol::future::or(
141 monitor_network(&net_subscription),
142 consensus_task(node, &prop_subscription, ex),
143 )
144 .await;
145
146 prop_subscription.unsubscribe().await;
148 net_subscription.unsubscribe().await;
149
150 result
151}
152
153async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
155 Err(subscription.receive().await)
156}
157
158async fn consensus_task(
160 node: &DarkfiNodePtr,
161 subscription: &Subscription<JsonNotification>,
162 ex: &ExecutorPtr,
163) -> Result<()> {
164 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
165
166 let block_sub = node.subscribers.get("blocks").unwrap();
168
169 let gc_task = StoppableTask::new();
171 gc_task.clone().start(
172 async { Ok(()) },
173 |_| async { },
174 Error::GarbageCollectionTaskStopped,
175 ex.clone(),
176 );
177
178 loop {
179 subscription.receive().await;
180
181 let confirmed = match node.validator.confirmation().await {
183 Ok(f) => f,
184 Err(e) => {
185 error!(
186 target: "darkfid::task::consensus_task",
187 "Confirmation failed: {e}"
188 );
189 continue
190 }
191 };
192
193 if let Err(e) = node.registry.refresh(&node.validator).await {
194 error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
195 }
196
197 if confirmed.is_empty() {
198 continue
199 }
200
201 let mut notif_blocks = Vec::with_capacity(confirmed.len());
202 for block in confirmed {
203 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
204 }
205 block_sub.notify(JsonValue::Array(notif_blocks)).await;
206
207 gc_task.clone().stop().await;
209 gc_task.clone().start(
210 garbage_collect_task(node.clone()),
211 |res| async {
212 match res {
213 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
214 Err(e) => {
215 error!(target: "darkfid", "Failed starting garbage collection task: {e}")
216 }
217 }
218 },
219 Error::GarbageCollectionTaskStopped,
220 ex.clone(),
221 );
222 }
223}