darkfi/net/
metering.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::collections::VecDeque;
20
21use tracing::debug;
22
23use crate::util::time::NanoTimestamp;
24
25/// Struct representing metering configuration parameters.
26#[derive(Clone, Debug)]
27pub struct MeteringConfiguration {
28    /// Defines the threshold after which rate limit kicks in.
29    /// Set to 0 for no threshold.
30    ///
31    /// If we don't use raw count as our metric, it should be calculated
32    /// by multiplying the median increase of the measured item with the
33    /// "max" number of items we want before rate limit starts.
34    /// For example, if we measure some item that increases our total
35    /// measurement by ~5 and want to rate limit after about 10, this
36    /// should be set as 50.
37    pub threshold: u64,
38    /// Sleep time for each unit over the threshold, in milliseconds.
39    ///
40    /// This is used to calculate sleep time when ratelimit is active.
41    /// The computed sleep time when we are over the threshold will be:
42    ///     sleep_time = (total - threshold) * sleep_step
43    pub sleep_step: u64,
44    /// Parameter defining the expiration of each item, for time based
45    /// decay, in nano seconds. Set to 0 for no expiration.
46    pub expiry_time: NanoTimestamp,
47}
48
49impl MeteringConfiguration {
50    /// Generate a new `MeteringConfiguration` for provided threshold,
51    /// sleep step and expiration time (seconds).
52    pub fn new(threshold: u64, sleep_step: u64, expiry_time: u128) -> Self {
53        Self { threshold, sleep_step, expiry_time: NanoTimestamp::from_secs(expiry_time) }
54    }
55}
56
57impl Default for MeteringConfiguration {
58    fn default() -> Self {
59        Self { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) }
60    }
61}
62
63/// Default `MeteringConfiguration` as a constant,
64/// so it can be used in trait macros.
65pub const DEFAULT_METERING_CONFIGURATION: MeteringConfiguration =
66    MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) };
67
68/// Struct to keep track of some sequential metered actions and compute
69/// rate limits.
70///
71/// The queue uses a time based decay and prunes metering information
72/// after corresponding expiration time has passed.
73#[derive(Debug)]
74pub struct MeteringQueue {
75    /// Metering configuration of the queue.
76    config: MeteringConfiguration,
77    /// Ring buffer keeping track of action execution timestamp and
78    /// its metered value.
79    queue: VecDeque<(NanoTimestamp, u64)>,
80}
81
82impl MeteringQueue {
83    /// Generate a new `MeteringQueue` for provided `MeteringConfiguration`.
84    pub fn new(config: MeteringConfiguration) -> Self {
85        Self { config, queue: VecDeque::new() }
86    }
87
88    /// Prune expired metering information from the queue.
89    pub fn clean(&mut self) {
90        // Check if expiration has been set
91        if self.config.expiry_time.0 == 0 {
92            return
93        }
94
95        // Iterate the queue to cleanup expired elements
96        while let Some((ts, _)) = self.queue.front() {
97            // This is an edge case where system reports a future timestamp
98            // therefore elapsed computation fails.
99            let Ok(elapsed) = ts.elapsed() else {
100                debug!(target: "net::metering::MeteringQueue::clean", "Timestamp [{ts}] is in future. Removing...");
101                let _ = self.queue.pop_front();
102                continue
103            };
104
105            // Check if elapsed time is over the expiration limit
106            if elapsed < self.config.expiry_time {
107                break
108            }
109
110            // Remove element
111            let _ = self.queue.pop_front();
112        }
113    }
114
115    /// Add new metering value to the queue, after
116    /// prunning expired metering information.
117    /// If no threshold has been set, the insert is
118    /// ignored.
119    pub fn push(&mut self, value: &u64) {
120        // Check if threshold has been set
121        if self.config.threshold == 0 {
122            return
123        }
124
125        // Prune expired elements
126        self.clean();
127
128        // Push the new value
129        self.queue.push_back((NanoTimestamp::current_time(), *value));
130    }
131
132    /// Compute the current metered values total.
133    pub fn total(&self) -> u64 {
134        let mut total = 0;
135        for (_, value) in &self.queue {
136            total += value;
137        }
138        total
139    }
140
141    /// Compute sleep time for current metered values total, based on
142    /// the metering configuration.
143    ///
144    /// The sleep time increases linearly, based on configuration sleep
145    /// step. For example, in a raw count metering model, if we set the
146    /// configuration with threshold = 6 and sleep_step = 250, when
147    /// total = 10, returned sleep time will be 1000 ms.
148    ///
149    /// Sleep times table for the above example:
150    ///
151    /// | Total | Sleep Time (ms) |
152    /// |-------|-----------------|
153    /// | 0     | 0               |
154    /// | 4     | 0               |
155    /// | 6     | 0               |
156    /// | 7     | 250             |
157    /// | 8     | 500             |
158    /// | 9     | 750             |
159    /// | 10    | 1000            |
160    /// | 14    | 2000            |
161    /// | 18    | 3000            |
162    pub fn sleep_time(&self) -> Option<u64> {
163        // Check if threshold has been set
164        if self.config.threshold == 0 {
165            return None
166        }
167
168        // Check if we are over the threshold
169        let total = self.total();
170        if total < self.config.threshold {
171            return None
172        }
173
174        // Compute the actual sleep time
175        Some((total - self.config.threshold) * self.config.sleep_step)
176    }
177}
178
179#[test]
180fn test_net_metering_queue_default() {
181    let mut queue = MeteringQueue::new(MeteringConfiguration::default());
182    for _ in 0..100 {
183        queue.push(&1);
184        assert!(queue.queue.is_empty());
185        assert_eq!(queue.total(), 0);
186        assert!(queue.sleep_time().is_none());
187    }
188}
189
190#[test]
191fn test_net_metering_queue_raw_count() {
192    let threshold = 6;
193    let sleep_step = 250;
194    let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
195    let mut queue = MeteringQueue::new(metering_configuration);
196    for i in 1..threshold {
197        queue.push(&1);
198        assert_eq!(queue.total(), i);
199        assert!(queue.sleep_time().is_none());
200    }
201    for i in threshold..100 {
202        queue.push(&1);
203        assert_eq!(queue.total(), i);
204        assert_eq!(queue.sleep_time(), Some((i - threshold) * sleep_step));
205    }
206}
207
208#[test]
209fn test_net_metering_queue_sleep_time() {
210    let metered_value_median = 5;
211    let threshold_items = 10;
212    let threshold = metered_value_median * threshold_items;
213    let sleep_step = 50;
214    let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
215    let mut queue = MeteringQueue::new(metering_configuration);
216    for i in 1..threshold_items {
217        queue.push(&metered_value_median);
218        assert_eq!(queue.total(), (i * metered_value_median));
219        assert!(queue.sleep_time().is_none());
220    }
221    for i in threshold_items..100 {
222        queue.push(&metered_value_median);
223        let expected_total = i * metered_value_median;
224        assert_eq!(queue.total(), expected_total);
225        assert_eq!(queue.sleep_time(), Some((expected_total - threshold) * sleep_step));
226    }
227}