darkfi/net/session/
outbound_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
//! Outbound connections session. Manages the creation of outbound sessions.
//! Used to create an outbound session and to stop and start the session.
//!
//! The session consists of a weak pointer to the p2p interface, a vector
//! of outbound connection slots and a peer discovery task. Using a weak
//! pointer to p2p allows us to avoid circular dependencies. The vector of
//! slots is wrapped in a mutex lock. The lock is taken whenever slots are
//! created, removed or iterated, which ensures that no other part of the
//! program uses the slots at the same time.
28
29use std::{
30    sync::{
31        atomic::{AtomicU32, Ordering},
32        Arc, Weak,
33    },
34    time::{Duration, Instant},
35};
36
37use async_trait::async_trait;
38use futures::stream::{FuturesUnordered, StreamExt};
39use smol::lock::Mutex;
40use tracing::{debug, error, info, warn};
41use url::Url;
42
43use super::{
44    super::{
45        channel::ChannelPtr,
46        connector::Connector,
47        dnet::{self, dnetev, DnetEvent},
48        hosts::{HostColor, HostState},
49        message::GetAddrsMessage,
50        p2p::{P2p, P2pPtr},
51    },
52    Session, SessionBitFlag, SESSION_OUTBOUND,
53};
54use crate::{
55    system::{sleep, timeout::timeout, CondVar, StoppableTask, StoppableTaskPtr},
56    util::logger::verbose,
57    Error, Result,
58};
59
/// Shared, reference-counted handle to an [`OutboundSession`].
pub type OutboundSessionPtr = Arc<OutboundSession>;

/// Defines outbound connections session.
///
/// Owns the outbound connection slots and the peer discovery task that
/// the slots wake up when they cannot find an address to connect to.
pub struct OutboundSession {
    /// Weak pointer to parent p2p object
    pub(in crate::net) p2p: Weak<P2p>,
    /// Outbound connection slots
    slots: Mutex<Vec<Arc<Slot>>>,
    /// Peer discovery task
    peer_discovery: Arc<PeerDiscovery>,
}
71
72impl OutboundSession {
73    /// Create a new outbound session.
74    pub(crate) fn new(p2p: Weak<P2p>) -> OutboundSessionPtr {
75        Arc::new_cyclic(|session| Self {
76            p2p,
77            slots: Mutex::new(Vec::new()),
78            peer_discovery: PeerDiscovery::new(session.clone()),
79        })
80    }
81
82    /// Start the outbound session. Runs the channel connect loop.
83    pub(crate) async fn start(self: Arc<Self>) {
84        let n_slots = self.p2p().settings().read().await.outbound_connections;
85        verbose!(target: "net::outbound_session", "[P2P] Starting {n_slots} outbound connection slots.");
86
87        // Activate mutex lock on connection slots.
88        let mut slots = self.slots.lock().await;
89
90        let mut futures = FuturesUnordered::new();
91
92        let self_ = Arc::downgrade(&self);
93
94        for i in 0..n_slots as u32 {
95            let slot = Slot::new(self_.clone(), i);
96            futures.push(slot.clone().start());
97            slots.push(slot);
98        }
99
100        while (futures.next().await).is_some() {}
101
102        self.peer_discovery.clone().start().await;
103    }
104
105    /// Stops the outbound session.
106    pub(crate) async fn stop(&self) {
107        debug!(target: "net::outbound_session", "Stopping outbound session..");
108        let slots = &*self.slots.lock().await;
109        let mut futures = FuturesUnordered::new();
110
111        for slot in slots {
112            futures.push(slot.clone().stop());
113        }
114
115        while (futures.next().await).is_some() {}
116
117        self.peer_discovery.clone().stop().await;
118        debug!(target: "net::outbound_session", "Outbound session stopped!");
119    }
120
121    pub async fn slot_info(&self) -> Vec<u32> {
122        let mut info = Vec::new();
123        let slots = &*self.slots.lock().await;
124        for slot in slots {
125            info.push(slot.channel_id.load(Ordering::Relaxed));
126        }
127        info
128    }
129
130    fn wakeup_peer_discovery(&self) {
131        self.peer_discovery.notify()
132    }
133
134    async fn wakeup_slots(&self) {
135        let slots = &*self.slots.lock().await;
136        for slot in slots {
137            slot.notify();
138        }
139    }
140
141    /// Sets the number of outbound connections.
142    /// If the number is less than the current, then it will first drop empty slots.
143    async fn set_outbound_connections(self: Arc<Self>, n: usize) {
144        // Guaranteed to be correct since slots is locked for the duration of this method.
145        let mut slots = self.slots.lock().await;
146        let slots_len = slots.len();
147
148        if n > slots_len {
149            self.clone().add_slots(&mut slots, n).await;
150        } else if n < slots_len {
151            self.remove_slots(&mut slots, n).await;
152        }
153        // Do nothing when n == current
154    }
155
156    async fn add_slots(self: Arc<Self>, slots: &mut Vec<Arc<Slot>>, target: usize) {
157        let slots_len = slots.len();
158        let self_ = Arc::downgrade(&self);
159        for i in slots_len..target {
160            let slot = Slot::new(self_.clone(), i as u32);
161            slot.clone().start().await;
162            slots.push(slot);
163        }
164        info!(target: "net::outbound_session",
165            "[P2P] Increased outbound slots from {slots_len} to {target}");
166    }
167
168    /// Prefers to first remove empty slots.
169    async fn remove_slots(&self, slots: &mut Vec<Arc<Slot>>, target: usize) {
170        let slots_len = slots.len();
171        let num_to_remove = slots_len - target;
172        let mut removed = 0;
173
174        // First pass: remove empty slots (channel_id == 0)
175        let mut i = 0;
176        while i < slots.len() && removed < num_to_remove {
177            // Skip connected slots
178            if slots[i].channel_id.load(Ordering::Relaxed) != 0 {
179                i += 1;
180                continue
181            }
182
183            // Disconnect empty slots
184            let slot = slots.remove(i);
185            slot.stop().await;
186            removed += 1;
187        }
188
189        // Second pass: remove remaining slots (connected ones)
190        while removed < num_to_remove && !slots.is_empty() {
191            let slot = slots.remove(0);
192            slot.stop().await;
193            removed += 1;
194        }
195
196        info!(target: "net::outbound_session",
197            "[P2P] Decreased outbound slots from {slots_len} to {target}");
198    }
199}
200
201#[async_trait]
202impl Session for OutboundSession {
203    fn p2p(&self) -> P2pPtr {
204        self.p2p.upgrade().unwrap()
205    }
206
207    fn type_id(&self) -> SessionBitFlag {
208        SESSION_OUTBOUND
209    }
210
211    async fn reload(self: Arc<Self>) {
212        let outbound_connections = self.p2p().settings().read().await.outbound_connections;
213        self.set_outbound_connections(outbound_connections).await;
214    }
215}
216
/// A single outbound connection slot. Each slot runs its own task that
/// repeatedly picks a host, connects to it and waits for the resulting
/// channel to close.
struct Slot {
    /// Slot number. Compared against the gold and white counts from the
    /// settings to decide which hostlist this slot draws addresses from.
    slot: u32,
    /// Task running this slot's connect loop.
    process: StoppableTaskPtr,
    /// Condition variable the slot waits on while peer discovery runs.
    wakeup_self: CondVar,
    /// Weak pointer to the owning outbound session.
    session: Weak<OutboundSession>,
    /// Connector used to open this slot's outbound connections.
    connector: Connector,
    // For debugging: ID of the channel registered in this slot, or 0 when
    // the slot is empty.
    channel_id: AtomicU32,
}
226
227impl Slot {
228    fn new(session: Weak<OutboundSession>, slot: u32) -> Arc<Self> {
229        let settings = session.upgrade().unwrap().p2p().settings();
230
231        Arc::new(Self {
232            slot,
233            process: StoppableTask::new(),
234            wakeup_self: CondVar::new(),
235            session: session.clone(),
236            connector: Connector::new(settings, session),
237            channel_id: AtomicU32::new(0),
238        })
239    }
240
241    async fn start(self: Arc<Self>) {
242        let ex = self.p2p().executor();
243
244        self.process.clone().start(
245            self.run(),
246            |res| async {
247                match res {
248                    Ok(()) | Err(Error::NetworkServiceStopped) => {}
249                    Err(e) => error!("net::outbound_session {e}"),
250                }
251            },
252            Error::NetworkServiceStopped,
253            ex,
254        );
255    }
256
257    async fn stop(self: Arc<Self>) {
258        self.connector.stop();
259        self.process.stop().await;
260    }
261
262    /// Address selection algorithm that works as follows: up to
263    /// gold_count, select from the goldlist. Up to white_count,
264    /// select from the whitelist. For all other slots, select from
265    /// the greylist. If none of these preferences are satisfied, do
266    /// peer discovery.
267    ///
268    /// Selecting from the greylist for some % of the slots is necessary
269    /// and healthy since we require the network retains some unreliable
270    /// connections. A network that purely favors uptime over unreliable
271    /// connections may be vulnerable to sybil by attackers with good uptime.
272    async fn fetch_addrs(&self) -> Option<(Url, u64)> {
273        let hosts = self.p2p().hosts();
274        let slot = self.slot as usize;
275        let container = &self.p2p().hosts().container;
276
277        // Acquire Settings read lock
278        let settings = self.p2p().settings().read_arc().await;
279
280        let white_count = (settings.white_connect_percent * settings.outbound_connections) / 100;
281        let gold_count = settings.gold_connect_count;
282
283        let transports = settings.active_profiles.clone();
284        let preference_strict = settings.slot_preference_strict;
285
286        // Drop Settings read lock
287        drop(settings);
288
289        let grey_only = hosts.container.is_empty(HostColor::White) &&
290            hosts.container.is_empty(HostColor::Gold) &&
291            !hosts.container.is_empty(HostColor::Grey);
292
293        // If we only have grey entries, select from the greylist. Otherwise,
294        // use the preference defined in settings.
295        let addrs = if grey_only && !preference_strict {
296            container.fetch_with_schemes(HostColor::Grey as usize, &transports, None)
297        } else if slot < gold_count {
298            container.fetch_with_schemes(HostColor::Gold as usize, &transports, None)
299        } else if slot < white_count {
300            container.fetch_with_schemes(HostColor::White as usize, &transports, None)
301        } else {
302            container.fetch_with_schemes(HostColor::Grey as usize, &transports, None)
303        };
304
305        hosts.check_addrs(addrs).await
306    }
307
308    // We first try to make connections to the addresses on our gold list. We then find some
309    // whitelist connections according to the whitelist percent default. Finally, any remaining
310    // connections we make from the greylist.
311    async fn run(self: Arc<Self>) -> Result<()> {
312        let hosts = self.p2p().hosts();
313
314        loop {
315            // Activate the slot
316            debug!(
317                target: "net::outbound_session::try_connect",
318                "[P2P] Finding a host to connect to for outbound slot #{}",
319                self.slot,
320            );
321
322            // Do peer discovery if we don't have any peers on the Grey, White or Gold list
323            // (first time connecting to the network).
324            if hosts.container.is_empty(HostColor::Grey) &&
325                hosts.container.is_empty(HostColor::White) &&
326                hosts.container.is_empty(HostColor::Gold)
327            {
328                dnetev!(self, OutboundSlotSleeping, {
329                    slot: self.slot,
330                });
331
332                self.wakeup_self.reset();
333                // Peer discovery
334                self.session().wakeup_peer_discovery();
335                // Wait to be woken up by peer discovery
336                self.wakeup_self.wait().await;
337
338                continue
339            }
340
341            let addr = if let Some(addr) = self.fetch_addrs().await {
342                debug!(target: "net::outbound_session::run", "Fetched addr={}, slot #{}", addr.0,
343                self.slot);
344                addr
345            } else {
346                debug!(target: "net::outbound_session::run", "No address found! Activating peer discovery...");
347                dnetev!(self, OutboundSlotSleeping, {
348                    slot: self.slot,
349                });
350
351                self.wakeup_self.reset();
352                // Peer discovery
353                self.session().wakeup_peer_discovery();
354                // Wait to be woken up by peer discovery
355                self.wakeup_self.wait().await;
356
357                continue
358            };
359
360            let host = addr.0;
361            let last_seen = addr.1;
362            let slot = self.slot;
363
364            verbose!(
365                target: "net::outbound_session::try_connect",
366                "[P2P] Connecting outbound slot #{slot} [{host}]"
367            );
368
369            dnetev!(self, OutboundSlotConnecting, {
370                slot,
371                addr: host.clone(),
372            });
373
374            let (_, channel) = match self.try_connect(host.clone(), last_seen).await {
375                Ok(connect_info) => connect_info,
376                Err(err) => {
377                    debug!(
378                        target: "net::outbound_session::try_connect",
379                        "[P2P] Outbound slot #{slot} connection failed: {err}"
380                    );
381
382                    dnetev!(self, OutboundSlotDisconnected, {
383                        slot,
384                        err: err.to_string()
385                    });
386
387                    self.channel_id.store(0, Ordering::Relaxed);
388
389                    continue
390                }
391            };
392
393            // At this point we've managed to connect.
394            let stop_sub = channel.subscribe_stop().await?;
395
396            verbose!(
397                target: "net::outbound_session::try_connect",
398                "[P2P] Outbound slot #{slot} connected [{}]",
399                channel.display_address()
400            );
401
402            dnetev!(self, OutboundSlotConnected, {
403                slot: self.slot,
404                addr: channel.display_address().clone(),
405                channel_id: channel.info.id
406            });
407
408            // Setup new channel
409            if let Err(err) =
410                self.session().register_channel(channel.clone(), self.p2p().executor()).await
411            {
412                verbose!(
413                    target: "net::outbound_session",
414                    "[P2P] Outbound slot #{slot} disconnected: {err}"
415                );
416
417                dnetev!(self, OutboundSlotDisconnected, {
418                    slot: self.slot,
419                    err: err.to_string()
420                });
421
422                self.channel_id.store(0, Ordering::Relaxed);
423
424                warn!(
425                    target: "net::outbound_session::try_connect",
426                    "[P2P] Suspending addr=[{}] slot #{slot}",
427                    channel.display_address()
428                );
429
430                // Peer disconnected during the registry process. We'll downgrade this peer now.
431                if let Err(e) = self
432                    .p2p()
433                    .hosts()
434                    .move_host(channel.address(), last_seen, HostColor::Grey)
435                    .await
436                {
437                    warn!(target: "net::outbound_session", "Error while moving addr={} to greylist: {e}", channel.display_address());
438                    continue
439                }
440
441                // Mark its state as Suspend, which sends this node to the Refinery for processing.
442                if let Err(e) =
443                    self.p2p().hosts().try_register(channel.address().clone(), HostState::Suspend)
444                {
445                    warn!(target: "net::outbound_session", "Error while suspending addr={}: {e}", channel.display_address());
446                }
447
448                continue
449            }
450
451            self.channel_id.store(channel.info.id, Ordering::Relaxed);
452
453            // Wait for channel to close
454            stop_sub.receive().await;
455
456            self.channel_id.store(0, Ordering::Relaxed);
457        }
458    }
459
460    /// Start making an outbound connection, using provided [`Connector`].
461    /// Tries to find a valid address to connect to, otherwise does peer
462    /// discovery. The peer discovery loops until some peer we can connect
463    /// to is found. Once connected, registers the channel, removes it from
464    /// the list of pending channels, and starts sending messages across the
465    /// channel. In case of any failures, a network error is returned and the
466    /// main connect loop (parent of this function) will iterate again.
467    async fn try_connect(&self, addr: Url, last_seen: u64) -> Result<(Url, ChannelPtr)> {
468        match self.connector.connect(&addr).await {
469            Ok((addr_final, channel)) => Ok((addr_final, channel)),
470
471            Err(err) => {
472                verbose!(
473                    target: "net::outbound_session::try_connect",
474                    "[P2P] Unable to connect outbound slot #{} {err}",
475                    self.slot
476                );
477
478                // Immediately return if the Connector has stopped.
479                // This indicates a shutdown of the P2P network and
480                // should not result in hostlist modifications.
481                if let Error::ConnectorStopped(message) = err {
482                    return Err(Error::ConnectFailed(message));
483                }
484
485                // At this point we failed to connect. We'll downgrade this peer now.
486                self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey).await?;
487
488                // Mark its state as Suspend, which sends it to the Refinery for processing.
489                if let Err(e) = self.p2p().hosts().try_register(addr.clone(), HostState::Suspend) {
490                    warn!(target: "net::outbound_session::try_connect", "Error while suspending addr={addr}: {e}");
491                }
492
493                // Notify that channel processing failed
494                self.p2p().hosts().channel_publisher.notify(Err(err.clone())).await;
495
496                Err(err)
497            }
498        }
499    }
500
501    fn notify(&self) {
502        self.wakeup_self.notify()
503    }
504
505    fn session(&self) -> OutboundSessionPtr {
506        self.session.upgrade().unwrap()
507    }
508    fn p2p(&self) -> P2pPtr {
509        self.session().p2p()
510    }
511}
512
/// Defines a common interface for multiple peer discovery processes.
///
/// NOTE: Currently only one Peer Discovery implementation exists. Making
/// Peer Discovery generic enables us to support network swarming, since
/// the peer discovery process will differ depending on whether it occurs
/// on the overlay network or a subnet.
#[async_trait]
pub trait PeerDiscoveryBase {
    /// Start the peer discovery process.
    async fn start(self: Arc<Self>);

    /// Stop the peer discovery process.
    async fn stop(self: Arc<Self>);

    /// Body of the peer discovery process.
    async fn run(self: Arc<Self>);

    /// Wait until peer discovery is woken up through `notify()`. In the
    /// `PeerDiscovery` implementation the result is `true` when the wait
    /// completed (almost) instantly.
    async fn wait(&self) -> bool;

    /// Wake up the peer discovery process.
    fn notify(&self);

    /// The outbound session this discovery process belongs to.
    fn session(&self) -> OutboundSessionPtr;

    /// The parent P2P object.
    fn p2p(&self) -> P2pPtr;
}
535
/// Main PeerDiscovery process that loops through connected peers
/// and sends out a `GetAddrs` when it is active. If there are no
/// connected peers after two attempts, connect to our seed nodes
/// and perform `SeedSyncSession`.
struct PeerDiscovery {
    /// Task running the discovery loop.
    process: StoppableTaskPtr,
    /// Condition variable the discovery loop waits on between rounds.
    wakeup_self: CondVar,
    /// Weak pointer to the owning outbound session.
    session: Weak<OutboundSession>,
}

impl PeerDiscovery {
    /// Create a new, not yet started peer discovery process for `session`.
    fn new(session: Weak<OutboundSession>) -> Arc<Self> {
        Arc::new(Self { process: StoppableTask::new(), wakeup_self: CondVar::new(), session })
    }
}
551
552#[async_trait]
553impl PeerDiscoveryBase for PeerDiscovery {
554    async fn start(self: Arc<Self>) {
555        let ex = self.p2p().executor();
556        self.process.clone().start(
557            async move {
558                self.run().await;
559                unreachable!();
560            },
561            // Ignore stop handler
562            |_| async {},
563            Error::NetworkServiceStopped,
564            ex,
565        );
566    }
567    async fn stop(self: Arc<Self>) {
568        self.process.stop().await;
569    }
570
571    /// Activate peer discovery if not active already. For the first two
572    /// attempts, this will loop through all connected P2P peers and send
573    /// out a `GetAddrs` message to request more peers. Other parts of the
574    /// P2P stack will then handle the incoming addresses and place them in
575    /// the hosts list.  
576    ///
577    /// On the third attempt, and if we still haven't made any connections,
578    /// this function will then call `p2p.seed()` which triggers a
579    /// `SeedSyncSession` that will connect to configured seeds and request
580    /// peers from them.
581    ///
582    /// This function will also sleep `outbound_peer_discovery_attempt_time`
583    /// seconds after broadcasting in order to let the P2P stack receive and
584    /// work through the addresses it is expecting.
585    async fn run(self: Arc<Self>) {
586        let mut current_attempt = 0;
587        loop {
588            dnetev!(self, OutboundPeerDiscovery, {
589                attempt: current_attempt,
590                state: "wait",
591            });
592
593            // wait to be woken up by notify()
594            let sleep_was_instant = self.wait().await;
595
596            // Read the current P2P settings
597            let settings = self.p2p().settings().read_arc().await;
598            let outbound_peer_discovery_cooloff_time =
599                settings.outbound_peer_discovery_cooloff_time;
600            let outbound_peer_discovery_attempt_time =
601                settings.outbound_peer_discovery_attempt_time;
602            let outbound_connections = settings.outbound_connections;
603            let getaddrs_max = settings.getaddrs_max;
604            let active_profiles = settings.active_profiles.clone();
605            let seeds = settings.seeds.clone();
606            drop(settings);
607
608            if sleep_was_instant {
609                // Try again
610                current_attempt += 1;
611            } else {
612                // reset back to start
613                current_attempt = 1;
614            }
615
616            if current_attempt >= 4 {
617                verbose!(
618                    target: "net::outbound_session::peer_discovery",
619                    "[P2P] [PEER DISCOVERY] Sleeping and trying again. Attempt {current_attempt}"
620                );
621
622                dnetev!(self, OutboundPeerDiscovery, {
623                    attempt: current_attempt,
624                    state: "sleep",
625                });
626
627                sleep(outbound_peer_discovery_cooloff_time).await;
628                current_attempt = 1;
629            }
630
631            // First 2 times try sending GetAddr to the network.
632            // 3rd time do a seed sync (providing we have seeds
633            // configured).
634            if self.p2p().is_connected() && current_attempt <= 2 {
635                // Broadcast the GetAddrs message to all active peers.
636                // If we have no active peers, we will perform a SeedSyncSession instead.
637                verbose!(
638                    target: "net::outbound_session::peer_discovery",
639                    "[P2P] [PEER DISCOVERY] Asking peers for new peers to connect to...");
640
641                dnetev!(self, OutboundPeerDiscovery, {
642                    attempt: current_attempt,
643                    state: "getaddr",
644                });
645
646                let get_addrs = GetAddrsMessage {
647                    max: getaddrs_max.unwrap_or(outbound_connections.min(u32::MAX as usize) as u32),
648                    transports: active_profiles,
649                };
650
651                self.p2p().broadcast(&get_addrs).await;
652
653                // Wait for a hosts store update event
654                let store_sub = self.p2p().hosts().subscribe_store().await;
655
656                let result = timeout(
657                    Duration::from_secs(outbound_peer_discovery_attempt_time),
658                    store_sub.receive(),
659                )
660                .await;
661
662                match result {
663                    Ok(addrs_len) => {
664                        verbose!(
665                            target: "net::outbound_session::peer_discovery",
666                            "[P2P] [PEER DISCOVERY] Discovered {addrs_len} peers"
667                        );
668                    }
669                    Err(_) => {
670                        warn!(
671                            target: "net::outbound_session::peer_discovery",
672                            "[P2P] [PEER DISCOVERY] Waiting for addrs timed out."
673                        );
674                        // Just do seed next time
675                        current_attempt = 3;
676                    }
677                }
678
679                // NOTE: not every call to subscribe() in net/ has a
680                // corresponding unsubscribe(). To do this we need async
681                // Drop. For now it's sufficient for publishers to be
682                // de-allocated when the Session completes.
683                store_sub.unsubscribe().await;
684            } else if !seeds.is_empty() {
685                verbose!(
686                    target: "net::outbound_session::peer_discovery",
687                    "[P2P] [PEER DISCOVERY] Asking seeds for new peers to connect to...");
688
689                dnetev!(self, OutboundPeerDiscovery, {
690                    attempt: current_attempt,
691                    state: "seed",
692                });
693
694                self.p2p().seed().await;
695            }
696
697            self.wakeup_self.reset();
698            self.session().wakeup_slots().await;
699
700            // Give some time for new connections to be established
701            sleep(outbound_peer_discovery_attempt_time).await;
702        }
703    }
704
705    /// Blocks execution until we receive a notification from notify().
706    /// `wakeup_self.wait()` resets the condition variable (`CondVar`) and waits
707    /// for a call from `notify()`. Returns `true` if the function completed
708    /// instantly (i.e. no wait occured). Returns false otherwise.
709    async fn wait(&self) -> bool {
710        let wakeup_start = Instant::now();
711        self.wakeup_self.wait().await;
712        let wakeup_end = Instant::now();
713
714        let epsilon = Duration::from_millis(200);
715        wakeup_end - wakeup_start <= epsilon
716    }
717
718    /// Wakeup peer discovery by sending a notification to `wakeup_self`.
719    /// Uses the underlying `CondVar` method `notify()`. Subsequent calls
720    /// to this do nothing until `wait()` is called.
721    fn notify(&self) {
722        self.wakeup_self.notify()
723    }
724
725    fn session(&self) -> OutboundSessionPtr {
726        self.session.upgrade().unwrap()
727    }
728
729    fn p2p(&self) -> P2pPtr {
730        self.session().p2p()
731    }
732}