darkfi/net/session/
manual_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
//! Manual connections session. Manages the creation of manual sessions.
//! Used to create a manual session and to stop and start the session.
//!
//! A manual session is a type of outbound session in which we attempt
//! connection to a predefined set of peers. Manual sessions loop forever,
//! continually trying to connect to a given peer, and sleep for
//! `outbound_connect_timeout` seconds between each attempt.
//!
//! The session consists of a weak pointer to the p2p interface and a vector
//! of outbound connection slots. Using a weak pointer to p2p allows us to
//! avoid circular dependencies. The vector of slots is wrapped in a mutex
//! lock. This is switched on every time we instantiate a connection slot
//! and ensures that no other part of the program uses the slots at the
//! same time.
33
34use std::sync::{Arc, Weak};
35
36use async_trait::async_trait;
37use futures::stream::{FuturesUnordered, StreamExt};
38use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
39use tracing::{debug, error, warn};
40use url::Url;
41
42use super::{
43    super::{
44        connector::Connector,
45        p2p::{P2p, P2pPtr},
46    },
47    Session, SessionBitFlag, SESSION_MANUAL,
48};
49use crate::{
50    net::{hosts::HostState, settings::Settings},
51    system::{sleep, StoppableTask, StoppableTaskPtr},
52    util::logger::verbose,
53    Error, Result,
54};
55
56pub type ManualSessionPtr = Arc<ManualSession>;
57
58/// Defines manual connections session.
59pub struct ManualSession {
60    pub(in crate::net) p2p: Weak<P2p>,
61    slots: AsyncMutex<Vec<Arc<Slot>>>,
62}
63
64impl ManualSession {
65    /// Create a new manual session.
66    pub fn new(p2p: Weak<P2p>) -> ManualSessionPtr {
67        Arc::new(Self { p2p, slots: AsyncMutex::new(Vec::new()) })
68    }
69
70    pub(crate) async fn start(self: Arc<Self>) {
71        // Activate mutex lock on connection slots.
72        let mut slots = self.slots.lock().await;
73
74        let mut futures = FuturesUnordered::new();
75
76        let self_ = Arc::downgrade(&self);
77
78        // Initialize a slot for each configured peer.
79        // Connections will be started by not yet activated.
80        for peer in &self.p2p().settings().read().await.peers {
81            let slot = Slot::new(self_.clone(), peer.clone(), self.p2p().settings());
82            futures.push(slot.clone().start());
83            slots.push(slot);
84        }
85
86        while (futures.next().await).is_some() {}
87    }
88
89    /// Stops the manual session.
90    pub async fn stop(&self) {
91        let slots = &*self.slots.lock().await;
92        let mut futures = FuturesUnordered::new();
93
94        for slot in slots {
95            futures.push(slot.stop());
96        }
97
98        while (futures.next().await).is_some() {}
99    }
100}
101
102#[async_trait]
103impl Session for ManualSession {
104    fn p2p(&self) -> P2pPtr {
105        self.p2p.upgrade().unwrap()
106    }
107
108    fn type_id(&self) -> SessionBitFlag {
109        SESSION_MANUAL
110    }
111
112    async fn reload(self: Arc<Self>) {}
113}
114
115struct Slot {
116    addr: Url,
117    process: StoppableTaskPtr,
118    session: Weak<ManualSession>,
119    connector: Connector,
120}
121
122impl Slot {
123    fn new(
124        session: Weak<ManualSession>,
125        addr: Url,
126        settings: Arc<AsyncRwLock<Settings>>,
127    ) -> Arc<Self> {
128        Arc::new(Self {
129            addr,
130            process: StoppableTask::new(),
131            session: session.clone(),
132            connector: Connector::new(settings, session),
133        })
134    }
135
136    async fn start(self: Arc<Self>) {
137        let ex = self.p2p().executor();
138
139        self.process.clone().start(
140            self.run(),
141            |res| async {
142                match res {
143                    Ok(()) | Err(Error::NetworkServiceStopped) => {}
144                    Err(e) => error!("net::manual_session {e}"),
145                }
146            },
147            Error::NetworkServiceStopped,
148            ex,
149        );
150    }
151
152    /// Attempts a connection on the associated Connector object.
153    async fn run(self: Arc<Self>) -> Result<()> {
154        let ex = self.p2p().executor();
155
156        let mut attempts = 0;
157        loop {
158            attempts += 1;
159
160            verbose!(
161                target: "net::manual_session",
162                "[P2P] Connecting to manual outbound [{}] (attempt #{})",
163                self.addr, attempts
164            );
165
166            let settings = self.p2p().settings().read_arc().await;
167            let seeds = settings.seeds.clone();
168            let outbound_connect_timeout = settings.outbound_connect_timeout(self.addr.scheme());
169            drop(settings);
170
171            // Do not establish a connection to a host that is also configured as a seed.
172            // This indicates a user misconfiguration.
173            if seeds.contains(&self.addr) {
174                error!(
175                    target: "net::manual_session",
176                    "[P2P] Suspending manual connection to seed [{}]", self.addr.clone(),
177                );
178                return Ok(())
179            }
180
181            if let Err(e) = self.p2p().hosts().try_register(self.addr.clone(), HostState::Connect) {
182                debug!(target: "net::manual_session",
183                    "Cannot connect to manual={}, err={e}", &self.addr);
184
185                sleep(outbound_connect_timeout).await;
186
187                continue
188            }
189
190            match self.connector.connect(&self.addr).await {
191                Ok((_, channel)) => {
192                    verbose!(
193                        target: "net::manual_session",
194                        "[P2P] Manual outbound connected [{}]",
195                        channel.display_address()
196                    );
197
198                    let stop_sub = channel.subscribe_stop().await?;
199
200                    // Channel is now connected but not yet setup
201
202                    // Register the new channel
203                    match self.session().register_channel(channel.clone(), ex.clone()).await {
204                        Ok(()) => {
205                            // Wait for channel to close
206                            stop_sub.receive().await;
207
208                            verbose!(
209                                target: "net::manual_session",
210                                "[P2P] Manual outbound disconnected [{}]",
211                                channel.display_address()
212                            );
213                        }
214                        Err(e) => {
215                            warn!(
216                                target: "net::manual_session",
217                                "[P2P] Unable to connect to manual outbound [{}]: {e}",
218                                channel.display_address(),
219                            );
220
221                            // Free up this addr for future operations.
222                            if let Err(e) = self.p2p().hosts().unregister(channel.address()) {
223                                warn!(target: "net::manual_session", "[P2P] Error while unregistering addr={}, err={e}", channel.display_address());
224                            }
225                        }
226                    }
227                }
228                Err(e) => {
229                    warn!(
230                        target: "net::manual_session",
231                        "[P2P] Unable to connect to manual outbound: {e}",
232                    );
233
234                    // Free up this addr for future operations.
235                    if let Err(e) = self.p2p().hosts().unregister(&self.addr) {
236                        warn!(target: "net::manual_session", "[P2P] Error while unregistering addr={}, err={e}", self.addr);
237                    }
238                }
239            }
240
241            verbose!(
242                target: "net::manual_session",
243                "[P2P] Waiting {outbound_connect_timeout} seconds until next manual outbound connection attempt [{}]",
244                self.addr,
245            );
246
247            sleep(outbound_connect_timeout).await;
248        }
249    }
250
251    fn session(&self) -> ManualSessionPtr {
252        self.session.upgrade().unwrap()
253    }
254
255    fn p2p(&self) -> P2pPtr {
256        self.session().p2p()
257    }
258
259    async fn stop(&self) {
260        self.connector.stop();
261        self.process.stop().await;
262    }
263}