fud/rpc/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    path::PathBuf,
22    sync::Arc,
23};
24
25use async_trait::async_trait;
26use smol::lock::{Mutex, MutexGuard};
27use tinyjson::JsonValue;
28use tracing::error;
29
30use darkfi::{
31    dht::DhtNode,
32    geode::hash_to_string,
33    net::P2pPtr,
34    rpc::{
35        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber},
36        p2p_method::HandlerP2p,
37        server::RequestHandler,
38    },
39    system::StoppableTaskPtr,
40    util::path::expand_path,
41    Result,
42};
43
44use crate::{util::FileSelection, Fud};
45
46/// Management JSON-RPC
47pub mod management;
48
49pub struct DefaultRpcInterface {
50    fud: Arc<Fud>,
51    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
52    event_sub: JsonSubscriber,
53}
54
55#[async_trait]
56impl RequestHandler<()> for DefaultRpcInterface {
57    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
58        return match req.method.as_str() {
59            "ping" => self.pong(req.id, req.params).await,
60
61            "put" => self.put(req.id, req.params).await,
62            "get" => self.get(req.id, req.params).await,
63            "subscribe" => self.subscribe(req.id, req.params).await,
64            "remove" => self.remove(req.id, req.params).await,
65            "list_resources" => self.list_resources(req.id, req.params).await,
66            "list_buckets" => self.list_buckets(req.id, req.params).await,
67            "list_seeders" => self.list_seeders(req.id, req.params).await,
68            "verify" => self.verify(req.id, req.params).await,
69            "lookup" => self.lookup(req.id, req.params).await,
70
71            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
72        }
73    }
74
75    async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
76        self.rpc_connections.lock().await
77    }
78}
79
80impl HandlerP2p for DefaultRpcInterface {
81    fn p2p(&self) -> P2pPtr {
82        self.fud.p2p.clone()
83    }
84}
85
86/// Fud RPC methods
87impl DefaultRpcInterface {
88    pub fn new(fud: Arc<Fud>, event_sub: JsonSubscriber) -> Self {
89        Self { fud, rpc_connections: Mutex::new(HashSet::new()), event_sub }
90    }
91
92    // RPCAPI:
93    // Put a file/directory onto the network. Takes a local filesystem path as a parameter.
94    // Returns the resource hash that serves as a pointer to the file/directory.
95    //
96    // --> {"jsonrpc": "2.0", "method": "put", "params": ["/foo.txt"], "id": 42}
97    // <-- {"jsonrpc": "2.0", "result: "df4...3db7", "id": 42}
98    async fn put(&self, id: u16, params: JsonValue) -> JsonResult {
99        let params = params.get::<Vec<JsonValue>>().unwrap();
100        if params.len() != 1 || !params[0].is_string() {
101            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
102        }
103
104        let path = params[0].get::<String>().unwrap();
105        let path = match expand_path(path.as_str()) {
106            Ok(v) => v,
107            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
108        };
109
110        // A valid path was passed. Let's see if we can read it, and if so,
111        // add it to Geode.
112        let res = self.fud.put(&path).await;
113        if let Err(e) = res {
114            return JsonError::new(ErrorCode::InternalError, Some(format!("{e}")), id).into()
115        }
116
117        JsonResponse::new(JsonValue::String(path.to_string_lossy().to_string()), id).into()
118    }
119
120    // RPCAPI:
121    // Fetch a resource from the network. Takes a hash, path (absolute or relative), and an
122    // optional list of file paths (only used for directories) as parameters.
123    // Returns the path where the resource will be located once downloaded.
124    //
125    // --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd", "~/myfile.jpg", null], "id": 42}
126    // <-- {"jsonrpc": "2.0", "result": "/home/user/myfile.jpg", "id": 42}
127    async fn get(&self, id: u16, params: JsonValue) -> JsonResult {
128        let params = params.get::<Vec<JsonValue>>().unwrap();
129        if params.len() != 3 || !params[0].is_string() || !params[1].is_string() {
130            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
131        }
132
133        let mut hash_buf = vec![];
134        match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
135            Ok(_) => {}
136            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
137        }
138
139        if hash_buf.len() != 32 {
140            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
141        }
142
143        let mut hash_buf_arr = [0u8; 32];
144        hash_buf_arr.copy_from_slice(&hash_buf);
145
146        let hash = blake3::Hash::from_bytes(hash_buf_arr);
147        let hash_str = hash_to_string(&hash);
148
149        let path = match params[1].get::<String>() {
150            Some(path) => match path.is_empty() {
151                true => match self.fud.hash_to_path(&hash).ok().flatten() {
152                    Some(path) => path,
153                    None => self.fud.downloads_path.join(&hash_str),
154                },
155                false => match PathBuf::from(path).is_absolute() {
156                    true => PathBuf::from(path),
157                    false => self.fud.downloads_path.join(path),
158                },
159            },
160            None => self.fud.downloads_path.join(&hash_str),
161        };
162
163        let files: FileSelection = match &params[2] {
164            JsonValue::Array(files) => files
165                .iter()
166                .filter_map(|v| {
167                    if let JsonValue::String(file) = v {
168                        Some(PathBuf::from(file.clone()))
169                    } else {
170                        None
171                    }
172                })
173                .collect(),
174            JsonValue::Null => FileSelection::All,
175            _ => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
176        };
177
178        // Start downloading the resource
179        if let Err(e) = self.fud.get(&hash, &path, files).await {
180            return JsonError::new(ErrorCode::InternalError, Some(e.to_string()), id).into()
181        }
182
183        JsonResponse::new(JsonValue::String(path.to_string_lossy().to_string()), id).into()
184    }
185
186    // RPCAPI:
187    // Subscribe to fud events.
188    //
189    // --> {"jsonrpc": "2.0", "method": "get", "params": [], "id": 42}
190    // <-- {"jsonrpc": "2.0", "result": `event`, "id": 42}
191    async fn subscribe(&self, _id: u16, _params: JsonValue) -> JsonResult {
192        self.event_sub.clone().into()
193    }
194
195    // RPCAPI:
196    // Returns resources.
197    //
198    // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
199    // <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:9700"]]]], "id": 1}
200    pub async fn list_resources(&self, id: u16, params: JsonValue) -> JsonResult {
201        let params = params.get::<Vec<JsonValue>>().unwrap();
202        if !params.is_empty() {
203            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
204        }
205
206        let resources_read = self.fud.resources.read().await;
207        let mut resources: Vec<JsonValue> = vec![];
208        for (_, resource) in resources_read.iter() {
209            resources.push(resource.clone().into());
210        }
211
212        JsonResponse::new(JsonValue::Array(resources), id).into()
213    }
214
215    // RPCAPI:
216    // Returns the current buckets.
217    //
218    // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
219    // <-- {"jsonrpc": "2.0", "result": [["abcdef", ["tcp://127.0.0.1:9700"]]], "id": 1}
220    pub async fn list_buckets(&self, id: u16, params: JsonValue) -> JsonResult {
221        let params = params.get::<Vec<JsonValue>>().unwrap();
222        if !params.is_empty() {
223            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
224        }
225        let mut buckets = vec![];
226        for bucket in self.fud.dht.buckets.read().await.iter() {
227            let mut nodes = vec![];
228            for node in bucket.nodes.clone() {
229                let mut addresses = vec![];
230                for addr in &node.addresses {
231                    addresses.push(JsonValue::String(addr.to_string()));
232                }
233                nodes.push(JsonValue::Array(vec![
234                    JsonValue::String(hash_to_string(&node.id())),
235                    JsonValue::Array(addresses),
236                ]));
237            }
238            buckets.push(JsonValue::Array(nodes));
239        }
240
241        JsonResponse::new(JsonValue::Array(buckets), id).into()
242    }
243
244    // RPCAPI:
245    // Returns the content of the seeders router.
246    //
247    // --> {"jsonrpc": "2.0", "method": "list_seeders", "params": [], "id": 1}
248    // <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdefileid": [["abcdef", ["tcp://127.0.0.1:9700"]]]}}, "id": 1}
249    pub async fn list_seeders(&self, id: u16, params: JsonValue) -> JsonResult {
250        let params = params.get::<Vec<JsonValue>>().unwrap();
251        if !params.is_empty() {
252            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
253        }
254        let mut seeders_table: HashMap<String, JsonValue> = HashMap::new();
255        for (hash, items) in self.fud.dht.hash_table.read().await.iter() {
256            let mut nodes = vec![];
257            for item in items {
258                let mut addresses = vec![];
259                for addr in &item.node.addresses {
260                    addresses.push(JsonValue::String(addr.to_string()));
261                }
262                nodes.push(JsonValue::Array(vec![
263                    JsonValue::String(hash_to_string(&item.node.id())),
264                    JsonValue::Array(addresses),
265                ]));
266            }
267            seeders_table.insert(hash_to_string(hash), JsonValue::Array(nodes));
268        }
269        let mut res: HashMap<String, JsonValue> = HashMap::new();
270        res.insert("seeders".to_string(), JsonValue::Object(seeders_table));
271
272        JsonResponse::new(JsonValue::Object(res), id).into()
273    }
274
275    // RPCAPI:
276    // Removes a resource.
277    //
278    // --> {"jsonrpc": "2.0", "method": "remove", "params": ["1211...abfd"], "id": 1}
279    // <-- {"jsonrpc": "2.0", "result": [], "id": 1}
280    pub async fn remove(&self, id: u16, params: JsonValue) -> JsonResult {
281        let params = params.get::<Vec<JsonValue>>().unwrap();
282        if params.len() != 1 || !params[0].is_string() {
283            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
284        }
285        let mut hash_buf = [0u8; 32];
286        match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
287            Ok(_) => {}
288            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
289        }
290
291        self.fud.remove(&blake3::Hash::from_bytes(hash_buf)).await;
292
293        JsonResponse::new(JsonValue::Array(vec![]), id).into()
294    }
295
296    // RPCAPI:
297    // Verifies local files. Takes a list of file hashes as parameters.
298    // An empty list means all known files.
299    // Returns the path where the file will be located once downloaded.
300    //
301    // --> {"jsonrpc": "2.0", "method": "verify", "params": ["1211...abfd"], "id": 42}
302    // <-- {"jsonrpc": "2.0", "result": [], "id": 1}
303    async fn verify(&self, id: u16, params: JsonValue) -> JsonResult {
304        let params = params.get::<Vec<JsonValue>>().unwrap();
305        if !params.iter().all(|param| param.is_string()) {
306            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
307        }
308        let hashes = if params.is_empty() {
309            None
310        } else {
311            let hashes_str: Vec<String> =
312                params.iter().map(|param| param.get::<String>().unwrap().clone()).collect();
313            let hashes: Result<Vec<blake3::Hash>> = hashes_str
314                .into_iter()
315                .map(|hash_str| {
316                    let mut buf = [0u8; 32];
317                    bs58::decode(hash_str).onto(&mut buf)?;
318                    Ok(blake3::Hash::from_bytes(buf))
319                })
320                .collect();
321            if hashes.is_err() {
322                return JsonError::new(ErrorCode::InvalidParams, None, id).into();
323            }
324            Some(hashes.unwrap())
325        };
326
327        if let Err(e) = self.fud.verify_resources(hashes).await {
328            error!(target: "fud::verify()", "Could not verify resources: {e}");
329            return JsonError::new(ErrorCode::InternalError, None, id).into();
330        }
331
332        JsonResponse::new(JsonValue::Array(vec![]), id).into()
333    }
334
335    // RPCAPI:
336    // Lookup a resource's seeders.
337    //
338    // --> {"jsonrpc": "2.0", "method": "lookup", "params": ["1211...abfd"], "id": 1}
339    // <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdefileid": [["abcdef", ["tcp://127.0.0.1:9701"]]]}}, "id": 1}
340    pub async fn lookup(&self, id: u16, params: JsonValue) -> JsonResult {
341        let params = params.get::<Vec<JsonValue>>().unwrap();
342        if params.len() != 1 || !params[0].is_string() {
343            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
344        }
345        let mut hash_buf = [0u8; 32];
346        match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
347            Ok(_) => {}
348            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
349        }
350
351        let _ = self.fud.lookup_tx.send(blake3::Hash::from_bytes(hash_buf)).await;
352
353        JsonResponse::new(JsonValue::Array(vec![]), id).into()
354    }
355}