1use std::{
20 collections::{BTreeSet, HashMap, HashSet},
21 sync::Arc,
22};
23
24use sled_overlay::sled::IVec;
25use smol::lock::{Mutex, RwLock};
26use tinyjson::JsonValue;
27use tracing::{error, info};
28
29use darkfi::{
30 blockchain::BlockInfo,
31 rpc::{
32 jsonrpc::JsonSubscriber,
33 server::{listen_and_serve, RequestHandler},
34 settings::RpcSettings,
35 },
36 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
37 util::encoding::base64,
38 validator::{consensus::Proposal, Validator, ValidatorPtr},
39 Error, Result,
40};
41use darkfi_sdk::{
42 crypto::{keypair::Network, pasta_prelude::PrimeField},
43 tx::TransactionHash,
44};
45use darkfi_serial::serialize_async;
46
47use crate::{
48 proto::{DarkfidP2pHandlerPtr, ProposalMessage},
49 rpc::{stratum::StratumRpcHandler, xmr::MmRpcHandler},
50 DarkfiNode, DarkfiNodePtr,
51};
52
53pub mod model;
55use model::{
56 generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
57 PowRewardV1Zk,
58};
59
60pub type DarkfiMinersRegistryStatePtr = Arc<RwLock<DarkfiMinersRegistryState>>;
62
63pub struct DarkfiMinersRegistryState {
65 pub powrewardv1_zk: PowRewardV1Zk,
67 pub block_templates: HashMap<String, BlockTemplate>,
69 pub jobs: HashMap<String, MinerClient>,
75 pub mm_jobs: HashMap<String, String>,
78}
79
80impl DarkfiMinersRegistryState {
81 pub async fn new(validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryStatePtr> {
82 let powrewardv1_zk = PowRewardV1Zk::new(validator).await?;
84
85 Ok(Arc::new(RwLock::new(Self {
86 powrewardv1_zk,
87 block_templates: HashMap::new(),
88 jobs: HashMap::new(),
89 mm_jobs: HashMap::new(),
90 })))
91 }
92
93 async fn create_template(
100 &mut self,
101 validator: &Validator,
102 wallet: &String,
103 config: &MinerRewardsRecipientConfig,
104 ) -> Result<BlockTemplate> {
105 if let Some(block_template) = self.block_templates.get(wallet) {
107 return Ok(block_template.clone())
108 }
109
110 let mut extended_fork = validator.best_current_fork().await?;
112
113 let block_template = generate_next_block_template(
115 &mut extended_fork,
116 config,
117 &self.powrewardv1_zk.zkbin,
118 &self.powrewardv1_zk.provingkey,
119 validator.verify_fees,
120 )
121 .await?;
122
123 self.block_templates.insert(wallet.clone(), block_template.clone());
125
126 let recipient_str = format!("{}", config.recipient);
128 let spend_hook_str = match config.spend_hook {
129 Some(spend_hook) => format!("{spend_hook}"),
130 None => String::from("-"),
131 };
132 let user_data_str = match config.user_data {
133 Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
134 None => String::from("-"),
135 };
136 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
137 "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
138 );
139
140 Ok(block_template)
141 }
142
143 pub async fn register_miner(
145 &mut self,
146 validator: &Validator,
147 wallet: &String,
148 config: &MinerRewardsRecipientConfig,
149 ) -> Result<(String, String, JsonValue, JsonSubscriber)> {
150 let block_template = self.create_template(validator, wallet, config).await?;
152
153 let (job_id, job) = block_template.job_notification();
155 let (client_id, client) = MinerClient::new(wallet, config, &job_id);
156 let publisher = client.publisher.clone();
157 self.jobs.insert(client_id.clone(), client);
158
159 Ok((client_id, job_id, job, publisher))
160 }
161
162 pub async fn register_merge_miner(
164 &mut self,
165 validator: &Validator,
166 wallet: &String,
167 config: &MinerRewardsRecipientConfig,
168 ) -> Result<(String, f64)> {
169 let block_template = self.create_template(validator, wallet, config).await?;
171
172 let block_template_hash = block_template.block.header.template_hash().as_string();
175 let difficulty = block_template.difficulty;
176 self.mm_jobs.insert(block_template_hash.clone(), wallet.clone());
177
178 Ok((block_template_hash, difficulty))
179 }
180
181 pub async fn submit(
183 &self,
184 validator: &mut Validator,
185 subscribers: &HashMap<&'static str, JsonSubscriber>,
186 p2p_handler: &DarkfidP2pHandlerPtr,
187 block: BlockInfo,
188 ) -> Result<()> {
189 let proposal = Proposal::new(block);
190 validator.append_proposal(&proposal).await?;
191
192 info!(
193 target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
194 "Proposing new block to network",
195 );
196
197 let proposals_sub = subscribers.get("proposals").unwrap();
198 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
199 proposals_sub.notify(vec![enc_prop].into()).await;
200
201 info!(
202 target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
203 "Broadcasting new block to network",
204 );
205 let message = ProposalMessage(proposal);
206 p2p_handler.p2p.broadcast(&message).await;
207
208 Ok(())
209 }
210
211 pub async fn refresh(&mut self, validator: &Validator) -> Result<()> {
214 let mut dropped_jobs = vec![];
216 let mut active_templates = HashSet::new();
217 for (client_id, client) in self.jobs.iter() {
218 if client.publisher.publisher.clear_inactive().await {
222 dropped_jobs.push(client_id.clone());
223 continue
224 }
225
226 active_templates.insert(client.wallet.clone());
228 }
229 self.jobs.retain(|client_id, _| !dropped_jobs.contains(client_id));
230
231 let extended_fork = validator.best_current_fork().await?;
234 let last_proposal = extended_fork.last_proposal()?.hash;
235
236 let mut dropped_mm_jobs = vec![];
239 for (job_id, wallet) in self.mm_jobs.iter() {
240 let block_template = self.block_templates.get(wallet).unwrap();
243
244 if block_template.block.header.previous == last_proposal {
246 active_templates.insert(wallet.clone());
247 continue
248 }
249
250 dropped_mm_jobs.push(job_id.clone());
253 }
254 self.mm_jobs.retain(|job_id, _| !dropped_mm_jobs.contains(job_id));
255
256 self.block_templates.retain(|wallet, _| active_templates.contains(wallet));
259
260 if self.block_templates.is_empty() {
262 return Ok(())
263 }
264
265 for (job_id, client) in self.jobs.iter_mut() {
267 let block_template = self.block_templates.get_mut(&client.wallet).unwrap();
270
271 if block_template.block.header.previous == last_proposal {
273 continue
274 }
275
276 let mut extended_fork = extended_fork.full_clone()?;
278
279 let result = generate_next_block_template(
281 &mut extended_fork,
282 &client.config,
283 &self.powrewardv1_zk.zkbin,
284 &self.powrewardv1_zk.provingkey,
285 validator.verify_fees,
286 )
287 .await;
288
289 *block_template = match result {
291 Ok(b) => b,
292 Err(e) => {
293 error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
294 "Updating block template for job {job_id} failed: {e}",
295 );
296 block_template.submitted = false;
299 continue;
300 }
301 };
302
303 let recipient_str = format!("{}", client.config.recipient);
305 let spend_hook_str = match client.config.spend_hook {
306 Some(spend_hook) => format!("{spend_hook}"),
307 None => String::from("-"),
308 };
309 let user_data_str = match client.config.user_data {
310 Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
311 None => String::from("-"),
312 };
313 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
314 "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
315 );
316
317 let (job, notification) = block_template.job_notification();
319
320 client.job = job;
322
323 client.publisher.notify(notification).await;
325 }
326
327 Ok(())
328 }
329
330 pub fn new_trees(&self) -> BTreeSet<IVec> {
333 let mut new_trees = BTreeSet::new();
334 for block_template in self.block_templates.values() {
335 for new_tree in &block_template.new_trees {
336 new_trees.insert(new_tree.clone());
337 }
338 }
339 new_trees
340 }
341
342 pub fn proposed_transactions(&self) -> HashSet<TransactionHash> {
345 let mut proposed_txs = HashSet::new();
346 for block_template in self.block_templates.values() {
347 for tx in &block_template.block.txs {
348 proposed_txs.insert(tx.hash());
349 }
350 }
351 proposed_txs
352 }
353}
354
355pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;
357
358pub struct DarkfiMinersRegistry {
360 pub network: Network,
362 pub state: DarkfiMinersRegistryStatePtr,
364 stratum_rpc_task: StoppableTaskPtr,
366 pub stratum_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
368 mm_rpc_task: StoppableTaskPtr,
370 pub mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
372}
373
374impl DarkfiMinersRegistry {
375 pub async fn init(
377 network: Network,
378 validator: &ValidatorPtr,
379 ) -> Result<DarkfiMinersRegistryPtr> {
380 info!(
381 target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
382 "Initializing a new DarkFi node miners registry..."
383 );
384
385 let state = DarkfiMinersRegistryState::new(validator).await?;
387
388 let stratum_rpc_task = StoppableTask::new();
391 let stratum_rpc_connections = Mutex::new(HashSet::new());
392
393 let mm_rpc_task = StoppableTask::new();
396 let mm_rpc_connections = Mutex::new(HashSet::new());
397
398 info!(
399 target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
400 "DarkFi node miners registry generated successfully!"
401 );
402
403 Ok(Arc::new(Self {
404 network,
405 state,
406 stratum_rpc_task,
407 stratum_rpc_connections,
408 mm_rpc_task,
409 mm_rpc_connections,
410 }))
411 }
412
413 pub fn start(
416 &self,
417 executor: &ExecutorPtr,
418 node: &DarkfiNodePtr,
419 stratum_rpc_settings: &Option<RpcSettings>,
420 mm_rpc_settings: &Option<RpcSettings>,
421 ) -> Result<()> {
422 info!(
423 target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
424 "Starting the DarkFi node miners registry..."
425 );
426
427 if let Some(stratum_rpc) = stratum_rpc_settings {
429 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting Stratum JSON-RPC server");
430 let node_ = node.clone();
431 self.stratum_rpc_task.clone().start(
432 listen_and_serve::<StratumRpcHandler>(stratum_rpc.clone(), node.clone(), None, executor.clone()),
433 |res| async move {
434 match res {
435 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<StratumRpcHandler>>::stop_connections(&node_).await,
436 Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting Stratum JSON-RPC server: {e}"),
437 }
438 },
439 Error::RpcServerStopped,
440 executor.clone(),
441 );
442 } else {
443 self.stratum_rpc_task.clone().start(
445 async { Ok(()) },
446 |_| async { },
447 Error::RpcServerStopped,
448 executor.clone(),
449 );
450 }
451
452 if let Some(mm_rpc) = mm_rpc_settings {
454 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting merge mining JSON-RPC server");
455 let node_ = node.clone();
456 self.mm_rpc_task.clone().start(
457 listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), node.clone(), None, executor.clone()),
458 |res| async move {
459 match res {
460 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
461 Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting merge mining JSON-RPC server: {e}"),
462 }
463 },
464 Error::RpcServerStopped,
465 executor.clone(),
466 );
467 } else {
468 self.mm_rpc_task.clone().start(
470 async { Ok(()) },
471 |_| async { },
472 Error::RpcServerStopped,
473 executor.clone(),
474 );
475 }
476
477 info!(
478 target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
479 "DarkFi node miners registry started successfully!"
480 );
481
482 Ok(())
483 }
484
485 pub async fn stop(&self) {
487 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Terminating DarkFi node miners registry...");
488
489 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping Stratum JSON-RPC server...");
491 self.stratum_rpc_task.stop().await;
492
493 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping merge mining JSON-RPC server...");
495 self.mm_rpc_task.stop().await;
496
497 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
498 }
499}