darkfi/net/protocol/
protocol_version.rs1use futures::{
20 future::{join_all, select, Either},
21 pin_mut,
22};
23use smol::{lock::RwLock as AsyncRwLock, Executor, Timer};
24use std::{
25 sync::Arc,
26 time::{Duration, UNIX_EPOCH},
27};
28use tracing::{debug, error};
29
30use super::super::{
31 channel::ChannelPtr,
32 message::{VerackMessage, VersionMessage},
33 message_publisher::MessageSubscription,
34 settings::Settings,
35};
36use crate::{
37 net::{session::SESSION_OUTBOUND, BanPolicy},
38 Error, Result,
39};
40
41pub struct ProtocolVersion {
44 channel: ChannelPtr,
45 version_sub: MessageSubscription<VersionMessage>,
46 verack_sub: MessageSubscription<VerackMessage>,
47 settings: Arc<AsyncRwLock<Settings>>,
48}
49
50impl ProtocolVersion {
51 pub async fn new(channel: ChannelPtr, settings: Arc<AsyncRwLock<Settings>>) -> Arc<Self> {
56 let version_sub =
58 channel.subscribe_msg::<VersionMessage>().await.expect("Missing version dispatcher!");
59
60 let verack_sub =
62 channel.subscribe_msg::<VerackMessage>().await.expect("Missing verack dispatcher!");
63
64 Arc::new(Self { channel, version_sub, verack_sub, settings })
65 }
66
67 pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
71 debug!(target: "net::protocol_version::run", "START => address={}", self.channel.display_address());
72 let channel_handshake_timeout =
73 self.settings.read().await.channel_handshake_timeout(self.channel.address().scheme());
74
75 let timeout = Timer::after(Duration::from_secs(channel_handshake_timeout));
76 let version = self.clone().exchange_versions(executor);
77
78 pin_mut!(timeout);
79 pin_mut!(version);
80
81 match select(version, timeout).await {
85 Either::Left((Ok(_), _)) => {
86 debug!(target: "net::protocol_version::run", "END => address={}",
87 self.channel.display_address());
88
89 Ok(())
90 }
91 Either::Left((Err(e), _)) => {
92 error!(
93 target: "net::protocol_version::run",
94 "[P2P] Version Exchange failed [{}]: {e}",
95 self.channel.display_address()
96 );
97
98 self.channel.stop().await;
99 Err(e)
100 }
101
102 Either::Right((_, _)) => {
103 error!(
104 target: "net::protocol_version::run",
105 "[P2P] Version Exchange timed out [{}]",
106 self.channel.display_address(),
107 );
108
109 self.channel.stop().await;
110 Err(Error::ChannelTimeout)
111 }
112 }
113 }
114
115 async fn exchange_versions(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
117 debug!(
118 target: "net::protocol_version::exchange_versions",
119 "START => address={}", self.channel.display_address(),
120 );
121
122 let send = executor.spawn(self.clone().send_version());
123 let recv = executor.spawn(self.clone().recv_version());
124
125 let rets = join_all(vec![send, recv]).await;
126 if let Err(e) = &rets[0] {
127 error!(
128 target: "net::protocol_version::exchange_versions",
129 "send_version() failed: {e}"
130 );
131 return Err(e.clone())
132 }
133
134 if let Err(e) = &rets[1] {
135 error!(
136 target: "net::protocol_version::exchange_versions",
137 "recv_version() failed: {e}"
138 );
139 return Err(e.clone())
140 }
141
142 debug!(
143 target: "net::protocol_version::exchange_versions",
144 "END => address={}", self.channel.display_address(),
145 );
146 Ok(())
147 }
148
149 async fn send_version(self: Arc<Self>) -> Result<()> {
152 debug!(
153 target: "net::protocol_version::send_version",
154 "START => address={}", self.channel.display_address(),
155 );
156
157 let settings = self.settings.read().await;
158 let node_id = settings.node_id.clone();
159 let app_version = settings.app_version.clone();
160 let app_name = settings.app_name.clone();
161 drop(settings);
162
163 let external_addrs = self.channel.hosts().external_addrs().await;
164
165 let version = VersionMessage {
166 node_id,
167 app_name: app_name.clone(),
168 version: app_version.clone(),
169 timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(),
170 connect_recv_addr: self.channel.connect_addr().clone(),
171 resolve_recv_addr: self.channel.resolve_addr(),
172 ext_send_addr: external_addrs,
173 features: vec![],
177 };
178 self.channel.send(&version).await?;
179
180 let verack_msg = self.verack_sub.receive().await?;
182
183 debug!(
185 target: "net::protocol_version::send_version",
186 "App version: {app_version}, Recv version: {}",
187 verack_msg.app_version,
188 );
189
190 if app_version.major != verack_msg.app_version.major ||
192 app_version.minor != verack_msg.app_version.minor ||
193 app_name != verack_msg.app_name
194 {
195 error!(
196 target: "net::protocol_version::send_version",
197 "[P2P] Version mismatch from {}. Disconnecting...",
198 self.channel.display_address(),
199 );
200
201 if self.channel.session_type_id() & SESSION_OUTBOUND != 0 {
203 if let BanPolicy::Strict = self.channel.p2p().settings().read().await.ban_policy {
204 self.channel.ban().await;
205 }
206 }
207
208 self.channel.stop().await;
209 return Err(Error::ChannelStopped)
210 }
211
212 debug!(
214 target: "net::protocol_version::send_version",
215 "END => address={}", self.channel.display_address(),
216 );
217 Ok(())
218 }
219
220 async fn recv_version(self: Arc<Self>) -> Result<()> {
223 debug!(
224 target: "net::protocol_version::recv_version",
225 "START => address={}", self.channel.display_address(),
226 );
227
228 let version = self.version_sub.receive().await?;
230 if let Some(ipv6_addr) = version.get_ipv6_addr() {
231 let hosts = self.channel.p2p().hosts();
232 hosts.add_auto_addr(ipv6_addr);
233 }
234 self.channel.set_version(version).await;
235
236 let settings = self.settings.read().await;
238 let app_version = settings.app_version.clone();
239 let app_name = settings.app_name.clone();
240 drop(settings);
241
242 let verack = VerackMessage { app_version, app_name };
243 self.channel.send(&verack).await?;
244
245 debug!(
246 target: "net::protocol_version::recv_version",
247 "END => address={}", self.channel.display_address(),
248 );
249 Ok(())
250 }
251}