darkfid/
lib.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use smol::lock::Mutex;
25use tracing::{debug, error, info};
26
27use darkfi::{
28    net::settings::Settings,
29    rpc::{
30        jsonrpc::JsonSubscriber,
31        server::{listen_and_serve, RequestHandler},
32        settings::RpcSettings,
33    },
34    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
35    validator::{Validator, ValidatorConfig, ValidatorPtr},
36    Error, Result,
37};
38use darkfi_sdk::crypto::keypair::Network;
39
40#[cfg(test)]
41mod tests;
42
43mod error;
44use error::{server_error, RpcError};
45
46/// JSON-RPC requests handler and methods
47mod rpc;
48use rpc::{management::ManagementRpcHandler, DefaultRpcHandler};
49
50/// Validator async tasks
51pub mod task;
52use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
53
54/// P2P net protocols
55mod proto;
56use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
57
58/// Miners registry
59mod registry;
60use registry::{DarkfiMinersRegistry, DarkfiMinersRegistryPtr};
61
/// Atomically reference-counted pointer (`Arc`) to a DarkFi node
63pub type DarkfiNodePtr = Arc<DarkfiNode>;
64
65/// Structure representing a DarkFi node
66pub struct DarkfiNode {
67    /// Validator(node) pointer
68    validator: ValidatorPtr,
69    /// P2P network protocols handler
70    p2p_handler: DarkfidP2pHandlerPtr,
71    /// Node miners registry pointer
72    registry: DarkfiMinersRegistryPtr,
73    /// Garbage collection task transactions batch size
74    txs_batch_size: usize,
75    /// A map of various subscribers exporting live info from the blockchain
76    subscribers: HashMap<&'static str, JsonSubscriber>,
77    /// Main JSON-RPC connection tracker
78    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
79    /// Management JSON-RPC connection tracker
80    management_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
81}
82
83impl DarkfiNode {
84    pub async fn new(
85        validator: ValidatorPtr,
86        p2p_handler: DarkfidP2pHandlerPtr,
87        registry: DarkfiMinersRegistryPtr,
88        txs_batch_size: usize,
89        subscribers: HashMap<&'static str, JsonSubscriber>,
90    ) -> Result<DarkfiNodePtr> {
91        Ok(Arc::new(Self {
92            validator,
93            p2p_handler,
94            registry,
95            txs_batch_size,
96            subscribers,
97            rpc_connections: Mutex::new(HashSet::new()),
98            management_rpc_connections: Mutex::new(HashSet::new()),
99        }))
100    }
101}
102
/// Atomically reference-counted pointer (`Arc`) to a DarkFi daemon
104pub type DarkfidPtr = Arc<Darkfid>;
105
106/// Structure representing a DarkFi daemon
107pub struct Darkfid {
108    /// Darkfi node instance
109    node: DarkfiNodePtr,
110    /// `dnet` background task
111    dnet_task: StoppableTaskPtr,
112    /// Main JSON-RPC background task
113    rpc_task: StoppableTaskPtr,
114    /// Management JSON-RPC background task
115    management_rpc_task: StoppableTaskPtr,
116    /// Consensus protocol background task
117    consensus_task: StoppableTaskPtr,
118}
119
120impl Darkfid {
121    /// Initialize a DarkFi daemon.
122    ///
123    /// Generates a new `DarkfiNode` for provided configuration,
124    /// along with all the corresponding background tasks.
125    pub async fn init(
126        network: Network,
127        sled_db: &sled_overlay::sled::Db,
128        config: &ValidatorConfig,
129        net_settings: &Settings,
130        txs_batch_size: &Option<usize>,
131        ex: &ExecutorPtr,
132    ) -> Result<DarkfidPtr> {
133        info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
134        // Initialize validator
135        let validator = Validator::new(sled_db, config).await?;
136
137        // Initialize P2P network
138        let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
139
140        // Initialize the miners registry
141        let registry = DarkfiMinersRegistry::init(network, &validator)?;
142
143        // Grab blockchain network configured transactions batch size for garbage collection
144        let txs_batch_size = match txs_batch_size {
145            Some(b) => {
146                if *b > 0 {
147                    *b
148                } else {
149                    50
150                }
151            }
152            None => 50,
153        };
154
155        // Here we initialize various subscribers that can export live blockchain/consensus data.
156        let mut subscribers = HashMap::new();
157        subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
158        subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
159        subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
160        subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
161
162        // Initialize node
163        let node =
164            DarkfiNode::new(validator, p2p_handler, registry, txs_batch_size, subscribers).await?;
165
166        // Generate the background tasks
167        let dnet_task = StoppableTask::new();
168        let rpc_task = StoppableTask::new();
169        let management_rpc_task = StoppableTask::new();
170        let consensus_task = StoppableTask::new();
171
172        info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
173
174        Ok(Arc::new(Self { node, dnet_task, rpc_task, management_rpc_task, consensus_task }))
175    }
176
177    /// Start the DarkFi daemon in the given executor, using the
178    /// provided JSON-RPC settings and consensus initialization
179    /// configuration.
180    pub async fn start(
181        &self,
182        executor: &ExecutorPtr,
183        rpc_settings: &RpcSettings,
184        management_rpc_settings: &RpcSettings,
185        stratum_rpc_settings: &Option<RpcSettings>,
186        mm_rpc_settings: &Option<RpcSettings>,
187        config: &ConsensusInitTaskConfig,
188    ) -> Result<()> {
189        info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
190
191        // Start the `dnet` task
192        info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
193        let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
194        let p2p_ = self.node.p2p_handler.p2p.clone();
195        self.dnet_task.clone().start(
196            async move {
197                let dnet_sub = p2p_.dnet_subscribe().await;
198                loop {
199                    let event = dnet_sub.receive().await;
200                    debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {event:?}");
201                    dnet_sub_.notify(vec![event.into()].into()).await;
202                }
203            },
204            |res| async {
205                match res {
206                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
207                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {e}"),
208                }
209            },
210            Error::DetachedTaskStopped,
211            executor.clone(),
212        );
213
214        // Start the main JSON-RPC task
215        info!(target: "darkfid::Darkfid::start", "Starting main JSON-RPC server");
216        let node_ = self.node.clone();
217        self.rpc_task.clone().start(
218            listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
219            |res| async move {
220                match res {
221                    Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
222                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting main JSON-RPC server: {e}"),
223                }
224            },
225            Error::RpcServerStopped,
226            executor.clone(),
227        );
228
229        // Start the management JSON-RPC task
230        info!(target: "darkfid::Darkfid::start", "Starting management JSON-RPC server");
231        let node_ = self.node.clone();
232        self.management_rpc_task.clone().start(
233            listen_and_serve::<ManagementRpcHandler>(management_rpc_settings.clone(), self.node.clone(), None, executor.clone()),
234            |res| async move {
235                match res {
236                    Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<ManagementRpcHandler>>::stop_connections(&node_).await,
237                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting management JSON-RPC server: {e}"),
238                }
239            },
240            Error::RpcServerStopped,
241            executor.clone(),
242        );
243
244        // Start the miners registry
245        info!(target: "darkfid::Darkfid::start", "Starting miners registry");
246        self.node.registry.start(executor, &self.node, stratum_rpc_settings, mm_rpc_settings)?;
247
248        // Start the P2P network
249        info!(target: "darkfid::Darkfid::start", "Starting P2P network");
250        self.node.p2p_handler.start(executor, &self.node.validator, &self.node.subscribers).await?;
251
252        // Start the consensus protocol
253        info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
254        self.consensus_task.clone().start(
255            consensus_init_task(
256                self.node.clone(),
257                config.clone(),
258                executor.clone(),
259            ),
260            |res| async move {
261                match res {
262                    Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }
263                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {e}"),
264                }
265            },
266            Error::ConsensusTaskStopped,
267            executor.clone(),
268        );
269
270        info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
271        Ok(())
272    }
273
274    /// Stop the DarkFi daemon.
275    pub async fn stop(&self) -> Result<()> {
276        info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
277
278        // Stop the `dnet` node
279        info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
280        self.dnet_task.stop().await;
281
282        // Stop the main JSON-RPC task
283        info!(target: "darkfid::Darkfid::stop", "Stopping main JSON-RPC server...");
284        self.rpc_task.stop().await;
285
286        // Stop the management JSON-RPC task
287        info!(target: "darkfid::Darkfid::stop", "Stopping management JSON-RPC server...");
288        self.management_rpc_task.stop().await;
289
290        // Stop the miners registry
291        info!(target: "darkfid::Darkfid::stop", "Stopping miners registry...");
292        self.node.registry.stop().await;
293
294        // Stop the P2P network
295        info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
296        self.node.p2p_handler.stop().await;
297
298        // Stop the consensus task
299        info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
300        self.consensus_task.stop().await;
301
302        // Flush sled database data
303        info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
304        let flushed_bytes = self.node.validator.blockchain.sled_db.flush_async().await?;
305        info!(target: "darkfid::Darkfid::stop", "Flushed {flushed_bytes} bytes");
306
307        info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
308        Ok(())
309    }
310}