darkfi/event_graph/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{BTreeMap, HashMap, HashSet, VecDeque},
21    path::PathBuf,
22    sync::Arc,
23};
24
25use darkfi_serial::{deserialize_async, serialize_async};
26use num_bigint::BigUint;
27use sled_overlay::{sled, SledTreeOverlay};
28use smol::{
29    lock::{OnceCell, RwLock},
30    Executor,
31};
32use tracing::{debug, error, info, warn};
33
34use crate::{
35    event_graph::util::replayer_log,
36    net::P2pPtr,
37    system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
38    Error, Result,
39};
40
41#[cfg(feature = "rpc")]
42use {
43    crate::rpc::{
44        jsonrpc::{JsonResponse, JsonResult},
45        util::json_map,
46    },
47    tinyjson::JsonValue::{self},
48};
49
50/// An event graph event
51pub mod event;
52pub use event::Event;
53
54/// P2P protocol implementation for the Event Graph
55pub mod proto;
56use proto::{EventRep, EventReq, TipRep, TipReq};
57
58/// Utility functions
59pub mod util;
60use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp};
61
62// Debugging event graph
63pub mod deg;
64use deg::DegEvent;
65
66#[cfg(test)]
67mod tests;
68
69/// Initial genesis timestamp in millis (07 Sep 2023, 00:00:00 UTC)
70/// Must always be UTC midnight.
71pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
72/// Genesis event contents (the ASCII bytes of "GENESIS")
73pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];
74
75/// The number of parent slots every event carries.
76pub const N_EVENT_PARENTS: usize = 5;
77/// Allowed timestamp drift in milliseconds
78const EVENT_TIME_DRIFT: u64 = 60_000;
79/// Null event ID (all-zero hash), used to fill unused parent slots
80pub const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);
81
82/// Atomic pointer to an [`EventGraph`] instance.
83pub type EventGraphPtr = Arc<EventGraph>;
84
85/// An Event Graph instance
86pub struct EventGraph {
87    /// Pointer to the P2P network instance
88    p2p: P2pPtr,
89    /// Sled tree containing the DAG
90    dag: sled::Tree,
91    /// Replay logs path.
92    datastore: PathBuf,
93    /// Run in replay_mode where if set we log Sled DB instructions
94    /// into `datastore`, useful to recreate a faulty DAG to debug.
95    replay_mode: bool,
96    /// The set of unreferenced DAG tips, keyed by their layer
97    unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
98    /// A `HashSet` containing event IDs and their 1-level parents.
99    /// These come from the events we've sent out using `EventPut`.
100    /// They are used with `EventReq` to decide if we should reply
101    /// or not. Additionally it is also used when we broadcast the
102    /// `TipRep` message telling peers about our unreferenced tips.
103    broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
104    /// DAG Pruning Task
105    pub prune_task: OnceCell<StoppableTaskPtr>,
106    /// Event publisher, this notifies whenever an event is
107    /// inserted into the DAG
108    pub event_pub: PublisherPtr<Event>,
109    /// Current genesis event
110    current_genesis: RwLock<Event>,
111    /// Currently configured DAG rotation, in days
112    days_rotation: u64,
113    /// Flag signalling DAG has finished initial sync
114    pub synced: RwLock<bool>,
115    /// Enable graph debugging
116    pub deg_enabled: RwLock<bool>,
117    /// The publisher over which deg (graph debugging) events are sent
118    deg_publisher: PublisherPtr<DegEvent>,
119}
120
121impl EventGraph {
122    /// Create a new [`EventGraph`] instance, creates a new Genesis
123    /// event and checks if it
124    /// is containd in DAG, if not prunes DAG, may also start a pruning
125    /// task based on `days_rotation`, and return an atomic instance of
126    /// `Self`
127    /// * `p2p` atomic pointer to p2p.
128    /// * `sled_db` sled DB instance.
129    /// * `datastore` path where we should log db instrucion if run in
130    ///   replay mode.
131    /// * `replay_mode` set the flag to keep a log of db instructions.
132    /// * `dag_tree_name` the name of disk-backed tree (or DAG name).
133    /// * `days_rotation` marks the lifetime of the DAG before it's
134    ///   pruned.
135    pub async fn new(
136        p2p: P2pPtr,
137        sled_db: sled::Db,
138        datastore: PathBuf,
139        replay_mode: bool,
140        dag_tree_name: &str,
141        days_rotation: u64,
142        ex: Arc<Executor<'_>>,
143    ) -> Result<EventGraphPtr> {
144        let dag = sled_db.open_tree(dag_tree_name)?;
145        let unreferenced_tips = RwLock::new(BTreeMap::new());
146        let broadcasted_ids = RwLock::new(HashSet::new());
147        let event_pub = Publisher::new();
148
149        // Create the current genesis event based on the `days_rotation`
150        let current_genesis = generate_genesis(days_rotation);
151        let self_ = Arc::new(Self {
152            p2p,
153            dag: dag.clone(),
154            datastore,
155            replay_mode,
156            unreferenced_tips,
157            broadcasted_ids,
158            prune_task: OnceCell::new(),
159            event_pub,
160            current_genesis: RwLock::new(current_genesis.clone()),
161            days_rotation,
162            synced: RwLock::new(false),
163            deg_enabled: RwLock::new(false),
164            deg_publisher: Publisher::new(),
165        });
166
167        // Check if we have it in our DAG.
168        // If not, we can prune the DAG and insert this new genesis event.
169        if !dag.contains_key(current_genesis.id().as_bytes())? {
170            info!(
171                target: "event_graph::new",
172                "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
173            );
174            self_.dag_prune(current_genesis).await?;
175        }
176
177        // Find the unreferenced tips in the current DAG state.
178        *self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await;
179
180        // Spawn the DAG pruning task
181        if days_rotation > 0 {
182            let prune_task = StoppableTask::new();
183            let _ = self_.prune_task.set(prune_task.clone()).await;
184
185            prune_task.clone().start(
186                self_.clone().dag_prune_task(days_rotation),
187                |res| async move {
188                    match res {
189                        Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
190                        Err(e) => error!(target: "event_graph::_handle_stop", "[EVENTGRAPH] Failed stopping prune task: {e}")
191                    }
192                },
193                Error::DetachedTaskStopped,
194                ex.clone(),
195            );
196        }
197
198        Ok(self_)
199    }
200
201    pub fn days_rotation(&self) -> u64 {
202        self.days_rotation
203    }
204
205    /// Sync the DAG from connected peers
206    pub async fn dag_sync(&self) -> Result<()> {
207        // We do an optimistic sync where we ask all our connected peers for
208        // the latest layer DAG tips (unreferenced events) and then we accept
209        // the ones we see the most times.
210        // * Compare received tips with local ones, identify which we are missing.
211        // * Request these from peers
212        // * Recursively request these backward
213        //
214        // Verification:
215        // * Timestamps should go backwards
216        // * Cross-check with multiple peers, this means we should request the
217        //   same event from multiple peers and make sure it is the same.
218        // * Since we should be pruning, if we're not synced after some reasonable
219        //   amount of iterations, these could be faulty peers and we can try again
220        //   from the beginning
221
222        // Get references to all our peers.
223        let channels = self.p2p.hosts().peers();
224        let mut communicated_peers = channels.len();
225        info!(
226            target: "event_graph::dag_sync",
227            "[EVENTGRAPH] Syncing DAG from {communicated_peers} peers..."
228        );
229
230        // Here we keep track of the tips, their layers and how many time we've seen them.
231        let mut tips: HashMap<blake3::Hash, (u64, usize)> = HashMap::new();
232
233        // Let's first ask all of our peers for their tips and collect them
234        // in our hashmap above.
235        for channel in channels.iter() {
236            let url = channel.display_address();
237
238            let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
239                Ok(v) => v,
240                Err(e) => {
241                    error!(
242                        target: "event_graph::dag_sync",
243                        "[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {url}, skipping ({e})"
244                    );
245                    communicated_peers -= 1;
246                    continue
247                }
248            };
249
250            if let Err(e) = channel.send(&TipReq {}).await {
251                error!(
252                    target: "event_graph::dag_sync",
253                    "[EVENTGRAPH] Sync: Couldn't contact peer {url}, skipping ({e})"
254                );
255                communicated_peers -= 1;
256                continue
257            };
258
259            let outbound_connect_timeout = self
260                .p2p
261                .settings()
262                .read_arc()
263                .await
264                .outbound_connect_timeout(channel.address().scheme());
265            // Node waits for response
266            let Ok(peer_tips) = tip_rep_sub.receive_with_timeout(outbound_connect_timeout).await
267            else {
268                error!(
269                    target: "event_graph::dag_sync",
270                    "[EVENTGRAPH] Sync: Peer {url} didn't reply with tips in time, skipping"
271                );
272                communicated_peers -= 1;
273                continue
274            };
275
276            let peer_tips = &peer_tips.0;
277
278            // Note down the seen tips
279            for (layer, layer_tips) in peer_tips {
280                for tip in layer_tips {
281                    if let Some(seen_tip) = tips.get_mut(tip) {
282                        seen_tip.1 += 1;
283                    } else {
284                        tips.insert(*tip, (*layer, 1));
285                    }
286                }
287            }
288        }
289
290        // After we've communicated all the peers, let's see what happened.
291        if tips.is_empty() {
292            error!(
293                target: "event_graph::dag_sync",
294                "[EVENTGRAPH] Sync: Could not find any DAG tips",
295            );
296            return Err(Error::DagSyncFailed)
297        }
298
299        // We know the number of peers we've communicated with,
300        // so we will consider events we saw at more than 2/3 of
301        // those peers.
302        let consideration_threshold = communicated_peers * 2 / 3;
303        let mut considered_tips = HashSet::new();
304        for (tip, (_, amount)) in tips.iter() {
305            if amount > &consideration_threshold {
306                considered_tips.insert(*tip);
307            }
308        }
309        drop(tips);
310
311        // Now begin fetching the events backwards.
312        let mut missing_parents = HashSet::new();
313        for tip in considered_tips.iter() {
314            assert!(tip != &NULL_ID);
315
316            if !self.dag.contains_key(tip.as_bytes()).unwrap() {
317                missing_parents.insert(*tip);
318            }
319        }
320
321        if missing_parents.is_empty() {
322            *self.synced.write().await = true;
323            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
324            return Ok(())
325        }
326
327        info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Fetching events");
328        let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
329        let mut received_events_hashes = HashSet::new();
330
331        while !missing_parents.is_empty() {
332            let mut found_event = false;
333
334            for channel in channels.iter() {
335                let url = channel.display_address();
336
337                debug!(
338                    target: "event_graph::dag_sync",
339                    "Requesting {missing_parents:?} from {url}..."
340                );
341
342                let ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
343                    Ok(v) => v,
344                    Err(e) => {
345                        error!(
346                            target: "event_graph::dag_sync",
347                            "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {url}, skipping ({e})"
348                        );
349                        continue
350                    }
351                };
352
353                let request_missing_events = missing_parents.clone().into_iter().collect();
354                if let Err(e) = channel.send(&EventReq(request_missing_events)).await {
355                    error!(
356                        target: "event_graph::dag_sync",
357                        "[EVENTGRAPH] Sync: Failed communicating EventReq({missing_parents:?}) to {url}: {e}"
358                    );
359                    continue
360                }
361
362                let outbound_connect_timeout = self
363                    .p2p
364                    .settings()
365                    .read_arc()
366                    .await
367                    .outbound_connect_timeout(channel.address().scheme());
368                // Node waits for response
369                let Ok(parent) = ev_rep_sub.receive_with_timeout(outbound_connect_timeout).await
370                else {
371                    error!(
372                        target: "event_graph::dag_sync",
373                        "[EVENTGRAPH] Sync: Timeout waiting for parents {missing_parents:?} from {url}"
374                    );
375                    continue
376                };
377
378                let parents = parent.0.clone();
379
380                for parent in parents {
381                    let parent_id = parent.id();
382                    if !missing_parents.contains(&parent_id) {
383                        error!(
384                            target: "event_graph::dag_sync",
385                            "[EVENTGRAPH] Sync: Peer {url} replied with a wrong event: {}",
386                            parent.id()
387                        );
388                        continue
389                    }
390
391                    debug!(
392                        target: "event_graph::dag_sync",
393                        "Got correct parent event {parent_id}"
394                    );
395
396                    if let Some(layer_events) = received_events.get_mut(&parent.layer) {
397                        layer_events.push(parent.clone());
398                    } else {
399                        let layer_events = vec![parent.clone()];
400                        received_events.insert(parent.layer, layer_events);
401                    }
402                    received_events_hashes.insert(parent_id);
403
404                    missing_parents.remove(&parent_id);
405                    found_event = true;
406
407                    // See if we have the upper parents
408                    for upper_parent in parent.parents.iter() {
409                        if upper_parent == &NULL_ID {
410                            continue
411                        }
412
413                        if !missing_parents.contains(upper_parent) &&
414                            !received_events_hashes.contains(upper_parent) &&
415                            !self.dag.contains_key(upper_parent.as_bytes()).unwrap()
416                        {
417                            debug!(
418                                target: "event_graph::dag_sync",
419                                "Found upper missing parent event {upper_parent}"
420                            );
421                            missing_parents.insert(*upper_parent);
422                        }
423                    }
424                }
425
426                break
427            }
428
429            if !found_event {
430                error!(
431                    target: "event_graph::dag_sync",
432                    "[EVENTGRAPH] Sync: Failed to get all events",
433                );
434                return Err(Error::DagSyncFailed)
435            }
436        } // <-- while !missing_parents.is_empty
437
438        // At this point we should've got all the events.
439        // We should add them to the DAG.
440        let mut events = vec![];
441        for (_, tips) in received_events {
442            for tip in tips {
443                events.push(tip);
444            }
445        }
446        self.dag_insert(&events).await?;
447
448        *self.synced.write().await = true;
449
450        info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
451        Ok(())
452    }
453
454    /// Atomically prune the DAG and insert the given event as genesis.
455    async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
456        debug!(target: "event_graph::dag_prune", "Pruning DAG...");
457
458        // Acquire exclusive locks to unreferenced_tips, broadcasted_ids and
459        // current_genesis while this operation is happening. We do this to
460        // ensure that during the pruning operation, no other operations are
461        // able to access the intermediate state which could lead to producing
462        // the wrong state after pruning.
463        let mut unreferenced_tips = self.unreferenced_tips.write().await;
464        let mut broadcasted_ids = self.broadcasted_ids.write().await;
465        let mut current_genesis = self.current_genesis.write().await;
466
467        // Atomically clear the DAG and write the new genesis event.
468        let mut batch = sled::Batch::default();
469        for key in self.dag.iter().keys() {
470            batch.remove(key.unwrap());
471        }
472        batch.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event).await);
473
474        debug!(target: "event_graph::dag_prune", "Applying batch...");
475        if let Err(e) = self.dag.apply_batch(batch) {
476            panic!("Failed pruning DAG, sled apply_batch error: {e}");
477        }
478
479        // Clear unreferenced tips and bcast ids
480        *unreferenced_tips = BTreeMap::new();
481        unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
482        *current_genesis = genesis_event;
483        *broadcasted_ids = HashSet::new();
484        drop(unreferenced_tips);
485        drop(broadcasted_ids);
486        drop(current_genesis);
487
488        debug!(target: "event_graph::dag_prune", "DAG pruned successfully");
489        Ok(())
490    }
491
492    /// Background task periodically pruning the DAG.
493    async fn dag_prune_task(self: Arc<Self>, days_rotation: u64) -> Result<()> {
494        // The DAG should periodically be pruned. This can be a configurable
495        // parameter. By pruning, we should deterministically replace the
496        // genesis event (can use a deterministic timestamp) and drop everything
497        // in the DAG, leaving just the new genesis event.
498        debug!(target: "event_graph::dag_prune_task", "Spawned background DAG pruning task");
499
500        loop {
501            // Find the next rotation timestamp:
502            let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
503
504            // Prepare the new genesis event
505            let current_genesis = Event {
506                timestamp: next_rotation,
507                content: GENESIS_CONTENTS.to_vec(),
508                parents: [NULL_ID; N_EVENT_PARENTS],
509                layer: 0,
510            };
511
512            // Sleep until it's time to rotate.
513            let s = millis_until_next_rotation(next_rotation);
514
515            debug!(target: "event_graph::dag_prune_task", "Sleeping {s}ms until next DAG prune");
516            msleep(s).await;
517            debug!(target: "event_graph::dag_prune_task", "Rotation period reached");
518
519            // Trigger DAG prune
520            self.dag_prune(current_genesis).await?;
521        }
522    }
523
524    /// Atomically insert given events into the DAG and return the event IDs.
525    /// All provided events must be valid. An overlay is used over the DAG tree,
526    /// temporary writting each event in order. After all events have been
527    /// validated and inserted successfully, we write the overlay to sled.
528    /// This will append the new events into the unreferenced tips set, and
529    /// remove the events' parents from it. It will also append the events'
530    /// level-1 parents to the `broadcasted_ids` set, so the P2P protocol
531    /// knows that any requests for them are actually legitimate.
532    /// TODO: The `broadcasted_ids` set should periodically be pruned, when
533    /// some sensible time has passed after broadcasting the event.
534    pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
535        // Sanity check
536        if events.is_empty() {
537            return Ok(vec![])
538        }
539
540        // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids`
541        let mut unreferenced_tips = self.unreferenced_tips.write().await;
542        let mut broadcasted_ids = self.broadcasted_ids.write().await;
543
544        // Here we keep the IDs to return
545        let mut ids = Vec::with_capacity(events.len());
546
547        // Create an overlay over the DAG tree
548        let mut overlay = SledTreeOverlay::new(&self.dag);
549
550        // Grab genesis timestamp
551        let genesis_timestamp = self.current_genesis.read().await.timestamp;
552
553        // Iterate over given events to validate them and
554        // write them to the overlay
555        for event in events {
556            let event_id = event.id();
557            debug!(
558                target: "event_graph::dag_insert",
559                "Inserting event {event_id} into the DAG"
560            );
561
562            if !event
563                .validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
564                .await?
565            {
566                error!(target: "event_graph::dag_insert", "Event {event_id} is invalid!");
567                return Err(Error::EventIsInvalid)
568            }
569
570            let event_se = serialize_async(event).await;
571
572            // Add the event to the overlay
573            overlay.insert(event_id.as_bytes(), &event_se)?;
574
575            if self.replay_mode {
576                replayer_log(&self.datastore, "insert".to_owned(), event_se)?;
577            }
578            // Note down the event ID to return
579            ids.push(event_id);
580        }
581
582        // Aggregate changes into a single batch
583        let batch = overlay.aggregate().unwrap();
584
585        // Atomically apply the batch.
586        // Panic if something is corrupted.
587        if let Err(e) = self.dag.apply_batch(batch) {
588            panic!("Failed applying dag_insert batch to sled: {e}");
589        }
590
591        // Iterate over given events to update references and
592        // send out notifications about them
593        for event in events {
594            let event_id = event.id();
595
596            // Update the unreferenced DAG tips set
597            debug!(
598                target: "event_graph::dag_insert",
599                "Event {event_id} parents {:#?}", event.parents,
600            );
601            for parent_id in event.parents.iter() {
602                if parent_id != &NULL_ID {
603                    debug!(
604                        target: "event_graph::dag_insert",
605                        "Removing {parent_id} from unreferenced_tips"
606                    );
607
608                    // Iterate over unreferenced tips in previous layers
609                    // and remove the parent
610                    // NOTE: this might be too exhaustive, but the
611                    // assumption is that previous layers unreferenced
612                    // tips will be few.
613                    for (layer, tips) in unreferenced_tips.iter_mut() {
614                        if layer >= &event.layer {
615                            continue
616                        }
617                        tips.remove(parent_id);
618                    }
619                    broadcasted_ids.insert(*parent_id);
620                }
621            }
622            unreferenced_tips.retain(|_, tips| !tips.is_empty());
623            debug!(
624                target: "event_graph::dag_insert",
625                "Adding {event_id} to unreferenced tips"
626            );
627
628            if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
629                layer_tips.insert(event_id);
630            } else {
631                let mut layer_tips = HashSet::new();
632                layer_tips.insert(event_id);
633                unreferenced_tips.insert(event.layer, layer_tips);
634            }
635
636            // Send out notifications about the new event
637            self.event_pub.notify(event.clone()).await;
638        }
639
640        // Drop the exclusive locks
641        drop(unreferenced_tips);
642        drop(broadcasted_ids);
643
644        Ok(ids)
645    }
646
647    /// Fetch an event from the DAG
648    pub async fn dag_get(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
649        let Some(bytes) = self.dag.get(event_id.as_bytes())? else { return Ok(None) };
650        let event: Event = deserialize_async(&bytes).await?;
651
652        Ok(Some(event))
653    }
654
655    /// Get next layer along with its N_EVENT_PARENTS from the unreferenced
656    /// tips of the DAG. Since tips are mapped by their layer, we go backwards
657    /// until we fill the vector, ensuring we always use latest layers tips as
658    /// parents.
659    async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
660        let unreferenced_tips = self.unreferenced_tips.read().await;
661
662        let mut parents = [NULL_ID; N_EVENT_PARENTS];
663        let mut index = 0;
664        'outer: for (_, tips) in unreferenced_tips.iter().rev() {
665            for tip in tips.iter() {
666                parents[index] = *tip;
667                index += 1;
668                if index >= N_EVENT_PARENTS {
669                    break 'outer
670                }
671            }
672        }
673
674        let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
675
676        assert!(parents.iter().any(|x| x != &NULL_ID));
677        (next_layer, parents)
678    }
679
680    /// Find the unreferenced tips in the current DAG state, mapped by their layers.
681    async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
682        // First get all the event IDs
683        let mut tips = HashSet::new();
684        for iter_elem in self.dag.iter() {
685            let (id, _) = iter_elem.unwrap();
686            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
687            tips.insert(id);
688        }
689
690        // Iterate again to find unreferenced IDs
691        for iter_elem in self.dag.iter() {
692            let (_, event) = iter_elem.unwrap();
693            let event: Event = deserialize_async(&event).await.unwrap();
694            for parent in event.parents.iter() {
695                tips.remove(parent);
696            }
697        }
698
699        // Build the layers map
700        let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
701        for tip in tips {
702            let event = self.dag_get(&tip).await.unwrap().unwrap();
703            if let Some(layer_tips) = map.get_mut(&event.layer) {
704                layer_tips.insert(tip);
705            } else {
706                let mut layer_tips = HashSet::new();
707                layer_tips.insert(tip);
708                map.insert(event.layer, layer_tips);
709            }
710        }
711
712        map
713    }
714
715    /// Internal function used for DAG sorting.
716    async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
717        let (_, tips) = self.get_next_layer_with_parents().await;
718
719        // Convert the hash to BigUint for sorting
720        let mut sorted: Vec<_> =
721            tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
722        sorted.sort_unstable();
723
724        // Convert back to blake3
725        let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
726        for (i, id) in sorted.iter().enumerate() {
727            let mut bytes = id.to_bytes_be();
728
729            // Ensure we have 32 bytes
730            while bytes.len() < blake3::OUT_LEN {
731                bytes.insert(0, 0);
732            }
733
734            tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
735        }
736
737        tips_sorted
738    }
739
740    /// Perform a topological sort of the DAG.
741    pub async fn order_events(&self) -> Vec<Event> {
742        let mut ordered_events = VecDeque::new();
743        let mut visited = HashSet::new();
744
745        for tip in self.get_unreferenced_tips_sorted().await {
746            if !visited.contains(&tip) && tip != NULL_ID {
747                let tip = self.dag_get(&tip).await.unwrap().unwrap();
748                ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
749            }
750        }
751
752        let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
753        // Order events based on thier layer numbers, or based on timestamp if they are equal
754        ord_events_vec
755            .sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.timestamp.cmp(&a.1.timestamp)));
756
757        ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
758    }
759
760    /// We do a non-recursive DFS (<https://en.wikipedia.org/wiki/Depth-first_search>),
761    /// and additionally we consider the timestamps.
762    async fn dfs_topological_sort(
763        &self,
764        event: Event,
765        visited: &mut HashSet<blake3::Hash>,
766    ) -> VecDeque<(u64, Event)> {
767        let mut ordered_events = VecDeque::new();
768        let mut stack = VecDeque::new();
769        let event_id = event.id();
770        stack.push_back(event_id);
771
772        while let Some(event_id) = stack.pop_front() {
773            if !visited.contains(&event_id) && event_id != NULL_ID {
774                visited.insert(event_id);
775                if let Some(event) = self.dag_get(&event_id).await.unwrap() {
776                    for parent in event.parents.iter() {
777                        stack.push_back(*parent);
778                    }
779
780                    ordered_events.push_back((event.layer, event))
781                }
782            }
783        }
784
785        ordered_events
786    }
787
788    /// Enable graph debugging
789    pub async fn deg_enable(&self) {
790        *self.deg_enabled.write().await = true;
791        warn!("[EVENTGRAPH] Graph debugging enabled!");
792    }
793
794    /// Disable graph debugging
795    pub async fn deg_disable(&self) {
796        *self.deg_enabled.write().await = false;
797        warn!("[EVENTGRAPH] Graph debugging disabled!");
798    }
799
800    /// Subscribe to deg events
801    pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
802        self.deg_publisher.clone().subscribe().await
803    }
804
805    /// Send a deg notification over the publisher
806    pub async fn deg_notify(&self, event: DegEvent) {
807        self.deg_publisher.notify(event).await;
808    }
809
810    #[cfg(feature = "rpc")]
811    pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
812        let mut graph = HashMap::new();
813        for iter_elem in self.dag.iter() {
814            let (id, val) = iter_elem.unwrap();
815            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
816            let val: Event = deserialize_async(&val).await.unwrap();
817            graph.insert(id, val);
818        }
819
820        let json_graph = graph
821            .into_iter()
822            .map(|(k, v)| {
823                let key = k.to_string();
824                let value = JsonValue::from(v);
825                (key, value)
826            })
827            .collect();
828        let values = json_map([("dag", JsonValue::Object(json_graph))]);
829
830        let result = JsonValue::Object(HashMap::from([("eventgraph_info".to_string(), values)]));
831
832        JsonResponse::new(result, id).into()
833    }
834
835    /// Fetch all the events that are on a higher layers than the
836    /// provided ones.
837    pub async fn fetch_successors_of(
838        &self,
839        tips: BTreeMap<u64, HashSet<blake3::Hash>>,
840    ) -> Result<Vec<Event>> {
841        debug!(
842             target: "event_graph::fetch_successors_of",
843             "fetching successors of {tips:?}"
844        );
845
846        let mut graph = HashMap::new();
847        for iter_elem in self.dag.iter() {
848            let (id, val) = iter_elem.unwrap();
849            let hash = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
850            let event: Event = deserialize_async(&val).await.unwrap();
851            graph.insert(hash, event);
852        }
853
854        let mut result = vec![];
855
856        'outer: for tip in tips.iter() {
857            for i in tip.1.iter() {
858                if !graph.contains_key(i) {
859                    continue 'outer;
860                }
861            }
862
863            for (_, ev) in graph.iter() {
864                if ev.layer > *tip.0 && !result.contains(ev) {
865                    result.push(ev.clone())
866                }
867            }
868        }
869
870        result.sort_by(|a, b| a.layer.cmp(&b.layer));
871
872        Ok(result)
873    }
874}