darkfid/registry/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{BTreeSet, HashMap, HashSet},
21    sync::Arc,
22};
23
24use sled_overlay::sled::IVec;
25use smol::lock::{Mutex, RwLock};
26use tinyjson::JsonValue;
27use tracing::{error, info};
28
29use darkfi::{
30    blockchain::BlockInfo,
31    rpc::{
32        jsonrpc::JsonSubscriber,
33        server::{listen_and_serve, RequestHandler},
34        settings::RpcSettings,
35    },
36    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
37    util::encoding::base64,
38    validator::{consensus::Proposal, ValidatorPtr},
39    Error, Result,
40};
41use darkfi_sdk::{
42    crypto::{keypair::Network, pasta_prelude::PrimeField},
43    tx::TransactionHash,
44};
45use darkfi_serial::serialize_async;
46
47use crate::{
48    proto::{DarkfidP2pHandlerPtr, ProposalMessage},
49    rpc::{stratum::StratumRpcHandler, xmr::MmRpcHandler},
50    DarkfiNode, DarkfiNodePtr,
51};
52
53/// Block related structures
54pub mod model;
55use model::{
56    generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
57    PowRewardV1Zk,
58};
59
/// Shared, atomically reference-counted pointer ([`Arc`]) to the
/// DarkFi node miners registry.
pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;
62
/// DarkFi node miners registry.
///
/// Keeps the block templates handed out to miners, the native and
/// merge mining jobs built on top of them, and the background tasks
/// of the JSON-RPC servers miners connect to.
pub struct DarkfiMinersRegistry {
    /// Blockchain network
    pub network: Network,
    /// PowRewardV1 ZK data
    pub powrewardv1_zk: PowRewardV1Zk,
    /// Mining block templates of each wallet config, keyed by the
    /// wallet template key.
    pub block_templates: RwLock<HashMap<String, BlockTemplate>>,
    /// Active native clients mapped to their job information.
    /// This client information includes their wallet template key,
    /// recipient configuration, current mining job key (job id) and
    /// its connection publisher. For native jobs the job key is the
    /// hex encoded header hash.
    pub jobs: RwLock<HashMap<String, MinerClient>>,
    /// Active merge mining jobs mapped to the wallet template they
    /// represent. The key (job id) is the header template hash.
    pub mm_jobs: RwLock<HashMap<String, String>>,
    /// Submission lock, used to queue up the processing of
    /// submissions. It is also taken while the registry is refreshed.
    pub submit_lock: RwLock<()>,
    /// Stratum JSON-RPC background task
    stratum_rpc_task: StoppableTaskPtr,
    /// Stratum JSON-RPC connection tracker
    pub stratum_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// HTTP JSON-RPC background task (merge mining server)
    mm_rpc_task: StoppableTaskPtr,
    /// HTTP JSON-RPC connection tracker (merge mining server)
    pub mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
91
92impl DarkfiMinersRegistry {
93    /// Initialize a DarkFi node miners registry.
94    pub fn init(network: Network, validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryPtr> {
95        info!(
96            target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
97            "Initializing a new DarkFi node miners registry..."
98        );
99
100        // Generate the PowRewardV1 ZK data
101        let powrewardv1_zk = PowRewardV1Zk::new(validator)?;
102
103        // Generate the stratum JSON-RPC background task and its
104        // connections tracker.
105        let stratum_rpc_task = StoppableTask::new();
106        let stratum_rpc_connections = Mutex::new(HashSet::new());
107
108        // Generate the HTTP JSON-RPC background task and its
109        // connections tracker.
110        let mm_rpc_task = StoppableTask::new();
111        let mm_rpc_connections = Mutex::new(HashSet::new());
112
113        info!(
114            target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
115            "DarkFi node miners registry generated successfully!"
116        );
117
118        Ok(Arc::new(Self {
119            network,
120            powrewardv1_zk,
121            block_templates: RwLock::new(HashMap::new()),
122            jobs: RwLock::new(HashMap::new()),
123            mm_jobs: RwLock::new(HashMap::new()),
124            submit_lock: RwLock::new(()),
125            stratum_rpc_task,
126            stratum_rpc_connections,
127            mm_rpc_task,
128            mm_rpc_connections,
129        }))
130    }
131
132    /// Start the DarkFi node miners registry for provided DarkFi node
133    /// instance.
134    pub fn start(
135        &self,
136        executor: &ExecutorPtr,
137        node: &DarkfiNodePtr,
138        stratum_rpc_settings: &Option<RpcSettings>,
139        mm_rpc_settings: &Option<RpcSettings>,
140    ) -> Result<()> {
141        info!(
142            target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
143            "Starting the DarkFi node miners registry..."
144        );
145
146        // Start the stratum server JSON-RPC task
147        if let Some(stratum_rpc) = stratum_rpc_settings {
148            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting Stratum JSON-RPC server");
149            let node_ = node.clone();
150            self.stratum_rpc_task.clone().start(
151                listen_and_serve::<StratumRpcHandler>(stratum_rpc.clone(), node.clone(), None, executor.clone()),
152                |res| async move {
153                    match res {
154                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<StratumRpcHandler>>::stop_connections(&node_).await,
155                        Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting Stratum JSON-RPC server: {e}"),
156                    }
157                },
158                Error::RpcServerStopped,
159                executor.clone(),
160            );
161        } else {
162            // Create a dummy task
163            self.stratum_rpc_task.clone().start(
164                async { Ok(()) },
165                |_| async { /* Do nothing */ },
166                Error::RpcServerStopped,
167                executor.clone(),
168            );
169        }
170
171        // Start the merge mining JSON-RPC task
172        if let Some(mm_rpc) = mm_rpc_settings {
173            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting merge mining JSON-RPC server");
174            let node_ = node.clone();
175            self.mm_rpc_task.clone().start(
176                listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), node.clone(), None, executor.clone()),
177                |res| async move {
178                    match res {
179                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
180                        Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting merge mining JSON-RPC server: {e}"),
181                    }
182                },
183                Error::RpcServerStopped,
184                executor.clone(),
185            );
186        } else {
187            // Create a dummy task
188            self.mm_rpc_task.clone().start(
189                async { Ok(()) },
190                |_| async { /* Do nothing */ },
191                Error::RpcServerStopped,
192                executor.clone(),
193            );
194        }
195
196        info!(
197            target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
198            "DarkFi node miners registry started successfully!"
199        );
200
201        Ok(())
202    }
203
204    /// Stop the DarkFi node miners registry.
205    pub async fn stop(&self) {
206        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Terminating DarkFi node miners registry...");
207
208        // Stop the Stratum JSON-RPC task
209        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping Stratum JSON-RPC server...");
210        self.stratum_rpc_task.stop().await;
211
212        // Stop the merge mining JSON-RPC task
213        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping merge mining JSON-RPC server...");
214        self.mm_rpc_task.stop().await;
215
216        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
217    }
218
219    /// Create a registry record for provided wallet config. If the
220    /// record already exists return its template, otherwise create its
221    /// current template based on provided validator state.
222    ///
223    /// Note: Always remember to purge new trees from the database if
224    /// not needed.
225    async fn create_template(
226        &self,
227        validator: &ValidatorPtr,
228        wallet: &String,
229        config: &MinerRewardsRecipientConfig,
230    ) -> Result<BlockTemplate> {
231        // Grab a lock over current templates
232        let mut block_templates = self.block_templates.write().await;
233
234        // Check if a template already exists for this wallet
235        if let Some(block_template) = block_templates.get(wallet) {
236            return Ok(block_template.clone())
237        }
238
239        // Grab validator best current fork
240        let mut extended_fork = validator.best_current_fork().await?;
241
242        // Generate the next block template
243        let block_template = generate_next_block_template(
244            &mut extended_fork,
245            config,
246            &self.powrewardv1_zk.zkbin,
247            &self.powrewardv1_zk.provingkey,
248            validator.verify_fees,
249        )
250        .await?;
251
252        // Create the new registry record
253        block_templates.insert(wallet.clone(), block_template.clone());
254
255        // Print the new template wallet information
256        let recipient_str = format!("{}", config.recipient);
257        let spend_hook_str = match config.spend_hook {
258            Some(spend_hook) => format!("{spend_hook}"),
259            None => String::from("-"),
260        };
261        let user_data_str = match config.user_data {
262            Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
263            None => String::from("-"),
264        };
265        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
266            "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
267        );
268
269        Ok(block_template)
270    }
271
272    /// Register a new miner and create its job.
273    pub async fn register_miner(
274        &self,
275        validator: &ValidatorPtr,
276        wallet: &String,
277        config: &MinerRewardsRecipientConfig,
278    ) -> Result<(String, String, JsonValue, JsonSubscriber)> {
279        // Grab a lock over current native jobs
280        let mut jobs = self.jobs.write().await;
281
282        // Create wallet template
283        let block_template = self.create_template(validator, wallet, config).await?;
284
285        // Grab the hex encoded block hash and create the client job record
286        let (job_id, job) = block_template.job_notification();
287        let (client_id, client) = MinerClient::new(wallet, config, &job_id);
288        let publisher = client.publisher.clone();
289        jobs.insert(client_id.clone(), client);
290
291        Ok((client_id, job_id, job, publisher))
292    }
293
294    /// Register a new merge miner and create its job.
295    pub async fn register_merge_miner(
296        &self,
297        validator: &ValidatorPtr,
298        wallet: &String,
299        config: &MinerRewardsRecipientConfig,
300    ) -> Result<(String, f64)> {
301        // Grab a lock over current mm jobs
302        let mut jobs = self.mm_jobs.write().await;
303
304        // Create wallet template
305        let block_template = self.create_template(validator, wallet, config).await?;
306
307        // Grab the block template hash and its difficulty, and then
308        // create the job record.
309        let block_template_hash = block_template.block.header.template_hash().as_string();
310        let difficulty = block_template.difficulty;
311        jobs.insert(block_template_hash.clone(), wallet.clone());
312
313        Ok((block_template_hash, difficulty))
314    }
315
316    /// Submit provided block to the provided node.
317    pub async fn submit(
318        &self,
319        validator: &ValidatorPtr,
320        subscribers: &HashMap<&'static str, JsonSubscriber>,
321        p2p_handler: &DarkfidP2pHandlerPtr,
322        block: BlockInfo,
323    ) -> Result<()> {
324        let proposal = Proposal::new(block);
325        validator.append_proposal(&proposal).await?;
326
327        info!(
328            target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
329            "Proposing new block to network",
330        );
331
332        let proposals_sub = subscribers.get("proposals").unwrap();
333        let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
334        proposals_sub.notify(vec![enc_prop].into()).await;
335
336        info!(
337            target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
338            "Broadcasting new block to network",
339        );
340        let message = ProposalMessage(proposal);
341        p2p_handler.p2p.broadcast(&message).await;
342
343        Ok(())
344    }
345
346    /// Refresh outdated jobs in the provided registry maps based on
347    /// provided validator state.
348    ///
349    /// Note: Always remember to purge new trees from the database if
350    /// not needed.
351    pub async fn refresh_jobs(
352        &self,
353        block_templates: &mut HashMap<String, BlockTemplate>,
354        jobs: &mut HashMap<String, MinerClient>,
355        mm_jobs: &mut HashMap<String, String>,
356        validator: &ValidatorPtr,
357    ) -> Result<()> {
358        // Find inactive native jobs and drop them
359        let mut dropped_jobs = vec![];
360        let mut active_templates = HashSet::new();
361        for (client_id, client) in jobs.iter() {
362            // Clear inactive client publisher subscribers. If none
363            // exists afterwards, the client is considered inactive so
364            // we mark it for drop.
365            if client.publisher.publisher.clear_inactive().await {
366                dropped_jobs.push(client_id.clone());
367                continue
368            }
369
370            // Mark client block template as active
371            active_templates.insert(client.wallet.clone());
372        }
373        jobs.retain(|client_id, _| !dropped_jobs.contains(client_id));
374
375        // Grab validator best current fork and its last proposal for
376        // checks.
377        let extended_fork = validator.best_current_fork().await?;
378        let last_proposal = extended_fork.last_proposal()?.hash;
379
380        // Find mm jobs not extending the best current fork and drop
381        // them.
382        let mut dropped_mm_jobs = vec![];
383        for (job_id, wallet) in mm_jobs.iter() {
384            // Grab its wallet template. Its safe to unwrap here since
385            // we know the job exists.
386            let block_template = block_templates.get(wallet).unwrap();
387
388            // Check if it extends current best fork
389            if block_template.block.header.previous == last_proposal {
390                active_templates.insert(wallet.clone());
391                continue
392            }
393
394            // This mm job doesn't extend current best fork so we mark
395            // it for drop.
396            dropped_mm_jobs.push(job_id.clone());
397        }
398        mm_jobs.retain(|job_id, _| !dropped_mm_jobs.contains(job_id));
399
400        // Drop inactive templates. Merge miners will create a new
401        // template and job on next poll.
402        block_templates.retain(|wallet, _| active_templates.contains(wallet));
403
404        // Return if no wallets templates exists.
405        if block_templates.is_empty() {
406            return Ok(())
407        }
408
409        // Iterate over active clients to refresh their jobs, if needed
410        for (job_id, client) in jobs.iter_mut() {
411            // Grab its wallet template. Its safe to unwrap here since
412            // we know the job exists.
413            let block_template = block_templates.get_mut(&client.wallet).unwrap();
414
415            // Check if it extends current best fork
416            if block_template.block.header.previous == last_proposal {
417                continue
418            }
419
420            // Clone the fork so each client generates over a new one
421            let mut extended_fork = extended_fork.full_clone()?;
422
423            // Generate the next block template
424            let result = generate_next_block_template(
425                &mut extended_fork,
426                &client.config,
427                &self.powrewardv1_zk.zkbin,
428                &self.powrewardv1_zk.provingkey,
429                validator.verify_fees,
430            )
431            .await;
432
433            // Check result
434            *block_template = match result {
435                Ok(b) => b,
436                Err(e) => {
437                    error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
438                        "Updating block template for job {job_id} failed: {e}",
439                    );
440                    // Mark block template as not submitted so the
441                    // miner can submit another one and don't get stuck
442                    block_template.submitted = false;
443                    continue;
444                }
445            };
446
447            // Print the updated template wallet information
448            let recipient_str = format!("{}", client.config.recipient);
449            let spend_hook_str = match client.config.spend_hook {
450                Some(spend_hook) => format!("{spend_hook}"),
451                None => String::from("-"),
452            };
453            let user_data_str = match client.config.user_data {
454                Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
455                None => String::from("-"),
456            };
457            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
458                "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
459            );
460
461            // Create the new job notification
462            let (job, notification) = block_template.job_notification();
463
464            // Update the client record
465            client.job = job;
466
467            // Push job notification to subscriber
468            client.publisher.notify(notification).await;
469        }
470
471        Ok(())
472    }
473
474    /// Refresh outdated jobs in the registry based on provided
475    /// validator state.
476    pub async fn refresh(&self, validator: &ValidatorPtr) -> Result<()> {
477        // Grab registry locks
478        let submit_lock = self.submit_lock.write().await;
479        let mut block_templates = self.block_templates.write().await;
480        let mut jobs = self.jobs.write().await;
481        let mut mm_jobs = self.mm_jobs.write().await;
482
483        // Refresh jobs
484        self.refresh_jobs(&mut block_templates, &mut jobs, &mut mm_jobs, validator).await?;
485
486        // Release registry locks
487        drop(block_templates);
488        drop(jobs);
489        drop(mm_jobs);
490        drop(submit_lock);
491
492        Ok(())
493    }
494
495    /// Auxilliary function to retrieve all current block templates
496    /// newly opened trees.
497    pub fn new_trees(&self, block_templates: &HashMap<String, BlockTemplate>) -> BTreeSet<IVec> {
498        let mut new_trees = BTreeSet::new();
499        for block_template in block_templates.values() {
500            for new_tree in &block_template.new_trees {
501                new_trees.insert(new_tree.clone());
502            }
503        }
504        new_trees
505    }
506
507    /// Auxilliary function to retrieve all current block templates
508    /// transactions hashes.
509    pub fn proposed_transactions(
510        &self,
511        block_templates: &HashMap<String, BlockTemplate>,
512    ) -> HashSet<TransactionHash> {
513        let mut proposed_txs = HashSet::new();
514        for block_template in block_templates.values() {
515            for tx in &block_template.block.txs {
516                proposed_txs.insert(tx.hash());
517            }
518        }
519        proposed_txs
520    }
521}