darkfi/net/protocol/
protocol_ping.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    sync::Arc,
21    time::{Duration, Instant},
22};
23
24use async_trait::async_trait;
25use rand::{rngs::OsRng, Rng};
26use smol::{lock::RwLock as AsyncRwLock, Executor};
27use tracing::{debug, error, warn};
28
29use super::{
30    super::{
31        channel::ChannelPtr,
32        message::{PingMessage, PongMessage},
33        message_publisher::MessageSubscription,
34        p2p::P2pPtr,
35        settings::Settings,
36    },
37    protocol_base::{ProtocolBase, ProtocolBasePtr},
38    protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
39};
40use crate::{
41    system::{sleep, timeout::timeout},
42    Error, Result,
43};
44
/// Ping-pong keepalive protocol bound to a single channel.
///
/// Bundles the channel with its ping and pong message subscriptions, the
/// shared settings handle and the jobs manager that runs the protocol tasks.
pub struct ProtocolPing {
    /// Channel that pings and pongs are sent on; it is stopped when a pong
    /// times out or carries the wrong nonce.
    channel: ChannelPtr,
    /// Subscription yielding incoming ping messages.
    ping_sub: MessageSubscription<PingMessage>,
    /// Subscription yielding incoming pong messages.
    pong_sub: MessageSubscription<PongMessage>,
    /// Shared settings, read on every ping round for the pong timeout and
    /// the heartbeat interval.
    settings: Arc<AsyncRwLock<Settings>>,
    /// Jobs manager on which the ping and reply loops are spawned.
    jobsman: ProtocolJobsManagerPtr,
}
53
/// Name of this protocol: handed to the jobs manager on construction and
/// returned from `ProtocolBase::name`.
const PROTO_NAME: &str = "ProtocolPing";
55
56impl ProtocolPing {
57    /// Create a new ping-pong protocol.
58    pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
59        // Creates a subscription to ping message
60        let ping_sub =
61            channel.subscribe_msg::<PingMessage>().await.expect("Missing ping dispatcher!");
62
63        // Creates a subscription to pong message
64        let pong_sub =
65            channel.subscribe_msg::<PongMessage>().await.expect("Missing pong dispatcher!");
66
67        Arc::new(Self {
68            channel: channel.clone(),
69            ping_sub,
70            pong_sub,
71            settings: p2p.settings(),
72            jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
73        })
74    }
75
76    /// Runs the ping-pong protocol. Creates a subscription to pong, then
77    /// starts a loop. Loop sleeps for the duration of the channel heartbeat,
78    /// then sends a ping message with a random nonce. Loop starts a timer,
79    /// waits for the pong reply and ensures the nonce is the same.
80    async fn run_ping_pong(self: Arc<Self>) -> Result<()> {
81        debug!(
82            target: "net::protocol_ping::run_ping_pong",
83            "START => address={}", self.channel.display_address(),
84        );
85
86        loop {
87            let settings = self.settings.read().await;
88            let outbound_connect_timeout =
89                settings.outbound_connect_timeout(self.channel.address().scheme());
90            let channel_heartbeat_interval =
91                settings.channel_heartbeat_interval(self.channel.address().scheme());
92            drop(settings);
93
94            // Create a random nonce.
95            let nonce = Self::random_nonce();
96
97            // Send ping message.
98            let ping = PingMessage { nonce };
99            self.channel.send(&ping).await?;
100
101            // Start the timer for the ping timer
102            let timer = Instant::now();
103
104            // Wait for pong, check nonce matches.
105            let pong_msg = match timeout(
106                Duration::from_secs(outbound_connect_timeout),
107                self.pong_sub.receive(),
108            )
109            .await
110            {
111                Ok(msg) => {
112                    // msg will be an error when the channel is stopped
113                    // so just yield out of this function.
114                    msg?
115                }
116                Err(_e) => {
117                    // Pong timeout. We didn't receive any message back
118                    // so close the connection.
119                    warn!(
120                        target: "net::protocol_ping::run_ping_pong",
121                        "[P2P] Ping-Pong protocol timed out for {}", self.channel.display_address(),
122                    );
123                    self.channel.stop().await;
124                    return Err(Error::ChannelStopped)
125                }
126            };
127
128            if pong_msg.nonce != nonce {
129                error!(
130                    target: "net::protocol_ping::run_ping_pong",
131                    "[P2P] Wrong nonce in pingpong, disconnecting {}",
132                    self.channel.display_address(),
133                );
134                self.channel.stop().await;
135                return Err(Error::ChannelStopped)
136            }
137
138            debug!(
139                target: "net::protocol_ping::run_ping_pong",
140                "Received Pong from {}: {:?}",
141                self.channel.display_address(),
142                timer.elapsed(),
143            );
144
145            // Sleep until next heartbeat
146            sleep(channel_heartbeat_interval).await;
147        }
148    }
149
150    /// Waits for ping, then replies with pong.
151    /// Copies ping's nonce into the pong reply.
152    async fn reply_to_ping(self: Arc<Self>) -> Result<()> {
153        debug!(
154            target: "net::protocol_ping::reply_to_ping",
155            "START => address={}", self.channel.display_address(),
156        );
157
158        loop {
159            // Wait for ping, reply with pong that has a matching nonce.
160            let ping = self.ping_sub.receive().await?;
161            debug!(
162                target: "net::protocol_ping::reply_to_ping",
163                "Received Ping from {}", self.channel.display_address(),
164            );
165
166            // Send pong message
167            let pong = PongMessage { nonce: ping.nonce };
168            self.channel.send(&pong).await?;
169
170            debug!(
171                target: "net::protocol_ping::reply_to_ping",
172                "Sent Pong reply to {}", self.channel.display_address(),
173            );
174        }
175    }
176
177    fn random_nonce() -> u16 {
178        OsRng::gen(&mut OsRng)
179    }
180}
181
182#[async_trait]
183impl ProtocolBase for ProtocolPing {
184    /// Starts ping-pong keepalive messages exchange. Runs ping-pong in the
185    /// protocol task manager, then queues the reply. Sends out a ping and
186    /// waits for pong reply. Waits for ping and replies with a pong.
187    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
188        debug!(target: "net::protocol_ping::start", "START => address={}", self.channel.display_address());
189        self.jobsman.clone().start(ex.clone());
190        self.jobsman.clone().spawn(self.clone().run_ping_pong(), ex.clone()).await;
191        self.jobsman.clone().spawn(self.clone().reply_to_ping(), ex).await;
192        debug!(target: "net::protocol_ping::start", "END => address={}", self.channel.display_address());
193        Ok(())
194    }
195
196    fn name(&self) -> &'static str {
197        PROTO_NAME
198    }
199}