darkfid/proto/
protocol_proposal.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use async_trait::async_trait;
25use smol::{channel::Sender, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error};
28
29use darkfi::{
30    impl_p2p_message,
31    net::{
32        metering::MeteringConfiguration,
33        protocol::protocol_generic::{
34            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
35        },
36        session::SESSION_DEFAULT,
37        Message, P2pPtr,
38    },
39    rpc::jsonrpc::JsonSubscriber,
40    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
41    util::{
42        encoding::base64,
43        time::{NanoTimestamp, Timestamp},
44    },
45    validator::{consensus::Proposal, ValidatorPtr},
46    Error, Result,
47};
48use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
49
50use crate::task::handle_unknown_proposals;
51
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
///
/// Newtype around a single [`Proposal`], exposed through the public
/// tuple field. It derives the serial encoding/decoding traits so it
/// can be used as a message payload.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ProposalMessage(pub Proposal);
55
// TODO: Fine tune
// Since messages are asynchronous, we define loose rules to prevent
// spamming: each message has a score of 1, with a metering threshold
// of 50 and an expiry time of 5 seconds.
// We are not limiting `Proposal` size.
// NOTE(review): presumably the positional `0` is the "no size limit"
// setting and `1` is the per-message score -- confirm against the
// `impl_p2p_message!` definition.
impl_p2p_message!(
    ProposalMessage,
    "proposal",
    0,
    1,
    MeteringConfiguration {
        threshold: 50,
        sleep_step: 500,
        expiry_time: NanoTimestamp::from_secs(5),
    }
);
71
/// Atomically reference-counted ([`Arc`]) pointer to a
/// [`ProtocolProposalHandler`].
pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
74
/// Handler managing [`Proposal`] messages, over a generic P2P
/// protocol.
///
/// Besides the generic message handler, it keeps the shared state
/// used to track proposals that could not be appended because the
/// fork they extend is unknown to us.
pub struct ProtocolProposalHandler {
    /// The generic handler for [`Proposal`] messages.
    proposals_handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
    /// Queue of unknown proposals, stored as their 32-byte hashes,
    /// to be checked for reorg.
    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
    /// Per-channel records of unknown proposals, keyed by channel id,
    /// used to ban channels after 5 consecutive unknown proposals.
    /// Records expire after 10 minutes.
    // NOTE(review): the tuple is presumably (consecutive unknown
    // proposals counter, last update timestamp) -- confirm against
    // the unknown proposals handler task.
    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
    /// Handler background task to process unknown proposals queue.
    unknown_proposals_handler: StoppableTaskPtr,
}
88
89impl ProtocolProposalHandler {
90    /// Initialize a generic prototocol handler for [`Proposal`] messages
91    /// and registers it to the provided P2P network, using the default session flag.
92    pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
93        debug!(
94            target: "darkfid::proto::protocol_proposal::init",
95            "Adding ProtocolProposal to the protocol registry"
96        );
97
98        let proposals_handler =
99            ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
100        let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
101        let unknown_proposals_channels = Arc::new(RwLock::new(HashMap::new()));
102        let unknown_proposals_handler = StoppableTask::new();
103
104        Arc::new(Self {
105            proposals_handler,
106            unknown_proposals,
107            unknown_proposals_channels,
108            unknown_proposals_handler,
109        })
110    }
111
112    /// Start the `ProtocolProposal` background task.
113    pub async fn start(
114        &self,
115        executor: &ExecutorPtr,
116        validator: &ValidatorPtr,
117        p2p: &P2pPtr,
118        proposals_sub: JsonSubscriber,
119        blocks_sub: JsonSubscriber,
120    ) -> Result<()> {
121        debug!(
122            target: "darkfid::proto::protocol_proposal::start",
123            "Starting ProtocolProposal handler task..."
124        );
125
126        // Generate the message queue smol channel
127        let (sender, receiver) = smol::channel::unbounded::<(Proposal, u32)>();
128
129        // Start the unkown proposals handler task
130        self.unknown_proposals_handler.clone().start(
131            handle_unknown_proposals(receiver, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub),
132            |res| async move {
133                match res {
134                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
135                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting unknown proposals handler task: {e}"),
136                }
137            },
138            Error::DetachedTaskStopped,
139            executor.clone(),
140        );
141
142        // Start the proposals handler task
143        self.proposals_handler.task.clone().start(
144            handle_receive_proposal(self.proposals_handler.clone(), sender, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), validator.clone(), proposals_sub),
145            |res| async move {
146                match res {
147                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
148                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
149                }
150            },
151            Error::DetachedTaskStopped,
152            executor.clone(),
153        );
154
155        debug!(
156            target: "darkfid::proto::protocol_proposal::start",
157            "ProtocolProposal handler task started!"
158        );
159
160        Ok(())
161    }
162
163    /// Stop the `ProtocolProposal` background tasks.
164    pub async fn stop(&self) {
165        debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
166        self.unknown_proposals_handler.stop().await;
167        self.proposals_handler.task.stop().await;
168        let mut unknown_proposals = self.unknown_proposals.write().await;
169        *unknown_proposals = HashSet::new();
170        drop(unknown_proposals);
171        debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
172    }
173}
174
175/// Background handler function for ProtocolProposal.
176async fn handle_receive_proposal(
177    handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
178    sender: Sender<(Proposal, u32)>,
179    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
180    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
181    validator: ValidatorPtr,
182    proposals_sub: JsonSubscriber,
183) -> Result<()> {
184    debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
185    loop {
186        // Wait for a new proposal message
187        let (channel, proposal) = match handler.receiver.recv().await {
188            Ok(r) => r,
189            Err(e) => {
190                debug!(
191                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
192                    "recv fail: {e}"
193                );
194                continue
195            }
196        };
197
198        // Check if node has finished syncing its blockchain
199        if !*validator.synced.read().await {
200            debug!(
201                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
202                "Node still syncing blockchain, skipping..."
203            );
204            handler.send_action(channel, ProtocolGenericAction::Skip).await;
205            continue
206        }
207
208        // Cleanup expired unknown proposals channels records if queue
209        // is empty and we have records.
210        if unknown_proposals.read().await.is_empty() {
211            let mut unknown_proposals_channels = unknown_proposals_channels.write().await;
212            if !unknown_proposals_channels.is_empty() {
213                let now = Timestamp::current_time().inner();
214                let mut expired_channels = vec![];
215                for (channel, (_, timestamp)) in unknown_proposals_channels.iter() {
216                    if now - timestamp >= 600 {
217                        expired_channels.push(*channel);
218                    }
219                }
220                unknown_proposals_channels.retain(|channel, _| !expired_channels.contains(channel));
221            }
222            drop(unknown_proposals_channels);
223        }
224
225        // Append proposal
226        match validator.append_proposal(&proposal.0).await {
227            Ok(()) => {
228                // Signal handler to broadcast the valid proposal to rest nodes
229                handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
230
231                // Notify proposals subscriber
232                let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
233                proposals_sub.notify(vec![enc_prop].into()).await;
234
235                // Drop channel from unknown proposals channels records
236                unknown_proposals_channels.write().await.remove(&channel);
237
238                continue
239            }
240            Err(e) => {
241                debug!(
242                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
243                    "append_proposal fail: {e}",
244                );
245
246                handler.send_action(channel, ProtocolGenericAction::Skip).await;
247
248                match e {
249                    Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
250                    _ => continue,
251                }
252            }
253        };
254
255        // Check if we already have the unknown proposal record in our
256        // queue.
257        let mut lock = unknown_proposals.write().await;
258        if lock.contains(proposal.0.hash.inner()) {
259            debug!(
260                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
261                "Proposal {} is already in our unknown proposals queue.",
262                proposal.0.hash,
263            );
264            drop(lock);
265            continue
266        }
267
268        // Insert new record in our queue
269        lock.insert(proposal.0.hash.0);
270        drop(lock);
271
272        // Notify the unknown proposals handler task
273        if let Err(e) = sender.send((proposal.0, channel)).await {
274            debug!(
275                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
276                "Channel {channel} send fail: {e}"
277            );
278        };
279    }
280}