1use std::collections::HashMap;
20
21use darkfi::{
22 blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
23 util::encoding::base64, validator::consensus::Proposal, Error, Result,
24};
25use darkfi_serial::serialize_async;
26use rand::{prelude::SliceRandom, rngs::OsRng};
27use tinyjson::JsonValue;
28use tracing::{debug, info, warn};
29
30use crate::{
31 proto::{
32 ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
33 SyncResponse, TipRequest, TipResponse, BATCH,
34 },
35 DarkfiNodePtr,
36};
37
/// Async task that drives the blockchain syncing process.
///
/// Finds the most common tip among connected synced peers, then repeatedly
/// retrieves missing headers and their blocks until this node reaches that
/// tip. If a configured `checkpoint` (height, hash) is ahead of our last
/// known block, headers/blocks are first synced up to the checkpoint before
/// normal syncing continues. On completion the node is marked as synced.
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    // Subscriber used to notify RPC clients about confirmed blocks
    let block_sub = node.subscribers.get("blocks").unwrap();

    // Grab our last known (canonical) block
    let validator = node.validator.read().await;
    let mut last = validator.blockchain.last()?;

    // If a checkpoint ahead of us is configured, purge any previously
    // retrieved pending sync headers so they get re-fetched and verified
    // against the checkpoint.
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            validator.blockchain.headers.remove_all_sync()?;
        }
    }

    // If pending sync headers chain directly onto our last known block,
    // resume syncing from the last pending header; otherwise they are
    // stale, so purge them.
    if let Some(next) = validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            if let Some(last_sync) = validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            validator.blockchain.headers.remove_all_sync()?;
        }
    }
    drop(validator);
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

    // Ask connected peers for their tips and pick the most common one
    let (mut common_tip_height, common_tip_hash, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

    // An all-zero tip hash is the sentinel for "no tip beyond ours":
    // we are already synced.
    if common_tip_hash == [0u8; 32] {
        node.validator.write().await.synced = true;
        info!(target: "darkfid::task::sync_task", "Blockchain synced!");
        return Ok(())
    }

    // First sync up to the configured checkpoint, if it's ahead of us
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            retrieve_headers(node, &common_tip_peers, last, checkpoint.0 + 1).await?;

            // `true`: append as checkpoint blocks (direct insertion)
            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            // Re-evaluate the common tip now that we moved forward
            (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

    // Keep retrieving headers and blocks until no more progress is made
    loop {
        retrieve_headers(node, &common_tip_peers, last, common_tip_height + 1).await?;

        // `false`: append blocks as proposals with confirmation
        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        // No progress means we caught up with the network
        if last == last_received {
            break
        }

        last = last_received;

        // Refresh the common tip and its peers for the next round
        (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

    // Sync the best fork state from a random synced peer
    sync_best_fork(node, &common_tip_peers, &last.1).await;

    // Run a final confirmation pass and notify subscribers about
    // any blocks it confirmed.
    let mut validator = node.validator.write().await;
    let confirmed = validator.confirmation().await?;
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    validator.synced = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}
143
144async fn synced_peers(
147 node: &DarkfiNodePtr,
148 last_tip: &HeaderHash,
149 checkpoint: Option<(u32, HeaderHash)>,
150) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
151 info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
152 let mut tips = HashMap::new();
153 loop {
154 let peers = node.p2p_handler.p2p.hosts().channels();
156
157 for peer in peers {
159 let comms_timeout = node
160 .p2p_handler
161 .p2p
162 .settings()
163 .read_arc()
164 .await
165 .outbound_connect_timeout(peer.address().scheme());
166
167 if let Some(c) = checkpoint {
169 let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
171 debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
172 continue
173 };
174
175 let request = HeaderSyncRequest { height: c.0 + 1 };
177 if let Err(e) = peer.send(&request).await {
178 debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
179 continue
180 };
181
182 let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
184 debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
185 continue
186 };
187
188 if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
190 debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
191 continue
192 }
193 }
194
195 let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
197 debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
198 continue
199 };
200
201 let request = TipRequest { tip: *last_tip };
203 if let Err(e) = peer.send(&request).await {
204 debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
205 continue
206 };
207
208 let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
210 debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
211 continue
212 };
213
214 if response.synced {
216 let tip = match response.height {
220 Some(height) => match response.hash {
221 Some(hash) => (height, *hash.inner()),
222 None => (0, [0u8; 32]),
223 },
224 None => (0, [0u8; 32]),
225 };
226 let Some(tip_peers) = tips.get_mut(&tip) else {
227 tips.insert(tip, vec![peer.clone()]);
228 continue
229 };
230 tip_peers.push(peer.clone());
231 }
232 }
233
234 if !tips.is_empty() {
236 break
237 }
238
239 warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
240 let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
241 let _ = subscription.receive().await;
242 subscription.unsubscribe().await;
243
244 let comms_timeout =
245 node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();
246
247 info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
248 sleep(comms_timeout).await;
249 }
250
251 tips
252}
253
254async fn most_common_tip(
256 node: &DarkfiNodePtr,
257 last_tip: &HeaderHash,
258 checkpoint: Option<(u32, HeaderHash)>,
259) -> (u32, [u8; 32], Vec<ChannelPtr>) {
260 let tips = synced_peers(node, last_tip, checkpoint).await;
262
263 info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
265 let mut common_tip = (0, [0u8; 32], vec![]);
266 for (tip, peers) in tips {
267 if peers.len() < common_tip.2.len() {
269 continue;
270 }
271 if peers.len() == common_tip.2.len() || tip.0 < common_tip.0 {
274 continue;
275 }
276 common_tip = (tip.0, tip.1, peers);
278 }
279
280 info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
281 common_tip
282}
283
284async fn retrieve_headers(
286 node: &DarkfiNodePtr,
287 peers: &[ChannelPtr],
288 last_known: (u32, HeaderHash),
289 tip_height: u32,
290) -> Result<()> {
291 info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
292 let mut peer_subs = vec![];
294 for peer in peers {
295 match peer.subscribe_msg::<HeaderSyncResponse>().await {
296 Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
297 Err(e) => {
298 debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
299 peer_subs.push((None, true))
300 }
301 }
302 }
303
304 let total = tip_height - last_known.0 - 1;
306 let mut last_tip_height = tip_height;
307 let validator = node.validator.read().await;
308 'headers_loop: loop {
309 let mut count = 0;
311 for (peer_sub, failed) in &peer_subs {
312 if peer_sub.is_none() || *failed {
313 count += 1;
314 }
315 }
316 if count == peer_subs.len() {
317 debug!(target: "darkfid::task::sync::retrieve_headers", "All peer connections failed.");
318 break
319 }
320
321 for (index, peer) in peers.iter().enumerate() {
322 let (peer_sub, failed) = &mut peer_subs[index];
324 if *failed {
325 continue;
326 }
327 let Some(ref response_sub) = peer_sub else {
328 continue;
329 };
330
331 let request = HeaderSyncRequest { height: last_tip_height };
333 if let Err(e) = peer.send(&request).await {
334 debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
335 *failed = true;
336 continue
337 };
338
339 let comms_timeout = node
340 .p2p_handler
341 .p2p
342 .settings()
343 .read_arc()
344 .await
345 .outbound_connect_timeout(peer.address().scheme());
346
347 let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
349 debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
350 *failed = true;
351 continue
352 };
353
354 let mut response_headers = response.headers.to_vec();
356 response_headers.retain(|h| h.height > last_known.0);
357
358 if response_headers.is_empty() {
359 break 'headers_loop
360 }
361
362 validator.blockchain.headers.insert_sync(&response_headers)?;
364 last_tip_height = response_headers[0].height;
365 info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{total}", validator.blockchain.headers.len_sync());
366 }
367 }
368
369 if validator.blockchain.headers.is_empty_sync() {
371 return Ok(());
372 }
373
374 info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
379 let mut verified_headers = 0;
380 let total = validator.blockchain.headers.len_sync();
381 let mut headers = validator.blockchain.headers.get_after_sync(0, BATCH)?;
384 if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
385 validator.blockchain.headers.remove_all_sync()?;
386 return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
387 }
388 verified_headers += 1;
389 for (index, header) in headers[1..].iter().enumerate() {
390 if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
391 return Err(Error::BlockIsInvalid(header.hash().as_string()))
392 }
393 verified_headers += 1;
394 }
395 info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");
396
397 let mut last_checked = headers.last().unwrap().clone();
399 headers = validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
400 while !headers.is_empty() {
401 if headers[0].previous != last_checked.hash() ||
402 headers[0].height != last_checked.height + 1
403 {
404 validator.blockchain.headers.remove_all_sync()?;
405 return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
406 }
407 verified_headers += 1;
408 for (index, header) in headers[1..].iter().enumerate() {
409 if header.previous != headers[index].hash() ||
410 header.height != headers[index].height + 1
411 {
412 validator.blockchain.headers.remove_all_sync()?;
413 return Err(Error::BlockIsInvalid(header.hash().as_string()))
414 }
415 verified_headers += 1;
416 }
417 last_checked = headers.last().unwrap().clone();
418 headers = validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
419 info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");
420 }
421
422 info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
423 Ok(())
424}
425
/// Auxiliary function to retrieve the blocks for the pending sync headers
/// from the given peers, batch by batch.
///
/// If `checkpoint_blocks` is true, blocks are appended directly as
/// checkpoint blocks (verified against the header hashes); otherwise each
/// block is appended as a proposal and a confirmation pass runs after
/// every batch. Returns the (height, hash) of the last block received.
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    // Subscribe to `SyncResponse` on every peer; (None, true) marks a peer
    // whose communication setup already failed.
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    let mut received_blocks = 0;
    // Hold the validator write lock for the whole retrieval
    let mut validator = node.validator.write().await;
    let total = validator.blockchain.headers.len_sync();
    'blocks_loop: loop {
        // Stop when every peer has failed
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_blocks", "All peer connections failed.");
            break
        }

        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Grab the next batch of pending sync headers; stop when none remain
            let headers = validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            // Request the blocks matching this batch of header hashes
            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            // Response wait is bounded by the scheme-specific outbound
            // connect timeout.
            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
            if checkpoint_blocks {
                // Checkpoint mode: append the blocks directly, verified
                // against the expected header hashes.
                if let Err(e) =
                    validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                // Normal mode: append each block as a proposal. An already
                // known proposal is fine; any other error retries the whole
                // batch with the next peer.
                for block in &response.blocks {
                    match validator.append_proposal(&Proposal::new(block.clone())).await {
                        Ok(()) | Err(Error::ProposalAlreadyExists) => continue,
                        Err(e) => {
                            debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                            continue 'peers_loop
                        }
                    }
                }
            }
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

            // Batch fully processed: drop its headers from the sync tree
            validator.blockchain.headers.remove_sync(&synced_headers)?;

            if checkpoint_blocks {
                // Notify subscribers about the appended checkpoint blocks
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                // Run a confirmation pass and notify subscribers about any
                // blocks it confirmed.
                let confirmed = validator.confirmation().await?;
                if !confirmed.is_empty() {
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {received_blocks}/{total}");
        }
    }

    Ok(last_received)
}
566
567async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
569 info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
570 let peer = &peers.choose(&mut OsRng).unwrap();
572
573 let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
575 debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
576 return
577 };
578 let notif_sub = node.subscribers.get("proposals").unwrap();
579
580 let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
582 if let Err(e) = peer.send(&request).await {
583 debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
584 return
585 };
586
587 let comms_timeout = node
588 .p2p_handler
589 .p2p
590 .settings()
591 .read_arc()
592 .await
593 .outbound_connect_timeout(peer.address().scheme());
594
595 let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
597 debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
598 return
599 };
600
601 debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
603 let mut validator = node.validator.write().await;
604 for proposal in &response.proposals {
605 if let Err(e) = validator.append_proposal(proposal).await {
606 debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
607 return
608 };
609 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
611 notif_sub.notify(vec![enc_prop].into()).await;
612 }
613}