darkfi/system/
publisher.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{collections::HashMap, sync::Arc};
20
21use rand::{rngs::OsRng, Rng};
22use smol::lock::Mutex;
23use tracing::warn;
24
25pub type PublisherPtr<T> = Arc<Publisher<T>>;
26pub type SubscriptionId = usize;
27
28#[derive(Debug)]
29/// Subscription to the Publisher. Created using `publisher.subscribe().await`.
30pub struct Subscription<T> {
31    id: SubscriptionId,
32    recv_queue: smol::channel::Receiver<T>,
33    parent: Arc<Publisher<T>>,
34}
35
36impl<T: Clone> Subscription<T> {
37    pub fn get_id(&self) -> SubscriptionId {
38        self.id
39    }
40
41    /// Receive message.
42    pub async fn receive(&self) -> T {
43        let message_result = self.recv_queue.recv().await;
44
45        match message_result {
46            Ok(message_result) => message_result,
47            Err(err) => {
48                panic!("Subscription::receive() recv_queue failed! {err}");
49            }
50        }
51    }
52
53    /// Must be called manually since async Drop is not possible in Rust
54    pub async fn unsubscribe(&self) {
55        self.parent.clone().unsubscribe(self.id).await
56    }
57}
58
59/// Simple broadcast (publish-subscribe) class.
60#[derive(Debug)]
61pub struct Publisher<T> {
62    subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>,
63}
64
65impl<T: Clone> Publisher<T> {
66    /// Construct a new publisher.
67    pub fn new() -> Arc<Self> {
68        Arc::new(Self { subs: Mutex::new(HashMap::new()) })
69    }
70
71    fn random_id() -> SubscriptionId {
72        OsRng.gen()
73    }
74
75    /// Make sure you call this method early in your setup. That way the subscription
76    /// will begin accumulating messages from notify.
77    /// Then when your main loop begins calling `sub.receive().await`, the messages will
78    /// already be queued.
79    pub async fn subscribe(self: Arc<Self>) -> Subscription<T> {
80        let (sender, recvr) = smol::channel::unbounded();
81
82        // Poor-man's do/while
83        let mut subs = self.subs.lock().await;
84        let mut sub_id = Self::random_id();
85        while subs.contains_key(&sub_id) {
86            sub_id = Self::random_id();
87        }
88
89        subs.insert(sub_id, sender);
90
91        Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() }
92    }
93
94    async fn unsubscribe(self: Arc<Self>, sub_id: SubscriptionId) {
95        self.subs.lock().await.remove(&sub_id);
96    }
97
98    /// Publish a message to all listening subscriptions.
99    pub async fn notify(&self, message_result: T) {
100        self.notify_with_exclude(message_result, &[]).await
101    }
102
103    /// Publish a message to all listening subscriptions but exclude some subset.
104    pub async fn notify_with_exclude(&self, message_result: T, exclude_list: &[SubscriptionId]) {
105        for (id, sub) in (*self.subs.lock().await).iter() {
106            if exclude_list.contains(id) {
107                continue
108            }
109
110            if let Err(e) = sub.send(message_result.clone()).await {
111                warn!(
112                    target: "system::publisher",
113                    "[system::publisher] Error returned sending message in notify_with_exclude() call! {e}"
114                );
115            }
116        }
117    }
118
119    /// Clear inactive subscribtions.
120    /// Returns a flag indicating if we have active subscriptions
121    /// after cleanup.
122    pub async fn clear_inactive(&self) -> bool {
123        // Grab a lock over current jobs
124        let mut subs = self.subs.lock().await;
125
126        // Find inactive subscriptions
127        let mut dropped = vec![];
128        for (sub, channel) in subs.iter() {
129            if channel.receiver_count() == 0 {
130                dropped.push(*sub);
131            }
132        }
133
134        // Drop inactive subscriptions
135        for sub in dropped {
136            subs.remove(&sub);
137        }
138
139        // Return flag indicating if we still have subscriptions
140        subs.is_empty()
141    }
142}