1use std::collections::{HashMap, HashSet};
20
21use async_trait::async_trait;
22use smol::lock::MutexGuard;
23use tinyjson::JsonValue;
24use tracing::{debug, error, info};
25
26use darkfi::{
27 rpc::{
28 jsonrpc::{
29 ErrorCode, ErrorCode::InvalidParams, JsonError, JsonRequest, JsonResponse, JsonResult,
30 },
31 server::RequestHandler,
32 },
33 system::StoppableTaskPtr,
34};
35
36use crate::{
37 error::{miner_status_response, server_error, RpcError},
38 registry::model::MinerRewardsRecipientConfig,
39 DarkfiNode,
40};
41
42pub struct StratumRpcHandler;
47
48#[async_trait]
49#[rustfmt::skip]
50impl RequestHandler<StratumRpcHandler> for DarkfiNode {
51 async fn handle_request(&self, req: JsonRequest) -> JsonResult {
52 debug!(target: "darkfid::rpc::stratum_rpc", "--> {}", req.stringify().unwrap());
53
54 match req.method.as_str() {
55 "login" => self.stratum_login(req.id, req.params).await,
59 "submit" => self.stratum_submit(req.id, req.params).await,
60 "keepalived" => self.stratum_keepalived(req.id, req.params).await,
61 _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
62 }
63 }
64
65 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
66 self.registry.stratum_rpc_connections.lock().await
67 }
68}
69
70impl DarkfiNode {
71 pub async fn stratum_login(&self, id: u16, params: JsonValue) -> JsonResult {
124 if !*self.validator.synced.read().await {
126 return JsonResponse::new(JsonValue::from(HashMap::new()), id).into()
127 }
128
129 let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
131 return JsonError::new(InvalidParams, None, id).into()
132 };
133
134 let Some(wallet) = params.get("login") else {
136 return server_error(RpcError::MinerMissingLogin, id, None)
137 };
138 let Some(wallet) = wallet.get::<String>() else {
139 return server_error(RpcError::MinerInvalidLogin, id, None)
140 };
141 let config =
142 match MinerRewardsRecipientConfig::from_str(&self.registry.network, wallet).await {
143 Ok(c) => c,
144 Err(e) => return server_error(e, id, None),
145 };
146
147 let Some(pass) = params.get("pass") else {
149 return server_error(RpcError::MinerMissingPassword, id, None)
150 };
151 let Some(_pass) = pass.get::<String>() else {
152 return server_error(RpcError::MinerInvalidPassword, id, None)
153 };
154
155 let Some(agent) = params.get("agent") else {
157 return server_error(RpcError::MinerMissingAgent, id, None)
158 };
159 let Some(agent) = agent.get::<String>() else {
160 return server_error(RpcError::MinerInvalidAgent, id, None)
161 };
162
163 let Some(algo) = params.get("algo") else {
165 return server_error(RpcError::MinerMissingAlgo, id, None)
166 };
167 let Some(algo) = algo.get::<Vec<JsonValue>>() else {
168 return server_error(RpcError::MinerInvalidAlgo, id, None)
169 };
170
171 let mut found_rx0 = false;
174 for i in algo {
175 let Some(algo) = i.get::<String>() else {
176 return server_error(RpcError::MinerInvalidAlgo, id, None)
177 };
178 if algo == "rx/0" {
179 found_rx0 = true;
180 break
181 }
182 }
183 if !found_rx0 {
184 return server_error(RpcError::MinerRandomXNotSupported, id, None)
185 }
186
187 info!(
189 target: "darkfid::rpc::rpc_stratum::stratum_login",
190 "[RPC-STRATUM] Got login from {wallet} ({agent})",
191 );
192 let (client_id, job_id, job, publisher) =
193 match self.registry.register_miner(&self.validator, wallet, &config).await {
194 Ok(p) => p,
195 Err(e) => {
196 error!(
197 target: "darkfid::rpc::rpc_stratum::stratum_login",
198 "[RPC-STRATUM] Failed to register miner: {e}",
199 );
200 return JsonResponse::new(JsonValue::from(HashMap::new()), id).into()
201 }
202 };
203
204 info!(
206 target: "darkfid::rpc::rpc_stratum::stratum_login",
207 "[RPC-STRATUM] Created new mining job for client {client_id}: {job_id}"
208 );
209 let response = JsonValue::from(HashMap::from([
210 ("id".to_string(), JsonValue::from(client_id)),
211 ("job".to_string(), job),
212 ("status".to_string(), JsonValue::from(String::from("OK"))),
213 ]));
214 (publisher, JsonResponse::new(response, id)).into()
215 }
216
217 pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult {
242 if !*self.validator.synced.read().await {
244 return miner_status_response(id, "rejected")
245 }
246
247 let submit_lock = self.registry.submit_lock.write().await;
249
250 let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
252 return JsonError::new(InvalidParams, None, id).into()
253 };
254
255 let Some(client_id) = params.get("id") else {
257 return server_error(RpcError::MinerMissingClientId, id, None)
258 };
259 let Some(client_id) = client_id.get::<String>() else {
260 return server_error(RpcError::MinerInvalidClientId, id, None)
261 };
262
263 let mut jobs = self.registry.jobs.write().await;
265 let Some(client) = jobs.get(client_id) else {
266 return miner_status_response(id, "rejected")
267 };
268
269 let Some(job_id) = params.get("job_id") else {
271 return server_error(RpcError::MinerMissingJobId, id, None)
272 };
273 let Some(job_id) = job_id.get::<String>() else {
274 return server_error(RpcError::MinerInvalidJobId, id, None)
275 };
276
277 if &client.job != job_id {
280 return miner_status_response(id, "rejected")
281 }
282
283 let mut block_templates = self.registry.block_templates.write().await;
286 let Some(block_template) = block_templates.get_mut(&client.wallet) else {
287 return miner_status_response(id, "rejected")
288 };
289
290 if block_template.submitted {
293 return miner_status_response(id, "rejected")
294 }
295
296 let Some(nonce) = params.get("nonce") else {
298 return server_error(RpcError::MinerMissingNonce, id, None)
299 };
300 let Some(nonce) = nonce.get::<String>() else {
301 return server_error(RpcError::MinerInvalidNonce, id, None)
302 };
303 let Ok(nonce_bytes) = hex::decode(nonce) else {
304 return server_error(RpcError::MinerInvalidNonce, id, None)
305 };
306 if nonce_bytes.len() != 4 {
307 return server_error(RpcError::MinerInvalidNonce, id, None)
308 }
309 let nonce = u32::from_le_bytes(nonce_bytes.try_into().unwrap());
310
311 let Some(result) = params.get("result") else {
313 return server_error(RpcError::MinerMissingResult, id, None)
314 };
315 let Some(_result) = result.get::<String>() else {
316 return server_error(RpcError::MinerInvalidResult, id, None)
317 };
318
319 info!(
320 target: "darkfid::rpc::rpc_stratum::stratum_submit",
321 "[RPC-STRATUM] Got solution submission from client {client_id} for job: {job_id}",
322 );
323
324 let mut block = block_template.block.clone();
326 block.header.nonce = nonce;
327 block.sign(&block_template.secret);
328
329 if let Err(e) =
331 self.registry.submit(&self.validator, &self.subscribers, &self.p2p_handler, block).await
332 {
333 error!(
334 target: "darkfid::rpc::rpc_stratum::stratum_submit",
335 "[RPC-STRATUM] Error submitting new block: {e}",
336 );
337
338 let mut mm_jobs = self.registry.mm_jobs.write().await;
340 if let Err(e) = self
341 .registry
342 .refresh_jobs(&mut block_templates, &mut jobs, &mut mm_jobs, &self.validator)
343 .await
344 {
345 error!(
346 target: "darkfid::rpc::rpc_stratum::stratum_submit",
347 "[RPC-STRATUM] Error refreshing registry jobs: {e}",
348 );
349 }
350
351 drop(block_templates);
353 drop(jobs);
354 drop(mm_jobs);
355 drop(submit_lock);
356
357 return miner_status_response(id, "rejected")
358 }
359
360 block_template.submitted = true;
362
363 drop(block_templates);
365 drop(jobs);
366 drop(submit_lock);
367
368 miner_status_response(id, "OK")
369 }
370
371 pub async fn stratum_keepalived(&self, id: u16, params: JsonValue) -> JsonResult {
383 let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
385 return JsonError::new(InvalidParams, None, id).into()
386 };
387
388 let Some(client_id) = params.get("id") else {
390 return server_error(RpcError::MinerMissingClientId, id, None)
391 };
392 let Some(client_id) = client_id.get::<String>() else {
393 return server_error(RpcError::MinerInvalidClientId, id, None)
394 };
395
396 if !self.registry.jobs.read().await.contains_key(client_id) {
398 return server_error(RpcError::MinerUnknownClient, id, None)
399 };
400
401 miner_status_response(id, "KEEPALIVED")
403 }
404}