// darkfid/task/unknown_proposal.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

use smol::{channel::Receiver, lock::RwLock};
use tinyjson::JsonValue;
use tracing::{debug, error, info};

use darkfi::{
    blockchain::BlockDifficulty,
    net::{ChannelPtr, P2pPtr},
    rpc::jsonrpc::JsonSubscriber,
    util::{encoding::base64, time::Timestamp},
    validator::{
        consensus::{Fork, Proposal},
        pow::PoWModule,
        utils::{best_fork_index, header_rank},
        verification::verify_fork_proposal,
        ValidatorPtr,
    },
    Error, Result,
};
use darkfi_serial::serialize_async;

use crate::proto::{
    ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
    ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
    ProposalMessage, BATCH,
};
49
50/// Background task to handle unknown proposals.
51pub async fn handle_unknown_proposals(
52    receiver: Receiver<(Proposal, u32)>,
53    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
54    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
55    validator: ValidatorPtr,
56    p2p: P2pPtr,
57    proposals_sub: JsonSubscriber,
58    blocks_sub: JsonSubscriber,
59) -> Result<()> {
60    debug!(target: "darkfid::task::handle_unknown_proposal", "START");
61    loop {
62        // Wait for a new unknown proposal trigger
63        let (proposal, channel) = match receiver.recv().await {
64            Ok(m) => m,
65            Err(e) => {
66                debug!(
67                    target: "darkfid::task::handle_unknown_proposal",
68                    "recv fail: {e}"
69                );
70                continue
71            }
72        };
73
74        // Check if proposal exists in our queue
75        let lock = unknown_proposals.read().await;
76        let contains_proposal = lock.contains(proposal.hash.inner());
77        drop(lock);
78        if !contains_proposal {
79            debug!(
80                target: "darkfid::task::handle_unknown_proposal",
81                "Proposal {} is not in our unknown proposals queue.",
82                proposal.hash,
83            );
84            continue
85        };
86
87        // Increase channel counter
88        let mut lock = unknown_proposals_channels.write().await;
89        let channel_counter = if let Some((counter, timestamp)) = lock.get_mut(&channel) {
90            *counter += 1;
91            *timestamp = Timestamp::current_time().inner();
92            *counter
93        } else {
94            lock.insert(channel, (1, Timestamp::current_time().inner()));
95            1
96        };
97        drop(lock);
98
99        // Handle the unknown proposal
100        if handle_unknown_proposal(
101            &validator,
102            &p2p,
103            &proposals_sub,
104            &blocks_sub,
105            channel,
106            &proposal,
107        )
108        .await
109        {
110            // Ban channel if it exceeds 5 consecutive unknown proposals
111            if channel_counter > 5 {
112                if let Some(channel) = p2p.get_channel(channel) {
113                    channel.ban().await;
114                }
115                unknown_proposals_channels.write().await.remove(&channel);
116            }
117        };
118
119        // Remove proposal from the queue
120        let mut lock = unknown_proposals.write().await;
121        lock.remove(proposal.hash.inner());
122        drop(lock);
123    }
124}
125
126/// Background task to handle an unknown proposal.
127/// Returns a boolean flag indicate if we should ban the channel.
128async fn handle_unknown_proposal(
129    validator: &ValidatorPtr,
130    p2p: &P2pPtr,
131    proposals_sub: &JsonSubscriber,
132    blocks_sub: &JsonSubscriber,
133    channel: u32,
134    proposal: &Proposal,
135) -> bool {
136    // If proposal fork chain was not found, we ask our peer for its sequence
137    debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
138    let Some(channel) = p2p.get_channel(channel) else {
139        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
140        return false
141    };
142
143    // Communication setup
144    let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
145        debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
146        return true
147    };
148
149    // Grab last known block to create the request and execute it
150    let last = match validator.blockchain.last() {
151        Ok(l) => l,
152        Err(e) => {
153            error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
154            return false
155        }
156    };
157    let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
158    if let Err(e) = channel.send(&request).await {
159        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
160        return true
161    };
162
163    let comms_timeout =
164        p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
165
166    // Node waits for response
167    let response = match response_sub.receive_with_timeout(comms_timeout).await {
168        Ok(r) => r,
169        Err(e) => {
170            debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
171            return true
172        }
173    };
174    debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
175
176    // Verify and store retrieved proposals
177    debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
178
179    // Response should not be empty
180    if response.proposals.is_empty() {
181        debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
182        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
183    }
184
185    // Sequence length must correspond to requested height
186    if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
187        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
188        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
189    }
190
191    // First proposal must extend canonical
192    if response.proposals[0].block.header.previous != last.1 {
193        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
194        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
195    }
196
197    // Last proposal must be the same as the one requested
198    if response.proposals.last().unwrap().hash != proposal.hash {
199        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
200        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
201    }
202
203    // Process response proposals
204    for proposal in &response.proposals {
205        // Append proposal
206        match validator.append_proposal(proposal).await {
207            Ok(()) => { /* Do nothing */ }
208            // Skip already existing proposals
209            Err(Error::ProposalAlreadyExists) => continue,
210            Err(e) => {
211                debug!(
212                    target: "darkfid::task::handle_unknown_proposal",
213                    "Error while appending response proposal: {e}"
214                );
215                break;
216            }
217        };
218
219        // Broadcast proposal to rest nodes
220        let message = ProposalMessage(proposal.clone());
221        p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
222
223        // Notify proposals subscriber
224        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
225        proposals_sub.notify(vec![enc_prop].into()).await;
226    }
227
228    false
229}
230
231/// Auxiliary function to handle a potential reorg.
232/// We first find our last common block with the peer,
233/// then grab the header sequence from that block until
234/// the proposal and check if it ranks higher than our
235/// current best ranking fork, to perform a reorg.
236/// Returns a boolean flag indicate if we should ban the
237/// channel.
238async fn handle_reorg(
239    validator: &ValidatorPtr,
240    p2p: &P2pPtr,
241    proposals_sub: &JsonSubscriber,
242    blocks_sub: &JsonSubscriber,
243    channel: ChannelPtr,
244    proposal: &Proposal,
245) -> bool {
246    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
247
248    // Check if genesis proposal was provided
249    if proposal.block.header.height == 0 {
250        debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
251        return true
252    }
253
254    // Communication setup
255    let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
256        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
257        return true
258    };
259
260    // Keep track of received header hashes sequence
261    let mut peer_header_hashes = vec![];
262
263    // Find last common header, going backwards from the proposal
264    let mut previous_height = proposal.block.header.height;
265    let mut previous_hash = proposal.hash;
266    for height in (0..proposal.block.header.height).rev() {
267        // Request peer header hash for this height
268        let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
269        if let Err(e) = channel.send(&request).await {
270            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
271            return true
272        };
273
274        let comms_timeout =
275            p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
276        // Node waits for response
277        let response = match response_sub.receive_with_timeout(comms_timeout).await {
278            Ok(r) => r,
279            Err(e) => {
280                debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
281                return true
282            }
283        };
284        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
285
286        // Check if peer returned a header
287        let Some(peer_header) = response.fork_header else {
288            debug!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
289            return true
290        };
291
292        // Check if we know this header
293        let headers = match validator.blockchain.blocks.get_order(&[height], false) {
294            Ok(r) => r,
295            Err(e) => {
296                error!(target: "darkfid::task::handle_reorg", "Retrieving headers failed: {e}");
297                return false
298            }
299        };
300        match headers[0] {
301            Some(known_header) => {
302                if known_header == peer_header {
303                    previous_height = height;
304                    previous_hash = known_header;
305                    break
306                }
307                // Since we retrieve in right -> left order we push them in reverse order
308                peer_header_hashes.insert(0, peer_header);
309            }
310            None => peer_header_hashes.insert(0, peer_header),
311        }
312    }
313
314    // Check if we have a sequence to process
315    if peer_header_hashes.is_empty() {
316        debug!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
317        return true
318    }
319
320    // Communication setup
321    let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
322        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
323        return true
324    };
325
326    // Grab last common height ranks
327    let last_common_height = previous_height;
328    let last_difficulty = match previous_height {
329        0 => {
330            let genesis_timestamp = match validator.blockchain.genesis_block() {
331                Ok(b) => b.header.timestamp,
332                Err(e) => {
333                    error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
334                    return false
335                }
336            };
337            BlockDifficulty::genesis(genesis_timestamp)
338        }
339        _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
340            Ok(d) => d[0].clone().unwrap(),
341            Err(e) => {
342                error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
343                return false
344            }
345        },
346    };
347
348    // Create a new PoW from last common height
349    let module = match PoWModule::new(
350        validator.consensus.blockchain.clone(),
351        validator.consensus.module.read().await.target,
352        validator.consensus.module.read().await.fixed_difficulty.clone(),
353        Some(last_common_height + 1),
354    ) {
355        Ok(m) => m,
356        Err(e) => {
357            error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
358            return false
359        }
360    };
361
362    // Retrieve the headers of the hashes sequence, in batches, keeping track of the sequence ranking
363    info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
364    let mut batch = Vec::with_capacity(BATCH);
365    let mut total_processed = 0;
366    let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
367    let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
368    let mut headers_module = module.clone();
369    for (index, hash) in peer_header_hashes.iter().enumerate() {
370        // Add hash in batch sequence
371        batch.push(*hash);
372
373        // Check if batch is full so we can send it
374        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
375            continue
376        }
377
378        // Request peer headers
379        let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
380        if let Err(e) = channel.send(&request).await {
381            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
382            return true
383        };
384
385        let comms_timeout =
386            p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
387        // Node waits for response
388        let response = match response_sub.receive_with_timeout(comms_timeout).await {
389            Ok(r) => r,
390            Err(e) => {
391                debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
392                return true
393            }
394        };
395        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
396
397        // Response sequence must be the same length as the one requested
398        if response.headers.len() != batch.len() {
399            debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
400            return true
401        }
402
403        // Process retrieved headers
404        for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
405            let peer_header_hash = peer_header.hash();
406            debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
407
408            // Validate its the header we requested
409            if peer_header_hash != batch[peer_header_index] {
410                debug!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
411                return true
412            }
413
414            // Validate sequence is correct
415            if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
416                debug!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
417                return true
418            }
419
420            // Verify header hash and calculate its rank
421            let (next_difficulty, target_distance_sq, hash_distance_sq) = match header_rank(
422                &headers_module,
423                peer_header,
424            ) {
425                Ok(tuple) => tuple,
426                Err(Error::PoWInvalidOutHash) => {
427                    debug!(target: "darkfid::task::handle_reorg", "Invalid header hash detected");
428                    return true
429                }
430                Err(e) => {
431                    debug!(target: "darkfid::task::handle_reorg", "Computing header rank failed: {e}");
432                    return false
433                }
434            };
435
436            // Update sequence ranking
437            targets_rank += target_distance_sq.clone();
438            hashes_rank += hash_distance_sq.clone();
439
440            // Update PoW headers module
441            if let Err(e) = headers_module.append(peer_header, &next_difficulty) {
442                debug!(target: "darkfid::task::handle_reorg", "Error while appending header to module: {e}");
443                return true
444            };
445
446            // Set previous header
447            previous_height = peer_header.height;
448            previous_hash = peer_header_hash;
449        }
450
451        total_processed += response.headers.len();
452        info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
453
454        // Reset batch
455        batch = Vec::with_capacity(BATCH);
456    }
457
458    // Check if the sequence ranks higher than our current best fork
459    let forks = validator.consensus.forks.read().await;
460    let index = match best_fork_index(&forks) {
461        Ok(i) => i,
462        Err(e) => {
463            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
464            return false
465        }
466    };
467    let best_fork = &forks[index];
468    if targets_rank < best_fork.targets_rank ||
469        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
470    {
471        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
472        drop(forks);
473        return true
474    }
475    drop(forks);
476
477    // Communication setup
478    let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
479        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
480        return true
481    };
482
483    // Create a fork from last common height
484    let mut peer_fork =
485        match Fork::new(validator.consensus.blockchain.clone(), module.clone()).await {
486            Ok(f) => f,
487            Err(e) => {
488                error!(target: "darkfid::task::handle_reorg", "Generating peer fork failed: {e}");
489                return false
490            }
491        };
492    peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
493    peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
494
495    // Grab all state inverse diffs after last common height, and add them to the fork
496    let inverse_diffs = match validator
497        .blockchain
498        .blocks
499        .get_state_inverse_diffs_after(last_common_height)
500    {
501        Ok(i) => i,
502        Err(e) => {
503            error!(target: "darkfid::task::handle_reorg", "Retrieving state inverse diffs failed: {e}");
504            peer_fork.purge_new_trees();
505            return false
506        }
507    };
508    for inverse_diff in inverse_diffs.iter().rev() {
509        if let Err(e) =
510            peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)
511        {
512            error!(target: "darkfid::task::handle_reorg", "Applying inverse diff failed: {e}");
513            peer_fork.purge_new_trees();
514            return false
515        }
516    }
517
518    // Rebuild fork contracts states monotree
519    if let Err(e) = peer_fork.compute_monotree() {
520        error!(target: "darkfid::task::handle_reorg", "Rebuilding peer fork monotree failed: {e}");
521        peer_fork.purge_new_trees();
522        return false
523    }
524
525    // Retrieve the proposals of the hashes sequence, in batches
526    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
527    let mut batch = Vec::with_capacity(BATCH);
528    let mut total_processed = 0;
529    for (index, hash) in peer_header_hashes.iter().enumerate() {
530        // Add hash in batch sequence
531        batch.push(*hash);
532
533        // Check if batch is full so we can send it
534        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
535            continue
536        }
537
538        // Request peer proposals
539        let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
540        if let Err(e) = channel.send(&request).await {
541            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
542            peer_fork.purge_new_trees();
543            return true
544        };
545
546        let comms_timeout =
547            p2p.settings().read_arc().await.outbound_connect_timeout(channel.address().scheme());
548
549        // Node waits for response
550        let response = match response_sub.receive_with_timeout(comms_timeout).await {
551            Ok(r) => r,
552            Err(e) => {
553                debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
554                peer_fork.purge_new_trees();
555                return true
556            }
557        };
558        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
559
560        // Response sequence must be the same length as the one requested
561        if response.proposals.len() != batch.len() {
562            debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
563            peer_fork.purge_new_trees();
564            return true
565        }
566
567        // Process retrieved proposal
568        for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
569            info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
570
571            // Validate its the proposal we requested
572            if peer_proposal.hash != batch[peer_proposal_index] {
573                error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
574                peer_fork.purge_new_trees();
575                return true
576            }
577
578            // Verify proposal
579            if let Err(e) =
580                verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await
581            {
582                error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
583                return true
584            }
585
586            // Append proposal
587            if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
588                error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
589                peer_fork.purge_new_trees();
590                return true
591            }
592        }
593
594        total_processed += response.proposals.len();
595        info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
596
597        // Reset batch
598        batch = Vec::with_capacity(BATCH);
599    }
600
601    // Verify trigger proposal
602    if let Err(e) = verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await {
603        error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
604        return true
605    }
606
607    // Append trigger proposal
608    if let Err(e) = peer_fork.append_proposal(proposal).await {
609        error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
610        peer_fork.purge_new_trees();
611        return true
612    }
613
614    // Check if the peer fork ranks higher than our current best fork
615    let mut forks = validator.consensus.forks.write().await;
616    let index = match best_fork_index(&forks) {
617        Ok(i) => i,
618        Err(e) => {
619            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
620            peer_fork.purge_new_trees();
621            return false
622        }
623    };
624    let best_fork = &forks[index];
625    if peer_fork.targets_rank < best_fork.targets_rank ||
626        (peer_fork.targets_rank == best_fork.targets_rank &&
627            peer_fork.hashes_rank <= best_fork.hashes_rank)
628    {
629        info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
630        peer_fork.purge_new_trees();
631        drop(forks);
632        return true
633    }
634
635    // Execute the reorg
636    info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
637    *validator.consensus.module.write().await = module;
638    *forks = vec![peer_fork];
639    drop(forks);
640
641    // Check if we can confirm anything and broadcast them
642    let confirmed = match validator.confirmation().await {
643        Ok(f) => f,
644        Err(e) => {
645            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
646            return false
647        }
648    };
649
650    if !confirmed.is_empty() {
651        let mut notif_blocks = Vec::with_capacity(confirmed.len());
652        for block in confirmed {
653            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
654        }
655        blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
656    }
657
658    // Broadcast proposal to the network
659    let message = ProposalMessage(proposal.clone());
660    p2p.broadcast(&message).await;
661
662    // Notify proposals subscriber
663    let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
664    proposals_sub.notify(vec![enc_prop].into()).await;
665
666    false
667}