1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use smol::lock::{Mutex, RwLock};
25use tinyjson::JsonValue;
26use tracing::{error, info};
27
28use darkfi::{
29 blockchain::BlockInfo,
30 rpc::{
31 jsonrpc::JsonSubscriber,
32 server::{listen_and_serve, RequestHandler},
33 settings::RpcSettings,
34 },
35 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
36 util::encoding::base64,
37 validator::{consensus::Proposal, ValidatorPtr},
38 Error, Result,
39};
40use darkfi_sdk::crypto::{keypair::Network, pasta_prelude::PrimeField};
41use darkfi_serial::serialize_async;
42
43use crate::{
44 proto::{DarkfidP2pHandlerPtr, ProposalMessage},
45 rpc::{stratum::StratumRpcHandler, xmr::MmRpcHandler},
46 DarkfiNode, DarkfiNodePtr,
47};
48
49pub mod model;
51use model::{
52 generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
53 PowRewardV1Zk,
54};
55
/// Shared, reference-counted ([`Arc`]) handle to a [`DarkfiMinersRegistry`].
pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;
58
/// Registry holding the mining related state of a DarkFi node: block
/// templates, native and merge mining jobs, and the tasks running the
/// mining JSON-RPC servers.
pub struct DarkfiMinersRegistry {
    /// Network value handed to [`DarkfiMinersRegistry::init`].
    /// NOTE(review): presumably the network the node operates on — it is
    /// not read anywhere in this file, confirm against callers.
    pub network: Network,
    /// Material built from the validator at init time; its `zkbin` and
    /// `provingkey` are passed to `generate_next_block_template`.
    pub powrewardv1_zk: PowRewardV1Zk,
    /// Block templates, keyed by the wallet string they were created for.
    pub block_templates: RwLock<HashMap<String, BlockTemplate>>,
    /// Registered native miners, keyed by their client id.
    pub jobs: RwLock<HashMap<String, MinerClient>>,
    /// Merge mining jobs: maps a block template hash string to the wallet
    /// string whose template it refers to.
    pub mm_jobs: RwLock<HashMap<String, String>>,
    /// Lock held in write mode for the whole duration of
    /// [`DarkfiMinersRegistry::refresh`].
    /// NOTE(review): presumably also taken by the block submission paths
    /// so they are serialized against refreshes — not visible in this file.
    pub submit_lock: RwLock<()>,
    /// Task running the Stratum JSON-RPC server (or a no-op future when
    /// the server is not configured).
    stratum_rpc_task: StoppableTaskPtr,
    /// Set of tasks associated with the Stratum JSON-RPC server.
    /// NOTE(review): presumably its active connections — it is only
    /// initialized (empty) in this file.
    pub stratum_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// Task running the merge mining JSON-RPC server (or a no-op future
    /// when the server is not configured).
    mm_rpc_task: StoppableTaskPtr,
    /// Set of tasks associated with the merge mining JSON-RPC server.
    /// NOTE(review): presumably its active connections — it is only
    /// initialized (empty) in this file.
    pub mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
87
88impl DarkfiMinersRegistry {
89 pub fn init(network: Network, validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryPtr> {
91 info!(
92 target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
93 "Initializing a new DarkFi node miners registry..."
94 );
95
96 let powrewardv1_zk = PowRewardV1Zk::new(validator)?;
98
99 let stratum_rpc_task = StoppableTask::new();
102 let stratum_rpc_connections = Mutex::new(HashSet::new());
103
104 let mm_rpc_task = StoppableTask::new();
107 let mm_rpc_connections = Mutex::new(HashSet::new());
108
109 info!(
110 target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
111 "DarkFi node miners registry generated successfully!"
112 );
113
114 Ok(Arc::new(Self {
115 network,
116 powrewardv1_zk,
117 block_templates: RwLock::new(HashMap::new()),
118 jobs: RwLock::new(HashMap::new()),
119 mm_jobs: RwLock::new(HashMap::new()),
120 submit_lock: RwLock::new(()),
121 stratum_rpc_task,
122 stratum_rpc_connections,
123 mm_rpc_task,
124 mm_rpc_connections,
125 }))
126 }
127
128 pub fn start(
131 &self,
132 executor: &ExecutorPtr,
133 node: &DarkfiNodePtr,
134 stratum_rpc_settings: &Option<RpcSettings>,
135 mm_rpc_settings: &Option<RpcSettings>,
136 ) -> Result<()> {
137 info!(
138 target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
139 "Starting the DarkFi node miners registry..."
140 );
141
142 if let Some(stratum_rpc) = stratum_rpc_settings {
144 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting Stratum JSON-RPC server");
145 let node_ = node.clone();
146 self.stratum_rpc_task.clone().start(
147 listen_and_serve::<StratumRpcHandler>(stratum_rpc.clone(), node.clone(), None, executor.clone()),
148 |res| async move {
149 match res {
150 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<StratumRpcHandler>>::stop_connections(&node_).await,
151 Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting Stratum JSON-RPC server: {e}"),
152 }
153 },
154 Error::RpcServerStopped,
155 executor.clone(),
156 );
157 } else {
158 self.stratum_rpc_task.clone().start(
160 async { Ok(()) },
161 |_| async { },
162 Error::RpcServerStopped,
163 executor.clone(),
164 );
165 }
166
167 if let Some(mm_rpc) = mm_rpc_settings {
169 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting merge mining JSON-RPC server");
170 let node_ = node.clone();
171 self.mm_rpc_task.clone().start(
172 listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), node.clone(), None, executor.clone()),
173 |res| async move {
174 match res {
175 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
176 Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting merge mining JSON-RPC server: {e}"),
177 }
178 },
179 Error::RpcServerStopped,
180 executor.clone(),
181 );
182 } else {
183 self.mm_rpc_task.clone().start(
185 async { Ok(()) },
186 |_| async { },
187 Error::RpcServerStopped,
188 executor.clone(),
189 );
190 }
191
192 info!(
193 target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
194 "DarkFi node miners registry started successfully!"
195 );
196
197 Ok(())
198 }
199
200 pub async fn stop(&self) {
202 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Terminating DarkFi node miners registry...");
203
204 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping Stratum JSON-RPC server...");
206 self.stratum_rpc_task.stop().await;
207
208 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping merge mining JSON-RPC server...");
210 self.mm_rpc_task.stop().await;
211
212 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
213 }
214
215 async fn create_template(
219 &self,
220 validator: &ValidatorPtr,
221 wallet: &String,
222 config: &MinerRewardsRecipientConfig,
223 ) -> Result<BlockTemplate> {
224 let mut block_templates = self.block_templates.write().await;
226
227 if let Some(block_template) = block_templates.get(wallet) {
229 return Ok(block_template.clone())
230 }
231
232 let mut extended_fork = validator.best_current_fork().await?;
234
235 let result = generate_next_block_template(
237 &mut extended_fork,
238 config,
239 &self.powrewardv1_zk.zkbin,
240 &self.powrewardv1_zk.provingkey,
241 validator.verify_fees,
242 )
243 .await;
244
245 extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
247
248 let block_template = result?;
250
251 block_templates.insert(wallet.clone(), block_template.clone());
253
254 let recipient_str = format!("{}", config.recipient);
256 let spend_hook_str = match config.spend_hook {
257 Some(spend_hook) => format!("{spend_hook}"),
258 None => String::from("-"),
259 };
260 let user_data_str = match config.user_data {
261 Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
262 None => String::from("-"),
263 };
264 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
265 "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
266 );
267
268 Ok(block_template)
269 }
270
271 pub async fn register_miner(
273 &self,
274 validator: &ValidatorPtr,
275 wallet: &String,
276 config: &MinerRewardsRecipientConfig,
277 ) -> Result<(String, String, JsonValue, JsonSubscriber)> {
278 let mut jobs = self.jobs.write().await;
280
281 let block_template = self.create_template(validator, wallet, config).await?;
283
284 let (job_id, job) = block_template.job_notification();
286 let (client_id, client) = MinerClient::new(wallet, config, &job_id);
287 let publisher = client.publisher.clone();
288 jobs.insert(client_id.clone(), client);
289
290 Ok((client_id, job_id, job, publisher))
291 }
292
293 pub async fn register_merge_miner(
295 &self,
296 validator: &ValidatorPtr,
297 wallet: &String,
298 config: &MinerRewardsRecipientConfig,
299 ) -> Result<(String, f64)> {
300 let mut jobs = self.mm_jobs.write().await;
302
303 let block_template = self.create_template(validator, wallet, config).await?;
305
306 let block_template_hash = block_template.block.header.template_hash().as_string();
309 let difficulty = block_template.difficulty;
310 jobs.insert(block_template_hash.clone(), wallet.clone());
311
312 Ok((block_template_hash, difficulty))
313 }
314
315 pub async fn submit(
317 &self,
318 validator: &ValidatorPtr,
319 subscribers: &HashMap<&'static str, JsonSubscriber>,
320 p2p_handler: &DarkfidP2pHandlerPtr,
321 block: BlockInfo,
322 ) -> Result<()> {
323 let proposal = Proposal::new(block);
324 validator.append_proposal(&proposal).await?;
325
326 info!(
327 target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
328 "Proposing new block to network",
329 );
330
331 let proposals_sub = subscribers.get("proposals").unwrap();
332 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
333 proposals_sub.notify(vec![enc_prop].into()).await;
334
335 info!(
336 target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
337 "Broadcasting new block to network",
338 );
339 let message = ProposalMessage(proposal);
340 p2p_handler.p2p.broadcast(&message).await;
341
342 Ok(())
343 }
344
345 pub async fn refresh_jobs(
348 &self,
349 block_templates: &mut HashMap<String, BlockTemplate>,
350 jobs: &mut HashMap<String, MinerClient>,
351 mm_jobs: &mut HashMap<String, String>,
352 validator: &ValidatorPtr,
353 ) -> Result<()> {
354 let mut dropped_jobs = vec![];
356 let mut active_templates = HashSet::new();
357 for (client_id, client) in jobs.iter() {
358 if client.publisher.publisher.clear_inactive().await {
362 dropped_jobs.push(client_id.clone());
363 continue
364 }
365
366 active_templates.insert(client.wallet.clone());
368 }
369 jobs.retain(|client_id, _| !dropped_jobs.contains(client_id));
370
371 let extended_fork = validator.best_current_fork().await?;
374 let last_proposal = extended_fork.last_proposal()?.hash;
375
376 let mut dropped_mm_jobs = vec![];
379 for (job_id, wallet) in mm_jobs.iter() {
380 let block_template = block_templates.get(wallet).unwrap();
383
384 if block_template.block.header.previous == last_proposal {
386 active_templates.insert(wallet.clone());
387 continue
388 }
389
390 dropped_mm_jobs.push(job_id.clone());
393 }
394 mm_jobs.retain(|job_id, _| !dropped_mm_jobs.contains(job_id));
395
396 block_templates.retain(|wallet, _| active_templates.contains(wallet));
399
400 if block_templates.is_empty() {
402 return Ok(())
403 }
404
405 for (_, client) in jobs.iter_mut() {
407 let block_template = block_templates.get_mut(&client.wallet).unwrap();
410
411 if block_template.block.header.previous == last_proposal {
413 continue
414 }
415
416 let mut extended_fork = extended_fork.full_clone()?;
418
419 let result = generate_next_block_template(
421 &mut extended_fork,
422 &client.config,
423 &self.powrewardv1_zk.zkbin,
424 &self.powrewardv1_zk.provingkey,
425 validator.verify_fees,
426 )
427 .await;
428
429 extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
431
432 *block_template = result?;
434
435 let recipient_str = format!("{}", client.config.recipient);
437 let spend_hook_str = match client.config.spend_hook {
438 Some(spend_hook) => format!("{spend_hook}"),
439 None => String::from("-"),
440 };
441 let user_data_str = match client.config.user_data {
442 Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
443 None => String::from("-"),
444 };
445 info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
446 "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
447 );
448
449 let (job, notification) = block_template.job_notification();
451
452 client.job = job;
454
455 client.publisher.notify(notification).await;
457 }
458
459 Ok(())
460 }
461
462 pub async fn refresh(&self, validator: &ValidatorPtr) -> Result<()> {
465 let submit_lock = self.submit_lock.write().await;
467 let mut block_templates = self.block_templates.write().await;
468 let mut jobs = self.jobs.write().await;
469 let mut mm_jobs = self.mm_jobs.write().await;
470
471 self.refresh_jobs(&mut block_templates, &mut jobs, &mut mm_jobs, validator).await?;
473
474 drop(block_templates);
476 drop(jobs);
477 drop(mm_jobs);
478 drop(submit_lock);
479
480 Ok(())
481 }
482}