1use clap::{Parser, Subcommand};
20use smol::lock::RwLock;
21use std::{
22 collections::HashMap,
23 io::{stdout, Write},
24 sync::Arc,
25};
26use termcolor::{ColorChoice, StandardStream, WriteColor};
27use tracing::error;
28use url::Url;
29
30use darkfi::{
31 cli_desc,
32 rpc::{
33 client::RpcClient,
34 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
35 util::JsonValue,
36 },
37 system::{ExecutorPtr, Publisher, StoppableTask},
38 util::logger::setup_logging,
39 Error, Result,
40};
41
42use fud::{
43 resource::{Resource, ResourceStatus, ResourceType},
44 util::{hash_to_string, FileSelection},
45};
46
47mod util;
48use crate::util::{
49 format_bytes, format_duration, format_progress_bytes, optional_value, print_tree,
50 status_to_colorspec, type_to_colorspec, TreeNode,
51};
52
53#[derive(Parser)]
54#[clap(name = "fu", about = cli_desc!(), version)]
55#[clap(arg_required_else_help(true))]
56struct Args {
57 #[clap(short, action = clap::ArgAction::Count)]
58 verbose: u8,
60
61 #[clap(short, long, default_value = "tcp://127.0.0.1:9705")]
62 endpoint: Url,
64
65 #[clap(subcommand)]
66 command: Subcmd,
67}
68
69#[derive(Subcommand)]
70enum Subcmd {
71 Get {
73 hash: String,
75 path: Option<String>,
77 #[arg(short, long, num_args = 1..)]
79 files: Option<Vec<String>>,
80 },
81
82 Put {
84 path: String,
86 },
87
88 Ls {},
90
91 Watch {},
93
94 Rm {
96 hash: String,
98 },
99
100 Buckets {},
102
103 Seeders {},
105
106 Verify {
108 files: Option<Vec<String>>,
110 },
111
112 Lookup {
114 hash: String,
116 },
117}
118
119struct Fu {
120 pub rpc_client: Arc<RpcClient>,
121 pub endpoint: Url,
122}
123
124impl Fu {
125 async fn get(
126 &self,
127 hash: String,
128 path: Option<String>,
129 files: Option<Vec<String>>,
130 ex: ExecutorPtr,
131 ) -> Result<()> {
132 let publisher = Publisher::new();
133 let subscription = Arc::new(publisher.clone().subscribe().await);
134 let subscriber_task = StoppableTask::new();
135 let hash_ = hash.clone();
136 let publisher_ = publisher.clone();
137 let rpc_client_ = self.rpc_client.clone();
138 subscriber_task.clone().start(
139 async move {
140 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
141 rpc_client_.subscribe(req, publisher).await
142 },
143 move |res| async move {
144 match res {
145 Ok(()) | Err(Error::DetachedTaskStopped) => { }
146 Err(e) => {
147 error!("{e}");
148 publisher_
149 .notify(JsonResult::Error(JsonError::new(
150 ErrorCode::InternalError,
151 None,
152 0,
153 )))
154 .await;
155 }
156 }
157 },
158 Error::DetachedTaskStopped,
159 ex.clone(),
160 );
161
162 let progress_bar_width = 20;
163 let mut started = false;
164 let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
165
166 let mut print_progress = |info: &HashMap<String, JsonValue>| {
167 started = true;
168 let rs: Resource = info.get("resource").unwrap().clone().into();
169
170 print!("\x1B[2K\r"); let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
174 1.0
175 } else if rs.target_bytes_size > 0 {
176 rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
177 } else {
178 0.0
179 };
180 let completed = (percent * progress_bar_width as f64) as usize;
181 let remaining = match progress_bar_width > completed {
182 true => progress_bar_width - completed,
183 false => 0,
184 };
185 let bar = "=".repeat(completed) + &" ".repeat(remaining);
186 print!("[{bar}] {:.1}% | ", percent * 100.0);
187
188 if rs.target_bytes_size > 0 {
190 if rs.target_bytes_downloaded == rs.target_bytes_size {
191 print!("{} | ", format_bytes(rs.target_bytes_size));
192 } else {
193 print!(
194 "{} | ",
195 format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
196 );
197 }
198 }
199
200 if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
202 print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
203 }
204
205 if rs.target_chunks_count > 0 {
207 let s = if rs.target_chunks_count > 1 { "s" } else { "" };
208 if rs.target_chunks_downloaded == rs.target_chunks_count {
209 print!("{} chunk{s} | ", rs.target_chunks_count);
210 } else {
211 print!(
212 "{}/{} chunk{s} | ",
213 rs.target_chunks_downloaded, rs.target_chunks_count
214 );
215 }
216 }
217
218 if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
220 print!("ETA: {} | ", format_duration(rs.get_eta()));
221 }
222
223 let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
225 rs.status.as_str() == "incomplete";
226 let status = if is_done { ResourceStatus::Seeding } else { rs.status };
227 tstdout.set_color(&status_to_colorspec(&status)).unwrap();
228 print!(
229 "{}",
230 if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
231 );
232 tstdout.reset().unwrap();
233 stdout().flush().unwrap();
234 };
235
236 let req = JsonRequest::new(
237 "get",
238 JsonValue::Array(vec![
239 JsonValue::String(hash_.clone()),
240 JsonValue::String(path.unwrap_or_default()),
241 match files {
242 Some(files) => {
243 JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
244 }
245 None => JsonValue::Null,
246 },
247 ]),
248 );
249 let rpc_client_getter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
251 let _ = rpc_client_getter.request(req).await?;
252
253 loop {
254 match subscription.receive().await {
255 JsonResult::Notification(n) => {
256 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
257 let info = params.get("info");
258 if info.is_none() {
259 continue
260 }
261 let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
262
263 let hash = match info.get("hash") {
264 Some(hash_value) => hash_value.get::<String>().unwrap(),
265 None => continue,
266 };
267 if *hash != hash_ {
268 continue;
269 }
270 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
271 "download_started" |
272 "metadata_download_completed" |
273 "chunk_download_completed" |
274 "resource_updated" => {
275 print_progress(info);
276 }
277 "download_completed" => {
278 let resource_json = info
279 .get("resource")
280 .unwrap()
281 .get::<HashMap<String, JsonValue>>()
282 .unwrap();
283 let path = resource_json.get("path").unwrap().get::<String>().unwrap();
284 print_progress(info);
285 println!("\nDownload completed:\n{path}");
286 return Ok(());
287 }
288 "metadata_not_found" => {
289 println!();
290 return Err(Error::Custom(format!("Could not find {hash}")));
291 }
292 "chunk_not_found" => {
293 }
296 "missing_chunks" => {
297 println!();
299 return Err(Error::Custom("Missing chunks".to_string()));
300 }
301 "download_error" => {
302 if started {
304 println!();
305 }
306 return Err(Error::Custom(
307 info.get("error").unwrap().get::<String>().unwrap().to_string(),
308 ));
309 }
310 _ => {}
311 }
312 }
313
314 JsonResult::Error(e) => {
315 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
316 }
317
318 x => {
319 return Err(Error::UnexpectedJsonRpc(format!(
320 "Got unexpected data from JSON-RPC: {x:?}"
321 )))
322 }
323 }
324 }
325 }
326
327 async fn put(&self, path: String, ex: ExecutorPtr) -> Result<()> {
328 let publisher = Publisher::new();
329 let subscription = Arc::new(publisher.clone().subscribe().await);
330 let subscriber_task = StoppableTask::new();
331 let publisher_ = publisher.clone();
332 let rpc_client_ = self.rpc_client.clone();
333 subscriber_task.clone().start(
334 async move {
335 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
336 rpc_client_.subscribe(req, publisher).await
337 },
338 move |res| async move {
339 match res {
340 Ok(()) | Err(Error::DetachedTaskStopped) => { }
341 Err(e) => {
342 error!("{e}");
343 publisher_
344 .notify(JsonResult::Error(JsonError::new(
345 ErrorCode::InternalError,
346 None,
347 0,
348 )))
349 .await;
350 }
351 }
352 },
353 Error::DetachedTaskStopped,
354 ex.clone(),
355 );
356
357 let rpc_client_putter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
358 let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(path)]));
359 let rep = rpc_client_putter.request(req).await?;
360 let path_str = rep.get::<String>().unwrap().clone();
361
362 loop {
363 match subscription.receive().await {
364 JsonResult::Notification(n) => {
365 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
366 let info =
367 params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
368 let path = match info.get("path") {
369 Some(path) => path.get::<String>().unwrap(),
370 None => continue,
371 };
372 if *path != path_str {
373 continue;
374 }
375
376 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
377 "insert_completed" => {
378 let id = info.get("hash").unwrap().get::<String>().unwrap().to_string();
379 println!("{id}");
380 break Ok(())
381 }
382 "insert_error" => {
383 return Err(Error::Custom(
384 info.get("error").unwrap().get::<String>().unwrap().to_string(),
385 ));
386 }
387 _ => {}
388 }
389 }
390
391 JsonResult::Error(e) => {
392 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
393 }
394
395 x => {
396 return Err(Error::UnexpectedJsonRpc(format!(
397 "Got unexpected data from JSON-RPC: {x:?}"
398 )))
399 }
400 }
401 }
402 }
403
404 async fn list_resources(&self) -> Result<()> {
405 let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
406 let rep = self.rpc_client.request(req).await?;
407
408 let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
409 let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
410
411 for resource in resources.iter() {
412 let mut tree: Vec<TreeNode<String>> = vec![
413 TreeNode::kv("ID".to_string(), hash_to_string(&resource.hash)),
414 TreeNode::kvc(
415 "Type".to_string(),
416 resource.rtype.as_str().to_string(),
417 type_to_colorspec(&resource.rtype),
418 ),
419 TreeNode::kvc(
420 "Status".to_string(),
421 resource.status.as_str().to_string(),
422 status_to_colorspec(&resource.status),
423 ),
424 TreeNode::kv("Chunks".to_string(), {
425 if let ResourceType::Directory = resource.rtype {
426 format!(
427 "{}/{} ({}/{})",
428 resource.total_chunks_downloaded,
429 optional_value!(resource.total_chunks_count),
430 resource.target_chunks_downloaded,
431 optional_value!(resource.target_chunks_count)
432 )
433 } else {
434 format!(
435 "{}/{}",
436 resource.total_chunks_downloaded,
437 optional_value!(resource.total_chunks_count)
438 )
439 }
440 }),
441 TreeNode::kv("Bytes".to_string(), {
442 if let ResourceType::Directory = resource.rtype {
443 format!(
444 "{} ({})",
445 optional_value!(resource.total_bytes_size, |x: u64| {
446 format_progress_bytes(resource.total_bytes_downloaded, x)
447 }),
448 optional_value!(resource.target_bytes_size, |x: u64| {
449 format_progress_bytes(resource.target_bytes_downloaded, x)
450 })
451 )
452 } else {
453 optional_value!(resource.total_bytes_size, |x: u64| format_progress_bytes(
454 resource.total_bytes_downloaded,
455 x
456 ))
457 }
458 }),
459 ];
460 if let FileSelection::Set(set) = &resource.file_selection {
461 tree.push(TreeNode::key("Selected files".to_string()));
462 tree.last_mut().unwrap().children = set
463 .clone()
464 .into_iter()
465 .map(|path| TreeNode::key(path.to_string_lossy().to_string()))
466 .collect();
467 }
468 print_tree(&resource.path.to_string_lossy(), &tree);
469 }
470
471 Ok(())
472 }
473
474 async fn buckets(&self) -> Result<()> {
475 let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
476 let rep = self.rpc_client.request(req).await?;
477 let buckets: Vec<JsonValue> = rep.try_into().unwrap();
478 let mut empty = true;
479 for (bucket_i, bucket) in buckets.into_iter().enumerate() {
480 let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
481 if nodes.is_empty() {
482 continue
483 }
484 empty = false;
485
486 let tree: Vec<TreeNode<String>> = nodes
487 .into_iter()
488 .map(|n| {
489 let node: Vec<JsonValue> = n.try_into().unwrap();
490 let node_id: JsonValue = node[0].clone();
491 let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
492
493 let addresses_vec: Vec<String> = addresses
494 .into_iter()
495 .map(|addr| TryInto::<String>::try_into(addr).unwrap())
496 .collect();
497
498 let node_id_string: String = node_id.try_into().unwrap();
499
500 TreeNode {
501 key: node_id_string,
502 value: None,
503 color: None,
504 children: addresses_vec
505 .into_iter()
506 .map(|addr| TreeNode::key(addr.clone()))
507 .collect(),
508 }
509 })
510 .collect();
511
512 print_tree(format!("Bucket {bucket_i}").as_str(), &tree);
513 }
514
515 if empty {
516 println!("All buckets are empty");
517 }
518
519 Ok(())
520 }
521
522 async fn seeders(&self) -> Result<()> {
523 let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
524 let rep = self.rpc_client.request(req).await?;
525
526 let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
527
528 if resources.is_empty() {
529 println!("No known seeders");
530 return Ok(())
531 }
532
533 for (hash, nodes) in resources {
534 let nodes: Vec<JsonValue> = nodes.try_into().unwrap();
535 let tree: Vec<TreeNode<String>> = nodes
536 .into_iter()
537 .map(|n| {
538 let node: Vec<JsonValue> = n.try_into().unwrap();
539 let node_id: JsonValue = node[0].clone();
540 let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
541
542 let addresses_vec: Vec<String> = addresses
543 .into_iter()
544 .map(|addr| TryInto::<String>::try_into(addr).unwrap())
545 .collect();
546
547 let node_id_string: String = node_id.try_into().unwrap();
548
549 TreeNode {
550 key: node_id_string,
551 value: None,
552 color: None,
553 children: addresses_vec
554 .into_iter()
555 .map(|addr| TreeNode::key(addr.clone()))
556 .collect(),
557 }
558 })
559 .collect();
560
561 print_tree(&hash, &tree);
562 }
563
564 Ok(())
565 }
566
567 async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
568 let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
569 let rep = self.rpc_client.request(req).await?;
570
571 let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
572 let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));
573
574 let publisher = Publisher::new();
575 let subscription = Arc::new(publisher.clone().subscribe().await);
576 let subscriber_task = StoppableTask::new();
577 let publisher_ = publisher.clone();
578 let rpc_client_ = self.rpc_client.clone();
579 subscriber_task.clone().start(
580 async move {
581 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
582 rpc_client_.subscribe(req, publisher).await
583 },
584 move |res| async move {
585 match res {
586 Ok(()) | Err(Error::DetachedTaskStopped) => { }
587 Err(e) => {
588 error!("{e}");
589 publisher_
590 .notify(JsonResult::Error(JsonError::new(
591 ErrorCode::InternalError,
592 None,
593 0,
594 )))
595 .await;
596 }
597 }
598 },
599 Error::DetachedTaskStopped,
600 ex,
601 );
602
603 let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
604
605 let mut update_resource = async |resource: &Resource| {
606 let mut resources_write = resources.write().await;
607 let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
608 Some(i) => {
609 resources_write.remove(i);
610 resources_write.insert(i, resource.clone());
611 i
612 }
613 None => {
614 resources_write.push(resource.clone());
615 resources_write.len() - 1
616 }
617 };
618
619 print!("\x1b[{};1H\x1B[2K", i + 2);
621
622 print!("\r{:>44} ", hash_to_string(&resource.hash));
624
625 tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
627 print!(
628 "{:>4} ",
629 match resource.rtype.as_str() {
630 "unknown" => "?",
631 "directory" => "dir",
632 _ => resource.rtype.as_str(),
633 }
634 );
635 tstdout.reset().unwrap();
636
637 tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
639 print!("{:>11} ", resource.status.as_str());
640 tstdout.reset().unwrap();
641
642 match resource.total_bytes_size {
644 0 => {
645 print!("{:>5.1} {:>16} ", 0.0, "?");
646 }
647 _ => {
648 let percent = resource.total_bytes_downloaded as f64 /
649 resource.total_bytes_size as f64 *
650 100.0;
651 if resource.total_bytes_downloaded == resource.total_bytes_size {
652 print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
653 } else {
654 print!(
655 "{:>5.1} {:>16} ",
656 percent,
657 format_progress_bytes(
658 resource.total_bytes_downloaded,
659 resource.total_bytes_size
660 )
661 );
662 }
663 }
664 };
665
666 match resource.total_chunks_count {
668 0 => {
669 print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
670 }
671 _ => {
672 if resource.total_chunks_downloaded == resource.total_chunks_count {
673 print!("{:>9} ", resource.total_chunks_count.to_string());
674 } else {
675 print!(
676 "{:>9} ",
677 format!(
678 "{}/{}",
679 resource.total_chunks_downloaded, resource.total_chunks_count
680 )
681 );
682 }
683 }
684 };
685
686 let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
688 resource.status.as_str() == "downloading" &&
689 !resource.speeds.is_empty();
690 print!(
691 "{:>12} ",
692 match speed_available {
693 false => "-".to_string(),
694 true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
695 }
696 );
697
698 let eta = resource.get_eta();
700 print!(
701 "{:>6}",
702 match eta {
703 0 => "-".to_string(),
704 _ => format_duration(eta),
705 }
706 );
707
708 println!();
709
710 print!("\x1b[{};1H", resources_write.len() + 2);
712 stdout().flush().unwrap();
713 };
714
715 let print_begin = async || {
716 print!("\x1B[2J\x1B[1;1H");
718
719 println!(
721 "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
722 "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
723 );
724 };
725
726 print_begin().await;
727 if resources_json.is_empty() {
728 println!("No known resources");
729 } else {
730 for resource in resources_json.iter() {
731 let rs: Resource = resource.clone().into();
732 update_resource(&rs).await;
733 }
734 }
735
736 loop {
737 match subscription.receive().await {
738 JsonResult::Notification(n) => {
739 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
740 let info = params.get("info");
741 if info.is_none() {
742 continue
743 }
744 let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
745 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
746 "download_started" |
747 "metadata_download_completed" |
748 "chunk_download_completed" |
749 "download_completed" |
750 "missing_chunks" |
751 "metadata_not_found" |
752 "resource_updated" => {
753 let resource: Resource = info.get("resource").unwrap().clone().into();
754 update_resource(&resource).await;
755 }
756 "resource_removed" => {
757 {
758 let hash = info.get("hash").unwrap().get::<String>().unwrap();
759 let mut resources_write = resources.write().await;
760 let i = resources_write
761 .iter()
762 .position(|r| hash_to_string(&r.hash) == *hash);
763 if let Some(i) = i {
764 resources_write.remove(i);
765 }
766 }
767
768 let r = resources.read().await.clone();
769 print_begin().await;
770 for resource in r.iter() {
771 update_resource(resource).await;
772 }
773 }
774 "download_error" => {
775 }
777 _ => {}
778 }
779 }
780
781 JsonResult::Error(e) => {
782 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
783 }
784
785 x => {
786 return Err(Error::UnexpectedJsonRpc(format!(
787 "Got unexpected data from JSON-RPC: {x:?}"
788 )))
789 }
790 }
791 }
792 }
793
794 async fn remove(&self, hash: String) -> Result<()> {
795 let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
796 self.rpc_client.request(req).await?;
797 Ok(())
798 }
799
800 async fn verify(&self, files: Option<Vec<String>>) -> Result<()> {
801 let files = files.unwrap_or_default().into_iter().map(JsonValue::String).collect();
802 let req = JsonRequest::new("verify", JsonValue::Array(files));
803 self.rpc_client.request(req).await?;
804 Ok(())
805 }
806
807 async fn lookup(&self, hash: String, ex: ExecutorPtr) -> Result<()> {
808 let publisher = Publisher::new();
809 let subscription = Arc::new(publisher.clone().subscribe().await);
810 let subscriber_task = StoppableTask::new();
811 let publisher_ = publisher.clone();
812 let rpc_client_ = self.rpc_client.clone();
813 subscriber_task.clone().start(
814 async move {
815 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
816 rpc_client_.subscribe(req, publisher).await
817 },
818 move |res| async move {
819 match res {
820 Ok(()) | Err(Error::DetachedTaskStopped) => { }
821 Err(e) => {
822 error!("{e}");
823 publisher_
824 .notify(JsonResult::Error(JsonError::new(
825 ErrorCode::InternalError,
826 None,
827 0,
828 )))
829 .await;
830 }
831 }
832 },
833 Error::DetachedTaskStopped,
834 ex.clone(),
835 );
836 let req =
837 JsonRequest::new("lookup", JsonValue::Array(vec![JsonValue::String(hash.clone())]));
838 let rpc_client_lookup = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
839 rpc_client_lookup.request(req).await?;
840
841 let print_seeders = |info: &HashMap<String, JsonValue>| {
842 let seeders = info.get("seeders").unwrap().get::<Vec<JsonValue>>().unwrap();
843 for seeder in seeders {
844 let seeder = seeder.get::<HashMap<String, JsonValue>>().unwrap();
845 let node: HashMap<String, JsonValue> =
846 seeder.get("node").unwrap().clone().try_into().unwrap();
847 let node_id: String = node.get("id").unwrap().clone().try_into().unwrap();
848 let addresses: Vec<JsonValue> =
849 node.get("addresses").unwrap().clone().try_into().unwrap();
850 let tree: Vec<_> = addresses
851 .into_iter()
852 .map(|addr| TreeNode::key(TryInto::<String>::try_into(addr).unwrap()))
853 .collect();
854
855 print_tree(node_id.as_str(), &tree);
856 }
857 };
858
859 loop {
860 match subscription.receive().await {
861 JsonResult::Notification(n) => {
862 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
863 let info =
864 params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
865 let hash_ = match info.get("hash") {
866 Some(hash_value) => hash_value.get::<String>().unwrap(),
867 None => continue,
868 };
869 if hash != *hash_ {
870 continue;
871 }
872
873 if params.get("event").unwrap().get::<String>().unwrap().as_str() ==
874 "seeders_found"
875 {
876 print_seeders(info);
877 break
878 }
879 }
880
881 JsonResult::Error(e) => {
882 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
883 }
884
885 x => {
886 return Err(Error::UnexpectedJsonRpc(format!(
887 "Got unexpected data from JSON-RPC: {x:?}"
888 )))
889 }
890 }
891 }
892 Ok(())
893 }
894}
895
896fn main() -> Result<()> {
897 let args = Args::parse();
898
899 setup_logging(args.verbose, None)?;
900
901 let ex = Arc::new(smol::Executor::new());
902 smol::block_on(async {
903 ex.run(async {
904 let rpc_client = Arc::new(RpcClient::new(args.endpoint.clone(), ex.clone()).await?);
905 let fu = Fu { rpc_client, endpoint: args.endpoint.clone() };
906
907 match args.command {
908 Subcmd::Get { hash, path, files } => fu.get(hash, path, files, ex.clone()).await,
909 Subcmd::Put { path } => fu.put(path, ex.clone()).await,
910 Subcmd::Ls {} => fu.list_resources().await,
911 Subcmd::Watch {} => fu.watch(ex.clone()).await,
912 Subcmd::Rm { hash } => fu.remove(hash).await,
913 Subcmd::Buckets {} => fu.buckets().await,
914 Subcmd::Seeders {} => fu.seeders().await,
915 Subcmd::Verify { files } => fu.verify(files).await,
916 Subcmd::Lookup { hash } => fu.lookup(hash, ex.clone()).await,
917 }?;
918
919 Ok(())
920 })
921 .await
922 })
923}