darkfi/net/session/
seedsync_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! Seed sync session creates a connection to the seed nodes specified in settings.
20//!
21//! A new seed sync session is created every time we call [`P2p::start()`]. The
22//! seed sync session loops through all the configured seeds and creates a corresponding
23//! `Slot`. `Slot`'s are started, but sit in a suspended state until they are activated
24//! by a call to notify (see: `p2p.seed()`).
25//!
26//! When a `Slot` has been activated by a call to `notify()`, it will try to connect
27//! to the given seed address using a [`Connector`]. This will either connect successfully
//! or fail with a warning. We gather the results of each `Slot` in an `AtomicBool`
29//! so that we can handle the error elsewhere in the code base.
30//!
31//! If a seed node connects successfully, it runs a version exchange protocol,
32//! stores the channel in the p2p list of channels, and disconnects, removing
33//! the channel from the channel list.
34//!
35//! The channel is registered using the [`Session::register_channel()`] trait
36//! method. This invokes the Protocol Registry method `attach()`. Usually this
37//! returns a list of protocols that we loop through and start. In this case,
38//! `attach()` uses the bitflag selector to identify seed sessions and exclude
39//! them.
40//!
41//! The version exchange occurs inside `register_channel()`. We create a handshake
42//! task that runs the version exchange with the `perform_handshake_protocols()`
43//! function. This runs the version exchange protocol, stores the channel in the
44//! p2p list of channels, and subscribes to a stop signal.
45
46use std::sync::{
47    atomic::{AtomicBool, Ordering::SeqCst},
48    Arc, Weak,
49};
50
51use async_trait::async_trait;
52use futures::stream::{FuturesUnordered, StreamExt};
53use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
54use tracing::{debug, warn};
55use url::Url;
56
57use super::{
58    super::{
59        connector::Connector,
60        hosts::HostColor,
61        p2p::{P2p, P2pPtr},
62        settings::Settings,
63    },
64    Session, SessionBitFlag, SESSION_SEED,
65};
66use crate::{
67    net::hosts::HostState,
68    system::{CondVar, StoppableTask, StoppableTaskPtr},
69    util::logger::verbose,
70    Error,
71};
72
73pub type SeedSyncSessionPtr = Arc<SeedSyncSession>;
74
75/// Defines seed connections session
76pub struct SeedSyncSession {
77    pub(in crate::net) p2p: Weak<P2p>,
78    slots: AsyncMutex<Vec<Arc<Slot>>>,
79}
80
81impl SeedSyncSession {
82    /// Create a new seed sync session instance
83    pub(crate) fn new(p2p: Weak<P2p>) -> SeedSyncSessionPtr {
84        Arc::new(Self { p2p, slots: AsyncMutex::new(Vec::new()) })
85    }
86
87    /// Initialize the seedsync session. Each slot is suspended while it waits
88    /// for a call to notify().
89    pub(crate) async fn start(self: Arc<Self>) {
90        // Activate mutex lock on connection slots.
91        let mut slots = self.slots.lock().await;
92
93        let mut futures = FuturesUnordered::new();
94
95        let self_ = Arc::downgrade(&self);
96
97        // Initialize a slot for each configured seed.
98        // Connections will be started by not yet activated.
99        for seed in &self.p2p().settings().read().await.seeds {
100            let slot = Slot::new(self_.clone(), seed.clone(), self.p2p().settings());
101            futures.push(slot.clone().start());
102            slots.push(slot);
103        }
104
105        while (futures.next().await).is_some() {}
106    }
107
108    /// Activate the slots so they can continue with the seedsync process.
109    /// Called in `p2p.seed()`.
110    pub(crate) async fn notify(&self) {
111        let slots = &*self.slots.lock().await;
112
113        for slot in slots {
114            slot.notify();
115        }
116    }
117
118    /// Stop the seedsync session.
119    pub(crate) async fn stop(&self) {
120        debug!(target: "net::seedsync_session", "Stopping seed sync session...");
121        let slots = &*self.slots.lock().await;
122        let mut futures = FuturesUnordered::new();
123
124        for slot in slots {
125            futures.push(slot.clone().stop());
126        }
127
128        while (futures.next().await).is_some() {}
129        debug!(target: "net::seedsync_session", "Seed sync session stopped!");
130    }
131
132    /// Returns true if every seed attempt per slot has failed.
133    async fn _failed(&self) -> bool {
134        let slots = &*self.slots.lock().await;
135        slots.iter().all(|s| s._failed())
136    }
137}
138
139#[async_trait]
140impl Session for SeedSyncSession {
141    fn p2p(&self) -> P2pPtr {
142        self.p2p.upgrade().unwrap()
143    }
144
145    fn type_id(&self) -> SessionBitFlag {
146        SESSION_SEED
147    }
148
149    async fn reload(self: Arc<Self>) {}
150}
151
152struct Slot {
153    addr: Url,
154    process: StoppableTaskPtr,
155    wakeup_self: CondVar,
156    session: Weak<SeedSyncSession>,
157    connector: Connector,
158    failed: AtomicBool,
159}
160
161impl Slot {
162    fn new(
163        session: Weak<SeedSyncSession>,
164        addr: Url,
165        settings: Arc<AsyncRwLock<Settings>>,
166    ) -> Arc<Self> {
167        Arc::new(Self {
168            addr,
169            process: StoppableTask::new(),
170            wakeup_self: CondVar::new(),
171            session: session.clone(),
172            connector: Connector::new(settings, session),
173            failed: AtomicBool::new(false),
174        })
175    }
176
177    async fn start(self: Arc<Self>) {
178        let ex = self.p2p().executor();
179
180        self.process.clone().start(
181            async move {
182                self.run().await;
183                unreachable!();
184            },
185            // Ignore stop handler
186            |_| async {},
187            Error::NetworkServiceStopped,
188            ex,
189        );
190    }
191
192    /// Main seedsync connection process that is started on `p2p.start()` but does
193    /// not proceed until it receives a call to `notify()` (called in `p2p.seed()`).
194    /// Resets the CondVar after each run to re-suspend the connection process until
195    /// `notify()` is called again.
196    async fn run(self: Arc<Self>) {
197        let ex = self.p2p().executor();
198        let hosts = self.p2p().hosts();
199
200        loop {
201            // Wait for a signal from notify() before proceeding with the seedsync.
202            self.wait().await;
203
204            debug!(
205                target: "net::session::seedsync_session", "SeedSyncSession::start_seed() [START]",
206            );
207
208            if let Err(e) = hosts.try_register(self.addr.clone(), HostState::Connect) {
209                debug!(target: "net::session::seedsync_session",
210                    "Cannot connect to seed={}, err={e}", &self.addr);
211
212                // Reset the CondVar for future use.
213                self.reset();
214
215                continue
216            }
217
218            match self.connector.connect(&self.addr).await {
219                Ok((_, ch)) => {
220                    verbose!(
221                        target: "net::session::seedsync_session",
222                        "[P2P] Connected seed [{}]",
223                        ch.display_address()
224                    );
225
226                    match self.session().register_channel(ch.clone(), ex.clone()).await {
227                        Ok(()) => {
228                            self.failed.store(false, SeqCst);
229
230                            verbose!(
231                                target: "net::session::seedsync_session",
232                                "[P2P] Disconnecting from seed [{}]",
233                                ch.display_address()
234                            );
235                            ch.stop().await;
236
237                            // Seed process complete
238                            if hosts.container.is_empty(HostColor::Grey) {
239                                warn!(target: "net::session::seedsync_session",
240                                "[P2P] Greylist empty after seeding");
241                            }
242
243                            // Reset the CondVar for future use.
244                            self.reset();
245                        }
246
247                        Err(e) => {
248                            warn!(
249                                target: "net::session::seedsync_session",
250                                "[P2P] Unable to connect to seed [{}]: {e}",
251                                ch.display_address()
252                            );
253                            self.handle_failure(ch.address());
254
255                            continue
256                        }
257                    }
258                }
259
260                Err(e) => {
261                    warn!(
262                        target: "net::session::seedsync_session",
263                        "[P2P] Unable to connect to seed: {e}",
264                    );
265                    self.handle_failure(&self.addr);
266
267                    continue
268                }
269            }
270            debug!(
271                target: "net::session::seedsync_session",
272                "SeedSyncSession::start_seed() [END]",
273            );
274        }
275    }
276
277    fn handle_failure(&self, addr: &Url) {
278        self.failed.store(true, SeqCst);
279
280        // Free up this addr for future operations.
281        if let Err(e) = self.p2p().hosts().unregister(addr) {
282            warn!(target: "net::session::seedsync_session", "[P2P] Error while unregistering addr={addr}, err={e}");
283        }
284
285        // Reset the CondVar for future use.
286        self.reset();
287    }
288
289    fn _failed(&self) -> bool {
290        self.failed.load(SeqCst)
291    }
292
293    fn session(&self) -> SeedSyncSessionPtr {
294        self.session.upgrade().unwrap()
295    }
296
297    fn p2p(&self) -> P2pPtr {
298        self.session().p2p()
299    }
300
301    async fn wait(&self) {
302        self.wakeup_self.wait().await;
303    }
304
305    fn reset(&self) {
306        self.wakeup_self.reset()
307    }
308
309    fn notify(&self) {
310        self.wakeup_self.notify()
311    }
312
313    async fn stop(self: Arc<Self>) {
314        self.connector.stop();
315        self.process.stop().await;
316    }
317}