darkfi/system/
publisher.rs1use std::{collections::HashMap, sync::Arc};
20
21use rand::{rngs::OsRng, Rng};
22use smol::lock::Mutex;
23use tracing::warn;
24
25pub type PublisherPtr<T> = Arc<Publisher<T>>;
26pub type SubscriptionId = usize;
27
28#[derive(Debug)]
29pub struct Subscription<T> {
31 id: SubscriptionId,
32 recv_queue: smol::channel::Receiver<T>,
33 parent: Arc<Publisher<T>>,
34}
35
36impl<T: Clone> Subscription<T> {
37 pub fn get_id(&self) -> SubscriptionId {
38 self.id
39 }
40
41 pub async fn receive(&self) -> T {
43 let message_result = self.recv_queue.recv().await;
44
45 match message_result {
46 Ok(message_result) => message_result,
47 Err(err) => {
48 panic!("Subscription::receive() recv_queue failed! {err}");
49 }
50 }
51 }
52
53 pub async fn unsubscribe(&self) {
55 self.parent.clone().unsubscribe(self.id).await
56 }
57}
58
59#[derive(Debug)]
61pub struct Publisher<T> {
62 subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>,
63}
64
65impl<T: Clone> Publisher<T> {
66 pub fn new() -> Arc<Self> {
68 Arc::new(Self { subs: Mutex::new(HashMap::new()) })
69 }
70
71 fn random_id() -> SubscriptionId {
72 OsRng.gen()
73 }
74
75 pub async fn subscribe(self: Arc<Self>) -> Subscription<T> {
80 let (sender, recvr) = smol::channel::unbounded();
81
82 let mut subs = self.subs.lock().await;
84 let mut sub_id = Self::random_id();
85 while subs.contains_key(&sub_id) {
86 sub_id = Self::random_id();
87 }
88
89 subs.insert(sub_id, sender);
90
91 Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() }
92 }
93
94 async fn unsubscribe(self: Arc<Self>, sub_id: SubscriptionId) {
95 self.subs.lock().await.remove(&sub_id);
96 }
97
98 pub async fn notify(&self, message_result: T) {
100 self.notify_with_exclude(message_result, &[]).await
101 }
102
103 pub async fn notify_with_exclude(&self, message_result: T, exclude_list: &[SubscriptionId]) {
105 for (id, sub) in (*self.subs.lock().await).iter() {
106 if exclude_list.contains(id) {
107 continue
108 }
109
110 if let Err(e) = sub.send(message_result.clone()).await {
111 warn!(
112 target: "system::publisher",
113 "[system::publisher] Error returned sending message in notify_with_exclude() call! {e}"
114 );
115 }
116 }
117 }
118
119 pub async fn clear_inactive(&self) -> bool {
123 let mut subs = self.subs.lock().await;
125
126 let mut dropped = vec![];
128 for (sub, channel) in subs.iter() {
129 if channel.receiver_count() == 0 {
130 dropped.push(*sub);
131 }
132 }
133
134 for sub in dropped {
136 subs.remove(&sub);
137 }
138
139 subs.is_empty()
141 }
142}