darkfid/task/sync.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::collections::HashMap;

use darkfi::{
    blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
    util::encoding::base64, validator::consensus::Proposal, Error, Result,
};
use darkfi_serial::serialize_async;
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;
use tracing::{debug, info, warn};

use crate::{
    proto::{
        ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
        SyncResponse, TipRequest, TipResponse, BATCH,
    },
    DarkfiNodePtr,
};

// TODO: Parallelize independent requests.
//       We could also make them torrent-like, retrieving chunks out of order.
/// Async task used for block syncing.
/// A checkpoint can be provided to ensure the node syncs the correct sequence.
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    // Grab blocks subscriber
    let block_sub = node.subscribers.get("blocks").unwrap();

    // Grab last known block header, including existing pending sync ones
    let mut last = node.validator.blockchain.last()?;

    // If checkpoint is not reached, purge headers and start syncing from scratch
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }

    // Check if the sync headers' first record is the next expected one
    if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            // Grab last sync header to continue syncing from
            if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            // Purge headers and start syncing from scratch
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

    // Grab the most common tip and the corresponding peers
    let (mut common_tip_height, common_tip_hash, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

    // If the most common tip is the empty tip, we skip syncing
    // further and will reorg if needed when a new proposal arrives.
    if common_tip_hash == [0u8; 32] {
        *node.validator.synced.write().await = true;
        info!(target: "darkfid::task::sync_task", "Blockchain synced!");
        return Ok(())
    }

    // If last known block header is before the checkpoint, we sync until that first.
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            // Retrieve all the headers backwards until our last known one and verify them.
            // We use the next height, in order to also retrieve the checkpoint header.
            retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;

            // Retrieve all the blocks for those headers and apply them to canonical
            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            // Grab synced peers most common tip again
            (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

    // Sync headers and blocks
    loop {
        // Retrieve all the headers backwards until our last known one and verify them.
        // We use the next height, in order to also retrieve the peers' tip header.
        retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;

        // Retrieve all the blocks for those headers and apply them to canonical
        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        if last == last_received {
            break
        }

        last = last_received;

        // Grab synced peers most common tip again
        (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

    // Sync best fork
    sync_best_fork(node, &common_tip_peers, &last.1).await;

    // Perform confirmation
    let confirmed = node.validator.confirmation().await?;
    if !confirmed.is_empty() {
        // Notify subscriber
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    *node.validator.synced.write().await = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}

/// Auxiliary function to block until the node is connected to at least one synced peer,
/// and retrieve the synced peers' tips.
async fn synced_peers(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
    info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
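    // Map of each reported tip (height, header hash) to the peers that reported it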
    let mut tips = HashMap::new();
    loop {
        // Grab channels
        let peers = node.p2p_handler.p2p.hosts().channels();

        // Ask each peer (if we got any) if they are synced
        for peer in peers {
            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            // If a checkpoint was provided, we check that the peer follows that sequence
            if let Some(c) = checkpoint {
                // Communication setup
                let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
                    continue
                };

                // Node creates a `HeaderSyncRequest` and sends it
                let request = HeaderSyncRequest { height: c.0 + 1 };
                if let Err(e) = peer.send(&request).await {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                    continue
                };

                // Node waits for response
                let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                };

                // Handle response
                if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
                    debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                }
            }

            // Communication setup
            let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
                continue
            };

            // Node creates a `TipRequest` and sends it
            let request = TipRequest { tip: *last_tip };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
                continue
            };

            // Node waits for response
            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
                continue
            };

            // Handle response
            if response.synced {
                // Grab response tip. Empty response while synced means
                // the peer is on an entirely different chain/fork, so
                // we keep track of them in the empty tip reference.
                let tip = match response.height {
                    Some(height) => match response.hash {
                        Some(hash) => (height, *hash.inner()),
                        None => (0, [0u8; 32]),
                    },
                    None => (0, [0u8; 32]),
                };
                let Some(tip_peers) = tips.get_mut(&tip) else {
                    tips.insert(tip, vec![peer.clone()]);
                    continue
                };
                tip_peers.push(peer.clone());
            }
        }

        // Check if we got any tips
        if !tips.is_empty() {
            break
        }

        warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
        let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
        let _ = subscription.receive().await;
        subscription.unsubscribe().await;

        let comms_timeout =
            node.p2p_handler.p2p.settings().read_arc().await.outbound_connect_timeout_max();

        info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
        sleep(comms_timeout).await;
    }

    tips
}

/// Auxiliary function to ask all peers for their current tip and find the most common one.
async fn most_common_tip(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> (u32, [u8; 32], Vec<ChannelPtr>) {
    // Grab synced peers tips
    let tips = synced_peers(node, last_tip, checkpoint).await;

    // Grab the most common highest tip peers
    info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
    let mut common_tip = (0, [0u8; 32], vec![]);
    for (tip, peers) in tips {
        // Skip this tip if it has fewer peers than the current most common tip
        if peers.len() < common_tip.2.len() {
            continue;
        }
        // If the peer counts are equal, skip if the tip height is less than
        // the most common tip height.
        if peers.len() == common_tip.2.len() && tip.0 < common_tip.0 {
            continue;
        }
        // Keep the highest tip with the most peers
        common_tip = (tip.0, tip.1, peers);
    }

    info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
    common_tip
}

/// Auxiliary function to retrieve headers backwards until our last known one and verify them.
async fn retrieve_headers(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: u32,
    tip_height: u32,
) -> Result<()> {
    info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
    // Communication setup
    let mut peer_subs = vec![];
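    // For every peer we keep an optional response subscription plus a flag
    // marking whether that peer has failed, so it can be skipped on later passes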
    for peer in peers {
        match peer.subscribe_msg::<HeaderSyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    // We subtract 1 since tip_height is increased by one
    let total = tip_height - last_known - 1;
    let mut last_tip_height = tip_height;
    'headers_loop: loop {
        // Check if all our peers are failing
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_headers", "All peer connections failed.");
            break
        }

        for (index, peer) in peers.iter().enumerate() {
            // Grab the response sub reference
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Node creates a `HeaderSyncRequest` and sends it
            let request = HeaderSyncRequest { height: last_tip_height };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            // Node waits for response
            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            // Retain only the headers after our last known
            let mut response_headers = response.headers.to_vec();
            response_headers.retain(|h| h.height > last_known);

            if response_headers.is_empty() {
                break 'headers_loop
            }

            // Store the headers
            node.validator.blockchain.headers.insert_sync(&response_headers)?;
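            // Headers are requested backwards from the tip, so the next request
            // continues from the lowest height received in this batch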
            last_tip_height = response_headers[0].height;
            info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{total}", node.validator.blockchain.headers.len_sync());
        }
    }

    // Check if we retrieved any new headers
    if node.validator.blockchain.headers.is_empty_sync() {
        return Ok(());
    }

    // Verify the headers sequence. Here we do a quick and dirty verification
    // of just the hashes and heights sequence. We will formally verify
    // the blocks when we retrieve them. We verify the headers in batches,
    // so we don't load them all into memory.
    info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
    let mut verified_headers = 0;
    let total = node.validator.blockchain.headers.len_sync();
    // First we verify the first `BATCH` sequence, using the last known header
    // as the previous header of the first sync header.
    let last_known = node.validator.consensus.best_fork_last_header().await?;
    let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
    if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
        node.validator.blockchain.headers.remove_all_sync()?;
        return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
    }
    verified_headers += 1;
    for (index, header) in headers[1..].iter().enumerate() {
        if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(header.hash().as_string()))
        }
        verified_headers += 1;
    }
    info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");

    // Now we verify the remaining sequences
    let mut last_checked = headers.last().unwrap().clone();
    headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
    while !headers.is_empty() {
        if headers[0].previous != last_checked.hash() ||
            headers[0].height != last_checked.height + 1
        {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
        }
        verified_headers += 1;
        for (index, header) in headers[1..].iter().enumerate() {
            if header.previous != headers[index].hash() ||
                header.height != headers[index].height + 1
            {
                node.validator.blockchain.headers.remove_all_sync()?;
                return Err(Error::BlockIsInvalid(header.hash().as_string()))
            }
            verified_headers += 1;
        }
        last_checked = headers.last().unwrap().clone();
        headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
        info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
    Ok(())
}

/// Auxiliary function to retrieve blocks of provided headers and apply them to canonical.
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    // Communication setup
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }

    let mut received_blocks = 0;
    let total = node.validator.blockchain.headers.len_sync();
    'blocks_loop: loop {
        // Check if all our peers are failing
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_blocks", "All peer connections failed.");
            break
        }

        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            // Grab the response sub reference
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            // Grab first `BATCH` headers
            let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            // Node creates a `SyncRequest` and sends it
            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            let comms_timeout = node
                .p2p_handler
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(peer.address().scheme());

            // Node waits for response
            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            // Verify and store retrieved blocks
            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
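            // When syncing up to a configured checkpoint, blocks are applied directly
            // to canonical; otherwise each block is appended as a proposal so it goes
            // through the normal consensus path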
            if checkpoint_blocks {
                if let Err(e) =
                    node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                for block in &response.blocks {
                    if let Err(e) =
                        node.validator.append_proposal(&Proposal::new(block.clone())).await
                    {
                        debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                        continue 'peers_loop
                    };
                }
            }
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

            // Remove synced headers
            node.validator.blockchain.headers.remove_sync(&synced_headers)?;

            if checkpoint_blocks {
                // Notify subscriber
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                // Perform confirmation for received blocks
                let confirmed = node.validator.confirmation().await?;
                if !confirmed.is_empty() {
                    // Notify subscriber
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {received_blocks}/{total}");
        }
    }

    Ok(last_received)
}

/// Auxiliary function to retrieve best fork state from a random peer.
async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
    info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
    // Pick a random peer to ask for its best fork
    let peer = &peers.choose(&mut OsRng).unwrap();

    // Communication setup
    let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
        return
    };
    let notif_sub = node.subscribers.get("proposals").unwrap();

    // Node creates a `ForkSyncRequest` and sends it
    let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
    if let Err(e) = peer.send(&request).await {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
        return
    };

    let comms_timeout = node
        .p2p_handler
        .p2p
        .settings()
        .read_arc()
        .await
        .outbound_connect_timeout(peer.address().scheme());

    // Node waits for response
    let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
        return
    };

    // Verify and store retrieved proposals
    debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
    for proposal in &response.proposals {
        if let Err(e) = node.validator.append_proposal(proposal).await {
            debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
            return
        };
        // Notify subscriber
        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
        notif_sub.notify(vec![enc_prop].into()).await;
    }
}