darkfi/event_graph/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19// use async_std::stream::from_iter;
20use std::{
21    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
22    path::PathBuf,
23    str::FromStr,
24    sync::Arc,
25};
26
27// use futures::stream::FuturesOrdered;
28use blake3::Hash;
29use darkfi_sdk::crypto::smt::{MemoryStorageFp, PoseidonFp, SmtMemoryFp, EMPTY_NODES_FP};
30use darkfi_serial::{deserialize_async, serialize_async};
31use event::Header;
32use futures::{
33    // future,
34    stream::FuturesUnordered,
35    StreamExt,
36};
37use num_bigint::BigUint;
38use sled_overlay::{sled, SledTreeOverlay};
39use smol::{
40    lock::{OnceCell, RwLock},
41    Executor,
42};
43use tracing::{debug, error, info, warn};
44use url::Url;
45
46use crate::{
47    event_graph::{
48        proto::StaticPut,
49        util::{next_hour_timestamp, next_rotation_timestamp, replayer_log},
50    },
51    net::{channel::Channel, P2pPtr},
52    system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
53    Error, Result,
54};
55
56#[cfg(feature = "rpc")]
57use {
58    crate::rpc::{
59        jsonrpc::{JsonResponse, JsonResult},
60        util::json_map,
61    },
62    tinyjson::JsonValue::{self},
63};
64
65/// An event graph event
66pub mod event;
67pub use event::Event;
68
69/// P2P protocol implementation for the Event Graph
70pub mod proto;
71use proto::{EventRep, EventReq, HeaderRep, HeaderReq, TipRep, TipReq};
72
73pub mod rln;
74//use rln::{build_register_vk, build_signal_vk, build_slash_pk, build_slash_vk};
75
76/// Utility functions
77pub mod util;
78use util::{generate_genesis, millis_until_next_rotation};
79
80// Debugging event graph
81pub mod deg;
82use deg::DegEvent;
83
84#[cfg(test)]
85mod tests;
86
/// Initial genesis timestamp in millis (07 Sep 2023, 00:00:00 UTC)
/// Must always be UTC midnight.
pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
/// Genesis event contents (the ASCII bytes of "GENESIS")
pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];

/// The number of parents an event is supposed to have.
pub const N_EVENT_PARENTS: usize = 5;
/// Allowed timestamp drift in milliseconds
const EVENT_TIME_DRIFT: u64 = 60_000;
/// Null event ID (the all-zero blake3 hash), used as parent of genesis events
pub const NULL_ID: Hash = Hash::from_bytes([0x00; blake3::OUT_LEN]);
/// Null parents, the parent list carried by every genesis event
pub const NULL_PARENTS: [Hash; N_EVENT_PARENTS] = [NULL_ID; N_EVENT_PARENTS];

/// Maximum number of DAGs to store, this should be configurable
// NOTE(review): typed i8, which forces `try_into().unwrap()` at use sites;
// consider usize when a coordinated change across callers is possible.
pub const DAGS_MAX_NUMBER: i8 = 24;

/// Atomic pointer to an [`EventGraph`] instance.
pub type EventGraphPtr = Arc<EventGraph>;
/// Unreferenced tips of a DAG, grouped by their layer number
pub type LayerUTips = BTreeMap<u64, HashSet<blake3::Hash>>;
108
/// Collection of the sled trees holding the event DAGs, keyed by the
/// timestamp of their genesis event.
#[derive(Clone)]
pub struct DAGStore {
    /// Handle to the underlying sled database
    db: sled::Db,
    /// Header DAG trees and their unreferenced tips, keyed by genesis timestamp
    header_dags: BTreeMap<u64, (sled::Tree, LayerUTips)>,
    /// Main (full event) DAG trees and their unreferenced tips, keyed by genesis timestamp
    main_dags: BTreeMap<u64, (sled::Tree, LayerUTips)>,
}
115
116impl DAGStore {
117    pub async fn new(&self, sled_db: sled::Db, hours_rotation: u64) -> Self {
118        let mut considered_trees = BTreeMap::new();
119        let mut considered_header_trees = BTreeMap::new();
120        if hours_rotation > 0 {
121            // Create previous genesises if not existing, since they are deterministic.
122            for i in 1..=DAGS_MAX_NUMBER {
123                let i_hours_ago = next_hour_timestamp((i - DAGS_MAX_NUMBER).into());
124                let header = Header {
125                    timestamp: i_hours_ago,
126                    parents: [NULL_ID; N_EVENT_PARENTS],
127                    layer: 0,
128                };
129                let genesis = Event { header, content: GENESIS_CONTENTS.to_vec() };
130
131                let tree_name = genesis.header.timestamp.to_string();
132                let hdr_tree_name = format!("headers_{tree_name}");
133                let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap();
134                let dag = sled_db.open_tree(tree_name).unwrap();
135
136                if hdr_dag.is_empty() {
137                    let mut overlay = SledTreeOverlay::new(&hdr_dag);
138
139                    let header_se = serialize_async(&genesis.header).await;
140
141                    // Add the header to the overlay
142                    overlay.insert(genesis.id().as_bytes(), &header_se).unwrap();
143
144                    // Aggregate changes into a single batch
145                    let batch = overlay.aggregate().unwrap();
146
147                    // Atomically apply the batch.
148                    // Panic if something is corrupted.
149                    if let Err(e) = hdr_dag.apply_batch(batch) {
150                        panic!("Failed applying header_dag_insert batch to sled: {}", e);
151                    }
152                }
153                if dag.is_empty() {
154                    let mut overlay = SledTreeOverlay::new(&dag);
155
156                    let event_se = serialize_async(&genesis).await;
157
158                    // Add the event to the overlay
159                    overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
160
161                    // Aggregate changes into a single batch
162                    let batch = overlay.aggregate().unwrap();
163
164                    // Atomically apply the batch.
165                    // Panic if something is corrupted.
166                    if let Err(e) = dag.apply_batch(batch) {
167                        panic!("Failed applying dag_insert batch to sled: {}", e);
168                    }
169                }
170                let utips = self.find_unreferenced_tips(&dag).await;
171                considered_header_trees.insert(genesis.header.timestamp, (hdr_dag, utips.clone()));
172                considered_trees.insert(genesis.header.timestamp, (dag, utips));
173            }
174        } else {
175            let genesis = generate_genesis(0);
176            let tree_name = genesis.header.timestamp.to_string();
177            let hdr_tree_name = format!("headers_{tree_name}");
178            let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap();
179            let dag = sled_db.open_tree(tree_name).unwrap();
180            if hdr_dag.is_empty() {
181                let mut overlay = SledTreeOverlay::new(&hdr_dag);
182
183                let header_se = serialize_async(&genesis.header).await;
184
185                // Add the header to the overlay
186                overlay.insert(genesis.id().as_bytes(), &header_se).unwrap();
187
188                // Aggregate changes into a single batch
189                let batch = overlay.aggregate().unwrap();
190
191                // Atomically apply the batch.
192                // Panic if something is corrupted.
193                if let Err(e) = hdr_dag.apply_batch(batch) {
194                    panic!("Failed applying header_dag_insert batch to sled: {}", e);
195                }
196            }
197            if dag.is_empty() {
198                let mut overlay = SledTreeOverlay::new(&dag);
199
200                let event_se = serialize_async(&genesis).await;
201
202                // Add the event to the overlay
203                overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
204
205                // Aggregate changes into a single batch
206                let batch = overlay.aggregate().unwrap();
207
208                // Atomically apply the batch.
209                // Panic if something is corrupted.
210                if let Err(e) = dag.apply_batch(batch) {
211                    panic!("Failed applying dag_insert batch to sled: {}", e);
212                }
213            }
214            let utips = self.find_unreferenced_tips(&dag).await;
215            considered_header_trees.insert(genesis.header.timestamp, (hdr_dag, utips.clone()));
216            considered_trees.insert(genesis.header.timestamp, (dag, utips));
217        }
218
219        Self { db: sled_db, header_dags: considered_header_trees, main_dags: considered_trees }
220    }
221
222    /// Adds a DAG into the set of DAGs and drops the oldest one if exeeding DAGS_MAX_NUMBER,
223    /// This is called if prune_task activates.
224    pub async fn add_dag(&mut self, dag_name: &str, genesis_event: &Event) {
225        debug!("add_dag::dags: {}", self.main_dags.len());
226        if self.main_dags.len() != self.header_dags.len() {
227            panic!("main dags length is not the same as header dags")
228        }
229
230        if self.main_dags.len() >= DAGS_MAX_NUMBER.try_into().unwrap() {
231            debug!("[EVENTGRAPH] dropping oldest dag");
232            let (oldest_hdr_tree, _) = self.header_dags.pop_first().unwrap().1;
233            let (oldest_tree, _) = self.main_dags.pop_first().unwrap().1;
234
235            self.db.drop_tree(oldest_hdr_tree.name()).unwrap();
236            self.db.drop_tree(oldest_tree.name()).unwrap();
237        }
238
239        // Insert genesis
240        let hdr_tree_name = format!("headers_{dag_name}");
241        let hdr_dag = self.get_dag(&hdr_tree_name);
242        hdr_dag
243            .insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event.header).await)
244            .unwrap();
245
246        let dag = self.get_dag(dag_name);
247        dag.insert(genesis_event.id().as_bytes(), serialize_async(genesis_event).await).unwrap();
248        let utips = self.find_unreferenced_tips(&dag).await;
249        self.header_dags.insert(genesis_event.header.timestamp, (hdr_dag, utips.clone()));
250        self.main_dags.insert(genesis_event.header.timestamp, (dag, utips));
251    }
252
253    // Get a DAG providing its name.
254    pub fn get_dag(&self, dag_name: &str) -> sled::Tree {
255        self.db.open_tree(dag_name).unwrap()
256    }
257
258    /// Get {count} many DAGs.
259    pub async fn get_dags(&self, count: usize) -> Vec<sled::Tree> {
260        let sorted_dags = self.sort_dags().await;
261        sorted_dags.into_iter().take(count).collect()
262    }
263
264    /// Sort DAGs chronologically
265    async fn sort_dags(&self) -> Vec<sled::Tree> {
266        let mut dags = self
267            .main_dags
268            .iter()
269            .map(|x| {
270                let trees = x.1;
271                trees.0.clone()
272            })
273            .collect::<Vec<_>>();
274        // The BtreeMap stores from oldest to newest, so reverse it to make it chronological
275        dags.reverse();
276
277        dags
278    }
279
280    /// Find the unreferenced tips in the current DAG state, mapped by their layers.
281    async fn find_unreferenced_tips(&self, dag: &sled::Tree) -> LayerUTips {
282        // First get all the event IDs
283        let mut tips = HashSet::new();
284        for iter_elem in dag.iter() {
285            let (id, _) = iter_elem.unwrap();
286            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
287            tips.insert(id);
288        }
289        // Iterate again to find unreferenced IDs
290        for iter_elem in dag.iter() {
291            let (_, event) = iter_elem.unwrap();
292            let event: Event = deserialize_async(&event).await.unwrap();
293            for parent in event.header.parents.iter() {
294                tips.remove(parent);
295            }
296        }
297        // Build the layers map
298        let mut map: LayerUTips = BTreeMap::new();
299        for tip in tips {
300            let event = self.fetch_event_from_dag(&tip, dag).await.unwrap().unwrap();
301            if let Some(layer_tips) = map.get_mut(&event.header.layer) {
302                layer_tips.insert(tip);
303            } else {
304                let mut layer_tips = HashSet::new();
305                layer_tips.insert(tip);
306                map.insert(event.header.layer, layer_tips);
307            }
308        }
309
310        map
311    }
312
313    /// Fetch an event from the DAG
314    pub async fn fetch_event_from_dag(
315        &self,
316        event_id: &blake3::Hash,
317        dag: &sled::Tree,
318    ) -> Result<Option<Event>> {
319        let Some(bytes) = dag.get(event_id.as_bytes())? else { return Ok(None) };
320
321        let event: Event = deserialize_async(&bytes).await?;
322
323        Ok(Some(event))
324    }
325}
326
/// Status of a peer during the parallel event download in `dag_sync`.
enum PeerStatus {
    // Peer has no outstanding request and can be assigned a new chunk
    Free,
    // Peer currently has an outstanding chunk request
    Busy,
    // Peer failed to serve a request; not assigned further chunks
    Failed,
}
332
/// An Event Graph instance
pub struct EventGraph {
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Store holding the header and main DAG sled trees
    dag_store: RwLock<DAGStore>,
    /// Static DAG
    static_dag: sled::Tree,
    /// Replay logs path.
    datastore: PathBuf,
    /// Run in replay_mode where if set we log Sled DB instructions
    /// into `datastore`, useful to recreate a faulty DAG to debug.
    replay_mode: bool,
    /// A `HashSet` containing event IDs and their 1-level parents.
    /// These come from the events we've sent out using `EventPut`.
    /// They are used with `EventReq` to decide if we should reply
    /// or not. Additionally it is also used when we broadcast the
    /// `TipRep` message telling peers about our unreferenced tips.
    broadcasted_ids: RwLock<HashSet<Hash>>,
    /// DAG Pruning Task
    pub prune_task: OnceCell<StoppableTaskPtr>,
    /// Event publisher, this notifies whenever an event is
    /// inserted into the DAG
    pub event_pub: PublisherPtr<Event>,
    /// Static Event publisher, this notifies whenever a static event is
    /// inserted into the static DAG
    pub static_pub: PublisherPtr<Event>,
    /// Current genesis event
    pub current_genesis: RwLock<Event>,
    /// Currently configured DAG rotation, in hours
    hours_rotation: u64,
    /// Flag signalling DAG has finished initial sync
    pub synced: RwLock<bool>,
    /// Enable graph debugging
    pub deg_enabled: RwLock<bool>,
    /// The publisher for which we can give deg info over
    deg_publisher: PublisherPtr<DegEvent>,
    /// Run in fast mode where if set we sync only headers.
    fast_mode: bool,
    /// sled DB
    sled_db: sled::Db,
    /// RLN identity storage
    pub rln_identity_tree: RwLock<SmtMemoryFp>,
}
377
378impl EventGraph {
    /// Create a new [`EventGraph`] instance, creates a new Genesis
    /// event and checks if it
    /// is contained in the DAG, if not prunes the DAG, may also start a pruning
    /// task based on `hours_rotation`, and return an atomic instance of
    /// `Self`
    /// * `p2p` atomic pointer to p2p.
    /// * `sled_db` sled DB instance.
    /// * `datastore` path where we should log db instructions if run in
    ///   replay mode.
    /// * `replay_mode` set the flag to keep a log of db instructions.
    /// * `fast_mode` set the flag to sync only headers.
    /// * `hours_rotation` marks the lifetime of the DAG before it's
    ///   pruned.
    pub async fn new(
        p2p: P2pPtr,
        sled_db: sled::Db,
        datastore: PathBuf,
        replay_mode: bool,
        fast_mode: bool,
        hours_rotation: u64,
        ex: Arc<Executor<'_>>,
    ) -> Result<EventGraphPtr> {
        /*
        let _register_vk = build_register_vk(&sled_db)?;
        let _signal_vk = build_signal_vk(&sled_db)?;
        let _slash_pk = build_slash_pk(&sled_db)?;
        let _slash_vk = build_slash_vk(&sled_db)?;
        */

        // Build the in-memory sparse Merkle tree used for RLN identities
        let hasher = PoseidonFp::new();
        // let store = AccountStorage::new(&sled_db, "name".to_owned());
        let store = MemoryStorageFp::new();
        let identity_tree = SmtMemoryFp::new(store, hasher.clone(), &EMPTY_NODES_FP);

        let broadcasted_ids = RwLock::new(HashSet::new());
        let event_pub = Publisher::new();
        let static_pub = Publisher::new();

        // Create the current genesis event based on the `hours_rotation`
        let current_genesis = generate_genesis(hours_rotation);
        let current_dag_tree_name = current_genesis.header.timestamp.to_string();
        // Build the DAG store; the temporary zero-value struct only exists
        // because DAGStore::new() takes &self.
        let dag_store = DAGStore {
            db: sled_db.clone(),
            header_dags: BTreeMap::default(),
            main_dags: BTreeMap::default(),
        }
        .new(sled_db.clone(), hours_rotation)
        .await;

        let static_dag = Self::static_new(&sled_db).await?;

        let self_ = Arc::new(Self {
            p2p,
            dag_store: RwLock::new(dag_store.clone()),
            static_dag,
            datastore,
            replay_mode,
            fast_mode,
            broadcasted_ids,
            prune_task: OnceCell::new(),
            event_pub,
            static_pub,
            current_genesis: RwLock::new(current_genesis.clone()),
            hours_rotation,
            synced: RwLock::new(false),
            deg_enabled: RwLock::new(false),
            deg_publisher: Publisher::new(),
            sled_db,
            rln_identity_tree: RwLock::new(identity_tree),
        });

        // Check if we have it in our DAG.
        // If not, we can prune the DAG and insert this new genesis event.
        let dag = dag_store.get_dag(&current_dag_tree_name);
        if !dag.contains_key(current_genesis.id().as_bytes())? {
            info!(
                target: "event_graph::new",
                "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
            );
            self_.dag_prune(current_genesis).await?;
        }

        // Spawn the DAG pruning task
        if hours_rotation > 0 {
            let prune_task = StoppableTask::new();
            // OnceCell is freshly created above, so this set cannot fail
            let _ = self_.prune_task.set(prune_task.clone()).await;

            prune_task.clone().start(
                 self_.clone().dag_prune_task(hours_rotation),
                 |res| async move {
                     match res {
                         Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                         Err(e) => error!(target: "event_graph::_handle_stop", "[EVENTGRAPH] Failed stopping prune task: {e}")
                     }
                 },
                 Error::DetachedTaskStopped,
                 ex.clone(),
             );
        }

        Ok(self_)
    }
480
    /// Returns the configured DAG rotation period in hours
    /// (0 means rotation/pruning is disabled).
    pub fn hours_rotation(&self) -> u64 {
        self.hours_rotation
    }
484
485    /// Sync the DAG from connected peers
486    pub async fn dag_sync(&self, dag: sled::Tree, fast_mode: bool) -> Result<()> {
487        // We do an optimistic sync where we ask all our connected peers for
488        // the latest layer DAG tips (unreferenced events) and then we accept
489        // the ones we see the most times.
490        // * Compare received tips with local ones, identify which we are missing.
491        // * Request these from peers
492        // * Recursively request these backward
493        //
494        // Verification:
495        // * Timestamps should go backwards
496        // * Cross-check with multiple peers, this means we should request the
497        //   same event from multiple peers and make sure it is the same.
498        // * Since we should be pruning, if we're not synced after some reasonable
499        //   amount of iterations, these could be faulty peers and we can try again
500        //   from the beginning
501
502        let dag_name = String::from_utf8_lossy(&dag.name()).to_string();
503
504        // Get references to all our peers.
505        let channels = self.p2p.hosts().peers();
506        let mut communicated_peers = channels.len();
507        info!(
508            target: "event_graph::dag_sync",
509            "[EVENTGRAPH] Syncing DAG from {communicated_peers} peers..."
510        );
511
512        let comms_timeout = self.p2p.settings().read().await.outbound_connect_timeout_max();
513
514        // Here we keep track of the tips, their layers and how many time we've seen them.
515        let mut tips: HashMap<Hash, (u64, usize)> = HashMap::new();
516
517        // Let's first ask all of our peers for their tips and collect them
518        // in our hashmap above.
519        for channel in channels.iter() {
520            let url = channel.display_address();
521
522            let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
523                Ok(v) => v,
524                Err(e) => {
525                    error!(
526                        target: "event_graph::dag_sync",
527                        "[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {url}, skipping ({e})"
528                    );
529                    communicated_peers -= 1;
530                    continue
531                }
532            };
533
534            if let Err(e) = channel.send(&TipReq(dag_name.clone())).await {
535                error!(
536                    target: "event_graph::dag_sync",
537                    "[EVENTGRAPH] Sync: Couldn't contact peer {url}, skipping ({e})"
538                );
539                communicated_peers -= 1;
540                continue
541            };
542
543            // Node waits for response
544            let Ok(peer_tips) = tip_rep_sub.receive_with_timeout(comms_timeout).await else {
545                error!(
546                    target: "event_graph::dag_sync",
547                    "[EVENTGRAPH] Sync: Peer {url} didn't reply with tips in time, skipping"
548                );
549                communicated_peers -= 1;
550                continue
551            };
552
553            let peer_tips: &BTreeMap<u64, HashSet<Hash>> = &peer_tips.0;
554
555            // Note down the seen tips
556            for (layer, layer_tips) in peer_tips {
557                for tip in layer_tips {
558                    if let Some(seen_tip) = tips.get_mut(tip) {
559                        seen_tip.1 += 1;
560                    } else {
561                        tips.insert(*tip, (*layer, 1));
562                    }
563                }
564            }
565        }
566
567        // After we've communicated all the peers, let's see what happened.
568        if tips.is_empty() {
569            error!(
570                target: "event_graph::dag_sync",
571                "[EVENTGRAPH] Sync: Could not find any DAG tips",
572            );
573            return Err(Error::DagSyncFailed)
574        }
575
576        // We know the number of peers we've communicated with,
577        // so we will consider events we saw at more than 2/3 of
578        // those peers.
579        let consideration_threshold = communicated_peers * 2 / 3;
580        let mut considered_tips = HashSet::new();
581        for (tip, (_, amount)) in tips.iter() {
582            if amount > &consideration_threshold {
583                considered_tips.insert(*tip);
584            }
585        }
586        drop(tips);
587
588        // Check if already in sync.
589        let mut missing_parents = HashSet::new();
590        for tip in considered_tips.iter() {
591            assert!(tip != &NULL_ID);
592
593            if !dag.contains_key(tip.as_bytes()).unwrap() {
594                missing_parents.insert(*tip);
595            }
596        }
597
598        if missing_parents.is_empty() {
599            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
600            return Ok(())
601        }
602        let hdr_tree_name = format!("headers_{dag_name}");
603        let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
604        let dag_timestamp = u64::from_str(&dag_name)?;
605        let our_tips =
606            self.dag_store.read().await.header_dags.get(&dag_timestamp).unwrap().1.clone();
607        let mut headers_requests = FuturesUnordered::new();
608        for channel in channels.iter() {
609            headers_requests.push(request_header(
610                channel,
611                dag_name.clone(),
612                our_tips.clone(),
613                comms_timeout,
614            ))
615        }
616
617        while let Some(peer_headers) = headers_requests.next().await {
618            self.header_dag_insert(peer_headers?, &dag_name).await?
619        }
620
621        // start download payload
622        if !fast_mode {
623            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Fetching events");
624            let mut header_sorted = vec![];
625
626            let main_dag = self.dag_store.read().await.get_dag(&dag_name);
627            for iter_elem in header_dag.iter() {
628                let (hash_bytes, val) = iter_elem.unwrap();
629                let val: Header = deserialize_async(&val).await.unwrap();
630                if val.parents != NULL_PARENTS && !main_dag.contains_key(hash_bytes)? {
631                    header_sorted.push(val);
632                }
633            }
634            header_sorted.sort_by(|x, y| x.layer.cmp(&y.layer));
635
636            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Retrieving {} Events", header_sorted.len());
637            // Implement parallel download of events with a batch size
638            let batch = 20;
639            // Mapping of the chunk group id to the chunk, using a BTreeMap help us to
640            // prioritize the older headers when our request fails and we retry
641            let mut chunks: BTreeMap<usize, Vec<blake3::Hash>> = BTreeMap::new();
642            for (i, chunk) in header_sorted.chunks(batch).enumerate() {
643                chunks.insert(i, chunk.iter().map(|h| h.id()).collect());
644            }
645            let mut remaining_chunk_ids: BTreeSet<usize> = chunks.keys().cloned().collect();
646
647            // Mapping of the chunk group id to the received events, using a BTreeMap help
648            // us to verify and insert the events in order
649            let mut received_events: BTreeMap<usize, Vec<Event>> = BTreeMap::new();
650            // Track peers status so that we don't send a new request to the same peer before they
651            // finish the first or send to a failed peer
652            let mut peer_status: HashMap<Url, PeerStatus> = HashMap::new();
653
654            let mut retrieved_count = 0;
655            let mut futures = FuturesUnordered::new();
656
657            while retrieved_count < header_sorted.len() {
658                // Retrieve peers in each loop so we don't send requests to a closed channel
659                let mut free_channels = vec![];
660                let mut busy_channels = 0;
661
662                self.p2p.hosts().peers().iter().for_each(|channel| {
663                    if let Some(status) = peer_status.get(channel.address()) {
664                        match status {
665                            PeerStatus::Free => free_channels.push(channel.clone()),
666                            PeerStatus::Busy => busy_channels += 1,
667                            _ => {}
668                        }
669                    } else {
670                        peer_status.insert(channel.address().clone(), PeerStatus::Free);
671                        free_channels.push(channel.clone());
672                    }
673                });
674
675                // We don't have any channels we can assign to or wait to get response from
676                if free_channels.is_empty() && busy_channels == 0 {
677                    return Err(Error::DagSyncFailed)
678                }
679
680                // We will distribute the remaining chunks to each channel
681                let requested_chunks_len =
682                    std::cmp::min(free_channels.len(), remaining_chunk_ids.len());
683                let requested_chunk_ids: Vec<usize> =
684                    remaining_chunk_ids.iter().take(requested_chunks_len).copied().collect();
685                for (i, chunk_id) in requested_chunk_ids.iter().enumerate() {
686                    futures.push(request_event(
687                        free_channels[i].clone(),
688                        chunks.get(chunk_id).unwrap().clone(),
689                        *chunk_id,
690                        comms_timeout,
691                    ));
692                    remaining_chunk_ids.remove(chunk_id);
693                    peer_status.insert(free_channels[i].address().clone(), PeerStatus::Busy);
694                }
695
696                info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Retrieving Events from {} peers", futures.len());
697                if let Some(resp) = futures.next().await {
698                    let (events, chunk_id, channel) = resp;
699                    if let Ok(events) = events {
700                        retrieved_count += events.len();
701                        received_events.insert(chunk_id, events.clone());
702                        peer_status.insert(channel.address().clone(), PeerStatus::Free);
703                    } else {
704                        remaining_chunk_ids.insert(chunk_id);
705                        peer_status.insert(channel.address().clone(), PeerStatus::Failed);
706                    }
707
708                    info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Retrieved Events: {}/{}", retrieved_count, header_sorted.len());
709                }
710            }
711
712            let mut verified_count = 0;
713            for (_, chunk) in received_events {
714                verified_count += chunk.len();
715                self.dag_insert(&chunk, &dag_name).await?;
716                info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Verified Events: {}/{}", verified_count, retrieved_count);
717            }
718        }
719        // <-- end download payload
720
721        info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
722        Ok(())
723    }
724
725    /// Choose how many dags to sync
726    pub async fn sync_selected(&self, count: usize, fast_mode: bool) -> Result<()> {
727        let mut dags_to_sync = self.dag_store.read().await.get_dags(count).await;
728        // Since get_dags() return sorted dags in reverse
729        dags_to_sync.reverse();
730        for dag in dags_to_sync {
731            match self.dag_sync(dag, fast_mode).await {
732                Ok(()) => continue,
733                Err(e) => return Err(e),
734            }
735        }
736
737        *self.synced.write().await = true;
738        Ok(())
739    }
740
    /// Atomically prune the DAG and insert the given event as genesis.
    ///
    /// A fresh DAG named after the genesis event's timestamp is added to
    /// the store, the cached current genesis is replaced, and the set of
    /// broadcasted event IDs is cleared.
    async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
        debug!(target: "event_graph::dag_prune", "Pruning DAG...");

        // Acquire exclusive locks to broadcasted_ids and current_genesis
        // while this operation is happening. We do this to ensure that
        // during the pruning operation, no other operations are able to
        // access the intermediate state which could lead to producing
        // the wrong state after pruning.
        // NOTE(review): an earlier comment also mentioned locking
        // unreferenced_tips, but no such lock is taken here — confirm
        // whether tip state needs protection during pruning.
        let mut broadcasted_ids = self.broadcasted_ids.write().await;
        let mut current_genesis = self.current_genesis.write().await;

        // The new DAG is keyed by the genesis event's timestamp.
        let dag_name = genesis_event.header.timestamp.to_string();
        self.dag_store.write().await.add_dag(&dag_name, &genesis_event).await;

        // Install the new genesis and clear bcast ids
        *current_genesis = genesis_event;
        *broadcasted_ids = HashSet::new();
        drop(broadcasted_ids);
        drop(current_genesis);

        debug!(target: "event_graph::dag_prune", "DAG pruned successfully");
        Ok(())
    }
765
766    /// Background task periodically pruning the DAG.
767    async fn dag_prune_task(self: Arc<Self>, hours_rotation: u64) -> Result<()> {
768        // The DAG should periodically be pruned. This can be a configurable
769        // parameter. By pruning, we should deterministically replace the
770        // genesis event (can use a deterministic timestamp) and drop everything
771        // in the DAG, leaving just the new genesis event.
772        debug!(target: "event_graph::dag_prune_task", "Spawned background DAG pruning task");
773
774        loop {
775            // Find the next rotation timestamp:
776            let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, hours_rotation);
777
778            let header =
779                Header { timestamp: next_rotation, parents: [NULL_ID; N_EVENT_PARENTS], layer: 0 };
780            // Prepare the new genesis event
781            let current_genesis = Event { header, content: GENESIS_CONTENTS.to_vec() };
782
783            // Sleep until it's time to rotate.
784            let s = millis_until_next_rotation(next_rotation);
785
786            debug!(target: "event_graph::dag_prune_task", "Sleeping {s}ms until next DAG prune");
787            msleep(s).await;
788            debug!(target: "event_graph::dag_prune_task", "Rotation period reached");
789
790            // Trigger DAG prune
791            self.dag_prune(current_genesis).await?;
792        }
793    }
794
    /// Atomically insert given events into the DAG and return the event IDs.
    /// All provided events must be valid. An overlay is used over the DAG tree,
    /// temporarily writing each event in order. After all events have been
    /// validated and inserted successfully, we write the overlay to sled.
    /// This will append the new events into the unreferenced tips set, and
    /// remove the events' parents from it. It will also append the events'
    /// level-1 parents to the `broadcasted_ids` set, so the P2P protocol
    /// knows that any requests for them are actually legitimate.
    /// TODO: The `broadcasted_ids` set should periodically be pruned, when
    /// some sensible time has passed after broadcasting the event.
    pub async fn dag_insert(&self, events: &[Event], dag_name: &str) -> Result<Vec<Hash>> {
        // Sanity check
        if events.is_empty() {
            return Ok(vec![])
        }

        // `dag_name` must parse as the DAG's u64 genesis timestamp,
        // since the in-memory stores are keyed by it.
        // Acquire exclusive locks to `broadcasted_ids`
        let dag_timestamp = u64::from_str(dag_name)?;
        let mut broadcasted_ids = self.broadcasted_ids.write().await;

        let main_dag = self.dag_store.read().await.get_dag(dag_name);
        let hdr_tree_name = format!("headers_{dag_name}");
        let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);

        // Here we keep the IDs to return
        let mut ids = Vec::with_capacity(events.len());

        // Create an overlay over the DAG tree
        let mut overlay = SledTreeOverlay::new(&main_dag);

        // Iterate over given events to validate them and
        // write them to the overlay
        for event in events {
            let event_id = event.id();
            // Genesis-like events (all-NULL parents) are skipped silently.
            if event.header.parents == NULL_PARENTS {
                continue
            }
            debug!(
                target: "event_graph::dag_insert",
                "Inserting event {event_id} into the DAG layer: {}", event.header.layer
            );

            // Skip if we already have the event
            if main_dag.contains_key(event_id.as_bytes())? {
                continue
            }

            // Skip if its header is not in the header store.
            // NOTE(review): this silently drops events whose header was
            // never inserted — confirm headers are always inserted first
            // via header_dag_insert.
            if !header_dag.contains_key(event_id.as_bytes())? {
                continue
            }

            // An invalid event aborts the whole batch; nothing is written
            // since the overlay is never aggregated.
            if !event.dag_validate(&header_dag).await? {
                error!(target: "event_graph::dag_insert", "Event {} is invalid!", event_id);
                return Err(Error::EventIsInvalid)
            }

            let event_se = serialize_async(event).await;

            // Add the event to the overlay
            overlay.insert(event_id.as_bytes(), &event_se)?;

            // In replay mode, also record the raw insert for later replay.
            if self.replay_mode {
                replayer_log(&self.datastore, "insert".to_owned(), event_se)?;
            }
            // Note down the event ID to return
            ids.push(event_id);
        }

        // Aggregate changes into a single batch; None means nothing
        // new was staged, so there is nothing to apply or announce.
        let batch = match overlay.aggregate() {
            Some(x) => x,
            None => return Ok(vec![]),
        };

        // Atomically apply the batch.
        // Panic if something is corrupted.
        if let Err(e) = main_dag.apply_batch(batch) {
            panic!("Failed applying dag_insert batch to sled: {e}");
        }

        let mut dag_store = self.dag_store.write().await;
        let (_, unreferenced_tips) = &mut dag_store.main_dags.get_mut(&dag_timestamp).unwrap();

        // Iterate over given events to update references and
        // send out notifications about them
        for event in events {
            let event_id = event.id();
            if event.header.parents == NULL_PARENTS {
                continue
            }

            // Update the unreferenced DAG tips set
            debug!(
                target: "event_graph::dag_insert",
                "Event {event_id} parents {:#?}", event.header.parents,
            );
            for parent_id in event.header.parents.iter() {
                if parent_id != &NULL_ID {
                    debug!(
                        target: "event_graph::dag_insert",
                        "Removing {parent_id} from unreferenced_tips"
                    );

                    // Iterate over unreferenced tips in previous layers
                    // and remove the parent; only layers strictly below
                    // this event's layer are scanned.
                    // NOTE: this might be too exhaustive, but the
                    // assumption is that previous layers unreferenced
                    // tips will be few.
                    for (layer, tips) in unreferenced_tips.iter_mut() {
                        if layer >= &event.header.layer {
                            continue
                        }
                        tips.remove(parent_id);
                    }
                    // Mark the parent as known-broadcast so peer requests
                    // for it are considered legitimate.
                    broadcasted_ids.insert(*parent_id);
                }
            }
            // Drop any layer whose tip set just became empty.
            unreferenced_tips.retain(|_, tips| !tips.is_empty());
            debug!(
                target: "event_graph::dag_insert",
                "Adding {event_id} to unreferenced tips"
            );

            // The new event itself becomes an unreferenced tip on its layer.
            if let Some(layer_tips) = unreferenced_tips.get_mut(&event.header.layer) {
                layer_tips.insert(event_id);
            } else {
                let mut layer_tips = HashSet::new();
                layer_tips.insert(event_id);
                unreferenced_tips.insert(event.header.layer, layer_tips);
            }

            // Send out notifications about the new event
            self.event_pub.notify(event.clone()).await;
        }

        // Mirror the main DAG's tip state into the header DAG's entry so
        // both stay consistent.
        dag_store.header_dags.get_mut(&dag_timestamp).unwrap().1 =
            dag_store.main_dags.get(&dag_timestamp).unwrap().1.clone();

        // Drop the exclusive locks
        drop(dag_store);
        drop(broadcasted_ids);

        Ok(ids)
    }
940
941    pub async fn header_dag_insert(&self, headers: Vec<Header>, dag_name: &str) -> Result<()> {
942        let hdr_tree_name = format!("headers_{dag_name}");
943        let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
944        // Create an overlay over the DAG tree
945        let mut overlay = SledTreeOverlay::new(&header_dag);
946
947        let mut hdrs = headers;
948        hdrs.sort_by(|x, y| x.layer.cmp(&y.layer));
949
950        // Iterate over given events to validate them and
951        // write them to the overlay
952        for header in hdrs {
953            let header_id = header.id();
954            if header.parents == NULL_PARENTS {
955                continue
956            }
957            debug!(
958                target: "event_graph::header_dag_insert",
959                "Inserting header {} into the DAG", header_id,
960            );
961            if !header.validate(&header_dag, self.hours_rotation, Some(&overlay)).await? {
962                error!(target: "event_graph::header_dag_insert", "Header {} is invalid!", header_id);
963                return Err(Error::HeaderIsInvalid)
964            }
965            let header_se = serialize_async(&header).await;
966
967            // Add the event to the overlay
968            overlay.insert(header_id.as_bytes(), &header_se)?;
969        }
970
971        // Aggregate changes into a single batch
972        let batch = match overlay.aggregate() {
973            Some(x) => x,
974            None => return Ok(()),
975        };
976
977        // Atomically apply the batch.
978        // Panic if something is corrupted.
979        if let Err(e) = header_dag.apply_batch(batch) {
980            panic!("Failed applying dag_insert batch to sled: {}", e);
981        }
982
983        Ok(())
984    }
985
986    /// Search and fetch an event through all DAGs
987    pub async fn fetch_event_from_dags(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
988        let store = self.dag_store.read().await;
989        for tree_elem in store.main_dags.clone() {
990            let dag_name = tree_elem.0.to_string();
991            let Some(bytes) = store.get_dag(&dag_name).get(event_id.as_bytes())? else {
992                continue;
993            };
994            let event: Event = deserialize_async(&bytes).await?;
995
996            return Ok(Some(event))
997        }
998
999        Ok(None)
1000    }
1001
1002    /// Get next layer along with its N_EVENT_PARENTS from the unreferenced
1003    /// tips of the DAG. Since tips are mapped by their layer, we go backwards
1004    /// until we fill the vector, ensuring we always use latest layers tips as
1005    /// parents.
1006    async fn get_next_layer_with_parents(
1007        &self,
1008        dag_name: &u64,
1009    ) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
1010        let store = self.dag_store.read().await;
1011        let (_, unreferenced_tips) = store.header_dags.get(dag_name).unwrap();
1012
1013        let mut parents = [NULL_ID; N_EVENT_PARENTS];
1014        let mut index = 0;
1015        'outer: for (_, tips) in unreferenced_tips.iter().rev() {
1016            for tip in tips.iter() {
1017                parents[index] = *tip;
1018                index += 1;
1019                if index >= N_EVENT_PARENTS {
1020                    break 'outer;
1021                }
1022            }
1023        }
1024
1025        let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
1026
1027        assert!(parents.iter().any(|x| x != &NULL_ID));
1028        (next_layer, parents)
1029    }
1030
1031    /// Find the unreferenced tips in the current DAG state, mapped by their layers.
1032    async fn find_unreferenced_tips_static(&self, dag: &sled::Tree) -> LayerUTips {
1033        // First get all the event IDs
1034        let mut tips = HashSet::new();
1035        for iter_elem in dag.iter() {
1036            let (id, _) = iter_elem.unwrap();
1037            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1038            tips.insert(id);
1039        }
1040        // Iterate again to find unreferenced IDs
1041        for iter_elem in dag.iter() {
1042            let (_, event) = iter_elem.unwrap();
1043            let event: Event = deserialize_async(&event).await.unwrap();
1044            for parent in event.header.parents.iter() {
1045                tips.remove(parent);
1046            }
1047        }
1048        // Build the layers map
1049        let mut map: LayerUTips = BTreeMap::new();
1050        for tip in tips {
1051            let event = self.static_fetch(&tip).await.unwrap().unwrap();
1052            if let Some(layer_tips) = map.get_mut(&event.header.layer) {
1053                layer_tips.insert(tip);
1054            } else {
1055                let mut layer_tips = HashSet::new();
1056                layer_tips.insert(tip);
1057                map.insert(event.header.layer, layer_tips);
1058            }
1059        }
1060
1061        map
1062    }
1063
1064    async fn get_next_layer_with_parents_static(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
1065        let unreferenced_tips = self.find_unreferenced_tips_static(&self.static_dag).await;
1066
1067        let mut parents = [NULL_ID; N_EVENT_PARENTS];
1068        let mut index = 0;
1069        'outer: for (_, tips) in unreferenced_tips.iter().rev() {
1070            for tip in tips.iter() {
1071                parents[index] = *tip;
1072                index += 1;
1073                if index >= N_EVENT_PARENTS {
1074                    break 'outer;
1075                }
1076            }
1077        }
1078
1079        let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
1080
1081        assert!(parents.iter().any(|x| x != &NULL_ID));
1082        (next_layer, parents)
1083    }
1084
1085    /// Internal function used for DAG sorting.
1086    async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> {
1087        let mut vec_tips = vec![];
1088        let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
1089        for (i, _) in self.dag_store.read().await.header_dags.iter() {
1090            let (_, tips) = self.get_next_layer_with_parents(i).await;
1091            // Convert the hash to BigUint for sorting
1092            let mut sorted: Vec<_> =
1093                tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
1094            sorted.sort_unstable();
1095
1096            // Convert back to blake3
1097            for (i, id) in sorted.iter().enumerate() {
1098                let mut bytes = id.to_bytes_be();
1099
1100                // Ensure we have 32 bytes
1101                while bytes.len() < blake3::OUT_LEN {
1102                    bytes.insert(0, 0);
1103                }
1104
1105                tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
1106            }
1107
1108            vec_tips.push(tips_sorted);
1109        }
1110
1111        vec_tips
1112    }
1113
1114    /// Perform a topological sort of the DAG.
1115    pub async fn order_events(&self) -> Vec<Event> {
1116        let mut ordered_events = VecDeque::new();
1117        let mut visited = HashSet::new();
1118
1119        for i in self.get_unreferenced_tips_sorted().await {
1120            for tip in i {
1121                if !visited.contains(&tip) && tip != NULL_ID {
1122                    let tip = self.fetch_event_from_dags(&tip).await.unwrap().unwrap();
1123                    ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
1124                }
1125            }
1126        }
1127
1128        let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
1129        // Order events by timestamp.
1130        ord_events_vec.sort_unstable_by(|a, b| a.1.header.timestamp.cmp(&b.1.header.timestamp));
1131
1132        ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
1133    }
1134
    /// We do a non-recursive DFS (<https://en.wikipedia.org/wiki/Depth-first_search>),
    /// and additionally we consider the timestamps.
    ///
    /// Returns the visited events paired with their layer, in traversal
    /// order, walking from the given event towards its ancestors.
    /// NOTE(review): the queue is popped from the front while pushed at
    /// the back, so this is actually a breadth-first traversal despite
    /// the DFS name — confirm whether the visit order matters to callers
    /// (order_events re-sorts by timestamp afterwards).
    async fn dfs_topological_sort(
        &self,
        event: Event,
        visited: &mut HashSet<Hash>,
    ) -> VecDeque<(u64, Event)> {
        let mut ordered_events = VecDeque::new();
        let mut stack = VecDeque::new();
        let event_id = event.id();
        stack.push_back(event_id);

        while let Some(event_id) = stack.pop_front() {
            if !visited.contains(&event_id) && event_id != NULL_ID {
                visited.insert(event_id);
                // Missing events are skipped silently; only found events
                // contribute to the ordering.
                if let Some(event) = self.fetch_event_from_dags(&event_id).await.unwrap() {
                    for parent in event.header.parents.iter() {
                        stack.push_back(*parent);
                    }

                    ordered_events.push_back((event.header.layer, event))
                }
            }
        }

        ordered_events
    }
1162
1163    /// Enable graph debugging
1164    pub async fn deg_enable(&self) {
1165        *self.deg_enabled.write().await = true;
1166        warn!("[EVENTGRAPH] Graph debugging enabled!");
1167    }
1168
1169    /// Disable graph debugging
1170    pub async fn deg_disable(&self) {
1171        *self.deg_enabled.write().await = false;
1172        warn!("[EVENTGRAPH] Graph debugging disabled!");
1173    }
1174
1175    /// Subscribe to deg events
1176    pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
1177        self.deg_publisher.clone().subscribe().await
1178    }
1179
1180    /// Send a deg notification over the publisher
1181    pub async fn deg_notify(&self, event: DegEvent) {
1182        self.deg_publisher.notify(event).await;
1183    }
1184
1185    #[cfg(feature = "rpc")]
1186    pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
1187        let current_genesis = self.current_genesis.read().await;
1188        let dag_name = current_genesis.header.timestamp.to_string();
1189        let mut graph = HashMap::new();
1190        for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
1191            let (id, val) = iter_elem.unwrap();
1192            let id = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1193            let val: Event = deserialize_async(&val).await.unwrap();
1194            graph.insert(id, val);
1195        }
1196
1197        let json_graph = graph
1198            .into_iter()
1199            .map(|(k, v)| {
1200                let key = k.to_string();
1201                let value = JsonValue::from(v);
1202                (key, value)
1203            })
1204            .collect();
1205        let values = json_map([("dag", JsonValue::Object(json_graph))]);
1206
1207        let result = JsonValue::Object(HashMap::from([("eventgraph_info".to_string(), values)]));
1208
1209        JsonResponse::new(result, id).into()
1210    }
1211
1212    /// Fetch all events that are not ancestors of the tips
1213    pub async fn fetch_headers_with_tips(
1214        &self,
1215        dag_name: &str,
1216        tips: &LayerUTips,
1217    ) -> Result<Vec<Header>> {
1218        debug!(
1219             target: "event_graph::fetch_headers_with_tips",
1220             "fetching headers with tips {tips:?}"
1221        );
1222
1223        let tree = self.dag_store.read().await.get_dag(&format!("headers_{dag_name}"));
1224
1225        // Let's identify all the events that are ancestors of the tips so that we don't send those
1226        let mut ancestors = HashSet::new();
1227
1228        for hashes in tips.values() {
1229            for hash in hashes {
1230                ancestors.insert(*hash);
1231                let val = tree
1232                    .get(hash.as_bytes())?
1233                    .ok_or_else(|| Error::EventNotFound("The Tip is not found".to_owned()))?;
1234                let header: Header = deserialize_async(&val).await?;
1235                self.get_ancestors(&mut ancestors, header, &tree).await?
1236            }
1237        }
1238
1239        let mut result = Vec::with_capacity(tree.len() - ancestors.len());
1240        // Now find the events that are not ancestors of the tips
1241        // it is a set difference operation: unseen_events = all_events - tip_ancestors
1242        for iter_elem in tree.iter() {
1243            let (id, val) = iter_elem?;
1244            let hash = Hash::from_bytes((&id as &[u8]).try_into()?);
1245            if !ancestors.contains(&hash) {
1246                let header: Header = deserialize_async(&val).await?;
1247                result.push(header);
1248            }
1249        }
1250
1251        result.sort_unstable_by(|a, b| a.layer.cmp(&b.layer));
1252
1253        Ok(result)
1254    }
1255
1256    /// Finds all the ancestors of an event
1257    async fn get_ancestors(
1258        &self,
1259        visited: &mut HashSet<Hash>,
1260        header: Header,
1261        tree: &sled::Tree,
1262    ) -> Result<()> {
1263        let mut stack = VecDeque::new();
1264        stack.push_back(header);
1265
1266        while let Some(hdr) = stack.pop_back() {
1267            for parent in hdr.parents {
1268                if parent != NULL_ID && !visited.contains(&parent) {
1269                    visited.insert(parent);
1270                    let val = tree.get(parent.as_bytes())?.unwrap();
1271                    let header: Header = deserialize_async(&val).await?;
1272                    stack.push_back(header);
1273                }
1274            }
1275        }
1276
1277        Ok(())
1278    }
1279
1280    /// Fetch all the events that are on a higher layers than the
1281    /// provided ones.
1282    pub async fn fetch_successors_of(&self, tips: LayerUTips) -> Result<Vec<Event>> {
1283        debug!(
1284             target: "event_graph::fetch_successors_of",
1285             "fetching successors of {tips:?}"
1286        );
1287
1288        let current_genesis = self.current_genesis.read().await;
1289        let dag_name = current_genesis.header.timestamp.to_string();
1290        let mut graph = HashMap::new();
1291        for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
1292            let (id, val) = iter_elem.unwrap();
1293            let hash = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1294            let event: Event = deserialize_async(&val).await.unwrap();
1295            graph.insert(hash, event);
1296        }
1297
1298        let mut result = vec![];
1299
1300        'outer: for tip in tips.iter() {
1301            for i in tip.1.iter() {
1302                if !graph.contains_key(i) {
1303                    continue 'outer;
1304                }
1305            }
1306
1307            for (_, ev) in graph.iter() {
1308                if ev.header.layer > *tip.0 && !result.contains(ev) {
1309                    result.push(ev.clone())
1310                }
1311            }
1312        }
1313
1314        result.sort_by(|a, b| a.header.layer.cmp(&b.header.layer));
1315
1316        Ok(result)
1317    }
1318
1319    pub async fn static_new(sled_db: &sled::Db) -> Result<sled::Tree> {
1320        let static_dag = sled_db.open_tree("static-dag")?;
1321
1322        let genesis = generate_genesis(0);
1323        let mut overlay = SledTreeOverlay::new(&static_dag);
1324
1325        let event_se = serialize_async(&genesis).await;
1326
1327        // Add the event to the overlay
1328        overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
1329
1330        // Aggregate changes into a single batch
1331        let batch = match overlay.aggregate() {
1332            Some(b) => b,
1333            None => return Ok(static_dag),
1334        };
1335
1336        // Atomically apply the batch.
1337        // Panic if something is corrupted.
1338        if let Err(e) = static_dag.apply_batch(batch) {
1339            panic!("Failed applying dag_insert batch to sled: {}", e);
1340        }
1341
1342        Ok(static_dag)
1343    }
1344
1345    pub async fn static_sync(&self) -> Result<()> {
1346        self.dag_sync(self.static_dag.clone(), false).await?;
1347        Ok(())
1348    }
1349
1350    pub async fn static_broadcast(&self, event: Event, blob: Vec<u8>) -> Result<()> {
1351        self.p2p.broadcast(&StaticPut(event, blob)).await;
1352        Ok(())
1353    }
1354
1355    pub async fn static_insert(&self, event: &Event) -> Result<()> {
1356        let mut overlay = SledTreeOverlay::new(&self.static_dag);
1357
1358        let event_se = serialize_async(event).await;
1359
1360        // Add the event to the overlay
1361        overlay.insert(event.id().as_bytes(), &event_se).unwrap();
1362
1363        // Aggregate changes into a single batch
1364        let batch = match overlay.aggregate() {
1365            Some(b) => b,
1366            None => return Ok(()),
1367        };
1368
1369        // Atomically apply the batch.
1370        // Panic if something is corrupted.
1371        if let Err(e) = self.static_dag.apply_batch(batch) {
1372            panic!("Failed applying dag_insert batch to sled: {}", e);
1373        }
1374
1375        // Send out notifications about the new event
1376        self.static_pub.notify(event.clone()).await;
1377
1378        Ok(())
1379    }
1380
1381    pub async fn static_fetch(&self, event_id: &Hash) -> Result<Option<Event>> {
1382        let Some(bytes) = self.static_dag.get(event_id.as_bytes())? else { return Ok(None) };
1383
1384        let event: Event = deserialize_async(&bytes).await?;
1385
1386        Ok(Some(event))
1387    }
1388
1389    pub async fn static_fetch_all(&self) -> Result<Vec<Event>> {
1390        let mut events = vec![];
1391        for iter_elem in self.static_dag.iter() {
1392            let (id, _) = iter_elem.unwrap();
1393            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1394            let event = self.static_fetch(&id).await?.unwrap();
1395            if event.header.parents == NULL_PARENTS {
1396                continue
1397            }
1398            events.push(event);
1399        }
1400        Ok(events)
1401    }
1402
1403    pub async fn static_unreferenced_tips(&self) -> LayerUTips {
1404        // First get all the event IDs
1405        let mut tips = HashSet::new();
1406        for iter_elem in self.static_dag.iter() {
1407            let (id, _) = iter_elem.unwrap();
1408            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1409            tips.insert(id);
1410        }
1411        // Iterate again to find unreferenced IDs
1412        for iter_elem in self.static_dag.iter() {
1413            let (_, event) = iter_elem.unwrap();
1414            let event: Event = deserialize_async(&event).await.unwrap();
1415            for parent in event.header.parents.iter() {
1416                tips.remove(parent);
1417            }
1418        }
1419        // Build the layers map
1420        let mut map: LayerUTips = BTreeMap::new();
1421        for tip in tips {
1422            let event = self.static_fetch(&tip).await.unwrap().unwrap();
1423            if let Some(layer_tips) = map.get_mut(&event.header.layer) {
1424                layer_tips.insert(tip);
1425            } else {
1426                let mut layer_tips = HashSet::new();
1427                layer_tips.insert(tip);
1428                map.insert(event.header.layer, layer_tips);
1429            }
1430        }
1431
1432        map
1433    }
1434}
1435
1436async fn request_header(
1437    peer: &Channel,
1438    tree_name: String,
1439    tips: LayerUTips,
1440    comms_timeout: u64,
1441) -> Result<Vec<Header>> {
1442    let url = peer.address();
1443
1444    let hdr_rep_sub = match peer.subscribe_msg::<HeaderRep>().await {
1445        Ok(v) => v,
1446        Err(e) => {
1447            error!(
1448                target: "event_graph::dag_sync",
1449                "[EVENTGRAPH] Sync: Couldn't subscribe HeaderReq for peer {}, skipping ({})",
1450                url, e,
1451            );
1452            return Err(Error::EventNotFound("Couldn't subscribe HeaderReq".to_owned()));
1453        }
1454    };
1455
1456    if let Err(e) = peer.send(&HeaderReq(tree_name, tips)).await {
1457        error!(
1458            target: "event_graph::dag_sync",
1459            "[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
1460        );
1461        return Err(Error::EventNotFound("Couldn't contact peer".to_owned()));
1462    };
1463
1464    // Node waits for response
1465    let Ok(peer_headers) = hdr_rep_sub.receive_with_timeout(comms_timeout).await else {
1466        error!(
1467            target: "event_graph::dag_sync",
1468            "[EVENTGRAPH] Sync: Peer {} didn't reply with headers in time, skipping", url,
1469        );
1470        // communicated_peers -= 1;
1471        return Err(Error::EventNotFound("Peer didn't reply with headers in time".to_owned()));
1472    };
1473
1474    hdr_rep_sub.unsubscribe().await;
1475    let peer_headers = &peer_headers.0;
1476    Ok(peer_headers.to_vec())
1477}
1478
/// Request the events identified by `headers` from `peer`.
///
/// Subscribes to `EventRep`, sends an `EventReq` for the given hashes,
/// and waits up to `comms_timeout` for the reply. The `chunk_id` and
/// the peer channel are passed through in the return tuple so the
/// caller can retry or reassign the failed chunk.
async fn request_event(
    peer: Arc<Channel>,
    headers: Vec<Hash>,
    chunk_id: usize,
    comms_timeout: u64,
) -> (Result<Vec<Event>>, usize, Arc<Channel>) {
    let url = peer.address();

    debug!(
        target: "event_graph::dag_sync",
        "Requesting {:?} from {}...", headers, url,
    );

    // Subscribe to the reply before sending, so the response cannot
    // slip past us.
    let ev_rep_sub = match peer.subscribe_msg::<EventRep>().await {
        Ok(v) => v,
        Err(e) => {
            error!(
                target: "event_graph::dag_sync",
                "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
                url, e,
            );
            return (
                Err(Error::EventNotFound("Couldn't subscribe EventRep".to_owned())),
                chunk_id,
                peer,
            );
        }
    };

    // Send the actual request for this chunk of event hashes.
    if let Err(e) = peer.send(&EventReq(headers.clone())).await {
        error!(
            target: "event_graph::dag_sync",
            "[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",
            headers, url, e,
        );
        return (
            Err(Error::EventNotFound("Failed communicating EventReq".to_owned())),
            chunk_id,
            peer,
        );
    }

    // Node waits for response, bounded by comms_timeout.
    let Ok(event) = ev_rep_sub.receive_with_timeout(comms_timeout).await else {
        error!(
            target: "event_graph::dag_sync",
            "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
            headers, url,
        );
        return (
            Err(Error::EventNotFound("Timeout waiting for parents".to_owned())),
            chunk_id,
            peer,
        );
    };

    ev_rep_sub.unsubscribe().await;

    (Ok(event.0.clone()), chunk_id, peer)
}