darkfi/net/session/
direct_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! Direct connections session. Manages the creation of direct sessions.
20//! Used to create a direct session and to stop and start the session.
21//!
22//! A direct session is a type of outbound session in which a protocol can
23//! open a temporary channel (stopped after used) to a peer. Direct sessions
24//! do not loop continually, once stopped the session will not try to reopen
25//! a connection.
26//!
//! If there are no slots in the outbound session, the direct session can
//! optionally handle peer discovery.
29
30use std::{
31    collections::HashMap,
32    sync::{atomic::Ordering, Arc, Weak},
33    time::Duration,
34};
35
36use async_trait::async_trait;
37use smol::lock::{Mutex as AsyncMutex, OnceCell};
38use tracing::{error, warn};
39use url::Url;
40
41use super::{
42    super::{
43        connector::Connector,
44        dnet::{self, dnetev, DnetEvent},
45        hosts::{HostColor, HostState},
46        message::GetAddrsMessage,
47        p2p::{P2p, P2pPtr},
48    },
49    Session, SessionBitFlag, SESSION_DIRECT,
50};
51use crate::{
52    net::ChannelPtr,
53    system::{
54        msleep, sleep, timeout::timeout, CondVar, PublisherPtr, StoppableTask, StoppableTaskPtr,
55    },
56    util::logger::verbose,
57    Error, Result,
58};
59
/// Shared, reference-counted handle to a [`DirectSession`].
pub type DirectSessionPtr = Arc<DirectSession>;

/// Defines direct connections session.
pub struct DirectSession {
    /// Weak pointer to parent p2p object
    pub(in crate::net) p2p: Weak<P2p>,
    /// Connector to create direct connections.
    /// Initialized lazily the first time a new channel is created.
    connector: OnceCell<Connector>,
    /// Tasks that are trying to create a direct channel (they retry until they succeed).
    /// A task is removed once the channel is successfully created.
    retries_tasks: Arc<AsyncMutex<HashMap<Url, Arc<StoppableTask>>>>,
    /// Peer discovery task
    peer_discovery: Arc<PeerDiscovery>,
    /// Channel ID -> usage count.
    /// Incremented when a direct channel is handed out and decremented by
    /// `cleanup_channel()`.
    channels_usage: Arc<AsyncMutex<HashMap<u32, u32>>>,
    /// Pending channel creation tasks, keyed by the address being connected to.
    /// Only weak references are stored here; the entry is removed when the
    /// corresponding `ChannelTask` is dropped.
    tasks: Arc<AsyncMutex<HashMap<Url, Weak<ChannelTask>>>>,
}
78
79impl DirectSession {
80    /// Create a new direct session.
81    pub fn new(p2p: Weak<P2p>) -> DirectSessionPtr {
82        Arc::new_cyclic(|session| Self {
83            p2p,
84            connector: OnceCell::new(),
85            retries_tasks: Arc::new(AsyncMutex::new(HashMap::new())),
86            peer_discovery: PeerDiscovery::new(session.clone()),
87            channels_usage: Arc::new(AsyncMutex::new(HashMap::new())),
88            tasks: Arc::new(AsyncMutex::new(HashMap::new())),
89        })
90    }
91
92    /// Start the direct session.
93    pub(crate) async fn start(self: Arc<Self>) {
94        self.peer_discovery.clone().start().await;
95    }
96
97    /// Stops the direct session.
98    pub async fn stop(&self) {
99        self.peer_discovery.clone().stop().await;
100
101        for (_, task) in self.retries_tasks.lock().await.iter() {
102            task.stop().await;
103        }
104    }
105
106    /// Notify the peer discovery task to start it.
107    /// The direct session's peer discovery process will not start until this
108    /// method is called.
109    /// If there are outbound slots, peer discovery does not start even if this
110    /// method is called, we let the outbound session take care of it.
111    pub fn start_peer_discovery(&self) {
112        self.peer_discovery.notify();
113    }
114
115    /// If there is an existing channel to the same address, this method will
116    /// return it (even if the channel was not created by the direct session).
117    /// Otherwise it will create a new channel to `addr` in the direct session.
118    pub async fn get_channel(self: Arc<Self>, addr: &Url) -> Result<ChannelPtr> {
119        // Check existing channels
120        let channels = self.p2p().hosts().channels();
121        if let Some(channel) =
122            channels.iter().find(|&chan| chan.info.connect_addr == *addr).cloned()
123        {
124            let mut channels_usage = self.channels_usage.lock().await;
125            if channel.is_stopped() {
126                channel.clone().start(self.p2p().executor());
127            }
128            if channel.session_type_id() & SESSION_DIRECT != 0 {
129                channels_usage.entry(channel.info.id).and_modify(|count| *count += 1).or_insert(1);
130            }
131            return Ok(channel);
132        }
133
134        let mut tasks = self.tasks.lock().await;
135
136        // Check if task is already running for this addr
137        if let Some(task) = tasks.get(addr) {
138            if let Some(task) = task.upgrade() {
139                drop(tasks);
140                // Wait for the existing task to complete
141                while task.output.lock().await.is_none() {
142                    msleep(100).await;
143                }
144                return task.output.lock().await.clone().unwrap();
145            } else {
146                drop(tasks);
147                // Wait for the existing task to be fully removed
148                loop {
149                    tasks = self.tasks.lock().await;
150                    if !tasks.contains_key(addr) {
151                        break
152                    }
153                    drop(tasks);
154                    msleep(100).await;
155                }
156            }
157        }
158
159        // If no task running, create one
160        let task = Arc::new(ChannelTask {
161            session: Arc::downgrade(&self.clone()),
162            addr: addr.clone(),
163            output: Arc::new(AsyncMutex::new(None)),
164        });
165        tasks.insert(addr.clone(), Arc::downgrade(&task));
166        drop(tasks);
167
168        // Spawn a new task to create the channel
169        let ex = self.p2p().executor();
170        let addr_ = addr.clone();
171        let self_ = self.clone();
172        let task_ = task.clone();
173        ex.spawn(async move {
174            let res = self_.clone().new_channel(addr_.clone()).await;
175
176            let mut output = task_.output.lock().await;
177            *output = Some(res);
178        })
179        .detach();
180
181        // Wait for completion
182        while task.output.lock().await.is_none() {
183            msleep(100).await;
184        }
185        let res = task.output.lock().await.as_ref().unwrap().clone();
186        if let Ok(ref channel) = res {
187            self.inc_channel_usage(channel, Arc::strong_count(&task).try_into().unwrap()).await;
188        }
189        res
190    }
191
192    /// Increment channel usage
193    pub async fn inc_channel_usage(&self, channel: &ChannelPtr, n: u32) {
194        if channel.session_type_id() & SESSION_DIRECT == 0 {
195            // Do nothing if this is not a channel created by the direct session
196            return
197        }
198        let mut channels_usage = self.channels_usage.lock().await;
199        channels_usage.entry(channel.info.id).and_modify(|count| *count += n).or_insert(n);
200    }
201
202    /// Try to create a new channel until it succeeds, then notify `channel_pub`.
203    /// If it fails to create a channel, a task will sleep
204    /// `outbound_connect_timeout` seconds and try again.
205    pub async fn get_channel_with_retries(
206        self: Arc<Self>,
207        addr: Url,
208        channel_pub: PublisherPtr<ChannelPtr>,
209    ) {
210        let task = StoppableTask::new();
211        let self_ = self.clone();
212        let mut retries_tasks = self.retries_tasks.lock().await;
213        retries_tasks.insert(addr.clone(), task.clone());
214        drop(retries_tasks);
215
216        task.clone().start(
217            async move {
218                loop {
219                    let res = self_.clone().get_channel(&addr).await;
220                    match res {
221                        Ok(channel) => {
222                            channel_pub.notify(channel).await;
223                            let mut retries_tasks = self_.retries_tasks.lock().await;
224                            retries_tasks.remove(&addr);
225                            break
226                        }
227                        Err(_) => {
228                            let outbound_connect_timeout = self_
229                                .p2p()
230                                .settings()
231                                .read_arc()
232                                .await
233                                .outbound_connect_timeout(addr.scheme());
234                            sleep(outbound_connect_timeout).await;
235                        }
236                    }
237                }
238
239                Ok(())
240            },
241            |res| async {
242                match res {
243                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
244                    Err(e) => {
245                        error!(target: "net::direct_session::get_channel_with_retries", "{e}")
246                    }
247                }
248            },
249            Error::DetachedTaskStopped,
250            self.p2p().executor(),
251        );
252    }
253
254    async fn new_channel(self: Arc<Self>, addr: Url) -> Result<ChannelPtr> {
255        if !self.connector.is_initialized() {
256            let _ = self
257                .connector
258                .set(Connector::new(self.p2p().settings(), Arc::downgrade(&self.clone()).clone()))
259                .await;
260        }
261
262        verbose!(
263            target: "net::direct_session",
264            "[P2P] Connecting to direct outbound [{addr}]",
265        );
266
267        let settings = self.p2p().settings().read_arc().await;
268        let seeds = settings.seeds.clone();
269        let active_profiles = settings.active_profiles.clone();
270        drop(settings);
271
272        // Do not establish a connection to a host that is also configured as a seed.
273        // This indicates a user misconfiguration.
274        if seeds.contains(&addr) {
275            error!(
276                target: "net::direct_session",
277                "[P2P] Suspending direct connection to seed [{}]", addr.clone(),
278            );
279            return Err(Error::ConnectFailed(format!("[{addr}]: Direct connection to seed")))
280        }
281
282        // Abort if we are trying to connect to our own external address.
283        let hosts = self.p2p().hosts();
284        let external_addrs = hosts.external_addrs().await;
285        if external_addrs.contains(&addr) {
286            warn!(
287                target: "net::hosts::check_addrs",
288                "[P2P] Suspending direct connection to external addr [{}]", addr.clone(),
289            );
290            return Err(Error::ConnectFailed(format!(
291                "[{addr}]: Direct connection to external addr"
292            )))
293        }
294
295        // Abort if we do not support this transport.
296        if !active_profiles.contains(&addr.scheme().to_string()) {
297            return Err(Error::UnsupportedTransport(addr.scheme().to_string()))
298        }
299
300        // Abort if this peer is IPv6 and we do not support it.
301        if !hosts.ipv6_available.load(Ordering::SeqCst) && hosts.is_ipv6(&addr) {
302            return Err(Error::ConnectFailed(format!("[{addr}]: IPv6 is unavailable")))
303        }
304
305        // Set the addr to HostState::Connect
306        loop {
307            if let Err(e) = hosts.try_register(addr.clone(), HostState::Connect) {
308                // If `try_register` failed because the addr is being refined, try again in a bit.
309                if let Error::HostStateBlocked(from, _) = &e {
310                    if from == "Refine" {
311                        // TODO: Add a setting or have a way to wait for the refinery to complete
312                        sleep(5).await;
313                        continue
314                    }
315                }
316
317                error!(target: "net::direct_session",
318                    "[P2P] Cannot connect to direct={addr}, err={e}");
319                return Err(e)
320            }
321            break
322        }
323
324        dnetev!(self, DirectConnecting, {
325            connect_addr: addr.clone(),
326        });
327
328        // Attempt channel creation
329        match self.connector.get().unwrap().connect(&addr).await {
330            Ok((_, channel)) => {
331                verbose!(
332                    target: "net::direct_session",
333                    "[P2P] Direct outbound connected [{}]",
334                    channel.display_address()
335                );
336
337                dnetev!(self, DirectConnected, {
338                    connect_addr: channel.info.connect_addr.clone(),
339                    addr: channel.display_address().clone(),
340                    channel_id: channel.info.id
341                });
342
343                // Register the new channel
344                match self.register_channel(channel.clone(), self.p2p().executor()).await {
345                    Ok(()) => Ok(channel),
346                    Err(e) => {
347                        warn!(
348                            target: "net::direct_session",
349                            "[P2P] Unable to connect to direct outbound [{}]: {e}",
350                            channel.display_address(),
351                        );
352
353                        dnetev!(self, DirectDisconnected, {
354                            connect_addr: channel.info.connect_addr.clone(),
355                            err: e.to_string()
356                        });
357
358                        // Free up this addr for future operations.
359                        if let Err(e) = self.p2p().hosts().unregister(channel.address()) {
360                            warn!(target: "net::direct_session", "[P2P] Error while unregistering addr={}, err={e}", channel.display_address());
361                        }
362
363                        Err(e)
364                    }
365                }
366            }
367            Err(e) => {
368                warn!(
369                    target: "net::direct_session",
370                    "[P2P] Unable to connect to direct outbound: {e}",
371                );
372
373                dnetev!(self, DirectDisconnected, {
374                    connect_addr: addr.clone(),
375                    err: e.to_string()
376                });
377
378                // Free up this addr for future operations.
379                if let Err(e) = self.p2p().hosts().unregister(&addr) {
380                    warn!(target: "net::direct_session", "[P2P] Error while unregistering addr={addr}, err={e}");
381                }
382
383                Err(e)
384            }
385        }
386    }
387
388    /// Close a direct channel if it's not used by anything.
389    /// `AsyncDrop` would be great here (<https://doc.rust-lang.org/std/future/trait.AsyncDrop.html>)
390    /// but it's still in nightly. For now you must call this method manually
391    /// once you are done with a direct channel.
392    /// Returns `true` if the channel is stopped.
393    pub async fn cleanup_channel(self: Arc<Self>, channel: ChannelPtr) -> bool {
394        if channel.session_type_id() & SESSION_DIRECT == 0 {
395            // Do nothing if this is not a channel created by the direct session
396            return false
397        }
398
399        let mut channels_usage = self.channels_usage.lock().await;
400        let usage_count = channels_usage.get_mut(&channel.info.id);
401        if usage_count.is_none() {
402            let _ = self.p2p().hosts().unregister(channel.address());
403            channel.stop().await;
404            return true
405        }
406        let usage_count = usage_count.unwrap();
407        if *usage_count > 0 {
408            *usage_count -= 1;
409        }
410
411        if *usage_count == 0 {
412            channels_usage.remove(&channel.info.id);
413            let _ = self.p2p().hosts().unregister(channel.address());
414            channel.stop().await;
415            return true
416        }
417
418        false
419    }
420}
421
422#[async_trait]
423impl Session for DirectSession {
424    fn p2p(&self) -> P2pPtr {
425        self.p2p.upgrade().unwrap()
426    }
427
428    fn type_id(&self) -> SessionBitFlag {
429        SESSION_DIRECT
430    }
431
432    async fn reload(self: Arc<Self>) {}
433}
434
/// A pending attempt to create a direct channel to `addr`.
///
/// `DirectSession::tasks` only keeps a weak reference to it. When the last
/// strong reference is dropped, the `Drop` implementation removes the entry
/// for `addr` from that map.
struct ChannelTask {
    /// Session that owns the `tasks` map this task is registered in
    session: Weak<DirectSession>,
    /// Address the channel is being created to
    addr: Url,
    /// Result of the attempt, `None` until the attempt has finished
    output: Arc<AsyncMutex<Option<Result<ChannelPtr>>>>,
}
440
441impl Drop for ChannelTask {
442    fn drop(&mut self) {
443        let session = self.session.upgrade().unwrap();
444        let addr = self.addr.clone();
445        session
446            .p2p()
447            .executor()
448            .spawn(async move {
449                let mut tasks = session.tasks.lock().await;
450                tasks.remove(&addr);
451            })
452            .detach();
453    }
454}
455
/// Peer discovery process that sends `GetAddrs` messages to the network.
/// If we are not connected to any peer, it first creates a channel in the
/// direct session to a random goldlist, whitelist or greylist host.
/// If it's unsuccessful after two attempts, connect to our seed nodes and
/// perform `SeedSyncSession`.
struct PeerDiscovery {
    /// Background task running the discovery loop
    process: StoppableTaskPtr,
    /// Signalled by `notify()`; the discovery loop waits on it before starting
    init: CondVar,
    /// Weak pointer to the direct session that owns this process
    session: Weak<DirectSession>,
}
465
466impl PeerDiscovery {
467    fn new(session: Weak<DirectSession>) -> Arc<Self> {
468        Arc::new(Self { process: StoppableTask::new(), init: CondVar::new(), session })
469    }
470}
471
472impl PeerDiscovery {
473    async fn start(self: Arc<Self>) {
474        let ex = self.p2p().executor();
475        self.process.clone().start(
476            async move {
477                self.run().await;
478                Ok(())
479            },
480            // Ignore stop handler
481            |_| async {},
482            Error::NetworkServiceStopped,
483            ex,
484        );
485    }
486    async fn stop(self: Arc<Self>) {
487        self.process.stop().await;
488    }
489
490    /// Peer discovery's main process. For the first two attempts, this will
491    /// broadcast a `GetAddrs` message to request more peers. If we are not
492    /// connected to any peer, we try to create a channel in the direct session
493    /// to a random whitelist or greylist host.
494    /// Other parts of the P2P stack will then handle the incoming addresses
495    /// and place them in the hosts list.
496    ///
497    /// On the third attempt, and if we still haven't made any connections,
498    /// this function will then call `p2p.seed()` which triggers a
499    /// `SeedSyncSession` that will connect to configured seeds and request
500    /// peers from them.
501    ///
502    /// This function will also sleep `outbound_peer_discovery_attempt_time`
503    /// seconds after broadcasting in order to let the P2P stack receive and
504    /// work through the addresses it is expecting.
505    ///
506    /// Peer discovery will only start once `notify()` is called.
507    async fn run(self: Arc<Self>) {
508        // DirectSession can handle peer discovery only if there is no outbound
509        // slot. Otherwise we let the outbound session take care of it.
510        let settings = self.p2p().settings().read_arc().await;
511        if settings.outbound_connections > 0 {
512            return
513        }
514
515        // Wait for the peer discovery to be notified
516        self.init.wait().await;
517
518        let mut current_attempt = 0;
519        loop {
520            dnetev!(self, DirectPeerDiscovery, {
521                attempt: current_attempt,
522                state: "wait",
523            });
524
525            // Read the current P2P settings
526            let settings = self.p2p().settings().read_arc().await;
527            let outbound_peer_discovery_cooloff_time =
528                settings.outbound_peer_discovery_cooloff_time;
529            let outbound_peer_discovery_attempt_time =
530                settings.outbound_peer_discovery_attempt_time;
531            let getaddrs_max = settings.getaddrs_max;
532            let active_profiles = settings.active_profiles.clone();
533            let seeds = settings.seeds.clone();
534            drop(settings);
535
536            current_attempt += 1;
537
538            if current_attempt >= 4 {
539                verbose!(
540                    target: "net::direct_session::peer_discovery",
541                    "[P2P] [PEER DISCOVERY] Sleeping and trying again. Attempt {current_attempt}"
542                );
543
544                dnetev!(self, DirectPeerDiscovery, {
545                    attempt: current_attempt,
546                    state: "sleep",
547                });
548
549                sleep(outbound_peer_discovery_cooloff_time).await;
550                current_attempt = 1;
551            }
552
553            // If we are not connected to any peer, try to create a channel
554            // (using the direct session) to a random host from the goldlist,
555            // whitelist, or greylist.
556            let mut channel = None;
557            if !self.p2p().is_connected() {
558                dnetev!(self, DirectPeerDiscovery, {
559                    attempt: current_attempt,
560                    state: "newchan",
561                });
562
563                for color in [HostColor::Gold, HostColor::White, HostColor::Grey].iter() {
564                    if let Some((entry, _)) = self
565                        .p2p()
566                        .hosts()
567                        .container
568                        .fetch_random_with_schemes(color.clone(), &active_profiles)
569                    {
570                        channel = self.p2p().session_direct().get_channel(&entry.0).await.ok();
571                        break;
572                    }
573                }
574            }
575
576            // First 2 times try sending GetAddr to the network.
577            // 3rd time do a seed sync (providing we have seeds configured).
578            if self.p2p().is_connected() && current_attempt <= 2 {
579                // Broadcast the GetAddrs message to all active peers.
580                // If we have no active peers, we will perform a SeedSyncSession instead.
581                verbose!(
582                    target: "net::direct_session::peer_discovery",
583                    "[P2P] [PEER DISCOVERY] Asking peers for new peers to connect to...");
584
585                dnetev!(self, DirectPeerDiscovery, {
586                    attempt: current_attempt,
587                    state: "getaddr",
588                });
589
590                let get_addrs =
591                    GetAddrsMessage { max: getaddrs_max.unwrap_or(1), transports: active_profiles };
592
593                self.p2p().broadcast(&get_addrs).await;
594
595                // Wait for a hosts store update event
596                let store_sub = self.p2p().hosts().subscribe_store().await;
597
598                let result = timeout(
599                    Duration::from_secs(outbound_peer_discovery_attempt_time),
600                    store_sub.receive(),
601                )
602                .await;
603
604                match result {
605                    Ok(addrs_len) => {
606                        verbose!(
607                            target: "net::direct_session::peer_discovery",
608                            "[P2P] [PEER DISCOVERY] Discovered {addrs_len} peers"
609                        );
610                        // Found some addrs, reset `current_attempt`
611                        if addrs_len > 0 {
612                            current_attempt = 0;
613                        }
614                    }
615                    Err(_) => {
616                        warn!(
617                            target: "net::direct_session::peer_discovery",
618                            "[P2P] [PEER DISCOVERY] Waiting for addrs timed out."
619                        );
620                        // Just do seed next time
621                        current_attempt = 3;
622                    }
623                }
624
625                // NOTE: not every call to subscribe() in net/ has a
626                // corresponding unsubscribe(). To do this we need async
627                // Drop. For now it's sufficient for publishers to be
628                // de-allocated when the Session completes.
629                store_sub.unsubscribe().await;
630            } else if !seeds.is_empty() {
631                verbose!(
632                    target: "net::direct_session::peer_discovery",
633                    "[P2P] [PEER DISCOVERY] Asking seeds for new peers to connect to...");
634
635                dnetev!(self, DirectPeerDiscovery, {
636                    attempt: current_attempt,
637                    state: "seed",
638                });
639
640                self.p2p().seed().await;
641            }
642
643            // Stop the channel we created for peer discovery
644            if let Some(ch) = channel {
645                self.p2p().session_direct().cleanup_channel(ch).await;
646            }
647
648            // Give some time for new connections to be established
649            sleep(outbound_peer_discovery_attempt_time).await;
650        }
651    }
652
653    /// Init peer discovery by sending a notification to `init`.
654    /// Uses the underlying `CondVar` method `notify()`.
655    pub fn notify(&self) {
656        self.init.notify()
657    }
658
659    fn session(&self) -> DirectSessionPtr {
660        self.session.upgrade().unwrap()
661    }
662
663    fn p2p(&self) -> P2pPtr {
664        self.session().p2p()
665    }
666}