1use std::{
21 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
22 path::PathBuf,
23 str::FromStr,
24 sync::Arc,
25};
26
27use blake3::Hash;
29use darkfi_sdk::crypto::smt::{MemoryStorageFp, PoseidonFp, SmtMemoryFp, EMPTY_NODES_FP};
30use darkfi_serial::{deserialize_async, serialize_async};
31use event::Header;
32use futures::{
33 stream::FuturesUnordered,
35 StreamExt,
36};
37use num_bigint::BigUint;
38use sled_overlay::{sled, SledTreeOverlay};
39use smol::{
40 lock::{OnceCell, RwLock},
41 Executor,
42};
43use tracing::{debug, error, info, warn};
44use url::Url;
45
46use crate::{
47 event_graph::{
48 proto::StaticPut,
49 util::{next_hour_timestamp, next_rotation_timestamp, replayer_log},
50 },
51 net::{channel::Channel, P2pPtr},
52 system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
53 Error, Result,
54};
55
56#[cfg(feature = "rpc")]
57use {
58 crate::rpc::{
59 jsonrpc::{JsonResponse, JsonResult},
60 util::json_map,
61 },
62 tinyjson::JsonValue::{self},
63};
64
65pub mod event;
67pub use event::Event;
68
69pub mod proto;
71use proto::{EventRep, EventReq, HeaderRep, HeaderReq, TipRep, TipReq};
72
73pub mod rln;
74pub mod util;
78use util::{generate_genesis, millis_until_next_rotation};
79
80pub mod deg;
82use deg::DegEvent;
83
84#[cfg(test)]
85mod tests;
86
/// Anchor timestamp for DAG rotation computations.
/// NOTE(review): magnitude suggests milliseconds since the Unix epoch — confirm
/// against `next_rotation_timestamp`.
pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
/// Content of every genesis event: the ASCII bytes "GENESIS".
pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];

/// Number of parent slots in each event header.
pub const N_EVENT_PARENTS: usize = 5;
/// Allowed event timestamp drift (presumably milliseconds — confirm in `Header` validation).
const EVENT_TIME_DRIFT: u64 = 60_000;
/// All-zero hash marking an unused parent slot.
pub const NULL_ID: Hash = Hash::from_bytes([0x00; blake3::OUT_LEN]);
/// The parent array carried by every genesis event (all slots NULL).
pub const NULL_PARENTS: [Hash; N_EVENT_PARENTS] = [NULL_ID; N_EVENT_PARENTS];

/// Maximum number of rotating DAGs kept; the oldest is dropped in `DAGStore::add_dag`.
pub const DAGS_MAX_NUMBER: i8 = 24;

/// Atomically-reference-counted pointer to an [`EventGraph`] instance.
pub type EventGraphPtr = Arc<EventGraph>;
/// Unreferenced tips of a DAG, grouped by layer number.
pub type LayerUTips = BTreeMap<u64, HashSet<blake3::Hash>>;
108
/// Container for the sled database and, per DAG (keyed by its genesis
/// timestamp), the header tree and the main event tree, each paired with
/// its currently unreferenced tips.
#[derive(Clone)]
pub struct DAGStore {
    // Handle to the underlying sled database
    db: sled::Db,
    // Header trees plus their unreferenced tips, keyed by genesis timestamp
    header_dags: BTreeMap<u64, (sled::Tree, LayerUTips)>,
    // Main event trees plus their unreferenced tips, keyed by genesis timestamp
    main_dags: BTreeMap<u64, (sled::Tree, LayerUTips)>,
}
115
116impl DAGStore {
117 pub async fn new(&self, sled_db: sled::Db, hours_rotation: u64) -> Self {
118 let mut considered_trees = BTreeMap::new();
119 let mut considered_header_trees = BTreeMap::new();
120 if hours_rotation > 0 {
121 for i in 1..=DAGS_MAX_NUMBER {
123 let i_hours_ago = next_hour_timestamp((i - DAGS_MAX_NUMBER).into());
124 let header = Header {
125 timestamp: i_hours_ago,
126 parents: [NULL_ID; N_EVENT_PARENTS],
127 layer: 0,
128 };
129 let genesis = Event { header, content: GENESIS_CONTENTS.to_vec() };
130
131 let tree_name = genesis.header.timestamp.to_string();
132 let hdr_tree_name = format!("headers_{tree_name}");
133 let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap();
134 let dag = sled_db.open_tree(tree_name).unwrap();
135
136 if hdr_dag.is_empty() {
137 let mut overlay = SledTreeOverlay::new(&hdr_dag);
138
139 let header_se = serialize_async(&genesis.header).await;
140
141 overlay.insert(genesis.id().as_bytes(), &header_se).unwrap();
143
144 let batch = overlay.aggregate().unwrap();
146
147 if let Err(e) = hdr_dag.apply_batch(batch) {
150 panic!("Failed applying header_dag_insert batch to sled: {}", e);
151 }
152 }
153 if dag.is_empty() {
154 let mut overlay = SledTreeOverlay::new(&dag);
155
156 let event_se = serialize_async(&genesis).await;
157
158 overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
160
161 let batch = overlay.aggregate().unwrap();
163
164 if let Err(e) = dag.apply_batch(batch) {
167 panic!("Failed applying dag_insert batch to sled: {}", e);
168 }
169 }
170 let utips = self.find_unreferenced_tips(&dag).await;
171 considered_header_trees.insert(genesis.header.timestamp, (hdr_dag, utips.clone()));
172 considered_trees.insert(genesis.header.timestamp, (dag, utips));
173 }
174 } else {
175 let genesis = generate_genesis(0);
176 let tree_name = genesis.header.timestamp.to_string();
177 let hdr_tree_name = format!("headers_{tree_name}");
178 let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap();
179 let dag = sled_db.open_tree(tree_name).unwrap();
180 if hdr_dag.is_empty() {
181 let mut overlay = SledTreeOverlay::new(&hdr_dag);
182
183 let header_se = serialize_async(&genesis.header).await;
184
185 overlay.insert(genesis.id().as_bytes(), &header_se).unwrap();
187
188 let batch = overlay.aggregate().unwrap();
190
191 if let Err(e) = hdr_dag.apply_batch(batch) {
194 panic!("Failed applying header_dag_insert batch to sled: {}", e);
195 }
196 }
197 if dag.is_empty() {
198 let mut overlay = SledTreeOverlay::new(&dag);
199
200 let event_se = serialize_async(&genesis).await;
201
202 overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
204
205 let batch = overlay.aggregate().unwrap();
207
208 if let Err(e) = dag.apply_batch(batch) {
211 panic!("Failed applying dag_insert batch to sled: {}", e);
212 }
213 }
214 let utips = self.find_unreferenced_tips(&dag).await;
215 considered_header_trees.insert(genesis.header.timestamp, (hdr_dag, utips.clone()));
216 considered_trees.insert(genesis.header.timestamp, (dag, utips));
217 }
218
219 Self { db: sled_db, header_dags: considered_header_trees, main_dags: considered_trees }
220 }
221
222 pub async fn add_dag(&mut self, dag_name: &str, genesis_event: &Event) {
225 debug!("add_dag::dags: {}", self.main_dags.len());
226 if self.main_dags.len() != self.header_dags.len() {
227 panic!("main dags length is not the same as header dags")
228 }
229
230 if self.main_dags.len() >= DAGS_MAX_NUMBER.try_into().unwrap() {
231 debug!("[EVENTGRAPH] dropping oldest dag");
232 let (oldest_hdr_tree, _) = self.header_dags.pop_first().unwrap().1;
233 let (oldest_tree, _) = self.main_dags.pop_first().unwrap().1;
234
235 self.db.drop_tree(oldest_hdr_tree.name()).unwrap();
236 self.db.drop_tree(oldest_tree.name()).unwrap();
237 }
238
239 let hdr_tree_name = format!("headers_{dag_name}");
241 let hdr_dag = self.get_dag(&hdr_tree_name);
242 hdr_dag
243 .insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event.header).await)
244 .unwrap();
245
246 let dag = self.get_dag(dag_name);
247 dag.insert(genesis_event.id().as_bytes(), serialize_async(genesis_event).await).unwrap();
248 let utips = self.find_unreferenced_tips(&dag).await;
249 self.header_dags.insert(genesis_event.header.timestamp, (hdr_dag, utips.clone()));
250 self.main_dags.insert(genesis_event.header.timestamp, (dag, utips));
251 }
252
253 pub fn get_dag(&self, dag_name: &str) -> sled::Tree {
255 self.db.open_tree(dag_name).unwrap()
256 }
257
258 pub async fn get_dags(&self, count: usize) -> Vec<sled::Tree> {
260 let sorted_dags = self.sort_dags().await;
261 sorted_dags.into_iter().take(count).collect()
262 }
263
264 async fn sort_dags(&self) -> Vec<sled::Tree> {
266 let mut dags = self
267 .main_dags
268 .iter()
269 .map(|x| {
270 let trees = x.1;
271 trees.0.clone()
272 })
273 .collect::<Vec<_>>();
274 dags.reverse();
276
277 dags
278 }
279
280 async fn find_unreferenced_tips(&self, dag: &sled::Tree) -> LayerUTips {
282 let mut tips = HashSet::new();
284 for iter_elem in dag.iter() {
285 let (id, _) = iter_elem.unwrap();
286 let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
287 tips.insert(id);
288 }
289 for iter_elem in dag.iter() {
291 let (_, event) = iter_elem.unwrap();
292 let event: Event = deserialize_async(&event).await.unwrap();
293 for parent in event.header.parents.iter() {
294 tips.remove(parent);
295 }
296 }
297 let mut map: LayerUTips = BTreeMap::new();
299 for tip in tips {
300 let event = self.fetch_event_from_dag(&tip, dag).await.unwrap().unwrap();
301 if let Some(layer_tips) = map.get_mut(&event.header.layer) {
302 layer_tips.insert(tip);
303 } else {
304 let mut layer_tips = HashSet::new();
305 layer_tips.insert(tip);
306 map.insert(event.header.layer, layer_tips);
307 }
308 }
309
310 map
311 }
312
313 pub async fn fetch_event_from_dag(
315 &self,
316 event_id: &blake3::Hash,
317 dag: &sled::Tree,
318 ) -> Result<Option<Event>> {
319 let Some(bytes) = dag.get(event_id.as_bytes())? else { return Ok(None) };
320
321 let event: Event = deserialize_async(&bytes).await?;
322
323 Ok(Some(event))
324 }
325}
326
/// Per-peer bookkeeping state used while chunk-fetching events in
/// [`EventGraph::dag_sync`].
enum PeerStatus {
    // Peer is idle and may be handed a chunk request
    Free,
    // Peer has an outstanding chunk request
    Busy,
    // Peer failed a request and is no longer used for this sync
    Failed,
}
332
/// An event graph instance: the rotating DAG store, the static DAG, and all
/// state needed to sync, insert, and publish events over the P2P network.
pub struct EventGraph {
    // Pointer to the P2P network instance
    p2p: P2pPtr,
    // Rotating header/event DAGs and their tip bookkeeping
    dag_store: RwLock<DAGStore>,
    // Non-rotating DAG tree for static events
    static_dag: sled::Tree,
    // Path used by the replayer log (see `dag_insert`)
    datastore: PathBuf,
    // When true, inserts are also appended to the replayer log
    replay_mode: bool,
    // Event IDs already marked as broadcast; reset on DAG prune
    broadcasted_ids: RwLock<HashSet<Hash>>,
    // Background pruning task; set only when rotation is enabled
    pub prune_task: OnceCell<StoppableTaskPtr>,
    // Publisher notified for events processed by `dag_insert`
    pub event_pub: PublisherPtr<Event>,
    // Publisher for static DAG events
    pub static_pub: PublisherPtr<Event>,
    // Genesis event of the currently active DAG
    pub current_genesis: RwLock<Event>,
    // DAG rotation period; 0 disables rotation
    hours_rotation: u64,
    // Whether the DAGs have been synced from the network
    pub synced: RwLock<bool>,
    // Whether deg (graph debugging) notifications are emitted
    pub deg_enabled: RwLock<bool>,
    // Publisher for deg (graph debugging) notifications
    deg_publisher: PublisherPtr<DegEvent>,
    // Fast-mode flag stored at construction; presumably the default for
    // header-only syncing (cf. `dag_sync`'s `fast_mode` parameter) — confirm
    fast_mode: bool,
    // Handle to the underlying sled database
    sled_db: sled::Db,
    // In-memory sparse Merkle tree used for RLN identities
    pub rln_identity_tree: RwLock<SmtMemoryFp>,
}
377
378impl EventGraph {
379 pub async fn new(
392 p2p: P2pPtr,
393 sled_db: sled::Db,
394 datastore: PathBuf,
395 replay_mode: bool,
396 fast_mode: bool,
397 hours_rotation: u64,
398 ex: Arc<Executor<'_>>,
399 ) -> Result<EventGraphPtr> {
400 let hasher = PoseidonFp::new();
408 let store = MemoryStorageFp::new();
410 let identity_tree = SmtMemoryFp::new(store, hasher.clone(), &EMPTY_NODES_FP);
411
412 let broadcasted_ids = RwLock::new(HashSet::new());
413 let event_pub = Publisher::new();
414 let static_pub = Publisher::new();
415
416 let current_genesis = generate_genesis(hours_rotation);
418 let current_dag_tree_name = current_genesis.header.timestamp.to_string();
419 let dag_store = DAGStore {
420 db: sled_db.clone(),
421 header_dags: BTreeMap::default(),
422 main_dags: BTreeMap::default(),
423 }
424 .new(sled_db.clone(), hours_rotation)
425 .await;
426
427 let static_dag = Self::static_new(&sled_db).await?;
428
429 let self_ = Arc::new(Self {
430 p2p,
431 dag_store: RwLock::new(dag_store.clone()),
432 static_dag,
433 datastore,
434 replay_mode,
435 fast_mode,
436 broadcasted_ids,
437 prune_task: OnceCell::new(),
438 event_pub,
439 static_pub,
440 current_genesis: RwLock::new(current_genesis.clone()),
441 hours_rotation,
442 synced: RwLock::new(false),
443 deg_enabled: RwLock::new(false),
444 deg_publisher: Publisher::new(),
445 sled_db,
446 rln_identity_tree: RwLock::new(identity_tree),
447 });
448
449 let dag = dag_store.get_dag(¤t_dag_tree_name);
452 if !dag.contains_key(current_genesis.id().as_bytes())? {
453 info!(
454 target: "event_graph::new",
455 "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
456 );
457 self_.dag_prune(current_genesis).await?;
458 }
459
460 if hours_rotation > 0 {
462 let prune_task = StoppableTask::new();
463 let _ = self_.prune_task.set(prune_task.clone()).await;
464
465 prune_task.clone().start(
466 self_.clone().dag_prune_task(hours_rotation),
467 |res| async move {
468 match res {
469 Ok(()) | Err(Error::DetachedTaskStopped) => { }
470 Err(e) => error!(target: "event_graph::_handle_stop", "[EVENTGRAPH] Failed stopping prune task: {e}")
471 }
472 },
473 Error::DetachedTaskStopped,
474 ex.clone(),
475 );
476 }
477
478 Ok(self_)
479 }
480
    /// Returns the configured DAG rotation period (`0` means rotation is
    /// disabled, cf. `DAGStore::new`).
    pub fn hours_rotation(&self) -> u64 {
        self.hours_rotation
    }
484
    /// Sync the given `dag` tree from the network.
    ///
    /// Asks every connected peer for its unreferenced tips, keeps only tips
    /// reported by more than 2/3 of responding peers, syncs any missing
    /// headers from all peers, and — unless `fast_mode` is set — fetches the
    /// corresponding events in fixed-size chunks distributed over free peers
    /// and inserts them via `dag_insert`.
    pub async fn dag_sync(&self, dag: sled::Tree, fast_mode: bool) -> Result<()> {
        // The tree name doubles as the DAG's genesis timestamp.
        let dag_name = String::from_utf8_lossy(&dag.name()).to_string();

        let channels = self.p2p.hosts().peers();
        let mut communicated_peers = channels.len();
        info!(
            target: "event_graph::dag_sync",
            "[EVENTGRAPH] Syncing DAG from {communicated_peers} peers..."
        );

        let comms_timeout = self.p2p.settings().read().await.outbound_connect_timeout_max();

        // tip id -> (layer, number of peers that reported it)
        let mut tips: HashMap<Hash, (u64, usize)> = HashMap::new();

        for channel in channels.iter() {
            let url = channel.display_address();

            let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
                Ok(v) => v,
                Err(e) => {
                    error!(
                        target: "event_graph::dag_sync",
                        "[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {url}, skipping ({e})"
                    );
                    communicated_peers -= 1;
                    continue
                }
            };

            if let Err(e) = channel.send(&TipReq(dag_name.clone())).await {
                error!(
                    target: "event_graph::dag_sync",
                    "[EVENTGRAPH] Sync: Couldn't contact peer {url}, skipping ({e})"
                );
                communicated_peers -= 1;
                continue
            };

            let Ok(peer_tips) = tip_rep_sub.receive_with_timeout(comms_timeout).await else {
                error!(
                    target: "event_graph::dag_sync",
                    "[EVENTGRAPH] Sync: Peer {url} didn't reply with tips in time, skipping"
                );
                communicated_peers -= 1;
                continue
            };

            let peer_tips: &BTreeMap<u64, HashSet<Hash>> = &peer_tips.0;

            // Tally how many peers reported each tip.
            for (layer, layer_tips) in peer_tips {
                for tip in layer_tips {
                    if let Some(seen_tip) = tips.get_mut(tip) {
                        seen_tip.1 += 1;
                    } else {
                        tips.insert(*tip, (*layer, 1));
                    }
                }
            }
        }

        if tips.is_empty() {
            error!(
                target: "event_graph::dag_sync",
                "[EVENTGRAPH] Sync: Could not find any DAG tips",
            );
            return Err(Error::DagSyncFailed)
        }

        // Keep only tips reported by a supermajority (>2/3) of peers we
        // actually communicated with.
        let consideration_threshold = communicated_peers * 2 / 3;
        let mut considered_tips = HashSet::new();
        for (tip, (_, amount)) in tips.iter() {
            if amount > &consideration_threshold {
                considered_tips.insert(*tip);
            }
        }
        drop(tips);

        // Any considered tip missing locally means we have data to fetch.
        let mut missing_parents = HashSet::new();
        for tip in considered_tips.iter() {
            assert!(tip != &NULL_ID);

            if !dag.contains_key(tip.as_bytes()).unwrap() {
                missing_parents.insert(*tip);
            }
        }

        if missing_parents.is_empty() {
            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
            return Ok(())
        }
        // Sync headers first: ask every peer for headers beyond our own tips
        // and insert whatever comes back.
        let hdr_tree_name = format!("headers_{dag_name}");
        let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
        let dag_timestamp = u64::from_str(&dag_name)?;
        let our_tips =
            self.dag_store.read().await.header_dags.get(&dag_timestamp).unwrap().1.clone();
        let mut headers_requests = FuturesUnordered::new();
        for channel in channels.iter() {
            headers_requests.push(request_header(
                channel,
                dag_name.clone(),
                our_tips.clone(),
                comms_timeout,
            ))
        }

        while let Some(peer_headers) = headers_requests.next().await {
            self.header_dag_insert(peer_headers?, &dag_name).await?
        }

        if !fast_mode {
            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Fetching events");
            let mut header_sorted = vec![];

            // Collect headers whose events we don't have yet (genesis
            // headers are skipped), parents-before-children by layer.
            let main_dag = self.dag_store.read().await.get_dag(&dag_name);
            for iter_elem in header_dag.iter() {
                let (hash_bytes, val) = iter_elem.unwrap();
                let val: Header = deserialize_async(&val).await.unwrap();
                if val.parents != NULL_PARENTS && !main_dag.contains_key(hash_bytes)? {
                    header_sorted.push(val);
                }
            }
            header_sorted.sort_by(|x, y| x.layer.cmp(&y.layer));

            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Retrieving {} Events", header_sorted.len());
            // Split the wanted event IDs into fixed-size chunks, each
            // identified by its index.
            let batch = 20;
            let mut chunks: BTreeMap<usize, Vec<blake3::Hash>> = BTreeMap::new();
            for (i, chunk) in header_sorted.chunks(batch).enumerate() {
                chunks.insert(i, chunk.iter().map(|h| h.id()).collect());
            }
            let mut remaining_chunk_ids: BTreeSet<usize> = chunks.keys().cloned().collect();

            let mut received_events: BTreeMap<usize, Vec<Event>> = BTreeMap::new();
            let mut peer_status: HashMap<Url, PeerStatus> = HashMap::new();

            let mut retrieved_count = 0;
            let mut futures = FuturesUnordered::new();

            // Keep dispatching chunks to free peers until everything arrived.
            while retrieved_count < header_sorted.len() {
                let mut free_channels = vec![];
                let mut busy_channels = 0;

                self.p2p.hosts().peers().iter().for_each(|channel| {
                    if let Some(status) = peer_status.get(channel.address()) {
                        match status {
                            PeerStatus::Free => free_channels.push(channel.clone()),
                            PeerStatus::Busy => busy_channels += 1,
                            _ => {}
                        }
                    } else {
                        // First time we see this peer: mark it free.
                        peer_status.insert(channel.address().clone(), PeerStatus::Free);
                        free_channels.push(channel.clone());
                    }
                });

                // All peers failed and nothing is in flight: give up.
                if free_channels.is_empty() && busy_channels == 0 {
                    return Err(Error::DagSyncFailed)
                }

                let requested_chunks_len =
                    std::cmp::min(free_channels.len(), remaining_chunk_ids.len());
                let requested_chunk_ids: Vec<usize> =
                    remaining_chunk_ids.iter().take(requested_chunks_len).copied().collect();
                for (i, chunk_id) in requested_chunk_ids.iter().enumerate() {
                    futures.push(request_event(
                        free_channels[i].clone(),
                        chunks.get(chunk_id).unwrap().clone(),
                        *chunk_id,
                        comms_timeout,
                    ));
                    remaining_chunk_ids.remove(chunk_id);
                    peer_status.insert(free_channels[i].address().clone(), PeerStatus::Busy);
                }

                info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Retrieving Events from {} peers", futures.len());
                if let Some(resp) = futures.next().await {
                    let (events, chunk_id, channel) = resp;
                    if let Ok(events) = events {
                        retrieved_count += events.len();
                        received_events.insert(chunk_id, events.clone());
                        peer_status.insert(channel.address().clone(), PeerStatus::Free);
                    } else {
                        // Failed chunk: requeue it and stop using this peer.
                        remaining_chunk_ids.insert(chunk_id);
                        peer_status.insert(channel.address().clone(), PeerStatus::Failed);
                    }

                    info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Retrieved Events: {}/{}", retrieved_count, header_sorted.len());
                }
            }

            // Insert chunks in ascending chunk-id order (BTreeMap iteration),
            // which matches the layer-sorted header order.
            let mut verified_count = 0;
            for (_, chunk) in received_events {
                verified_count += chunk.len();
                self.dag_insert(&chunk, &dag_name).await?;
                info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Verified Events: {}/{}", verified_count, retrieved_count);
            }
        }
        info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
        Ok(())
    }
724
725 pub async fn sync_selected(&self, count: usize, fast_mode: bool) -> Result<()> {
727 let mut dags_to_sync = self.dag_store.read().await.get_dags(count).await;
728 dags_to_sync.reverse();
730 for dag in dags_to_sync {
731 match self.dag_sync(dag, fast_mode).await {
732 Ok(()) => continue,
733 Err(e) => return Err(e),
734 }
735 }
736
737 *self.synced.write().await = true;
738 Ok(())
739 }
740
741 async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
743 debug!(target: "event_graph::dag_prune", "Pruning DAG...");
744
745 let mut broadcasted_ids = self.broadcasted_ids.write().await;
751 let mut current_genesis = self.current_genesis.write().await;
752
753 let dag_name = genesis_event.header.timestamp.to_string();
754 self.dag_store.write().await.add_dag(&dag_name, &genesis_event).await;
755
756 *current_genesis = genesis_event;
758 *broadcasted_ids = HashSet::new();
759 drop(broadcasted_ids);
760 drop(current_genesis);
761
762 debug!(target: "event_graph::dag_prune", "DAG pruned successfully");
763 Ok(())
764 }
765
766 async fn dag_prune_task(self: Arc<Self>, hours_rotation: u64) -> Result<()> {
768 debug!(target: "event_graph::dag_prune_task", "Spawned background DAG pruning task");
773
774 loop {
775 let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, hours_rotation);
777
778 let header =
779 Header { timestamp: next_rotation, parents: [NULL_ID; N_EVENT_PARENTS], layer: 0 };
780 let current_genesis = Event { header, content: GENESIS_CONTENTS.to_vec() };
782
783 let s = millis_until_next_rotation(next_rotation);
785
786 debug!(target: "event_graph::dag_prune_task", "Sleeping {s}ms until next DAG prune");
787 msleep(s).await;
788 debug!(target: "event_graph::dag_prune_task", "Rotation period reached");
789
790 self.dag_prune(current_genesis).await?;
792 }
793 }
794
    /// Atomically insert the given `events` into the DAG named `dag_name`,
    /// returning the IDs that were actually inserted.
    ///
    /// Genesis events, already-present events, and events whose header has
    /// not been synced are skipped; an event failing validation aborts with
    /// an error. After the batch is applied, the unreferenced-tips
    /// bookkeeping is updated for both the main and header DAG, and events
    /// are published to `event_pub` subscribers.
    pub async fn dag_insert(&self, events: &[Event], dag_name: &str) -> Result<Vec<Hash>> {
        if events.is_empty() {
            return Ok(vec![])
        }

        let dag_timestamp = u64::from_str(dag_name)?;
        // Hold the broadcasted-IDs write lock across the whole insertion.
        let mut broadcasted_ids = self.broadcasted_ids.write().await;

        let main_dag = self.dag_store.read().await.get_dag(dag_name);
        let hdr_tree_name = format!("headers_{dag_name}");
        let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);

        let mut ids = Vec::with_capacity(events.len());

        // Stage all writes in an overlay so they apply as a single batch.
        let mut overlay = SledTreeOverlay::new(&main_dag);

        for event in events {
            let event_id = event.id();
            // Never (re-)insert genesis events.
            if event.header.parents == NULL_PARENTS {
                continue
            }
            debug!(
                target: "event_graph::dag_insert",
                "Inserting event {event_id} into the DAG layer: {}", event.header.layer
            );

            // Skip events we already have.
            if main_dag.contains_key(event_id.as_bytes())? {
                continue
            }

            // Skip events whose header hasn't been synced yet.
            if !header_dag.contains_key(event_id.as_bytes())? {
                continue
            }

            if !event.dag_validate(&header_dag).await? {
                error!(target: "event_graph::dag_insert", "Event {} is invalid!", event_id);
                return Err(Error::EventIsInvalid)
            }

            let event_se = serialize_async(event).await;

            overlay.insert(event_id.as_bytes(), &event_se)?;

            // In replay mode, mirror the insertion into the replayer log.
            if self.replay_mode {
                replayer_log(&self.datastore, "insert".to_owned(), event_se)?;
            }
            ids.push(event_id);
        }

        // `None` aggregate means nothing new was staged.
        let batch = match overlay.aggregate() {
            Some(x) => x,
            None => return Ok(vec![]),
        };

        if let Err(e) = main_dag.apply_batch(batch) {
            panic!("Failed applying dag_insert batch to sled: {e}");
        }

        // Update the unreferenced-tips bookkeeping for this DAG.
        let mut dag_store = self.dag_store.write().await;
        let (_, unreferenced_tips) = &mut dag_store.main_dags.get_mut(&dag_timestamp).unwrap();

        for event in events {
            let event_id = event.id();
            if event.header.parents == NULL_PARENTS {
                continue
            }

            debug!(
                target: "event_graph::dag_insert",
                "Event {event_id} parents {:#?}", event.header.parents,
            );
            for parent_id in event.header.parents.iter() {
                if parent_id != &NULL_ID {
                    debug!(
                        target: "event_graph::dag_insert",
                        "Removing {parent_id} from unreferenced_tips"
                    );

                    // A parent only stops being a tip on layers strictly
                    // below the referencing event's layer.
                    for (layer, tips) in unreferenced_tips.iter_mut() {
                        if layer >= &event.header.layer {
                            continue
                        }
                        tips.remove(parent_id);
                    }
                    // Referenced parents are considered already broadcast.
                    broadcasted_ids.insert(*parent_id);
                }
            }
            // Drop layers that have no tips left.
            unreferenced_tips.retain(|_, tips| !tips.is_empty());
            debug!(
                target: "event_graph::dag_insert",
                "Adding {event_id} to unreferenced tips"
            );

            if let Some(layer_tips) = unreferenced_tips.get_mut(&event.header.layer) {
                layer_tips.insert(event_id);
            } else {
                let mut layer_tips = HashSet::new();
                layer_tips.insert(event_id);
                unreferenced_tips.insert(event.header.layer, layer_tips);
            }

            // Notify subscribers. NOTE(review): this fires for every
            // non-genesis input event, even ones skipped above as
            // duplicates — confirm that's intended.
            self.event_pub.notify(event.clone()).await;
        }

        // Mirror the main DAG's tips into the header DAG bookkeeping.
        dag_store.header_dags.get_mut(&dag_timestamp).unwrap().1 =
            dag_store.main_dags.get(&dag_timestamp).unwrap().1.clone();

        drop(dag_store);
        drop(broadcasted_ids);

        Ok(ids)
    }
940
941 pub async fn header_dag_insert(&self, headers: Vec<Header>, dag_name: &str) -> Result<()> {
942 let hdr_tree_name = format!("headers_{dag_name}");
943 let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
944 let mut overlay = SledTreeOverlay::new(&header_dag);
946
947 let mut hdrs = headers;
948 hdrs.sort_by(|x, y| x.layer.cmp(&y.layer));
949
950 for header in hdrs {
953 let header_id = header.id();
954 if header.parents == NULL_PARENTS {
955 continue
956 }
957 debug!(
958 target: "event_graph::header_dag_insert",
959 "Inserting header {} into the DAG", header_id,
960 );
961 if !header.validate(&header_dag, self.hours_rotation, Some(&overlay)).await? {
962 error!(target: "event_graph::header_dag_insert", "Header {} is invalid!", header_id);
963 return Err(Error::HeaderIsInvalid)
964 }
965 let header_se = serialize_async(&header).await;
966
967 overlay.insert(header_id.as_bytes(), &header_se)?;
969 }
970
971 let batch = match overlay.aggregate() {
973 Some(x) => x,
974 None => return Ok(()),
975 };
976
977 if let Err(e) = header_dag.apply_batch(batch) {
980 panic!("Failed applying dag_insert batch to sled: {}", e);
981 }
982
983 Ok(())
984 }
985
986 pub async fn fetch_event_from_dags(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
988 let store = self.dag_store.read().await;
989 for tree_elem in store.main_dags.clone() {
990 let dag_name = tree_elem.0.to_string();
991 let Some(bytes) = store.get_dag(&dag_name).get(event_id.as_bytes())? else {
992 continue;
993 };
994 let event: Event = deserialize_async(&bytes).await?;
995
996 return Ok(Some(event))
997 }
998
999 Ok(None)
1000 }
1001
    /// Compute the next layer number and up to [`N_EVENT_PARENTS`] parent
    /// IDs for a new event in the DAG keyed by `dag_name` (a genesis
    /// timestamp).
    ///
    /// Parents are picked from the unreferenced tips, highest layers first;
    /// unused slots remain [`NULL_ID`].
    ///
    /// # Panics
    ///
    /// Panics if the DAG is unknown, has no tips recorded, or yields no
    /// non-NULL parent.
    async fn get_next_layer_with_parents(
        &self,
        dag_name: &u64,
    ) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
        let store = self.dag_store.read().await;
        let (_, unreferenced_tips) = store.header_dags.get(dag_name).unwrap();

        // Fill parent slots from the highest layers downwards.
        let mut parents = [NULL_ID; N_EVENT_PARENTS];
        let mut index = 0;
        'outer: for (_, tips) in unreferenced_tips.iter().rev() {
            for tip in tips.iter() {
                parents[index] = *tip;
                index += 1;
                if index >= N_EVENT_PARENTS {
                    break 'outer;
                }
            }
        }

        // The new event sits one layer above the highest tip layer.
        let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;

        // At least one real parent must have been found.
        assert!(parents.iter().any(|x| x != &NULL_ID));
        (next_layer, parents)
    }
1030
1031 async fn find_unreferenced_tips_static(&self, dag: &sled::Tree) -> LayerUTips {
1033 let mut tips = HashSet::new();
1035 for iter_elem in dag.iter() {
1036 let (id, _) = iter_elem.unwrap();
1037 let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1038 tips.insert(id);
1039 }
1040 for iter_elem in dag.iter() {
1042 let (_, event) = iter_elem.unwrap();
1043 let event: Event = deserialize_async(&event).await.unwrap();
1044 for parent in event.header.parents.iter() {
1045 tips.remove(parent);
1046 }
1047 }
1048 let mut map: LayerUTips = BTreeMap::new();
1050 for tip in tips {
1051 let event = self.static_fetch(&tip).await.unwrap().unwrap();
1052 if let Some(layer_tips) = map.get_mut(&event.header.layer) {
1053 layer_tips.insert(tip);
1054 } else {
1055 let mut layer_tips = HashSet::new();
1056 layer_tips.insert(tip);
1057 map.insert(event.header.layer, layer_tips);
1058 }
1059 }
1060
1061 map
1062 }
1063
1064 async fn get_next_layer_with_parents_static(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
1065 let unreferenced_tips = self.find_unreferenced_tips_static(&self.static_dag).await;
1066
1067 let mut parents = [NULL_ID; N_EVENT_PARENTS];
1068 let mut index = 0;
1069 'outer: for (_, tips) in unreferenced_tips.iter().rev() {
1070 for tip in tips.iter() {
1071 parents[index] = *tip;
1072 index += 1;
1073 if index >= N_EVENT_PARENTS {
1074 break 'outer;
1075 }
1076 }
1077 }
1078
1079 let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
1080
1081 assert!(parents.iter().any(|x| x != &NULL_ID));
1082 (next_layer, parents)
1083 }
1084
1085 async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> {
1087 let mut vec_tips = vec![];
1088 let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
1089 for (i, _) in self.dag_store.read().await.header_dags.iter() {
1090 let (_, tips) = self.get_next_layer_with_parents(i).await;
1091 let mut sorted: Vec<_> =
1093 tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
1094 sorted.sort_unstable();
1095
1096 for (i, id) in sorted.iter().enumerate() {
1098 let mut bytes = id.to_bytes_be();
1099
1100 while bytes.len() < blake3::OUT_LEN {
1102 bytes.insert(0, 0);
1103 }
1104
1105 tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
1106 }
1107
1108 vec_tips.push(tips_sorted);
1109 }
1110
1111 vec_tips
1112 }
1113
1114 pub async fn order_events(&self) -> Vec<Event> {
1116 let mut ordered_events = VecDeque::new();
1117 let mut visited = HashSet::new();
1118
1119 for i in self.get_unreferenced_tips_sorted().await {
1120 for tip in i {
1121 if !visited.contains(&tip) && tip != NULL_ID {
1122 let tip = self.fetch_event_from_dags(&tip).await.unwrap().unwrap();
1123 ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
1124 }
1125 }
1126 }
1127
1128 let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
1129 ord_events_vec.sort_unstable_by(|a, b| a.1.header.timestamp.cmp(&b.1.header.timestamp));
1131
1132 ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
1133 }
1134
    /// Traverse the graph starting from `event` towards its ancestors,
    /// collecting each reachable `(layer, event)` pair exactly once and
    /// recording visits in `visited`.
    ///
    /// NOTE(review): despite the name, IDs are pushed with `push_back` and
    /// popped with `pop_front`, so this is a breadth-first walk, not a DFS.
    async fn dfs_topological_sort(
        &self,
        event: Event,
        visited: &mut HashSet<Hash>,
    ) -> VecDeque<(u64, Event)> {
        let mut ordered_events = VecDeque::new();
        let mut stack = VecDeque::new();
        let event_id = event.id();
        stack.push_back(event_id);

        while let Some(event_id) = stack.pop_front() {
            if !visited.contains(&event_id) && event_id != NULL_ID {
                visited.insert(event_id);
                // Unknown IDs (e.g. parents living in pruned DAGs) are
                // silently skipped.
                if let Some(event) = self.fetch_event_from_dags(&event_id).await.unwrap() {
                    for parent in event.header.parents.iter() {
                        stack.push_back(*parent);
                    }

                    ordered_events.push_back((event.header.layer, event))
                }
            }
        }

        ordered_events
    }
1162
1163 pub async fn deg_enable(&self) {
1165 *self.deg_enabled.write().await = true;
1166 warn!("[EVENTGRAPH] Graph debugging enabled!");
1167 }
1168
1169 pub async fn deg_disable(&self) {
1171 *self.deg_enabled.write().await = false;
1172 warn!("[EVENTGRAPH] Graph debugging disabled!");
1173 }
1174
    /// Subscribe to deg (graph debugging) notifications.
    pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
        self.deg_publisher.clone().subscribe().await
    }
1179
    /// Publish `event` to all deg (graph debugging) subscribers.
    pub async fn deg_notify(&self, event: DegEvent) {
        self.deg_publisher.notify(event).await;
    }
1184
    /// JSON-RPC handler: return the entire current DAG as a JSON object of
    /// the form `{"eventgraph_info": {"dag": {<event_id>: <event>, ...}}}`.
    #[cfg(feature = "rpc")]
    pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
        // The current DAG tree is named after the current genesis timestamp.
        let current_genesis = self.current_genesis.read().await;
        let dag_name = current_genesis.header.timestamp.to_string();
        let mut graph = HashMap::new();
        for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
            let (id, val) = iter_elem.unwrap();
            let id = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
            let val: Event = deserialize_async(&val).await.unwrap();
            graph.insert(id, val);
        }

        // Convert every (event_id, event) pair into JSON.
        let json_graph = graph
            .into_iter()
            .map(|(k, v)| {
                let key = k.to_string();
                let value = JsonValue::from(v);
                (key, value)
            })
            .collect();
        let values = json_map([("dag", JsonValue::Object(json_graph))]);

        let result = JsonValue::Object(HashMap::from([("eventgraph_info".to_string(), values)]));

        JsonResponse::new(result, id).into()
    }
1211
    /// Return all headers in the `headers_<dag_name>` tree that are neither
    /// one of the given `tips` nor an ancestor of them, sorted by layer —
    /// i.e. presumably the headers a requester with those tips is missing.
    pub async fn fetch_headers_with_tips(
        &self,
        dag_name: &str,
        tips: &LayerUTips,
    ) -> Result<Vec<Header>> {
        debug!(
            target: "event_graph::fetch_headers_with_tips",
            "fetching headers with tips {tips:?}"
        );

        let tree = self.dag_store.read().await.get_dag(&format!("headers_{dag_name}"));

        // Collect the tips themselves plus all of their ancestors.
        let mut ancestors = HashSet::new();

        for hashes in tips.values() {
            for hash in hashes {
                ancestors.insert(*hash);
                let val = tree
                    .get(hash.as_bytes())?
                    .ok_or_else(|| Error::EventNotFound("The Tip is not found".to_owned()))?;
                let header: Header = deserialize_async(&val).await?;
                self.get_ancestors(&mut ancestors, header, &tree).await?
            }
        }

        // Everything not in the ancestor set is part of the reply. At this
        // point all ancestors exist in the tree, so the subtraction is safe.
        let mut result = Vec::with_capacity(tree.len() - ancestors.len());
        for iter_elem in tree.iter() {
            let (id, val) = iter_elem?;
            let hash = Hash::from_bytes((&id as &[u8]).try_into()?);
            if !ancestors.contains(&hash) {
                let header: Header = deserialize_async(&val).await?;
                result.push(header);
            }
        }

        // Parents-before-children ordering by layer.
        result.sort_unstable_by(|a, b| a.layer.cmp(&b.layer));

        Ok(result)
    }
1255
1256 async fn get_ancestors(
1258 &self,
1259 visited: &mut HashSet<Hash>,
1260 header: Header,
1261 tree: &sled::Tree,
1262 ) -> Result<()> {
1263 let mut stack = VecDeque::new();
1264 stack.push_back(header);
1265
1266 while let Some(hdr) = stack.pop_back() {
1267 for parent in hdr.parents {
1268 if parent != NULL_ID && !visited.contains(&parent) {
1269 visited.insert(parent);
1270 let val = tree.get(parent.as_bytes())?.unwrap();
1271 let header: Header = deserialize_async(&val).await?;
1272 stack.push_back(header);
1273 }
1274 }
1275 }
1276
1277 Ok(())
1278 }
1279
1280 pub async fn fetch_successors_of(&self, tips: LayerUTips) -> Result<Vec<Event>> {
1283 debug!(
1284 target: "event_graph::fetch_successors_of",
1285 "fetching successors of {tips:?}"
1286 );
1287
1288 let current_genesis = self.current_genesis.read().await;
1289 let dag_name = current_genesis.header.timestamp.to_string();
1290 let mut graph = HashMap::new();
1291 for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
1292 let (id, val) = iter_elem.unwrap();
1293 let hash = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1294 let event: Event = deserialize_async(&val).await.unwrap();
1295 graph.insert(hash, event);
1296 }
1297
1298 let mut result = vec![];
1299
1300 'outer: for tip in tips.iter() {
1301 for i in tip.1.iter() {
1302 if !graph.contains_key(i) {
1303 continue 'outer;
1304 }
1305 }
1306
1307 for (_, ev) in graph.iter() {
1308 if ev.header.layer > *tip.0 && !result.contains(ev) {
1309 result.push(ev.clone())
1310 }
1311 }
1312 }
1313
1314 result.sort_by(|a, b| a.header.layer.cmp(&b.header.layer));
1315
1316 Ok(result)
1317 }
1318
1319 pub async fn static_new(sled_db: &sled::Db) -> Result<sled::Tree> {
1320 let static_dag = sled_db.open_tree("static-dag")?;
1321
1322 let genesis = generate_genesis(0);
1323 let mut overlay = SledTreeOverlay::new(&static_dag);
1324
1325 let event_se = serialize_async(&genesis).await;
1326
1327 overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
1329
1330 let batch = match overlay.aggregate() {
1332 Some(b) => b,
1333 None => return Ok(static_dag),
1334 };
1335
1336 if let Err(e) = static_dag.apply_batch(batch) {
1339 panic!("Failed applying dag_insert batch to sled: {}", e);
1340 }
1341
1342 Ok(static_dag)
1343 }
1344
1345 pub async fn static_sync(&self) -> Result<()> {
1346 self.dag_sync(self.static_dag.clone(), false).await?;
1347 Ok(())
1348 }
1349
1350 pub async fn static_broadcast(&self, event: Event, blob: Vec<u8>) -> Result<()> {
1351 self.p2p.broadcast(&StaticPut(event, blob)).await;
1352 Ok(())
1353 }
1354
1355 pub async fn static_insert(&self, event: &Event) -> Result<()> {
1356 let mut overlay = SledTreeOverlay::new(&self.static_dag);
1357
1358 let event_se = serialize_async(event).await;
1359
1360 overlay.insert(event.id().as_bytes(), &event_se).unwrap();
1362
1363 let batch = match overlay.aggregate() {
1365 Some(b) => b,
1366 None => return Ok(()),
1367 };
1368
1369 if let Err(e) = self.static_dag.apply_batch(batch) {
1372 panic!("Failed applying dag_insert batch to sled: {}", e);
1373 }
1374
1375 self.static_pub.notify(event.clone()).await;
1377
1378 Ok(())
1379 }
1380
1381 pub async fn static_fetch(&self, event_id: &Hash) -> Result<Option<Event>> {
1382 let Some(bytes) = self.static_dag.get(event_id.as_bytes())? else { return Ok(None) };
1383
1384 let event: Event = deserialize_async(&bytes).await?;
1385
1386 Ok(Some(event))
1387 }
1388
1389 pub async fn static_fetch_all(&self) -> Result<Vec<Event>> {
1390 let mut events = vec![];
1391 for iter_elem in self.static_dag.iter() {
1392 let (id, _) = iter_elem.unwrap();
1393 let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1394 let event = self.static_fetch(&id).await?.unwrap();
1395 if event.header.parents == NULL_PARENTS {
1396 continue
1397 }
1398 events.push(event);
1399 }
1400 Ok(events)
1401 }
1402
1403 pub async fn static_unreferenced_tips(&self) -> LayerUTips {
1404 let mut tips = HashSet::new();
1406 for iter_elem in self.static_dag.iter() {
1407 let (id, _) = iter_elem.unwrap();
1408 let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
1409 tips.insert(id);
1410 }
1411 for iter_elem in self.static_dag.iter() {
1413 let (_, event) = iter_elem.unwrap();
1414 let event: Event = deserialize_async(&event).await.unwrap();
1415 for parent in event.header.parents.iter() {
1416 tips.remove(parent);
1417 }
1418 }
1419 let mut map: LayerUTips = BTreeMap::new();
1421 for tip in tips {
1422 let event = self.static_fetch(&tip).await.unwrap().unwrap();
1423 if let Some(layer_tips) = map.get_mut(&event.header.layer) {
1424 layer_tips.insert(tip);
1425 } else {
1426 let mut layer_tips = HashSet::new();
1427 layer_tips.insert(tip);
1428 map.insert(event.header.layer, layer_tips);
1429 }
1430 }
1431
1432 map
1433 }
1434}
1435
1436async fn request_header(
1437 peer: &Channel,
1438 tree_name: String,
1439 tips: LayerUTips,
1440 comms_timeout: u64,
1441) -> Result<Vec<Header>> {
1442 let url = peer.address();
1443
1444 let hdr_rep_sub = match peer.subscribe_msg::<HeaderRep>().await {
1445 Ok(v) => v,
1446 Err(e) => {
1447 error!(
1448 target: "event_graph::dag_sync",
1449 "[EVENTGRAPH] Sync: Couldn't subscribe HeaderReq for peer {}, skipping ({})",
1450 url, e,
1451 );
1452 return Err(Error::EventNotFound("Couldn't subscribe HeaderReq".to_owned()));
1453 }
1454 };
1455
1456 if let Err(e) = peer.send(&HeaderReq(tree_name, tips)).await {
1457 error!(
1458 target: "event_graph::dag_sync",
1459 "[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
1460 );
1461 return Err(Error::EventNotFound("Couldn't contact peer".to_owned()));
1462 };
1463
1464 let Ok(peer_headers) = hdr_rep_sub.receive_with_timeout(comms_timeout).await else {
1466 error!(
1467 target: "event_graph::dag_sync",
1468 "[EVENTGRAPH] Sync: Peer {} didn't reply with headers in time, skipping", url,
1469 );
1470 return Err(Error::EventNotFound("Peer didn't reply with headers in time".to_owned()));
1472 };
1473
1474 hdr_rep_sub.unsubscribe().await;
1475 let peer_headers = &peer_headers.0;
1476 Ok(peer_headers.to_vec())
1477}
1478
1479async fn request_event(
1480 peer: Arc<Channel>,
1481 headers: Vec<Hash>,
1482 chunk_id: usize,
1483 comms_timeout: u64,
1484) -> (Result<Vec<Event>>, usize, Arc<Channel>) {
1485 let url = peer.address();
1486
1487 debug!(
1488 target: "event_graph::dag_sync",
1489 "Requesting {:?} from {}...", headers, url,
1490 );
1491
1492 let ev_rep_sub = match peer.subscribe_msg::<EventRep>().await {
1493 Ok(v) => v,
1494 Err(e) => {
1495 error!(
1496 target: "event_graph::dag_sync",
1497 "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
1498 url, e,
1499 );
1500 return (
1501 Err(Error::EventNotFound("Couldn't subscribe EventRep".to_owned())),
1502 chunk_id,
1503 peer,
1504 );
1505 }
1506 };
1507
1508 if let Err(e) = peer.send(&EventReq(headers.clone())).await {
1510 error!(
1511 target: "event_graph::dag_sync",
1512 "[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",
1513 headers, url, e,
1514 );
1515 return (
1516 Err(Error::EventNotFound("Failed communicating EventReq".to_owned())),
1517 chunk_id,
1518 peer,
1519 );
1520 }
1521
1522 let Ok(event) = ev_rep_sub.receive_with_timeout(comms_timeout).await else {
1524 error!(
1525 target: "event_graph::dag_sync",
1526 "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
1527 headers, url,
1528 );
1529 return (
1530 Err(Error::EventNotFound("Timeout waiting for parents".to_owned())),
1531 chunk_id,
1532 peer,
1533 );
1534 };
1535
1536 ev_rep_sub.unsubscribe().await;
1537
1538 (Ok(event.0.clone()), chunk_id, peer)
1539}