1use async_trait::async_trait;
20use smol::{lock::RwLock as AsyncRwLock, Executor};
21use std::{sync::Arc, time::UNIX_EPOCH};
22use tracing::debug;
23
24use super::{
25 super::{
26 channel::ChannelPtr,
27 hosts::{HostColor, HostsPtr},
28 message::{AddrsMessage, GetAddrsMessage},
29 message_publisher::MessageSubscription,
30 p2p::P2pPtr,
31 session::SESSION_OUTBOUND,
32 settings::Settings,
33 },
34 protocol_base::{ProtocolBase, ProtocolBasePtr},
35 protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
36};
37use crate::Result;
38
39pub struct ProtocolAddress {
62 channel: ChannelPtr,
63 addrs_sub: MessageSubscription<AddrsMessage>,
64 get_addrs_sub: MessageSubscription<GetAddrsMessage>,
65 hosts: HostsPtr,
66 settings: Arc<AsyncRwLock<Settings>>,
67 jobsman: ProtocolJobsManagerPtr,
68}
69
70const PROTO_NAME: &str = "ProtocolAddress";
71
72const TRANSPORT_COMBOS: [&str; 9] =
77 ["tor", "tls", "tcp", "nym", "i2p", "tor+tls", "nym+tls", "tcp+tls", "i2p+tls"];
78
79impl ProtocolAddress {
80 pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
84 let addrs_sub =
86 channel.subscribe_msg::<AddrsMessage>().await.expect("Missing addrs dispatcher!");
87
88 let get_addrs_sub =
90 channel.subscribe_msg::<GetAddrsMessage>().await.expect("Missing getaddrs dispatcher!");
91
92 Arc::new(Self {
93 channel: channel.clone(),
94 addrs_sub,
95 get_addrs_sub,
96 hosts: p2p.hosts(),
97 jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
98 settings: p2p.settings(),
99 })
100 }
101
102 async fn handle_receive_addrs(self: Arc<Self>) -> Result<()> {
106 debug!(
107 target: "net::protocol_address::handle_receive_addrs",
108 "[START] address={}", self.channel.display_address(),
109 );
110
111 loop {
112 let addrs_msg = self.addrs_sub.receive().await?;
113 debug!(
114 target: "net::protocol_address::handle_receive_addrs",
115 "Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.display_address(),
116 );
117
118 debug!(
119 target: "net::protocol_address::handle_receive_addrs",
120 "Appending to greylist...",
121 );
122
123 self.hosts.insert(HostColor::Grey, &addrs_msg.addrs).await;
124 }
125 }
126
127 async fn handle_receive_get_addrs(self: Arc<Self>) -> Result<()> {
131 debug!(
132 target: "net::protocol_address::handle_receive_get_addrs",
133 "[START] address={}", self.channel.display_address(),
134 );
135
136 loop {
137 let get_addrs_msg = self.get_addrs_sub.receive().await?;
138
139 debug!(
140 target: "net::protocol_address::handle_receive_get_addrs",
141 "Received GetAddrs({}) message from {}", get_addrs_msg.max, self.channel.display_address(),
142 );
143
144 let requested_transports: Vec<String> = get_addrs_msg
146 .transports
147 .iter()
148 .filter(|tp| TRANSPORT_COMBOS.contains(&tp.as_str()))
149 .cloned()
150 .collect();
151
152 debug!(target: "net::protocol_address::handle_receive_get_addrs",
154 "Fetching gold entries with schemes");
155 let mut addrs = self.hosts.container.fetch_n_random_with_schemes(
156 HostColor::Gold,
157 &requested_transports,
158 get_addrs_msg.max,
159 );
160
161 debug!(target: "net::protocol_address::handle_receive_get_addrs",
163 "Fetching whitelist entries with schemes");
164 addrs.append(&mut self.hosts.container.fetch_n_random_with_schemes(
165 HostColor::White,
166 &requested_transports,
167 get_addrs_msg.max,
168 ));
169
170 debug!(target: "net::protocol_address::handle_receive_get_addrs",
175 "Fetching gold entries without schemes");
176 let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
177 addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
178 HostColor::Gold,
179 &requested_transports,
180 remain,
181 ));
182
183 debug!(target: "net::protocol_address::handle_receive_get_addrs",
185 "Fetching white entries without schemes");
186 let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
187 addrs.append(&mut self.hosts.container.fetch_n_random_excluding_schemes(
188 HostColor::White,
189 &requested_transports,
190 remain,
191 ));
192
193 debug!(target: "net::protocol_address::handle_receive_get_addrs",
201 "Fetching dark entries");
202 let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
203 addrs.append(&mut self.hosts.container.fetch_n_random(HostColor::Dark, remain));
204
205 addrs.retain(|addr| TRANSPORT_COMBOS.contains(&addr.0.scheme()));
207
208 debug!(
209 target: "net::protocol_address::handle_receive_get_addrs",
210 "Sending {} addresses to {}", addrs.len(), self.channel.display_address(),
211 );
212
213 let addrs_msg = AddrsMessage { addrs };
214 self.channel.send(&addrs_msg).await?;
215 }
216 }
217
218 async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
221 debug!(
222 target: "net::protocol_address::send_my_addrs",
223 "[START] channel address={}", self.channel.display_address(),
224 );
225
226 if self.channel.session_type_id() != SESSION_OUTBOUND {
227 debug!(
228 target: "net::protocol_address::send_my_addrs",
229 "Not an outbound session. Stopping",
230 );
231 return Ok(())
232 }
233
234 let external_addrs = self.channel.hosts().external_addrs().await;
235
236 if external_addrs.is_empty() {
237 debug!(
238 target: "net::protocol_address::send_my_addrs",
239 "External addr not configured. Stopping",
240 );
241 return Ok(())
242 }
243
244 let mut addrs = vec![];
245
246 for addr in external_addrs {
247 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
248 addrs.push((addr, last_seen));
249 }
250
251 debug!(
252 target: "net::protocol_address::send_my_addrs",
253 "Broadcasting {} addresses", addrs.len(),
254 );
255
256 let ext_addr_msg = AddrsMessage { addrs };
257 self.channel.send(&ext_addr_msg).await?;
258
259 debug!(
260 target: "net::protocol_address::send_my_addrs",
261 "[END] channel address={}", self.channel.display_address(),
262 );
263
264 Ok(())
265 }
266}
267
268#[async_trait]
269impl ProtocolBase for ProtocolAddress {
270 async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
275 debug!(
276 target: "net::protocol_address::start",
277 "START => address={}", self.channel.display_address(),
278 );
279
280 let settings = self.settings.read().await;
281 let outbound_connections = settings.outbound_connections;
282 let active_profiles = settings.active_profiles.clone();
283 let getaddrs_max = settings.getaddrs_max;
284 drop(settings);
285
286 self.jobsman.clone().start(ex.clone());
287
288 self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
289
290 self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), ex.clone()).await;
291
292 self.jobsman.spawn(self.clone().handle_receive_get_addrs(), ex).await;
293
294 let get_addrs = GetAddrsMessage {
297 max: getaddrs_max.unwrap_or(outbound_connections.min(u32::MAX as usize) as u32),
298 transports: active_profiles,
299 };
300 self.channel.send(&get_addrs).await?;
301
302 debug!(
303 target: "net::protocol_address::start",
304 "END => address={}", self.channel.display_address(),
305 );
306
307 Ok(())
308 }
309 fn name(&self) -> &'static str {
310 PROTO_NAME
311 }
312}
313
314#[cfg(test)]
315mod tests {
316 use darkfi_serial::serialize;
317
318 use crate::net::message::GET_ADDRS_MAX_BYTES;
319
320 use super::{GetAddrsMessage, TRANSPORT_COMBOS};
321
322 #[test]
324 fn test_get_addrs_msg_size() {
325 let message = GetAddrsMessage {
326 max: u8::MAX as u32,
327 transports: TRANSPORT_COMBOS.iter().map(|x| x.to_string()).collect(),
328 };
329
330 assert_eq!(serialize(&message).len() as u64, GET_ADDRS_MAX_BYTES);
331 }
332}