1use std::{
30 sync::{
31 atomic::{AtomicU32, Ordering},
32 Arc, Weak,
33 },
34 time::{Duration, Instant},
35};
36
37use async_trait::async_trait;
38use futures::stream::{FuturesUnordered, StreamExt};
39use smol::lock::Mutex;
40use tracing::{debug, error, info, warn};
41use url::Url;
42
43use super::{
44 super::{
45 channel::ChannelPtr,
46 connector::Connector,
47 dnet::{self, dnetev, DnetEvent},
48 hosts::{HostColor, HostState},
49 message::GetAddrsMessage,
50 p2p::{P2p, P2pPtr},
51 },
52 Session, SessionBitFlag, SESSION_OUTBOUND,
53};
54use crate::{
55 system::{sleep, timeout::timeout, CondVar, StoppableTask, StoppableTaskPtr},
56 util::logger::verbose,
57 Error, Result,
58};
59
60pub type OutboundSessionPtr = Arc<OutboundSession>;
61
62pub struct OutboundSession {
64 pub(in crate::net) p2p: Weak<P2p>,
66 slots: Mutex<Vec<Arc<Slot>>>,
68 peer_discovery: Arc<PeerDiscovery>,
70}
71
72impl OutboundSession {
73 pub(crate) fn new(p2p: Weak<P2p>) -> OutboundSessionPtr {
75 Arc::new_cyclic(|session| Self {
76 p2p,
77 slots: Mutex::new(Vec::new()),
78 peer_discovery: PeerDiscovery::new(session.clone()),
79 })
80 }
81
82 pub(crate) async fn start(self: Arc<Self>) {
84 let n_slots = self.p2p().settings().read().await.outbound_connections;
85 verbose!(target: "net::outbound_session", "[P2P] Starting {n_slots} outbound connection slots.");
86
87 let mut slots = self.slots.lock().await;
89
90 let mut futures = FuturesUnordered::new();
91
92 let self_ = Arc::downgrade(&self);
93
94 for i in 0..n_slots as u32 {
95 let slot = Slot::new(self_.clone(), i);
96 futures.push(slot.clone().start());
97 slots.push(slot);
98 }
99
100 while (futures.next().await).is_some() {}
101
102 self.peer_discovery.clone().start().await;
103 }
104
105 pub(crate) async fn stop(&self) {
107 debug!(target: "net::outbound_session", "Stopping outbound session..");
108 let slots = &*self.slots.lock().await;
109 let mut futures = FuturesUnordered::new();
110
111 for slot in slots {
112 futures.push(slot.clone().stop());
113 }
114
115 while (futures.next().await).is_some() {}
116
117 self.peer_discovery.clone().stop().await;
118 debug!(target: "net::outbound_session", "Outbound session stopped!");
119 }
120
121 pub async fn slot_info(&self) -> Vec<u32> {
122 let mut info = Vec::new();
123 let slots = &*self.slots.lock().await;
124 for slot in slots {
125 info.push(slot.channel_id.load(Ordering::Relaxed));
126 }
127 info
128 }
129
130 fn wakeup_peer_discovery(&self) {
131 self.peer_discovery.notify()
132 }
133
134 async fn wakeup_slots(&self) {
135 let slots = &*self.slots.lock().await;
136 for slot in slots {
137 slot.notify();
138 }
139 }
140
141 async fn set_outbound_connections(self: Arc<Self>, n: usize) {
144 let mut slots = self.slots.lock().await;
146 let slots_len = slots.len();
147
148 if n > slots_len {
149 self.clone().add_slots(&mut slots, n).await;
150 } else if n < slots_len {
151 self.remove_slots(&mut slots, n).await;
152 }
153 }
155
156 async fn add_slots(self: Arc<Self>, slots: &mut Vec<Arc<Slot>>, target: usize) {
157 let slots_len = slots.len();
158 let self_ = Arc::downgrade(&self);
159 for i in slots_len..target {
160 let slot = Slot::new(self_.clone(), i as u32);
161 slot.clone().start().await;
162 slots.push(slot);
163 }
164 info!(target: "net::outbound_session",
165 "[P2P] Increased outbound slots from {slots_len} to {target}");
166 }
167
168 async fn remove_slots(&self, slots: &mut Vec<Arc<Slot>>, target: usize) {
170 let slots_len = slots.len();
171 let num_to_remove = slots_len - target;
172 let mut removed = 0;
173
174 let mut i = 0;
176 while i < slots.len() && removed < num_to_remove {
177 if slots[i].channel_id.load(Ordering::Relaxed) != 0 {
179 i += 1;
180 continue
181 }
182
183 let slot = slots.remove(i);
185 slot.stop().await;
186 removed += 1;
187 }
188
189 while removed < num_to_remove && !slots.is_empty() {
191 let slot = slots.remove(0);
192 slot.stop().await;
193 removed += 1;
194 }
195
196 info!(target: "net::outbound_session",
197 "[P2P] Decreased outbound slots from {slots_len} to {target}");
198 }
199}
200
201#[async_trait]
202impl Session for OutboundSession {
203 fn p2p(&self) -> P2pPtr {
204 self.p2p.upgrade().unwrap()
205 }
206
207 fn type_id(&self) -> SessionBitFlag {
208 SESSION_OUTBOUND
209 }
210
211 async fn reload(self: Arc<Self>) {
212 let outbound_connections = self.p2p().settings().read().await.outbound_connections;
213 self.set_outbound_connections(outbound_connections).await;
214 }
215}
216
217struct Slot {
218 slot: u32,
219 process: StoppableTaskPtr,
220 wakeup_self: CondVar,
221 session: Weak<OutboundSession>,
222 connector: Connector,
223 channel_id: AtomicU32,
225}
226
227impl Slot {
228 fn new(session: Weak<OutboundSession>, slot: u32) -> Arc<Self> {
229 let settings = session.upgrade().unwrap().p2p().settings();
230
231 Arc::new(Self {
232 slot,
233 process: StoppableTask::new(),
234 wakeup_self: CondVar::new(),
235 session: session.clone(),
236 connector: Connector::new(settings, session),
237 channel_id: AtomicU32::new(0),
238 })
239 }
240
241 async fn start(self: Arc<Self>) {
242 let ex = self.p2p().executor();
243
244 self.process.clone().start(
245 self.run(),
246 |res| async {
247 match res {
248 Ok(()) | Err(Error::NetworkServiceStopped) => {}
249 Err(e) => error!("net::outbound_session {e}"),
250 }
251 },
252 Error::NetworkServiceStopped,
253 ex,
254 );
255 }
256
257 async fn stop(self: Arc<Self>) {
258 self.connector.stop();
259 self.process.stop().await;
260 }
261
262 async fn fetch_addrs(&self) -> Option<(Url, u64)> {
273 let hosts = self.p2p().hosts();
274 let slot = self.slot as usize;
275 let container = &self.p2p().hosts().container;
276
277 let settings = self.p2p().settings().read_arc().await;
279
280 let white_count = (settings.white_connect_percent * settings.outbound_connections) / 100;
281 let gold_count = settings.gold_connect_count;
282
283 let transports = settings.active_profiles.clone();
284 let preference_strict = settings.slot_preference_strict;
285
286 drop(settings);
288
289 let grey_only = hosts.container.is_empty(HostColor::White) &&
290 hosts.container.is_empty(HostColor::Gold) &&
291 !hosts.container.is_empty(HostColor::Grey);
292
293 let addrs = if grey_only && !preference_strict {
296 container.fetch_with_schemes(HostColor::Grey as usize, &transports, None)
297 } else if slot < gold_count {
298 container.fetch_with_schemes(HostColor::Gold as usize, &transports, None)
299 } else if slot < white_count {
300 container.fetch_with_schemes(HostColor::White as usize, &transports, None)
301 } else {
302 container.fetch_with_schemes(HostColor::Grey as usize, &transports, None)
303 };
304
305 hosts.check_addrs(addrs).await
306 }
307
308 async fn run(self: Arc<Self>) -> Result<()> {
312 let hosts = self.p2p().hosts();
313
314 loop {
315 debug!(
317 target: "net::outbound_session::try_connect",
318 "[P2P] Finding a host to connect to for outbound slot #{}",
319 self.slot,
320 );
321
322 if hosts.container.is_empty(HostColor::Grey) &&
325 hosts.container.is_empty(HostColor::White) &&
326 hosts.container.is_empty(HostColor::Gold)
327 {
328 dnetev!(self, OutboundSlotSleeping, {
329 slot: self.slot,
330 });
331
332 self.wakeup_self.reset();
333 self.session().wakeup_peer_discovery();
335 self.wakeup_self.wait().await;
337
338 continue
339 }
340
341 let addr = if let Some(addr) = self.fetch_addrs().await {
342 debug!(target: "net::outbound_session::run", "Fetched addr={}, slot #{}", addr.0,
343 self.slot);
344 addr
345 } else {
346 debug!(target: "net::outbound_session::run", "No address found! Activating peer discovery...");
347 dnetev!(self, OutboundSlotSleeping, {
348 slot: self.slot,
349 });
350
351 self.wakeup_self.reset();
352 self.session().wakeup_peer_discovery();
354 self.wakeup_self.wait().await;
356
357 continue
358 };
359
360 let host = addr.0;
361 let last_seen = addr.1;
362 let slot = self.slot;
363
364 verbose!(
365 target: "net::outbound_session::try_connect",
366 "[P2P] Connecting outbound slot #{slot} [{host}]"
367 );
368
369 dnetev!(self, OutboundSlotConnecting, {
370 slot,
371 addr: host.clone(),
372 });
373
374 let (_, channel) = match self.try_connect(host.clone(), last_seen).await {
375 Ok(connect_info) => connect_info,
376 Err(err) => {
377 debug!(
378 target: "net::outbound_session::try_connect",
379 "[P2P] Outbound slot #{slot} connection failed: {err}"
380 );
381
382 dnetev!(self, OutboundSlotDisconnected, {
383 slot,
384 err: err.to_string()
385 });
386
387 self.channel_id.store(0, Ordering::Relaxed);
388
389 continue
390 }
391 };
392
393 let stop_sub = channel.subscribe_stop().await?;
395
396 verbose!(
397 target: "net::outbound_session::try_connect",
398 "[P2P] Outbound slot #{slot} connected [{}]",
399 channel.display_address()
400 );
401
402 dnetev!(self, OutboundSlotConnected, {
403 slot: self.slot,
404 addr: channel.display_address().clone(),
405 channel_id: channel.info.id
406 });
407
408 if let Err(err) =
410 self.session().register_channel(channel.clone(), self.p2p().executor()).await
411 {
412 verbose!(
413 target: "net::outbound_session",
414 "[P2P] Outbound slot #{slot} disconnected: {err}"
415 );
416
417 dnetev!(self, OutboundSlotDisconnected, {
418 slot: self.slot,
419 err: err.to_string()
420 });
421
422 self.channel_id.store(0, Ordering::Relaxed);
423
424 warn!(
425 target: "net::outbound_session::try_connect",
426 "[P2P] Suspending addr=[{}] slot #{slot}",
427 channel.display_address()
428 );
429
430 if let Err(e) = self
432 .p2p()
433 .hosts()
434 .move_host(channel.address(), last_seen, HostColor::Grey)
435 .await
436 {
437 warn!(target: "net::outbound_session", "Error while moving addr={} to greylist: {e}", channel.display_address());
438 continue
439 }
440
441 if let Err(e) =
443 self.p2p().hosts().try_register(channel.address().clone(), HostState::Suspend)
444 {
445 warn!(target: "net::outbound_session", "Error while suspending addr={}: {e}", channel.display_address());
446 }
447
448 continue
449 }
450
451 self.channel_id.store(channel.info.id, Ordering::Relaxed);
452
453 stop_sub.receive().await;
455
456 self.channel_id.store(0, Ordering::Relaxed);
457 }
458 }
459
460 async fn try_connect(&self, addr: Url, last_seen: u64) -> Result<(Url, ChannelPtr)> {
468 match self.connector.connect(&addr).await {
469 Ok((addr_final, channel)) => Ok((addr_final, channel)),
470
471 Err(err) => {
472 verbose!(
473 target: "net::outbound_session::try_connect",
474 "[P2P] Unable to connect outbound slot #{} {err}",
475 self.slot
476 );
477
478 if let Error::ConnectorStopped(message) = err {
482 return Err(Error::ConnectFailed(message));
483 }
484
485 self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey).await?;
487
488 if let Err(e) = self.p2p().hosts().try_register(addr.clone(), HostState::Suspend) {
490 warn!(target: "net::outbound_session::try_connect", "Error while suspending addr={addr}: {e}");
491 }
492
493 self.p2p().hosts().channel_publisher.notify(Err(err.clone())).await;
495
496 Err(err)
497 }
498 }
499 }
500
501 fn notify(&self) {
502 self.wakeup_self.notify()
503 }
504
505 fn session(&self) -> OutboundSessionPtr {
506 self.session.upgrade().unwrap()
507 }
508 fn p2p(&self) -> P2pPtr {
509 self.session().p2p()
510 }
511}
512
513#[async_trait]
520pub trait PeerDiscoveryBase {
521 async fn start(self: Arc<Self>);
522
523 async fn stop(self: Arc<Self>);
524
525 async fn run(self: Arc<Self>);
526
527 async fn wait(&self) -> bool;
528
529 fn notify(&self);
530
531 fn session(&self) -> OutboundSessionPtr;
532
533 fn p2p(&self) -> P2pPtr;
534}
535
536struct PeerDiscovery {
541 process: StoppableTaskPtr,
542 wakeup_self: CondVar,
543 session: Weak<OutboundSession>,
544}
545
546impl PeerDiscovery {
547 fn new(session: Weak<OutboundSession>) -> Arc<Self> {
548 Arc::new(Self { process: StoppableTask::new(), wakeup_self: CondVar::new(), session })
549 }
550}
551
552#[async_trait]
553impl PeerDiscoveryBase for PeerDiscovery {
554 async fn start(self: Arc<Self>) {
555 let ex = self.p2p().executor();
556 self.process.clone().start(
557 async move {
558 self.run().await;
559 unreachable!();
560 },
561 |_| async {},
563 Error::NetworkServiceStopped,
564 ex,
565 );
566 }
567 async fn stop(self: Arc<Self>) {
568 self.process.stop().await;
569 }
570
571 async fn run(self: Arc<Self>) {
586 let mut current_attempt = 0;
587 loop {
588 dnetev!(self, OutboundPeerDiscovery, {
589 attempt: current_attempt,
590 state: "wait",
591 });
592
593 let sleep_was_instant = self.wait().await;
595
596 let settings = self.p2p().settings().read_arc().await;
598 let outbound_peer_discovery_cooloff_time =
599 settings.outbound_peer_discovery_cooloff_time;
600 let outbound_peer_discovery_attempt_time =
601 settings.outbound_peer_discovery_attempt_time;
602 let outbound_connections = settings.outbound_connections;
603 let getaddrs_max = settings.getaddrs_max;
604 let active_profiles = settings.active_profiles.clone();
605 let seeds = settings.seeds.clone();
606 drop(settings);
607
608 if sleep_was_instant {
609 current_attempt += 1;
611 } else {
612 current_attempt = 1;
614 }
615
616 if current_attempt >= 4 {
617 verbose!(
618 target: "net::outbound_session::peer_discovery",
619 "[P2P] [PEER DISCOVERY] Sleeping and trying again. Attempt {current_attempt}"
620 );
621
622 dnetev!(self, OutboundPeerDiscovery, {
623 attempt: current_attempt,
624 state: "sleep",
625 });
626
627 sleep(outbound_peer_discovery_cooloff_time).await;
628 current_attempt = 1;
629 }
630
631 if self.p2p().is_connected() && current_attempt <= 2 {
635 verbose!(
638 target: "net::outbound_session::peer_discovery",
639 "[P2P] [PEER DISCOVERY] Asking peers for new peers to connect to...");
640
641 dnetev!(self, OutboundPeerDiscovery, {
642 attempt: current_attempt,
643 state: "getaddr",
644 });
645
646 let get_addrs = GetAddrsMessage {
647 max: getaddrs_max.unwrap_or(outbound_connections.min(u32::MAX as usize) as u32),
648 transports: active_profiles,
649 };
650
651 self.p2p().broadcast(&get_addrs).await;
652
653 let store_sub = self.p2p().hosts().subscribe_store().await;
655
656 let result = timeout(
657 Duration::from_secs(outbound_peer_discovery_attempt_time),
658 store_sub.receive(),
659 )
660 .await;
661
662 match result {
663 Ok(addrs_len) => {
664 verbose!(
665 target: "net::outbound_session::peer_discovery",
666 "[P2P] [PEER DISCOVERY] Discovered {addrs_len} peers"
667 );
668 }
669 Err(_) => {
670 warn!(
671 target: "net::outbound_session::peer_discovery",
672 "[P2P] [PEER DISCOVERY] Waiting for addrs timed out."
673 );
674 current_attempt = 3;
676 }
677 }
678
679 store_sub.unsubscribe().await;
684 } else if !seeds.is_empty() {
685 verbose!(
686 target: "net::outbound_session::peer_discovery",
687 "[P2P] [PEER DISCOVERY] Asking seeds for new peers to connect to...");
688
689 dnetev!(self, OutboundPeerDiscovery, {
690 attempt: current_attempt,
691 state: "seed",
692 });
693
694 self.p2p().seed().await;
695 }
696
697 self.wakeup_self.reset();
698 self.session().wakeup_slots().await;
699
700 sleep(outbound_peer_discovery_attempt_time).await;
702 }
703 }
704
705 async fn wait(&self) -> bool {
710 let wakeup_start = Instant::now();
711 self.wakeup_self.wait().await;
712 let wakeup_end = Instant::now();
713
714 let epsilon = Duration::from_millis(200);
715 wakeup_end - wakeup_start <= epsilon
716 }
717
718 fn notify(&self) {
722 self.wakeup_self.notify()
723 }
724
725 fn session(&self) -> OutboundSessionPtr {
726 self.session.upgrade().unwrap()
727 }
728
729 fn p2p(&self) -> P2pPtr {
730 self.session().p2p()
731 }
732}