// darkfid/task/consensus.rs
use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32 task::{
33 garbage_collect::{garbage_collect_task, purge_unreferenced_trees},
34 sync_task,
35 },
36 DarkfiNodePtr,
37};
38
/// Configuration consumed by [`consensus_init_task`].
///
/// `Debug` is derived in addition to `Clone` so the configuration can be
/// logged or inspected in diagnostics.
#[derive(Clone, Debug)]
pub struct ConsensusInitTaskConfig {
    /// When `true`, the sync step is skipped and the node is marked as
    /// synced right away.
    pub skip_sync: bool,
    /// Optional checkpoint height to sync against. When this is set,
    /// `checkpoint` must be set as well.
    pub checkpoint_height: Option<u32>,
    /// Optional checkpoint header hash, in the string form accepted by
    /// `HeaderHash::from_str`. Only read when `checkpoint_height` is set.
    pub checkpoint: Option<String>,
}
49
50pub async fn consensus_init_task(
52 node: DarkfiNodePtr,
53 config: ConsensusInitTaskConfig,
54 ex: ExecutorPtr,
55) -> Result<()> {
56 node.validator.consensus.healthcheck().await?;
60
61 let current = Timestamp::current_time().inner();
63 let genesis = node.validator.consensus.module.read().await.genesis.inner();
64 if current < genesis {
65 let diff = genesis - current;
66 info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
67 sleep(diff).await;
68 }
69
70 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
72 node.validator.consensus.generate_empty_fork().await?;
73
74 let comms_timeout =
76 node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
77
78 let checkpoint = if !config.skip_sync {
79 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
81 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
82 }
83
84 let checkpoint = if let Some(height) = config.checkpoint_height {
85 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
86 } else {
87 None
88 };
89
90 loop {
91 match sync_task(&node, checkpoint).await {
92 Ok(_) => break,
93 Err(e) => {
94 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
95 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
96 sleep(comms_timeout).await;
97 }
98 }
99 }
100 checkpoint
101 } else {
102 *node.validator.synced.write().await = true;
103 None
104 };
105
106 loop {
108 match listen_to_network(&node, &ex).await {
109 Ok(_) => return Ok(()),
110 Err(Error::NetworkNotConnected) => {
111 *node.validator.synced.write().await = false;
113 if !config.skip_sync {
114 loop {
115 match sync_task(&node, checkpoint).await {
116 Ok(_) => break,
117 Err(e) => {
118 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
119 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
120 sleep(comms_timeout).await;
121 }
122 }
123 }
124 } else {
125 *node.validator.synced.write().await = true;
126 }
127 }
128 Err(e) => return Err(e),
129 }
130 }
131}
132
133async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
135 let proposals_sub = node.subscribers.get("proposals").unwrap();
137 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
138
139 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
141
142 let result = smol::future::or(
143 monitor_network(&net_subscription),
144 consensus_task(node, &prop_subscription, ex),
145 )
146 .await;
147
148 prop_subscription.unsubscribe().await;
150 net_subscription.unsubscribe().await;
151
152 result
153}
154
155async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
157 Err(subscription.receive().await)
158}
159
160async fn consensus_task(
162 node: &DarkfiNodePtr,
163 subscription: &Subscription<JsonNotification>,
164 ex: &ExecutorPtr,
165) -> Result<()> {
166 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
167
168 let block_sub = node.subscribers.get("blocks").unwrap();
170
171 let gc_task = StoppableTask::new();
173 gc_task.clone().start(
174 async { Ok(()) },
175 |_| async { },
176 Error::GarbageCollectionTaskStopped,
177 ex.clone(),
178 );
179
180 loop {
181 subscription.receive().await;
182
183 let confirmed = match node.validator.confirmation().await {
185 Ok(f) => f,
186 Err(e) => {
187 error!(
188 target: "darkfid::task::consensus_task",
189 "Confirmation failed: {e}"
190 );
191 continue
192 }
193 };
194
195 if let Err(e) = node.registry.refresh(&node.validator).await {
197 error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
198 }
199
200 if confirmed.is_empty() {
201 continue
202 }
203
204 let append_lock = node.validator.consensus.append_lock.write().await;
208 purge_unreferenced_trees(node).await;
209 drop(append_lock);
210
211 let mut notif_blocks = Vec::with_capacity(confirmed.len());
212 for block in confirmed {
213 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
214 }
215 block_sub.notify(JsonValue::Array(notif_blocks)).await;
216
217 gc_task.clone().stop().await;
219 gc_task.clone().start(
220 garbage_collect_task(node.clone()),
221 |res| async {
222 match res {
223 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
224 Err(e) => {
225 error!(target: "darkfid", "Failed starting garbage collection task: {e}")
226 }
227 }
228 },
229 Error::GarbageCollectionTaskStopped,
230 ex.clone(),
231 );
232 }
233}