darkfi/net/session/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    sync::{Arc, Weak},
21    time::UNIX_EPOCH,
22};
23
24use async_trait::async_trait;
25use smol::Executor;
26use tracing::{debug, error, trace};
27
28use super::{
29    channel::ChannelPtr,
30    dnet::{self, dnetev, DnetEvent},
31    hosts::HostColor,
32    p2p::P2pPtr,
33    protocol::ProtocolVersion,
34};
35use crate::{system::Subscription, util::logger::verbose, Error, Result};
36
37pub mod inbound_session;
38pub use inbound_session::{InboundSession, InboundSessionPtr};
39pub mod manual_session;
40pub use manual_session::{ManualSession, ManualSessionPtr};
41pub mod outbound_session;
42pub use outbound_session::{OutboundSession, OutboundSessionPtr};
43pub mod seedsync_session;
44pub use seedsync_session::{SeedSyncSession, SeedSyncSessionPtr};
45pub mod refine_session;
46pub use refine_session::{RefineSession, RefineSessionPtr};
47pub mod direct_session;
48pub use direct_session::{DirectSession, DirectSessionPtr};
49
50/// Bitwise selectors for the `protocol_registry`
51pub type SessionBitFlag = u32;
52pub const SESSION_INBOUND: SessionBitFlag = 0b000001;
53pub const SESSION_OUTBOUND: SessionBitFlag = 0b000010;
54pub const SESSION_MANUAL: SessionBitFlag = 0b000100;
55pub const SESSION_SEED: SessionBitFlag = 0b001000;
56pub const SESSION_REFINE: SessionBitFlag = 0b010000;
57pub const SESSION_DIRECT: SessionBitFlag = 0b100000;
58
59pub const SESSION_DEFAULT: SessionBitFlag = 0b100111;
60pub const SESSION_ALL: SessionBitFlag = 0b111111;
61
62pub type SessionWeakPtr = Weak<dyn Session + Send + Sync + 'static>;
63
64/// Removes channel from the list of connected channels when a stop signal
65/// is received.
66pub async fn remove_sub_on_stop(
67    p2p: P2pPtr,
68    channel: ChannelPtr,
69    type_id: SessionBitFlag,
70    stop_sub: Subscription<Error>,
71) {
72    debug!(target: "net::session::remove_sub_on_stop", "[START]");
73    let hosts = p2p.hosts();
74    let addr = channel.address();
75
76    stop_sub.receive().await;
77
78    debug!(
79        target: "net::session::remove_sub_on_stop",
80        "Received stop event. Removing channel {}",
81        channel.display_address()
82    );
83
84    // Downgrade to greylist if this is a outbound session.
85    if type_id & (SESSION_OUTBOUND | SESSION_DIRECT) != 0 {
86        debug!(
87            target: "net::session::remove_sub_on_stop",
88            "Downgrading {}",
89            channel.display_address()
90        );
91
92        // If the host we are downgrading has been moved to blacklist,
93        // fetch_last_seen(addr) can return None. We simply print an
94        // error in this case.
95        match hosts.fetch_last_seen(addr) {
96            Some(last_seen) => {
97                if let Err(e) = hosts.move_host(addr, last_seen, HostColor::Grey).await {
98                    error!(target: "net::session::remove_sub_on_stop",
99            "Failed to move host {} to Greylist! Err={e}", channel.display_address());
100                }
101            }
102            None => {
103                error!(target: "net::session::remove_sub_on_stop",
104               "Failed to fetch last seen for {}", channel.display_address());
105            }
106        }
107    }
108
109    // For all sessions that are not refine sessions, mark this addr as
110    // Free. `unregister()` frees up this addr for any future operation. We
111    // don't call this on refine sessions since the unregister() call
112    // happens in the refinery directly.
113    if type_id & SESSION_REFINE == 0 {
114        if let Err(e) = hosts.unregister(channel.address()) {
115            error!(target: "net::session::remove_sub_on_stop", "Error while unregistering addr={}, err={e}", channel.display_address());
116        }
117    }
118
119    if type_id & SESSION_DIRECT != 0 {
120        dnetev!(p2p.session_direct(), DirectDisconnected, {
121            connect_addr: channel.info.connect_addr.clone(),
122            err: "Channel stopped".to_string()
123        });
124        verbose!(
125            target: "net::direct_session",
126            "[P2P] Direct outbound disconnected [{}]",
127            channel.display_address()
128        );
129    }
130
131    if !p2p.is_connected() {
132        hosts.disconnect_publisher.notify(Error::NetworkNotConnected).await;
133    }
134    debug!(target: "net::session::remove_sub_on_stop", "[END]");
135}
136
137/// Session trait. Defines methods that are used across sessions.
138/// Implements registering the channel and initializing the channel by
139/// performing a network handshake.
140#[async_trait]
141pub trait Session: Sync {
142    /// Registers a new channel with the session.
143    /// Performs a network handshake and starts the channel.
144    /// If we need to pass `Self` as an `Arc` we can do so like this:
145    /// ```
146    /// pub trait MyTrait: Send + Sync {
147    ///     async fn foo(&self, self_: Arc<dyn MyTrait>) {}
148    /// }
149    /// ```
150    async fn register_channel(
151        &self,
152        channel: ChannelPtr,
153        executor: Arc<Executor<'_>>,
154    ) -> Result<()> {
155        trace!(target: "net::session::register_channel", "[START]");
156
157        // Protocols should all be initialized but not started.
158        // We do this so that the protocols can begin receiving and buffering
159        // messages while the handshake protocol is ongoing. They are currently
160        // in sleep mode.
161        let p2p = self.p2p();
162        let protocols =
163            p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
164
165        // Perform the handshake protocol
166        let protocol_version = ProtocolVersion::new(channel.clone(), p2p.settings().clone()).await;
167        debug!(
168            target: "net::session::register_channel",
169            "Performing handshake protocols {}", channel.clone().display_address(),
170        );
171
172        let handshake_task =
173            self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
174
175        // Switch on the channel
176        channel.clone().start(executor.clone());
177
178        // Wait for handshake to finish.
179        match handshake_task.await {
180            Ok(()) => {
181                debug!(target: "net::session::register_channel",
182                "Handshake successful {}", channel.clone().display_address());
183            }
184            Err(e) => {
185                debug!(target: "net::session::register_channel",
186                "Handshake error {e} {}", channel.clone().display_address());
187
188                return Err(e)
189            }
190        }
191
192        // Now the channel is ready
193        debug!(target: "net::session::register_channel", "Session handshake complete");
194        debug!(target: "net::session::register_channel", "Activating remaining protocols");
195
196        // Now start all the protocols. They are responsible for managing their own
197        // lifetimes and correctly selfdestructing when the channel ends.
198        for protocol in protocols {
199            protocol.start(executor.clone()).await?;
200        }
201
202        trace!(target: "net::session::register_channel", "[END]");
203
204        Ok(())
205    }
206
207    /// Performs network handshake to initialize channel. Adds the channel to
208    /// the list of connected channels, and prepares to remove the channel when
209    /// a stop signal is received.
210    async fn perform_handshake_protocols(
211        &self,
212        protocol_version: Arc<ProtocolVersion>,
213        channel: ChannelPtr,
214        executor: Arc<Executor<'_>>,
215    ) -> Result<()> {
216        // Subscribe to stop events
217        let stop_sub = channel.clone().subscribe_stop().await?;
218
219        // Perform handshake
220        match protocol_version.run(executor.clone()).await {
221            Ok(()) => {
222                // Upgrade to goldlist if this is a outbound session.
223                if self.type_id() & (SESSION_OUTBOUND | SESSION_DIRECT) != 0 {
224                    debug!(
225                        target: "net::session::perform_handshake_protocols",
226                        "Upgrading {}", channel.display_address(),
227                    );
228
229                    let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
230
231                    self.p2p()
232                        .hosts()
233                        .move_host(channel.address(), last_seen, HostColor::Gold)
234                        .await?;
235                }
236
237                // Attempt to add channel to registry
238                self.p2p().hosts().register_channel(channel.clone()).await;
239
240                // Subscribe to stop, so we can remove from registry
241                executor
242                    .spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id(), stop_sub))
243                    .detach();
244
245                // Channel is ready for use
246                Ok(())
247            }
248            Err(e) => return Err(e),
249        }
250    }
251
252    /// Returns a pointer to the p2p network interface
253    fn p2p(&self) -> P2pPtr;
254
255    /// Return the session bit flag for the session type
256    fn type_id(&self) -> SessionBitFlag;
257
258    /// Reload settings for this session
259    async fn reload(self: Arc<Self>);
260}