1use std::collections::HashMap;
20
21use darkfi::{
22 blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
23 util::encoding::base64, validator::consensus::Proposal, Error, Result,
24};
25use darkfi_serial::serialize_async;
26use rand::{prelude::SliceRandom, rngs::OsRng};
27use tinyjson::JsonValue;
28use tracing::{debug, info, warn};
29
30use crate::{
31 proto::{
32 ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
33 SyncResponse, TipRequest, TipResponse, BATCH,
34 },
35 DarkfiNodePtr,
36};
37
/// Async task that syncs the local blockchain against connected peers.
///
/// Flow: resume any partially synced headers, find the most common tip
/// among synced peers, optionally fast-sync up to a configured
/// `checkpoint`, then loop retrieving headers and blocks until no new
/// blocks arrive, and finally sync the best fork state. On completion
/// the validator's `synced` flag is set and confirmed blocks are
/// published on the "blocks" JSON-RPC subscriber.
///
/// * `checkpoint` - optional `(height, hash)` up to which blocks are
///   applied via the faster checkpoint path before normal sync resumes.
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    // Grab the blocks subscriber to notify new confirmed blocks.
    // NOTE(review): unwrap assumes the "blocks" subscriber is always
    // registered at node startup — confirm against node construction.
    let block_sub = node.subscribers.get("blocks").unwrap();

    // Last block we know of: (height, hash)
    let mut last = node.validator.blockchain.last()?;

    // If a checkpoint ahead of our tip is configured, discard any
    // previously synced headers so the header sync restarts cleanly
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }

    // Resume from previously synced headers only if they continue
    // directly after our last known block; otherwise drop them
    if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

    // Ask synced peers for their tip and keep the most common one
    let (mut common_tip_height, common_tip_hash, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

    // A zeroed hash means no peer reported a concrete tip beyond ours:
    // we are already synced
    if common_tip_hash == [0u8; 32] {
        *node.validator.synced.write().await = true;
        info!(target: "darkfid::task::sync_task", "Blockchain synced!");
        return Ok(())
    }

    // Fast-sync up to the configured checkpoint using the checkpoint
    // blocks path, then refresh the common tip for normal sync
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;

            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

    // Normal sync loop: retrieve headers up to the common tip, then the
    // corresponding blocks; stop once a pass yields no new blocks
    loop {
        retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;

        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        if last == last_received {
            break
        }

        last = last_received;

        // Peers may have advanced while we were syncing; re-query the tip
        (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

    // Pull the best fork's pending proposals from a peer
    sync_best_fork(node, &common_tip_peers, &last.1).await;

    // Run confirmation and notify subscribers of newly confirmed blocks
    let confirmed = node.validator.confirmation().await?;
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    *node.validator.synced.write().await = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}
140
/// Auxiliary function to block until at least one connected peer reports
/// itself as synced, collecting each synced peer's tip.
///
/// Each peer is sent a `TipRequest` for `last_tip`; synced peers are
/// grouped in the returned map by the `(height, hash)` tip they report.
/// Peers that report no concrete tip are grouped under `(0, [0u8; 32])`.
/// If a `checkpoint` is given, peers must first prove they know the
/// checkpoint header (via `HeaderSyncRequest`) or they are skipped.
/// If no peer qualifies, we wait for a new channel and retry forever.
async fn synced_peers(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
    info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
    let mut tips = HashMap::new();
    loop {
        // Snapshot of currently connected channels
        let peers = node.p2p_handler.p2p.hosts().channels();

        for peer in peers {
            // Per-peer communication timeout, based on the transport scheme
            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            // Checkpoint gate: peer must return headers ending in the
            // exact checkpoint hash, otherwise it is skipped
            if let Some(c) = checkpoint {
                let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
                    continue
                };

                // Request headers up to (and including) the checkpoint
                let request = HeaderSyncRequest { height: c.0 + 1 };
                if let Err(e) = peer.send(&request).await {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                    continue
                };

                let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                };

                // The last returned header must be the checkpoint itself
                if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
                    debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                }
            }

            // Ask the peer for its current tip relative to our last tip
            let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
                continue
            };

            let request = TipRequest { tip: *last_tip };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
                continue
            };

            // Group only peers that claim to be synced, keyed by their tip
            if response.synced {
                let tip = if response.height.is_some() && response.hash.is_some() {
                    (response.height.unwrap(), *response.hash.unwrap().inner())
                } else {
                    // Sentinel tip for synced peers with no concrete tip
                    (0, [0u8; 32])
                };
                let Some(tip_peers) = tips.get_mut(&tip) else {
                    tips.insert(tip, vec![peer.clone()]);
                    continue
                };
                tip_peers.push(peer.clone());
            }
        }

        // We have at least one usable peer: stop polling
        if !tips.is_empty() {
            break
        }

        // No synced peers yet: wait for a channel event, then sleep a
        // while so more nodes can connect before retrying
        warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
        let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
        let _ = subscription.receive().await;
        subscription.unsubscribe().await;

        let comms_timeout =
            node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();

        info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
        sleep(comms_timeout).await;
    }

    tips
}
249
250async fn most_common_tip(
252 node: &DarkfiNodePtr,
253 last_tip: &HeaderHash,
254 checkpoint: Option<(u32, HeaderHash)>,
255) -> (u32, [u8; 32], Vec<ChannelPtr>) {
256 let tips = synced_peers(node, last_tip, checkpoint).await;
258
259 info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
261 let mut common_tip = (0, [0u8; 32], vec![]);
262 for (tip, peers) in tips {
263 if peers.len() < common_tip.2.len() {
265 continue;
266 }
267 if peers.len() == common_tip.2.len() || tip.0 < common_tip.0 {
270 continue;
271 }
272 common_tip = (tip.0, tip.1, peers);
274 }
275
276 info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
277 common_tip
278}
279
/// Auxiliary function to retrieve missing headers from peers and verify
/// their sequence.
///
/// Phase 1: repeatedly send `HeaderSyncRequest`s to each peer, walking
/// backwards from `tip_height` towards `last_known` (each batch's first
/// header height becomes the next request height), inserting batches
/// into the sync headers store. Phase 2: walk the stored headers in
/// `BATCH`-sized chunks and verify each header links to its predecessor
/// by hash and height; on any mismatch the sync store is wiped and an
/// error is returned.
async fn retrieve_headers(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: u32,
    tip_height: u32,
) -> Result<()> {
    info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
    // One (subscription, failed) slot per peer; a peer with a failed
    // setup is marked dead from the start
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<HeaderSyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    // Total headers expected, for progress logging only
    let total = tip_height - last_known - 1;
    let mut last_tip_height = tip_height;
    'headers_loop: loop {
        // Stop once every peer has failed
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_headers", "All peer connections failed.");
            break
        }

        for (index, peer) in peers.iter().enumerate() {
            // Skip peers that already failed
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Request the batch of headers below our current cursor
            let request = HeaderSyncRequest { height: last_tip_height };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            // Only keep headers above our last known height
            let mut response_headers = response.headers.to_vec();
            response_headers.retain(|h| h.height > last_known);

            // Nothing left above last_known: retrieval is complete
            if response_headers.is_empty() {
                break 'headers_loop
            }

            // Store the batch and move the cursor down to its first header
            node.validator.blockchain.headers.insert_sync(&response_headers)?;
            last_tip_height = response_headers[0].height;
            info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{total}", node.validator.blockchain.headers.len_sync());
        }
    }

    // Nothing was retrieved: nothing to verify
    if node.validator.blockchain.headers.is_empty_sync() {
        return Ok(());
    }

    // Verify the first stored header links to our best fork's last header,
    // then verify each subsequent header links to its predecessor
    info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
    let mut verified_headers = 0;
    let total = node.validator.blockchain.headers.len_sync();
    let last_known = node.validator.consensus.best_fork_last_header().await?;
    let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
    if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
        node.validator.blockchain.headers.remove_all_sync()?;
        return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
    }
    verified_headers += 1;
    // `index` is the position of the PREVIOUS header (enumerate over [1..])
    for (index, header) in headers[1..].iter().enumerate() {
        if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(header.hash().as_string()))
        }
        verified_headers += 1;
    }
    info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");

    // Verify the remaining batches, carrying the last checked header
    // across batch boundaries
    let mut last_checked = headers.last().unwrap().clone();
    headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
    while !headers.is_empty() {
        if headers[0].previous != last_checked.hash() ||
            headers[0].height != last_checked.height + 1
        {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
        }
        verified_headers += 1;
        for (index, header) in headers[1..].iter().enumerate() {
            if header.previous != headers[index].hash() ||
                header.height != headers[index].height + 1
            {
                node.validator.blockchain.headers.remove_all_sync()?;
                return Err(Error::BlockIsInvalid(header.hash().as_string()))
            }
            verified_headers += 1;
        }
        last_checked = headers.last().unwrap().clone();
        headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
        info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
    Ok(())
}
422
/// Auxiliary function to retrieve the blocks for all headers currently in
/// the sync headers store, in `BATCH`-sized chunks, from peers.
///
/// If `checkpoint_blocks` is true, blocks are applied through the
/// validator's checkpoint path and notified directly on `block_sub`;
/// otherwise each block is appended as a consensus proposal and only
/// blocks confirmed afterwards are notified. Processed headers are
/// removed from the sync store as their blocks are applied.
///
/// Returns the `(height, hash)` of the last block applied (initially
/// `last_known` if nothing was applied).
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    // One (subscription, failed) slot per peer; a peer with a failed
    // setup is marked dead from the start
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    let mut received_blocks = 0;
    let total = node.validator.blockchain.headers.len_sync();
    'blocks_loop: loop {
        // Stop once every peer has failed
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_blocks", "All peer connections failed.");
            break
        }

        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            // Skip peers that already failed
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Next batch of headers whose blocks we still need; empty
            // store means we are done
            let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            // Request the blocks for this batch of header hashes
            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
            if checkpoint_blocks {
                // Checkpoint path: apply the whole batch at once; on
                // error retry with the next peer
                if let Err(e) =
                    node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                // Normal path: append each block as a consensus proposal;
                // on error move on to the next peer
                for block in &response.blocks {
                    if let Err(e) =
                        node.validator.append_proposal(&Proposal::new(block.clone())).await
                    {
                        debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                        continue 'peers_loop
                    };
                }
            }
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

            // Batch applied: drop its headers from the sync store
            node.validator.blockchain.headers.remove_sync(&synced_headers)?;

            if checkpoint_blocks {
                // Checkpoint blocks are final: notify them directly
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                // Proposal path: only notify blocks confirmation produced
                let confirmed = node.validator.confirmation().await?;
                if !confirmed.is_empty() {
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {received_blocks}/{total}");
        }
    }

    Ok(last_received)
}
561
562async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
564 info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
565 let peer = &peers.choose(&mut OsRng).unwrap();
567
568 let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
570 debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
571 return
572 };
573 let notif_sub = node.subscribers.get("proposals").unwrap();
574
575 let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
577 if let Err(e) = peer.send(&request).await {
578 debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
579 return
580 };
581
582 let comms_timeout = node
583 .p2p_handler
584 .p2p
585 .settings()
586 .read_arc()
587 .await
588 .outbound_connect_timeout(peer.address().scheme());
589
590 let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
592 debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
593 return
594 };
595
596 debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
598 for proposal in &response.proposals {
599 if let Err(e) = node.validator.append_proposal(proposal).await {
600 debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
601 return
602 };
603 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
605 notif_sub.notify(vec![enc_prop].into()).await;
606 }
607}