darkfi/net/session/
refine_session.rs1use futures::{
29 future::{select, Either},
30 pin_mut,
31};
32use smol::Timer;
33use std::{
34 sync::{Arc, Weak},
35 time::{Duration, Instant, UNIX_EPOCH},
36};
37
38use async_trait::async_trait;
39use tracing::{debug, warn};
40use url::Url;
41
42use super::super::p2p::{P2p, P2pPtr};
43
44use crate::{
45 net::{
46 connector::Connector,
47 hosts::{HostColor, HostState},
48 protocol::ProtocolVersion,
49 session::{Session, SessionBitFlag, SESSION_REFINE},
50 },
51 system::{sleep, StoppableTask, StoppableTaskPtr},
52 Error,
53};
54
55pub type RefineSessionPtr = Arc<RefineSession>;
56
57pub struct RefineSession {
58 pub(in crate::net) p2p: Weak<P2p>,
60
61 pub(in crate::net) refinery: Arc<GreylistRefinery>,
63}
64
65impl RefineSession {
66 pub fn new(p2p: Weak<P2p>) -> RefineSessionPtr {
67 Arc::new_cyclic(|session| Self { p2p, refinery: GreylistRefinery::new(session.clone()) })
68 }
69
70 pub(crate) async fn start(self: Arc<Self>) {
72 if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
73 match self.p2p().hosts().container.load_all(hostlist) {
74 Ok(()) => {
75 debug!(target: "net::refine_session::start", "Load hosts successful!");
76 }
77 Err(e) => {
78 warn!(target: "net::refine_session::start", "Error loading hosts {e}");
79 }
80 }
81 }
82
83 match self.p2p().hosts().import_blacklist().await {
84 Ok(()) => {
85 debug!(target: "net::refine_session::start", "Import blacklist successful!");
86 }
87 Err(e) => {
88 warn!(target: "net::refine_session::start",
89 "Error importing blacklist from config file {e}");
90 }
91 }
92
93 debug!(target: "net::refine_session", "Starting greylist refinery process");
94 self.refinery.clone().start().await;
95 }
96
97 pub(crate) async fn stop(&self) {
99 debug!(target: "net::refine_session", "Stopping refinery process");
100 self.refinery.clone().stop().await;
101
102 if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
103 match self.p2p().hosts().container.save_all(hostlist) {
104 Ok(()) => {
105 debug!(target: "net::refine_session::stop", "Save hosts successful!");
106 }
107 Err(e) => {
108 warn!(target: "net::refine_session::stop", "Error saving hosts {e}");
109 }
110 }
111 }
112 }
113
114 pub async fn handshake_node(self: Arc<Self>, addr: Url, p2p: P2pPtr) -> bool {
118 let self_ = Arc::downgrade(&self);
119 let connector = Connector::new(self.p2p().settings(), self_);
120
121 debug!(target: "net::refinery::handshake_node", "Attempting to connect to {addr}");
122 match connector.connect(&addr).await {
123 Ok((url, channel)) => {
124 debug!(target: "net::refinery::handshake_node", "Successfully created a channel with {url}");
125 let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
127
128 debug!(target: "net::refinery::handshake_node", "Performing handshake protocols with {url}");
129 let handshake =
131 self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());
132
133 debug!(target: "net::refinery::handshake_node", "Starting channel {url}");
134 channel.clone().start(p2p.executor());
135
136 let timeout = Timer::after(Duration::from_secs(5));
140
141 pin_mut!(timeout);
142 pin_mut!(handshake);
143
144 let result = match select(handshake, timeout).await {
145 Either::Left((Ok(_), _)) => {
146 debug!(target: "net::refinery::handshake_node", "Handshake success!");
147 true
148 }
149 Either::Left((Err(e), _)) => {
150 debug!(target: "net::refinery::handshake_node", "Handshake error={e}");
151 false
152 }
153 Either::Right((_, _)) => {
154 debug!(target: "net::refinery::handshake_node", "Handshake timed out");
155 false
156 }
157 };
158
159 debug!(target: "net::refinery::handshake_node", "Stopping channel {url}");
160 channel.stop().await;
161
162 result
163 }
164
165 Err(e) => {
166 debug!(target: "net::refinery::handshake_node", "Failed to connect ({e})");
167 false
168 }
169 }
170 }
171}
172
173#[async_trait]
174impl Session for RefineSession {
175 fn p2p(&self) -> P2pPtr {
176 self.p2p.upgrade().unwrap()
177 }
178
179 fn type_id(&self) -> SessionBitFlag {
180 SESSION_REFINE
181 }
182
183 async fn reload(self: Arc<Self>) {}
184}
185
186pub struct GreylistRefinery {
196 session: Weak<RefineSession>,
198 process: StoppableTaskPtr,
199}
200
201impl GreylistRefinery {
202 pub fn new(session: Weak<RefineSession>) -> Arc<Self> {
203 Arc::new(Self { session, process: StoppableTask::new() })
204 }
205
206 pub async fn start(self: Arc<Self>) {
207 let ex = self.p2p().executor();
208 self.process.clone().start(
209 async move {
210 self.run().await;
211 unreachable!();
212 },
213 |_| async {},
215 Error::NetworkServiceStopped,
216 ex,
217 );
218 }
219
220 pub async fn stop(self: Arc<Self>) {
221 self.process.stop().await;
222 }
223
224 async fn run(self: Arc<Self>) {
227 let hosts = self.p2p().hosts();
228
229 loop {
230 let settings = self.p2p().settings().read_arc().await;
232 let greylist_refinery_interval = settings.greylist_refinery_interval;
233 let time_with_no_connections = settings.time_with_no_connections;
234 let active_profiles = settings.active_profiles.clone();
235 drop(settings);
236
237 sleep(greylist_refinery_interval).await;
238
239 if hosts.container.is_empty(HostColor::Grey) {
240 debug!(target: "net::refinery",
241 "Greylist is empty! Cannot start refinery process");
242
243 continue
244 }
245
246 let offline_limit = Duration::from_secs(time_with_no_connections);
249
250 let offline_timer =
251 { Instant::now().duration_since(*hosts.last_connection.lock().unwrap()) };
252
253 if !self.p2p().is_connected() && offline_timer >= offline_limit {
254 warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
255 offline_timer.as_secs());
256
257 let suspended_hosts = hosts.suspended();
262 for host in suspended_hosts {
263 if let Err(e) = hosts.unregister(&host) {
264 warn!(target: "net::refinery", "Error while unregistering addr={host}, err={e}");
265 }
266 }
267
268 continue
269 }
270
271 match hosts.container.fetch_random_with_schemes(HostColor::Grey, &active_profiles) {
273 Some((entry, _)) => {
274 let url = &entry.0;
275
276 if let Err(e) = hosts.try_register(url.clone(), HostState::Refine) {
277 debug!(target: "net::refinery", "Unable to refine addr={}, err={e}",
278 url.clone());
279 continue
280 }
281
282 if !self.session().handshake_node(url.clone(), self.p2p().clone()).await {
283 hosts.container.remove_if_exists(HostColor::Grey, url);
284
285 debug!(
286 target: "net::refinery",
287 "Peer {url} handshake failed. Removed from greylist"
288 );
289
290 if let Err(e) = hosts.unregister(url) {
292 warn!(target: "net::refinery", "Error while unregistering addr={url}, err={e}");
293 }
294
295 continue
296 }
297 debug!(
298 target: "net::refinery",
299 "Peer {url} handshake successful. Adding to whitelist"
300 );
301 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
302
303 hosts.whitelist_host(url, last_seen).await.unwrap();
304
305 debug!(target: "net::refinery", "GreylistRefinery complete!");
306
307 continue
308 }
309 None => {
310 debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
311
312 continue
313 }
314 }
315 }
316 }
317
318 fn session(&self) -> RefineSessionPtr {
319 self.session.upgrade().unwrap()
320 }
321
322 fn p2p(&self) -> P2pPtr {
323 self.session().p2p()
324 }
325}