darkfi/net/protocol/
protocol_version.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use futures::{
20    future::{join_all, select, Either},
21    pin_mut,
22};
23use smol::{lock::RwLock as AsyncRwLock, Executor, Timer};
24use std::{
25    sync::Arc,
26    time::{Duration, UNIX_EPOCH},
27};
28use tracing::{debug, error};
29
30use super::super::{
31    channel::ChannelPtr,
32    message::{VerackMessage, VersionMessage},
33    message_publisher::MessageSubscription,
34    settings::Settings,
35};
36use crate::{
37    net::{session::SESSION_OUTBOUND, BanPolicy},
38    Error, Result,
39};
40
41/// Implements the protocol version handshake sent out by nodes at
42/// the beginning of a connection.
43pub struct ProtocolVersion {
44    channel: ChannelPtr,
45    version_sub: MessageSubscription<VersionMessage>,
46    verack_sub: MessageSubscription<VerackMessage>,
47    settings: Arc<AsyncRwLock<Settings>>,
48}
49
50impl ProtocolVersion {
51    /// Create a new version protocol. Makes a version and version ack
52    /// subscription, then adds them to a version protocol instance.
53    // TODO: This function takes settings as a param, however, it is also reachable through Channel.
54    //       Maybe we want to navigate towards Settings through channel->session->p2p->settings
55    pub async fn new(channel: ChannelPtr, settings: Arc<AsyncRwLock<Settings>>) -> Arc<Self> {
56        // Creates a version subscription
57        let version_sub =
58            channel.subscribe_msg::<VersionMessage>().await.expect("Missing version dispatcher!");
59
60        // Creates a version acknowledgement subscription
61        let verack_sub =
62            channel.subscribe_msg::<VerackMessage>().await.expect("Missing verack dispatcher!");
63
64        Arc::new(Self { channel, version_sub, verack_sub, settings })
65    }
66
67    /// Start version information exchange. Start the timer. Send version
68    /// info and wait for version ack. Wait for version info and send
69    /// version ack.
70    pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
71        debug!(target: "net::protocol_version::run", "START => address={}", self.channel.display_address());
72        let channel_handshake_timeout =
73            self.settings.read().await.channel_handshake_timeout(self.channel.address().scheme());
74
75        let timeout = Timer::after(Duration::from_secs(channel_handshake_timeout));
76        let version = self.clone().exchange_versions(executor);
77
78        pin_mut!(timeout);
79        pin_mut!(version);
80
81        // Run timer and version exchange at the same time. Either deal
82        // with the success or failure of the version exchange or
83        // time out.
84        match select(version, timeout).await {
85            Either::Left((Ok(_), _)) => {
86                debug!(target: "net::protocol_version::run", "END => address={}",
87                self.channel.display_address());
88
89                Ok(())
90            }
91            Either::Left((Err(e), _)) => {
92                error!(
93                    target: "net::protocol_version::run",
94                    "[P2P] Version Exchange failed [{}]: {e}",
95                    self.channel.display_address()
96                );
97
98                self.channel.stop().await;
99                Err(e)
100            }
101
102            Either::Right((_, _)) => {
103                error!(
104                    target: "net::protocol_version::run",
105                    "[P2P] Version Exchange timed out [{}]",
106                    self.channel.display_address(),
107                );
108
109                self.channel.stop().await;
110                Err(Error::ChannelTimeout)
111            }
112        }
113    }
114
115    /// Send and receive version information
116    async fn exchange_versions(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
117        debug!(
118            target: "net::protocol_version::exchange_versions",
119            "START => address={}", self.channel.display_address(),
120        );
121
122        let send = executor.spawn(self.clone().send_version());
123        let recv = executor.spawn(self.clone().recv_version());
124
125        let rets = join_all(vec![send, recv]).await;
126        if let Err(e) = &rets[0] {
127            error!(
128                target: "net::protocol_version::exchange_versions",
129                "send_version() failed: {e}"
130            );
131            return Err(e.clone())
132        }
133
134        if let Err(e) = &rets[1] {
135            error!(
136                target: "net::protocol_version::exchange_versions",
137                "recv_version() failed: {e}"
138            );
139            return Err(e.clone())
140        }
141
142        debug!(
143            target: "net::protocol_version::exchange_versions",
144            "END => address={}", self.channel.display_address(),
145        );
146        Ok(())
147    }
148
149    /// Send version info and wait for version acknowledgement.
150    /// Ensures that the app version is the same.
151    async fn send_version(self: Arc<Self>) -> Result<()> {
152        debug!(
153            target: "net::protocol_version::send_version",
154            "START => address={}", self.channel.display_address(),
155        );
156
157        let settings = self.settings.read().await;
158        let node_id = settings.node_id.clone();
159        let app_version = settings.app_version.clone();
160        let app_name = settings.app_name.clone();
161        drop(settings);
162
163        let external_addrs = self.channel.hosts().external_addrs().await;
164
165        let version = VersionMessage {
166            node_id,
167            app_name: app_name.clone(),
168            version: app_version.clone(),
169            timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(),
170            connect_recv_addr: self.channel.connect_addr().clone(),
171            resolve_recv_addr: self.channel.resolve_addr(),
172            ext_send_addr: external_addrs,
173            /* NOTE: `features` is a list of enabled features in the
174            format Vec<(service, version)>. In the future, Protocols will
175            add their own data to this field when they are attached.*/
176            features: vec![],
177        };
178        self.channel.send(&version).await?;
179
180        // Wait for verack
181        let verack_msg = self.verack_sub.receive().await?;
182
183        // Validate peer received version against our version.
184        debug!(
185            target: "net::protocol_version::send_version",
186            "App version: {app_version}, Recv version: {}",
187            verack_msg.app_version,
188        );
189
190        // MAJOR and MINOR should be the same, as well as the app identifier
191        if app_version.major != verack_msg.app_version.major ||
192            app_version.minor != verack_msg.app_version.minor ||
193            app_name != verack_msg.app_name
194        {
195            error!(
196                target: "net::protocol_version::send_version",
197                "[P2P] Version mismatch from {}. Disconnecting...",
198                self.channel.display_address(),
199            );
200
201            // If it is outbound, ban the host so we don't share it with other nodes
202            if self.channel.session_type_id() & SESSION_OUTBOUND != 0 {
203                if let BanPolicy::Strict = self.channel.p2p().settings().read().await.ban_policy {
204                    self.channel.ban().await;
205                }
206            }
207
208            self.channel.stop().await;
209            return Err(Error::ChannelStopped)
210        }
211
212        // Versions are compatible
213        debug!(
214            target: "net::protocol_version::send_version",
215            "END => address={}", self.channel.display_address(),
216        );
217        Ok(())
218    }
219
220    /// Receive version info, check the message is okay and send verack
221    /// with app version attached.
222    async fn recv_version(self: Arc<Self>) -> Result<()> {
223        debug!(
224            target: "net::protocol_version::recv_version",
225            "START => address={}", self.channel.display_address(),
226        );
227
228        // Receive version message
229        let version = self.version_sub.receive().await?;
230        if let Some(ipv6_addr) = version.get_ipv6_addr() {
231            let hosts = self.channel.p2p().hosts();
232            hosts.add_auto_addr(ipv6_addr);
233        }
234        self.channel.set_version(version).await;
235
236        // Send verack
237        let settings = self.settings.read().await;
238        let app_version = settings.app_version.clone();
239        let app_name = settings.app_name.clone();
240        drop(settings);
241
242        let verack = VerackMessage { app_version, app_name };
243        self.channel.send(&verack).await?;
244
245        debug!(
246            target: "net::protocol_version::recv_version",
247            "END => address={}", self.channel.display_address(),
248        );
249        Ok(())
250    }
251}