1use std::{
20 collections::{BTreeSet, HashMap, HashSet},
21 sync::Arc,
22};
23
24use sled_overlay::sled::IVec;
25use smol::lock::{Mutex, RwLock};
26use tinyjson::JsonValue;
27use tracing::{error, info};
28
29use darkfi::{
30 blockchain::BlockInfo,
31 rpc::{
32 jsonrpc::JsonSubscriber,
33 server::{listen_and_serve, RequestHandler},
34 settings::RpcSettings,
35 },
36 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
37 util::encoding::base64,
38 validator::{consensus::Proposal, ValidatorPtr},
39 Error, Result,
40};
41use darkfi_sdk::{
42 crypto::{keypair::Network, pasta_prelude::PrimeField},
43 tx::TransactionHash,
44};
45use darkfi_serial::serialize_async;
46
47use crate::{
48 proto::{DarkfidP2pHandlerPtr, ProposalMessage},
49 rpc::{stratum::StratumRpcHandler, xmr::MmRpcHandler},
50 DarkfiNode, DarkfiNodePtr,
51};
52
53pub mod model;
55use model::{
56 generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
57 PowRewardV1Zk,
58};
59
/// Shared, reference-counted (`Arc`) handle to a [`DarkfiMinersRegistry`],
/// as returned by [`DarkfiMinersRegistry::init`].
pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;
62
/// Registry keeping track of the block templates and mining jobs handed out
/// by a DarkFi node, along with the JSON-RPC server tasks that serve miners.
pub struct DarkfiMinersRegistry {
    /// Network value supplied to [`DarkfiMinersRegistry::init`].
    pub network: Network,
    /// `PowRewardV1Zk` built from the validator in `init`; its `zkbin` and
    /// `provingkey` are passed to `generate_next_block_template`.
    pub powrewardv1_zk: PowRewardV1Zk,
    /// Block templates, keyed by the wallet string they were created for.
    pub block_templates: RwLock<HashMap<String, BlockTemplate>>,
    /// Native mining clients, keyed by their client id.
    pub jobs: RwLock<HashMap<String, MinerClient>>,
    /// Merge mining jobs: block template hash string mapped to the wallet
    /// string whose template produced it.
    pub mm_jobs: RwLock<HashMap<String, String>>,
    /// Lock taken for writing by `refresh` for the whole duration of a jobs
    /// refresh.
    // NOTE(review): presumably also taken by the block submission RPC
    // handlers so submissions and refreshes do not interleave -- confirm
    // against the callers, which are outside this file.
    pub submit_lock: RwLock<()>,
    /// Task running the Stratum JSON-RPC server (see `start`/`stop`).
    stratum_rpc_task: StoppableTaskPtr,
    /// Set of tasks associated with the Stratum JSON-RPC server.
    // NOTE(review): not touched in this file; presumably the per-connection
    // tasks managed by the `RequestHandler` implementation -- confirm.
    pub stratum_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// Task running the merge mining JSON-RPC server (see `start`/`stop`).
    mm_rpc_task: StoppableTaskPtr,
    /// Set of tasks associated with the merge mining JSON-RPC server.
    // NOTE(review): not touched in this file; presumably the per-connection
    // tasks managed by the `RequestHandler` implementation -- confirm.
    pub mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
91
92impl DarkfiMinersRegistry {
93 pub fn init(network: Network, validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryPtr> {
95 info!(
96 target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
97 "Initializing a new DarkFi node miners registry..."
98 );
99
100 let powrewardv1_zk = PowRewardV1Zk::new(validator)?;
102
103 let stratum_rpc_task = StoppableTask::new();
106 let stratum_rpc_connections = Mutex::new(HashSet::new());
107
108 let mm_rpc_task = StoppableTask::new();
111 let mm_rpc_connections = Mutex::new(HashSet::new());
112
113 info!(
114 target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
115 "DarkFi node miners registry generated successfully!"
116 );
117
118 Ok(Arc::new(Self {
119 network,
120 powrewardv1_zk,
121 block_templates: RwLock::new(HashMap::new()),
122 jobs: RwLock::new(HashMap::new()),
123 mm_jobs: RwLock::new(HashMap::new()),
124 submit_lock: RwLock::new(()),
125 stratum_rpc_task,
126 stratum_rpc_connections,
127 mm_rpc_task,
128 mm_rpc_connections,
129 }))
130 }
131
132 pub fn start(
135 &self,
136 executor: &ExecutorPtr,
137 node: &DarkfiNodePtr,
138 stratum_rpc_settings: &Option<RpcSettings>,
139 mm_rpc_settings: &Option<RpcSettings>,
140 ) -> Result<()> {
141 info!(
142 target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
143 "Starting the DarkFi node miners registry..."
144 );
145
146 if let Some(stratum_rpc) = stratum_rpc_settings {
148 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting Stratum JSON-RPC server");
149 let node_ = node.clone();
150 self.stratum_rpc_task.clone().start(
151 listen_and_serve::<StratumRpcHandler>(stratum_rpc.clone(), node.clone(), None, executor.clone()),
152 |res| async move {
153 match res {
154 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<StratumRpcHandler>>::stop_connections(&node_).await,
155 Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting Stratum JSON-RPC server: {e}"),
156 }
157 },
158 Error::RpcServerStopped,
159 executor.clone(),
160 );
161 } else {
162 self.stratum_rpc_task.clone().start(
164 async { Ok(()) },
165 |_| async { },
166 Error::RpcServerStopped,
167 executor.clone(),
168 );
169 }
170
171 if let Some(mm_rpc) = mm_rpc_settings {
173 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting merge mining JSON-RPC server");
174 let node_ = node.clone();
175 self.mm_rpc_task.clone().start(
176 listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), node.clone(), None, executor.clone()),
177 |res| async move {
178 match res {
179 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
180 Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting merge mining JSON-RPC server: {e}"),
181 }
182 },
183 Error::RpcServerStopped,
184 executor.clone(),
185 );
186 } else {
187 self.mm_rpc_task.clone().start(
189 async { Ok(()) },
190 |_| async { },
191 Error::RpcServerStopped,
192 executor.clone(),
193 );
194 }
195
196 info!(
197 target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
198 "DarkFi node miners registry started successfully!"
199 );
200
201 Ok(())
202 }
203
204 pub async fn stop(&self) {
206 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Terminating DarkFi node miners registry...");
207
208 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping Stratum JSON-RPC server...");
210 self.stratum_rpc_task.stop().await;
211
212 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping merge mining JSON-RPC server...");
214 self.mm_rpc_task.stop().await;
215
216 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
217 }
218
219 async fn create_template(
226 &self,
227 validator: &ValidatorPtr,
228 wallet: &String,
229 config: &MinerRewardsRecipientConfig,
230 ) -> Result<BlockTemplate> {
231 let mut block_templates = self.block_templates.write().await;
233
234 if let Some(block_template) = block_templates.get(wallet) {
236 return Ok(block_template.clone())
237 }
238
239 let mut extended_fork = validator.best_current_fork().await?;
241
242 let block_template = generate_next_block_template(
244 &mut extended_fork,
245 config,
246 &self.powrewardv1_zk.zkbin,
247 &self.powrewardv1_zk.provingkey,
248 validator.verify_fees,
249 )
250 .await?;
251
252 block_templates.insert(wallet.clone(), block_template.clone());
254
255 let recipient_str = format!("{}", config.recipient);
257 let spend_hook_str = match config.spend_hook {
258 Some(spend_hook) => format!("{spend_hook}"),
259 None => String::from("-"),
260 };
261 let user_data_str = match config.user_data {
262 Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
263 None => String::from("-"),
264 };
265 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
266 "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
267 );
268
269 Ok(block_template)
270 }
271
272 pub async fn register_miner(
274 &self,
275 validator: &ValidatorPtr,
276 wallet: &String,
277 config: &MinerRewardsRecipientConfig,
278 ) -> Result<(String, String, JsonValue, JsonSubscriber)> {
279 let mut jobs = self.jobs.write().await;
281
282 let block_template = self.create_template(validator, wallet, config).await?;
284
285 let (job_id, job) = block_template.job_notification();
287 let (client_id, client) = MinerClient::new(wallet, config, &job_id);
288 let publisher = client.publisher.clone();
289 jobs.insert(client_id.clone(), client);
290
291 Ok((client_id, job_id, job, publisher))
292 }
293
294 pub async fn register_merge_miner(
296 &self,
297 validator: &ValidatorPtr,
298 wallet: &String,
299 config: &MinerRewardsRecipientConfig,
300 ) -> Result<(String, f64)> {
301 let mut jobs = self.mm_jobs.write().await;
303
304 let block_template = self.create_template(validator, wallet, config).await?;
306
307 let block_template_hash = block_template.block.header.template_hash().as_string();
310 let difficulty = block_template.difficulty;
311 jobs.insert(block_template_hash.clone(), wallet.clone());
312
313 Ok((block_template_hash, difficulty))
314 }
315
316 pub async fn submit(
318 &self,
319 validator: &ValidatorPtr,
320 subscribers: &HashMap<&'static str, JsonSubscriber>,
321 p2p_handler: &DarkfidP2pHandlerPtr,
322 block: BlockInfo,
323 ) -> Result<()> {
324 let proposal = Proposal::new(block);
325 validator.append_proposal(&proposal).await?;
326
327 info!(
328 target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
329 "Proposing new block to network",
330 );
331
332 let proposals_sub = subscribers.get("proposals").unwrap();
333 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
334 proposals_sub.notify(vec![enc_prop].into()).await;
335
336 info!(
337 target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
338 "Broadcasting new block to network",
339 );
340 let message = ProposalMessage(proposal);
341 p2p_handler.p2p.broadcast(&message).await;
342
343 Ok(())
344 }
345
346 pub async fn refresh_jobs(
352 &self,
353 block_templates: &mut HashMap<String, BlockTemplate>,
354 jobs: &mut HashMap<String, MinerClient>,
355 mm_jobs: &mut HashMap<String, String>,
356 validator: &ValidatorPtr,
357 ) -> Result<()> {
358 let mut dropped_jobs = vec![];
360 let mut active_templates = HashSet::new();
361 for (client_id, client) in jobs.iter() {
362 if client.publisher.publisher.clear_inactive().await {
366 dropped_jobs.push(client_id.clone());
367 continue
368 }
369
370 active_templates.insert(client.wallet.clone());
372 }
373 jobs.retain(|client_id, _| !dropped_jobs.contains(client_id));
374
375 let extended_fork = validator.best_current_fork().await?;
378 let last_proposal = extended_fork.last_proposal()?.hash;
379
380 let mut dropped_mm_jobs = vec![];
383 for (job_id, wallet) in mm_jobs.iter() {
384 let block_template = block_templates.get(wallet).unwrap();
387
388 if block_template.block.header.previous == last_proposal {
390 active_templates.insert(wallet.clone());
391 continue
392 }
393
394 dropped_mm_jobs.push(job_id.clone());
397 }
398 mm_jobs.retain(|job_id, _| !dropped_mm_jobs.contains(job_id));
399
400 block_templates.retain(|wallet, _| active_templates.contains(wallet));
403
404 if block_templates.is_empty() {
406 return Ok(())
407 }
408
409 for (job_id, client) in jobs.iter_mut() {
411 let block_template = block_templates.get_mut(&client.wallet).unwrap();
414
415 if block_template.block.header.previous == last_proposal {
417 continue
418 }
419
420 let mut extended_fork = extended_fork.full_clone()?;
422
423 let result = generate_next_block_template(
425 &mut extended_fork,
426 &client.config,
427 &self.powrewardv1_zk.zkbin,
428 &self.powrewardv1_zk.provingkey,
429 validator.verify_fees,
430 )
431 .await;
432
433 *block_template = match result {
435 Ok(b) => b,
436 Err(e) => {
437 error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
438 "Updating block template for job {job_id} failed: {e}",
439 );
440 block_template.submitted = false;
443 continue;
444 }
445 };
446
447 let recipient_str = format!("{}", client.config.recipient);
449 let spend_hook_str = match client.config.spend_hook {
450 Some(spend_hook) => format!("{spend_hook}"),
451 None => String::from("-"),
452 };
453 let user_data_str = match client.config.user_data {
454 Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
455 None => String::from("-"),
456 };
457 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
458 "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
459 );
460
461 let (job, notification) = block_template.job_notification();
463
464 client.job = job;
466
467 client.publisher.notify(notification).await;
469 }
470
471 Ok(())
472 }
473
474 pub async fn refresh(&self, validator: &ValidatorPtr) -> Result<()> {
477 let submit_lock = self.submit_lock.write().await;
479 let mut block_templates = self.block_templates.write().await;
480 let mut jobs = self.jobs.write().await;
481 let mut mm_jobs = self.mm_jobs.write().await;
482
483 self.refresh_jobs(&mut block_templates, &mut jobs, &mut mm_jobs, validator).await?;
485
486 drop(block_templates);
488 drop(jobs);
489 drop(mm_jobs);
490 drop(submit_lock);
491
492 Ok(())
493 }
494
495 pub fn new_trees(&self, block_templates: &HashMap<String, BlockTemplate>) -> BTreeSet<IVec> {
498 let mut new_trees = BTreeSet::new();
499 for block_template in block_templates.values() {
500 for new_tree in &block_template.new_trees {
501 new_trees.insert(new_tree.clone());
502 }
503 }
504 new_trees
505 }
506
507 pub fn proposed_transactions(
510 &self,
511 block_templates: &HashMap<String, BlockTemplate>,
512 ) -> HashSet<TransactionHash> {
513 let mut proposed_txs = HashSet::new();
514 for block_template in block_templates.values() {
515 for tx in &block_template.block.txs {
516 proposed_txs.insert(tx.hash());
517 }
518 }
519 proposed_txs
520 }
521}