darkfid/task/
consensus.rs1use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32 task::{garbage_collect::garbage_collect_task, sync_task},
33 DarkfiNodePtr,
34};
35
36#[derive(Clone)]
38pub struct ConsensusInitTaskConfig {
39 pub skip_sync: bool,
41 pub checkpoint_height: Option<u32>,
43 pub checkpoint: Option<String>,
45}
46
47pub async fn consensus_init_task(
49 node: DarkfiNodePtr,
50 config: ConsensusInitTaskConfig,
51 ex: ExecutorPtr,
52) -> Result<()> {
53 let mut validator = node.validator.write().await;
57 validator.consensus.healthcheck().await?;
58
59 let current = Timestamp::current_time().inner();
61 let genesis = validator.consensus.module.genesis.inner();
62 if current < genesis {
63 let diff = genesis - current;
64 info!(target: "darkfid::task::consensus_init_task", "Waiting for network genesis: {diff} seconds");
65 sleep(diff).await;
66 }
67
68 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
70 validator.consensus.generate_empty_fork().await?;
71 drop(validator);
72
73 let comms_timeout =
75 node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
76
77 let checkpoint = if !config.skip_sync {
78 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
80 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
81 }
82
83 let checkpoint = if let Some(height) = config.checkpoint_height {
84 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
85 } else {
86 None
87 };
88
89 loop {
90 match sync_task(&node, checkpoint).await {
91 Ok(_) => break,
92 Err(e) => {
93 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
94 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
95 sleep(comms_timeout).await;
96 }
97 }
98 }
99 checkpoint
100 } else {
101 node.validator.write().await.synced = true;
102 None
103 };
104
105 loop {
107 match listen_to_network(&node, &ex).await {
108 Ok(_) => return Ok(()),
109 Err(Error::NetworkNotConnected) => {
110 node.validator.write().await.synced = false;
112 if !config.skip_sync {
113 loop {
114 match sync_task(&node, checkpoint).await {
115 Ok(_) => break,
116 Err(e) => {
117 error!(target: "darkfid::task::consensus_task", "Sync task failed: {e}");
118 info!(target: "darkfid::task::consensus_task", "Sleeping for {comms_timeout} before retry...");
119 sleep(comms_timeout).await;
120 }
121 }
122 }
123 } else {
124 node.validator.write().await.synced = true;
125 }
126 }
127 Err(e) => return Err(e),
128 }
129 }
130}
131
132async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
134 let proposals_sub = node.subscribers.get("proposals").unwrap();
136 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
137
138 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
140
141 let result = smol::future::or(
142 monitor_network(&net_subscription),
143 consensus_task(node, &prop_subscription, ex),
144 )
145 .await;
146
147 prop_subscription.unsubscribe().await;
149 net_subscription.unsubscribe().await;
150
151 result
152}
153
154async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
156 Err(subscription.receive().await)
157}
158
159async fn consensus_task(
161 node: &DarkfiNodePtr,
162 subscription: &Subscription<JsonNotification>,
163 ex: &ExecutorPtr,
164) -> Result<()> {
165 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
166
167 let block_sub = node.subscribers.get("blocks").unwrap();
169
170 let gc_task = StoppableTask::new();
172 gc_task.clone().start(
173 async { Ok(()) },
174 |_| async { },
175 Error::GarbageCollectionTaskStopped,
176 ex.clone(),
177 );
178
179 loop {
180 subscription.receive().await;
181
182 let mut validator = node.validator.write().await;
184 let confirmed = match validator.confirmation().await {
185 Ok(f) => f,
186 Err(e) => {
187 error!(
188 target: "darkfid::task::consensus_task",
189 "Confirmation failed: {e}"
190 );
191 continue
192 }
193 };
194
195 let mut registry = node.registry.state.write().await;
197 if let Err(e) = registry.refresh(&validator).await {
198 error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
199 }
200
201 if confirmed.is_empty() {
202 continue
203 }
204
205 if let Err(e) =
207 validator.consensus.purge_unreferenced_trees(&mut registry.new_trees()).await
208 {
209 error!(target: "darkfid::task::garbage_collect::purge_unreferenced_trees", "Purging unreferenced contract trees from the database failed: {e}");
210 }
211
212 let mut notif_blocks = Vec::with_capacity(confirmed.len());
213 for block in confirmed {
214 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
215 }
216 block_sub.notify(JsonValue::Array(notif_blocks)).await;
217
218 gc_task.clone().stop().await;
220 gc_task.clone().start(
221 garbage_collect_task(node.clone()),
222 |res| async {
223 match res {
224 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
225 Err(e) => {
226 error!(target: "darkfid", "Failed starting garbage collection task: {e}")
227 }
228 }
229 },
230 Error::GarbageCollectionTaskStopped,
231 ex.clone(),
232 );
233 }
234}