use std::collections::HashMap;

use darkfi::{
    blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
    util::encoding::base64, validator::consensus::Proposal, Error, Result,
};
use darkfi_serial::serialize_async;
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;
use tracing::{debug, info, warn};

use crate::{
    proto::{
        ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
        SyncResponse, TipRequest, TipResponse, BATCH,
    },
    DarkfiNodePtr,
};

/// Async task to sync the blockchain with the network.
///
/// Finds the most common tip among connected synced peers, then
/// retrieves missing headers and blocks until our chain reaches it.
/// If a `checkpoint` ahead of our last known block is configured, we
/// first sync up to it using the checkpoint block insertion path.
/// Once fully synced, the best fork state is synced from a peer and
/// the node's `synced` flag is raised.
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    // Grab the blocks subscriber so we can notify listeners about confirmed blocks
    let block_sub = node.subscribers.get("blocks").unwrap();

    // Grab our last known block before syncing starts
    let mut last = node.validator.blockchain.last()?;

    // If a checkpoint ahead of our last known block was configured,
    // purge any previously stored sync headers so we start fresh.
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }

    // If stored sync headers continue right after our last known block,
    // resume from the last of them; otherwise the stored sequence is
    // stale, so purge it.
    if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

    // Find the most common tip among our synced peers
    let (mut common_tip_height, common_tip_hash, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

    // The all-zero hash is the sentinel `synced_peers` produces when
    // peers reported no tip height/hash on top of ours, so we are
    // already synced.
    if common_tip_hash == [0u8; 32] {
        *node.validator.synced.write().await = true;
        info!(target: "darkfid::task::sync_task", "Blockchain synced!");
        return Ok(())
    }

    // If a checkpoint ahead of us was configured, sync up to it first
    // using the checkpoint block insertion path.
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;

            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            // Re-query peers for the tip beyond the checkpoint
            (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

    // Keep retrieving headers and blocks until no new block arrives,
    // refreshing the common tip after each round since the network
    // chain may have advanced while we were syncing.
    loop {
        retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;

        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        // No progress this round means we caught up with the network tip
        if last == last_received {
            break
        }

        last = last_received;

        (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

    // Sync the best fork proposals from a random peer of the common tip set
    sync_best_fork(node, &common_tip_peers, &last.1).await;

    // Perform confirmation for any blocks that became confirmable and
    // notify subscribers about them.
    let confirmed = node.validator.confirmation().await?;
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    *node.validator.synced.write().await = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}

/// Auxiliary function to collect the tip each connected synced peer
/// reports on top of `last_tip`.
///
/// If a `checkpoint` is provided, each peer must first prove it knows
/// it, by returning a headers batch whose last header hash matches the
/// checkpoint hash; peers that fail this check are skipped. Peers that
/// report themselves synced but provide no tip height/hash are grouped
/// under the sentinel tip `(0, [0u8; 32])`. The function blocks,
/// waiting for new channel connections and retrying, until at least
/// one tip has been collected.
///
/// Returns a map of each reported tip to the channels reporting it.
async fn synced_peers(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
    info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
    let mut tips = HashMap::new();
    loop {
        // Grab the currently connected channels
        let peers = node.p2p_handler.p2p.hosts().channels();

        for peer in peers {
            // Use the outbound connect timeout of this peer's transport
            // scheme for every message wait on the channel
            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            // Checkpoint verification: request the headers batch ending
            // at the checkpoint height and check the last header hash.
            if let Some(c) = checkpoint {
                let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
                    continue
                };

                let request = HeaderSyncRequest { height: c.0 + 1 };
                if let Err(e) = peer.send(&request).await {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                    continue
                };

                let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                };

                // The returned sequence must end exactly at the checkpoint
                if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
                    debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                }
            }

            // Ask the peer for its tip on top of our last known one
            let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
                continue
            };

            let request = TipRequest { tip: *last_tip };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
                continue
            };

            // Only count peers that consider themselves synced. Missing
            // height or hash maps to the sentinel tip (0, [0u8; 32]).
            if response.synced {
                let tip = match response.height {
                    Some(height) => match response.hash {
                        Some(hash) => (height, *hash.inner()),
                        None => (0, [0u8; 32]),
                    },
                    None => (0, [0u8; 32]),
                };
                // Group this channel under its reported tip
                let Some(tip_peers) = tips.get_mut(&tip) else {
                    tips.insert(tip, vec![peer.clone()]);
                    continue
                };
                tip_peers.push(peer.clone());
            }
        }

        // Stop retrying once at least one tip has been collected
        if !tips.is_empty() {
            break
        }

        // Wait for a new channel connection, then sleep to let more
        // nodes connect before retrying.
        warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
        let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
        let _ = subscription.receive().await;
        subscription.unsubscribe().await;

        let comms_timeout =
            node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();

        info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
        sleep(comms_timeout).await;
    }

    tips
}

251async fn most_common_tip(
253 node: &DarkfiNodePtr,
254 last_tip: &HeaderHash,
255 checkpoint: Option<(u32, HeaderHash)>,
256) -> (u32, [u8; 32], Vec<ChannelPtr>) {
257 let tips = synced_peers(node, last_tip, checkpoint).await;
259
260 info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
262 let mut common_tip = (0, [0u8; 32], vec![]);
263 for (tip, peers) in tips {
264 if peers.len() < common_tip.2.len() {
266 continue;
267 }
268 if peers.len() == common_tip.2.len() || tip.0 < common_tip.0 {
271 continue;
272 }
273 common_tip = (tip.0, tip.1, peers);
275 }
276
277 info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
278 common_tip
279}
280
/// Auxiliary function to retrieve missing headers from the given peers
/// and verify the retrieved sequence.
///
/// Headers are requested in batches walking backwards from
/// `tip_height`, keeping only heights above `last_known`, and stored
/// in the sync headers tree. Afterwards the whole stored sequence is
/// verified batch by batch: the first header must extend the best
/// fork's last header, and every subsequent header must reference its
/// predecessor's hash and increment its height. On any mismatch the
/// sync headers are purged and `Error::BlockIsInvalid` is returned.
async fn retrieve_headers(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: u32,
    tip_height: u32,
) -> Result<()> {
    info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
    // Set up a `HeaderSyncResponse` subscription per peer, marking the
    // ones whose setup failed so they are skipped below.
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<HeaderSyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    // Total number of headers we expect (used for progress logging)
    let total = tip_height - last_known - 1;
    // Cursor for backwards batch requests, starting at the tip height
    let mut last_tip_height = tip_height;
    'headers_loop: loop {
        // Stop if every peer connection has failed
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_headers", "All peer connections failed.");
            break
        }

        for (index, peer) in peers.iter().enumerate() {
            // Skip peers that have already failed
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Request the headers batch right before our current cursor
            let request = HeaderSyncRequest { height: last_tip_height };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            // Wait using this peer transport's outbound connect timeout
            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            // Keep only headers above our last known height
            let mut response_headers = response.headers.to_vec();
            response_headers.retain(|h| h.height > last_known);

            // Nothing left means we reached down to our last known block
            if response_headers.is_empty() {
                break 'headers_loop
            }

            // Store the batch and move the cursor to its lowest height
            node.validator.blockchain.headers.insert_sync(&response_headers)?;
            last_tip_height = response_headers[0].height;
            info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{total}", node.validator.blockchain.headers.len_sync());
        }
    }

    // Nothing to verify if no sync headers were stored
    if node.validator.blockchain.headers.is_empty_sync() {
        return Ok(());
    }

    // Verify the first batch: it must extend the best fork's last
    // header by hash and height, and be internally chained.
    info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
    let mut verified_headers = 0;
    let total = node.validator.blockchain.headers.len_sync();
    let last_known = node.validator.consensus.best_fork_last_header().await?;
    let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
    if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
        // Purge the invalid sequence before erroring out
        node.validator.blockchain.headers.remove_all_sync()?;
        return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
    }
    verified_headers += 1;
    for (index, header) in headers[1..].iter().enumerate() {
        // `index` refers to the predecessor, since iteration skips headers[0]
        if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(header.hash().as_string()))
        }
        verified_headers += 1;
    }
    info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");

    // Verify the remaining batches, each chained onto the last header
    // of the previous batch.
    let mut last_checked = headers.last().unwrap().clone();
    headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
    while !headers.is_empty() {
        if headers[0].previous != last_checked.hash() ||
            headers[0].height != last_checked.height + 1
        {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
        }
        verified_headers += 1;
        for (index, header) in headers[1..].iter().enumerate() {
            if header.previous != headers[index].hash() ||
                header.height != headers[index].height + 1
            {
                node.validator.blockchain.headers.remove_all_sync()?;
                return Err(Error::BlockIsInvalid(header.hash().as_string()))
            }
            verified_headers += 1;
        }
        last_checked = headers.last().unwrap().clone();
        headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
        info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
    Ok(())
}

/// Auxiliary function to retrieve the blocks corresponding to the
/// stored sync headers from the given peers.
///
/// Blocks are requested per header batch. When `checkpoint_blocks` is
/// true they are inserted through the checkpoint path and notified to
/// `block_sub` directly; otherwise each block is appended as a
/// proposal and any resulting confirmed blocks are notified instead.
/// Successfully processed header batches are removed from the sync
/// headers tree.
///
/// Returns the height and hash of the last received block, or
/// `last_known` if no block was received.
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    // Set up a `SyncResponse` subscription per peer, marking the ones
    // whose setup failed so they are skipped below.
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    // Progress counters for logging
    let mut received_blocks = 0;
    let total = node.validator.blockchain.headers.len_sync();
    'blocks_loop: loop {
        // Stop if every peer connection has failed
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_blocks", "All peer connections failed.");
            break
        }

        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            // Skip peers that have already failed
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Grab the next batch of sync headers still awaiting blocks
            let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            // Request the blocks for this batch of header hashes
            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            // Wait using this peer transport's outbound connect timeout
            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
            if checkpoint_blocks {
                // Checkpoint path: insert the blocks against the known
                // header hashes; on failure retry with the next peer.
                if let Err(e) =
                    node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                // Normal path: append each block as a proposal; on
                // failure move on to the next peer.
                for block in &response.blocks {
                    if let Err(e) =
                        node.validator.append_proposal(&Proposal::new(block.clone())).await
                    {
                        debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                        continue 'peers_loop
                    };
                }
            }
            // Record the last block of the batch and drop its headers
            // from the sync headers tree.
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

            node.validator.blockchain.headers.remove_sync(&synced_headers)?;

            if checkpoint_blocks {
                // Notify subscribers about the checkpoint blocks added
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                // Perform confirmation for any blocks that became
                // confirmable and notify subscribers about them.
                let confirmed = node.validator.confirmation().await?;
                if !confirmed.is_empty() {
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {received_blocks}/{total}");
        }
    }

    Ok(last_received)
}

563async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
565 info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
566 let peer = &peers.choose(&mut OsRng).unwrap();
568
569 let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
571 debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
572 return
573 };
574 let notif_sub = node.subscribers.get("proposals").unwrap();
575
576 let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
578 if let Err(e) = peer.send(&request).await {
579 debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
580 return
581 };
582
583 let comms_timeout = node
584 .p2p_handler
585 .p2p
586 .settings()
587 .read_arc()
588 .await
589 .outbound_connect_timeout(peer.address().scheme());
590
591 let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
593 debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
594 return
595 };
596
597 debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
599 for proposal in &response.proposals {
600 if let Err(e) = node.validator.append_proposal(proposal).await {
601 debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
602 return
603 };
604 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
606 notif_sub.notify(vec![enc_prop].into()).await;
607 }
608}