darkfi/net/metering.rs
1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::collections::VecDeque;
20
21use tracing::debug;
22
23use crate::util::time::NanoTimestamp;
24
25/// Struct representing metering configuration parameters.
26#[derive(Clone, Debug)]
27pub struct MeteringConfiguration {
28 /// Defines the threshold after which rate limit kicks in.
29 /// Set to 0 for no threshold.
30 ///
31 /// If we don't use raw count as our metric, it should be calculated
32 /// by multiplying the median increase of the measured item with the
33 /// "max" number of items we want before rate limit starts.
34 /// For example, if we measure some item that increases our total
35 /// measurement by ~5 and want to rate limit after about 10, this
36 /// should be set as 50.
37 pub threshold: u64,
38 /// Sleep time for each unit over the threshold, in milliseconds.
39 ///
40 /// This is used to calculate sleep time when ratelimit is active.
41 /// The computed sleep time when we are over the threshold will be:
42 /// sleep_time = (total - threshold) * sleep_step
43 pub sleep_step: u64,
44 /// Parameter defining the expiration of each item, for time based
45 /// decay, in nano seconds. Set to 0 for no expiration.
46 pub expiry_time: NanoTimestamp,
47}
48
49impl MeteringConfiguration {
50 /// Generate a new `MeteringConfiguration` for provided threshold,
51 /// sleep step and expiration time (seconds).
52 pub fn new(threshold: u64, sleep_step: u64, expiry_time: u128) -> Self {
53 Self { threshold, sleep_step, expiry_time: NanoTimestamp::from_secs(expiry_time) }
54 }
55}
56
57impl Default for MeteringConfiguration {
58 fn default() -> Self {
59 Self { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) }
60 }
61}
62
63/// Default `MeteringConfiguration` as a constant,
64/// so it can be used in trait macros.
65pub const DEFAULT_METERING_CONFIGURATION: MeteringConfiguration =
66 MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) };
67
68/// Struct to keep track of some sequential metered actions and compute
69/// rate limits.
70///
71/// The queue uses a time based decay and prunes metering information
72/// after corresponding expiration time has passed.
73#[derive(Debug)]
74pub struct MeteringQueue {
75 /// Metering configuration of the queue.
76 config: MeteringConfiguration,
77 /// Ring buffer keeping track of action execution timestamp and
78 /// its metered value.
79 queue: VecDeque<(NanoTimestamp, u64)>,
80}
81
82impl MeteringQueue {
83 /// Generate a new `MeteringQueue` for provided `MeteringConfiguration`.
84 pub fn new(config: MeteringConfiguration) -> Self {
85 Self { config, queue: VecDeque::new() }
86 }
87
88 /// Prune expired metering information from the queue.
89 pub fn clean(&mut self) {
90 // Check if expiration has been set
91 if self.config.expiry_time.0 == 0 {
92 return
93 }
94
95 // Iterate the queue to cleanup expired elements
96 while let Some((ts, _)) = self.queue.front() {
97 // This is an edge case where system reports a future timestamp
98 // therefore elapsed computation fails.
99 let Ok(elapsed) = ts.elapsed() else {
100 debug!(target: "net::metering::MeteringQueue::clean", "Timestamp [{ts}] is in future. Removing...");
101 let _ = self.queue.pop_front();
102 continue
103 };
104
105 // Check if elapsed time is over the expiration limit
106 if elapsed < self.config.expiry_time {
107 break
108 }
109
110 // Remove element
111 let _ = self.queue.pop_front();
112 }
113 }
114
115 /// Add new metering value to the queue, after
116 /// prunning expired metering information.
117 /// If no threshold has been set, the insert is
118 /// ignored.
119 pub fn push(&mut self, value: &u64) {
120 // Check if threshold has been set
121 if self.config.threshold == 0 {
122 return
123 }
124
125 // Prune expired elements
126 self.clean();
127
128 // Push the new value
129 self.queue.push_back((NanoTimestamp::current_time(), *value));
130 }
131
132 /// Compute the current metered values total.
133 pub fn total(&self) -> u64 {
134 let mut total = 0;
135 for (_, value) in &self.queue {
136 total += value;
137 }
138 total
139 }
140
141 /// Compute sleep time for current metered values total, based on
142 /// the metering configuration.
143 ///
144 /// The sleep time increases linearly, based on configuration sleep
145 /// step. For example, in a raw count metering model, if we set the
146 /// configuration with threshold = 6 and sleep_step = 250, when
147 /// total = 10, returned sleep time will be 1000 ms.
148 ///
149 /// Sleep times table for the above example:
150 ///
151 /// | Total | Sleep Time (ms) |
152 /// |-------|-----------------|
153 /// | 0 | 0 |
154 /// | 4 | 0 |
155 /// | 6 | 0 |
156 /// | 7 | 250 |
157 /// | 8 | 500 |
158 /// | 9 | 750 |
159 /// | 10 | 1000 |
160 /// | 14 | 2000 |
161 /// | 18 | 3000 |
162 pub fn sleep_time(&self) -> Option<u64> {
163 // Check if threshold has been set
164 if self.config.threshold == 0 {
165 return None
166 }
167
168 // Check if we are over the threshold
169 let total = self.total();
170 if total < self.config.threshold {
171 return None
172 }
173
174 // Compute the actual sleep time
175 Some((total - self.config.threshold) * self.config.sleep_step)
176 }
177}
178
179#[test]
180fn test_net_metering_queue_default() {
181 let mut queue = MeteringQueue::new(MeteringConfiguration::default());
182 for _ in 0..100 {
183 queue.push(&1);
184 assert!(queue.queue.is_empty());
185 assert_eq!(queue.total(), 0);
186 assert!(queue.sleep_time().is_none());
187 }
188}
189
190#[test]
191fn test_net_metering_queue_raw_count() {
192 let threshold = 6;
193 let sleep_step = 250;
194 let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
195 let mut queue = MeteringQueue::new(metering_configuration);
196 for i in 1..threshold {
197 queue.push(&1);
198 assert_eq!(queue.total(), i);
199 assert!(queue.sleep_time().is_none());
200 }
201 for i in threshold..100 {
202 queue.push(&1);
203 assert_eq!(queue.total(), i);
204 assert_eq!(queue.sleep_time(), Some((i - threshold) * sleep_step));
205 }
206}
207
208#[test]
209fn test_net_metering_queue_sleep_time() {
210 let metered_value_median = 5;
211 let threshold_items = 10;
212 let threshold = metered_value_median * threshold_items;
213 let sleep_step = 50;
214 let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
215 let mut queue = MeteringQueue::new(metering_configuration);
216 for i in 1..threshold_items {
217 queue.push(&metered_value_median);
218 assert_eq!(queue.total(), (i * metered_value_median));
219 assert!(queue.sleep_time().is_none());
220 }
221 for i in threshold_items..100 {
222 queue.push(&metered_value_median);
223 let expected_total = i * metered_value_median;
224 assert_eq!(queue.total(), expected_total);
225 assert_eq!(queue.sleep_time(), Some((expected_total - threshold) * sleep_step));
226 }
227}