darkfid/proto/
protocol_proposal.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use async_trait::async_trait;
25use smol::{channel::Sender, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error};
28
29use darkfi::{
30    impl_p2p_message,
31    net::{
32        metering::MeteringConfiguration,
33        protocol::protocol_generic::{
34            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
35        },
36        session::SESSION_DEFAULT,
37        Message, P2pPtr,
38    },
39    rpc::jsonrpc::JsonSubscriber,
40    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
41    util::{
42        encoding::base64,
43        time::{NanoTimestamp, Timestamp},
44    },
45    validator::{consensus::Proposal, ValidatorPtr},
46    Error, Result,
47};
48use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
49
50use crate::{task::handle_unknown_proposals, DarkfiNodePtr};
51
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
///
/// Newtype around [`Proposal`] that is registered as the `"proposal"`
/// P2P message through the `impl_p2p_message!` invocation in this file.
/// The wrapped proposal is accessible through the public `.0` field.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ProposalMessage(pub Proposal);
55
// TODO: Fine tune
// Since messages are asynchronous we will define loose rules to prevent spamming.
// Each message score will be 1, with a threshold of 50 and an expiry time of 5 seconds.
// We are not limiting `Proposal` size.
impl_p2p_message!(
    ProposalMessage,
    "proposal",
    // NOTE(review): presumably the maximum message size, where 0 means
    // "not limited" as stated above; confirm against the macro definition.
    0,
    // NOTE(review): presumably the per-message metering score mentioned above.
    1,
    MeteringConfiguration {
        threshold: 50,
        // NOTE(review): unit is not visible from this file; confirm in
        // `MeteringConfiguration`.
        sleep_step: 500,
        expiry_time: NanoTimestamp::from_secs(5),
    }
);
71
/// Atomic pointer to the `ProtocolProposal` handler.
///
/// Shared, reference-counted ([`Arc`]) handle to a
/// [`ProtocolProposalHandler`], as returned by
/// [`ProtocolProposalHandler::init`].
pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
74
/// Handler managing [`Proposal`] messages, over a generic P2P
/// protocol.
///
/// It owns two background tasks: the generic protocol task receiving
/// [`ProposalMessage`]s, and a second task processing the proposals that
/// could not be attached to a known fork (the "unknown proposals").
pub struct ProtocolProposalHandler {
    /// The generic handler for [`Proposal`] messages.
    proposals_handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
    /// Unknown proposals queue to be checked for reorg.
    /// Each entry is the raw 32 bytes of a proposal hash.
    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
    /// Unknown proposals channels to ban them after 5 consecutive
    /// unknown proposals. Records expire after a 10 minutes timeframe.
    ///
    /// Keyed by channel ID. The `u64` of each record is the timestamp
    /// compared against the current time for expiry; the `u8` is
    /// presumably the consecutive unknown proposals counter (it is
    /// maintained outside this file; confirm in `handle_unknown_proposals`).
    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
    /// Handler background task to process unknown proposals queue.
    unknown_proposals_handler: StoppableTaskPtr,
}
88
89impl ProtocolProposalHandler {
90    /// Initialize a generic prototocol handler for [`Proposal`] messages
91    /// and registers it to the provided P2P network, using the default session flag.
92    pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
93        debug!(
94            target: "darkfid::proto::protocol_proposal::init",
95            "Adding ProtocolProposal to the protocol registry"
96        );
97
98        let proposals_handler =
99            ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
100        let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
101        let unknown_proposals_channels = Arc::new(RwLock::new(HashMap::new()));
102        let unknown_proposals_handler = StoppableTask::new();
103
104        Arc::new(Self {
105            proposals_handler,
106            unknown_proposals,
107            unknown_proposals_channels,
108            unknown_proposals_handler,
109        })
110    }
111
112    /// Start the `ProtocolProposal` background task.
113    pub async fn start(&self, executor: &ExecutorPtr, node: &DarkfiNodePtr) -> Result<()> {
114        debug!(
115            target: "darkfid::proto::protocol_proposal::start",
116            "Starting ProtocolProposal handler task..."
117        );
118
119        // Generate the message queue smol channel
120        let (sender, receiver) = smol::channel::unbounded::<(Proposal, u32)>();
121
122        // Start the unkown proposals handler task
123        self.unknown_proposals_handler.clone().start(
124            handle_unknown_proposals(receiver, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), node.clone()),
125            |res| async move {
126                match res {
127                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
128                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting unknown proposals handler task: {e}"),
129                }
130            },
131            Error::DetachedTaskStopped,
132            executor.clone(),
133        );
134
135        // Start the proposals handler task
136        self.proposals_handler.task.clone().start(
137            handle_receive_proposal(self.proposals_handler.clone(), sender, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), node.validator.clone(), node.subscribers.get("proposals").unwrap().clone()),
138            |res| async move {
139                match res {
140                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
141                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
142                }
143            },
144            Error::DetachedTaskStopped,
145            executor.clone(),
146        );
147
148        debug!(
149            target: "darkfid::proto::protocol_proposal::start",
150            "ProtocolProposal handler task started!"
151        );
152
153        Ok(())
154    }
155
156    /// Stop the `ProtocolProposal` background tasks.
157    pub async fn stop(&self) {
158        debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
159        self.unknown_proposals_handler.stop().await;
160        self.proposals_handler.task.stop().await;
161        let mut unknown_proposals = self.unknown_proposals.write().await;
162        *unknown_proposals = HashSet::new();
163        drop(unknown_proposals);
164        debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
165    }
166}
167
168/// Background handler function for ProtocolProposal.
169async fn handle_receive_proposal(
170    handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
171    sender: Sender<(Proposal, u32)>,
172    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
173    unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
174    validator: ValidatorPtr,
175    proposals_sub: JsonSubscriber,
176) -> Result<()> {
177    debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
178    loop {
179        // Wait for a new proposal message
180        let (channel, proposal) = match handler.receiver.recv().await {
181            Ok(r) => r,
182            Err(e) => {
183                debug!(
184                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
185                    "recv fail: {e}"
186                );
187                continue
188            }
189        };
190
191        // Check if node has finished syncing its blockchain
192        if !*validator.synced.read().await {
193            debug!(
194                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
195                "Node still syncing blockchain, skipping..."
196            );
197            handler.send_action(channel, ProtocolGenericAction::Skip).await;
198            continue
199        }
200
201        // Cleanup expired unknown proposals channels records if queue
202        // is empty and we have records.
203        if unknown_proposals.read().await.is_empty() {
204            let mut unknown_proposals_channels = unknown_proposals_channels.write().await;
205            if !unknown_proposals_channels.is_empty() {
206                let now = Timestamp::current_time().inner();
207                let mut expired_channels = vec![];
208                for (channel, (_, timestamp)) in unknown_proposals_channels.iter() {
209                    if now - timestamp >= 600 {
210                        expired_channels.push(*channel);
211                    }
212                }
213                unknown_proposals_channels.retain(|channel, _| !expired_channels.contains(channel));
214            }
215            drop(unknown_proposals_channels);
216        }
217
218        // Append proposal
219        match validator.append_proposal(&proposal.0).await {
220            Ok(()) => {
221                // Signal handler to broadcast the valid proposal to rest nodes
222                handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
223
224                // Notify proposals subscriber
225                let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
226                proposals_sub.notify(vec![enc_prop].into()).await;
227
228                // Drop channel from unknown proposals channels records
229                unknown_proposals_channels.write().await.remove(&channel);
230
231                continue
232            }
233            Err(e) => {
234                debug!(
235                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
236                    "append_proposal fail: {e}",
237                );
238
239                handler.send_action(channel, ProtocolGenericAction::Skip).await;
240
241                match e {
242                    Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
243                    _ => continue,
244                }
245            }
246        };
247
248        // Check if we already have the unknown proposal record in our
249        // queue.
250        let mut lock = unknown_proposals.write().await;
251        if lock.contains(proposal.0.hash.inner()) {
252            debug!(
253                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
254                "Proposal {} is already in our unknown proposals queue.",
255                proposal.0.hash,
256            );
257            drop(lock);
258            continue
259        }
260
261        // Insert new record in our queue
262        lock.insert(proposal.0.hash.0);
263        drop(lock);
264
265        // Notify the unknown proposals handler task
266        if let Err(e) = sender.send((proposal.0, channel)).await {
267            debug!(
268                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
269                "Channel {channel} send fail: {e}"
270            );
271        };
272    }
273}