darkfi/net/protocol/
protocol_jobs_manager.rs1use std::sync::Arc;
20
21use smol::{future::Future, lock::Mutex, Executor, Task};
22use tracing::{debug, trace};
23
24use super::super::channel::ChannelPtr;
25use crate::Result;
26
27pub type ProtocolJobsManagerPtr = Arc<ProtocolJobsManager>;
29
30pub struct ProtocolJobsManager {
31 name: &'static str,
32 channel: ChannelPtr,
33 tasks: Mutex<Vec<Task<Result<()>>>>,
34}
35
36impl ProtocolJobsManager {
37 pub fn new(name: &'static str, channel: ChannelPtr) -> ProtocolJobsManagerPtr {
39 Arc::new(Self { name, channel, tasks: Mutex::new(vec![]) })
40 }
41
42 pub fn name(self: Arc<Self>) -> &'static str {
44 self.name
45 }
46
47 pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
49 executor.spawn(self.handle_stop()).detach()
50 }
51
52 pub async fn spawn<'a, F>(&self, future: F, executor: Arc<Executor<'a>>)
54 where
55 F: Future<Output = Result<()>> + Send + 'a,
56 {
57 self.tasks.lock().await.push(executor.spawn(future))
58 }
59
60 async fn handle_stop(self: Arc<Self>) {
64 let stop_sub = self.channel.subscribe_stop().await;
65
66 if let Ok(stop_sub) = stop_sub {
67 stop_sub.receive().await;
69 }
70
71 self.close_all_tasks().await
72 }
73
74 async fn close_all_tasks(self: Arc<Self>) {
76 debug!(
77 target: "net::protocol_jobs_manager",
78 "ProtocolJobsManager::close_all_tasks() [START, name={}, addr={}]",
79 self.name, self.channel.display_address(),
80 );
81
82 let tasks = std::mem::take(&mut *self.tasks.lock().await);
83
84 trace!(target: "net::protocol_jobs_manager", "Cancelling {} tasks", tasks.len());
85 let mut i = 0;
86 #[allow(clippy::explicit_counter_loop)]
87 for task in tasks {
88 trace!(target: "net::protocol_jobs_manager", "Cancelling task #{i}");
89 let _ = task.cancel().await;
90 trace!(target: "net::protocol_jobs_manager", "Cancelled task #{i}");
91 i += 1;
92 }
93 }
94}