1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use smol::lock::Mutex;
25use tracing::{debug, error, info};
26
27use darkfi::{
28 net::settings::Settings,
29 rpc::{
30 jsonrpc::JsonSubscriber,
31 server::{listen_and_serve, RequestHandler},
32 settings::RpcSettings,
33 },
34 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
35 validator::{Validator, ValidatorConfig, ValidatorPtr},
36 Error, Result,
37};
38use darkfi_sdk::crypto::keypair::Network;
39
40#[cfg(test)]
41mod tests;
42
43mod error;
44use error::{server_error, RpcError};
45
46mod rpc;
48use rpc::{management::ManagementRpcHandler, DefaultRpcHandler};
49
50pub mod task;
52use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
53
54mod proto;
56use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
57
58mod registry;
60use registry::{DarkfiMinersRegistry, DarkfiMinersRegistryPtr};
61
62pub type DarkfiNodePtr = Arc<DarkfiNode>;
64
65pub struct DarkfiNode {
67 validator: ValidatorPtr,
69 p2p_handler: DarkfidP2pHandlerPtr,
71 registry: DarkfiMinersRegistryPtr,
73 txs_batch_size: usize,
75 subscribers: HashMap<&'static str, JsonSubscriber>,
77 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
79 management_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
81}
82
83impl DarkfiNode {
84 pub async fn new(
85 validator: ValidatorPtr,
86 p2p_handler: DarkfidP2pHandlerPtr,
87 registry: DarkfiMinersRegistryPtr,
88 txs_batch_size: usize,
89 subscribers: HashMap<&'static str, JsonSubscriber>,
90 ) -> Result<DarkfiNodePtr> {
91 Ok(Arc::new(Self {
92 validator,
93 p2p_handler,
94 registry,
95 txs_batch_size,
96 subscribers,
97 rpc_connections: Mutex::new(HashSet::new()),
98 management_rpc_connections: Mutex::new(HashSet::new()),
99 }))
100 }
101}
102
103pub type DarkfidPtr = Arc<Darkfid>;
105
106pub struct Darkfid {
108 node: DarkfiNodePtr,
110 dnet_task: StoppableTaskPtr,
112 rpc_task: StoppableTaskPtr,
114 management_rpc_task: StoppableTaskPtr,
116 consensus_task: StoppableTaskPtr,
118}
119
120impl Darkfid {
121 pub async fn init(
126 network: Network,
127 sled_db: &sled_overlay::sled::Db,
128 config: &ValidatorConfig,
129 net_settings: &Settings,
130 txs_batch_size: &Option<usize>,
131 ex: &ExecutorPtr,
132 ) -> Result<DarkfidPtr> {
133 info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
134 let validator = Validator::new(sled_db, config).await?;
136
137 let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
139
140 let registry = DarkfiMinersRegistry::init(network, &validator)?;
142
143 let txs_batch_size = match txs_batch_size {
145 Some(b) => {
146 if *b > 0 {
147 *b
148 } else {
149 50
150 }
151 }
152 None => 50,
153 };
154
155 let mut subscribers = HashMap::new();
157 subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
158 subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
159 subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
160 subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
161
162 let node =
164 DarkfiNode::new(validator, p2p_handler, registry, txs_batch_size, subscribers).await?;
165
166 let dnet_task = StoppableTask::new();
168 let rpc_task = StoppableTask::new();
169 let management_rpc_task = StoppableTask::new();
170 let consensus_task = StoppableTask::new();
171
172 info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
173
174 Ok(Arc::new(Self { node, dnet_task, rpc_task, management_rpc_task, consensus_task }))
175 }
176
177 pub async fn start(
181 &self,
182 executor: &ExecutorPtr,
183 rpc_settings: &RpcSettings,
184 management_rpc_settings: &RpcSettings,
185 stratum_rpc_settings: &Option<RpcSettings>,
186 mm_rpc_settings: &Option<RpcSettings>,
187 config: &ConsensusInitTaskConfig,
188 ) -> Result<()> {
189 info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
190
191 info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
193 let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
194 let p2p_ = self.node.p2p_handler.p2p.clone();
195 self.dnet_task.clone().start(
196 async move {
197 let dnet_sub = p2p_.dnet_subscribe().await;
198 loop {
199 let event = dnet_sub.receive().await;
200 debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {event:?}");
201 dnet_sub_.notify(vec![event.into()].into()).await;
202 }
203 },
204 |res| async {
205 match res {
206 Ok(()) | Err(Error::DetachedTaskStopped) => { }
207 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {e}"),
208 }
209 },
210 Error::DetachedTaskStopped,
211 executor.clone(),
212 );
213
214 info!(target: "darkfid::Darkfid::start", "Starting main JSON-RPC server");
216 let node_ = self.node.clone();
217 self.rpc_task.clone().start(
218 listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
219 |res| async move {
220 match res {
221 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
222 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting main JSON-RPC server: {e}"),
223 }
224 },
225 Error::RpcServerStopped,
226 executor.clone(),
227 );
228
229 info!(target: "darkfid::Darkfid::start", "Starting management JSON-RPC server");
231 let node_ = self.node.clone();
232 self.management_rpc_task.clone().start(
233 listen_and_serve::<ManagementRpcHandler>(management_rpc_settings.clone(), self.node.clone(), None, executor.clone()),
234 |res| async move {
235 match res {
236 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<ManagementRpcHandler>>::stop_connections(&node_).await,
237 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting management JSON-RPC server: {e}"),
238 }
239 },
240 Error::RpcServerStopped,
241 executor.clone(),
242 );
243
244 info!(target: "darkfid::Darkfid::start", "Starting miners registry");
246 self.node.registry.start(executor, &self.node, stratum_rpc_settings, mm_rpc_settings)?;
247
248 info!(target: "darkfid::Darkfid::start", "Starting P2P network");
250 self.node.p2p_handler.start(executor, &self.node.validator, &self.node.subscribers).await?;
251
252 info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
254 self.consensus_task.clone().start(
255 consensus_init_task(
256 self.node.clone(),
257 config.clone(),
258 executor.clone(),
259 ),
260 |res| async move {
261 match res {
262 Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { }
263 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {e}"),
264 }
265 },
266 Error::ConsensusTaskStopped,
267 executor.clone(),
268 );
269
270 info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
271 Ok(())
272 }
273
274 pub async fn stop(&self) -> Result<()> {
276 info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
277
278 info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
280 self.dnet_task.stop().await;
281
282 info!(target: "darkfid::Darkfid::stop", "Stopping main JSON-RPC server...");
284 self.rpc_task.stop().await;
285
286 info!(target: "darkfid::Darkfid::stop", "Stopping management JSON-RPC server...");
288 self.management_rpc_task.stop().await;
289
290 info!(target: "darkfid::Darkfid::stop", "Stopping miners registry...");
292 self.node.registry.stop().await;
293
294 info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
296 self.node.p2p_handler.stop().await;
297
298 info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
300 self.consensus_task.stop().await;
301
302 info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
304 let flushed_bytes = self.node.validator.blockchain.sled_db.flush_async().await?;
305 info!(target: "darkfid::Darkfid::stop", "Flushed {flushed_bytes} bytes");
306
307 info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
308 Ok(())
309 }
310}