use std::{
    collections::{BTreeMap, HashMap, HashSet, VecDeque},
    path::PathBuf,
    sync::Arc,
};

use darkfi_serial::{deserialize_async, serialize_async};
use num_bigint::BigUint;
use sled_overlay::{sled, SledTreeOverlay};
use smol::{
    lock::{OnceCell, RwLock},
    Executor,
};
use tracing::{debug, error, info, warn};

use crate::{
    event_graph::util::replayer_log,
    net::P2pPtr,
    system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
    Error, Result,
};

#[cfg(feature = "rpc")]
use {
    crate::rpc::{
        jsonrpc::{JsonResponse, JsonResult},
        util::json_map,
    },
    tinyjson::JsonValue::{self},
};

/// Event definition
pub mod event;
pub use event::Event;

/// P2P protocol messages for the event graph
pub mod proto;
use proto::{EventRep, EventReq, TipRep, TipReq};

/// Utility functions
pub mod util;
use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp};

/// Graph debugging (deg) events
pub mod deg;
use deg::DegEvent;

#[cfg(test)]
mod tests;

/// Initial genesis timestamp in milliseconds (2023-09-07 00:00:00 UTC)
pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
/// Genesis event contents (the ASCII bytes of "GENESIS")
pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];

/// Number of parent slots each event carries; unused slots hold `NULL_ID`
pub const N_EVENT_PARENTS: usize = 5;
/// Allowed event timestamp drift, in milliseconds
const EVENT_TIME_DRIFT: u64 = 60_000;
/// Null (all-zero) event ID used for empty parent slots
pub const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);

/// Atomic pointer to an [`EventGraph`] instance
pub type EventGraphPtr = Arc<EventGraph>;

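/// An Event Graph instance: an event DAG stored in sled and synced over the P2P network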
pub struct EventGraph {
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Sled tree containing the DAG
    dag: sled::Tree,
    /// Path to the datastore, used by the replay log
    datastore: PathBuf,
    /// Run in replay mode, logging DAG operations to the datastore
    replay_mode: bool,
    /// The set of unreferenced DAG tips, grouped by layer
    unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
    /// Event IDs that have already been broadcast to the network
    broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
    /// Background task that periodically prunes the DAG
    pub prune_task: OnceCell<StoppableTaskPtr>,
    /// Publisher notifying about newly inserted events
    pub event_pub: PublisherPtr<Event>,
    /// The current genesis event
    current_genesis: RwLock<Event>,
    /// Days between DAG prunes (0 disables the prune task)
    days_rotation: u64,
    /// Whether the DAG has been synced from peers
    pub synced: RwLock<bool>,
    /// Whether graph debugging (deg) is enabled
    pub deg_enabled: RwLock<bool>,
    /// Publisher for graph debugging (deg) events
    deg_publisher: PublisherPtr<DegEvent>,
}

impl EventGraph {
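    /// Create a new [`EventGraph`] instance. Opens (or creates) the DAG sled
    /// tree, prunes it if the current genesis event is missing, rebuilds the
    /// unreferenced tips, and spawns the background prune task when
    /// `days_rotation` is greater than zero.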
    pub async fn new(
        p2p: P2pPtr,
        sled_db: sled::Db,
        datastore: PathBuf,
        replay_mode: bool,
        dag_tree_name: &str,
        days_rotation: u64,
        ex: Arc<Executor<'_>>,
    ) -> Result<EventGraphPtr> {
        let dag = sled_db.open_tree(dag_tree_name)?;
        let unreferenced_tips = RwLock::new(BTreeMap::new());
        let broadcasted_ids = RwLock::new(HashSet::new());
        let event_pub = Publisher::new();

        let current_genesis = generate_genesis(days_rotation);
        let self_ = Arc::new(Self {
            p2p,
            dag: dag.clone(),
            datastore,
            replay_mode,
            unreferenced_tips,
            broadcasted_ids,
            prune_task: OnceCell::new(),
            event_pub,
            current_genesis: RwLock::new(current_genesis.clone()),
            days_rotation,
            synced: RwLock::new(false),
            deg_enabled: RwLock::new(false),
            deg_publisher: Publisher::new(),
        });

        // If the DAG doesn't contain the current genesis, prune and reset it.
        if !dag.contains_key(current_genesis.id().as_bytes())? {
            info!(
                target: "event_graph::new",
                "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
            );
            self_.dag_prune(current_genesis).await?;
        }

        // Rebuild the unreferenced tips from the DAG contents.
        *self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await;

        // Spawn the background pruning task if rotation is enabled.
        if days_rotation > 0 {
            let prune_task = StoppableTask::new();
            let _ = self_.prune_task.set(prune_task.clone()).await;

            prune_task.clone().start(
                self_.clone().dag_prune_task(days_rotation),
                |res| async move {
                    match res {
                        Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                        Err(e) => error!(target: "event_graph::_handle_stop", "[EVENTGRAPH] Failed stopping prune task: {e}"),
                    }
                },
                Error::DetachedTaskStopped,
                ex.clone(),
            );
        }

        Ok(self_)
    }

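    /// Return the number of days after which the DAG is pruned and rotated.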
    pub fn days_rotation(&self) -> u64 {
        self.days_rotation
    }

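    /// Sync the DAG from connected P2P peers. Collects the unreferenced tips
    /// reported by peers, keeps the ones confirmed by more than two thirds of
    /// the queried peers, then iteratively requests missing parent events
    /// until the local DAG is complete.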
    pub async fn dag_sync(&self) -> Result<()> {
        let channels = self.p2p.hosts().peers();
        let mut communicated_peers = channels.len();
        info!(
            target: "event_graph::dag_sync",
            "[EVENTGRAPH] Syncing DAG from {communicated_peers} peers..."
        );

        // Here we track the tips, their layers and how many times we've seen them.
        let mut tips: HashMap<blake3::Hash, (u64, usize)> = HashMap::new();

        // First ask all of our peers for their tips and collect them in the map above.
        for channel in channels.iter() {
            let url = channel.display_address();

            let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
                Ok(v) => v,
                Err(e) => {
                    error!(
                        target: "event_graph::dag_sync",
                        "[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {url}, skipping ({e})"
                    );
                    communicated_peers -= 1;
                    continue
                }
            };

            if let Err(e) = channel.send(&TipReq {}).await {
                error!(
                    target: "event_graph::dag_sync",
                    "[EVENTGRAPH] Sync: Couldn't contact peer {url}, skipping ({e})"
                );
                communicated_peers -= 1;
                continue
            };

            let outbound_connect_timeout = self
                .p2p
                .settings()
                .read_arc()
                .await
                .outbound_connect_timeout(channel.address().scheme());
            let Ok(peer_tips) = tip_rep_sub.receive_with_timeout(outbound_connect_timeout).await
            else {
                error!(
                    target: "event_graph::dag_sync",
                    "[EVENTGRAPH] Sync: Peer {url} didn't reply with tips in time, skipping"
                );
                communicated_peers -= 1;
                continue
            };

            let peer_tips = &peer_tips.0;

            // Note down how often each tip was seen.
            for (layer, layer_tips) in peer_tips {
                for tip in layer_tips {
                    if let Some(seen_tip) = tips.get_mut(tip) {
                        seen_tip.1 += 1;
                    } else {
                        tips.insert(*tip, (*layer, 1));
                    }
                }
            }
        }

        // If we didn't get any tips, the sync failed.
        if tips.is_empty() {
            error!(
                target: "event_graph::dag_sync",
                "[EVENTGRAPH] Sync: Could not find any DAG tips",
            );
            return Err(Error::DagSyncFailed)
        }

        // Only consider tips reported by more than 2/3 of the peers we
        // managed to communicate with.
        let consideration_threshold = communicated_peers * 2 / 3;
        let mut considered_tips = HashSet::new();
        for (tip, (_, amount)) in tips.iter() {
            if amount > &consideration_threshold {
                considered_tips.insert(*tip);
            }
        }
        drop(tips);

        // Collect the considered tips that are missing from our DAG.
        let mut missing_parents = HashSet::new();
        for tip in considered_tips.iter() {
            assert!(tip != &NULL_ID);

            if !self.dag.contains_key(tip.as_bytes()).unwrap() {
                missing_parents.insert(*tip);
            }
        }

        // If nothing is missing, we're already synced.
        if missing_parents.is_empty() {
            *self.synced.write().await = true;
            info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
            return Ok(())
        }

        info!(target: "event_graph::dag_sync", "[EVENTGRAPH] Fetching events");
        let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
        let mut received_events_hashes = HashSet::new();

        // Repeatedly request missing events from peers until nothing is missing,
        // adding any unknown parents of received events to the missing set.
        while !missing_parents.is_empty() {
            let mut found_event = false;

            for channel in channels.iter() {
                let url = channel.display_address();

                debug!(
                    target: "event_graph::dag_sync",
                    "Requesting {missing_parents:?} from {url}..."
                );

                let ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
                    Ok(v) => v,
                    Err(e) => {
                        error!(
                            target: "event_graph::dag_sync",
                            "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {url}, skipping ({e})"
                        );
                        continue
                    }
                };

                let request_missing_events = missing_parents.clone().into_iter().collect();
                if let Err(e) = channel.send(&EventReq(request_missing_events)).await {
                    error!(
                        target: "event_graph::dag_sync",
                        "[EVENTGRAPH] Sync: Failed communicating EventReq({missing_parents:?}) to {url}: {e}"
                    );
                    continue
                }

                let outbound_connect_timeout = self
                    .p2p
                    .settings()
                    .read_arc()
                    .await
                    .outbound_connect_timeout(channel.address().scheme());
                let Ok(parent) = ev_rep_sub.receive_with_timeout(outbound_connect_timeout).await
                else {
                    error!(
                        target: "event_graph::dag_sync",
                        "[EVENTGRAPH] Sync: Timeout waiting for parents {missing_parents:?} from {url}"
                    );
                    continue
                };

                let parents = parent.0.clone();

                for parent in parents {
                    let parent_id = parent.id();
                    if !missing_parents.contains(&parent_id) {
                        error!(
                            target: "event_graph::dag_sync",
                            "[EVENTGRAPH] Sync: Peer {url} replied with a wrong event: {}",
                            parent.id()
                        );
                        continue
                    }

                    debug!(
                        target: "event_graph::dag_sync",
                        "Got correct parent event {parent_id}"
                    );

                    if let Some(layer_events) = received_events.get_mut(&parent.layer) {
                        layer_events.push(parent.clone());
                    } else {
                        let layer_events = vec![parent.clone()];
                        received_events.insert(parent.layer, layer_events);
                    }
                    received_events_hashes.insert(parent_id);

                    missing_parents.remove(&parent_id);
                    found_event = true;

                    // Check if any of this event's own parents are unknown to us
                    // and mark them as missing too.
                    for upper_parent in parent.parents.iter() {
                        if upper_parent == &NULL_ID {
                            continue
                        }

                        if !missing_parents.contains(upper_parent) &&
                            !received_events_hashes.contains(upper_parent) &&
                            !self.dag.contains_key(upper_parent.as_bytes()).unwrap()
                        {
                            debug!(
                                target: "event_graph::dag_sync",
                                "Found upper missing parent event {upper_parent}"
                            );
                            missing_parents.insert(*upper_parent);
                        }
                    }
                }

                break
            }

            if !found_event {
                error!(
                    target: "event_graph::dag_sync",
                    "[EVENTGRAPH] Sync: Failed to get all events",
                );
                return Err(Error::DagSyncFailed)
            }
        }

        // At this point we should have all the events, so insert them into the DAG.
        let mut events = vec![];
        for (_, tips) in received_events {
            for tip in tips {
                events.push(tip);
            }
        }
        self.dag_insert(&events).await?;

        *self.synced.write().await = true;

        info!(target: "event_graph::dag_sync", "[EVENTGRAPH] DAG synced successfully!");
        Ok(())
    }

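    /// Atomically clear the DAG and reset it to the given genesis event,
    /// resetting the unreferenced tips and the broadcasted-IDs cache.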
    async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
        debug!(target: "event_graph::dag_prune", "Pruning DAG...");

        // Acquire exclusive write locks over the unreferenced tips, the
        // broadcasted IDs and the current genesis so no other task can observe
        // intermediate state while the prune is in progress.
        let mut unreferenced_tips = self.unreferenced_tips.write().await;
        let mut broadcasted_ids = self.broadcasted_ids.write().await;
        let mut current_genesis = self.current_genesis.write().await;

        // Build a batch that removes every existing key and writes the new genesis.
        let mut batch = sled::Batch::default();
        for key in self.dag.iter().keys() {
            batch.remove(key.unwrap());
        }
        batch.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event).await);

        debug!(target: "event_graph::dag_prune", "Applying batch...");
        if let Err(e) = self.dag.apply_batch(batch) {
            panic!("Failed pruning DAG, sled apply_batch error: {e}");
        }

        // Reset the caches and point the tips to the new genesis event.
        *unreferenced_tips = BTreeMap::new();
        unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
        *current_genesis = genesis_event;
        *broadcasted_ids = HashSet::new();
        drop(unreferenced_tips);
        drop(broadcasted_ids);
        drop(current_genesis);

        debug!(target: "event_graph::dag_prune", "DAG pruned successfully");
        Ok(())
    }

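    /// Background task that sleeps until the next rotation timestamp and then
    /// prunes the DAG, installing a fresh genesis event.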
    async fn dag_prune_task(self: Arc<Self>, days_rotation: u64) -> Result<()> {
        debug!(target: "event_graph::dag_prune_task", "Spawned background DAG pruning task");

        loop {
            // Find the next rotation timestamp and build the genesis event
            // that will become the root of the pruned DAG.
            let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);

            let current_genesis = Event {
                timestamp: next_rotation,
                content: GENESIS_CONTENTS.to_vec(),
                parents: [NULL_ID; N_EVENT_PARENTS],
                layer: 0,
            };

            // Sleep until it's time to rotate.
            let s = millis_until_next_rotation(next_rotation);

            debug!(target: "event_graph::dag_prune_task", "Sleeping {s}ms until next DAG prune");
            msleep(s).await;
            debug!(target: "event_graph::dag_prune_task", "Rotation period reached");

            self.dag_prune(current_genesis).await?;
        }
    }

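    /// Validate and insert a slice of [`Event`]s into the DAG, updating the
    /// unreferenced tips and notifying event subscribers. Returns the IDs of
    /// the inserted events.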
    pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
        // Sanity check
        if events.is_empty() {
            return Ok(vec![])
        }

        // Acquire exclusive write locks for the duration of the insertion.
        let mut unreferenced_tips = self.unreferenced_tips.write().await;
        let mut broadcasted_ids = self.broadcasted_ids.write().await;

        let mut ids = Vec::with_capacity(events.len());

        // Stage the writes in a sled overlay so they can be applied atomically.
        let mut overlay = SledTreeOverlay::new(&self.dag);

        let genesis_timestamp = self.current_genesis.read().await.timestamp;

        // Validate each event and stage it for insertion.
        for event in events {
            let event_id = event.id();
            debug!(
                target: "event_graph::dag_insert",
                "Inserting event {event_id} into the DAG"
            );

            if !event
                .validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
                .await?
            {
                error!(target: "event_graph::dag_insert", "Event {event_id} is invalid!");
                return Err(Error::EventIsInvalid)
            }

            let event_se = serialize_async(event).await;

            // Add the event to the overlay
            overlay.insert(event_id.as_bytes(), &event_se)?;

            // In replay mode, also append the operation to the replay log.
            if self.replay_mode {
                replayer_log(&self.datastore, "insert".to_owned(), event_se)?;
            }
            // Note down the event ID to return
            ids.push(event_id);
        }

        // Aggregate the overlay into a sled batch and apply it atomically.
        let batch = overlay.aggregate().unwrap();

        if let Err(e) = self.dag.apply_batch(batch) {
            panic!("Failed applying dag_insert batch to sled: {e}");
        }

        // Once the events are in the DAG, update the unreferenced tips
        // and notify subscribers.
        for event in events {
            let event_id = event.id();

            debug!(
                target: "event_graph::dag_insert",
                "Event {event_id} parents {:#?}", event.parents,
            );
            for parent_id in event.parents.iter() {
                if parent_id != &NULL_ID {
                    debug!(
                        target: "event_graph::dag_insert",
                        "Removing {parent_id} from unreferenced_tips"
                    );

                    // A referenced parent is no longer an unreferenced tip, so
                    // remove it from every layer below this event's layer.
                    for (layer, tips) in unreferenced_tips.iter_mut() {
                        if layer >= &event.layer {
                            continue
                        }
                        tips.remove(parent_id);
                    }
                    broadcasted_ids.insert(*parent_id);
                }
            }
            unreferenced_tips.retain(|_, tips| !tips.is_empty());
            debug!(
                target: "event_graph::dag_insert",
                "Adding {event_id} to unreferenced tips"
            );

            if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
                layer_tips.insert(event_id);
            } else {
                let mut layer_tips = HashSet::new();
                layer_tips.insert(event_id);
                unreferenced_tips.insert(event.layer, layer_tips);
            }

            // Notify subscribers about the new event.
            self.event_pub.notify(event.clone()).await;
        }

        drop(unreferenced_tips);
        drop(broadcasted_ids);

        Ok(ids)
    }

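    /// Fetch an event from the DAG by its ID, if it exists.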
    pub async fn dag_get(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
        let Some(bytes) = self.dag.get(event_id.as_bytes())? else { return Ok(None) };
        let event: Event = deserialize_async(&bytes).await?;

        Ok(Some(event))
    }

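    /// Select up to `N_EVENT_PARENTS` unreferenced tips, preferring the
    /// highest layers, and return them together with the next layer number.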
    async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
        let unreferenced_tips = self.unreferenced_tips.read().await;

        let mut parents = [NULL_ID; N_EVENT_PARENTS];
        let mut index = 0;
        'outer: for (_, tips) in unreferenced_tips.iter().rev() {
            for tip in tips.iter() {
                parents[index] = *tip;
                index += 1;
                if index >= N_EVENT_PARENTS {
                    break 'outer
                }
            }
        }

        let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;

        assert!(parents.iter().any(|x| x != &NULL_ID));
        (next_layer, parents)
    }

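    /// Scan the entire DAG and return all events that are not referenced as a
    /// parent by any other event, grouped by layer.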
    async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
        // First collect all the event IDs in the DAG.
        let mut tips = HashSet::new();
        for iter_elem in self.dag.iter() {
            let (id, _) = iter_elem.unwrap();
            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
            tips.insert(id);
        }

        // Then remove the ones that are referenced as a parent of some event.
        for iter_elem in self.dag.iter() {
            let (_, event) = iter_elem.unwrap();
            let event: Event = deserialize_async(&event).await.unwrap();
            for parent in event.parents.iter() {
                tips.remove(parent);
            }
        }

        // Group the remaining tips by their layer.
        let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
        for tip in tips {
            let event = self.dag_get(&tip).await.unwrap().unwrap();
            if let Some(layer_tips) = map.get_mut(&event.layer) {
                layer_tips.insert(tip);
            } else {
                let mut layer_tips = HashSet::new();
                layer_tips.insert(tip);
                map.insert(event.layer, layer_tips);
            }
        }

        map
    }

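    /// Return the current parent tips sorted in ascending numeric order of
    /// their hashes.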
    async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
        let (_, tips) = self.get_next_layer_with_parents().await;

        // Sort the tips by their numeric (big-endian) value.
        let mut sorted: Vec<_> =
            tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
        sorted.sort_unstable();

        // Convert the sorted values back into hashes, left-padding with zeroes.
        let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
        for (i, id) in sorted.iter().enumerate() {
            let mut bytes = id.to_bytes_be();

            while bytes.len() < blake3::OUT_LEN {
                bytes.insert(0, 0);
            }

            tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
        }

        tips_sorted
    }

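    /// Return all events in the DAG in a canonical order: a DFS from the
    /// sorted unreferenced tips, ordered by ascending layer with ties broken
    /// by descending timestamp.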
    pub async fn order_events(&self) -> Vec<Event> {
        let mut ordered_events = VecDeque::new();
        let mut visited = HashSet::new();

        for tip in self.get_unreferenced_tips_sorted().await {
            if !visited.contains(&tip) && tip != NULL_ID {
                let tip = self.dag_get(&tip).await.unwrap().unwrap();
                ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
            }
        }

        // Order by ascending layer; break ties with descending timestamp.
        let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
        ord_events_vec
            .sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.timestamp.cmp(&a.1.timestamp)));

        ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
    }

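    /// Iterative depth-first traversal starting from the given event,
    /// collecting each newly visited event together with its layer.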
    async fn dfs_topological_sort(
        &self,
        event: Event,
        visited: &mut HashSet<blake3::Hash>,
    ) -> VecDeque<(u64, Event)> {
        let mut ordered_events = VecDeque::new();
        let mut stack = VecDeque::new();
        let event_id = event.id();
        stack.push_back(event_id);

        while let Some(event_id) = stack.pop_front() {
            if !visited.contains(&event_id) && event_id != NULL_ID {
                visited.insert(event_id);
                if let Some(event) = self.dag_get(&event_id).await.unwrap() {
                    for parent in event.parents.iter() {
                        stack.push_back(*parent);
                    }

                    ordered_events.push_back((event.layer, event))
                }
            }
        }

        ordered_events
    }

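    /// Enable graph debugging (deg) event publishing.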
    pub async fn deg_enable(&self) {
        *self.deg_enabled.write().await = true;
        warn!("[EVENTGRAPH] Graph debugging enabled!");
    }

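    /// Disable graph debugging (deg) event publishing.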
    pub async fn deg_disable(&self) {
        *self.deg_enabled.write().await = false;
        warn!("[EVENTGRAPH] Graph debugging disabled!");
    }

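    /// Subscribe to graph debugging (deg) events.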
    pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
        self.deg_publisher.clone().subscribe().await
    }

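    /// Publish a graph debugging (deg) event to subscribers.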
    pub async fn deg_notify(&self, event: DegEvent) {
        self.deg_publisher.notify(event).await;
    }

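    /// JSON-RPC handler returning the full DAG as a JSON object
    /// (available with the `rpc` feature).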
    #[cfg(feature = "rpc")]
    pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
        let mut graph = HashMap::new();
        for iter_elem in self.dag.iter() {
            let (id, val) = iter_elem.unwrap();
            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
            let val: Event = deserialize_async(&val).await.unwrap();
            graph.insert(id, val);
        }

        let json_graph = graph
            .into_iter()
            .map(|(k, v)| {
                let key = k.to_string();
                let value = JsonValue::from(v);
                (key, value)
            })
            .collect();
        let values = json_map([("dag", JsonValue::Object(json_graph))]);

        let result = JsonValue::Object(HashMap::from([("eventgraph_info".to_string(), values)]));

        JsonResponse::new(result, id).into()
    }

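    /// Given tips grouped by layer, return all locally known events on higher
    /// layers, sorted by layer. Layers containing a tip we don't have locally
    /// are skipped.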
    pub async fn fetch_successors_of(
        &self,
        tips: BTreeMap<u64, HashSet<blake3::Hash>>,
    ) -> Result<Vec<Event>> {
        debug!(
            target: "event_graph::fetch_successors_of",
            "fetching successors of {tips:?}"
        );

        // Load the whole DAG into memory.
        let mut graph = HashMap::new();
        for iter_elem in self.dag.iter() {
            let (id, val) = iter_elem.unwrap();
            let hash = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
            let event: Event = deserialize_async(&val).await.unwrap();
            graph.insert(hash, event);
        }

        let mut result = vec![];

        // For every layer of known tips, collect all local events on higher layers.
        'outer: for tip in tips.iter() {
            for i in tip.1.iter() {
                if !graph.contains_key(i) {
                    continue 'outer;
                }
            }

            for (_, ev) in graph.iter() {
                if ev.layer > *tip.0 && !result.contains(ev) {
                    result.push(ev.clone())
                }
            }
        }

        result.sort_by(|a, b| a.layer.cmp(&b.layer));

        Ok(result)
    }
}