// darkfid/task/unknown_proposal.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use num_bigint::BigUint;
25use smol::{channel::Receiver, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error, info};
28
29use darkfi::{
30    blockchain::{BlockDifficulty, BlockchainOverlay, HeaderHash},
31    net::ChannelPtr,
32    util::{encoding::base64, time::Timestamp},
33    validator::{
34        consensus::{Fork, Proposal},
35        pow::PoWModule,
36        utils::{best_fork_index, header_rank},
37        verification::verify_fork_proposal,
38        Validator, ValidatorPtr,
39    },
40    Error::{Custom, DatabaseError, PoWInvalidOutHash, ProposalAlreadyExists},
41    Result,
42};
43use darkfi_serial::serialize_async;
44
45use crate::{
46    proto::{
47        ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
48        ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
49        ProposalMessage, BATCH,
50    },
51    DarkfiNodePtr,
52};
53
54/// Background task to handle unknown proposals.
55pub async fn handle_unknown_proposals(
56    receiver: Receiver<(Proposal, u32)>,
57    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
58    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
59    node: DarkfiNodePtr,
60) -> Result<()> {
61    debug!(target: "darkfid::task::handle_unknown_proposal", "START");
62    loop {
63        // Wait for a new unknown proposal trigger
64        let (proposal, channel) = match receiver.recv().await {
65            Ok(m) => m,
66            Err(e) => {
67                debug!(
68                    target: "darkfid::task::handle_unknown_proposal",
69                    "recv fail: {e}"
70                );
71                continue
72            }
73        };
74
75        // Check if proposal exists in our queue
76        let lock = unknown_proposals.read().await;
77        let contains_proposal = lock.contains(proposal.hash.inner());
78        drop(lock);
79        if !contains_proposal {
80            debug!(
81                target: "darkfid::task::handle_unknown_proposal",
82                "Proposal {} is not in our unknown proposals queue.",
83                proposal.hash,
84            );
85            continue
86        };
87
88        // Increase channel counter
89        let mut lock = unknown_proposals_channels.write().await;
90        let channel_counter = if let Some((counter, timestamp)) = lock.get_mut(&channel) {
91            *counter += 1;
92            *timestamp = Timestamp::current_time().inner();
93            *counter
94        } else {
95            lock.insert(channel, (1, Timestamp::current_time().inner()));
96            1
97        };
98        drop(lock);
99
100        // Handle the unknown proposal
101        if handle_unknown_proposal(&node, channel, &proposal).await {
102            // Ban channel if it exceeds 5 consecutive unknown proposals
103            if channel_counter > 5 {
104                if let Some(channel) = node.p2p_handler.p2p.get_channel(channel) {
105                    channel.ban().await;
106                }
107                unknown_proposals_channels.write().await.remove(&channel);
108            }
109        };
110
111        // Remove proposal from the queue
112        let mut lock = unknown_proposals.write().await;
113        lock.remove(proposal.hash.inner());
114        drop(lock);
115    }
116}
117
118/// Background task to handle an unknown proposal.
119/// Returns a boolean flag indicate if we should ban the channel.
120async fn handle_unknown_proposal(node: &DarkfiNodePtr, channel: u32, proposal: &Proposal) -> bool {
121    // If proposal fork chain was not found, we ask our peer for its sequence
122    debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
123    let Some(channel) = node.p2p_handler.p2p.get_channel(channel) else {
124        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
125        return false
126    };
127
128    // Communication setup
129    let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
130        debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
131        return true
132    };
133
134    // Grab last known block to create the request and execute it
135    let last = match node.validator.read().await.blockchain.last() {
136        Ok(l) => l,
137        Err(e) => {
138            error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
139            return false
140        }
141    };
142    let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
143    if let Err(e) = channel.send(&request).await {
144        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
145        return true
146    };
147
148    let comms_timeout = node
149        .p2p_handler
150        .p2p
151        .settings()
152        .read_arc()
153        .await
154        .outbound_connect_timeout(channel.address().scheme());
155
156    // Node waits for response
157    let response = match response_sub.receive_with_timeout(comms_timeout).await {
158        Ok(r) => r,
159        Err(e) => {
160            debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
161            return true
162        }
163    };
164    debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
165
166    // Verify and store retrieved proposals
167    debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
168
169    // Response should not be empty
170    if response.proposals.is_empty() {
171        debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
172        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
173    }
174
175    // Sequence length must correspond to requested height
176    if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
177        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
178        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
179    }
180
181    // First proposal must extend canonical
182    if response.proposals[0].block.header.previous != last.1 {
183        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
184        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
185    }
186
187    // Last proposal must be the same as the one requested
188    if response.proposals.last().unwrap().hash != proposal.hash {
189        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
190        return handle_reorg(node, &(&channel, &comms_timeout), proposal).await
191    }
192
193    // Process response proposals
194    for proposal in &response.proposals {
195        // Append proposal
196        match node.validator.write().await.append_proposal(proposal).await {
197            Ok(()) => { /* Do nothing */ }
198            // Skip already existing proposals
199            Err(ProposalAlreadyExists) => continue,
200            Err(e) => {
201                debug!(
202                    target: "darkfid::task::handle_unknown_proposal",
203                    "Error while appending response proposal: {e}"
204                );
205                break;
206            }
207        };
208
209        // Broadcast proposal to rest nodes
210        let message = ProposalMessage(proposal.clone());
211        node.p2p_handler.p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
212
213        // Notify proposals subscriber
214        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
215        node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;
216    }
217
218    false
219}
220
/// Auxiliary function to handle a potential reorg. We first find our
/// last common block with the peer, then grab the header sequence from
/// that block until the proposal and check if it ranks higher than our
/// current best ranking fork, to perform a reorg.
///
/// Returns a boolean flag indicating if we should ban the channel.
/// Internal/database failures return `false` (don't penalize the peer),
/// while invalid or losing peer data returns `true`.
///
/// Note: Always remember to purge new trees from the database if not
/// needed.
//  TODO: We keep everything in memory which can result in OOM for a
//        valid long fork. We could use some disk space to store stuff.
async fn handle_reorg(
    // Node pointer
    node: &DarkfiNodePtr,
    // Peer channel and its communications timeout
    channel: &(&ChannelPtr, &u64),
    // Peer fork proposal
    proposal: &Proposal,
) -> bool {
    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {:?}", proposal.hash, proposal.block.header.height, channel.0);

    // Check if genesis proposal was provided; a genesis proposal can
    // never justify a reorg, so count it against the peer.
    if proposal.block.header.height == 0 {
        debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
        return true
    }

    // Find last common header and its sequence, going backwards from
    // the proposal. DatabaseError means our own lookup failed (no ban);
    // any other error means the peer misbehaved (ban).
    let (last_common_height, last_common_hash, peer_header_hashes) =
        match retrieve_peer_header_hashes(&node.validator, channel, proposal).await {
            Ok(t) => t,
            Err(DatabaseError(e)) => {
                error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers hashes: {e}");
                return false
            }
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers hashes failed: {e}");
                return true
            }
        };

    // Create a new PoW module from last common height, so difficulty
    // state reflects the chain as it was right after that height
    let validator = node.validator.read().await;
    let module = match PoWModule::new(
        validator.consensus.blockchain.clone(),
        validator.consensus.module.target,
        validator.consensus.module.fixed_difficulty.clone(),
        Some(last_common_height + 1),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
            return false
        }
    };

    // Grab last common height ranks, used as the base for ranking the
    // peer's alternative sequence
    let last_difficulty = match last_common_height {
        0 => {
            // Genesis difficulty is derived from the genesis timestamp
            let genesis_timestamp = match validator.blockchain.genesis_block() {
                Ok(b) => b.header.timestamp,
                Err(e) => {
                    error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
                    return false
                }
            };
            BlockDifficulty::genesis(genesis_timestamp)
        }
        _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
            Ok(d) => d[0].clone().unwrap(),
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
                return false
            }
        },
    };
    // Release the validator read lock before long-running peer I/O
    drop(validator);

    // Retrieve the headers of the hashes sequence and its ranking
    let (targets_rank, hashes_rank) = match retrieve_peer_headers_sequence_ranking(
        (&last_common_height, &last_common_hash, &module, &last_difficulty),
        channel,
        proposal,
        &peer_header_hashes,
    )
    .await
    {
        Ok(p) => p,
        Err(DatabaseError(e)) => {
            error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer headers: {e}");
            return false
        }
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Retrieving peer headers failed: {e}");
            return true
        }
    };

    // Check if the sequence ranks higher than our current best fork.
    // A lower-or-equal rank means the peer's fork loses, so we bail out.
    let validator = node.validator.read().await;
    let index = match best_fork_index(&validator.consensus.forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &validator.consensus.forks[index];
    if targets_rank < best_fork.targets_rank ||
        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
        return true
    }
    drop(validator);

    // Retrieve the proposals of the hashes sequence
    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
    let peer_proposals = match retrieve_peer_proposals_sequence(
        channel,
        proposal,
        &peer_header_hashes,
    )
    .await
    {
        Ok(p) => p,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Retrieving peer proposals failed: {e}");
            return true
        }
    };

    // Grab the validator lock so no other proposal gets processed
    // while we are verifying the sequence.
    let mut validator = node.validator.write().await;

    // Check if the sequence still ranks higher than our current best fork.
    // The lock was released during the fetch above, so our forks may have
    // advanced in the meantime.
    let index = match best_fork_index(&validator.consensus.forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &validator.consensus.forks[index];
    if targets_rank < best_fork.targets_rank ||
        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
        return true
    }

    // Generate the peer fork and retrieve its ranking
    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, generating its fork...");
    let mut peer_fork = match generate_peer_fork(
        &validator,
        (&last_common_height, &module, &last_difficulty),
        &peer_proposals,
    )
    .await
    {
        Ok(f) => f,
        Err(DatabaseError(e)) => {
            error!(target: "darkfid::task::handle_reorg", "Internal error while retrieving peer fork: {e}");
            return false
        }
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Generatating peer fork failed: {e}");
            return true
        }
    };

    // Execute the reorg: rewind canonical chain back to the last common height
    info!(target: "darkfid::task::handle_reorg", "Executing reorg...");
    if let Err(e) = validator.blockchain.reset_to_height(last_common_height) {
        error!(target: "darkfid::task::handle_reorg", "Applying full inverse diff failed: {e}");
        return false
    };

    // Update fork diffs to forward-only ones: the fork's diffs were built
    // against the pre-reorg state, so re-apply them on top of the rewound
    // canonical state and capture fresh diffs relative to it.
    let overlay = match BlockchainOverlay::new(&validator.blockchain) {
        Ok(o) => o,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Generating a new blockchain overlay failed: {e}");
            return false
        }
    };
    let mut diffs = Vec::with_capacity(peer_fork.diffs.len());
    for diff in peer_fork.diffs {
        let overlay = overlay.lock().unwrap();
        let mut overlay = overlay.overlay.lock().unwrap();
        if let Err(e) = overlay.add_diff(&diff) {
            error!(target: "darkfid::task::handle_reorg", "Applying peer fork diff failed: {e}");
            return false
        }
        // NOTE(review): `diff(&diffs)` appears to compute the overlay diff
        // excluding changes already captured by previous entries — confirm
        // against the overlay's `diff` semantics.
        match overlay.diff(&diffs) {
            Ok(diff) => diffs.push(diff),
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Generate clean state inverse diff failed: {e}");
                return false
            }
        }
    }
    peer_fork.overlay = overlay;
    peer_fork.diffs = diffs;

    // Update validator consensus state: the peer fork replaces all
    // previously known forks
    validator.consensus.module = module;
    validator.consensus.forks = vec![peer_fork];

    // Check if we can confirm anything and broadcast them
    let confirmed = match validator.confirmation().await {
        Ok(f) => f,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
            return false
        }
    };

    // Refresh mining registry
    if let Err(e) = node.registry.state.write().await.refresh(&validator).await {
        error!(target: "darkfid::task::handle_reorg", "Failed refreshing mining block templates: {e}")
    }

    // Notify the blocks subscriber about newly confirmed blocks
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        node.subscribers.get("blocks").unwrap().notify(JsonValue::Array(notif_blocks)).await;
    }

    // Broadcast proposal to the network
    let message = ProposalMessage(proposal.clone());
    node.p2p_handler.p2p.broadcast(&message).await;

    // Notify proposals subscriber
    let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
    node.subscribers.get("proposals").unwrap().notify(vec![enc_prop].into()).await;

    false
}
464
465/// Auxiliary function to retrieve the last common header and height,
466/// along with the headers sequence up to provided peer proposal.
467async fn retrieve_peer_header_hashes(
468    // Validator pointer
469    validator: &ValidatorPtr,
470    // Peer channel and its communications timeout
471    channel: &(&ChannelPtr, &u64),
472    // Peer fork proposal
473    proposal: &Proposal,
474) -> Result<(u32, HeaderHash, Vec<HeaderHash>)> {
475    // Communication setup
476    let response_sub = channel.0.subscribe_msg::<ForkHeaderHashResponse>().await?;
477
478    // Keep track of received header hashes sequence
479    let mut peer_header_hashes = vec![];
480
481    // Find last common header, going backwards from the proposal
482    let mut previous_height = proposal.block.header.height;
483    let mut previous_hash = proposal.hash;
484    for height in (0..proposal.block.header.height).rev() {
485        // Request peer header hash for this height
486        let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
487        channel.0.send(&request).await?;
488
489        // Node waits for response
490        let response = response_sub.receive_with_timeout(*channel.1).await?;
491        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
492
493        // Check if peer returned a header
494        let Some(peer_header) = response.fork_header else {
495            return Err(Custom(String::from("Peer responded with an empty header")))
496        };
497
498        // Check if we know this header
499        let headers = match validator.read().await.blockchain.blocks.get_order(&[height], false) {
500            Ok(h) => h,
501            Err(e) => return Err(DatabaseError(format!("Retrieving headers failed: {e}"))),
502        };
503        match headers[0] {
504            Some(known_header) => {
505                if known_header == peer_header {
506                    previous_height = height;
507                    previous_hash = known_header;
508                    break
509                }
510                // Since we retrieve in right -> left order we push them in reverse order
511                peer_header_hashes.insert(0, peer_header);
512            }
513            None => peer_header_hashes.insert(0, peer_header),
514        }
515    }
516
517    Ok((previous_height, previous_hash, peer_header_hashes))
518}
519
520/// Auxiliary function to retrieve provided peer headers hashes
521/// sequence and its ranking, based on provided last common
522/// information.
523async fn retrieve_peer_headers_sequence_ranking(
524    // Last common header, PoW module and difficulty
525    last_common_info: (&u32, &HeaderHash, &PoWModule, &BlockDifficulty),
526    // Peer channel and its communications timeout
527    channel: &(&ChannelPtr, &u64),
528    // Peer fork trigger proposal
529    proposal: &Proposal,
530    // Peer header hashes sequence
531    header_hashes: &[HeaderHash],
532) -> Result<(BigUint, BigUint)> {
533    // Communication setup
534    let response_sub = channel.0.subscribe_msg::<ForkHeadersResponse>().await?;
535
536    // Retrieve the headers of the hashes sequence, in batches, keeping track of the sequence ranking
537    info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", header_hashes.len());
538    let mut previous_height = *last_common_info.0;
539    let mut previous_hash = *last_common_info.1;
540    let mut module = last_common_info.2.clone();
541    let mut targets_rank = last_common_info.3.ranks.targets_rank.clone();
542    let mut hashes_rank = last_common_info.3.ranks.hashes_rank.clone();
543    let mut batch = Vec::with_capacity(BATCH);
544    let mut total_processed = 0;
545    for (index, hash) in header_hashes.iter().enumerate() {
546        // Add hash in batch sequence
547        batch.push(*hash);
548
549        // Check if batch is full so we can send it
550        if batch.len() < BATCH && index != header_hashes.len() - 1 {
551            continue
552        }
553
554        // Request peer headers
555        let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
556        channel.0.send(&request).await?;
557
558        // Node waits for response
559        let response = response_sub.receive_with_timeout(*channel.1).await?;
560        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
561
562        // Response sequence must be the same length as the one requested
563        if response.headers.len() != batch.len() {
564            return Err(Custom(String::from(
565                "Peer responded with a different headers sequence length",
566            )))
567        }
568
569        // Process retrieved headers
570        for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
571            let peer_header_hash = peer_header.hash();
572            debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
573
574            // Validate its the header we requested
575            if peer_header_hash != batch[peer_header_index] {
576                return Err(Custom(format!(
577                    "Peer responded with a differend header: {} - {peer_header_hash}",
578                    batch[peer_header_index]
579                )))
580            }
581
582            // Validate sequence is correct
583            if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
584                return Err(Custom(String::from("Invalid header sequence detected")))
585            }
586
587            // Verify header hash and calculate its rank
588            let (next_difficulty, target_distance_sq, hash_distance_sq) =
589                match header_rank(&mut module, peer_header) {
590                    Ok(tuple) => tuple,
591                    Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
592                    Err(e) => {
593                        return Err(DatabaseError(format!("Computing header rank failed: {e}")))
594                    }
595                };
596
597            // Update sequence ranking
598            targets_rank += target_distance_sq.clone();
599            hashes_rank += hash_distance_sq.clone();
600
601            // Update PoW headers module
602            module.append(peer_header, &next_difficulty)?;
603
604            // Set previous header
605            previous_height = peer_header.height;
606            previous_hash = peer_header_hash;
607        }
608
609        total_processed += response.headers.len();
610        info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", header_hashes.len());
611
612        // Reset batch
613        batch = Vec::with_capacity(BATCH);
614    }
615
616    // Validate trigger proposal header sequence is correct
617    if proposal.block.header.previous != previous_hash ||
618        proposal.block.header.height != previous_height + 1
619    {
620        return Err(Custom(String::from("Invalid header sequence detected")))
621    }
622
623    // Verify trigger proposal header hash and calculate its rank
624    let (_, target_distance_sq, hash_distance_sq) =
625        match header_rank(&mut module, &proposal.block.header) {
626            Ok(tuple) => tuple,
627            Err(PoWInvalidOutHash) => return Err(PoWInvalidOutHash),
628            Err(e) => return Err(DatabaseError(format!("Computing header rank failed: {e}"))),
629        };
630
631    // Update sequence ranking
632    targets_rank += target_distance_sq.clone();
633    hashes_rank += hash_distance_sq.clone();
634
635    Ok((targets_rank, hashes_rank))
636}
637
638/// Auxiliary function to retrieve provided peer proposals hashes
639/// sequence.
640async fn retrieve_peer_proposals_sequence(
641    // Peer channel and its communications timeout
642    channel: &(&ChannelPtr, &u64),
643    // Peer fork trigger proposal
644    proposal: &Proposal,
645    // Peer header hashes sequence
646    header_hashes: &[HeaderHash],
647) -> Result<Vec<Proposal>> {
648    // Communication setup
649    let response_sub = channel.0.subscribe_msg::<ForkProposalsResponse>().await?;
650
651    // Retrieve the proposals of the hashes sequence, in batches
652    let mut batch = Vec::with_capacity(BATCH);
653    let mut proposals = Vec::with_capacity(header_hashes.len());
654    for (index, hash) in header_hashes.iter().enumerate() {
655        // Add hash in batch sequence
656        batch.push(*hash);
657
658        // Check if batch is full so we can send it
659        if batch.len() < BATCH && index != header_hashes.len() - 1 {
660            continue
661        }
662
663        // Request peer proposals
664        let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
665        channel.0.send(&request).await?;
666
667        // Node waits for response
668        let response = response_sub.receive_with_timeout(*channel.1).await?;
669        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
670
671        // Response sequence must be the same length as the one requested
672        if response.proposals.len() != batch.len() {
673            return Err(Custom(String::from(
674                "Peer responded with a different proposals sequence length",
675            )))
676        }
677
678        // Process retrieved proposal
679        for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
680            // Validate its the proposal we requested
681            if peer_proposal.hash != batch[peer_proposal_index] {
682                return Err(Custom(format!(
683                    "Peer responded with a differend proposal: {} - {}",
684                    batch[peer_proposal_index], peer_proposal.hash
685                )))
686            }
687            proposals.push(peer_proposal.clone());
688        }
689
690        info!(target: "darkfid::task::handle_reorg", "Proposals received: {}/{}", proposals.len(), header_hashes.len());
691
692        // Reset batch
693        batch = Vec::with_capacity(BATCH);
694    }
695
696    // Insert trigger proposal
697    proposals.push(proposal.clone());
698
699    Ok(proposals)
700}
701
/// Auxiliary function to generate provided peer proposals sequence
/// fork and its ranking, based on provided last common information.
///
/// The fork is seeded with the last common block's ranks, rewound to
/// the last common height via state inverse diffs, and then each peer
/// proposal is verified and appended in sequence. Errors from our own
/// database lookups are wrapped as `DatabaseError` so the caller can
/// distinguish internal failures from peer misbehavior.
async fn generate_peer_fork(
    // Validator pointer
    validator: &Validator,
    // Last common header height, PoW module and difficulty
    last_common_info: (&u32, &PoWModule, &BlockDifficulty),
    // Peer proposals sequence
    proposals: &[Proposal],
) -> Result<Fork> {
    // Create a fork from last common height
    let mut fork =
        match Fork::new(validator.consensus.blockchain.clone(), last_common_info.1.clone()).await {
            Ok(f) => f,
            Err(e) => return Err(DatabaseError(format!("Generating peer fork failed: {e}"))),
        };
    // Seed the fork ranking with the last common block's accumulated ranks
    fork.targets_rank = last_common_info.2.ranks.targets_rank.clone();
    fork.hashes_rank = last_common_info.2.ranks.hashes_rank.clone();

    // Grab all state inverse diffs after last common height, and add them to the fork.
    // Applied in reverse (newest first) to rewind the overlay state back
    // to the last common height.
    let inverse_diffs =
        match validator.blockchain.blocks.get_state_inverse_diffs_after(*last_common_info.0) {
            Ok(i) => i,
            Err(e) => {
                return Err(DatabaseError(format!("Retrieving state inverse diffs failed: {e}")))
            }
        };
    for inverse_diff in inverse_diffs.iter().rev() {
        let result = fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff);
        if let Err(e) = result {
            return Err(DatabaseError(format!("Applying state inverse diff failed: {e}")))
        }
    }

    // Grab current overlay diff and use it as the first diff of the
    // peer fork, so all consecutive diffs represent just the proposal
    // changes.
    let diff = fork.overlay.lock().unwrap().overlay.lock().unwrap().diff(&[]);
    let diff = match diff {
        Ok(d) => d,
        Err(e) => {
            return Err(DatabaseError(format!("Generate full state inverse diff failed: {e}")))
        }
    };
    fork.diffs = vec![diff];

    // Process the proposals sequence
    for proposal in proposals {
        info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", proposal.hash, proposal.block.header.height);

        // Verify proposal
        verify_fork_proposal(&mut fork, proposal, validator.verify_fees).await?;

        // Append proposal
        fork.append_proposal(proposal).await?;
    }

    // Remove the reorg diff from the fork, leaving only per-proposal diffs
    fork.diffs.remove(0);

    Ok(fork)
}