darkfid/registry/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{BTreeSet, HashMap, HashSet},
21    sync::Arc,
22};
23
24use sled_overlay::sled::IVec;
25use smol::lock::{Mutex, RwLock};
26use tinyjson::JsonValue;
27use tracing::{error, info};
28
29use darkfi::{
30    blockchain::BlockInfo,
31    rpc::{
32        jsonrpc::JsonSubscriber,
33        server::{listen_and_serve, RequestHandler},
34        settings::RpcSettings,
35    },
36    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
37    util::encoding::base64,
38    validator::{consensus::Proposal, Validator, ValidatorPtr},
39    Error, Result,
40};
41use darkfi_sdk::{
42    crypto::{keypair::Network, pasta_prelude::PrimeField},
43    tx::TransactionHash,
44};
45use darkfi_serial::serialize_async;
46
47use crate::{
48    proto::{DarkfidP2pHandlerPtr, ProposalMessage},
49    rpc::{stratum::StratumRpcHandler, xmr::MmRpcHandler},
50    DarkfiNode, DarkfiNodePtr,
51};
52
53/// Block related structures
54pub mod model;
55use model::{
56    generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
57    PowRewardV1Zk,
58};
59
/// Atomic pointer to the DarkFi node miners registry state.
///
/// The state is wrapped in an async `RwLock` behind an `Arc`.
pub type DarkfiMinersRegistryStatePtr = Arc<RwLock<DarkfiMinersRegistryState>>;

/// DarkFi node miners registry state.
///
/// Holds the block templates currently being mined, keyed by wallet
/// config, along with the native and merge mining jobs that reference
/// those templates.
pub struct DarkfiMinersRegistryState {
    /// PowRewardV1 ZK data
    pub powrewardv1_zk: PowRewardV1Zk,
    /// Mining block templates of each wallet config
    pub block_templates: HashMap<String, BlockTemplate>,
    /// Active native clients mapped to their job information. The
    /// map key is the client id.
    /// This client information includes their wallet template key,
    /// recipient configuration, current mining job key (job id) and
    /// its connection publisher. For native jobs the job key is the
    /// hex encoded header hash.
    pub jobs: HashMap<String, MinerClient>,
    /// Active merge mining jobs mapped to the wallet template they
    /// represent. The key (job id) is the header template hash.
    pub mm_jobs: HashMap<String, String>,
}
79
80impl DarkfiMinersRegistryState {
81    pub async fn new(validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryStatePtr> {
82        // Generate the PowRewardV1 ZK data
83        let powrewardv1_zk = PowRewardV1Zk::new(validator).await?;
84
85        Ok(Arc::new(RwLock::new(Self {
86            powrewardv1_zk,
87            block_templates: HashMap::new(),
88            jobs: HashMap::new(),
89            mm_jobs: HashMap::new(),
90        })))
91    }
92
93    /// Create a registry record for provided wallet config. If the
94    /// record already exists return its template, otherwise create its
95    /// current template based on provided validator state.
96    ///
97    /// Note: Always remember to purge new trees from the database if
98    /// not needed.
99    async fn create_template(
100        &mut self,
101        validator: &Validator,
102        wallet: &String,
103        config: &MinerRewardsRecipientConfig,
104    ) -> Result<BlockTemplate> {
105        // Check if a template already exists for this wallet
106        if let Some(block_template) = self.block_templates.get(wallet) {
107            return Ok(block_template.clone())
108        }
109
110        // Grab validator best current fork
111        let mut extended_fork = validator.best_current_fork().await?;
112
113        // Generate the next block template
114        let block_template = generate_next_block_template(
115            &mut extended_fork,
116            config,
117            &self.powrewardv1_zk.zkbin,
118            &self.powrewardv1_zk.provingkey,
119            validator.verify_fees,
120        )
121        .await?;
122
123        // Create the new registry record
124        self.block_templates.insert(wallet.clone(), block_template.clone());
125
126        // Print the new template wallet information
127        let recipient_str = format!("{}", config.recipient);
128        let spend_hook_str = match config.spend_hook {
129            Some(spend_hook) => format!("{spend_hook}"),
130            None => String::from("-"),
131        };
132        let user_data_str = match config.user_data {
133            Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
134            None => String::from("-"),
135        };
136        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
137            "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
138        );
139
140        Ok(block_template)
141    }
142
143    /// Register a new miner and create its job.
144    pub async fn register_miner(
145        &mut self,
146        validator: &Validator,
147        wallet: &String,
148        config: &MinerRewardsRecipientConfig,
149    ) -> Result<(String, String, JsonValue, JsonSubscriber)> {
150        // Create wallet template
151        let block_template = self.create_template(validator, wallet, config).await?;
152
153        // Grab the hex encoded block hash and create the client job record
154        let (job_id, job) = block_template.job_notification();
155        let (client_id, client) = MinerClient::new(wallet, config, &job_id);
156        let publisher = client.publisher.clone();
157        self.jobs.insert(client_id.clone(), client);
158
159        Ok((client_id, job_id, job, publisher))
160    }
161
162    /// Register a new merge miner and create its job.
163    pub async fn register_merge_miner(
164        &mut self,
165        validator: &Validator,
166        wallet: &String,
167        config: &MinerRewardsRecipientConfig,
168    ) -> Result<(String, f64)> {
169        // Create wallet template
170        let block_template = self.create_template(validator, wallet, config).await?;
171
172        // Grab the block template hash and its difficulty, and then
173        // create the job record.
174        let block_template_hash = block_template.block.header.template_hash().as_string();
175        let difficulty = block_template.difficulty;
176        self.mm_jobs.insert(block_template_hash.clone(), wallet.clone());
177
178        Ok((block_template_hash, difficulty))
179    }
180
181    /// Submit provided block to the provided node.
182    pub async fn submit(
183        &self,
184        validator: &mut Validator,
185        subscribers: &HashMap<&'static str, JsonSubscriber>,
186        p2p_handler: &DarkfidP2pHandlerPtr,
187        block: BlockInfo,
188    ) -> Result<()> {
189        let proposal = Proposal::new(block);
190        validator.append_proposal(&proposal).await?;
191
192        info!(
193            target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
194            "Proposing new block to network",
195        );
196
197        let proposals_sub = subscribers.get("proposals").unwrap();
198        let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
199        proposals_sub.notify(vec![enc_prop].into()).await;
200
201        info!(
202            target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
203            "Broadcasting new block to network",
204        );
205        let message = ProposalMessage(proposal);
206        p2p_handler.p2p.broadcast(&message).await;
207
208        Ok(())
209    }
210
211    /// Refresh outdated jobs in the registry based on provided
212    /// validator state.
213    pub async fn refresh(&mut self, validator: &Validator) -> Result<()> {
214        // Find inactive native jobs and drop them
215        let mut dropped_jobs = vec![];
216        let mut active_templates = HashSet::new();
217        for (client_id, client) in self.jobs.iter() {
218            // Clear inactive client publisher subscribers. If none
219            // exists afterwards, the client is considered inactive so
220            // we mark it for drop.
221            if client.publisher.publisher.clear_inactive().await {
222                dropped_jobs.push(client_id.clone());
223                continue
224            }
225
226            // Mark client block template as active
227            active_templates.insert(client.wallet.clone());
228        }
229        self.jobs.retain(|client_id, _| !dropped_jobs.contains(client_id));
230
231        // Grab validator best current fork and its last proposal for
232        // checks.
233        let extended_fork = validator.best_current_fork().await?;
234        let last_proposal = extended_fork.last_proposal()?.hash;
235
236        // Find mm jobs not extending the best current fork and drop
237        // them.
238        let mut dropped_mm_jobs = vec![];
239        for (job_id, wallet) in self.mm_jobs.iter() {
240            // Grab its wallet template. Its safe to unwrap here since
241            // we know the job exists.
242            let block_template = self.block_templates.get(wallet).unwrap();
243
244            // Check if it extends current best fork
245            if block_template.block.header.previous == last_proposal {
246                active_templates.insert(wallet.clone());
247                continue
248            }
249
250            // This mm job doesn't extend current best fork so we mark
251            // it for drop.
252            dropped_mm_jobs.push(job_id.clone());
253        }
254        self.mm_jobs.retain(|job_id, _| !dropped_mm_jobs.contains(job_id));
255
256        // Drop inactive templates. Merge miners will create a new
257        // template and job on next poll.
258        self.block_templates.retain(|wallet, _| active_templates.contains(wallet));
259
260        // Return if no wallets templates exists.
261        if self.block_templates.is_empty() {
262            return Ok(())
263        }
264
265        // Iterate over active clients to refresh their jobs, if needed
266        for (job_id, client) in self.jobs.iter_mut() {
267            // Grab its wallet template. Its safe to unwrap here since
268            // we know the job exists.
269            let block_template = self.block_templates.get_mut(&client.wallet).unwrap();
270
271            // Check if it extends current best fork
272            if block_template.block.header.previous == last_proposal {
273                continue
274            }
275
276            // Clone the fork so each client generates over a new one
277            let mut extended_fork = extended_fork.full_clone()?;
278
279            // Generate the next block template
280            let result = generate_next_block_template(
281                &mut extended_fork,
282                &client.config,
283                &self.powrewardv1_zk.zkbin,
284                &self.powrewardv1_zk.provingkey,
285                validator.verify_fees,
286            )
287            .await;
288
289            // Check result
290            *block_template = match result {
291                Ok(b) => b,
292                Err(e) => {
293                    error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
294                        "Updating block template for job {job_id} failed: {e}",
295                    );
296                    // Mark block template as not submitted so the
297                    // miner can submit another one and don't get stuck
298                    block_template.submitted = false;
299                    continue;
300                }
301            };
302
303            // Print the updated template wallet information
304            let recipient_str = format!("{}", client.config.recipient);
305            let spend_hook_str = match client.config.spend_hook {
306                Some(spend_hook) => format!("{spend_hook}"),
307                None => String::from("-"),
308            };
309            let user_data_str = match client.config.user_data {
310                Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
311                None => String::from("-"),
312            };
313            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
314                "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
315            );
316
317            // Create the new job notification
318            let (job, notification) = block_template.job_notification();
319
320            // Update the client record
321            client.job = job;
322
323            // Push job notification to subscriber
324            client.publisher.notify(notification).await;
325        }
326
327        Ok(())
328    }
329
330    /// Auxilliary function to retrieve all current block templates
331    /// newly opened trees.
332    pub fn new_trees(&self) -> BTreeSet<IVec> {
333        let mut new_trees = BTreeSet::new();
334        for block_template in self.block_templates.values() {
335            for new_tree in &block_template.new_trees {
336                new_trees.insert(new_tree.clone());
337            }
338        }
339        new_trees
340    }
341
342    /// Auxilliary function to retrieve all current block templates
343    /// transactions hashes.
344    pub fn proposed_transactions(&self) -> HashSet<TransactionHash> {
345        let mut proposed_txs = HashSet::new();
346        for block_template in self.block_templates.values() {
347            for tx in &block_template.block.txs {
348                proposed_txs.insert(tx.hash());
349            }
350        }
351        proposed_txs
352    }
353}
354
/// Atomic pointer to the DarkFi node miners registry.
pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;

/// DarkFi node miners registry.
///
/// Bundles the registry state with the background tasks of the two
/// JSON-RPC servers miners connect to: the Stratum one and the merge
/// mining one.
pub struct DarkfiMinersRegistry {
    /// Blockchain network
    pub network: Network,
    /// Registry state
    pub state: DarkfiMinersRegistryStatePtr,
    /// Stratum JSON-RPC background task
    stratum_rpc_task: StoppableTaskPtr,
    /// Stratum JSON-RPC connection tracker
    pub stratum_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// HTTP JSON-RPC background task, serving the merge mining
    /// JSON-RPC server
    mm_rpc_task: StoppableTaskPtr,
    /// HTTP JSON-RPC connection tracker of the merge mining server
    pub mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
373
374impl DarkfiMinersRegistry {
375    /// Initialize a DarkFi node miners registry.
376    pub async fn init(
377        network: Network,
378        validator: &ValidatorPtr,
379    ) -> Result<DarkfiMinersRegistryPtr> {
380        info!(
381            target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
382            "Initializing a new DarkFi node miners registry..."
383        );
384
385        // Generate the registry state
386        let state = DarkfiMinersRegistryState::new(validator).await?;
387
388        // Generate the stratum JSON-RPC background task and its
389        // connections tracker.
390        let stratum_rpc_task = StoppableTask::new();
391        let stratum_rpc_connections = Mutex::new(HashSet::new());
392
393        // Generate the HTTP JSON-RPC background task and its
394        // connections tracker.
395        let mm_rpc_task = StoppableTask::new();
396        let mm_rpc_connections = Mutex::new(HashSet::new());
397
398        info!(
399            target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
400            "DarkFi node miners registry generated successfully!"
401        );
402
403        Ok(Arc::new(Self {
404            network,
405            state,
406            stratum_rpc_task,
407            stratum_rpc_connections,
408            mm_rpc_task,
409            mm_rpc_connections,
410        }))
411    }
412
413    /// Start the DarkFi node miners registry for provided DarkFi node
414    /// instance.
415    pub fn start(
416        &self,
417        executor: &ExecutorPtr,
418        node: &DarkfiNodePtr,
419        stratum_rpc_settings: &Option<RpcSettings>,
420        mm_rpc_settings: &Option<RpcSettings>,
421    ) -> Result<()> {
422        info!(
423            target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
424            "Starting the DarkFi node miners registry..."
425        );
426
427        // Start the stratum server JSON-RPC task
428        if let Some(stratum_rpc) = stratum_rpc_settings {
429            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting Stratum JSON-RPC server");
430            let node_ = node.clone();
431            self.stratum_rpc_task.clone().start(
432                listen_and_serve::<StratumRpcHandler>(stratum_rpc.clone(), node.clone(), None, executor.clone()),
433                |res| async move {
434                    match res {
435                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<StratumRpcHandler>>::stop_connections(&node_).await,
436                        Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting Stratum JSON-RPC server: {e}"),
437                    }
438                },
439                Error::RpcServerStopped,
440                executor.clone(),
441            );
442        } else {
443            // Create a dummy task
444            self.stratum_rpc_task.clone().start(
445                async { Ok(()) },
446                |_| async { /* Do nothing */ },
447                Error::RpcServerStopped,
448                executor.clone(),
449            );
450        }
451
452        // Start the merge mining JSON-RPC task
453        if let Some(mm_rpc) = mm_rpc_settings {
454            info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Starting merge mining JSON-RPC server");
455            let node_ = node.clone();
456            self.mm_rpc_task.clone().start(
457                listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), node.clone(), None, executor.clone()),
458                |res| async move {
459                    match res {
460                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
461                        Err(e) => error!(target: "darkfid::registry::mod::DarkfiMinersRegistry::start", "Failed starting merge mining JSON-RPC server: {e}"),
462                    }
463                },
464                Error::RpcServerStopped,
465                executor.clone(),
466            );
467        } else {
468            // Create a dummy task
469            self.mm_rpc_task.clone().start(
470                async { Ok(()) },
471                |_| async { /* Do nothing */ },
472                Error::RpcServerStopped,
473                executor.clone(),
474            );
475        }
476
477        info!(
478            target: "darkfid::registry::mod::DarkfiMinersRegistry::start",
479            "DarkFi node miners registry started successfully!"
480        );
481
482        Ok(())
483    }
484
485    /// Stop the DarkFi node miners registry.
486    pub async fn stop(&self) {
487        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Terminating DarkFi node miners registry...");
488
489        // Stop the Stratum JSON-RPC task
490        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping Stratum JSON-RPC server...");
491        self.stratum_rpc_task.stop().await;
492
493        // Stop the merge mining JSON-RPC task
494        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "Stopping merge mining JSON-RPC server...");
495        self.mm_rpc_task.stop().await;
496
497        info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
498    }
499}