darkfi/net/
p2p.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::{
20    atomic::{AtomicBool, Ordering},
21    Arc,
22};
23
24use futures::{stream::FuturesUnordered, TryFutureExt};
25use futures_rustls::rustls::crypto::{ring, CryptoProvider};
26use smol::{fs, lock::RwLock as AsyncRwLock, stream::StreamExt};
27use tracing::{debug, error, warn};
28use url::Url;
29
30use super::{
31    channel::ChannelPtr,
32    dnet::DnetEvent,
33    hosts::{Hosts, HostsPtr},
34    message::{Message, SerializedMessage},
35    protocol::{protocol_registry::ProtocolRegistry, register_default_protocols},
36    session::{
37        DirectSession, DirectSessionPtr, InboundSession, InboundSessionPtr, ManualSession,
38        ManualSessionPtr, OutboundSession, OutboundSessionPtr, RefineSession, RefineSessionPtr,
39        SeedSyncSession, SeedSyncSessionPtr, Session,
40    },
41    settings::Settings,
42};
43use crate::{
44    system::{ExecutorPtr, Publisher, PublisherPtr, Subscription},
45    util::{logger::verbose, path::expand_path},
46    Result,
47};
48
49#[cfg(target_family = "unix")]
50use smol::fs::unix::PermissionsExt;
51
/// Shared, atomically reference-counted ([`Arc`]) pointer to the [`P2p`] interface
pub type P2pPtr = Arc<P2p>;
54
/// Toplevel peer-to-peer networking interface.
///
/// Holds the shared network [`Settings`], the hosts registry, the protocol
/// registry and one handle per session type. Instances are built through
/// [`P2p::new`], which hands them out as a [`P2pPtr`].
pub struct P2p {
    /// Global multithreaded executor reference
    executor: ExecutorPtr,
    /// Known hosts (peers)
    hosts: HostsPtr,
    /// Protocol registry
    protocol_registry: ProtocolRegistry,
    /// P2P network settings, kept behind an async read-write lock.
    /// The same lock is also handed to [`Hosts`] on construction.
    settings: Arc<AsyncRwLock<Settings>>,
    /// Reference to configured [`ManualSession`]
    session_manual: ManualSessionPtr,
    /// Reference to configured [`InboundSession`]
    session_inbound: InboundSessionPtr,
    /// Reference to configured [`OutboundSession`]
    session_outbound: OutboundSessionPtr,
    /// Reference to configured [`RefineSession`]
    session_refine: RefineSessionPtr,
    /// Reference to configured [`SeedSyncSession`]
    session_seedsync: SeedSyncSessionPtr,
    /// Reference to configured [`DirectSession`]
    session_direct: DirectSessionPtr,
    /// Whether network debugging (dnet) is enabled. Starts out `false` and is
    /// flipped by [`P2p::dnet_enable`] and [`P2p::dnet_disable`].
    pub dnet_enabled: AtomicBool,
    /// The publisher over which [`DnetEvent`]s are delivered to subscribers
    dnet_publisher: PublisherPtr<DnetEvent>,
}
82
83impl P2p {
84    /// Initialize a new p2p network.
85    ///
86    /// Initializes all sessions and protocols. Adds the protocols to the protocol
87    /// registry, along with a bitflag session selector that includes or excludes
88    /// sessions from seed, version, and address protocols.
89    ///
90    /// Creates a weak pointer to self that is used by all sessions to access the
91    /// p2p parent class.
92    pub async fn new(settings: Settings, executor: ExecutorPtr) -> Result<P2pPtr> {
93        // Create the datastore
94        if let Some(ref datastore) = settings.p2p_datastore {
95            let datastore = expand_path(datastore)?;
96            fs::create_dir_all(&datastore).await?;
97            // Windows only has readonly so don't worry about it
98            #[cfg(target_family = "unix")]
99            fs::set_permissions(&datastore, PermissionsExt::from_mode(0o700)).await?;
100        }
101
102        // Register a CryptoProvider for rustls
103        let _ = CryptoProvider::install_default(ring::default_provider());
104
105        // Wrap the Settings into an Arc<RwLock>
106        let settings = Arc::new(AsyncRwLock::new(settings));
107
108        let self_ = Arc::new_cyclic(|p2p| Self {
109            executor,
110            hosts: Hosts::new(Arc::clone(&settings)),
111            protocol_registry: ProtocolRegistry::new(),
112            settings,
113            session_manual: ManualSession::new(p2p.clone()),
114            session_inbound: InboundSession::new(p2p.clone()),
115            session_outbound: OutboundSession::new(p2p.clone()),
116            session_refine: RefineSession::new(p2p.clone()),
117            session_seedsync: SeedSyncSession::new(p2p.clone()),
118            session_direct: DirectSession::new(p2p.clone()),
119            dnet_enabled: AtomicBool::new(false),
120            dnet_publisher: Publisher::new(),
121        });
122
123        register_default_protocols(self_.clone()).await;
124
125        Ok(self_)
126    }
127
128    /// Starts inbound, outbound, and manual sessions.
129    pub async fn start(self: Arc<Self>) -> Result<()> {
130        debug!(target: "net::p2p::start", "P2P::start() [BEGIN] [magic_bytes={:?}]",
131               self.settings.read().await.magic_bytes.0);
132        verbose!(target: "net::p2p::start", "[P2P] Starting P2P subsystem");
133
134        // Start the inbound session
135        if let Err(err) = self.session_inbound().start().await {
136            error!(target: "net::p2p::start", "Failed to start inbound session!: {err}");
137            return Err(err)
138        }
139
140        // Start the manual session
141        self.session_manual().start().await;
142
143        // Start the seedsync session. Seed connections will not
144        // activate yet- they wait for a call to notify().
145        self.session_seedsync().start().await;
146
147        // Start the outbound session
148        self.session_outbound().start().await;
149
150        // Start the refine session
151        self.session_refine().start().await;
152
153        // Start the direct session
154        self.session_direct().start().await;
155
156        verbose!(target: "net::p2p::start", "[P2P] P2P subsystem started successfully");
157        Ok(())
158    }
159
160    /// Reseed the P2P network.
161    pub async fn seed(self: Arc<Self>) {
162        debug!(target: "net::p2p::seed", "P2P::seed() [BEGIN]");
163
164        // Activate the seed session.
165        self.session_seedsync().notify().await;
166
167        debug!(target: "net::p2p::seed", "P2P::seed() [END]");
168    }
169
170    /// Stop the running P2P subsystem
171    pub async fn stop(&self) {
172        // Stop the sessions
173        self.session_manual().stop().await;
174        self.session_inbound().stop().await;
175        self.session_seedsync().stop().await;
176        self.session_outbound().stop().await;
177        self.session_refine().stop().await;
178        self.session_direct().stop().await;
179    }
180
181    /// Broadcasts a message concurrently across all active peers.
182    pub async fn broadcast<M: Message>(&self, message: &M) {
183        self.broadcast_with_exclude(message, &[]).await
184    }
185
186    /// Broadcasts a message concurrently across active peers, excluding
187    /// the ones provided in `exclude_list`.
188    pub async fn broadcast_with_exclude<M: Message>(&self, message: &M, exclude_list: &[Url]) {
189        let mut channels = Vec::new();
190        for channel in self.hosts().peers() {
191            if exclude_list.contains(channel.address()) {
192                continue
193            }
194            channels.push(channel);
195        }
196        self.broadcast_to(message, &channels).await
197    }
198
199    /// Broadcast a message concurrently to all given peers.
200    pub async fn broadcast_to<M: Message>(&self, message: &M, channel_list: &[ChannelPtr]) {
201        if channel_list.is_empty() {
202            warn!(target: "net::p2p::broadcast", "[P2P] No connected channels found for broadcast");
203            return
204        }
205
206        // Serialize the provided message
207        let message = SerializedMessage::new(message).await;
208
209        // Spawn a detached task to actually send the message to the channels,
210        // so we don't block wiating channels that are rate limited.
211        self.executor.spawn(broadcast_serialized_to::<M>(message, channel_list.to_vec())).detach();
212    }
213
214    /// Check whether this node has connections to any peers. This method will
215    /// not report seedsync or refinery connections.
216    pub fn is_connected(&self) -> bool {
217        !self.hosts().peers().is_empty()
218    }
219
220    /// The number of connected peers. This means channels which are not seed or refine.
221    pub fn peers_count(&self) -> usize {
222        self.hosts().peers().len()
223    }
224
225    /// Return an atomic pointer to the set network settings
226    pub fn settings(&self) -> Arc<AsyncRwLock<Settings>> {
227        Arc::clone(&self.settings)
228    }
229
230    /// Reload settings and apply any changes to the running P2P subsystem.
231    ///
232    /// Users should modify settings through the settings lock, then call this
233    /// method to apply the changes:
234    /// ```rust
235    /// let mut settings = p2p.settings().write().await;
236    /// settings.outbound_connections = new_value;
237    /// drop(settings);
238    /// p2p.reload().await;
239    /// ```
240    pub async fn reload(self: Arc<Self>) {
241        self.session_manual().reload().await;
242        self.session_inbound().reload().await;
243        self.session_outbound().reload().await;
244        self.session_refine().reload().await;
245        self.session_seedsync().reload().await;
246        self.session_direct().reload().await;
247
248        debug!(target: "net::p2p::reload", "P2P settings reloaded successfully");
249    }
250
251    /// Return an atomic pointer to the list of hosts
252    pub fn hosts(&self) -> HostsPtr {
253        self.hosts.clone()
254    }
255
256    /// Reference the global executor
257    pub fn executor(&self) -> ExecutorPtr {
258        self.executor.clone()
259    }
260
    /// Borrow the internal protocol registry.
    ///
    /// Unlike the other accessors on this type, this hands out a plain
    /// reference tied to `&self` rather than a cloned pointer.
    pub fn protocol_registry(&self) -> &ProtocolRegistry {
        &self.protocol_registry
    }
265
266    /// Get pointer to manual session
267    pub fn session_manual(&self) -> ManualSessionPtr {
268        self.session_manual.clone()
269    }
270
271    /// Get pointer to inbound session
272    pub fn session_inbound(&self) -> InboundSessionPtr {
273        self.session_inbound.clone()
274    }
275
276    /// Get pointer to outbound session
277    pub fn session_outbound(&self) -> OutboundSessionPtr {
278        self.session_outbound.clone()
279    }
280
281    /// Get pointer to refine session
282    pub fn session_refine(&self) -> RefineSessionPtr {
283        self.session_refine.clone()
284    }
285
286    /// Get pointer to seedsync session
287    pub fn session_seedsync(&self) -> SeedSyncSessionPtr {
288        self.session_seedsync.clone()
289    }
290
291    /// Get pointer to direct session
292    pub fn session_direct(&self) -> DirectSessionPtr {
293        self.session_direct.clone()
294    }
295
296    /// Enable network debugging
297    pub fn dnet_enable(&self) {
298        self.dnet_enabled.store(true, Ordering::SeqCst);
299        warn!("[P2P] Network debugging enabled!");
300    }
301
302    /// Disable network debugging
303    pub fn dnet_disable(&self) {
304        self.dnet_enabled.store(false, Ordering::SeqCst);
305        warn!("[P2P] Network debugging disabled!");
306    }
307
308    /// Subscribe to dnet events
309    pub async fn dnet_subscribe(&self) -> Subscription<DnetEvent> {
310        self.dnet_publisher.clone().subscribe().await
311    }
312
313    /// Send a dnet notification over the publisher
314    pub(super) async fn dnet_notify(&self, event: DnetEvent) {
315        self.dnet_publisher.notify(event).await;
316    }
317
    /// Grab the channel pointer of provided channel ID, if it exists.
    ///
    /// The lookup is delegated to the hosts registry; `None` is returned
    /// when the registry has no channel for `id`.
    pub fn get_channel(&self, id: u32) -> Option<ChannelPtr> {
        self.hosts.get_channel(id)
    }
322}
323
324/// Auxiliary function to broadcast a serialized message concurrently to all given peers.
325async fn broadcast_serialized_to<M: Message>(
326    message: SerializedMessage,
327    channel_list: Vec<ChannelPtr>,
328) {
329    let futures = FuturesUnordered::new();
330
331    for channel in &channel_list {
332        futures.push(
333            channel
334                .send_serialized(&message, &M::METERING_SCORE, &M::METERING_CONFIGURATION)
335                .map_err(|e| {
336                    error!(
337                        target: "net::p2p::broadcast",
338                        "[P2P] Broadcasting message to {} failed: {e}",
339                        channel.display_address()
340                    );
341                    // If the channel is stopped then it should automatically die
342                    // and the session will remove it from p2p.
343                    assert!(channel.is_stopped());
344                }),
345        );
346    }
347
348    let _results: Vec<_> = futures.collect().await;
349}