1use std::{
31 collections::HashMap,
32 sync::{atomic::Ordering, Arc, Weak},
33 time::Duration,
34};
35
36use async_trait::async_trait;
37use smol::lock::{Mutex as AsyncMutex, OnceCell};
38use tracing::{error, warn};
39use url::Url;
40
41use super::{
42 super::{
43 connector::Connector,
44 dnet::{self, dnetev, DnetEvent},
45 hosts::{HostColor, HostState},
46 message::GetAddrsMessage,
47 p2p::{P2p, P2pPtr},
48 },
49 Session, SessionBitFlag, SESSION_DIRECT,
50};
51use crate::{
52 net::ChannelPtr,
53 system::{
54 msleep, sleep, timeout::timeout, CondVar, PublisherPtr, StoppableTask, StoppableTaskPtr,
55 },
56 util::logger::verbose,
57 Error, Result,
58};
59
/// Atomically reference-counted pointer to a [`DirectSession`].
pub type DirectSessionPtr = Arc<DirectSession>;
61
/// Session (type flag `SESSION_DIRECT`) that opens outbound channels to
/// explicitly requested addresses and tracks how many users each of those
/// channels has.
pub struct DirectSession {
    /// Weak pointer to the parent `P2p` instance
    pub(in crate::net) p2p: Weak<P2p>,
    /// Connector used to open outbound connections.
    /// Set on the first call to `new_channel()`.
    connector: OnceCell<Connector>,
    /// Retry loops started by `get_channel_with_retries()`, keyed by the
    /// address they are trying to reach
    retries_tasks: Arc<AsyncMutex<HashMap<Url, Arc<StoppableTask>>>>,
    /// Peer discovery process owned by this session
    peer_discovery: Arc<PeerDiscovery>,
    /// Usage counters keyed by channel ID (`channel.info.id`).
    /// `cleanup_channel()` stops a channel once its counter reaches zero.
    channels_usage: Arc<AsyncMutex<HashMap<u32, u32>>>,
    /// In-flight channel creation tasks keyed by target address.
    /// Entries are weak; `ChannelTask`'s `Drop` impl removes them.
    tasks: Arc<AsyncMutex<HashMap<Url, Weak<ChannelTask>>>>,
}
78
79impl DirectSession {
80 pub fn new(p2p: Weak<P2p>) -> DirectSessionPtr {
82 Arc::new_cyclic(|session| Self {
83 p2p,
84 connector: OnceCell::new(),
85 retries_tasks: Arc::new(AsyncMutex::new(HashMap::new())),
86 peer_discovery: PeerDiscovery::new(session.clone()),
87 channels_usage: Arc::new(AsyncMutex::new(HashMap::new())),
88 tasks: Arc::new(AsyncMutex::new(HashMap::new())),
89 })
90 }
91
92 pub(crate) async fn start(self: Arc<Self>) {
94 self.peer_discovery.clone().start().await;
95 }
96
97 pub async fn stop(&self) {
99 self.peer_discovery.clone().stop().await;
100
101 for (_, task) in self.retries_tasks.lock().await.iter() {
102 task.stop().await;
103 }
104 }
105
    /// Signal the peer discovery process that it may begin.
    ///
    /// This forwards to `PeerDiscovery::notify()`, which notifies the
    /// condition variable the discovery loop waits on before its first attempt.
    pub fn start_peer_discovery(&self) {
        self.peer_discovery.notify();
    }
114
    /// Return a channel connected to `addr`, creating one if necessary.
    ///
    /// Resolution order:
    /// 1. If a channel with a matching `connect_addr` already exists it is
    ///    returned (and restarted if it was stopped).
    /// 2. If another caller is already creating a channel to `addr`, wait for
    ///    that task's result and return a clone of it.
    /// 3. Otherwise register a new `ChannelTask`, run `new_channel()` on the
    ///    executor and wait for its result.
    ///
    /// Waiting is done by polling every 100 ms.
    ///
    /// # Errors
    /// Returns whatever error `new_channel()` produced for this address.
    pub async fn get_channel(self: Arc<Self>, addr: &Url) -> Result<ChannelPtr> {
        // Check existing channels
        let channels = self.p2p().hosts().channels();
        if let Some(channel) =
            channels.iter().find(|&chan| chan.info.connect_addr == *addr).cloned()
        {
            // The usage map is locked before inspecting the channel state.
            // NOTE(review): presumably this serializes against `cleanup_channel()`,
            // which stops channels while holding the same lock — confirm.
            let mut channels_usage = self.channels_usage.lock().await;
            if channel.is_stopped() {
                channel.clone().start(self.p2p().executor());
            }
            // Only channels carrying the direct session flag are usage-counted
            if channel.session_type_id() & SESSION_DIRECT != 0 {
                channels_usage.entry(channel.info.id).and_modify(|count| *count += 1).or_insert(1);
            }
            return Ok(channel);
        }

        let mut tasks = self.tasks.lock().await;

        // Check if a task is already creating a channel to this address
        if let Some(task) = tasks.get(addr) {
            if let Some(task) = task.upgrade() {
                // The task is alive: release the map and wait for its output.
                // The usage count is not incremented here; the caller that
                // created the task adds `Arc::strong_count(&task)` at the end.
                drop(tasks);
                while task.output.lock().await.is_none() {
                    msleep(100).await;
                }
                return task.output.lock().await.clone().unwrap();
            } else {
                // The task has been dropped but its map entry is still there.
                // Wait until the entry is removed (done asynchronously by
                // `ChannelTask`'s `Drop` impl) before registering a new task.
                drop(tasks);
                loop {
                    tasks = self.tasks.lock().await;
                    if !tasks.contains_key(addr) {
                        // Leave the loop with the lock held
                        break
                    }
                    drop(tasks);
                    msleep(100).await;
                }
            }
        }

        // Register a new channel creation task for this address
        let task = Arc::new(ChannelTask {
            session: Arc::downgrade(&self.clone()),
            addr: addr.clone(),
            output: Arc::new(AsyncMutex::new(None)),
        });
        tasks.insert(addr.clone(), Arc::downgrade(&task));
        drop(tasks);

        // Create the channel in a detached task that stores its result in
        // `task.output`
        let ex = self.p2p().executor();
        let addr_ = addr.clone();
        let self_ = self.clone();
        let task_ = task.clone();
        ex.spawn(async move {
            let res = self_.clone().new_channel(addr_.clone()).await;

            let mut output = task_.output.lock().await;
            *output = Some(res);
        })
        .detach();

        // Wait for the detached task to publish its result
        while task.output.lock().await.is_none() {
            msleep(100).await;
        }
        let res = task.output.lock().await.as_ref().unwrap().clone();
        if let Ok(ref channel) = res {
            // Count one usage per strong reference to the task.
            // NOTE(review): presumably this accounts for the callers waiting on
            // the same task; the detached task's own `task_` clone may still be
            // alive at this point — confirm the count cannot be off by one.
            self.inc_channel_usage(channel, Arc::strong_count(&task).try_into().unwrap()).await;
        }
        res
    }
191
192 pub async fn inc_channel_usage(&self, channel: &ChannelPtr, n: u32) {
194 if channel.session_type_id() & SESSION_DIRECT == 0 {
195 return
197 }
198 let mut channels_usage = self.channels_usage.lock().await;
199 channels_usage.entry(channel.info.id).and_modify(|count| *count += n).or_insert(n);
200 }
201
202 pub async fn get_channel_with_retries(
206 self: Arc<Self>,
207 addr: Url,
208 channel_pub: PublisherPtr<ChannelPtr>,
209 ) {
210 let task = StoppableTask::new();
211 let self_ = self.clone();
212 let mut retries_tasks = self.retries_tasks.lock().await;
213 retries_tasks.insert(addr.clone(), task.clone());
214 drop(retries_tasks);
215
216 task.clone().start(
217 async move {
218 loop {
219 let res = self_.clone().get_channel(&addr).await;
220 match res {
221 Ok(channel) => {
222 channel_pub.notify(channel).await;
223 let mut retries_tasks = self_.retries_tasks.lock().await;
224 retries_tasks.remove(&addr);
225 break
226 }
227 Err(_) => {
228 let outbound_connect_timeout = self_
229 .p2p()
230 .settings()
231 .read_arc()
232 .await
233 .outbound_connect_timeout(addr.scheme());
234 sleep(outbound_connect_timeout).await;
235 }
236 }
237 }
238
239 Ok(())
240 },
241 |res| async {
242 match res {
243 Ok(()) | Err(Error::DetachedTaskStopped) => { }
244 Err(e) => {
245 error!(target: "net::direct_session::get_channel_with_retries", "{e}")
246 }
247 }
248 },
249 Error::DetachedTaskStopped,
250 self.p2p().executor(),
251 );
252 }
253
254 async fn new_channel(self: Arc<Self>, addr: Url) -> Result<ChannelPtr> {
255 if !self.connector.is_initialized() {
256 let _ = self
257 .connector
258 .set(Connector::new(self.p2p().settings(), Arc::downgrade(&self.clone()).clone()))
259 .await;
260 }
261
262 verbose!(
263 target: "net::direct_session",
264 "[P2P] Connecting to direct outbound [{addr}]",
265 );
266
267 let settings = self.p2p().settings().read_arc().await;
268 let seeds = settings.seeds.clone();
269 let active_profiles = settings.active_profiles.clone();
270 drop(settings);
271
272 if seeds.contains(&addr) {
275 error!(
276 target: "net::direct_session",
277 "[P2P] Suspending direct connection to seed [{}]", addr.clone(),
278 );
279 return Err(Error::ConnectFailed(format!("[{addr}]: Direct connection to seed")))
280 }
281
282 let hosts = self.p2p().hosts();
284 let external_addrs = hosts.external_addrs().await;
285 if external_addrs.contains(&addr) {
286 warn!(
287 target: "net::hosts::check_addrs",
288 "[P2P] Suspending direct connection to external addr [{}]", addr.clone(),
289 );
290 return Err(Error::ConnectFailed(format!(
291 "[{addr}]: Direct connection to external addr"
292 )))
293 }
294
295 if !active_profiles.contains(&addr.scheme().to_string()) {
297 return Err(Error::UnsupportedTransport(addr.scheme().to_string()))
298 }
299
300 if !hosts.ipv6_available.load(Ordering::SeqCst) && hosts.is_ipv6(&addr) {
302 return Err(Error::ConnectFailed(format!("[{addr}]: IPv6 is unavailable")))
303 }
304
305 loop {
307 if let Err(e) = hosts.try_register(addr.clone(), HostState::Connect) {
308 if let Error::HostStateBlocked(from, _) = &e {
310 if from == "Refine" {
311 sleep(5).await;
313 continue
314 }
315 }
316
317 error!(target: "net::direct_session",
318 "[P2P] Cannot connect to direct={addr}, err={e}");
319 return Err(e)
320 }
321 break
322 }
323
324 dnetev!(self, DirectConnecting, {
325 connect_addr: addr.clone(),
326 });
327
328 match self.connector.get().unwrap().connect(&addr).await {
330 Ok((_, channel)) => {
331 verbose!(
332 target: "net::direct_session",
333 "[P2P] Direct outbound connected [{}]",
334 channel.display_address()
335 );
336
337 dnetev!(self, DirectConnected, {
338 connect_addr: channel.info.connect_addr.clone(),
339 addr: channel.display_address().clone(),
340 channel_id: channel.info.id
341 });
342
343 match self.register_channel(channel.clone(), self.p2p().executor()).await {
345 Ok(()) => Ok(channel),
346 Err(e) => {
347 warn!(
348 target: "net::direct_session",
349 "[P2P] Unable to connect to direct outbound [{}]: {e}",
350 channel.display_address(),
351 );
352
353 dnetev!(self, DirectDisconnected, {
354 connect_addr: channel.info.connect_addr.clone(),
355 err: e.to_string()
356 });
357
358 if let Err(e) = self.p2p().hosts().unregister(channel.address()) {
360 warn!(target: "net::direct_session", "[P2P] Error while unregistering addr={}, err={e}", channel.display_address());
361 }
362
363 Err(e)
364 }
365 }
366 }
367 Err(e) => {
368 warn!(
369 target: "net::direct_session",
370 "[P2P] Unable to connect to direct outbound: {e}",
371 );
372
373 dnetev!(self, DirectDisconnected, {
374 connect_addr: addr.clone(),
375 err: e.to_string()
376 });
377
378 if let Err(e) = self.p2p().hosts().unregister(&addr) {
380 warn!(target: "net::direct_session", "[P2P] Error while unregistering addr={addr}, err={e}");
381 }
382
383 Err(e)
384 }
385 }
386 }
387
388 pub async fn cleanup_channel(self: Arc<Self>, channel: ChannelPtr) -> bool {
394 if channel.session_type_id() & SESSION_DIRECT == 0 {
395 return false
397 }
398
399 let mut channels_usage = self.channels_usage.lock().await;
400 let usage_count = channels_usage.get_mut(&channel.info.id);
401 if usage_count.is_none() {
402 let _ = self.p2p().hosts().unregister(channel.address());
403 channel.stop().await;
404 return true
405 }
406 let usage_count = usage_count.unwrap();
407 if *usage_count > 0 {
408 *usage_count -= 1;
409 }
410
411 if *usage_count == 0 {
412 channels_usage.remove(&channel.info.id);
413 let _ = self.p2p().hosts().unregister(channel.address());
414 channel.stop().await;
415 return true
416 }
417
418 false
419 }
420}
421
#[async_trait]
impl Session for DirectSession {
    /// Return a strong pointer to the parent `P2p` instance.
    ///
    /// # Panics
    /// Panics if the weak pointer can no longer be upgraded, i.e. the parent
    /// has already been dropped.
    fn p2p(&self) -> P2pPtr {
        self.p2p.upgrade().unwrap()
    }

    /// Session type flag of this session: `SESSION_DIRECT`.
    fn type_id(&self) -> SessionBitFlag {
        SESSION_DIRECT
    }

    /// Reloading is a no-op for the direct session.
    async fn reload(self: Arc<Self>) {}
}
434
/// Bookkeeping for one in-flight channel creation started by
/// `DirectSession::get_channel()`.
///
/// Callers asking for the same address share one `ChannelTask` and read the
/// result from `output`. Dropping the last reference removes the task's entry
/// from the session's `tasks` map (see the `Drop` impl).
struct ChannelTask {
    /// Session that owns the `tasks` map this task is registered in
    session: Weak<DirectSession>,
    /// Address the channel is being created for; also the key in `tasks`
    addr: Url,
    /// Result of the channel creation, `None` until it has finished
    output: Arc<AsyncMutex<Option<Result<ChannelPtr>>>>,
}
440
441impl Drop for ChannelTask {
442 fn drop(&mut self) {
443 let session = self.session.upgrade().unwrap();
444 let addr = self.addr.clone();
445 session
446 .p2p()
447 .executor()
448 .spawn(async move {
449 let mut tasks = session.tasks.lock().await;
450 tasks.remove(&addr);
451 })
452 .detach();
453 }
454}
455
/// Peer discovery process of the direct session.
///
/// Runs as a stoppable background task that only proceeds past its initial
/// wait once `notify()` has been called.
struct PeerDiscovery {
    /// Background task running the discovery loop
    process: StoppableTaskPtr,
    /// Condition variable the discovery loop waits on before its first attempt
    init: CondVar,
    /// Weak pointer to the owning direct session
    session: Weak<DirectSession>,
}
465
466impl PeerDiscovery {
467 fn new(session: Weak<DirectSession>) -> Arc<Self> {
468 Arc::new(Self { process: StoppableTask::new(), init: CondVar::new(), session })
469 }
470}
471
472impl PeerDiscovery {
473 async fn start(self: Arc<Self>) {
474 let ex = self.p2p().executor();
475 self.process.clone().start(
476 async move {
477 self.run().await;
478 Ok(())
479 },
480 |_| async {},
482 Error::NetworkServiceStopped,
483 ex,
484 );
485 }
    /// Stop the background discovery task and wait for `stop()` on it to return.
    async fn stop(self: Arc<Self>) {
        self.process.stop().await;
    }
489
490 async fn run(self: Arc<Self>) {
508 let settings = self.p2p().settings().read_arc().await;
511 if settings.outbound_connections > 0 {
512 return
513 }
514
515 self.init.wait().await;
517
518 let mut current_attempt = 0;
519 loop {
520 dnetev!(self, DirectPeerDiscovery, {
521 attempt: current_attempt,
522 state: "wait",
523 });
524
525 let settings = self.p2p().settings().read_arc().await;
527 let outbound_peer_discovery_cooloff_time =
528 settings.outbound_peer_discovery_cooloff_time;
529 let outbound_peer_discovery_attempt_time =
530 settings.outbound_peer_discovery_attempt_time;
531 let getaddrs_max = settings.getaddrs_max;
532 let active_profiles = settings.active_profiles.clone();
533 let seeds = settings.seeds.clone();
534 drop(settings);
535
536 current_attempt += 1;
537
538 if current_attempt >= 4 {
539 verbose!(
540 target: "net::direct_session::peer_discovery",
541 "[P2P] [PEER DISCOVERY] Sleeping and trying again. Attempt {current_attempt}"
542 );
543
544 dnetev!(self, DirectPeerDiscovery, {
545 attempt: current_attempt,
546 state: "sleep",
547 });
548
549 sleep(outbound_peer_discovery_cooloff_time).await;
550 current_attempt = 1;
551 }
552
553 let mut channel = None;
557 if !self.p2p().is_connected() {
558 dnetev!(self, DirectPeerDiscovery, {
559 attempt: current_attempt,
560 state: "newchan",
561 });
562
563 for color in [HostColor::Gold, HostColor::White, HostColor::Grey].iter() {
564 if let Some((entry, _)) = self
565 .p2p()
566 .hosts()
567 .container
568 .fetch_random_with_schemes(color.clone(), &active_profiles)
569 {
570 channel = self.p2p().session_direct().get_channel(&entry.0).await.ok();
571 break;
572 }
573 }
574 }
575
576 if self.p2p().is_connected() && current_attempt <= 2 {
579 verbose!(
582 target: "net::direct_session::peer_discovery",
583 "[P2P] [PEER DISCOVERY] Asking peers for new peers to connect to...");
584
585 dnetev!(self, DirectPeerDiscovery, {
586 attempt: current_attempt,
587 state: "getaddr",
588 });
589
590 let get_addrs =
591 GetAddrsMessage { max: getaddrs_max.unwrap_or(1), transports: active_profiles };
592
593 self.p2p().broadcast(&get_addrs).await;
594
595 let store_sub = self.p2p().hosts().subscribe_store().await;
597
598 let result = timeout(
599 Duration::from_secs(outbound_peer_discovery_attempt_time),
600 store_sub.receive(),
601 )
602 .await;
603
604 match result {
605 Ok(addrs_len) => {
606 verbose!(
607 target: "net::direct_session::peer_discovery",
608 "[P2P] [PEER DISCOVERY] Discovered {addrs_len} peers"
609 );
610 if addrs_len > 0 {
612 current_attempt = 0;
613 }
614 }
615 Err(_) => {
616 warn!(
617 target: "net::direct_session::peer_discovery",
618 "[P2P] [PEER DISCOVERY] Waiting for addrs timed out."
619 );
620 current_attempt = 3;
622 }
623 }
624
625 store_sub.unsubscribe().await;
630 } else if !seeds.is_empty() {
631 verbose!(
632 target: "net::direct_session::peer_discovery",
633 "[P2P] [PEER DISCOVERY] Asking seeds for new peers to connect to...");
634
635 dnetev!(self, DirectPeerDiscovery, {
636 attempt: current_attempt,
637 state: "seed",
638 });
639
640 self.p2p().seed().await;
641 }
642
643 if let Some(ch) = channel {
645 self.p2p().session_direct().cleanup_channel(ch).await;
646 }
647
648 sleep(outbound_peer_discovery_attempt_time).await;
650 }
651 }
652
    /// Notify the `init` condition variable that `run()` waits on before
    /// entering its discovery loop.
    pub fn notify(&self) {
        self.init.notify()
    }
658
    /// Return a strong pointer to the owning direct session.
    ///
    /// # Panics
    /// Panics if the session has already been dropped.
    fn session(&self) -> DirectSessionPtr {
        self.session.upgrade().unwrap()
    }
662
    /// Return the parent `P2p` instance, reached through the owning session.
    ///
    /// # Panics
    /// Panics if the session or its parent has already been dropped.
    fn p2p(&self) -> P2pPtr {
        self.session().p2p()
    }
666}