// darkfi/event_graph/proto.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
19use std::{
20    collections::{BTreeMap, HashSet, VecDeque},
21    slice,
22    str::FromStr,
23    sync::{
24        atomic::{AtomicUsize, Ordering::SeqCst},
25        Arc,
26    },
27};
28
29use darkfi_sdk::{
30    crypto::{poseidon_hash, util::FieldElemAsStr},
31    pasta::pallas,
32};
33use darkfi_serial::{
34    async_trait, deserialize_async_partial, serialize_async, SerialDecodable, SerialEncodable,
35};
36use smol::Executor;
37use tracing::{debug, error, info, trace, warn};
38
39use super::{
40    event::Header,
41    rln::{closest_epoch, create_slash_proof, hash_event, sss_recover, MessageMetadata, RLNNode},
42    Event, EventGraphPtr, LayerUTips, NULL_ID,
43};
44use crate::{
45    event_graph::rln::{read_register_vk, read_signal_vk, read_slash_pk, read_slash_vk, Blob},
46    impl_p2p_message,
47    net::{
48        metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
49        ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
50        ProtocolJobsManager, ProtocolJobsManagerPtr,
51    },
52    system::msleep,
53    util::time::NanoTimestamp,
54    zk::Proof,
55    Error, Result,
56};
57
/// Malicious behaviour threshold. If the threshold is reached, we will
/// drop the peer from our P2P connection.
const MALICIOUS_THRESHOLD: usize = 5;

/// Global limit of messages per window
const WINDOW_MAXSIZE: usize = 200;
/// Rolling length of the flood-detection window
const WINDOW_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(60);

/// Rolling length of the broadcast rate-limit window
const RATELIMIT_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(10);
/// Ratelimit kicks in above this count
const RATELIMIT_MIN_COUNT: usize = 6;
/// Sample point used to calculate sleep time when ratelimit is active
const RATELIMIT_SAMPLE_IDX: usize = 10;
/// Sleep for this amount of time when `count == RATELIMIT_SAMPLE_IDX`.
const RATELIMIT_SAMPLE_SLEEP: usize = 1000;
75
76struct MovingWindow {
77    times: VecDeque<NanoTimestamp>,
78    expiry_time: NanoTimestamp,
79}
80
81impl MovingWindow {
82    fn new(expiry_time: NanoTimestamp) -> Self {
83        Self { times: VecDeque::new(), expiry_time }
84    }
85
86    /// Clean out expired timestamps from the window.
87    fn clean(&mut self) {
88        while let Some(ts) = self.times.front() {
89            let Ok(elapsed) = ts.elapsed() else {
90                debug!(target: "event_graph::protocol::MovingWindow::clean", "Timestamp [{ts}] is in future. Removing...");
91                let _ = self.times.pop_front();
92                continue
93            };
94            if elapsed < self.expiry_time {
95                break
96            }
97            let _ = self.times.pop_front();
98        }
99    }
100
101    /// Add new timestamp
102    fn ticktock(&mut self) {
103        self.clean();
104        self.times.push_back(NanoTimestamp::current_time());
105    }
106
107    #[inline]
108    fn count(&self) -> usize {
109        self.times.len()
110    }
111}
112
/// P2P protocol implementation for the Event Graph.
/// Holds the per-channel message subscriptions, malicious-behaviour
/// accounting and the rate-limited broadcast queue for one peer.
pub struct ProtocolEventGraph {
    /// Pointer to the connected peer
    channel: ChannelPtr,
    /// Pointer to the Event Graph instance
    event_graph: EventGraphPtr,
    /// `MessageSubscriber` for `EventPut`
    ev_put_sub: MessageSubscription<EventPut>,
    /// `MessageSubscriber` for `StaticPut`
    st_put_sub: MessageSubscription<StaticPut>,
    /// `MessageSubscriber` for `EventReq`
    ev_req_sub: MessageSubscription<EventReq>,
    /// `MessageSubscriber` for `EventRep`
    ev_rep_sub: MessageSubscription<EventRep>,
    /// `MessageSubscriber` for `HeaderPut` (registered but currently unused,
    /// hence the underscore prefix)
    _hdr_put_sub: MessageSubscription<HeaderPut>,
    /// `MessageSubscriber` for `HeaderReq`
    hdr_req_sub: MessageSubscription<HeaderReq>,
    /// `MessageSubscriber` for `HeaderRep` (registered but currently unused)
    _hdr_rep_sub: MessageSubscription<HeaderRep>,
    /// `MessageSubscriber` for `TipReq`
    tip_req_sub: MessageSubscription<TipReq>,
    /// `MessageSubscriber` for `TipRep` (registered but currently unused)
    _tip_rep_sub: MessageSubscription<TipRep>,
    /// Peer malicious message count
    malicious_count: AtomicUsize,
    /// P2P jobs manager pointer
    jobsman: ProtocolJobsManagerPtr,
    /// To apply the rate-limit, we don't broadcast directly but instead send into the
    /// sending queue.
    broadcaster_push: smol::channel::Sender<EventPut>,
    /// Receive send requests and rate-limit broadcasting them.
    broadcaster_pull: smol::channel::Receiver<EventPut>,
}
147
/// A P2P message representing publishing an event on the network.
/// The second field carries serialized ephemeral data (deserialized as an
/// RLN `Blob` by the `EventPut` handler when non-empty).
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventPut(pub Event, pub Vec<u8>);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing publishing an event of a static graph
/// (most likely RLN_identities) on the network.
/// The second field carries serialized proof data for the RLN node.
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct StaticPut(pub Event, pub Vec<u8>);
impl_p2p_message!(StaticPut, "EventGraph::StaticPut", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing an event request (a batch of event IDs)
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub Vec<blake3::Hash>);
impl_p2p_message!(EventReq, "EventGraph::EventReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing an event reply (the requested events)
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventRep(pub Vec<Event>);
impl_p2p_message!(EventRep, "EventGraph::EventRep", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing publishing an event's header on the network
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderPut(pub Header);
impl_p2p_message!(HeaderPut, "EventGraph::HeaderPut", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing a header request (DAG name plus the
/// requester's unreferenced tips per layer)
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderReq(pub String, pub LayerUTips);
impl_p2p_message!(HeaderReq, "EventGraph::HeaderReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing a header reply
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderRep(pub Vec<Header>);
impl_p2p_message!(HeaderRep, "EventGraph::HeaderRep", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing a request for a peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipReq(pub String);
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing a reply for the peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub LayerUTips);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
193
#[async_trait]
impl ProtocolBase for ProtocolEventGraph {
    /// Start the protocol: spawn one background task per message handler
    /// on the jobs manager, plus the rate-limited broadcaster.
    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
        self.jobsman.clone().start(ex.clone());
        self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_static_put(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await;
        // NOTE(review): no handler is spawned for `HeaderPut`; its
        // subscription (`_hdr_put_sub`) is registered but never polled.
        self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_tip_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().broadcast_rate_limiter(), ex.clone()).await;
        Ok(())
    }

    /// Human-readable protocol name.
    fn name(&self) -> &'static str {
        "ProtocolEventGraph"
    }
}
213
214impl ProtocolEventGraph {
    /// Instantiate the protocol for a given peer channel.
    /// Registers dispatchers for all Event Graph message types, subscribes
    /// to each of them, and sets up the unbounded queue used by the
    /// rate-limited broadcaster.
    pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result<ProtocolBasePtr> {
        // Register message dispatchers so the channel can decode these types.
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<EventPut>().await;
        msg_subsystem.add_dispatch::<StaticPut>().await;
        msg_subsystem.add_dispatch::<EventReq>().await;
        msg_subsystem.add_dispatch::<EventRep>().await;
        msg_subsystem.add_dispatch::<HeaderPut>().await;
        msg_subsystem.add_dispatch::<HeaderReq>().await;
        msg_subsystem.add_dispatch::<HeaderRep>().await;
        msg_subsystem.add_dispatch::<TipReq>().await;
        msg_subsystem.add_dispatch::<TipRep>().await;

        // Subscribe to every message type (underscore-prefixed ones are
        // currently unused by the handlers).
        let ev_put_sub = channel.subscribe_msg::<EventPut>().await?;
        let st_put_sub = channel.subscribe_msg::<StaticPut>().await?;
        let ev_req_sub = channel.subscribe_msg::<EventReq>().await?;
        let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?;
        let _hdr_put_sub = channel.subscribe_msg::<HeaderPut>().await?;
        let hdr_req_sub = channel.subscribe_msg::<HeaderReq>().await?;
        let _hdr_rep_sub = channel.subscribe_msg::<HeaderRep>().await?;
        let tip_req_sub = channel.subscribe_msg::<TipReq>().await?;
        let _tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;

        // Queue feeding the rate-limited broadcaster task.
        let (broadcaster_push, broadcaster_pull) = smol::channel::unbounded();

        Ok(Arc::new(Self {
            channel: channel.clone(),
            event_graph,
            ev_put_sub,
            st_put_sub,
            ev_req_sub,
            ev_rep_sub,
            _hdr_put_sub,
            hdr_req_sub,
            _hdr_rep_sub,
            tip_req_sub,
            _tip_rep_sub,
            malicious_count: AtomicUsize::new(0),
            jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
            broadcaster_push,
            broadcaster_pull,
        }))
    }
257
258    async fn increase_malicious_count(self: Arc<Self>) -> Result<()> {
259        let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
260        if malicious_count + 1 == MALICIOUS_THRESHOLD {
261            error!(
262                target: "event_graph::protocol::handle_event_put",
263                "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
264                self.channel.display_address(),
265            );
266            self.channel.stop().await;
267            return Err(Error::ChannelStopped)
268        }
269
270        warn!(
271            target: "event_graph::protocol::handle_event_put",
272            "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.display_address(),
273        );
274
275        Ok(())
276    }
277
278    /// Protocol function handling `EventPut`.
279    /// This is triggered whenever someone broadcasts (or relays) a new
280    /// event on the network.
281    async fn handle_event_put(self: Arc<Self>) -> Result<()> {
282        // Rolling window of event timestamps on this channel
283        let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
284        let mut metadata = MessageMetadata::new();
285        let mut current_epoch = 0;
286
287        loop {
288            let (event, blob) = match self.ev_put_sub.receive().await {
289                Ok(v) => (v.0.clone(), v.1.clone()),
290                Err(_) => continue,
291            };
292            trace!(
293                 target: "event_graph::protocol::handle_event_put",
294                 "Got EventPut: {} [{}]", event.id(), self.channel.display_address(),
295            );
296
297            // Check if node has finished syncing its DAG
298            if !*self.event_graph.synced.read().await {
299                debug!(
300                    target: "event_graph::protocol::handle_event_put",
301                    "DAG is still syncing, skipping..."
302                );
303                continue
304            }
305
306            let mut verification_failed = false;
307            #[allow(clippy::never_loop)]
308            loop {
309                if blob.is_empty() {
310                    break
311                }
312                let rcvd_blob: Blob = match deserialize_async_partial(&blob).await {
313                    Ok((v, _)) => v,
314                    Err(e) => {
315                        error!(target: "event_graph::protocol::handle_event_put()","[EVENTGRAPH] Failed deserializing event ephemeral data: {}", e);
316                        break
317                    }
318                };
319
320                // If the current epoch is different, we reset the stored shares
321                if current_epoch != closest_epoch(event.header.timestamp) {
322                    metadata = MessageMetadata::new()
323                }
324
325                let rln_app_identifier = pallas::Base::from(1000);
326                current_epoch = closest_epoch(event.header.timestamp);
327                let epoch = pallas::Base::from(current_epoch);
328                let external_nullifier = poseidon_hash([epoch, rln_app_identifier]);
329                let x = hash_event(&event);
330                let identity_root = self.event_graph.rln_identity_tree.read().await.root();
331                let public_inputs = vec![
332                    identity_root,
333                    external_nullifier,
334                    x,
335                    rcvd_blob.y,
336                    rcvd_blob.internal_nullifier,
337                ];
338
339                if metadata.is_duplicate(
340                    &external_nullifier,
341                    &rcvd_blob.internal_nullifier,
342                    &x,
343                    &rcvd_blob.y,
344                ) {
345                    error!(target: "event_graph::protocol::handle_event_put()", "[RLN] Duplicate Message!");
346                    verification_failed = true;
347                    break
348                }
349
350                if metadata.is_reused(&external_nullifier, &rcvd_blob.internal_nullifier) {
351                    info!(target: "event_graph::protocol::handle_event_put()", "[RLN] Metadata is reused.. slashing..");
352                    let shares =
353                        metadata.get_shares(&external_nullifier, &rcvd_blob.internal_nullifier);
354                    let secret = sss_recover(&shares);
355
356                    // Broadcast slashing event
357                    let slash_pk = read_slash_pk(&self.event_graph.sled_db)?;
358                    // let slash_pk = &self.event_graph.slash_pk;
359                    let mut identity_tree = self.event_graph.rln_identity_tree.write().await;
360
361                    info!("[RLN] Creating slashing proof");
362                    let (proof, identity_root) = match create_slash_proof(
363                        secret,
364                        rcvd_blob.user_msg_limit,
365                        &mut identity_tree,
366                        &slash_pk,
367                    ) {
368                        Ok(v) => v,
369                        Err(e) => {
370                            error!("[RLN] Failed creating RLN slash proof: {}", e);
371                            // Just use an empty "proof"
372                            (Proof::new(vec![]), pallas::Base::from(0))
373                        }
374                    };
375                    drop(identity_tree);
376
377                    let blob =
378                        serialize_async(&(proof, secret, rcvd_blob.user_msg_limit, identity_root))
379                            .await;
380
381                    let evgr = &self.event_graph;
382                    let identity_secret_hash =
383                        poseidon_hash([secret, rcvd_blob.user_msg_limit.into()]);
384                    let identity_commitment = poseidon_hash([identity_secret_hash]);
385                    let rln_commitment = RLNNode::Slashing(identity_commitment);
386                    let st_event =
387                        Event::new_static(serialize_async(&rln_commitment).await, evgr).await;
388                    evgr.static_insert(&st_event).await?;
389                    evgr.static_broadcast(st_event, blob).await?;
390
391                    verification_failed = true;
392                    break
393                }
394
395                // At this point we can safely add the shares
396                metadata.add_share(
397                    external_nullifier,
398                    rcvd_blob.internal_nullifier,
399                    x,
400                    rcvd_blob.y,
401                )?;
402
403                info!(target: "event_graph::protocol::handle_event_put()", "[RLN] Verifying incoming Event RLN proof");
404                let signal_vk = read_signal_vk(&self.event_graph.sled_db)?;
405                verification_failed = rcvd_blob.proof.verify(&signal_vk, &public_inputs).is_err();
406
407                break
408            }
409
410            if verification_failed {
411                error!(target: "event_graph::protocol::handle_event_put()", "[RLN] Incoming Event RLN Signaling proof verification failed");
412                continue
413            }
414
415            // Remove lingering messages from dag_sync event request response
416            _ = self.ev_rep_sub.clean().await;
417
418            // If we have already seen the event, we'll stay quiet.
419            let current_genesis = self.event_graph.current_genesis.read().await;
420            let genesis_timestamp = current_genesis.header.timestamp;
421            let dag_name = genesis_timestamp.to_string();
422            let hdr_tree_name = format!("headers_{dag_name}");
423            let event_id = event.id();
424            if self
425                .event_graph
426                .dag_store
427                .read()
428                .await
429                .get_dag(&hdr_tree_name)
430                .contains_key(event_id.as_bytes())
431                .unwrap()
432            {
433                debug!(
434                    target: "event_graph::protocol::handle_event_put",
435                    "Event {event_id} is already known"
436                );
437                continue
438            }
439
440            // There's a new unique event.
441            // Apply ban logic to stop network floods.
442            bantimes.ticktock();
443            if bantimes.count() > WINDOW_MAXSIZE {
444                self.channel.ban().await;
445                // This error is actually unused. We could return Ok here too.
446                return Err(Error::MaliciousFlood)
447            }
448
449            // We received an event. Check if we already have it in our DAG.
450            // Check event is not older that current genesis event timestamp.
451            // Also check if we have the event's parents. In the case we do
452            // not have the parents, we'll request them from the peer that has
453            // sent this event to us. In case they do not reply in time, we drop
454            // the event.
455
456            // Check if the event is older than the genesis event. If so, we should
457            // not include it in our Dag.
458            // The genesis event marks the last time the Dag has been pruned of old
459            // events. The pruning interval is defined by the days_rotation field
460            // of [`EventGraph`].
461            if event.header.timestamp < genesis_timestamp {
462                debug!(
463                    target: "event_graph::protocol::handle_event_put",
464                    "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
465                event.id(), event.header.timestamp
466                );
467            }
468
469            // Validate the new event first. If we do not consider it valid, we
470            // will just drop it and stay quiet. If the malicious threshold
471            // is reached, we will stop the connection.
472            if !event.validate_new() {
473                self.clone().increase_malicious_count().await?;
474                continue
475            }
476
477            // At this point, this is a new event to us. Let's see if we
478            // have all of its parents.
479            debug!(
480                target: "event_graph::protocol::handle_event_put",
481                "Event {event_id} is new"
482            );
483
484            let mut missing_parents = HashSet::new();
485            for parent_id in event.header.parents.iter() {
486                // `event.validate_new()` should have already made sure that
487                // not all parents are NULL, and that there are no duplicates.
488                if parent_id == &NULL_ID {
489                    continue
490                }
491
492                if !self
493                    .event_graph
494                    .dag_store
495                    .read()
496                    .await
497                    .get_dag(&hdr_tree_name)
498                    .contains_key(parent_id.as_bytes())
499                    .unwrap()
500                {
501                    missing_parents.insert(*parent_id);
502                }
503            }
504
505            // If we have missing parents, then we have to attempt to
506            // fetch them from this peer. Do this recursively until we
507            // find all of them.
508            if !missing_parents.is_empty() {
509                // We track the received events mapped by their layer.
510                // If/when we get all of them, we need to insert them in order so
511                // the DAG state stays correct and unreferenced tips represent the
512                // actual thing they should. If we insert them out of order, then
513                // we might have wrong unreferenced tips.
514                let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
515                let mut received_events_hashes = HashSet::new();
516
517                debug!(
518                    target: "event_graph::protocol::handle_event_put",
519                    "Event has {} missing parents. Requesting...", missing_parents.len(),
520                );
521
522                let current_genesis = self.event_graph.current_genesis.read().await;
523                let dag_name = current_genesis.header.timestamp.to_string();
524                let hdr_tree_name = format!("headers_{dag_name}");
525
526                while !missing_parents.is_empty() {
527                    // for parent_id in missing_parents.clone().iter() {
528                    debug!(
529                        target: "event_graph::protocol::handle_event_put",
530                        "Requesting {missing_parents:?}..."
531                    );
532
533                    self.channel
534                        .send(&EventReq(missing_parents.clone().into_iter().collect()))
535                        .await?;
536
537                    // Node waits for response
538                    let Ok(parents) = self
539                        .ev_rep_sub
540                        .receive_with_timeout(
541                            self.event_graph
542                                .p2p
543                                .settings()
544                                .read()
545                                .await
546                                .outbound_connect_timeout_max(),
547                        )
548                        .await
549                    else {
550                        error!(
551                            target: "event_graph::protocol::handle_event_put",
552                            "[EVENTGRAPH] Timeout while waiting for parents {missing_parents:?} from {}",
553                            self.channel.display_address(),
554                        );
555                        self.channel.stop().await;
556                        return Err(Error::ChannelStopped)
557                    };
558
559                    let parents = parents.0.clone();
560
561                    for parent in parents {
562                        let parent_id = parent.id();
563                        if !missing_parents.contains(&parent_id) {
564                            error!(
565                                target: "event_graph::protocol::handle_event_put",
566                                "[EVENTGRAPH] Peer {} replied with a wrong event: {}",
567                                self.channel.display_address(), parent.id(),
568                            );
569                            self.channel.stop().await;
570                            return Err(Error::ChannelStopped)
571                        }
572
573                        debug!(
574                            target: "event_graph::protocol::handle_event_put",
575                            "Got correct parent event {}", parent.id(),
576                        );
577
578                        if let Some(layer_events) = received_events.get_mut(&parent.header.layer) {
579                            layer_events.push(parent.clone());
580                        } else {
581                            let layer_events = vec![parent.clone()];
582                            received_events.insert(parent.header.layer, layer_events);
583                        }
584                        received_events_hashes.insert(parent_id);
585
586                        missing_parents.remove(&parent_id);
587
588                        // See if we have the upper parents
589                        for upper_parent in parent.header.parents.iter() {
590                            if upper_parent == &NULL_ID {
591                                continue
592                            }
593
594                            if !missing_parents.contains(upper_parent) &&
595                                !received_events_hashes.contains(upper_parent) &&
596                                !self
597                                    .event_graph
598                                    .dag_store
599                                    .read()
600                                    .await
601                                    .get_dag(&hdr_tree_name)
602                                    .contains_key(upper_parent.as_bytes())
603                                    .unwrap()
604                            {
605                                debug!(
606                                    target: "event_graph::protocol::handle_event_put",
607                                    "Found upper missing parent event {upper_parent}"
608                                );
609                                missing_parents.insert(*upper_parent);
610                            }
611                        }
612                    }
613                } // <-- while !missing_parents.is_empty()
614
615                // At this point we should've got all the events.
616                // We should add them to the DAG.
617                let mut events = vec![];
618                for (_, tips) in received_events {
619                    for tip in tips {
620                        events.push(tip);
621                    }
622                }
623                let headers = events.iter().map(|x| x.header.clone()).collect();
624                if self.event_graph.header_dag_insert(headers, &dag_name).await.is_err() {
625                    self.clone().increase_malicious_count().await?;
626                    continue
627                }
628                // FIXME
629                if !self.event_graph.fast_mode &&
630                    self.event_graph.dag_insert(&events, &dag_name).await.is_err()
631                {
632                    self.clone().increase_malicious_count().await?;
633                    continue
634                }
635            } // <-- !missing_parents.is_empty()
636
637            // If we're here, we have all the parents, and we can now
638            // perform a full validation and add the actual event to
639            // the DAG.
640            debug!(
641                target: "event_graph::protocol::handle_event_put",
642                "Got all parents necessary for insertion",
643            );
644            if self
645                .event_graph
646                .header_dag_insert(vec![event.header.clone()], &dag_name)
647                .await
648                .is_err()
649            {
650                self.clone().increase_malicious_count().await?;
651                continue
652            }
653
654            if self.event_graph.dag_insert(slice::from_ref(&event), &dag_name).await.is_err() {
655                self.clone().increase_malicious_count().await?;
656                continue
657            }
658
659            self.broadcaster_push
660                .send(EventPut(event, blob))
661                .await
662                .expect("push broadcaster closed");
663        }
664    }
665
666    async fn handle_static_put(self: Arc<Self>) -> Result<()> {
667        // Rolling window of event timestamps on this channel
668        let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);
669
670        loop {
671            let (event, blob) = match self.st_put_sub.receive().await {
672                Ok(v) => (v.0.clone(), v.1.clone()),
673                Err(_) => continue,
674            };
675            trace!(
676                 target: "event_graph::protocol::handle_static_put()",
677                 "Got StaticPut: {} [{}]", event.id(), self.channel.address(),
678            );
679
680            // Check if node has finished syncing its DAG
681            if !*self.event_graph.synced.read().await {
682                debug!(
683                    target: "event_graph::protocol::handle_static_put",
684                    "DAG is still syncing, skipping..."
685                );
686                continue
687            }
688
689            let event_id = event.id();
690            if self.event_graph.static_dag.contains_key(event_id.as_bytes())? {
691                debug!(
692                    target: "event_graph::protocol::handle_static_put()",
693                    "Event {} is already known", event_id,
694                );
695                continue
696            }
697
698            let rln_account: RLNNode = match deserialize_async_partial(event.content()).await {
699                Ok((v, _)) => v,
700                Err(e) => {
701                    error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed deserializing event ephemeral data: {}", e);
702                    continue
703                }
704            };
705
706            if blob.is_empty() {
707                error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed to register/slash: Not enough data provided");
708                continue
709            }
710            match rln_account {
711                RLNNode::Registration(commitment) => {
712                    let (proof, user_msg_limit): (Proof, u64) = match deserialize_async_partial(
713                        &blob,
714                    )
715                    .await
716                    {
717                        Ok((v, _)) => v,
718                        Err(e) => {
719                            error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed deserializing event ephemeral data: {}", e);
720                            continue
721                        }
722                    };
723
724                    info!("registering account: {:?}", commitment);
725                    let public_inputs = vec![commitment, user_msg_limit.into()];
726
727                    let register_vk = read_register_vk(&self.event_graph.sled_db)?;
728                    if proof.verify(&register_vk, &public_inputs).is_err() {
729                        error!(target: "event_graph::protocol::handle_static_put()", "[RLN] Incoming Event RLN Registration proof verification failed");
730                        continue
731                    }
732                }
733                RLNNode::Slashing(commitment) => {
734                    let (proof, secret, user_msg_limit, identity_root): (
735                        Proof,
736                        pallas::Base,
737                        u64,
738                        pallas::Base,
739                    ) = match deserialize_async_partial(&blob).await {
740                        Ok((v, _)) => v,
741                        Err(e) => {
742                            error!(target: "event_graph::protocol::handle_static_put()","[RLN] Failed deserializing event ephemeral data: {}", e);
743                            continue
744                        }
745                    };
746
747                    let public_inputs =
748                        vec![secret, pallas::Base::from(user_msg_limit), identity_root];
749                    let slash_vk = read_slash_vk(&self.event_graph.sled_db)?;
750                    if proof.verify(&slash_vk, &public_inputs).is_err() {
751                        error!(target: "event_graph::protocol::handle_static_put()", "[RLN] Incoming Event RLN Slashing proof verification failed");
752                        continue
753                    }
754
755                    let identity_secret_hash = poseidon_hash([secret, user_msg_limit.into()]);
756                    let rebuilt_commitment = poseidon_hash([identity_secret_hash]);
757
758                    assert_eq!(commitment, rebuilt_commitment);
759                    info!("slashing account: {}", rebuilt_commitment.to_string());
760                    let commitment = vec![rebuilt_commitment];
761                    let commitment: Vec<_> = commitment.into_iter().map(|l| (l, l)).collect();
762
763                    let mut rln_id_tree = self.event_graph.rln_identity_tree.write().await;
764                    rln_id_tree.remove_leaves(commitment)?;
765                }
766            }
767
768            // Check if event's parents are in the static DAG
769            for parent in event.header.parents.iter() {
770                if *parent == NULL_ID {
771                    continue
772                }
773                if !self.event_graph.static_dag.contains_key(parent.as_bytes())? {
774                    debug!(
775                        target: "event_graph::protocol::handle_static_put()",
776                        "Event {} is orphan", event_id,
777                    );
778                    return Err(Error::EventNotFound("Event is orphan".to_owned()))
779                }
780            }
781
782            // There's a new unique event.
783            // Apply ban logic to stop network floods.
784            bantimes.ticktock();
785            if bantimes.count() > WINDOW_MAXSIZE {
786                self.channel.ban().await;
787                // This error is actually unused. We could return Ok here too.
788                return Err(Error::MaliciousFlood)
789            }
790
791            // Validate the new event first. If we do not consider it valid, we
792            // will just drop it and stay quiet. If the malicious threshold
793            // is reached, we will stop the connection.
794            if !event.validate_new() {
795                self.clone().increase_malicious_count().await?;
796                continue
797            }
798
799            // At this point, this is a new event to us. Let's see if we
800            // have all of its parents.
801            debug!(
802                target: "event_graph::protocol::handle_event_put()",
803                "Event {} is new", event_id,
804            );
805
806            self.event_graph.static_insert(&event).await?;
807            self.event_graph.static_broadcast(event, blob).await?
808        }
809    }
810
811    /// Protocol function handling `EventReq`.
812    /// This is triggered whenever someone requests an event from us.
813    async fn handle_event_req(self: Arc<Self>) -> Result<()> {
814        loop {
815            let event_ids = match self.ev_req_sub.receive().await {
816                Ok(v) => v.0.clone(),
817                Err(_) => continue,
818            };
819            trace!(
820                target: "event_graph::protocol::handle_event_req",
821                "Got EventReq: {event_ids:?} [{}]", self.channel.display_address(),
822            );
823
824            // Check if node has finished syncing its DAG
825            if !*self.event_graph.synced.read().await {
826                debug!(
827                    target: "event_graph::protocol::handle_event_req",
828                    "DAG is still syncing, skipping..."
829                );
830                continue
831            }
832
833            // We received an event request from somebody.
834            // If we do have it, we will send it back to them as `EventRep`.
835            // Otherwise, we'll stay quiet. An honest node should always have
836            // something to reply with provided that the request is legitimate,
837            // i.e. we've sent something to them and they did not haveinfo some of
838            // the parents.
839
840            // Check if we expected this request to come around.
841            // I dunno if this is a good idea, but it seems it will help
842            // against malicious event requests where they want us to keep
843            // reading our db and steal our bandwidth.
844            let mut events = vec![];
845            for event_id in event_ids.iter() {
846                if let Ok(event) = self
847                    .event_graph
848                    .fetch_event_from_dags(event_id)
849                    .await?
850                    .ok_or(Error::EventNotFound("The requested event is not found".to_owned()))
851                {
852                    // At this point we should have it in our DAG.
853                    // This code panics if this is not the case.
854                    debug!(
855                        target: "event_graph::protocol::handle_event_req()",
856                        "Fetching event {:?} from DAG", event_id,
857                    );
858                    events.push(event);
859                } else {
860                    let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
861                    if malicious_count + 1 == MALICIOUS_THRESHOLD {
862                        error!(
863                            target: "event_graph::protocol::handle_event_req",
864                            "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
865                            self.channel.display_address(),
866                        );
867                        self.channel.stop().await;
868                        return Err(Error::ChannelStopped)
869                    }
870
871                    warn!(
872                        target: "event_graph::protocol::handle_event_req",
873                        "[EVENTGRAPH] Peer {} requested an unexpected event {event_id:?}",
874                        self.channel.display_address()
875                    );
876                    continue
877                }
878            }
879
880            // Check if the incoming event is older than the genesis event. If so, something
881            // has gone wrong. The event should have been pruned during the last
882            // rotation.
883            let genesis_timestamp = self.event_graph.current_genesis.read().await.header.timestamp;
884            let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
885
886            for event in events.iter() {
887                if event.header.timestamp < genesis_timestamp {
888                    error!(
889                        target: "event_graph::protocol::handle_event_req",
890                        "Requested event by peer {} is older than previous rotation period. It should have been pruned.
891                    Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
892                    event.id(), event.header.timestamp
893                    );
894                }
895
896                // Now let's get the upper level of event IDs. When we reply, we could
897                // get requests for those IDs as well.
898                for parent_id in event.header.parents.iter() {
899                    if parent_id != &NULL_ID {
900                        bcast_ids.insert(*parent_id);
901                    }
902                }
903            }
904            // TODO: We should remove the reply from the bcast IDs for this specific channel.
905            //       We can't remove them for everyone.
906            //bcast_ids.remove(&event_id);
907            drop(bcast_ids);
908
909            // Reply with the event
910            self.channel.send(&EventRep(events)).await?;
911        }
912    }
913
914    /// Protocol function handling `HeaderReq`.
915    /// This is triggered whenever someone requests syncing headers by
916    /// sending their current headers.
917    async fn handle_header_req(self: Arc<Self>) -> Result<()> {
918        loop {
919            let Ok(v) = self.hdr_req_sub.receive().await else { continue };
920            let (dag_name, tips) = (&v.0, &v.1);
921
922            trace!(
923                target: "event_graph::protocol::handle_tip_req",
924                "Got TipReq [{}]", self.channel.display_address(),
925            );
926
927            // Check if node has finished syncing its DAG
928            if !*self.event_graph.synced.read().await {
929                debug!(
930                    target: "event_graph::protocol::handle_tip_req",
931                    "DAG is still syncing, skipping..."
932                );
933                continue
934            }
935
936            // TODO: Rate limit
937
938            // We received header request. Let's find them, add them to
939            // our bcast ids list, and reply with them.
940            let dag_timestamp = u64::from_str(dag_name)?;
941            let store = self.event_graph.dag_store.read().await;
942            if !store.header_dags.contains_key(&dag_timestamp) {
943                continue
944            }
945            let headers = self.event_graph.fetch_headers_with_tips(dag_name, tips).await?;
946            // let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
947            // for (_, tips) in layers.iter() {
948            //     for tip in tips {
949            //         bcast_ids.insert(*tip);
950            //     }
951            // }
952            // drop(bcast_ids);
953
954            self.channel.send(&HeaderRep(headers)).await?;
955        }
956        // Ok(())
957    }
958
959    /// Protocol function handling `TipReq`.
960    /// This is triggered when someone requests the current unreferenced
961    /// tips of our DAG.
962    async fn handle_tip_req(self: Arc<Self>) -> Result<()> {
963        loop {
964            let dag_name = match self.tip_req_sub.receive().await {
965                Ok(v) => v.0.clone(),
966                Err(_) => continue,
967            };
968            trace!(
969                target: "event_graph::protocol::handle_tip_req",
970                "Got TipReq [{}]", self.channel.display_address(),
971            );
972
973            // Check if node has finished syncing its DAG
974            if !*self.event_graph.synced.read().await {
975                debug!(
976                    target: "event_graph::protocol::handle_tip_req",
977                    "DAG is still syncing, skipping..."
978                );
979                continue
980            }
981
982            // TODO: Rate limit
983
984            // We received a tip request. Let's find them, add them to
985            // our bcast ids list, and reply with them.
986            let layers = match dag_name.as_str() {
987                "static-dag" => {
988                    let tips = self.event_graph.static_unreferenced_tips().await;
989                    &tips.clone()
990                }
991                _ => {
992                    let dag_timestamp = u64::from_str(&dag_name)?;
993                    let store = self.event_graph.dag_store.read().await;
994                    let (_, layers) = match store.header_dags.get(&dag_timestamp) {
995                        Some(v) => v,
996                        None => continue,
997                    };
998                    &layers.clone()
999                }
1000            };
1001            // let layers = self.event_graph.dag_store.read().await.find_unreferenced_tips(&dag_name).await;
1002            let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
1003            for (_, tips) in layers.iter() {
1004                for tip in tips {
1005                    bcast_ids.insert(*tip);
1006                }
1007            }
1008            drop(bcast_ids);
1009
1010            self.channel.send(&TipRep(layers.clone())).await?;
1011        }
1012    }
1013
1014    /// We need to rate limit message propagation so malicious nodes don't get us banned
1015    /// for flooding. We do that by aggregating messages here into a queue then apply
1016    /// rate limit logic before broadcasting.
1017    ///
1018    /// The rate limit logic is this:
1019    ///
1020    /// * If the count is less then RATELIMIT_MIN_COUNT then do nothing.
1021    /// * Otherwise sleep for `sleep_time` ms.
1022    ///
1023    /// To calculate the sleep time, we use the RATELIMIT_SAMPLE_* values.
1024    /// For example RATELIMIT_SAMPLE_IDX = 10, RATELIMIT_SAMPLE_SLEEP = 1000
1025    /// means that when N = 10, then sleep for 1000 ms.
1026    ///
1027    /// Let RATELIMIT_MIN_COUNT = 6, then here's a table of sleep times:
1028    ///
1029    /// | Count | Sleep Time / ms |
1030    /// |-------|-----------------|
1031    /// | 0     | 0               |
1032    /// | 4     | 0               |
1033    /// | 6     | 0               |
1034    /// | 10    | 1000            |
1035    /// | 14    | 2000            |
1036    /// | 18    | 3000            |
1037    ///
1038    /// So we use the sample to calculate a straight line from RATELIMIT_MIN_COUNT.
1039    async fn broadcast_rate_limiter(self: Arc<Self>) -> Result<()> {
1040        let mut ratelimit = MovingWindow::new(RATELIMIT_EXPIRY_TIME);
1041
1042        loop {
1043            let event_put = self.broadcaster_pull.recv().await.expect("pull broadcaster closed");
1044
1045            ratelimit.ticktock();
1046            if ratelimit.count() > RATELIMIT_MIN_COUNT {
1047                let sleep_time =
1048                    ((ratelimit.count() - RATELIMIT_MIN_COUNT) * RATELIMIT_SAMPLE_SLEEP /
1049                        (RATELIMIT_SAMPLE_IDX - RATELIMIT_MIN_COUNT)) as u64;
1050                debug!(
1051                    target: "event_graph::protocol::broadcast_rate_limiter",
1052                    "Activated rate limit: sleeping {sleep_time} ms [count={}]",
1053                    ratelimit.count()
1054                );
1055                // Apply the ratelimit
1056                msleep(sleep_time).await;
1057            }
1058
1059            // Relay the event to other peers.
1060            self.event_graph
1061                .p2p
1062                .broadcast_with_exclude(&event_put, &[self.channel.address().clone()])
1063                .await;
1064        }
1065    }
1066}
1067
#[cfg(test)]
mod test {
    use super::*;
    use std::time::UNIX_EPOCH;

    /// A timestamp lying in the future must be discarded by `clean()`,
    /// leaving the window empty.
    #[test]
    fn test_eventgraph_moving_window_clean_future() {
        let mut window = MovingWindow::new(NanoTimestamp::from_secs(60));
        let secs_ahead = UNIX_EPOCH.elapsed().unwrap().as_secs() + 100;
        window.times.push_back(NanoTimestamp::from_secs(secs_ahead.into()));
        window.clean();
        assert_eq!(window.count(), 0);
    }
}