darkfi/system/condvar.rs
1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20 future::Future,
21 pin::Pin,
22 sync::Mutex,
23 task::{Context, Poll, Waker},
24};
25
26/// Condition variables allow you to block a task while waiting for an event to occur.
27/// Condition variables are typically associated with a boolean predicate (a condition).
28/// ```rust
29/// let cv = Arc::new(CondVar::new());
30///
31/// let cv_ = cv.clone();
32/// executor_
33/// .spawn(async move {
34/// // Waits here until notify() is called
35/// cv_.wait().await;
36/// // Check for some condition...
37/// })
38/// .detach();
39///
40/// // Allow above code to continue
41/// cv.notify();
42/// ```
43/// After the condition variable is woken up, the user may `wait` again for another `notify`
44/// signal by first calling `cv_.reset()`.
45pub struct CondVar {
46 state: Mutex<CondVarState>,
47}
48
/// Mutable state shared between a `CondVar` and the futures handed out by `wait()`.
struct CondVarState {
    /// `true` from the moment `notify()` is called until `reset()` clears it again.
    is_awake: bool,
    /// Wakers registered by pending waiters; `notify()` drains this list and wakes each one.
    wakers: Vec<Waker>,
}
53
54impl CondVar {
55 pub const fn new() -> Self {
56 Self { state: Mutex::new(CondVarState { is_awake: false, wakers: Vec::new() }) }
57 }
58
59 /// Wakeup the waiting task. Subsequent calls to this do nothing until `wait()` is called.
60 pub fn notify(&self) {
61 let wakers = {
62 let mut state = self.state.lock().unwrap();
63 state.is_awake = true;
64 std::mem::take(&mut state.wakers)
65 };
66 // Notify the executor that the pending future from wait() is to be polled again.
67 for waker in wakers {
68 waker.wake()
69 }
70 }
71
72 /// Reset the condition variable and wait for a notification
73 pub fn wait(&self) -> CondVarWait<'_> {
74 CondVarWait { state: &self.state }
75 }
76
77 /// Reset self ready to wait() again.
78 /// The reason this is separate from `wait()` is that usually
79 /// on the first `wait()` we want to catch any `notify()` calls that
80 /// happened before we started. For example,
81 /// ```rust
82 /// loop {
83 /// // Wait for signal
84 /// cv.wait().await;
85 ///
86 /// // Do stuff...
87 ///
88 /// cv.reset();
89 /// }
90 /// ```
91 pub fn reset(&self) {
92 let mut state = self.state.lock().unwrap();
93 state.is_awake = false;
94 }
95}
96
97impl Default for CondVar {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103/// Awaitable futures object returned by `condvar.wait()`
104pub struct CondVarWait<'a> {
105 state: &'a Mutex<CondVarState>,
106}
107
108impl Future for CondVarWait<'_> {
109 type Output = ();
110
111 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
112 let mut state = self.state.lock().unwrap();
113
114 if state.is_awake {
115 return Poll::Ready(())
116 }
117
118 let cx_waker = cx.waker();
119
120 // Do we have any wakers in the list?
121 if !state.wakers.iter().any(|w| cx_waker.will_wake(w)) {
122 // Add our waker
123 state.wakers.push(cx_waker.clone())
124 }
125
126 Poll::Pending
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use futures::{select, FutureExt};
    use smol::Executor;
    use std::sync::Arc;

    /// A single waiter is released by a single `notify()`.
    #[test]
    fn condvar_test() {
        let ex = Arc::new(Executor::new());
        let spawner = Arc::clone(&ex);
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = Arc::clone(&cv);
            let task = spawner.spawn(async move {
                // Suspends here until notify() is called
                waiter_cv.wait().await;
            });

            // Release the waiter
            cv.notify();

            task.await;
        }))
    }

    /// After `reset()` the condition variable can be waited on and notified a second time.
    #[test]
    fn condvar_reset() {
        let ex = Arc::new(Executor::new());
        let spawner = Arc::clone(&ex);
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = Arc::clone(&cv);
            let task = spawner.spawn(async move {
                waiter_cv.wait().await;
            });

            // First signal
            cv.notify();
            // Repeated notify() calls change nothing until reset() is called
            cv.notify();

            task.await;

            // Re-arm: without reset() the next wait() would return instantly
            cv.reset();

            let waiter_cv = Arc::clone(&cv);
            let task = spawner.spawn(async move {
                waiter_cv.wait().await;
            });

            // Second signal
            cv.notify();

            task.await;
        }))
    }

    /// Two tasks waiting on the same condition variable are both released by one `notify()`.
    #[test]
    fn condvar_double_wait() {
        let ex = Arc::new(Executor::new());
        let spawner = Arc::clone(&ex);
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let first_cv = Arc::clone(&cv);
            let second_cv = Arc::clone(&cv);
            let first = spawner.spawn(async move { first_cv.wait().await });
            let second = spawner.spawn(async move { second_cv.wait().await });

            // Release both waiters
            cv.notify();

            first.await;
            second.await;
        }))
    }

    /// A `wait()` started after `notify()` (with no `reset()` in between) completes.
    #[test]
    fn condvar_wait_after_notify() {
        let ex = Arc::new(Executor::new());
        let spawner = Arc::clone(&ex);
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = Arc::clone(&cv);
            let task = spawner.spawn(async move { waiter_cv.wait().await });

            cv.notify();
            task.await;

            // Still notified, so this should complete immediately
            let waiter_cv = Arc::clone(&cv);
            let task = spawner.spawn(async move { waiter_cv.wait().await });
            task.await;
        }))
    }

    /// Dropping one wait future inside `select!` does not stop a later `wait()` from
    /// completing.
    #[test]
    fn condvar_drop() {
        let ex = Arc::new(Executor::new());
        let spawner = Arc::clone(&ex);
        smol::block_on(ex.run(async move {
            let cv = Arc::new(CondVar::new());

            let waiter_cv = Arc::clone(&cv);
            let task = spawner.spawn(async move {
                select! {
                    () = waiter_cv.wait().fuse() => (),
                    () = (async {}).fuse() => ()
                }

                // The wait future created inside select! is gone by now; make a new one
                waiter_cv.wait().await
            });

            // Release the waiter
            cv.notify();
            task.await;
        }))
    }
}