darkfi/net/protocol/
protocol_ping.rs1use std::{
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24use async_trait::async_trait;
25use rand::{rngs::OsRng, Rng};
26use smol::{lock::RwLock as AsyncRwLock, Executor};
27use tracing::{debug, error, warn};
28
29use super::{
30 super::{
31 channel::ChannelPtr,
32 message::{PingMessage, PongMessage},
33 message_publisher::MessageSubscription,
34 p2p::P2pPtr,
35 settings::Settings,
36 },
37 protocol_base::{ProtocolBase, ProtocolBasePtr},
38 protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
39};
40use crate::{
41 system::{sleep, timeout::timeout},
42 Error, Result,
43};
44
45pub struct ProtocolPing {
47 channel: ChannelPtr,
48 ping_sub: MessageSubscription<PingMessage>,
49 pong_sub: MessageSubscription<PongMessage>,
50 settings: Arc<AsyncRwLock<Settings>>,
51 jobsman: ProtocolJobsManagerPtr,
52}
53
/// Name of this protocol, handed to the jobs manager and returned by
/// `ProtocolBase::name`.
const PROTO_NAME: &str = "ProtocolPing";
55
56impl ProtocolPing {
57 pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
59 let ping_sub =
61 channel.subscribe_msg::<PingMessage>().await.expect("Missing ping dispatcher!");
62
63 let pong_sub =
65 channel.subscribe_msg::<PongMessage>().await.expect("Missing pong dispatcher!");
66
67 Arc::new(Self {
68 channel: channel.clone(),
69 ping_sub,
70 pong_sub,
71 settings: p2p.settings(),
72 jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
73 })
74 }
75
76 async fn run_ping_pong(self: Arc<Self>) -> Result<()> {
81 debug!(
82 target: "net::protocol_ping::run_ping_pong",
83 "START => address={}", self.channel.display_address(),
84 );
85
86 loop {
87 let settings = self.settings.read().await;
88 let outbound_connect_timeout =
89 settings.outbound_connect_timeout(self.channel.address().scheme());
90 let channel_heartbeat_interval =
91 settings.channel_heartbeat_interval(self.channel.address().scheme());
92 drop(settings);
93
94 let nonce = Self::random_nonce();
96
97 let ping = PingMessage { nonce };
99 self.channel.send(&ping).await?;
100
101 let timer = Instant::now();
103
104 let pong_msg = match timeout(
106 Duration::from_secs(outbound_connect_timeout),
107 self.pong_sub.receive(),
108 )
109 .await
110 {
111 Ok(msg) => {
112 msg?
115 }
116 Err(_e) => {
117 warn!(
120 target: "net::protocol_ping::run_ping_pong",
121 "[P2P] Ping-Pong protocol timed out for {}", self.channel.display_address(),
122 );
123 self.channel.stop().await;
124 return Err(Error::ChannelStopped)
125 }
126 };
127
128 if pong_msg.nonce != nonce {
129 error!(
130 target: "net::protocol_ping::run_ping_pong",
131 "[P2P] Wrong nonce in pingpong, disconnecting {}",
132 self.channel.display_address(),
133 );
134 self.channel.stop().await;
135 return Err(Error::ChannelStopped)
136 }
137
138 debug!(
139 target: "net::protocol_ping::run_ping_pong",
140 "Received Pong from {}: {:?}",
141 self.channel.display_address(),
142 timer.elapsed(),
143 );
144
145 sleep(channel_heartbeat_interval).await;
147 }
148 }
149
150 async fn reply_to_ping(self: Arc<Self>) -> Result<()> {
153 debug!(
154 target: "net::protocol_ping::reply_to_ping",
155 "START => address={}", self.channel.display_address(),
156 );
157
158 loop {
159 let ping = self.ping_sub.receive().await?;
161 debug!(
162 target: "net::protocol_ping::reply_to_ping",
163 "Received Ping from {}", self.channel.display_address(),
164 );
165
166 let pong = PongMessage { nonce: ping.nonce };
168 self.channel.send(&pong).await?;
169
170 debug!(
171 target: "net::protocol_ping::reply_to_ping",
172 "Sent Pong reply to {}", self.channel.display_address(),
173 );
174 }
175 }
176
177 fn random_nonce() -> u16 {
178 OsRng::gen(&mut OsRng)
179 }
180}
181
182#[async_trait]
183impl ProtocolBase for ProtocolPing {
184 async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
188 debug!(target: "net::protocol_ping::start", "START => address={}", self.channel.display_address());
189 self.jobsman.clone().start(ex.clone());
190 self.jobsman.clone().spawn(self.clone().run_ping_pong(), ex.clone()).await;
191 self.jobsman.clone().spawn(self.clone().reply_to_ping(), ex).await;
192 debug!(target: "net::protocol_ping::start", "END => address={}", self.channel.display_address());
193 Ok(())
194 }
195
196 fn name(&self) -> &'static str {
197 PROTO_NAME
198 }
199}