1use std::sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22};
23
24use futures::{stream::FuturesUnordered, TryFutureExt};
25use futures_rustls::rustls::crypto::{ring, CryptoProvider};
26use smol::{fs, lock::RwLock as AsyncRwLock, stream::StreamExt};
27use tracing::{debug, error, warn};
28use url::Url;
29
30use super::{
31 channel::ChannelPtr,
32 dnet::DnetEvent,
33 hosts::{Hosts, HostsPtr},
34 message::{Message, SerializedMessage},
35 protocol::{protocol_registry::ProtocolRegistry, register_default_protocols},
36 session::{
37 DirectSession, DirectSessionPtr, InboundSession, InboundSessionPtr, ManualSession,
38 ManualSessionPtr, OutboundSession, OutboundSessionPtr, RefineSession, RefineSessionPtr,
39 SeedSyncSession, SeedSyncSessionPtr, Session,
40 },
41 settings::Settings,
42};
43use crate::{
44 system::{ExecutorPtr, Publisher, PublisherPtr, Subscription},
45 util::{logger::verbose, path::expand_path},
46 Result,
47};
48
49#[cfg(target_family = "unix")]
50use smol::fs::unix::PermissionsExt;
51
52pub type P2pPtr = Arc<P2p>;
54
55pub struct P2p {
57 executor: ExecutorPtr,
59 hosts: HostsPtr,
61 protocol_registry: ProtocolRegistry,
63 settings: Arc<AsyncRwLock<Settings>>,
65 session_manual: ManualSessionPtr,
67 session_inbound: InboundSessionPtr,
69 session_outbound: OutboundSessionPtr,
71 session_refine: RefineSessionPtr,
73 session_seedsync: SeedSyncSessionPtr,
75 session_direct: DirectSessionPtr,
77 pub dnet_enabled: AtomicBool,
79 dnet_publisher: PublisherPtr<DnetEvent>,
81}
82
83impl P2p {
84 pub async fn new(settings: Settings, executor: ExecutorPtr) -> Result<P2pPtr> {
93 if let Some(ref datastore) = settings.p2p_datastore {
95 let datastore = expand_path(datastore)?;
96 fs::create_dir_all(&datastore).await?;
97 #[cfg(target_family = "unix")]
99 fs::set_permissions(&datastore, PermissionsExt::from_mode(0o700)).await?;
100 }
101
102 let _ = CryptoProvider::install_default(ring::default_provider());
104
105 let settings = Arc::new(AsyncRwLock::new(settings));
107
108 let self_ = Arc::new_cyclic(|p2p| Self {
109 executor,
110 hosts: Hosts::new(Arc::clone(&settings)),
111 protocol_registry: ProtocolRegistry::new(),
112 settings,
113 session_manual: ManualSession::new(p2p.clone()),
114 session_inbound: InboundSession::new(p2p.clone()),
115 session_outbound: OutboundSession::new(p2p.clone()),
116 session_refine: RefineSession::new(p2p.clone()),
117 session_seedsync: SeedSyncSession::new(p2p.clone()),
118 session_direct: DirectSession::new(p2p.clone()),
119 dnet_enabled: AtomicBool::new(false),
120 dnet_publisher: Publisher::new(),
121 });
122
123 register_default_protocols(self_.clone()).await;
124
125 Ok(self_)
126 }
127
128 pub async fn start(self: Arc<Self>) -> Result<()> {
130 debug!(target: "net::p2p::start", "P2P::start() [BEGIN] [magic_bytes={:?}]",
131 self.settings.read().await.magic_bytes.0);
132 verbose!(target: "net::p2p::start", "[P2P] Starting P2P subsystem");
133
134 if let Err(err) = self.session_inbound().start().await {
136 error!(target: "net::p2p::start", "Failed to start inbound session!: {err}");
137 return Err(err)
138 }
139
140 self.session_manual().start().await;
142
143 self.session_seedsync().start().await;
146
147 self.session_outbound().start().await;
149
150 self.session_refine().start().await;
152
153 self.session_direct().start().await;
155
156 verbose!(target: "net::p2p::start", "[P2P] P2P subsystem started successfully");
157 Ok(())
158 }
159
160 pub async fn seed(self: Arc<Self>) {
162 debug!(target: "net::p2p::seed", "P2P::seed() [BEGIN]");
163
164 self.session_seedsync().notify().await;
166
167 debug!(target: "net::p2p::seed", "P2P::seed() [END]");
168 }
169
170 pub async fn stop(&self) {
172 self.session_manual().stop().await;
174 self.session_inbound().stop().await;
175 self.session_seedsync().stop().await;
176 self.session_outbound().stop().await;
177 self.session_refine().stop().await;
178 self.session_direct().stop().await;
179 }
180
181 pub async fn broadcast<M: Message>(&self, message: &M) {
183 self.broadcast_with_exclude(message, &[]).await
184 }
185
186 pub async fn broadcast_with_exclude<M: Message>(&self, message: &M, exclude_list: &[Url]) {
189 let mut channels = Vec::new();
190 for channel in self.hosts().peers() {
191 if exclude_list.contains(channel.address()) {
192 continue
193 }
194 channels.push(channel);
195 }
196 self.broadcast_to(message, &channels).await
197 }
198
199 pub async fn broadcast_to<M: Message>(&self, message: &M, channel_list: &[ChannelPtr]) {
201 if channel_list.is_empty() {
202 warn!(target: "net::p2p::broadcast", "[P2P] No connected channels found for broadcast");
203 return
204 }
205
206 let message = SerializedMessage::new(message).await;
208
209 self.executor.spawn(broadcast_serialized_to::<M>(message, channel_list.to_vec())).detach();
212 }
213
214 pub fn is_connected(&self) -> bool {
217 !self.hosts().peers().is_empty()
218 }
219
220 pub fn peers_count(&self) -> usize {
222 self.hosts().peers().len()
223 }
224
225 pub fn settings(&self) -> Arc<AsyncRwLock<Settings>> {
227 Arc::clone(&self.settings)
228 }
229
230 pub async fn reload(self: Arc<Self>) {
241 self.session_manual().reload().await;
242 self.session_inbound().reload().await;
243 self.session_outbound().reload().await;
244 self.session_refine().reload().await;
245 self.session_seedsync().reload().await;
246 self.session_direct().reload().await;
247
248 debug!(target: "net::p2p::reload", "P2P settings reloaded successfully");
249 }
250
251 pub fn hosts(&self) -> HostsPtr {
253 self.hosts.clone()
254 }
255
256 pub fn executor(&self) -> ExecutorPtr {
258 self.executor.clone()
259 }
260
261 pub fn protocol_registry(&self) -> &ProtocolRegistry {
263 &self.protocol_registry
264 }
265
266 pub fn session_manual(&self) -> ManualSessionPtr {
268 self.session_manual.clone()
269 }
270
271 pub fn session_inbound(&self) -> InboundSessionPtr {
273 self.session_inbound.clone()
274 }
275
276 pub fn session_outbound(&self) -> OutboundSessionPtr {
278 self.session_outbound.clone()
279 }
280
281 pub fn session_refine(&self) -> RefineSessionPtr {
283 self.session_refine.clone()
284 }
285
286 pub fn session_seedsync(&self) -> SeedSyncSessionPtr {
288 self.session_seedsync.clone()
289 }
290
291 pub fn session_direct(&self) -> DirectSessionPtr {
293 self.session_direct.clone()
294 }
295
296 pub fn dnet_enable(&self) {
298 self.dnet_enabled.store(true, Ordering::SeqCst);
299 warn!("[P2P] Network debugging enabled!");
300 }
301
302 pub fn dnet_disable(&self) {
304 self.dnet_enabled.store(false, Ordering::SeqCst);
305 warn!("[P2P] Network debugging disabled!");
306 }
307
308 pub async fn dnet_subscribe(&self) -> Subscription<DnetEvent> {
310 self.dnet_publisher.clone().subscribe().await
311 }
312
313 pub(super) async fn dnet_notify(&self, event: DnetEvent) {
315 self.dnet_publisher.notify(event).await;
316 }
317
318 pub fn get_channel(&self, id: u32) -> Option<ChannelPtr> {
320 self.hosts.get_channel(id)
321 }
322}
323
324async fn broadcast_serialized_to<M: Message>(
326 message: SerializedMessage,
327 channel_list: Vec<ChannelPtr>,
328) {
329 let futures = FuturesUnordered::new();
330
331 for channel in &channel_list {
332 futures.push(
333 channel
334 .send_serialized(&message, &M::METERING_SCORE, &M::METERING_CONFIGURATION)
335 .map_err(|e| {
336 error!(
337 target: "net::p2p::broadcast",
338 "[P2P] Broadcasting message to {} failed: {e}",
339 channel.display_address()
340 );
341 assert!(channel.is_stopped());
344 }),
345 );
346 }
347
348 let _results: Vec<_> = futures.collect().await;
349}