darkfid/
lib.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use smol::lock::Mutex;
25use tracing::{debug, error, info};
26
27use darkfi::{
28    net::settings::Settings,
29    rpc::{
30        jsonrpc::JsonSubscriber,
31        server::{listen_and_serve, RequestHandler},
32        settings::RpcSettings,
33    },
34    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
35    validator::{Validator, ValidatorConfig, ValidatorPtr},
36    Error, Result,
37};
38use darkfi_sdk::crypto::keypair::Network;
39
40#[cfg(test)]
41mod tests;
42
43mod error;
44use error::{server_error, RpcError};
45
46/// JSON-RPC requests handler and methods
47mod rpc;
48use rpc::{management::ManagementRpcHandler, DefaultRpcHandler};
49
50/// Validator async tasks
51pub mod task;
52use task::{consensus::ConsensusInitTaskConfig, consensus_init_task, garbage_collect_task};
53
54/// P2P net protocols
55mod proto;
56use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
57
58/// Miners registry
59mod registry;
60use registry::{DarkfiMinersRegistry, DarkfiMinersRegistryPtr};
61
/// Shared, atomically reference-counted ([`Arc`]) pointer to a [`DarkfiNode`].
pub type DarkfiNodePtr = Arc<DarkfiNode>;
64
/// Structure representing a DarkFi node.
///
/// Groups the validator, the P2P protocols handler, the miners registry,
/// the JSON-RPC subscribers and the JSON-RPC connection trackers, so they
/// can be shared as a single [`DarkfiNodePtr`].
pub struct DarkfiNode {
    /// Validator(node) pointer
    validator: ValidatorPtr,
    /// P2P network protocols handler
    p2p_handler: DarkfidP2pHandlerPtr,
    /// Node miners registry pointer
    registry: DarkfiMinersRegistryPtr,
    /// A map of various subscribers exporting live info from the blockchain,
    /// keyed by a static name. `Darkfid::init` registers the keys
    /// `"blocks"`, `"txs"`, `"proposals"` and `"dnet"`.
    subscribers: HashMap<&'static str, JsonSubscriber>,
    /// Main JSON-RPC connection tracker: a set of task pointers behind an
    /// async mutex.
    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// Management JSON-RPC connection tracker: a set of task pointers
    /// behind an async mutex.
    management_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
80
81impl DarkfiNode {
82    pub async fn new(
83        validator: ValidatorPtr,
84        p2p_handler: DarkfidP2pHandlerPtr,
85        registry: DarkfiMinersRegistryPtr,
86        subscribers: HashMap<&'static str, JsonSubscriber>,
87    ) -> Result<DarkfiNodePtr> {
88        Ok(Arc::new(Self {
89            validator,
90            p2p_handler,
91            registry,
92            subscribers,
93            rpc_connections: Mutex::new(HashSet::new()),
94            management_rpc_connections: Mutex::new(HashSet::new()),
95        }))
96    }
97}
98
/// Shared, atomically reference-counted ([`Arc`]) pointer to a [`Darkfid`] daemon.
pub type DarkfidPtr = Arc<Darkfid>;
101
/// Structure representing a DarkFi daemon.
///
/// Owns the shared [`DarkfiNode`] together with one task handle per
/// background job. The handles are created in `Darkfid::init`, started in
/// `Darkfid::start` and stopped in `Darkfid::stop`.
pub struct Darkfid {
    /// Darkfi node instance
    node: DarkfiNodePtr,
    /// `dnet` background task, forwarding P2P `dnet` events to the
    /// `"dnet"` JSON-RPC subscriber
    dnet_task: StoppableTaskPtr,
    /// Main JSON-RPC background task
    rpc_task: StoppableTaskPtr,
    /// Management JSON-RPC background task
    management_rpc_task: StoppableTaskPtr,
    /// Consensus protocol background task
    consensus_task: StoppableTaskPtr,
    /// Node garbage collection background task
    gc_task: StoppableTaskPtr,
}
117
118impl Darkfid {
119    /// Initialize a DarkFi daemon.
120    ///
121    /// Generates a new `DarkfiNode` for provided configuration,
122    /// along with all the corresponding background tasks.
123    pub async fn init(
124        network: Network,
125        sled_db: &sled_overlay::sled::Db,
126        config: &ValidatorConfig,
127        net_settings: &Settings,
128        ex: &ExecutorPtr,
129    ) -> Result<DarkfidPtr> {
130        info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
131        // Initialize validator
132        let validator = Validator::new(sled_db, config).await?;
133
134        // Initialize P2P network
135        let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
136
137        // Initialize the miners registry
138        let registry = DarkfiMinersRegistry::init(network, &validator).await?;
139
140        // Here we initialize various subscribers that can export live blockchain/consensus data.
141        let mut subscribers = HashMap::new();
142        subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
143        subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
144        subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
145        subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
146
147        // Initialize node
148        let node = DarkfiNode::new(validator, p2p_handler, registry, subscribers).await?;
149
150        // Generate the background tasks
151        let dnet_task = StoppableTask::new();
152        let rpc_task = StoppableTask::new();
153        let management_rpc_task = StoppableTask::new();
154        let consensus_task = StoppableTask::new();
155        let gc_task = StoppableTask::new();
156
157        info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
158
159        Ok(Arc::new(Self {
160            node,
161            dnet_task,
162            rpc_task,
163            management_rpc_task,
164            consensus_task,
165            gc_task,
166        }))
167    }
168
169    /// Start the DarkFi daemon in the given executor, using the
170    /// provided JSON-RPC settings and consensus initialization
171    /// configuration.
172    pub async fn start(
173        &self,
174        executor: &ExecutorPtr,
175        rpc_settings: &RpcSettings,
176        management_rpc_settings: &RpcSettings,
177        stratum_rpc_settings: &Option<RpcSettings>,
178        mm_rpc_settings: &Option<RpcSettings>,
179        config: &ConsensusInitTaskConfig,
180    ) -> Result<()> {
181        info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
182
183        // Start the `dnet` task
184        info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
185        let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
186        let p2p_ = self.node.p2p_handler.p2p.clone();
187        self.dnet_task.clone().start(
188            async move {
189                let dnet_sub = p2p_.dnet_subscribe().await;
190                loop {
191                    let event = dnet_sub.receive().await;
192                    debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {event:?}");
193                    dnet_sub_.notify(vec![event.into()].into()).await;
194                }
195            },
196            |res| async {
197                match res {
198                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
199                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {e}"),
200                }
201            },
202            Error::DetachedTaskStopped,
203            executor.clone(),
204        );
205
206        // Start the main JSON-RPC task
207        info!(target: "darkfid::Darkfid::start", "Starting main JSON-RPC server");
208        let node_ = self.node.clone();
209        self.rpc_task.clone().start(
210            listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
211            |res| async move {
212                match res {
213                    Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
214                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting main JSON-RPC server: {e}"),
215                }
216            },
217            Error::RpcServerStopped,
218            executor.clone(),
219        );
220
221        // Start the management JSON-RPC task
222        info!(target: "darkfid::Darkfid::start", "Starting management JSON-RPC server");
223        let node_ = self.node.clone();
224        self.management_rpc_task.clone().start(
225            listen_and_serve::<ManagementRpcHandler>(management_rpc_settings.clone(), self.node.clone(), None, executor.clone()),
226            |res| async move {
227                match res {
228                    Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<ManagementRpcHandler>>::stop_connections(&node_).await,
229                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting management JSON-RPC server: {e}"),
230                }
231            },
232            Error::RpcServerStopped,
233            executor.clone(),
234        );
235
236        // Start the miners registry
237        info!(target: "darkfid::Darkfid::start", "Starting miners registry");
238        self.node.registry.start(executor, &self.node, stratum_rpc_settings, mm_rpc_settings)?;
239
240        // Start the P2P network
241        info!(target: "darkfid::Darkfid::start", "Starting P2P network");
242        self.node.p2p_handler.start(executor, &self.node).await?;
243
244        // Generate the signal queue smol channel
245        let (sender, receiver) = smol::channel::unbounded::<()>();
246
247        // Start the consensus protocol
248        info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
249        self.consensus_task.clone().start(
250            consensus_init_task(
251                self.node.clone(),
252                config.clone(),
253                sender,
254            ),
255            |res| async move {
256                match res {
257                    Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }
258                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {e}"),
259                }
260            },
261            Error::ConsensusTaskStopped,
262            executor.clone(),
263        );
264
265        // Start the garbage collection task
266        info!(target: "darkfid::Darkfid::start", "Starting garbage collection task");
267        self.gc_task.clone().start(
268            garbage_collect_task(receiver, self.node.clone()),
269            |res| async {
270                match res {
271                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
272                    Err(e) => {
273                        error!(target: "darkfid", "Failed starting garbage collection task: {e}")
274                    }
275                }
276            },
277            Error::GarbageCollectionTaskStopped,
278            executor.clone(),
279        );
280
281        info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
282        Ok(())
283    }
284
285    /// Stop the DarkFi daemon.
286    pub async fn stop(&self) -> Result<()> {
287        info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
288
289        // Stop the `dnet` node
290        info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
291        self.dnet_task.stop().await;
292
293        // Stop the main JSON-RPC task
294        info!(target: "darkfid::Darkfid::stop", "Stopping main JSON-RPC server...");
295        self.rpc_task.stop().await;
296
297        // Stop the management JSON-RPC task
298        info!(target: "darkfid::Darkfid::stop", "Stopping management JSON-RPC server...");
299        self.management_rpc_task.stop().await;
300
301        // Stop the miners registry
302        info!(target: "darkfid::Darkfid::stop", "Stopping miners registry...");
303        self.node.registry.stop().await;
304
305        // Stop the P2P network
306        info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
307        self.node.p2p_handler.stop().await;
308
309        // Stop the garbage collection task
310        info!(target: "darkfid::Darkfid::stop", "Stopping garbage collection task...");
311        self.gc_task.stop().await;
312
313        // Stop the consensus task
314        info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
315        self.consensus_task.stop().await;
316
317        // Flush sled database data
318        info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
319        let flushed_bytes =
320            self.node.validator.read().await.blockchain.sled_db.flush_async().await?;
321        info!(target: "darkfid::Darkfid::stop", "Flushed {flushed_bytes} bytes");
322
323        info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
324        Ok(())
325    }
326}