darkfi/net/session/
inbound_session.rs1use std::sync::{Arc, Weak};
27
28use async_trait::async_trait;
29use smol::{lock::Mutex, Executor};
30use tracing::{debug, error, warn};
31use url::Url;
32
33use super::{
34 super::{
35 acceptor::{Acceptor, AcceptorPtr},
36 channel::ChannelPtr,
37 dnet::{self, dnetev, DnetEvent},
38 p2p::{P2p, P2pPtr},
39 },
40 Session, SessionBitFlag, SESSION_INBOUND,
41};
42use crate::{
43 system::{StoppableTask, StoppableTaskPtr, Subscription},
44 util::logger::verbose,
45 Error, Result,
46};
47
48pub type InboundSessionPtr = Arc<InboundSession>;
49
50pub struct InboundSession {
52 pub(in crate::net) p2p: Weak<P2p>,
53 acceptors: Mutex<Vec<AcceptorPtr>>,
54 accept_tasks: Mutex<Vec<StoppableTaskPtr>>,
55}
56
57impl InboundSession {
58 pub fn new(p2p: Weak<P2p>) -> InboundSessionPtr {
60 Arc::new(Self {
61 p2p,
62 acceptors: Mutex::new(Vec::new()),
63 accept_tasks: Mutex::new(Vec::new()),
64 })
65 }
66
67 pub async fn start(self: Arc<Self>) -> Result<()> {
71 let inbound_addrs = self.p2p().settings().read().await.inbound_addrs.clone();
72
73 if inbound_addrs.is_empty() {
74 verbose!(target: "net::inbound_session", "[P2P] Not configured for inbound connections.");
75 return Ok(())
76 }
77
78 let ex = self.p2p().executor();
79
80 let mut accept_tasks = self.accept_tasks.lock().await;
82
83 for (index, accept_addr) in inbound_addrs.iter().enumerate() {
84 let parent = Arc::downgrade(&self);
86 let acceptor = Acceptor::new(parent);
87
88 let channel_sub = acceptor.clone().subscribe().await;
91
92 let task = StoppableTask::new();
95 task.clone().start(
96 self.clone().channel_sub_loop(channel_sub, index, ex.clone()),
97 |_| async {},
99 Error::NetworkServiceStopped,
100 ex.clone(),
101 );
102
103 accept_tasks.push(task);
104
105 self.clone()
108 .start_accept_session(index, accept_addr.clone(), acceptor, ex.clone())
109 .await?;
110 }
111
112 Ok(())
113 }
114
115 pub async fn stop(&self) {
117 if self.p2p().settings().read().await.inbound_addrs.is_empty() {
118 verbose!(target: "net::inbound_session", "[P2P] Stopping inbound session.");
119 return
120 }
121
122 let acceptors = &*self.acceptors.lock().await;
123 for acceptor in acceptors {
124 acceptor.stop().await;
125 }
126
127 let accept_tasks = &*self.accept_tasks.lock().await;
128 for accept_task in accept_tasks {
129 accept_task.stop().await;
130 }
131 }
132
133 async fn start_accept_session(
135 self: Arc<Self>,
136 index: usize,
137 accept_addr: Url,
138 acceptor: AcceptorPtr,
139 ex: Arc<Executor<'_>>,
140 ) -> Result<()> {
141 verbose!(target: "net::inbound_session", "[P2P] Starting Inbound session #{index} on {accept_addr}");
142 let result = acceptor.clone().start(accept_addr, ex).await;
144 if let Err(e) = &result {
145 error!(target: "net::inbound_session", "[P2P] Error starting listener #{index}: {e}");
146 acceptor.stop().await;
147 } else {
148 self.acceptors.lock().await.push(acceptor);
149 }
150
151 result
152 }
153
154 async fn channel_sub_loop(
156 self: Arc<Self>,
157 channel_sub: Subscription<Result<ChannelPtr>>,
158 index: usize,
159 ex: Arc<Executor<'_>>,
160 ) -> Result<()> {
161 loop {
162 let channel = channel_sub.receive().await?;
163
164 ex.spawn(self.clone().setup_channel(index, channel, ex.clone())).detach();
167 }
168 }
169
170 async fn setup_channel(
173 self: Arc<Self>,
174 index: usize,
175 channel: ChannelPtr,
176 ex: Arc<Executor<'_>>,
177 ) {
178 verbose!(
179 target: "net::inbound_session::setup_channel",
180 "[P2P] Connected Inbound #{index} [{}]", channel.display_address()
181 );
182
183 dnetev!(self, InboundConnected, {
184 addr: channel.info.connect_addr.clone(),
185 channel_id: channel.info.id,
186 });
187
188 let stop_sub = channel.subscribe_stop().await;
189
190 match self.register_channel(channel.clone(), ex.clone()).await {
191 Ok(()) => {
192 if let Ok(stop_sub) = stop_sub {
193 stop_sub.receive().await;
195
196 debug!(
197 target: "net::inbound_session::setup_channel",
198 "Received stop_sub, channel removed from P2P",
199 );
200 }
201 }
202 Err(e) => {
203 warn!(
204 target: "net::inbound_session::setup_channel",
205 "Channel setup failed! Err={e}"
206 );
207 }
208 }
209
210 dnetev!(self, InboundDisconnected, {
211 addr: channel.info.connect_addr.clone(),
212 channel_id: channel.info.id,
213 });
214 }
215}
216
217#[async_trait]
218impl Session for InboundSession {
219 fn p2p(&self) -> P2pPtr {
220 self.p2p.upgrade().unwrap()
221 }
222
223 fn type_id(&self) -> SessionBitFlag {
224 SESSION_INBOUND
225 }
226
227 async fn reload(self: Arc<Self>) {}
228}