darkfi/net/session/
refine_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
//! `RefineSession` manages the `GreylistRefinery`, which randomly selects
//! entries on the greylist and moves them to the whitelist if they are active.
21//!
22//! `GreylistRefinery` makes use of a `RefineSession` method called
23//! `handshake_node()`, which uses a `Connector` to establish a `Channel` with
24//! a provided address, and then does a version exchange across the channel
25//! (`perform_handshake_protocols`). `handshake_node()` can either succeed,
//! fail, or time out.
27
28use futures::{
29    future::{select, Either},
30    pin_mut,
31};
32use smol::Timer;
33use std::{
34    sync::{Arc, Weak},
35    time::{Duration, Instant, UNIX_EPOCH},
36};
37
38use async_trait::async_trait;
39use tracing::{debug, warn};
40use url::Url;
41
42use super::super::p2p::{P2p, P2pPtr};
43
44use crate::{
45    net::{
46        connector::Connector,
47        hosts::{HostColor, HostState},
48        protocol::ProtocolVersion,
49        session::{Session, SessionBitFlag, SESSION_REFINE},
50    },
51    system::{sleep, StoppableTask, StoppableTaskPtr},
52    Error,
53};
54
/// Reference-counted (`Arc`) handle to a [`RefineSession`].
pub type RefineSessionPtr = Arc<RefineSession>;
56
/// Session that owns the [`GreylistRefinery`] task and keeps a weak
/// back-reference to the parent `P2p` object.
pub struct RefineSession {
    /// Weak pointer to parent p2p object
    pub(in crate::net) p2p: Weak<P2p>,

    /// Task that periodically checks entries in the greylist.
    pub(in crate::net) refinery: Arc<GreylistRefinery>,
}
64
65impl RefineSession {
66    pub fn new(p2p: Weak<P2p>) -> RefineSessionPtr {
67        Arc::new_cyclic(|session| Self { p2p, refinery: GreylistRefinery::new(session.clone()) })
68    }
69
70    /// Start the refinery and self handshake processes.
71    pub(crate) async fn start(self: Arc<Self>) {
72        if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
73            match self.p2p().hosts().container.load_all(hostlist) {
74                Ok(()) => {
75                    debug!(target: "net::refine_session::start", "Load hosts successful!");
76                }
77                Err(e) => {
78                    warn!(target: "net::refine_session::start", "Error loading hosts {e}");
79                }
80            }
81        }
82
83        match self.p2p().hosts().import_blacklist().await {
84            Ok(()) => {
85                debug!(target: "net::refine_session::start", "Import blacklist successful!");
86            }
87            Err(e) => {
88                warn!(target: "net::refine_session::start",
89                    "Error importing blacklist from config file {e}");
90            }
91        }
92
93        debug!(target: "net::refine_session", "Starting greylist refinery process");
94        self.refinery.clone().start().await;
95    }
96
97    /// Stop the refinery and self handshake processes.
98    pub(crate) async fn stop(&self) {
99        debug!(target: "net::refine_session", "Stopping refinery process");
100        self.refinery.clone().stop().await;
101
102        if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
103            match self.p2p().hosts().container.save_all(hostlist) {
104                Ok(()) => {
105                    debug!(target: "net::refine_session::stop", "Save hosts successful!");
106                }
107                Err(e) => {
108                    warn!(target: "net::refine_session::stop", "Error saving hosts {e}");
109                }
110            }
111        }
112    }
113
114    /// Globally accessible function to perform a version exchange with a
115    /// given address.  Returns `true` if an address is accessible, false
116    /// otherwise.  
117    pub async fn handshake_node(self: Arc<Self>, addr: Url, p2p: P2pPtr) -> bool {
118        let self_ = Arc::downgrade(&self);
119        let connector = Connector::new(self.p2p().settings(), self_);
120
121        debug!(target: "net::refinery::handshake_node", "Attempting to connect to {addr}");
122        match connector.connect(&addr).await {
123            Ok((url, channel)) => {
124                debug!(target: "net::refinery::handshake_node", "Successfully created a channel with {url}");
125                // First initialize the version protocol and its Version, Verack subscriptions.
126                let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
127
128                debug!(target: "net::refinery::handshake_node", "Performing handshake protocols with {url}");
129                // Then run the version exchange, store the channel and subscribe to a stop signal.
130                let handshake =
131                    self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());
132
133                debug!(target: "net::refinery::handshake_node", "Starting channel {url}");
134                channel.clone().start(p2p.executor());
135
136                // Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if
137                // the handshake does not finish channel.stop() will never get called, resulting in
138                // zombie processes.
139                let timeout = Timer::after(Duration::from_secs(5));
140
141                pin_mut!(timeout);
142                pin_mut!(handshake);
143
144                let result = match select(handshake, timeout).await {
145                    Either::Left((Ok(_), _)) => {
146                        debug!(target: "net::refinery::handshake_node", "Handshake success!");
147                        true
148                    }
149                    Either::Left((Err(e), _)) => {
150                        debug!(target: "net::refinery::handshake_node", "Handshake error={e}");
151                        false
152                    }
153                    Either::Right((_, _)) => {
154                        debug!(target: "net::refinery::handshake_node", "Handshake timed out");
155                        false
156                    }
157                };
158
159                debug!(target: "net::refinery::handshake_node", "Stopping channel {url}");
160                channel.stop().await;
161
162                result
163            }
164
165            Err(e) => {
166                debug!(target: "net::refinery::handshake_node", "Failed to connect ({e})");
167                false
168            }
169        }
170    }
171}
172
173#[async_trait]
174impl Session for RefineSession {
175    fn p2p(&self) -> P2pPtr {
176        self.p2p.upgrade().unwrap()
177    }
178
179    fn type_id(&self) -> SessionBitFlag {
180        SESSION_REFINE
181    }
182
183    async fn reload(self: Arc<Self>) {}
184}
185
/// Periodically probes entries in the greylist.
///
/// Randomly selects a greylist entry and tries to establish a local
/// connection to it using the method handshake_node(), which creates a
/// channel and does a version exchange using `perform_handshake_protocols()`.
///
/// If successful, the entry is removed from the greylist and added to the
/// whitelist with an updated last_seen timestamp. If non-successful, the
/// entry is removed from the greylist.
pub struct GreylistRefinery {
    /// Weak pointer to parent object
    session: Weak<RefineSession>,
    /// Handle of the background task that runs the refinery loop.
    process: StoppableTaskPtr,
}
200
201impl GreylistRefinery {
202    pub fn new(session: Weak<RefineSession>) -> Arc<Self> {
203        Arc::new(Self { session, process: StoppableTask::new() })
204    }
205
206    pub async fn start(self: Arc<Self>) {
207        let ex = self.p2p().executor();
208        self.process.clone().start(
209            async move {
210                self.run().await;
211                unreachable!();
212            },
213            // Ignore stop handler
214            |_| async {},
215            Error::NetworkServiceStopped,
216            ex,
217        );
218    }
219
220    pub async fn stop(self: Arc<Self>) {
221        self.process.stop().await;
222    }
223
224    // Randomly select a peer on the greylist and probe it. This method will remove from the
225    // greylist and store on the whitelist providing the peer is responsive.
226    async fn run(self: Arc<Self>) {
227        let hosts = self.p2p().hosts();
228
229        loop {
230            // Acquire read lock on P2P settings and load necessary settings
231            let settings = self.p2p().settings().read_arc().await;
232            let greylist_refinery_interval = settings.greylist_refinery_interval;
233            let time_with_no_connections = settings.time_with_no_connections;
234            let active_profiles = settings.active_profiles.clone();
235            drop(settings);
236
237            sleep(greylist_refinery_interval).await;
238
239            if hosts.container.is_empty(HostColor::Grey) {
240                debug!(target: "net::refinery",
241                "Greylist is empty! Cannot start refinery process");
242
243                continue
244            }
245
246            // Pause the refinery if we've had zero connections for longer than the configured
247            // limit.
248            let offline_limit = Duration::from_secs(time_with_no_connections);
249
250            let offline_timer =
251                { Instant::now().duration_since(*hosts.last_connection.lock().unwrap()) };
252
253            if !self.p2p().is_connected() && offline_timer >= offline_limit {
254                warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
255                          offline_timer.as_secs());
256
257                // It is necessary to Free suspended hosts at this point, otherwise these
258                // hosts cannot be connected to in Outbound Session. Failure to do this could
259                // result in the refinery being paused forver (since connections could never be
260                // made).
261                let suspended_hosts = hosts.suspended();
262                for host in suspended_hosts {
263                    if let Err(e) = hosts.unregister(&host) {
264                        warn!(target: "net::refinery", "Error while unregistering addr={host}, err={e}");
265                    }
266                }
267
268                continue
269            }
270
271            // Only attempt to refine peers that match our transports.
272            match hosts.container.fetch_random_with_schemes(HostColor::Grey, &active_profiles) {
273                Some((entry, _)) => {
274                    let url = &entry.0;
275
276                    if let Err(e) = hosts.try_register(url.clone(), HostState::Refine) {
277                        debug!(target: "net::refinery", "Unable to refine addr={}, err={e}",
278                               url.clone());
279                        continue
280                    }
281
282                    if !self.session().handshake_node(url.clone(), self.p2p().clone()).await {
283                        hosts.container.remove_if_exists(HostColor::Grey, url);
284
285                        debug!(
286                            target: "net::refinery",
287                            "Peer {url} handshake failed. Removed from greylist"
288                        );
289
290                        // Free up this addr for future operations.
291                        if let Err(e) = hosts.unregister(url) {
292                            warn!(target: "net::refinery", "Error while unregistering addr={url}, err={e}");
293                        }
294
295                        continue
296                    }
297                    debug!(
298                        target: "net::refinery",
299                        "Peer {url} handshake successful. Adding to whitelist"
300                    );
301                    let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
302
303                    hosts.whitelist_host(url, last_seen).await.unwrap();
304
305                    debug!(target: "net::refinery", "GreylistRefinery complete!");
306
307                    continue
308                }
309                None => {
310                    debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
311
312                    continue
313                }
314            }
315        }
316    }
317
318    fn session(&self) -> RefineSessionPtr {
319        self.session.upgrade().unwrap()
320    }
321
322    fn p2p(&self) -> P2pPtr {
323        self.session().p2p()
324    }
325}