// darkfid/task/unknown_proposal.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use num_bigint::BigUint;
25use smol::{channel::Receiver, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error, info};
28
29use darkfi::{
30    blockchain::{BlockDifficulty, HeaderHash},
31    net::ChannelPtr,
32    util::{encoding::base64, time::Timestamp},
33    validator::{
34        consensus::{Fork, Proposal},
35        pow::PoWModule,
36        utils::{best_fork_index, header_rank},
37        verification::verify_fork_proposal,
38        ValidatorPtr,
39    },
40    Error::{Custom, DatabaseError, PoWInvalidOutHash, ProposalAlreadyExists},
41    Result,
42};
43use darkfi_serial::serialize_async;
44
45use crate::{
46    proto::{
47        ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
48        ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
49        ProposalMessage, BATCH,
50    },
51    DarkfiNodePtr,
52};
53
54/// Background task to handle unknown proposals.
55pub async fn handle_unknown_proposals(
56    receiver: Receiver<(Proposal, u32)>,
57    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
58    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
59    node: DarkfiNodePtr,
60) -> Result<()> {
61    debug!(target: "darkfid::task::handle_unknown_proposal", "START");
62    loop {
63        // Wait for a new unknown proposal trigger
64        let (proposal, channel) = match receiver.recv().await {
65            Ok(m) => m,
66            Err(e) => {
67                debug!(
68                    target: "darkfid::task::handle_unknown_proposal",
69                    "recv fail: {e}"
70                );
71                continue
72            }
73        };
74
75        // Check if proposal exists in our queue
76        let lock = unknown_proposals.read().await;
77        let contains_proposal = lock.contains(proposal.hash.inner());
78        drop(lock);
79        if !contains_proposal {
80            debug!(
81                target: "darkfid::task::handle_unknown_proposal",
82                "Proposal {} is not in our unknown proposals queue.",
83                proposal.hash,
84            );
85            continue
86        };
87
88        // Increase channel counter
89        let mut lock = unknown_proposals_channels.write().await;
90        let channel_counter = if let Some((counter, timestamp)) = lock.get_mut(&channel) {
91            *counter += 1;
92            *timestamp = Timestamp::current_time().inner();
93            *counter
94        } else {
95            lock.insert(channel, (1, Timestamp::current_time().inner()));
96            1
97        };
98        drop(lock);
99
100        // Handle the unknown proposal
101        if handle_unknown_proposal(&node, channel, &proposal).await {
102            // Ban channel if it exceeds 5 consecutive unknown proposals
103            if channel_counter > 5 {
104                if let Some(channel) = node.p2p_handler.p2p.get_channel(channel) {
105                    channel.ban().await;
106                }
107                unknown_proposals_channels.write().await.remove(&channel);
108            }
109        };
110
111        // Remove proposal from the queue
112        let mut lock = unknown_proposals.write().await;
113        lock.remove(proposal.hash.inner());
114        drop(lock);
115    }
116}
117
118/// Background task to handle an unknown proposal.
119/// Returns a boolean flag indicate if we should ban the channel.
120async fn handle_unknown_proposal(node: &DarkfiNodePtr, channel: u32, proposal: &Proposal) -> bool {
121    // If proposal fork chain was not found, we ask our peer for its sequence
122    debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
123    let Some(channel) = node.p2p_handler.p2p.get_channel(channel) else {
124        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
125        return false
126    };
127
128    // Communication setup
129    let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
130        debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
131        return true
132    };
133
134    // Grab last known block to create the request and execute it
135    let last = match node.validator.blockchain.last() {
136        Ok(l) => l,
137        Err(e) => {
138            error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
139            return false
140        }
141    };
142    let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
143    if let Err(e) = channel.send(&request).await {
144        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
145        return true
146    };
147
148    let comms_timeout = node
149        .p2p_handler
150        .p2p
151        .settings()
152        .read_arc()
153        .await
154        .outbound_connect_timeout(channel.address().scheme());
155
156    // Node waits for response
157    let response = match response_sub.receive_with_timeout(comms_timeout).await {
158        Ok(r) => r,
159        Err(e) => {
160            debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
161            return true
162        }
163    };
164    debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
165
166    // Verify and store retrieved proposals
167    debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
168
169    // Response should not be empty
170    if response.proposals.is_empty() {
171        debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
172        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
173    }
174
175    // Sequence length must correspond to requested height
176    if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
177        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
178        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
179    }
180
181    // First proposal must extend canonical
182    if response.proposals[0].block.header.previous != last.1 {
183        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
184        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
185    }
186
187    // Last proposal must be the same as the one requested
188    if response.proposals.last().unwrap().hash != proposal.hash {
189        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
190        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
191    }
192
193    // Process response proposals
194    for proposal in &response.proposals {
195        // Append proposal
196        match node.validator.append_proposal(proposal).await {
197            Ok(()) => { /* Do nothing */ }
198            // Skip already existing proposals
199            Err(ProposalAlreadyExists) => continue,
200            Err(e) => {
201                debug!(
202                    target: "darkfid::task::handle_unknown_proposal",
203                    "Error while appending response proposal: {e}"
204                );
205                break;
206            }
207        };
208
209        // Broadcast proposal to rest nodes
210        let message = ProposalMessage(proposal.clone());
211        node.p2p_handler.p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
212
213        // Notify proposals subscriber
214        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
215        node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;
216    }
217
218    false
219}
220
221/// Auxiliary function to handle a potential reorg. We first find our
222/// last common block with the peer, then grab the header sequence from
223/// that block until the proposal and check if it ranks higher than our
224/// current best ranking fork, to perform a reorg.
225///
226/// Returns a boolean flag indicate if we should ban the channel.
227///
228/// Note: Always remember to purge new trees from the database if not
229/// needed.
230//  TODO: We keep everything in memory which can result in OOM for a
231//        valid long fork. We could use some disk space to store stuff.
232async fn handle_reorg(
233    // Node pointer
234    node: &DarkfiNodePtr,
235    // Peer channel and its communications timeout
236    channel: &(&ChannelPtr, &u64),
237    // Peer fork proposal
238    proposal: &Proposal,
239) -> bool {
240    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {:?}", proposal.hash, proposal.block.header.height, channel.0);
241
242    // Check if genesis proposal was provided
243    if proposal.block.header.height == 0 {
244        debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
245        return true
246    }
247
248    // Find last common header and its sequence, going backwards from
249    // the proposal.
250    let (last_common_height, last_common_hash, peer_header_hashes) =
251        match retrieve_peer_header_hashes(&node.validator, channel, proposal).await {
252            Ok(t) => t,
253            Err(DatabaseError(e)) => {
254                error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers hashes: {e}");
255                return false
256            }
257            Err(e) => {
258                error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers hashes failed: {e}");
259                return true
260            }
261        };
262
263    // Create a new PoW module from last common height
264    let module = match PoWModule::new(
265        node.validator.consensus.blockchain.clone(),
266        node.validator.consensus.module.read().await.target,
267        node.validator.consensus.module.read().await.fixed_difficulty.clone(),
268        Some(last_common_height + 1),
269    ) {
270        Ok(m) => m,
271        Err(e) => {
272            error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
273            return false
274        }
275    };
276
277    // Grab last common height ranks
278    let last_difficulty = match last_common_height {
279        0 => {
280            let genesis_timestamp = match node.validator.blockchain.genesis_block() {
281                Ok(b) => b.header.timestamp,
282                Err(e) => {
283                    error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
284                    return false
285                }
286            };
287            BlockDifficulty::genesis(genesis_timestamp)
288        }
289        _ => match node.validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
290            Ok(d) => d[0].clone().unwrap(),
291            Err(e) => {
292                error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
293                return false
294            }
295        },
296    };
297
298    // Retrieve the headers of the hashes sequence and its ranking
299    let (targets_rank, hashes_rank) = match retrieve_peer_headers_sequence_ranking(
300        (&last_common_height, &last_common_hash, &module, &last_difficulty),
301        channel,
302        proposal,
303        &peer_header_hashes,
304    )
305    .await
306    {
307        Ok(p) => p,
308        Err(DatabaseError(e)) => {
309            error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers: {e}");
310            return false
311        }
312        Err(e) => {
313            error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers failed: {e}");
314            return true
315        }
316    };
317
318    // Grab the append lock so no other proposal gets processed while
319    // we are verifying the sequence.
320    let append_lock = node.validator.consensus.append_lock.write().await;
321
322    // Check if the sequence ranks higher than our current best fork
323    let mut forks = node.validator.consensus.forks.write().await;
324    let index = match best_fork_index(&forks) {
325        Ok(i) => i,
326        Err(e) => {
327            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
328            return false
329        }
330    };
331    let best_fork = &forks[index];
332    if targets_rank < best_fork.targets_rank ||
333        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
334    {
335        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
336        return true
337    }
338
339    // Generate the peer fork and retrieve its ranking
340    let peer_fork = match retrieve_peer_fork(
341        &node.validator,
342        (&last_common_height, &module, &last_difficulty),
343        channel,
344        proposal,
345        &peer_header_hashes,
346    )
347    .await
348    {
349        Ok(p) => p,
350        Err(DatabaseError(e)) => {
351            error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer fork: {e}");
352            return false
353        }
354        Err(e) => {
355            error!(target: "darkfid::task::handle_reorg", "Retrieving peer fork failed: {e}");
356            return true
357        }
358    };
359
360    // Check if the peer fork ranks higher than our current best fork
361    if peer_fork.targets_rank < best_fork.targets_rank ||
362        (peer_fork.targets_rank == best_fork.targets_rank &&
363            peer_fork.hashes_rank <= best_fork.hashes_rank)
364    {
365        info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
366        return true
367    }
368
369    // Execute the reorg
370    info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
371    if let Err(e) = node.validator.blockchain.reset_to_height(last_common_height) {
372        error!(target: "darkfid::task::handle_reorg", "Applying full inverse diff failed: {e}");
373        return false
374    };
375    *node.validator.consensus.module.write().await = module;
376    *forks = vec![peer_fork];
377    drop(forks);
378    drop(append_lock);
379
380    // Check if we can confirm anything and broadcast them
381    let confirmed = match node.validator.confirmation().await {
382        Ok(f) => f,
383        Err(e) => {
384            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
385            return false
386        }
387    };
388
389    // Refresh mining registry
390    if let Err(e) = node.registry.refresh(&node.validator).await {
391        error!(target: "darkfid::task::handle_reorg", "Failed refreshing mining block templates: {e}")
392    }
393
394    if !confirmed.is_empty() {
395        let mut notif_blocks = Vec::with_capacity(confirmed.len());
396        for block in confirmed {
397            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
398        }
399        node.subscribers.get("blocks").unwrap().notify(JsonValue::Array(notif_blocks)).await;
400    }
401
402    // Broadcast proposal to the network
403    let message = ProposalMessage(proposal.clone());
404    node.p2p_handler.p2p.broadcast(&message).await;
405
406    // Notify proposals subscriber
407    let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
408    node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;
409
410    false
411}
412
413/// Auxiliary function to retrieve the last common header and height,
414/// along with the headers sequence up to provided peer proposal.
415async fn retrieve_peer_header_hashes(
416    // Validator pointer
417    validator: &ValidatorPtr,
418    // Peer channel and its communications timeout
419    channel: &(&ChannelPtr, &u64),
420    // Peer fork proposal
421    proposal: &Proposal,
422) -> Result<(u32, HeaderHash, Vec<HeaderHash>)> {
423    // Communication setup
424    let response_sub = channel.0.subscribe_msg::<ForkHeaderHashResponse>().await?;
425
426    // Keep track of received header hashes sequence
427    let mut peer_header_hashes = vec![];
428
429    // Find last common header, going backwards from the proposal
430    let mut previous_height = proposal.block.header.height;
431    let mut previous_hash = proposal.hash;
432    for height in (0..proposal.block.header.height).rev() {
433        // Request peer header hash for this height
434        let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
435        channel.0.send(&request).await?;
436
437        // Node waits for response
438        let response = response_sub.receive_with_timeout(*channel.1).await?;
439        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
440
441        // Check if peer returned a header
442        let Some(peer_header) = response.fork_header else {
443            return Err(Custom(String::from("Peer responded with an empty header")))
444        };
445
446        // Check if we know this header
447        let headers = match validator.blockchain.blocks.get_order(&[height], false) {
448            Ok(h) => h,
449            Err(e) => return Err(DatabaseError(format!("Retrieving headers failed: {e}"))),
450        };
451        match headers[0] {
452            Some(known_header) => {
453                if known_header == peer_header {
454                    previous_height = height;
455                    previous_hash = known_header;
456                    break
457                }
458                // Since we retrieve in right -> left order we push them in reverse order
459                peer_header_hashes.insert(0, peer_header);
460            }
461            None => peer_header_hashes.insert(0, peer_header),
462        }
463    }
464
465    Ok((previous_height, previous_hash, peer_header_hashes))
466}
467
468/// Auxiliary function to retrieve provided peer headers hashes
469/// sequence and its ranking, based on provided last common
470/// information.
471async fn retrieve_peer_headers_sequence_ranking(
472    // Last common header, PoW module and difficulty
473    last_common_info: (&u32, &HeaderHash, &PoWModule, &BlockDifficulty),
474    // Peer channel and its communications timeout
475    channel: &(&ChannelPtr, &u64),
476    // Peer fork trigger proposal
477    proposal: &Proposal,
478    // Peer header hashes sequence
479    header_hashes: &[HeaderHash],
480) -> Result<(BigUint, BigUint)> {
481    // Communication setup
482    let response_sub = channel.0.subscribe_msg::<ForkHeadersResponse>().await?;
483
484    // Retrieve the headers of the hashes sequence, in batches, keeping track of the sequence ranking
485    info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", header_hashes.len());
486    let mut previous_height = *last_common_info.0;
487    let mut previous_hash = *last_common_info.1;
488    let mut module = last_common_info.2.clone();
489    let mut targets_rank = last_common_info.3.ranks.targets_rank.clone();
490    let mut hashes_rank = last_common_info.3.ranks.hashes_rank.clone();
491    let mut batch = Vec::with_capacity(BATCH);
492    let mut total_processed = 0;
493    for (index, hash) in header_hashes.iter().enumerate() {
494        // Add hash in batch sequence
495        batch.push(*hash);
496
497        // Check if batch is full so we can send it
498        if batch.len() < BATCH && index != header_hashes.len() - 1 {
499            continue
500        }
501
502        // Request peer headers
503        let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
504        channel.0.send(&request).await?;
505
506        // Node waits for response
507        let response = response_sub.receive_with_timeout(*channel.1).await?;
508        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
509
510        // Response sequence must be the same length as the one requested
511        if response.headers.len() != batch.len() {
512            return Err(Custom(String::from(
513                "Peer responded with a different headers sequence length",
514            )))
515        }
516
517        // Process retrieved headers
518        for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
519            let peer_header_hash = peer_header.hash();
520            debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
521
522            // Validate its the header we requested
523            if peer_header_hash != batch[peer_header_index] {
524                return Err(Custom(format!(
525                    "Peer responded with a differend header: {} - {peer_header_hash}",
526                    batch[peer_header_index]
527                )))
528            }
529
530            // Validate sequence is correct
531            if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
532                return Err(Custom(String::from("Invalid header sequence detected")))
533            }
534
535            // Verify header hash and calculate its rank
536            let (next_difficulty, target_distance_sq, hash_distance_sq) =
537                match header_rank(&module, peer_header) {
538                    Ok(tuple) => tuple,
539                    Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
540                    Err(e) => {
541                        return Err(DatabaseError(format!("Computing header rank failed: {e}")))
542                    }
543                };
544
545            // Update sequence ranking
546            targets_rank += target_distance_sq.clone();
547            hashes_rank += hash_distance_sq.clone();
548
549            // Update PoW headers module
550            module.append(peer_header, &next_difficulty)?;
551
552            // Set previous header
553            previous_height = peer_header.height;
554            previous_hash = peer_header_hash;
555        }
556
557        total_processed += response.headers.len();
558        info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", header_hashes.len());
559
560        // Reset batch
561        batch = Vec::with_capacity(BATCH);
562    }
563
564    // Validate trigger proposal header sequence is correct
565    if proposal.block.header.previous != previous_hash ||
566        proposal.block.header.height != previous_height + 1
567    {
568        return Err(Custom(String::from("Invalid header sequence detected")))
569    }
570
571    // Verify trigger proposal header hash and calculate its rank
572    let (_, target_distance_sq, hash_distance_sq) =
573        match header_rank(&module, &proposal.block.header) {
574            Ok(tuple) => tuple,
575            Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
576            Err(e) => return Err(DatabaseError(format!("Computing header rank failed: {e}"))),
577        };
578
579    // Update sequence ranking
580    targets_rank += target_distance_sq.clone();
581    hashes_rank += hash_distance_sq.clone();
582
583    Ok((targets_rank, hashes_rank))
584}
585
586/// Auxiliary function to generate provided peer headers hashes fork
587/// and its ranking, based on provided last common information.
588async fn retrieve_peer_fork(
589    // Validator pointer
590    validator: &ValidatorPtr,
591    // Last common header height, PoW module and difficulty
592    last_common_info: (&u32, &PoWModule, &BlockDifficulty),
593    // Peer channel and its communications timeout
594    channel: &(&ChannelPtr, &u64),
595    // Peer fork trigger proposal
596    proposal: &Proposal,
597    // Peer header hashes sequence
598    header_hashes: &[HeaderHash],
599) -> Result<Fork> {
600    // Communication setup
601    let response_sub = channel.0.subscribe_msg::<ForkProposalsResponse>().await?;
602
603    // Create a fork from last common height
604    let mut peer_fork =
605        match Fork::new(validator.consensus.blockchain.clone(), last_common_info.1.clone()).await {
606            Ok(f) => f,
607            Err(e) => return Err(DatabaseError(format!("Generating peer fork failed: {e}"))),
608        };
609    peer_fork.targets_rank = last_common_info.2.ranks.targets_rank.clone();
610    peer_fork.hashes_rank = last_common_info.2.ranks.hashes_rank.clone();
611
612    // Grab all state inverse diffs after last common height, and add them to the fork
613    let inverse_diffs =
614        match validator.blockchain.blocks.get_state_inverse_diffs_after(*last_common_info.0) {
615            Ok(i) => i,
616            Err(e) => {
617                return Err(DatabaseError(format!("Retrieving state inverse diffs failed: {e}")))
618            }
619        };
620    for inverse_diff in inverse_diffs.iter().rev() {
621        let result =
622            peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff);
623        if let Err(e) = result {
624            return Err(DatabaseError(format!("Applying state inverse diff failed: {e}")))
625        }
626    }
627
628    // Grab current overlay diff and use it as the first diff of the
629    // peer fork, so all consecutive diffs represent just the proposal
630    // changes.
631    let diff = peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().diff(&[]);
632    let diff = match diff {
633        Ok(d) => d,
634        Err(e) => {
635            return Err(DatabaseError(format!("Generate full state inverse diff failed: {e}")))
636        }
637    };
638    peer_fork.diffs = vec![diff];
639
640    // Retrieve the proposals of the hashes sequence, in batches
641    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", header_hashes.len());
642    let mut batch = Vec::with_capacity(BATCH);
643    let mut total_processed = 0;
644    for (index, hash) in header_hashes.iter().enumerate() {
645        // Add hash in batch sequence
646        batch.push(*hash);
647
648        // Check if batch is full so we can send it
649        if batch.len() < BATCH && index != header_hashes.len() - 1 {
650            continue
651        }
652
653        // Request peer proposals
654        let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
655        channel.0.send(&request).await?;
656
657        // Node waits for response
658        let response = response_sub.receive_with_timeout(*channel.1).await?;
659        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
660
661        // Response sequence must be the same length as the one requested
662        if response.proposals.len() != batch.len() {
663            return Err(Custom(String::from(
664                "Peer responded with a different proposals sequence length",
665            )))
666        }
667
668        // Process retrieved proposal
669        for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
670            info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
671
672            // Validate its the proposal we requested
673            if peer_proposal.hash != batch[peer_proposal_index] {
674                return Err(Custom(format!(
675                    "Peer responded with a differend proposal: {} - {}",
676                    batch[peer_proposal_index], peer_proposal.hash
677                )))
678            }
679
680            // Verify proposal
681            verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await?;
682
683            // Append proposal
684            peer_fork.append_proposal(peer_proposal).await?;
685        }
686
687        total_processed += response.proposals.len();
688        info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", header_hashes.len());
689
690        // Reset batch
691        batch = Vec::with_capacity(BATCH);
692    }
693
694    // Verify trigger proposal
695    verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await?;
696
697    // Append trigger proposal
698    peer_fork.append_proposal(proposal).await?;
699
700    // Remove the reorg diff from the fork
701    peer_fork.diffs.remove(0);
702
703    Ok(peer_fork)
704}