1use std::{
20 collections::{BTreeMap, HashSet, VecDeque},
21 slice,
22 str::FromStr,
23 sync::{
24 atomic::{AtomicUsize, Ordering::SeqCst},
25 Arc,
26 },
27};
28
29use darkfi_sdk::{
30 crypto::{poseidon_hash, util::FieldElemAsStr},
31 pasta::pallas,
32};
33use darkfi_serial::{
34 async_trait, deserialize_async_partial, serialize_async, SerialDecodable, SerialEncodable,
35};
36use smol::Executor;
37use tracing::{debug, error, info, trace, warn};
38
39use super::{
40 event::Header,
41 rln::{closest_epoch, create_slash_proof, hash_event, sss_recover, MessageMetadata, RLNNode},
42 Event, EventGraphPtr, LayerUTips, NULL_ID,
43};
44use crate::{
45 event_graph::rln::{read_register_vk, read_signal_vk, read_slash_pk, read_slash_vk, Blob},
46 impl_p2p_message,
47 net::{
48 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
49 ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
50 ProtocolJobsManager, ProtocolJobsManagerPtr,
51 },
52 system::msleep,
53 util::time::NanoTimestamp,
54 zk::Proof,
55 Error, Result,
56};
57
/// Number of malicious events a peer may send before the connection is dropped.
const MALICIOUS_THRESHOLD: usize = 5;

/// Maximum number of events allowed inside the flood-detection window before banning.
const WINDOW_MAXSIZE: usize = 200;
/// How long a received-event timestamp stays inside the flood-detection window.
const WINDOW_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(60);

/// How long a broadcast timestamp stays inside the rate-limit window.
const RATELIMIT_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(10);
/// Rate limiting only activates above this many broadcasts per window.
const RATELIMIT_MIN_COUNT: usize = 6;
/// Window count at which the sleep equals `RATELIMIT_SAMPLE_SLEEP` ms
/// (see the formula in `broadcast_rate_limiter`).
const RATELIMIT_SAMPLE_IDX: usize = 10;
/// Base sleep (in milliseconds) used to scale the rate-limit backoff.
const RATELIMIT_SAMPLE_SLEEP: usize = 1000;
75
/// A sliding window of timestamps, used for flood detection and rate limiting.
struct MovingWindow {
    // Timestamps ordered oldest-first (front is the oldest entry).
    times: VecDeque<NanoTimestamp>,
    // Entries older than this are evicted by `clean()`.
    expiry_time: NanoTimestamp,
}
80
81impl MovingWindow {
82 fn new(expiry_time: NanoTimestamp) -> Self {
83 Self { times: VecDeque::new(), expiry_time }
84 }
85
86 fn clean(&mut self) {
88 while let Some(ts) = self.times.front() {
89 let Ok(elapsed) = ts.elapsed() else {
90 debug!(target: "event_graph::protocol::MovingWindow::clean", "Timestamp [{ts}] is in future. Removing...");
91 let _ = self.times.pop_front();
92 continue
93 };
94 if elapsed < self.expiry_time {
95 break
96 }
97 let _ = self.times.pop_front();
98 }
99 }
100
101 fn ticktock(&mut self) {
103 self.clean();
104 self.times.push_back(NanoTimestamp::current_time());
105 }
106
107 #[inline]
108 fn count(&self) -> usize {
109 self.times.len()
110 }
111}
112
/// P2P protocol instance servicing a single channel of the Event Graph.
pub struct ProtocolEventGraph {
    // Channel to the connected peer this instance serves
    channel: ChannelPtr,
    // Handle to the local event graph
    event_graph: EventGraphPtr,
    // Subscription for incoming `EventPut` messages
    ev_put_sub: MessageSubscription<EventPut>,
    // Subscription for incoming `StaticPut` messages
    st_put_sub: MessageSubscription<StaticPut>,
    // Subscription for incoming `EventReq` messages
    ev_req_sub: MessageSubscription<EventReq>,
    // Subscription for incoming `EventRep` messages
    ev_rep_sub: MessageSubscription<EventRep>,
    // Subscription kept alive for `HeaderPut`; not read by this protocol
    _hdr_put_sub: MessageSubscription<HeaderPut>,
    // Subscription for incoming `HeaderReq` messages
    hdr_req_sub: MessageSubscription<HeaderReq>,
    // Subscription kept alive for `HeaderRep`; not read by this protocol
    _hdr_rep_sub: MessageSubscription<HeaderRep>,
    // Subscription for incoming `TipReq` messages
    tip_req_sub: MessageSubscription<TipReq>,
    // Subscription kept alive for `TipRep`; not read by this protocol
    _tip_rep_sub: MessageSubscription<TipRep>,
    // Count of malicious events received from this peer
    malicious_count: AtomicUsize,
    // Manager for the protocol's background tasks
    jobsman: ProtocolJobsManagerPtr,
    // Sending side of the internal re-broadcast queue (fed by handle_event_put)
    broadcaster_push: smol::channel::Sender<EventPut>,
    // Receiving side of the internal re-broadcast queue (drained by broadcast_rate_limiter)
    broadcaster_pull: smol::channel::Receiver<EventPut>,
}
147
/// P2P message carrying a new event plus its serialized RLN proof blob.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventPut(pub Event, pub Vec<u8>);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
152
/// P2P message carrying a static-DAG event (RLN registration/slashing) plus its proof blob.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct StaticPut(pub Event, pub Vec<u8>);
impl_p2p_message!(StaticPut, "EventGraph::StaticPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
158
/// P2P request for a set of events identified by their hashes.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub Vec<blake3::Hash>);
impl_p2p_message!(EventReq, "EventGraph::EventReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
163
/// P2P reply to an `EventReq`, carrying the requested events.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventRep(pub Vec<Event>);
impl_p2p_message!(EventRep, "EventGraph::EventRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
168
/// P2P message announcing a single event header.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderPut(pub Header);
impl_p2p_message!(HeaderPut, "EventGraph::HeaderPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
173
/// P2P request for headers of a named DAG, scoped by the requester's known tips.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderReq(pub String, pub LayerUTips);
impl_p2p_message!(HeaderReq, "EventGraph::HeaderReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
178
/// P2P reply to a `HeaderReq`, carrying the requested headers.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderRep(pub Vec<Header>);
impl_p2p_message!(HeaderRep, "EventGraph::HeaderRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
183
/// P2P request for the unreferenced tips of a named DAG.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipReq(pub String);
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
188
/// P2P reply to a `TipReq`, carrying the DAG's unreferenced tips per layer.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub LayerUTips);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
193
#[async_trait]
impl ProtocolBase for ProtocolEventGraph {
    /// Start all background handler tasks for this protocol instance.
    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
        self.jobsman.clone().start(ex.clone());
        // One long-lived task per handled message type, plus the outgoing
        // rate limiter that drains the internal broadcast queue.
        self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_static_put(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_tip_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().broadcast_rate_limiter(), ex.clone()).await;
        Ok(())
    }

    fn name(&self) -> &'static str {
        "ProtocolEventGraph"
    }
}
213
214impl ProtocolEventGraph {
    /// Instantiate the protocol for the given channel: register message
    /// dispatchers, subscribe to all EventGraph message types, and set up
    /// the internal re-broadcast queue.
    pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result<ProtocolBasePtr> {
        // Register dispatchers for every message type before subscribing,
        // so nothing arriving on the channel is dropped as unknown.
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<EventPut>().await;
        msg_subsystem.add_dispatch::<StaticPut>().await;
        msg_subsystem.add_dispatch::<EventReq>().await;
        msg_subsystem.add_dispatch::<EventRep>().await;
        msg_subsystem.add_dispatch::<HeaderPut>().await;
        msg_subsystem.add_dispatch::<HeaderReq>().await;
        msg_subsystem.add_dispatch::<HeaderRep>().await;
        msg_subsystem.add_dispatch::<TipReq>().await;
        msg_subsystem.add_dispatch::<TipRep>().await;

        let ev_put_sub = channel.subscribe_msg::<EventPut>().await?;
        let st_put_sub = channel.subscribe_msg::<StaticPut>().await?;
        let ev_req_sub = channel.subscribe_msg::<EventReq>().await?;
        let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?;
        let _hdr_put_sub = channel.subscribe_msg::<HeaderPut>().await?;
        let hdr_req_sub = channel.subscribe_msg::<HeaderReq>().await?;
        let _hdr_rep_sub = channel.subscribe_msg::<HeaderRep>().await?;
        let tip_req_sub = channel.subscribe_msg::<TipReq>().await?;
        let _tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;

        // Unbounded queue feeding the broadcast rate limiter task.
        let (broadcaster_push, broadcaster_pull) = smol::channel::unbounded();

        Ok(Arc::new(Self {
            channel: channel.clone(),
            event_graph,
            ev_put_sub,
            st_put_sub,
            ev_req_sub,
            ev_rep_sub,
            _hdr_put_sub,
            hdr_req_sub,
            _hdr_rep_sub,
            tip_req_sub,
            _tip_rep_sub,
            malicious_count: AtomicUsize::new(0),
            jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
            broadcaster_push,
            broadcaster_pull,
        }))
    }
257
258 async fn increase_malicious_count(self: Arc<Self>) -> Result<()> {
259 let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
260 if malicious_count + 1 == MALICIOUS_THRESHOLD {
261 error!(
262 target: "event_graph::protocol::handle_event_put",
263 "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
264 self.channel.display_address(),
265 );
266 self.channel.stop().await;
267 return Err(Error::ChannelStopped)
268 }
269
270 warn!(
271 target: "event_graph::protocol::handle_event_put",
272 "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.display_address(),
273 );
274
275 Ok(())
276 }
277
    /// Protocol function handling `EventPut`: validates an incoming event
    /// (RLN proof, flood window, parent availability), fetches any missing
    /// ancestors from the peer, inserts everything into the DAGs, and queues
    /// the event for rate-limited re-broadcast.
    async fn handle_event_put(self: Arc<Self>) -> Result<()> {
        // Sliding window of receive times used for flood detection.
        let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
        // RLN share bookkeeping; reset whenever the epoch changes.
        let mut metadata = MessageMetadata::new();
        let mut current_epoch = 0;

        loop {
            let (event, blob) = match self.ev_put_sub.receive().await {
                Ok(v) => (v.0.clone(), v.1.clone()),
                Err(_) => continue,
            };
            trace!(
                target: "event_graph::protocol::handle_event_put",
                "Got EventPut: {} [{}]", event.id(), self.channel.display_address(),
            );

            // Ignore events while our own DAG is still syncing.
            if !*self.event_graph.synced.read().await {
                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "DAG is still syncing, skipping..."
                );
                continue
            }

            // RLN verification. The single-pass loop is only used so we can
            // `break` out of the checks early (hence clippy::never_loop).
            let mut verification_failed = false;
            #[allow(clippy::never_loop)]
            loop {
                // No blob attached: skip RLN verification entirely.
                if blob.is_empty() {
                    break
                }
                let rcvd_blob: Blob = match deserialize_async_partial(&blob).await {
                    Ok((v, _)) => v,
                    Err(e) => {
                        error!(target: "event_graph::protocol::handle_event_put()","[EVENTGRAPH] Failed deserializing event ephemeral data: {}", e);
                        break
                    }
                };

                // New epoch: discard the previous epoch's share metadata.
                if current_epoch != closest_epoch(event.header.timestamp) {
                    metadata = MessageMetadata::new()
                }

                // Build the public inputs for the RLN signal proof.
                let rln_app_identifier = pallas::Base::from(1000);
                current_epoch = closest_epoch(event.header.timestamp);
                let epoch = pallas::Base::from(current_epoch);
                let external_nullifier = poseidon_hash([epoch, rln_app_identifier]);
                let x = hash_event(&event);
                let identity_root = self.event_graph.rln_identity_tree.read().await.root();
                let public_inputs = vec![
                    identity_root,
                    external_nullifier,
                    x,
                    rcvd_blob.y,
                    rcvd_blob.internal_nullifier,
                ];

                // Exact same (x, y) share already seen: reject as duplicate.
                if metadata.is_duplicate(
                    &external_nullifier,
                    &rcvd_blob.internal_nullifier,
                    &x,
                    &rcvd_blob.y,
                ) {
                    error!(target: "event_graph::protocol::handle_event_put()", "[RLN] Duplicate Message!");
                    verification_failed = true;
                    break
                }

                // Nullifier reused with a different message: the sender broke
                // the rate limit, so recover their secret and slash them.
                if metadata.is_reused(&external_nullifier, &rcvd_blob.internal_nullifier) {
                    info!(target: "event_graph::protocol::handle_event_put()", "[RLN] Metadata is reused.. slashing..");
                    let shares =
                        metadata.get_shares(&external_nullifier, &rcvd_blob.internal_nullifier);
                    let secret = sss_recover(&shares);

                    let slash_pk = read_slash_pk(&self.event_graph.sled_db)?;
                    let mut identity_tree = self.event_graph.rln_identity_tree.write().await;

                    info!("[RLN] Creating slashing proof");
                    let (proof, identity_root) = match create_slash_proof(
                        secret,
                        rcvd_blob.user_msg_limit,
                        &mut identity_tree,
                        &slash_pk,
                    ) {
                        Ok(v) => v,
                        Err(e) => {
                            // NOTE(review): on failure an empty proof and a
                            // zero root are still broadcast below — presumably
                            // peers reject it during verification; confirm.
                            error!("[RLN] Failed creating RLN slash proof: {}", e);
                            (Proof::new(vec![]), pallas::Base::from(0))
                        }
                    };
                    // Release the write lock before the static insert/broadcast.
                    drop(identity_tree);

                    let blob =
                        serialize_async(&(proof, secret, rcvd_blob.user_msg_limit, identity_root))
                            .await;

                    // Publish the slashing as a static-DAG event.
                    let evgr = &self.event_graph;
                    let identity_secret_hash =
                        poseidon_hash([secret, rcvd_blob.user_msg_limit.into()]);
                    let identity_commitment = poseidon_hash([identity_secret_hash]);
                    let rln_commitment = RLNNode::Slashing(identity_commitment);
                    let st_event =
                        Event::new_static(serialize_async(&rln_commitment).await, evgr).await;
                    evgr.static_insert(&st_event).await?;
                    evgr.static_broadcast(st_event, blob).await?;

                    verification_failed = true;
                    break
                }

                // Record this share for future duplicate/reuse detection.
                metadata.add_share(
                    external_nullifier,
                    rcvd_blob.internal_nullifier,
                    x,
                    rcvd_blob.y,
                )?;

                info!(target: "event_graph::protocol::handle_event_put()", "[RLN] Verifying incoming Event RLN proof");
                let signal_vk = read_signal_vk(&self.event_graph.sled_db)?;
                verification_failed = rcvd_blob.proof.verify(&signal_vk, &public_inputs).is_err();

                break
            }

            if verification_failed {
                error!(target: "event_graph::protocol::handle_event_put()", "[RLN] Incoming Event RLN Signaling proof verification failed");
                continue
            }

            // Drop any stale EventRep replies left over from earlier requests.
            _ = self.ev_rep_sub.clean().await;

            // Skip the event if its header is already in the current header DAG.
            let current_genesis = self.event_graph.current_genesis.read().await;
            let genesis_timestamp = current_genesis.header.timestamp;
            let dag_name = genesis_timestamp.to_string();
            let hdr_tree_name = format!("headers_{dag_name}");
            let event_id = event.id();
            if self
                .event_graph
                .dag_store
                .read()
                .await
                .get_dag(&hdr_tree_name)
                .contains_key(event_id.as_bytes())
                .unwrap()
            {
                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "Event {event_id} is already known"
                );
                continue
            }

            // Flood detection: too many new events inside the window gets
            // the peer banned.
            bantimes.ticktock();
            if bantimes.count() > WINDOW_MAXSIZE {
                self.channel.ban().await;
                return Err(Error::MaliciousFlood)
            }

            // NOTE(review): an event older than the current genesis is only
            // logged here and still processed below — confirm this is intended.
            if event.header.timestamp < genesis_timestamp {
                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
                    event.id(), event.header.timestamp
                );
            }

            if !event.validate_new() {
                self.clone().increase_malicious_count().await?;
                continue
            }

            debug!(
                target: "event_graph::protocol::handle_event_put",
                "Event {event_id} is new"
            );

            // Collect the event's parents that are absent from our header DAG.
            let mut missing_parents = HashSet::new();
            for parent_id in event.header.parents.iter() {
                if parent_id == &NULL_ID {
                    continue
                }

                if !self
                    .event_graph
                    .dag_store
                    .read()
                    .await
                    .get_dag(&hdr_tree_name)
                    .contains_key(parent_id.as_bytes())
                    .unwrap()
                {
                    missing_parents.insert(*parent_id);
                }
            }

            // Iteratively request missing ancestors from the peer until the
            // full ancestry is locally available.
            if !missing_parents.is_empty() {
                // Fetched events bucketed by DAG layer, so insertion below
                // proceeds in layer order.
                let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
                let mut received_events_hashes = HashSet::new();

                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "Event has {} missing parents. Requesting...", missing_parents.len(),
                );

                let current_genesis = self.event_graph.current_genesis.read().await;
                let dag_name = current_genesis.header.timestamp.to_string();
                let hdr_tree_name = format!("headers_{dag_name}");

                while !missing_parents.is_empty() {
                    debug!(
                        target: "event_graph::protocol::handle_event_put",
                        "Requesting {missing_parents:?}..."
                    );

                    self.channel
                        .send(&EventReq(missing_parents.clone().into_iter().collect()))
                        .await?;

                    // A peer that doesn't answer in time gets disconnected.
                    let Ok(parents) = self
                        .ev_rep_sub
                        .receive_with_timeout(
                            self.event_graph
                                .p2p
                                .settings()
                                .read()
                                .await
                                .outbound_connect_timeout_max(),
                        )
                        .await
                    else {
                        error!(
                            target: "event_graph::protocol::handle_event_put",
                            "[EVENTGRAPH] Timeout while waiting for parents {missing_parents:?} from {}",
                            self.channel.display_address(),
                        );
                        self.channel.stop().await;
                        return Err(Error::ChannelStopped)
                    };

                    let parents = parents.0.clone();

                    for parent in parents {
                        let parent_id = parent.id();
                        // Replying with an event we did not ask for is treated
                        // as malicious and ends the connection.
                        if !missing_parents.contains(&parent_id) {
                            error!(
                                target: "event_graph::protocol::handle_event_put",
                                "[EVENTGRAPH] Peer {} replied with a wrong event: {}",
                                self.channel.display_address(), parent.id(),
                            );
                            self.channel.stop().await;
                            return Err(Error::ChannelStopped)
                        }

                        debug!(
                            target: "event_graph::protocol::handle_event_put",
                            "Got correct parent event {}", parent.id(),
                        );

                        if let Some(layer_events) = received_events.get_mut(&parent.header.layer) {
                            layer_events.push(parent.clone());
                        } else {
                            let layer_events = vec![parent.clone()];
                            received_events.insert(parent.header.layer, layer_events);
                        }
                        received_events_hashes.insert(parent_id);

                        missing_parents.remove(&parent_id);

                        // Walk one level up: queue any grandparents we don't
                        // have yet (and haven't already fetched or requested).
                        for upper_parent in parent.header.parents.iter() {
                            if upper_parent == &NULL_ID {
                                continue
                            }

                            if !missing_parents.contains(upper_parent) &&
                                !received_events_hashes.contains(upper_parent) &&
                                !self
                                    .event_graph
                                    .dag_store
                                    .read()
                                    .await
                                    .get_dag(&hdr_tree_name)
                                    .contains_key(upper_parent.as_bytes())
                                    .unwrap()
                            {
                                debug!(
                                    target: "event_graph::protocol::handle_event_put",
                                    "Found upper missing parent event {upper_parent}"
                                );
                                missing_parents.insert(*upper_parent);
                            }
                        }
                    }
                } let mut events = vec![];
                // Flatten the fetched ancestors in ascending layer order
                // (BTreeMap iteration is key-ordered).
                for (_, tips) in received_events {
                    for tip in tips {
                        events.push(tip);
                    }
                }
                let headers = events.iter().map(|x| x.header.clone()).collect();
                if self.event_graph.header_dag_insert(headers, &dag_name).await.is_err() {
                    self.clone().increase_malicious_count().await?;
                    continue
                }
                // Full event bodies are only stored when not in fast mode.
                if !self.event_graph.fast_mode &&
                    self.event_graph.dag_insert(&events, &dag_name).await.is_err()
                {
                    self.clone().increase_malicious_count().await?;
                    continue
                }
            } debug!(
                target: "event_graph::protocol::handle_event_put",
                "Got all parents necessary for insertion",
            );
            // Insert the event itself: header first, then the body.
            if self
                .event_graph
                .header_dag_insert(vec![event.header.clone()], &dag_name)
                .await
                .is_err()
            {
                self.clone().increase_malicious_count().await?;
                continue
            }

            if self.event_graph.dag_insert(slice::from_ref(&event), &dag_name).await.is_err() {
                self.clone().increase_malicious_count().await?;
                continue
            }

            // Queue the event for rate-limited relay to other peers.
            self.broadcaster_push
                .send(EventPut(event, blob))
                .await
                .expect("push broadcaster closed");
        }
    }
665
666 async fn handle_static_put(self: Arc<Self>) -> Result<()> {
667 let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
669
670 loop {
671 let (event, blob) = match self.st_put_sub.receive().await {
672 Ok(v) => (v.0.clone(), v.1.clone()),
673 Err(_) => continue,
674 };
675 trace!(
676 target: "event_graph::protocol::handle_static_put()",
677 "Got StaticPut: {} [{}]", event.id(), self.channel.address(),
678 );
679
680 if !*self.event_graph.synced.read().await {
682 debug!(
683 target: "event_graph::protocol::handle_static_put",
684 "DAG is still syncing, skipping..."
685 );
686 continue
687 }
688
689 let event_id = event.id();
690 if self.event_graph.static_dag.contains_key(event_id.as_bytes())? {
691 debug!(
692 target: "event_graph::protocol::handle_static_put()",
693 "Event {} is already known", event_id,
694 );
695 continue
696 }
697
698 let rln_account: RLNNode = match deserialize_async_partial(event.content()).await {
699 Ok((v, _)) => v,
700 Err(e) => {
701 error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed deserializing event ephemeral data: {}", e);
702 continue
703 }
704 };
705
706 if blob.is_empty() {
707 error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed to register/slash: Not enough data provided");
708 continue
709 }
710 match rln_account {
711 RLNNode::Registration(commitment) => {
712 let (proof, user_msg_limit): (Proof, u64) = match deserialize_async_partial(
713 &blob,
714 )
715 .await
716 {
717 Ok((v, _)) => v,
718 Err(e) => {
719 error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed deserializing event ephemeral data: {}", e);
720 continue
721 }
722 };
723
724 info!("registering account: {:?}", commitment);
725 let public_inputs = vec![commitment, user_msg_limit.into()];
726
727 let register_vk = read_register_vk(&self.event_graph.sled_db)?;
728 if proof.verify(®ister_vk, &public_inputs).is_err() {
729 error!(target: "event_graph::protocol::handle_static_put()", "[RLN] Incoming Event RLN Registration proof verification failed");
730 continue
731 }
732 }
733 RLNNode::Slashing(commitment) => {
734 let (proof, secret, user_msg_limit, identity_root): (
735 Proof,
736 pallas::Base,
737 u64,
738 pallas::Base,
739 ) = match deserialize_async_partial(&blob).await {
740 Ok((v, _)) => v,
741 Err(e) => {
742 error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed deserializing event ephemeral data: {}", e);
743 continue
744 }
745 };
746
747 let public_inputs =
748 vec![secret, pallas::Base::from(user_msg_limit), identity_root];
749 let slash_vk = read_slash_vk(&self.event_graph.sled_db)?;
750 if proof.verify(&slash_vk, &public_inputs).is_err() {
751 error!(target: "event_graph::protocol::handle_static_put()", "[RLN] Incoming Event RLN Slashing proof verification failed");
752 continue
753 }
754
755 let identity_secret_hash = poseidon_hash([secret, user_msg_limit.into()]);
756 let rebuilt_commitment = poseidon_hash([identity_secret_hash]);
757
758 assert_eq!(commitment, rebuilt_commitment);
759 info!("slashing account: {}", rebuilt_commitment.to_string());
760 let commitment = vec![rebuilt_commitment];
761 let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect();
762
763 let mut rln_id_tree = self.event_graph.rln_identity_tree.write().await;
764 rln_id_tree.remove_leaves(commitment)?;
765 }
766 }
767
768 for parent in event.header.parents.iter() {
770 if *parent == NULL_ID {
771 continue
772 }
773 if !self.event_graph.static_dag.contains_key(parent.as_bytes())? {
774 debug!(
775 target: "event_graph::protocol::handle_static_put()",
776 "Event {} is orphan", event_id,
777 );
778 return Err(Error::EventNotFound("Event is orphan".to_owned()))
779 }
780 }
781
782 bantimes.ticktock();
785 if bantimes.count() > WINDOW_MAXSIZE {
786 self.channel.ban().await;
787 return Err(Error::MaliciousFlood)
789 }
790
791 if !event.validate_new() {
795 self.clone().increase_malicious_count().await?;
796 continue
797 }
798
799 debug!(
802 target: "event_graph::protocol::handle_event_put()",
803 "Event {} is new", event_id,
804 );
805
806 self.event_graph.static_insert(&event).await?;
807 self.event_graph.static_broadcast(event, blob).await?
808 }
809 }
810
    /// Protocol function handling `EventReq`: looks up each requested event
    /// in our DAGs and replies with an `EventRep`. Requests for unknown
    /// events count toward the peer's malicious threshold.
    async fn handle_event_req(self: Arc<Self>) -> Result<()> {
        loop {
            let event_ids = match self.ev_req_sub.receive().await {
                Ok(v) => v.0.clone(),
                Err(_) => continue,
            };
            trace!(
                target: "event_graph::protocol::handle_event_req",
                "Got EventReq: {event_ids:?} [{}]", self.channel.display_address(),
            );

            // Don't serve requests while our own DAG is still syncing.
            if !*self.event_graph.synced.read().await {
                debug!(
                    target: "event_graph::protocol::handle_event_req",
                    "DAG is still syncing, skipping..."
                );
                continue
            }

            let mut events = vec![];
            for event_id in event_ids.iter() {
                // `await?` propagates storage errors (ending this handler);
                // a missing event is mapped to Err and handled below.
                if let Ok(event) = self
                    .event_graph
                    .fetch_event_from_dags(event_id)
                    .await?
                    .ok_or(Error::EventNotFound("The requested event is not found".to_owned()))
                {
                    debug!(
                        target: "event_graph::protocol::handle_event_req()",
                        "Fetching event {:?} from DAG", event_id,
                    );
                    events.push(event);
                } else {
                    // NOTE(review): this duplicates increase_malicious_count()
                    // with handler-specific log targets.
                    let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
                    if malicious_count + 1 == MALICIOUS_THRESHOLD {
                        error!(
                            target: "event_graph::protocol::handle_event_req",
                            "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
                            self.channel.display_address(),
                        );
                        self.channel.stop().await;
                        return Err(Error::ChannelStopped)
                    }

                    warn!(
                        target: "event_graph::protocol::handle_event_req",
                        "[EVENTGRAPH] Peer {} requested an unexpected event {event_id:?}",
                        self.channel.display_address()
                    );
                    continue
                }
            }

            let genesis_timestamp = self.event_graph.current_genesis.read().await.header.timestamp;
            let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;

            for event in events.iter() {
                // Events predating the current genesis should have been pruned.
                if event.header.timestamp < genesis_timestamp {
                    error!(
                        target: "event_graph::protocol::handle_event_req",
                        "Requested event by peer {} is older than previous rotation period. It should have been pruned.
                        Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
                        event.id(), event.header.timestamp
                    );
                }

                // Mark the parents as broadcasted, since the peer will likely
                // request them next.
                for parent_id in event.header.parents.iter() {
                    if parent_id != &NULL_ID {
                        bcast_ids.insert(*parent_id);
                    }
                }
            }
            drop(bcast_ids);

            self.channel.send(&EventRep(events)).await?;
        }
    }
913
914 async fn handle_header_req(self: Arc<Self>) -> Result<()> {
918 loop {
919 let Ok(v) = self.hdr_req_sub.receive().await else { continue };
920 let (dag_name, tips) = (&v.0, &v.1);
921
922 trace!(
923 target: "event_graph::protocol::handle_tip_req",
924 "Got TipReq [{}]", self.channel.display_address(),
925 );
926
927 if !*self.event_graph.synced.read().await {
929 debug!(
930 target: "event_graph::protocol::handle_tip_req",
931 "DAG is still syncing, skipping..."
932 );
933 continue
934 }
935
936 let dag_timestamp = u64::from_str(dag_name)?;
941 let store = self.event_graph.dag_store.read().await;
942 if !store.header_dags.contains_key(&dag_timestamp) {
943 continue
944 }
945 let headers = self.event_graph.fetch_headers_with_tips(dag_name, tips).await?;
946 self.channel.send(&HeaderRep(headers)).await?;
955 }
956 }
958
959 async fn handle_tip_req(self: Arc<Self>) -> Result<()> {
963 loop {
964 let dag_name = match self.tip_req_sub.receive().await {
965 Ok(v) => v.0.clone(),
966 Err(_) => continue,
967 };
968 trace!(
969 target: "event_graph::protocol::handle_tip_req",
970 "Got TipReq [{}]", self.channel.display_address(),
971 );
972
973 if !*self.event_graph.synced.read().await {
975 debug!(
976 target: "event_graph::protocol::handle_tip_req",
977 "DAG is still syncing, skipping..."
978 );
979 continue
980 }
981
982 let layers = match dag_name.as_str() {
987 "static-dag" => {
988 let tips = self.event_graph.static_unreferenced_tips().await;
989 &tips.clone()
990 }
991 _ => {
992 let dag_timestamp = u64::from_str(&dag_name)?;
993 let store = self.event_graph.dag_store.read().await;
994 let (_, layers) = match store.header_dags.get(&dag_timestamp) {
995 Some(v) => v,
996 None => continue,
997 };
998 &layers.clone()
999 }
1000 };
1001 let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
1003 for (_, tips) in layers.iter() {
1004 for tip in tips {
1005 bcast_ids.insert(*tip);
1006 }
1007 }
1008 drop(bcast_ids);
1009
1010 self.channel.send(&TipRep(layers.clone())).await?;
1011 }
1012 }
1013
1014 async fn broadcast_rate_limiter(self: Arc<Self>) -> Result<()> {
1040 let mut ratelimit = MovingWindow::new(RATELIMIT_EXPIRY_TIME);
1041
1042 loop {
1043 let event_put = self.broadcaster_pull.recv().await.expect("pull broadcaster closed");
1044
1045 ratelimit.ticktock();
1046 if ratelimit.count() > RATELIMIT_MIN_COUNT {
1047 let sleep_time =
1048 ((ratelimit.count() - RATELIMIT_MIN_COUNT) * RATELIMIT_SAMPLE_SLEEP /
1049 (RATELIMIT_SAMPLE_IDX - RATELIMIT_MIN_COUNT)) as u64;
1050 debug!(
1051 target: "event_graph::protocol::broadcast_rate_limiter",
1052 "Activated rate limit: sleeping {sleep_time} ms [count={}]",
1053 ratelimit.count()
1054 );
1055 msleep(sleep_time).await;
1057 }
1058
1059 self.event_graph
1061 .p2p
1062 .broadcast_with_exclude(&event_put, &[self.channel.address().clone()])
1063 .await;
1064 }
1065 }
1066}
1067
#[cfg(test)]
mod test {
    use super::*;
    use std::time::UNIX_EPOCH;

    /// A timestamp lying in the future must be evicted by `clean()`.
    #[test]
    fn test_eventgraph_moving_window_clean_future() {
        let mut window = MovingWindow::new(NanoTimestamp::from_secs(60));
        // 100 seconds ahead of "now".
        let future = UNIX_EPOCH.elapsed().unwrap().as_secs() + 100;
        window.times.push_back(NanoTimestamp::from_secs(future.into()));
        window.clean();
        assert_eq!(window.count(), 0);
    }
}
1081}