darkfi/net/protocol/
protocol_jobs_manager.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use smol::{future::Future, lock::Mutex, Executor, Task};
22use tracing::{debug, trace};
23
24use super::super::channel::ChannelPtr;
25use crate::Result;
26
/// Shared, reference-counted (`Arc`) pointer to a [`ProtocolJobsManager`].
/// This is the type returned by [`ProtocolJobsManager::new`].
pub type ProtocolJobsManagerPtr = Arc<ProtocolJobsManager>;
29
/// Keeps the handles of tasks spawned through it and, once `start()` has
/// been called, cancels all of them when the associated channel signals stop.
pub struct ProtocolJobsManager {
    /// Name supplied at construction; included in log output
    name: &'static str,
    /// Channel whose stop signal triggers cancellation of the tasks
    channel: ChannelPtr,
    /// Handles of the tasks added via `spawn()`, guarded by an async mutex
    tasks: Mutex<Vec<Task<Result<()>>>>,
}
35
36impl ProtocolJobsManager {
37    /// Create a new protocol jobs manager
38    pub fn new(name: &'static str, channel: ChannelPtr) -> ProtocolJobsManagerPtr {
39        Arc::new(Self { name, channel, tasks: Mutex::new(vec![]) })
40    }
41
42    /// Returns configured name
43    pub fn name(self: Arc<Self>) -> &'static str {
44        self.name
45    }
46
47    /// Runs the task on an executor
48    pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
49        executor.spawn(self.handle_stop()).detach()
50    }
51
52    /// Spawns a new task and adds it to the internal queue
53    pub async fn spawn<'a, F>(&self, future: F, executor: Arc<Executor<'a>>)
54    where
55        F: Future<Output = Result<()>> + Send + 'a,
56    {
57        self.tasks.lock().await.push(executor.spawn(future))
58    }
59
60    /// Waits for a stop signal, then closes all tasks.
61    /// Ensures that all tasks are stopped when a channel closes.
62    /// Called in `start()`
63    async fn handle_stop(self: Arc<Self>) {
64        let stop_sub = self.channel.subscribe_stop().await;
65
66        if let Ok(stop_sub) = stop_sub {
67            // Wait for the stop signal
68            stop_sub.receive().await;
69        }
70
71        self.close_all_tasks().await
72    }
73
74    /// Closes all open tasks. Takes all the tasks from the internal queue.
75    async fn close_all_tasks(self: Arc<Self>) {
76        debug!(
77            target: "net::protocol_jobs_manager",
78            "ProtocolJobsManager::close_all_tasks() [START, name={}, addr={}]",
79            self.name, self.channel.display_address(),
80        );
81
82        let tasks = std::mem::take(&mut *self.tasks.lock().await);
83
84        trace!(target: "net::protocol_jobs_manager", "Cancelling {} tasks", tasks.len());
85        let mut i = 0;
86        #[allow(clippy::explicit_counter_loop)]
87        for task in tasks {
88            trace!(target: "net::protocol_jobs_manager", "Cancelling task #{i}");
89            let _ = task.cancel().await;
90            trace!(target: "net::protocol_jobs_manager", "Cancelled task #{i}");
91            i += 1;
92        }
93    }
94}