1use async_trait::async_trait;
20use smol::{lock::RwLock as AsyncRwLock, Executor};
21use std::{sync::Arc, time::UNIX_EPOCH};
22use tracing::debug;
23use url::Url;
24
25use super::{
26 super::{
27 channel::ChannelPtr,
28 hosts::{HostColor, HostsPtr},
29 message::{AddrsMessage, GetAddrsMessage},
30 message_publisher::MessageSubscription,
31 p2p::P2pPtr,
32 session::SESSION_OUTBOUND,
33 settings::Settings,
34 },
35 protocol_base::{ProtocolBase, ProtocolBasePtr},
36 protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
37};
38use crate::Result;
39
40pub struct ProtocolAddress {
63 channel: ChannelPtr,
64 addrs_sub: MessageSubscription<AddrsMessage>,
65 get_addrs_sub: MessageSubscription<GetAddrsMessage>,
66 hosts: HostsPtr,
67 settings: Arc<AsyncRwLock<Settings>>,
68 jobsman: ProtocolJobsManagerPtr,
69}
70
71const PROTO_NAME: &str = "ProtocolAddress";
72
73const TRANSPORT_COMBOS: [&str; 9] =
78 ["tor", "tls", "tcp", "nym", "i2p", "tor+tls", "nym+tls", "tcp+tls", "i2p+tls"];
79
80fn strip_query_params(url: &Url) -> Url {
85 let mut stripped = url.clone();
86 stripped.set_query(None);
87 stripped
88}
89
90impl ProtocolAddress {
91 pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
95 let addrs_sub =
97 channel.subscribe_msg::<AddrsMessage>().await.expect("Missing addrs dispatcher!");
98
99 let get_addrs_sub =
101 channel.subscribe_msg::<GetAddrsMessage>().await.expect("Missing getaddrs dispatcher!");
102
103 Arc::new(Self {
104 channel: channel.clone(),
105 addrs_sub,
106 get_addrs_sub,
107 hosts: p2p.hosts(),
108 jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
109 settings: p2p.settings(),
110 })
111 }
112
113 async fn handle_receive_addrs(self: Arc<Self>) -> Result<()> {
117 debug!(
118 target: "net::protocol_address::handle_receive_addrs",
119 "[START] address={}", self.channel.display_address(),
120 );
121
122 loop {
123 let addrs_msg = self.addrs_sub.receive().await?;
124 debug!(
125 target: "net::protocol_address::handle_receive_addrs",
126 "Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.display_address(),
127 );
128
129 debug!(
130 target: "net::protocol_address::handle_receive_addrs",
131 "Appending to greylist...",
132 );
133
134 self.hosts.insert(HostColor::Grey, &addrs_msg.addrs).await;
135 }
136 }
137
138 async fn handle_receive_get_addrs(self: Arc<Self>) -> Result<()> {
142 debug!(
143 target: "net::protocol_address::handle_receive_get_addrs",
144 "[START] address={}", self.channel.display_address(),
145 );
146
147 loop {
148 let get_addrs_msg = self.get_addrs_sub.receive().await?;
149
150 debug!(
151 target: "net::protocol_address::handle_receive_get_addrs",
152 "Received GetAddrs({}) message from {}", get_addrs_msg.max, self.channel.display_address(),
153 );
154
155 let requested_transports: Vec<String> = get_addrs_msg
157 .transports
158 .iter()
159 .filter(|tp| TRANSPORT_COMBOS.contains(&tp.as_str()))
160 .cloned()
161 .collect();
162
163 debug!(target: "net::protocol_address::handle_receive_get_addrs",
165 "Fetching gold entries with schemes");
166 let mut addrs = self.hosts.container.fetch_n_random_with_schemes(
167 HostColor::Gold,
168 &requested_transports,
169 get_addrs_msg.max as usize,
170 );
171
172 debug!(target: "net::protocol_address::handle_receive_get_addrs",
174 "Fetching whitelist entries with schemes");
175 addrs.append(&mut self.hosts.container.fetch_n_random_with_schemes(
176 HostColor::White,
177 &requested_transports,
178 get_addrs_msg.max as usize,
179 ));
180
181 debug!(target: "net::protocol_address::handle_receive_get_addrs",
186 "Fetching gold entries without schemes");
187 let remain = 2 * get_addrs_msg.max as usize - addrs.len();
188 addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
189 HostColor::Gold,
190 &requested_transports,
191 remain,
192 ));
193
194 debug!(target: "net::protocol_address::handle_receive_get_addrs",
196 "Fetching white entries without schemes");
197 let remain = 2 * get_addrs_msg.max as usize - addrs.len();
198 addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
199 HostColor::White,
200 &requested_transports,
201 remain,
202 ));
203
204 debug!(target: "net::protocol_address::handle_receive_get_addrs",
212 "Fetching dark entries");
213 let remain = 2 * get_addrs_msg.max as usize - addrs.len();
214 addrs.append(&mut self.hosts.container.fetch_n_random(HostColor::Dark, remain));
215
216 addrs.retain(|addr| TRANSPORT_COMBOS.contains(&addr.0.scheme()));
218
219 debug!(
220 target: "net::protocol_address::handle_receive_get_addrs",
221 "Sending {} addresses to {}", addrs.len(), self.channel.display_address(),
222 );
223
224 let addrs_msg = AddrsMessage { addrs };
225 self.channel.send(&addrs_msg).await?;
226 }
227 }
228
229 async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
232 debug!(
233 target: "net::protocol_address::send_my_addrs",
234 "[START] channel address={}", self.channel.display_address(),
235 );
236
237 if self.channel.session_type_id() != SESSION_OUTBOUND {
238 debug!(
239 target: "net::protocol_address::send_my_addrs",
240 "Not an outbound session. Stopping",
241 );
242 return Ok(())
243 }
244
245 let external_addrs = self.channel.hosts().external_addrs().await;
246
247 if external_addrs.is_empty() {
248 debug!(
249 target: "net::protocol_address::send_my_addrs",
250 "External addr not configured. Stopping",
251 );
252 return Ok(())
253 }
254
255 let mut addrs = vec![];
256
257 for addr in external_addrs {
258 let stripped_addr = strip_query_params(&addr);
259 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
260 addrs.push((stripped_addr, last_seen));
261 }
262
263 debug!(
264 target: "net::protocol_address::send_my_addrs",
265 "Broadcasting {} addresses", addrs.len(),
266 );
267
268 let ext_addr_msg = AddrsMessage { addrs };
269 self.channel.send(&ext_addr_msg).await?;
270
271 debug!(
272 target: "net::protocol_address::send_my_addrs",
273 "[END] channel address={}", self.channel.display_address(),
274 );
275
276 Ok(())
277 }
278}
279
280#[async_trait]
281impl ProtocolBase for ProtocolAddress {
282 async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
287 debug!(
288 target: "net::protocol_address::start",
289 "START => address={}", self.channel.display_address(),
290 );
291
292 let settings = self.settings.read().await;
293 let outbound_connections = settings.outbound_connections;
294 let active_profiles = settings.active_profiles.clone();
295 let getaddrs_max = settings.getaddrs_max;
296 drop(settings);
297
298 self.jobsman.clone().start(ex.clone());
299
300 self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
301
302 self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), ex.clone()).await;
303
304 self.jobsman.spawn(self.clone().handle_receive_get_addrs(), ex).await;
305
306 let get_addrs = GetAddrsMessage {
309 max: getaddrs_max.unwrap_or(outbound_connections.min(u32::MAX as usize) as u32),
310 transports: active_profiles,
311 };
312 self.channel.send(&get_addrs).await?;
313
314 debug!(
315 target: "net::protocol_address::start",
316 "END => address={}", self.channel.display_address(),
317 );
318
319 Ok(())
320 }
321 fn name(&self) -> &'static str {
322 PROTO_NAME
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use darkfi_serial::serialize;
329
330 use crate::net::message::GET_ADDRS_MAX_BYTES;
331
332 use super::{GetAddrsMessage, TRANSPORT_COMBOS};
333
334 #[test]
336 fn test_get_addrs_msg_size() {
337 let message = GetAddrsMessage {
338 max: u8::MAX as u32,
339 transports: TRANSPORT_COMBOS.iter().map(|x| x.to_string()).collect(),
340 };
341
342 assert_eq!(serialize(&message).len() as u64, GET_ADDRS_MAX_BYTES);
343 }
344}