fu/
main.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use clap::{Parser, Subcommand};
20use smol::lock::RwLock;
21use std::{
22    collections::HashMap,
23    io::{stdout, Write},
24    sync::Arc,
25};
26use termcolor::{ColorChoice, StandardStream, WriteColor};
27use tracing::error;
28use url::Url;
29
30use darkfi::{
31    cli_desc,
32    rpc::{
33        client::RpcClient,
34        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
35        util::JsonValue,
36    },
37    system::{ExecutorPtr, Publisher, StoppableTask},
38    util::logger::setup_logging,
39    Error, Result,
40};
41
42use fud::{
43    resource::{Resource, ResourceStatus, ResourceType},
44    util::{hash_to_string, FileSelection},
45};
46
47mod util;
48use crate::util::{
49    format_bytes, format_duration, format_progress_bytes, optional_value, print_tree,
50    status_to_colorspec, type_to_colorspec, TreeNode,
51};
52
53#[derive(Parser)]
54#[clap(name = "fu", about = cli_desc!(), version)]
55#[clap(arg_required_else_help(true))]
56struct Args {
57    #[clap(short, action = clap::ArgAction::Count)]
58    /// Increase verbosity (-vvv supported)
59    verbose: u8,
60
61    #[clap(short, long, default_value = "tcp://127.0.0.1:9705")]
62    /// fud JSON-RPC endpoint
63    endpoint: Url,
64
65    #[clap(subcommand)]
66    command: Subcmd,
67}
68
69#[derive(Subcommand)]
70enum Subcmd {
71    /// Retrieve provided resource from the fud network
72    Get {
73        /// Resource hash
74        hash: String,
75        /// Download path (relative or absolute)
76        path: Option<String>,
77        /// Optional list of files you want to download (only used for directories)
78        #[arg(short, long, num_args = 1..)]
79        files: Option<Vec<String>>,
80    },
81
82    /// Put a file or directory onto the fud network
83    Put {
84        /// File path or directory path
85        path: String,
86    },
87
88    /// List resources
89    Ls {},
90
91    /// Watch
92    Watch {},
93
94    /// Remove a resource from fud
95    Rm {
96        /// Resource hash
97        hash: String,
98    },
99
100    /// Get the current node buckets
101    Buckets {},
102
103    /// Get the router state
104    Seeders {},
105
106    /// Verify local files
107    Verify {
108        /// File hashes
109        files: Option<Vec<String>>,
110    },
111
112    /// Lookup seeders of a resource from the network
113    Lookup {
114        /// Resource hash
115        hash: String,
116    },
117}
118
119struct Fu {
120    pub rpc_client: Arc<RpcClient>,
121    pub endpoint: Url,
122}
123
124impl Fu {
125    async fn get(
126        &self,
127        hash: String,
128        path: Option<String>,
129        files: Option<Vec<String>>,
130        ex: ExecutorPtr,
131    ) -> Result<()> {
132        let publisher = Publisher::new();
133        let subscription = Arc::new(publisher.clone().subscribe().await);
134        let subscriber_task = StoppableTask::new();
135        let hash_ = hash.clone();
136        let publisher_ = publisher.clone();
137        let rpc_client_ = self.rpc_client.clone();
138        subscriber_task.clone().start(
139            async move {
140                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
141                rpc_client_.subscribe(req, publisher).await
142            },
143            move |res| async move {
144                match res {
145                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
146                    Err(e) => {
147                        error!("{e}");
148                        publisher_
149                            .notify(JsonResult::Error(JsonError::new(
150                                ErrorCode::InternalError,
151                                None,
152                                0,
153                            )))
154                            .await;
155                    }
156                }
157            },
158            Error::DetachedTaskStopped,
159            ex.clone(),
160        );
161
162        let progress_bar_width = 20;
163        let mut started = false;
164        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
165
166        let mut print_progress = |info: &HashMap<String, JsonValue>| {
167            started = true;
168            let rs: Resource = info.get("resource").unwrap().clone().into();
169
170            print!("\x1B[2K\r"); // Clear current line
171
172            // Progress bar
173            let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
174                1.0
175            } else if rs.target_bytes_size > 0 {
176                rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
177            } else {
178                0.0
179            };
180            let completed = (percent * progress_bar_width as f64) as usize;
181            let remaining = match progress_bar_width > completed {
182                true => progress_bar_width - completed,
183                false => 0,
184            };
185            let bar = "=".repeat(completed) + &" ".repeat(remaining);
186            print!("[{bar}] {:.1}% | ", percent * 100.0);
187
188            // Downloaded / Total (in bytes)
189            if rs.target_bytes_size > 0 {
190                if rs.target_bytes_downloaded == rs.target_bytes_size {
191                    print!("{} | ", format_bytes(rs.target_bytes_size));
192                } else {
193                    print!(
194                        "{} | ",
195                        format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
196                    );
197                }
198            }
199
200            // Download speed (in bytes/sec)
201            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
202                print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
203            }
204
205            // Downloaded / Total (in chunks)
206            if rs.target_chunks_count > 0 {
207                let s = if rs.target_chunks_count > 1 { "s" } else { "" };
208                if rs.target_chunks_downloaded == rs.target_chunks_count {
209                    print!("{} chunk{s} | ", rs.target_chunks_count);
210                } else {
211                    print!(
212                        "{}/{} chunk{s} | ",
213                        rs.target_chunks_downloaded, rs.target_chunks_count
214                    );
215                }
216            }
217
218            // ETA
219            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
220                print!("ETA: {} | ", format_duration(rs.get_eta()));
221            }
222
223            // Status
224            let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
225                rs.status.as_str() == "incomplete";
226            let status = if is_done { ResourceStatus::Seeding } else { rs.status };
227            tstdout.set_color(&status_to_colorspec(&status)).unwrap();
228            print!(
229                "{}",
230                if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
231            );
232            tstdout.reset().unwrap();
233            stdout().flush().unwrap();
234        };
235
236        let req = JsonRequest::new(
237            "get",
238            JsonValue::Array(vec![
239                JsonValue::String(hash_.clone()),
240                JsonValue::String(path.unwrap_or_default()),
241                match files {
242                    Some(files) => {
243                        JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
244                    }
245                    None => JsonValue::Null,
246                },
247            ]),
248        );
249        // Create a RPC client to send the `get` request
250        let rpc_client_getter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
251        let _ = rpc_client_getter.request(req).await?;
252
253        loop {
254            match subscription.receive().await {
255                JsonResult::Notification(n) => {
256                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
257                    let info = params.get("info");
258                    if info.is_none() {
259                        continue
260                    }
261                    let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
262
263                    let hash = match info.get("hash") {
264                        Some(hash_value) => hash_value.get::<String>().unwrap(),
265                        None => continue,
266                    };
267                    if *hash != hash_ {
268                        continue;
269                    }
270                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
271                        "download_started" |
272                        "metadata_download_completed" |
273                        "chunk_download_completed" |
274                        "resource_updated" => {
275                            print_progress(info);
276                        }
277                        "download_completed" => {
278                            let resource_json = info
279                                .get("resource")
280                                .unwrap()
281                                .get::<HashMap<String, JsonValue>>()
282                                .unwrap();
283                            let path = resource_json.get("path").unwrap().get::<String>().unwrap();
284                            print_progress(info);
285                            println!("\nDownload completed:\n{path}");
286                            return Ok(());
287                        }
288                        "metadata_not_found" => {
289                            println!();
290                            return Err(Error::Custom(format!("Could not find {hash}")));
291                        }
292                        "chunk_not_found" => {
293                            // A seeder does not have a chunk we are looking for,
294                            // we will try another seeder so there is nothing to do
295                        }
296                        "missing_chunks" => {
297                            // We tried all seeders and some chunks are still missing
298                            println!();
299                            return Err(Error::Custom("Missing chunks".to_string()));
300                        }
301                        "download_error" => {
302                            // An error that caused the download to be unsuccessful
303                            if started {
304                                println!();
305                            }
306                            return Err(Error::Custom(
307                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
308                            ));
309                        }
310                        _ => {}
311                    }
312                }
313
314                JsonResult::Error(e) => {
315                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
316                }
317
318                x => {
319                    return Err(Error::UnexpectedJsonRpc(format!(
320                        "Got unexpected data from JSON-RPC: {x:?}"
321                    )))
322                }
323            }
324        }
325    }
326
327    async fn put(&self, path: String, ex: ExecutorPtr) -> Result<()> {
328        let publisher = Publisher::new();
329        let subscription = Arc::new(publisher.clone().subscribe().await);
330        let subscriber_task = StoppableTask::new();
331        let publisher_ = publisher.clone();
332        let rpc_client_ = self.rpc_client.clone();
333        subscriber_task.clone().start(
334            async move {
335                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
336                rpc_client_.subscribe(req, publisher).await
337            },
338            move |res| async move {
339                match res {
340                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
341                    Err(e) => {
342                        error!("{e}");
343                        publisher_
344                            .notify(JsonResult::Error(JsonError::new(
345                                ErrorCode::InternalError,
346                                None,
347                                0,
348                            )))
349                            .await;
350                    }
351                }
352            },
353            Error::DetachedTaskStopped,
354            ex.clone(),
355        );
356
357        let rpc_client_putter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
358        let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(path)]));
359        let rep = rpc_client_putter.request(req).await?;
360        let path_str = rep.get::<String>().unwrap().clone();
361
362        loop {
363            match subscription.receive().await {
364                JsonResult::Notification(n) => {
365                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
366                    let info =
367                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
368                    let path = match info.get("path") {
369                        Some(path) => path.get::<String>().unwrap(),
370                        None => continue,
371                    };
372                    if *path != path_str {
373                        continue;
374                    }
375
376                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
377                        "insert_completed" => {
378                            let id = info.get("hash").unwrap().get::<String>().unwrap().to_string();
379                            println!("{id}");
380                            break Ok(())
381                        }
382                        "insert_error" => {
383                            return Err(Error::Custom(
384                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
385                            ));
386                        }
387                        _ => {}
388                    }
389                }
390
391                JsonResult::Error(e) => {
392                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
393                }
394
395                x => {
396                    return Err(Error::UnexpectedJsonRpc(format!(
397                        "Got unexpected data from JSON-RPC: {x:?}"
398                    )))
399                }
400            }
401        }
402    }
403
404    async fn list_resources(&self) -> Result<()> {
405        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
406        let rep = self.rpc_client.request(req).await?;
407
408        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
409        let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
410
411        for resource in resources.iter() {
412            let mut tree: Vec<TreeNode<String>> = vec![
413                TreeNode::kv("ID".to_string(), hash_to_string(&resource.hash)),
414                TreeNode::kvc(
415                    "Type".to_string(),
416                    resource.rtype.as_str().to_string(),
417                    type_to_colorspec(&resource.rtype),
418                ),
419                TreeNode::kvc(
420                    "Status".to_string(),
421                    resource.status.as_str().to_string(),
422                    status_to_colorspec(&resource.status),
423                ),
424                TreeNode::kv("Chunks".to_string(), {
425                    if let ResourceType::Directory = resource.rtype {
426                        format!(
427                            "{}/{} ({}/{})",
428                            resource.total_chunks_downloaded,
429                            optional_value!(resource.total_chunks_count),
430                            resource.target_chunks_downloaded,
431                            optional_value!(resource.target_chunks_count)
432                        )
433                    } else {
434                        format!(
435                            "{}/{}",
436                            resource.total_chunks_downloaded,
437                            optional_value!(resource.total_chunks_count)
438                        )
439                    }
440                }),
441                TreeNode::kv("Bytes".to_string(), {
442                    if let ResourceType::Directory = resource.rtype {
443                        format!(
444                            "{} ({})",
445                            optional_value!(resource.total_bytes_size, |x: u64| {
446                                format_progress_bytes(resource.total_bytes_downloaded, x)
447                            }),
448                            optional_value!(resource.target_bytes_size, |x: u64| {
449                                format_progress_bytes(resource.target_bytes_downloaded, x)
450                            })
451                        )
452                    } else {
453                        optional_value!(resource.total_bytes_size, |x: u64| format_progress_bytes(
454                            resource.total_bytes_downloaded,
455                            x
456                        ))
457                    }
458                }),
459            ];
460            if let FileSelection::Set(set) = &resource.file_selection {
461                tree.push(TreeNode::key("Selected files".to_string()));
462                tree.last_mut().unwrap().children = set
463                    .clone()
464                    .into_iter()
465                    .map(|path| TreeNode::key(path.to_string_lossy().to_string()))
466                    .collect();
467            }
468            print_tree(&resource.path.to_string_lossy(), &tree);
469        }
470
471        Ok(())
472    }
473
474    async fn buckets(&self) -> Result<()> {
475        let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
476        let rep = self.rpc_client.request(req).await?;
477        let buckets: Vec<JsonValue> = rep.try_into().unwrap();
478        let mut empty = true;
479        for (bucket_i, bucket) in buckets.into_iter().enumerate() {
480            let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
481            if nodes.is_empty() {
482                continue
483            }
484            empty = false;
485
486            let tree: Vec<TreeNode<String>> = nodes
487                .into_iter()
488                .map(|n| {
489                    let node: Vec<JsonValue> = n.try_into().unwrap();
490                    let node_id: JsonValue = node[0].clone();
491                    let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
492
493                    let addresses_vec: Vec<String> = addresses
494                        .into_iter()
495                        .map(|addr| TryInto::<String>::try_into(addr).unwrap())
496                        .collect();
497
498                    let node_id_string: String = node_id.try_into().unwrap();
499
500                    TreeNode {
501                        key: node_id_string,
502                        value: None,
503                        color: None,
504                        children: addresses_vec
505                            .into_iter()
506                            .map(|addr| TreeNode::key(addr.clone()))
507                            .collect(),
508                    }
509                })
510                .collect();
511
512            print_tree(format!("Bucket {bucket_i}").as_str(), &tree);
513        }
514
515        if empty {
516            println!("All buckets are empty");
517        }
518
519        Ok(())
520    }
521
522    async fn seeders(&self) -> Result<()> {
523        let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
524        let rep = self.rpc_client.request(req).await?;
525
526        let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
527
528        if resources.is_empty() {
529            println!("No known seeders");
530            return Ok(())
531        }
532
533        for (hash, nodes) in resources {
534            let nodes: Vec<JsonValue> = nodes.try_into().unwrap();
535            let tree: Vec<TreeNode<String>> = nodes
536                .into_iter()
537                .map(|n| {
538                    let node: Vec<JsonValue> = n.try_into().unwrap();
539                    let node_id: JsonValue = node[0].clone();
540                    let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
541
542                    let addresses_vec: Vec<String> = addresses
543                        .into_iter()
544                        .map(|addr| TryInto::<String>::try_into(addr).unwrap())
545                        .collect();
546
547                    let node_id_string: String = node_id.try_into().unwrap();
548
549                    TreeNode {
550                        key: node_id_string,
551                        value: None,
552                        color: None,
553                        children: addresses_vec
554                            .into_iter()
555                            .map(|addr| TreeNode::key(addr.clone()))
556                            .collect(),
557                    }
558                })
559                .collect();
560
561            print_tree(&hash, &tree);
562        }
563
564        Ok(())
565    }
566
567    async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
568        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
569        let rep = self.rpc_client.request(req).await?;
570
571        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
572        let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));
573
574        let publisher = Publisher::new();
575        let subscription = Arc::new(publisher.clone().subscribe().await);
576        let subscriber_task = StoppableTask::new();
577        let publisher_ = publisher.clone();
578        let rpc_client_ = self.rpc_client.clone();
579        subscriber_task.clone().start(
580            async move {
581                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
582                rpc_client_.subscribe(req, publisher).await
583            },
584            move |res| async move {
585                match res {
586                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
587                    Err(e) => {
588                        error!("{e}");
589                        publisher_
590                            .notify(JsonResult::Error(JsonError::new(
591                                ErrorCode::InternalError,
592                                None,
593                                0,
594                            )))
595                            .await;
596                    }
597                }
598            },
599            Error::DetachedTaskStopped,
600            ex,
601        );
602
603        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
604
605        let mut update_resource = async |resource: &Resource| {
606            let mut resources_write = resources.write().await;
607            let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
608                Some(i) => {
609                    resources_write.remove(i);
610                    resources_write.insert(i, resource.clone());
611                    i
612                }
613                None => {
614                    resources_write.push(resource.clone());
615                    resources_write.len() - 1
616                }
617            };
618
619            // Move the cursor to the i-th line and clear it
620            print!("\x1b[{};1H\x1B[2K", i + 2);
621
622            // Hash
623            print!("\r{:>44} ", hash_to_string(&resource.hash));
624
625            // Type
626            tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
627            print!(
628                "{:>4} ",
629                match resource.rtype.as_str() {
630                    "unknown" => "?",
631                    "directory" => "dir",
632                    _ => resource.rtype.as_str(),
633                }
634            );
635            tstdout.reset().unwrap();
636
637            // Status
638            tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
639            print!("{:>11} ", resource.status.as_str());
640            tstdout.reset().unwrap();
641
642            // Downloaded / Total (in bytes)
643            match resource.total_bytes_size {
644                0 => {
645                    print!("{:>5.1} {:>16} ", 0.0, "?");
646                }
647                _ => {
648                    let percent = resource.total_bytes_downloaded as f64 /
649                        resource.total_bytes_size as f64 *
650                        100.0;
651                    if resource.total_bytes_downloaded == resource.total_bytes_size {
652                        print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
653                    } else {
654                        print!(
655                            "{:>5.1} {:>16} ",
656                            percent,
657                            format_progress_bytes(
658                                resource.total_bytes_downloaded,
659                                resource.total_bytes_size
660                            )
661                        );
662                    }
663                }
664            };
665
666            // Downloaded / Total (in chunks)
667            match resource.total_chunks_count {
668                0 => {
669                    print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
670                }
671                _ => {
672                    if resource.total_chunks_downloaded == resource.total_chunks_count {
673                        print!("{:>9} ", resource.total_chunks_count.to_string());
674                    } else {
675                        print!(
676                            "{:>9} ",
677                            format!(
678                                "{}/{}",
679                                resource.total_chunks_downloaded, resource.total_chunks_count
680                            )
681                        );
682                    }
683                }
684            };
685
686            // Download speed (in bytes/sec)
687            let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
688                resource.status.as_str() == "downloading" &&
689                !resource.speeds.is_empty();
690            print!(
691                "{:>12} ",
692                match speed_available {
693                    false => "-".to_string(),
694                    true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
695                }
696            );
697
698            // ETA
699            let eta = resource.get_eta();
700            print!(
701                "{:>6}",
702                match eta {
703                    0 => "-".to_string(),
704                    _ => format_duration(eta),
705                }
706            );
707
708            println!();
709
710            // Move the cursor to end
711            print!("\x1b[{};1H", resources_write.len() + 2);
712            stdout().flush().unwrap();
713        };
714
715        let print_begin = async || {
716            // Clear
717            print!("\x1B[2J\x1B[1;1H");
718
719            // Print column headers
720            println!(
721                "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
722                "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
723            );
724        };
725
726        print_begin().await;
727        if resources_json.is_empty() {
728            println!("No known resources");
729        } else {
730            for resource in resources_json.iter() {
731                let rs: Resource = resource.clone().into();
732                update_resource(&rs).await;
733            }
734        }
735
736        loop {
737            match subscription.receive().await {
738                JsonResult::Notification(n) => {
739                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
740                    let info = params.get("info");
741                    if info.is_none() {
742                        continue
743                    }
744                    let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
745                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
746                        "download_started" |
747                        "metadata_download_completed" |
748                        "chunk_download_completed" |
749                        "download_completed" |
750                        "missing_chunks" |
751                        "metadata_not_found" |
752                        "resource_updated" => {
753                            let resource: Resource = info.get("resource").unwrap().clone().into();
754                            update_resource(&resource).await;
755                        }
756                        "resource_removed" => {
757                            {
758                                let hash = info.get("hash").unwrap().get::<String>().unwrap();
759                                let mut resources_write = resources.write().await;
760                                let i = resources_write
761                                    .iter()
762                                    .position(|r| hash_to_string(&r.hash) == *hash);
763                                if let Some(i) = i {
764                                    resources_write.remove(i);
765                                }
766                            }
767
768                            let r = resources.read().await.clone();
769                            print_begin().await;
770                            for resource in r.iter() {
771                                update_resource(resource).await;
772                            }
773                        }
774                        "download_error" => {
775                            // An error that caused the download to be unsuccessful
776                        }
777                        _ => {}
778                    }
779                }
780
781                JsonResult::Error(e) => {
782                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
783                }
784
785                x => {
786                    return Err(Error::UnexpectedJsonRpc(format!(
787                        "Got unexpected data from JSON-RPC: {x:?}"
788                    )))
789                }
790            }
791        }
792    }
793
794    async fn remove(&self, hash: String) -> Result<()> {
795        let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
796        self.rpc_client.request(req).await?;
797        Ok(())
798    }
799
800    async fn verify(&self, files: Option<Vec<String>>) -> Result<()> {
801        let files = files.unwrap_or_default().into_iter().map(JsonValue::String).collect();
802        let req = JsonRequest::new("verify", JsonValue::Array(files));
803        self.rpc_client.request(req).await?;
804        Ok(())
805    }
806
807    async fn lookup(&self, hash: String, ex: ExecutorPtr) -> Result<()> {
808        let publisher = Publisher::new();
809        let subscription = Arc::new(publisher.clone().subscribe().await);
810        let subscriber_task = StoppableTask::new();
811        let publisher_ = publisher.clone();
812        let rpc_client_ = self.rpc_client.clone();
813        subscriber_task.clone().start(
814            async move {
815                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
816                rpc_client_.subscribe(req, publisher).await
817            },
818            move |res| async move {
819                match res {
820                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
821                    Err(e) => {
822                        error!("{e}");
823                        publisher_
824                            .notify(JsonResult::Error(JsonError::new(
825                                ErrorCode::InternalError,
826                                None,
827                                0,
828                            )))
829                            .await;
830                    }
831                }
832            },
833            Error::DetachedTaskStopped,
834            ex.clone(),
835        );
836        let req =
837            JsonRequest::new("lookup", JsonValue::Array(vec![JsonValue::String(hash.clone())]));
838        let rpc_client_lookup = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
839        rpc_client_lookup.request(req).await?;
840
841        let print_seeders = |info: &HashMap<String, JsonValue>| {
842            let seeders = info.get("seeders").unwrap().get::<Vec<JsonValue>>().unwrap();
843            for seeder in seeders {
844                let seeder = seeder.get::<HashMap<String, JsonValue>>().unwrap();
845                let node: HashMap<String, JsonValue> =
846                    seeder.get("node").unwrap().clone().try_into().unwrap();
847                let node_id: String = node.get("id").unwrap().clone().try_into().unwrap();
848                let addresses: Vec<JsonValue> =
849                    node.get("addresses").unwrap().clone().try_into().unwrap();
850                let tree: Vec<_> = addresses
851                    .into_iter()
852                    .map(|addr| TreeNode::key(TryInto::<String>::try_into(addr).unwrap()))
853                    .collect();
854
855                print_tree(node_id.as_str(), &tree);
856            }
857        };
858
859        loop {
860            match subscription.receive().await {
861                JsonResult::Notification(n) => {
862                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
863                    let info =
864                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
865                    let hash_ = match info.get("hash") {
866                        Some(hash_value) => hash_value.get::<String>().unwrap(),
867                        None => continue,
868                    };
869                    if hash != *hash_ {
870                        continue;
871                    }
872
873                    if params.get("event").unwrap().get::<String>().unwrap().as_str() ==
874                        "seeders_found"
875                    {
876                        print_seeders(info);
877                        break
878                    }
879                }
880
881                JsonResult::Error(e) => {
882                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
883                }
884
885                x => {
886                    return Err(Error::UnexpectedJsonRpc(format!(
887                        "Got unexpected data from JSON-RPC: {x:?}"
888                    )))
889                }
890            }
891        }
892        Ok(())
893    }
894}
895
896fn main() -> Result<()> {
897    let args = Args::parse();
898
899    setup_logging(args.verbose, None)?;
900
901    let ex = Arc::new(smol::Executor::new());
902    smol::block_on(async {
903        ex.run(async {
904            let rpc_client = Arc::new(RpcClient::new(args.endpoint.clone(), ex.clone()).await?);
905            let fu = Fu { rpc_client, endpoint: args.endpoint.clone() };
906
907            match args.command {
908                Subcmd::Get { hash, path, files } => fu.get(hash, path, files, ex.clone()).await,
909                Subcmd::Put { path } => fu.put(path, ex.clone()).await,
910                Subcmd::Ls {} => fu.list_resources().await,
911                Subcmd::Watch {} => fu.watch(ex.clone()).await,
912                Subcmd::Rm { hash } => fu.remove(hash).await,
913                Subcmd::Buckets {} => fu.buckets().await,
914                Subcmd::Seeders {} => fu.seeders().await,
915                Subcmd::Verify { files } => fu.verify(files).await,
916                Subcmd::Lookup { hash } => fu.lookup(hash, ex.clone()).await,
917            }?;
918
919            Ok(())
920        })
921        .await
922    })
923}