1use std::{
20 collections::{BTreeMap, HashSet, VecDeque},
21 slice,
22 sync::{
23 atomic::{AtomicUsize, Ordering::SeqCst},
24 Arc,
25 },
26};
27
28use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
29use smol::Executor;
30use tracing::{debug, error, trace, warn};
31
32use super::{Event, EventGraphPtr, NULL_ID};
33use crate::{
34 impl_p2p_message,
35 net::{
36 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
37 ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
38 ProtocolJobsManager, ProtocolJobsManagerPtr,
39 },
40 system::msleep,
41 util::time::NanoTimestamp,
42 Error, Result,
43};
44
/// Number of malicious strikes a peer may accumulate; when the counter
/// reaches this value the peer's channel is stopped.
const MALICIOUS_THRESHOLD: usize = 5;

/// Flood protection: `handle_event_put` bans the peer once more than this
/// many new events are recorded inside the moving window.
const WINDOW_MAXSIZE: usize = 200;
/// Flood protection: age after which a recorded event leaves the moving window.
const WINDOW_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(60);

/// Rate limit: age after which a recorded broadcast leaves the moving window.
const RATELIMIT_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(10);
/// Rate limit: broadcasts are only delayed once the window holds more than
/// this many entries.
const RATELIMIT_MIN_COUNT: usize = 6;
/// Rate limit: window count at which the computed delay equals
/// `RATELIMIT_SAMPLE_SLEEP`. The delay grows linearly with the number of
/// entries above `RATELIMIT_MIN_COUNT`.
const RATELIMIT_SAMPLE_IDX: usize = 10;
/// Rate limit: delay (passed to `msleep`) applied when the window count
/// equals `RATELIMIT_SAMPLE_IDX`.
const RATELIMIT_SAMPLE_SLEEP: usize = 1000;
62
63struct MovingWindow {
64 times: VecDeque<NanoTimestamp>,
65 expiry_time: NanoTimestamp,
66}
67
68impl MovingWindow {
69 fn new(expiry_time: NanoTimestamp) -> Self {
70 Self { times: VecDeque::new(), expiry_time }
71 }
72
73 fn clean(&mut self) {
75 while let Some(ts) = self.times.front() {
76 let Ok(elapsed) = ts.elapsed() else {
77 debug!(target: "event_graph::protocol::MovingWindow::clean", "Timestamp [{ts}] is in future. Removing...");
78 let _ = self.times.pop_front();
79 continue
80 };
81 if elapsed < self.expiry_time {
82 break
83 }
84 let _ = self.times.pop_front();
85 }
86 }
87
88 fn ticktock(&mut self) {
90 self.clean();
91 self.times.push_back(NanoTimestamp::current_time());
92 }
93
94 #[inline]
95 fn count(&self) -> usize {
96 self.times.len()
97 }
98}
99
/// Per-channel protocol state for exchanging event graph messages with a
/// single peer.
pub struct ProtocolEventGraph {
    /// Channel to the peer; used to send replies and to stop or ban the peer.
    channel: ChannelPtr,
    /// Handle to the event graph instance (DAG, tips, sync state, p2p).
    event_graph: EventGraphPtr,
    /// Subscription for incoming `EventPut` messages.
    ev_put_sub: MessageSubscription<EventPut>,
    /// Subscription for incoming `EventReq` messages.
    ev_req_sub: MessageSubscription<EventReq>,
    /// Subscription for incoming `EventRep` messages; read while waiting for
    /// requested parents in `handle_event_put`.
    ev_rep_sub: MessageSubscription<EventRep>,
    /// Subscription for incoming `TipReq` messages.
    tip_req_sub: MessageSubscription<TipReq>,
    /// Subscription for `TipRep` messages; created in `init` but not read by
    /// any handler in this impl.
    _tip_rep_sub: MessageSubscription<TipRep>,
    /// Number of malicious strikes recorded for this peer
    /// (compared against `MALICIOUS_THRESHOLD`).
    malicious_count: AtomicUsize,
    /// Jobs manager on which the handler tasks are spawned in `start`.
    jobsman: ProtocolJobsManagerPtr,
    /// Sending half of the unbounded broadcast queue; fed by
    /// `handle_event_put` after a successful DAG insert.
    broadcaster_push: smol::channel::Sender<EventPut>,
    /// Receiving half of the broadcast queue; drained by
    /// `broadcast_rate_limiter`.
    broadcaster_pull: smol::channel::Receiver<EventPut>,
}
126
/// A P2P message carrying a single event being announced to a peer.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventPut(pub Event);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message requesting the events with the given IDs.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub Vec<blake3::Hash>);
impl_p2p_message!(EventReq, "EventGraph::EventReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message carrying the events sent in reply to an `EventReq`.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventRep(pub Vec<Event>);
impl_p2p_message!(EventRep, "EventGraph::EventRep", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message (no payload) requesting the peer's current tips.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipReq {}
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message carrying the replier's unreferenced tips: sets of event IDs
/// grouped under a `u64` key (presumably the DAG layer — confirm against
/// `EventGraph::unreferenced_tips`).
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub BTreeMap<u64, HashSet<blake3::Hash>>);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
151
152#[async_trait]
153impl ProtocolBase for ProtocolEventGraph {
154 async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
155 self.jobsman.clone().start(ex.clone());
156 self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await;
157 self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await;
158 self.jobsman.clone().spawn(self.clone().handle_tip_req(), ex.clone()).await;
159 self.jobsman.clone().spawn(self.clone().broadcast_rate_limiter(), ex.clone()).await;
160 Ok(())
161 }
162
163 fn name(&self) -> &'static str {
164 "ProtocolEventGraph"
165 }
166}
167
168impl ProtocolEventGraph {
169 pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result<ProtocolBasePtr> {
170 let msg_subsystem = channel.message_subsystem();
171 msg_subsystem.add_dispatch::<EventPut>().await;
172 msg_subsystem.add_dispatch::<EventReq>().await;
173 msg_subsystem.add_dispatch::<EventRep>().await;
174 msg_subsystem.add_dispatch::<TipReq>().await;
175 msg_subsystem.add_dispatch::<TipRep>().await;
176
177 let ev_put_sub = channel.subscribe_msg::<EventPut>().await?;
178 let ev_req_sub = channel.subscribe_msg::<EventReq>().await?;
179 let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?;
180 let tip_req_sub = channel.subscribe_msg::<TipReq>().await?;
181 let _tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;
182
183 let (broadcaster_push, broadcaster_pull) = smol::channel::unbounded();
184
185 Ok(Arc::new(Self {
186 channel: channel.clone(),
187 event_graph,
188 ev_put_sub,
189 ev_req_sub,
190 ev_rep_sub,
191 tip_req_sub,
192 _tip_rep_sub,
193 malicious_count: AtomicUsize::new(0),
194 jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
195 broadcaster_push,
196 broadcaster_pull,
197 }))
198 }
199
200 async fn increase_malicious_count(self: Arc<Self>) -> Result<()> {
201 let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
202 if malicious_count + 1 == MALICIOUS_THRESHOLD {
203 error!(
204 target: "event_graph::protocol::handle_event_put",
205 "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
206 self.channel.display_address(),
207 );
208 self.channel.stop().await;
209 return Err(Error::ChannelStopped)
210 }
211
212 warn!(
213 target: "event_graph::protocol::handle_event_put",
214 "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.display_address(),
215 );
216
217 Ok(())
218 }
219
220 async fn handle_event_put(self: Arc<Self>) -> Result<()> {
224 let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
226
227 loop {
228 let event = match self.ev_put_sub.receive().await {
229 Ok(v) => v.0.clone(),
230 Err(_) => continue,
231 };
232 trace!(
233 target: "event_graph::protocol::handle_event_put",
234 "Got EventPut: {} [{}]", event.id(), self.channel.display_address(),
235 );
236
237 if !*self.event_graph.synced.read().await {
239 debug!(
240 target: "event_graph::protocol::handle_event_put",
241 "DAG is still syncing, skipping..."
242 );
243 continue
244 }
245
246 let event_id = event.id();
248 if self.event_graph.dag.contains_key(event_id.as_bytes()).unwrap() {
249 debug!(
250 target: "event_graph::protocol::handle_event_put",
251 "Event {event_id} is already known"
252 );
253 continue
254 }
255
256 bantimes.ticktock();
259 if bantimes.count() > WINDOW_MAXSIZE {
260 self.channel.ban().await;
261 return Err(Error::MaliciousFlood)
263 }
264
265 let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
278 if event.timestamp < genesis_timestamp {
279 debug!(
280 target: "event_graph::protocol::handle_event_put",
281 "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
282 event.id(), event.timestamp
283 );
284 }
285
286 if !event.validate_new() {
290 self.clone().increase_malicious_count().await?;
291 continue
292 }
293
294 debug!(
297 target: "event_graph::protocol::handle_event_put",
298 "Event {event_id} is new"
299 );
300
301 let mut missing_parents = HashSet::new();
302 for parent_id in event.parents.iter() {
303 if parent_id == &NULL_ID {
306 continue
307 }
308
309 if !self.event_graph.dag.contains_key(parent_id.as_bytes()).unwrap() {
310 missing_parents.insert(*parent_id);
311 }
312 }
313
314 if !missing_parents.is_empty() {
318 let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
324 let mut received_events_hashes = HashSet::new();
325
326 debug!(
327 target: "event_graph::protocol::handle_event_put",
328 "Event has {} missing parents. Requesting...", missing_parents.len(),
329 );
330
331 while !missing_parents.is_empty() {
332 debug!(
334 target: "event_graph::protocol::handle_event_put",
335 "Requesting {missing_parents:?}..."
336 );
337
338 self.channel
339 .send(&EventReq(missing_parents.clone().into_iter().collect()))
340 .await?;
341
342 let outbound_connect_timeout = self
343 .event_graph
344 .p2p
345 .settings()
346 .read_arc()
347 .await
348 .outbound_connect_timeout(self.channel.address().scheme());
349 let Ok(parents) =
351 self.ev_rep_sub.receive_with_timeout(outbound_connect_timeout).await
352 else {
353 error!(
354 target: "event_graph::protocol::handle_event_put",
355 "[EVENTGRAPH] Timeout while waiting for parents {missing_parents:?} from {}",
356 self.channel.display_address(),
357 );
358 self.channel.stop().await;
359 return Err(Error::ChannelStopped)
360 };
361
362 let parents = parents.0.clone();
363
364 for parent in parents {
365 let parent_id = parent.id();
366 if !missing_parents.contains(&parent_id) {
367 error!(
368 target: "event_graph::protocol::handle_event_put",
369 "[EVENTGRAPH] Peer {} replied with a wrong event: {}",
370 self.channel.display_address(), parent.id(),
371 );
372 self.channel.stop().await;
373 return Err(Error::ChannelStopped)
374 }
375
376 debug!(
377 target: "event_graph::protocol::handle_event_put",
378 "Got correct parent event {}", parent.id(),
379 );
380
381 if let Some(layer_events) = received_events.get_mut(&parent.layer) {
382 layer_events.push(parent.clone());
383 } else {
384 let layer_events = vec![parent.clone()];
385 received_events.insert(parent.layer, layer_events);
386 }
387 received_events_hashes.insert(parent_id);
388
389 missing_parents.remove(&parent_id);
390
391 for upper_parent in parent.parents.iter() {
393 if upper_parent == &NULL_ID {
394 continue
395 }
396
397 if !missing_parents.contains(upper_parent) &&
398 !received_events_hashes.contains(upper_parent) &&
399 !self
400 .event_graph
401 .dag
402 .contains_key(upper_parent.as_bytes())
403 .unwrap()
404 {
405 debug!(
406 target: "event_graph::protocol::handle_event_put",
407 "Found upper missing parent event {upper_parent}"
408 );
409 missing_parents.insert(*upper_parent);
410 }
411 }
412 }
413 } let mut events = vec![];
418 for (_, tips) in received_events {
419 for tip in tips {
420 events.push(tip);
421 }
422 }
423 if self.event_graph.dag_insert(&events).await.is_err() {
424 self.clone().increase_malicious_count().await?;
425 continue
426 }
427 } debug!(
433 target: "event_graph::protocol::handle_event_put",
434 "Got all parents necessary for insertion",
435 );
436 if self.event_graph.dag_insert(slice::from_ref(&event)).await.is_err() {
437 self.clone().increase_malicious_count().await?;
438 continue
439 }
440
441 self.broadcaster_push.send(EventPut(event)).await.expect("push broadcaster closed");
442 }
443 }
444
445 async fn handle_event_req(self: Arc<Self>) -> Result<()> {
448 loop {
449 let event_ids = match self.ev_req_sub.receive().await {
450 Ok(v) => v.0.clone(),
451 Err(_) => continue,
452 };
453 trace!(
454 target: "event_graph::protocol::handle_event_req",
455 "Got EventReq: {event_ids:?} [{}]", self.channel.display_address(),
456 );
457
458 if !*self.event_graph.synced.read().await {
460 debug!(
461 target: "event_graph::protocol::handle_event_req",
462 "DAG is still syncing, skipping..."
463 );
464 continue
465 }
466
467 let mut events = vec![];
479 for event_id in event_ids.iter() {
480 if !self.event_graph.broadcasted_ids.read().await.contains(event_id) {
481 let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
482 if malicious_count + 1 == MALICIOUS_THRESHOLD {
483 error!(
484 target: "event_graph::protocol::handle_event_req",
485 "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
486 self.channel.display_address(),
487 );
488 self.channel.stop().await;
489 return Err(Error::ChannelStopped)
490 }
491
492 warn!(
493 target: "event_graph::protocol::handle_event_req",
494 "[EVENTGRAPH] Peer {} requested an unexpected event {event_id:?}",
495 self.channel.display_address()
496 );
497 continue
498 }
499
500 debug!(
503 target: "event_graph::protocol::handle_event_req",
504 "Fetching event {event_id:?} from DAG"
505 );
506 events.push(self.event_graph.dag_get(event_id).await.unwrap().unwrap());
507 }
508
509 let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
513 let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
514
515 for event in events.iter() {
516 if event.timestamp < genesis_timestamp {
517 error!(
518 target: "event_graph::protocol::handle_event_req",
519 "Requested event by peer {} is older than previous rotation period. It should have been pruned.
520 Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
521 event.id(), event.timestamp
522 );
523 }
524
525 for parent_id in event.parents.iter() {
528 if parent_id != &NULL_ID {
529 bcast_ids.insert(*parent_id);
530 }
531 }
532 }
533 drop(bcast_ids);
537
538 self.channel.send(&EventRep(events)).await?;
540 }
541 }
542
543 async fn handle_tip_req(self: Arc<Self>) -> Result<()> {
547 loop {
548 self.tip_req_sub.receive().await?;
549 trace!(
550 target: "event_graph::protocol::handle_tip_req",
551 "Got TipReq [{}]", self.channel.display_address(),
552 );
553
554 if !*self.event_graph.synced.read().await {
556 debug!(
557 target: "event_graph::protocol::handle_tip_req",
558 "DAG is still syncing, skipping..."
559 );
560 continue
561 }
562
563 let layers = self.event_graph.unreferenced_tips.read().await.clone();
568 let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
569 for (_, tips) in layers.iter() {
570 for tip in tips {
571 bcast_ids.insert(*tip);
572 }
573 }
574 drop(bcast_ids);
575
576 self.channel.send(&TipRep(layers)).await?;
577 }
578 }
579
580 async fn broadcast_rate_limiter(self: Arc<Self>) -> Result<()> {
606 let mut ratelimit = MovingWindow::new(RATELIMIT_EXPIRY_TIME);
607
608 loop {
609 let event_put = self.broadcaster_pull.recv().await.expect("pull broadcaster closed");
610
611 ratelimit.ticktock();
612 if ratelimit.count() > RATELIMIT_MIN_COUNT {
613 let sleep_time =
614 ((ratelimit.count() - RATELIMIT_MIN_COUNT) * RATELIMIT_SAMPLE_SLEEP /
615 (RATELIMIT_SAMPLE_IDX - RATELIMIT_MIN_COUNT)) as u64;
616 debug!(
617 target: "event_graph::protocol::broadcast_rate_limiter",
618 "Activated rate limit: sleeping {sleep_time} ms [count={}]",
619 ratelimit.count()
620 );
621 msleep(sleep_time).await;
623 }
624
625 self.event_graph
627 .p2p
628 .broadcast_with_exclude(&event_put, &[self.channel.address().clone()])
629 .await;
630 }
631 }
632}
633
#[cfg(test)]
mod test {
    use super::*;
    use std::time::UNIX_EPOCH;

    /// A timestamp lying in the future must be evicted by `clean()`.
    #[test]
    fn test_eventgraph_moving_window_clean_future() {
        let mut window = MovingWindow::new(NanoTimestamp::from_secs(60));

        // 100 seconds ahead of the current wall-clock time.
        let ahead_secs = UNIX_EPOCH.elapsed().unwrap().as_secs() + 100;
        let ahead = NanoTimestamp::from_secs(ahead_secs.into());
        window.times.push_back(ahead);

        window.clean();

        assert_eq!(window.count(), 0);
    }
}
647}