darkfi/system/
condvar.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    future::Future,
21    pin::Pin,
22    sync::Mutex,
23    task::{Context, Poll, Waker},
24};
25
/// Condition variables allow you to block a task while waiting for an event to occur.
/// Condition variables are typically associated with a boolean predicate (a condition).
///
/// Calling `notify()` sets an internal "awake" flag and wakes every task whose `wait()`
/// future has registered itself. The flag stays set afterwards, so any later `wait()`
/// completes immediately until `reset()` clears it.
///
/// ```rust,ignore
///    let cv = Arc::new(CondVar::new());
///
///    let cv_ = cv.clone();
///    executor_
///        .spawn(async move {
///            // Waits here until notify() is called
///            cv_.wait().await;
///            // Check for some condition...
///        })
///        .detach();
///
///    // Allow above code to continue
///    cv.notify();
/// ```
/// After the condition variable is woken up, the user may `wait` again for another `notify`
/// signal by first calling `cv_.reset()`.
#[derive(Debug)]
pub struct CondVar {
    /// Flag and waiter list, guarded by a blocking mutex that is only held briefly.
    state: Mutex<CondVarState>,
}

/// Mutable state shared between a `CondVar` and the futures returned by `wait()`.
#[derive(Debug)]
struct CondVarState {
    /// `true` once `notify()` has been called and `reset()` has not been called since.
    is_awake: bool,
    /// Wakers of tasks that polled `wait()` while `is_awake` was `false`.
    wakers: Vec<Waker>,
}
53
54impl CondVar {
55    pub const fn new() -> Self {
56        Self { state: Mutex::new(CondVarState { is_awake: false, wakers: Vec::new() }) }
57    }
58
59    /// Wakeup the waiting task. Subsequent calls to this do nothing until `wait()` is called.
60    pub fn notify(&self) {
61        let wakers = {
62            let mut state = self.state.lock().unwrap();
63            state.is_awake = true;
64            std::mem::take(&mut state.wakers)
65        };
66        // Notify the executor that the pending future from wait() is to be polled again.
67        for waker in wakers {
68            waker.wake()
69        }
70    }
71
72    /// Reset the condition variable and wait for a notification
73    pub fn wait(&self) -> CondVarWait<'_> {
74        CondVarWait { state: &self.state }
75    }
76
77    /// Reset self ready to wait() again.
78    /// The reason this is separate from `wait()` is that usually
79    /// on the first `wait()` we want to catch any `notify()` calls that
80    /// happened before we started. For example,
81    /// ```rust
82    /// loop {
83    ///     // Wait for signal
84    ///     cv.wait().await;
85    ///
86    ///     // Do stuff...
87    ///
88    ///     cv.reset();
89    /// }
90    /// ```
91    pub fn reset(&self) {
92        let mut state = self.state.lock().unwrap();
93        state.is_awake = false;
94    }
95}
96
97impl Default for CondVar {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103/// Awaitable futures object returned by `condvar.wait()`
104pub struct CondVarWait<'a> {
105    state: &'a Mutex<CondVarState>,
106}
107
108impl Future for CondVarWait<'_> {
109    type Output = ();
110
111    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
112        let mut state = self.state.lock().unwrap();
113
114        if state.is_awake {
115            return Poll::Ready(())
116        }
117
118        let cx_waker = cx.waker();
119
120        // Do we have any wakers in the list?
121        if !state.wakers.iter().any(|w| cx_waker.will_wake(w)) {
122            // Add our waker
123            state.wakers.push(cx_waker.clone())
124        }
125
126        Poll::Pending
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use futures::{select, FutureExt};
    use smol::Executor;
    use std::sync::Arc;

    /// A single waiting task completes once `notify()` has been called.
    #[test]
    fn condvar_test() {
        let ex = Arc::new(Executor::new());
        let spawner = ex.clone();
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = cv.clone();
            let task = spawner.spawn(async move {
                // Stays pending until notify() has been called
                waiter_cv.wait().await;
            });

            // Release the spawned task
            cv.notify();

            task.await;
        }))
    }

    /// After `reset()`, a fresh `wait()` needs a fresh `notify()`.
    #[test]
    fn condvar_reset() {
        let ex = Arc::new(Executor::new());
        let spawner = ex.clone();
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = cv.clone();
            let task = spawner.spawn(async move {
                waiter_cv.wait().await;
            });

            // First signal
            cv.notify();
            // Repeated notify() calls change nothing until reset() is called
            cv.notify();

            task.await;

            // If reset() were skipped here, the next wait() would return instantly
            cv.reset();

            let waiter_cv = cv.clone();
            let task = spawner.spawn(async move {
                waiter_cv.wait().await;
            });

            // Second signal
            cv.notify();

            task.await;
        }))
    }

    /// Two tasks waiting on the same condition variable both complete after one `notify()`.
    #[test]
    fn condvar_double_wait() {
        let ex = Arc::new(Executor::new());
        let spawner = ex.clone();
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let cv_a = cv.clone();
            let cv_b = cv.clone();
            let task_a = spawner.spawn(async move { cv_a.wait().await });
            let task_b = spawner.spawn(async move { cv_b.wait().await });

            // Release both spawned tasks
            cv.notify();

            task_a.await;
            task_b.await;
        }))
    }

    /// Without `reset()`, a `wait()` issued after `notify()` completes right away.
    #[test]
    fn condvar_wait_after_notify() {
        let ex = Arc::new(Executor::new());
        let spawner = ex.clone();
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = cv.clone();
            let task = spawner.spawn(async move { waiter_cv.wait().await });

            cv.notify();
            task.await;

            // No reset() in between, so this one should finish immediately
            let waiter_cv = cv.clone();
            let task = spawner.spawn(async move { waiter_cv.wait().await });
            task.await;
        }))
    }

    /// A `wait()` future dropped inside `select!` does not prevent a later `wait()`
    /// from completing.
    #[test]
    fn condvar_drop() {
        let ex = Arc::new(Executor::new());
        let spawner = ex.clone();
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = cv.clone();
            let task = spawner.spawn(async move {
                select! {
                    () = waiter_cv.wait().fuse() => (),
                    () = (async {}).fuse() => ()
                }

                // The wait() future above has been dropped; create a new one
                waiter_cv.wait().await
            });

            // Release the spawned task
            cv.notify();
            task.await;
        }))
    }
}