darkfid/proto/
protocol_tx.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use tinyjson::JsonValue;
22use tracing::{debug, error};
23
24use darkfi::{
25    net::{
26        protocol::protocol_generic::{
27            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
28        },
29        session::SESSION_DEFAULT,
30        P2pPtr,
31    },
32    rpc::jsonrpc::JsonSubscriber,
33    system::ExecutorPtr,
34    tx::Transaction,
35    util::encoding::base64,
36    validator::ValidatorPtr,
37    Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::registry::DarkfiMinersRegistryStatePtr;
42
43/// Atomic pointer to the `ProtocolTx` handler.
44pub type ProtocolTxHandlerPtr = Arc<ProtocolTxHandler>;
45
46/// Handler managing [`Transaction`] messages, over a generic P2P protocol.
47pub struct ProtocolTxHandler {
48    /// The generic handler for [`Transaction`] messages.
49    handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
50}
51
52impl ProtocolTxHandler {
53    /// Initialize a generic prototocol handler for [`Transaction`] messages
54    /// and registers it to the provided P2P network, using the default session flag.
55    pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr {
56        debug!(
57            target: "darkfid::proto::protocol_tx::init",
58            "Adding ProtocolTx to the protocol registry"
59        );
60
61        let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await;
62
63        Arc::new(Self { handler })
64    }
65
66    /// Start the `ProtocolTx` background task.
67    pub async fn start(
68        &self,
69        executor: &ExecutorPtr,
70        validator: &ValidatorPtr,
71        registry_state: &DarkfiMinersRegistryStatePtr,
72        subscriber: JsonSubscriber,
73    ) -> Result<()> {
74        debug!(
75            target: "darkfid::proto::protocol_tx::start",
76            "Starting ProtocolTx handler task..."
77        );
78
79        self.handler.task.clone().start(
80            handle_receive_tx(self.handler.clone(), validator.clone(), registry_state.clone(), subscriber),
81            |res| async move {
82                match res {
83                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
84                    Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"),
85                }
86            },
87            Error::DetachedTaskStopped,
88            executor.clone(),
89        );
90
91        debug!(
92            target: "darkfid::proto::protocol_tx::start",
93            "ProtocolTx handler task started!"
94        );
95
96        Ok(())
97    }
98
99    /// Stop the `ProtocolTx` background task.
100    pub async fn stop(&self) {
101        debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task...");
102        self.handler.task.stop().await;
103        debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!");
104    }
105}
106
107/// Background handler function for ProtocolTx.
108async fn handle_receive_tx(
109    handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
110    validator: ValidatorPtr,
111    registry_state: DarkfiMinersRegistryStatePtr,
112    subscriber: JsonSubscriber,
113) -> Result<()> {
114    debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START");
115    loop {
116        // Wait for a new transaction message
117        let (channel, tx) = match handler.receiver.recv().await {
118            Ok(r) => r,
119            Err(e) => {
120                debug!(
121                    target: "darkfid::proto::protocol_tx::handle_receive_tx",
122                    "recv fail: {e}"
123                );
124                continue
125            }
126        };
127
128        // Check if node has finished syncing its blockchain
129        let mut validator = validator.write().await;
130        if !validator.synced {
131            debug!(
132                target: "darkfid::proto::protocol_tx::handle_receive_tx",
133                "Node still syncing blockchain, skipping..."
134            );
135            handler.send_action(channel, ProtocolGenericAction::Skip).await;
136            continue
137        }
138
139        // Append transaction
140        let result = validator.append_tx(&tx, true).await;
141
142        // Purge all unreferenced contract trees from the database
143        if let Err(e) = validator
144            .consensus
145            .purge_unreferenced_trees(&mut registry_state.read().await.new_trees())
146            .await
147        {
148            error!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "Purging unreferenced contract trees from the database failed: {e}");
149            handler.send_action(channel, ProtocolGenericAction::Skip).await;
150            continue
151        }
152
153        // Handle result
154        if let Err(e) = result {
155            debug!(
156                target: "darkfid::proto::protocol_tx::handle_receive_tx",
157                "append_tx fail: {e}"
158            );
159            handler.send_action(channel, ProtocolGenericAction::Skip).await;
160            continue
161        }
162
163        // Signal handler to broadcast the valid transaction to rest nodes
164        handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
165
166        // Notify subscriber
167        let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await));
168        subscriber.notify(vec![encoded_tx].into()).await;
169    }
170}