darkfid/registry/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use smol::lock::{Mutex, RwLock};
25use tinyjson::JsonValue;
26use tracing::{error, info};
27
28use darkfi::{
29    blockchain::BlockInfo,
30    rpc::{
31        jsonrpc::JsonSubscriber,
32        server::{listen_and_serve, RequestHandler},
33        settings::RpcSettings,
34    },
35    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
36    util::encoding::base64,
37    validator::{consensus::Proposal, ValidatorPtr},
38    Error, Result,
39};
40use darkfi_sdk::crypto::{keypair::Network, pasta_prelude::PrimeField};
41use darkfi_serial::serialize_async;
42
43use crate::{
44    proto::{DarkfidP2pHandlerPtr, ProposalMessage},
45    rpc::{stratum::StratumRpcHandler, xmr::MmRpcHandler},
46    DarkfiNode, DarkfiNodePtr,
47};
48
49/// Block related structures
50pub mod model;
51use model::{
52    generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
53    PowRewardV1Zk,
54};
55
56/// Atomic pointer to the DarkFi node miners registry.
57pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;
58
59/// DarkFi node miners registry.
60pub struct DarkfiMinersRegistry {
61    /// Blockchain network
62    pub network: Network,
63    /// PowRewardV1 ZK data
64    pub powrewardv1_zk: PowRewardV1Zk,
65    /// Mining block templates of each wallet config
66    pub block_templates: RwLock<HashMap<String, BlockTemplate>>,
67    /// Active native clients mapped to their job information.
68    /// This client information includes their wallet template key,
69    /// recipient configuration, current mining job key(job id) and
70    /// its connection publisher. For native jobs the job key is the
71    /// hex encoded header hash.
72    pub jobs: RwLock<HashMap<String, MinerClient>>,
73    /// Active merge mining jobs mapped to the wallet template they
74    /// represent. The key(job id) is the the header template hash.
75    pub mm_jobs: RwLock<HashMap<String, String>>,
76    /// Submission lock so we can queue up submissions process
77    pub submit_lock: RwLock<()>,
78    /// Stratum JSON-RPC background task
79    stratum_rpc_task: StoppableTaskPtr,
80    /// Stratum JSON-RPC connection tracker
81    pub stratum_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
82    /// HTTP JSON-RPC background task
83    mm_rpc_task: StoppableTaskPtr,
84    /// HTTP JSON-RPC connection tracker
85    pub mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
86}
87
88impl DarkfiMinersRegistry {
89    /// Initialize a DarkFi node miners registry.
90    pub fn init(network: Network, validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryPtr> {
91        info!(
92            target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
93            "Initializing a new DarkFi node miners registry..."
94        );
95
96        // Generate the PowRewardV1 ZK data
97        let powrewardv1_zk = PowRewardV1Zk::new(validator)?;
98
99        // Generate the stratum JSON-RPC background task and its
100        // connections tracker.
101        let stratum_rpc_task = StoppableTask::new();
102        let stratum_rpc_connections = Mutex::new(HashSet::new());
103
104        // Generate the HTTP JSON-RPC background task and its
105        // connections tracker.
106        let mm_rpc_task = StoppableTask::new();
107        let mm_rpc_connections = Mutex::new(HashSet::new());
108
109        info!(
110            target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
111            "DarkFi node miners registry generated successfully!"
112        );
113
114        Ok(Arc::new(Self {
115            network,
116            powrewardv1_zk,
117            block_templates: RwLock::new(HashMap::new()),
118            jobs: RwLock::new(HashMap::new()),
119            mm_jobs: RwLock::new(HashMap::new()),
120            submit_lock: RwLock::new(()),
121            stratum_rpc_task,
122            stratum_rpc_connections,
123            mm_rpc_task,
124            mm_rpc_connections,
125        }))
126    }
127
128    /// Start the DarkFi node miners registry for provided DarkFi node
129    /// instance.
130    pub fn start(
131        &self,
132        executor: &ExecutorPtr,
133        node: &DarkfiNodePtr,
134        stratum_rpc_settings: &Option<RpcSettings>,
135        mm_rpc_settings: &Option<RpcSettings>,
136    ) -> Result<()> {
137        info!(
138            target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
139            "Starting the DarkFi node miners registry..."
140        );
141
142        // Start the stratum server JSON-RPC task
143        if let Some(stratum_rpc) = stratum_rpc_settings {
144            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting Stratum JSON-RPC server");
145            let node_ = node.clone();
146            self.stratum_rpc_task.clone().start(
147                listen_and_serve::<StratumRpcHandler>(stratum_rpc.clone(), node.clone(), None, executor.clone()),
148                |res| async move {
149                    match res {
150                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<StratumRpcHandler>>::stop_connections(&node_).await,
151                        Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting Stratum JSON-RPC server: {e}"),
152                    }
153                },
154                Error::RpcServerStopped,
155                executor.clone(),
156            );
157        } else {
158            // Create a dummy task
159            self.stratum_rpc_task.clone().start(
160                async { Ok(()) },
161                |_| async { /* Do nothing */ },
162                Error::RpcServerStopped,
163                executor.clone(),
164            );
165        }
166
167        // Start the merge mining JSON-RPC task
168        if let Some(mm_rpc) = mm_rpc_settings {
169            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting merge mining JSON-RPC server");
170            let node_ = node.clone();
171            self.mm_rpc_task.clone().start(
172                listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), node.clone(), None, executor.clone()),
173                |res| async move {
174                    match res {
175                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
176                        Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting merge mining JSON-RPC server: {e}"),
177                    }
178                },
179                Error::RpcServerStopped,
180                executor.clone(),
181            );
182        } else {
183            // Create a dummy task
184            self.mm_rpc_task.clone().start(
185                async { Ok(()) },
186                |_| async { /* Do nothing */ },
187                Error::RpcServerStopped,
188                executor.clone(),
189            );
190        }
191
192        info!(
193            target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
194            "DarkFi node miners registry started successfully!"
195        );
196
197        Ok(())
198    }
199
200    /// Stop the DarkFi node miners registry.
201    pub async fn stop(&self) {
202        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Terminating DarkFi node miners registry...");
203
204        // Stop the Stratum JSON-RPC task
205        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping Stratum JSON-RPC server...");
206        self.stratum_rpc_task.stop().await;
207
208        // Stop the merge mining JSON-RPC task
209        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping merge mining JSON-RPC server...");
210        self.mm_rpc_task.stop().await;
211
212        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
213    }
214
215    /// Create a registry record for provided wallet config. If the
216    /// record already exists return its template, otherwise create its
217    /// current template based on provided validator state.
218    async fn create_template(
219        &self,
220        validator: &ValidatorPtr,
221        wallet: &String,
222        config: &MinerRewardsRecipientConfig,
223    ) -> Result<BlockTemplate> {
224        // Grab a lock over current templates
225        let mut block_templates = self.block_templates.write().await;
226
227        // Check if a template already exists for this wallet
228        if let Some(block_template) = block_templates.get(wallet) {
229            return Ok(block_template.clone())
230        }
231
232        // Grab validator best current fork
233        let mut extended_fork = validator.best_current_fork().await?;
234
235        // Generate the next block template
236        let result = generate_next_block_template(
237            &mut extended_fork,
238            config,
239            &self.powrewardv1_zk.zkbin,
240            &self.powrewardv1_zk.provingkey,
241            validator.verify_fees,
242        )
243        .await;
244
245        // Drop new trees opened by the forks' overlay
246        extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
247
248        // Check result
249        let block_template = result?;
250
251        // Create the new registry record
252        block_templates.insert(wallet.clone(), block_template.clone());
253
254        // Print the new template wallet information
255        let recipient_str = format!("{}", config.recipient);
256        let spend_hook_str = match config.spend_hook {
257            Some(spend_hook) => format!("{spend_hook}"),
258            None => String::from("-"),
259        };
260        let user_data_str = match config.user_data {
261            Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
262            None => String::from("-"),
263        };
264        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
265            "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
266        );
267
268        Ok(block_template)
269    }
270
271    /// Register a new miner and create its job.
272    pub async fn register_miner(
273        &self,
274        validator: &ValidatorPtr,
275        wallet: &String,
276        config: &MinerRewardsRecipientConfig,
277    ) -> Result<(String, String, JsonValue, JsonSubscriber)> {
278        // Grab a lock over current native jobs
279        let mut jobs = self.jobs.write().await;
280
281        // Create wallet template
282        let block_template = self.create_template(validator, wallet, config).await?;
283
284        // Grab the hex encoded block hash and create the client job record
285        let (job_id, job) = block_template.job_notification();
286        let (client_id, client) = MinerClient::new(wallet, config, &job_id);
287        let publisher = client.publisher.clone();
288        jobs.insert(client_id.clone(), client);
289
290        Ok((client_id, job_id, job, publisher))
291    }
292
293    /// Register a new merge miner and create its job.
294    pub async fn register_merge_miner(
295        &self,
296        validator: &ValidatorPtr,
297        wallet: &String,
298        config: &MinerRewardsRecipientConfig,
299    ) -> Result<(String, f64)> {
300        // Grab a lock over current mm jobs
301        let mut jobs = self.mm_jobs.write().await;
302
303        // Create wallet template
304        let block_template = self.create_template(validator, wallet, config).await?;
305
306        // Grab the block template hash and its difficulty, and then
307        // create the job record.
308        let block_template_hash = block_template.block.header.template_hash().as_string();
309        let difficulty = block_template.difficulty;
310        jobs.insert(block_template_hash.clone(), wallet.clone());
311
312        Ok((block_template_hash, difficulty))
313    }
314
315    /// Submit provided block to the provided node.
316    pub async fn submit(
317        &self,
318        validator: &ValidatorPtr,
319        subscribers: &HashMap<&'static str, JsonSubscriber>,
320        p2p_handler: &DarkfidP2pHandlerPtr,
321        block: BlockInfo,
322    ) -> Result<()> {
323        let proposal = Proposal::new(block);
324        validator.append_proposal(&proposal).await?;
325
326        info!(
327            target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
328            "Proposing new block to network",
329        );
330
331        let proposals_sub = subscribers.get("proposals").unwrap();
332        let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
333        proposals_sub.notify(vec![enc_prop].into()).await;
334
335        info!(
336            target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
337            "Broadcasting new block to network",
338        );
339        let message = ProposalMessage(proposal);
340        p2p_handler.p2p.broadcast(&message).await;
341
342        Ok(())
343    }
344
345    /// Refresh outdated jobs in the provided registry maps based on
346    /// provided validator state.
347    pub async fn refresh_jobs(
348        &self,
349        block_templates: &mut HashMap<String, BlockTemplate>,
350        jobs: &mut HashMap<String, MinerClient>,
351        mm_jobs: &mut HashMap<String, String>,
352        validator: &ValidatorPtr,
353    ) -> Result<()> {
354        // Find inactive native jobs and drop them
355        let mut dropped_jobs = vec![];
356        let mut active_templates = HashSet::new();
357        for (client_id, client) in jobs.iter() {
358            // Clear inactive client publisher subscribers. If none
359            // exists afterwards, the client is considered inactive so
360            // we mark it for drop.
361            if client.publisher.publisher.clear_inactive().await {
362                dropped_jobs.push(client_id.clone());
363                continue
364            }
365
366            // Mark client block template as active
367            active_templates.insert(client.wallet.clone());
368        }
369        jobs.retain(|client_id, _| !dropped_jobs.contains(client_id));
370
371        // Grab validator best current fork and its last proposal for
372        // checks.
373        let extended_fork = validator.best_current_fork().await?;
374        let last_proposal = extended_fork.last_proposal()?.hash;
375
376        // Find mm jobs not extending the best current fork and drop
377        // them.
378        let mut dropped_mm_jobs = vec![];
379        for (job_id, wallet) in mm_jobs.iter() {
380            // Grab its wallet template. Its safe to unwrap here since
381            // we know the job exists.
382            let block_template = block_templates.get(wallet).unwrap();
383
384            // Check if it extends current best fork
385            if block_template.block.header.previous == last_proposal {
386                active_templates.insert(wallet.clone());
387                continue
388            }
389
390            // This mm job doesn't extend current best fork so we mark
391            // it for drop.
392            dropped_mm_jobs.push(job_id.clone());
393        }
394        mm_jobs.retain(|job_id, _| !dropped_mm_jobs.contains(job_id));
395
396        // Drop inactive templates. Merge miners will create a new
397        // template and job on next poll.
398        block_templates.retain(|wallet, _| active_templates.contains(wallet));
399
400        // Return if no wallets templates exists.
401        if block_templates.is_empty() {
402            return Ok(())
403        }
404
405        // Iterate over active clients to refresh their jobs, if needed
406        for (_, client) in jobs.iter_mut() {
407            // Grab its wallet template. Its safe to unwrap here since
408            // we know the job exists.
409            let block_template = block_templates.get_mut(&client.wallet).unwrap();
410
411            // Check if it extends current best fork
412            if block_template.block.header.previous == last_proposal {
413                continue
414            }
415
416            // Clone the fork so each client generates over a new one
417            let mut extended_fork = extended_fork.full_clone()?;
418
419            // Generate the next block template
420            let result = generate_next_block_template(
421                &mut extended_fork,
422                &client.config,
423                &self.powrewardv1_zk.zkbin,
424                &self.powrewardv1_zk.provingkey,
425                validator.verify_fees,
426            )
427            .await;
428
429            // Drop new trees opened by the forks' overlay
430            extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
431
432            // Check result
433            *block_template = result?;
434
435            // Print the updated template wallet information
436            let recipient_str = format!("{}", client.config.recipient);
437            let spend_hook_str = match client.config.spend_hook {
438                Some(spend_hook) => format!("{spend_hook}"),
439                None => String::from("-"),
440            };
441            let user_data_str = match client.config.user_data {
442                Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
443                None => String::from("-"),
444            };
445            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
446                "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
447            );
448
449            // Create the new job notification
450            let (job, notification) = block_template.job_notification();
451
452            // Update the client record
453            client.job = job;
454
455            // Push job notification to subscriber
456            client.publisher.notify(notification).await;
457        }
458
459        Ok(())
460    }
461
462    /// Refresh outdated jobs in the registry based on provided
463    /// validator state.
464    pub async fn refresh(&self, validator: &ValidatorPtr) -> Result<()> {
465        // Grab registry locks
466        let submit_lock = self.submit_lock.write().await;
467        let mut block_templates = self.block_templates.write().await;
468        let mut jobs = self.jobs.write().await;
469        let mut mm_jobs = self.mm_jobs.write().await;
470
471        // Refresh jobs
472        self.refresh_jobs(&mut block_templates, &mut jobs, &mut mm_jobs, validator).await?;
473
474        // Release registry locks
475        drop(block_templates);
476        drop(jobs);
477        drop(mm_jobs);
478        drop(submit_lock);
479
480        Ok(())
481    }
482}