1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use smol::lock::Mutex;
25use tracing::{debug, error, info};
26
27use darkfi::{
28 net::settings::Settings,
29 rpc::{
30 jsonrpc::JsonSubscriber,
31 server::{listen_and_serve, RequestHandler},
32 settings::RpcSettings,
33 },
34 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
35 validator::{Validator, ValidatorConfig, ValidatorPtr},
36 Error, Result,
37};
38use darkfi_sdk::crypto::keypair::Network;
39
40#[cfg(test)]
41mod tests;
42
43mod error;
44use error::{server_error, RpcError};
45
46mod rpc;
48use rpc::{management::ManagementRpcHandler, DefaultRpcHandler};
49
50pub mod task;
52use task::{consensus::ConsensusInitTaskConfig, consensus_init_task, garbage_collect_task};
53
54mod proto;
56use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
57
58mod registry;
60use registry::{DarkfiMinersRegistry, DarkfiMinersRegistryPtr};
61
62pub type DarkfiNodePtr = Arc<DarkfiNode>;
64
65pub struct DarkfiNode {
67 validator: ValidatorPtr,
69 p2p_handler: DarkfidP2pHandlerPtr,
71 registry: DarkfiMinersRegistryPtr,
73 subscribers: HashMap<&'static str, JsonSubscriber>,
75 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
77 management_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
79}
80
81impl DarkfiNode {
82 pub async fn new(
83 validator: ValidatorPtr,
84 p2p_handler: DarkfidP2pHandlerPtr,
85 registry: DarkfiMinersRegistryPtr,
86 subscribers: HashMap<&'static str, JsonSubscriber>,
87 ) -> Result<DarkfiNodePtr> {
88 Ok(Arc::new(Self {
89 validator,
90 p2p_handler,
91 registry,
92 subscribers,
93 rpc_connections: Mutex::new(HashSet::new()),
94 management_rpc_connections: Mutex::new(HashSet::new()),
95 }))
96 }
97}
98
99pub type DarkfidPtr = Arc<Darkfid>;
101
102pub struct Darkfid {
104 node: DarkfiNodePtr,
106 dnet_task: StoppableTaskPtr,
108 rpc_task: StoppableTaskPtr,
110 management_rpc_task: StoppableTaskPtr,
112 consensus_task: StoppableTaskPtr,
114 gc_task: StoppableTaskPtr,
116}
117
118impl Darkfid {
119 pub async fn init(
124 network: Network,
125 sled_db: &sled_overlay::sled::Db,
126 config: &ValidatorConfig,
127 net_settings: &Settings,
128 ex: &ExecutorPtr,
129 ) -> Result<DarkfidPtr> {
130 info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
131 let validator = Validator::new(sled_db, config).await?;
133
134 let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
136
137 let registry = DarkfiMinersRegistry::init(network, &validator).await?;
139
140 let mut subscribers = HashMap::new();
142 subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
143 subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
144 subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
145 subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
146
147 let node = DarkfiNode::new(validator, p2p_handler, registry, subscribers).await?;
149
150 let dnet_task = StoppableTask::new();
152 let rpc_task = StoppableTask::new();
153 let management_rpc_task = StoppableTask::new();
154 let consensus_task = StoppableTask::new();
155 let gc_task = StoppableTask::new();
156
157 info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
158
159 Ok(Arc::new(Self {
160 node,
161 dnet_task,
162 rpc_task,
163 management_rpc_task,
164 consensus_task,
165 gc_task,
166 }))
167 }
168
169 pub async fn start(
173 &self,
174 executor: &ExecutorPtr,
175 rpc_settings: &RpcSettings,
176 management_rpc_settings: &RpcSettings,
177 stratum_rpc_settings: &Option<RpcSettings>,
178 mm_rpc_settings: &Option<RpcSettings>,
179 config: &ConsensusInitTaskConfig,
180 ) -> Result<()> {
181 info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
182
183 info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
185 let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
186 let p2p_ = self.node.p2p_handler.p2p.clone();
187 self.dnet_task.clone().start(
188 async move {
189 let dnet_sub = p2p_.dnet_subscribe().await;
190 loop {
191 let event = dnet_sub.receive().await;
192 debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {event:?}");
193 dnet_sub_.notify(vec![event.into()].into()).await;
194 }
195 },
196 |res| async {
197 match res {
198 Ok(()) | Err(Error::DetachedTaskStopped) => { }
199 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {e}"),
200 }
201 },
202 Error::DetachedTaskStopped,
203 executor.clone(),
204 );
205
206 info!(target: "darkfid::Darkfid::start", "Starting main JSON-RPC server");
208 let node_ = self.node.clone();
209 self.rpc_task.clone().start(
210 listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
211 |res| async move {
212 match res {
213 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
214 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting main JSON-RPC server: {e}"),
215 }
216 },
217 Error::RpcServerStopped,
218 executor.clone(),
219 );
220
221 info!(target: "darkfid::Darkfid::start", "Starting management JSON-RPC server");
223 let node_ = self.node.clone();
224 self.management_rpc_task.clone().start(
225 listen_and_serve::<ManagementRpcHandler>(management_rpc_settings.clone(), self.node.clone(), None, executor.clone()),
226 |res| async move {
227 match res {
228 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<ManagementRpcHandler>>::stop_connections(&node_).await,
229 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting management JSON-RPC server: {e}"),
230 }
231 },
232 Error::RpcServerStopped,
233 executor.clone(),
234 );
235
236 info!(target: "darkfid::Darkfid::start", "Starting miners registry");
238 self.node.registry.start(executor, &self.node, stratum_rpc_settings, mm_rpc_settings)?;
239
240 info!(target: "darkfid::Darkfid::start", "Starting P2P network");
242 self.node.p2p_handler.start(executor, &self.node).await?;
243
244 let (sender, receiver) = smol::channel::unbounded::<()>();
246
247 info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
249 self.consensus_task.clone().start(
250 consensus_init_task(
251 self.node.clone(),
252 config.clone(),
253 sender,
254 ),
255 |res| async move {
256 match res {
257 Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { }
258 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {e}"),
259 }
260 },
261 Error::ConsensusTaskStopped,
262 executor.clone(),
263 );
264
265 info!(target: "darkfid::Darkfid::start", "Starting garbage collection task");
267 self.gc_task.clone().start(
268 garbage_collect_task(receiver, self.node.clone()),
269 |res| async {
270 match res {
271 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
272 Err(e) => {
273 error!(target: "darkfid", "Failed starting garbage collection task: {e}")
274 }
275 }
276 },
277 Error::GarbageCollectionTaskStopped,
278 executor.clone(),
279 );
280
281 info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
282 Ok(())
283 }
284
285 pub async fn stop(&self) -> Result<()> {
287 info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
288
289 info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
291 self.dnet_task.stop().await;
292
293 info!(target: "darkfid::Darkfid::stop", "Stopping main JSON-RPC server...");
295 self.rpc_task.stop().await;
296
297 info!(target: "darkfid::Darkfid::stop", "Stopping management JSON-RPC server...");
299 self.management_rpc_task.stop().await;
300
301 info!(target: "darkfid::Darkfid::stop", "Stopping miners registry...");
303 self.node.registry.stop().await;
304
305 info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
307 self.node.p2p_handler.stop().await;
308
309 info!(target: "darkfid::Darkfid::stop", "Stopping garbage collection task...");
311 self.gc_task.stop().await;
312
313 info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
315 self.consensus_task.stop().await;
316
317 info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
319 let flushed_bytes =
320 self.node.validator.read().await.blockchain.sled_db.flush_async().await?;
321 info!(target: "darkfid::Darkfid::stop", "Flushed {flushed_bytes} bytes");
322
323 info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
324 Ok(())
325 }
326}