darkfi/net/session/
inbound_session.rs1use std::sync::{Arc, Weak};
27
28use async_trait::async_trait;
29use smol::lock::Mutex;
30use tracing::{debug, error, warn};
31use url::Url;
32
33use super::{
34 super::{
35 acceptor::{Acceptor, AcceptorPtr},
36 channel::ChannelPtr,
37 dnet::{self, dnetev, DnetEvent},
38 p2p::{P2p, P2pPtr},
39 },
40 Session, SessionBitFlag, SESSION_INBOUND,
41};
42use crate::{
43 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr, Subscription},
44 util::logger::verbose,
45 Error, Result,
46};
47
48pub type InboundSessionPtr = Arc<InboundSession>;
49
50pub struct InboundSession {
52 pub(in crate::net) p2p: Weak<P2p>,
53 acceptors: Mutex<Vec<AcceptorPtr>>,
54 accept_tasks: Mutex<Vec<StoppableTaskPtr>>,
55}
56
57impl InboundSession {
58 pub fn new(p2p: Weak<P2p>) -> InboundSessionPtr {
60 Arc::new(Self {
61 p2p,
62 acceptors: Mutex::new(Vec::new()),
63 accept_tasks: Mutex::new(Vec::new()),
64 })
65 }
66
67 pub async fn start(self: Arc<Self>) -> Result<()> {
71 let inbound_addrs = self.p2p().settings().read().await.inbound_addrs.clone();
72
73 if inbound_addrs.is_empty() {
74 verbose!(target: "net::inbound_session", "[P2P] Not configured for inbound connections.");
75 return Ok(())
76 }
77
78 let ex = self.p2p().executor();
79
80 let mut accept_tasks = self.accept_tasks.lock().await;
82
83 for (index, accept_addr) in inbound_addrs.iter().enumerate() {
84 let parent = Arc::downgrade(&self);
86 let acceptor = Acceptor::new(parent);
87
88 let channel_sub = acceptor.clone().subscribe().await;
91
92 let task = StoppableTask::new();
95 task.clone().start(
96 self.clone().channel_sub_loop(channel_sub, index, ex.clone()),
97 |_| async {},
99 Error::NetworkServiceStopped,
100 ex.clone(),
101 );
102
103 accept_tasks.push(task);
104
105 self.clone()
108 .start_accept_session(index, accept_addr.clone(), acceptor, ex.clone())
109 .await?;
110 }
111
112 Ok(())
113 }
114
115 pub async fn stop(&self) {
117 if self.p2p().settings().read().await.inbound_addrs.is_empty() {
118 verbose!(target: "net::inbound_session", "[P2P] Stopping inbound session.");
119 return
120 }
121
122 let acceptors = &*self.acceptors.lock().await;
123 for acceptor in acceptors {
124 acceptor.stop().await;
125 }
126
127 let accept_tasks = &*self.accept_tasks.lock().await;
128 for accept_task in accept_tasks {
129 accept_task.stop().await;
130 }
131 }
132
133 async fn start_accept_session(
135 self: Arc<Self>,
136 index: usize,
137 accept_addr: Url,
138 acceptor: AcceptorPtr,
139 ex: ExecutorPtr,
140 ) -> Result<()> {
141 verbose!(target: "net::inbound_session", "[P2P] Starting Inbound session #{index} on {accept_addr}");
142 let result = acceptor.clone().start(accept_addr, ex).await;
144 if let Err(e) = &result {
145 error!(target: "net::inbound_session", "[P2P] Error starting listener #{index}: {e}");
146 acceptor.stop().await;
147 } else {
148 self.acceptors.lock().await.push(acceptor);
149 }
150
151 result
152 }
153
154 async fn channel_sub_loop(
156 self: Arc<Self>,
157 channel_sub: Subscription<Result<ChannelPtr>>,
158 index: usize,
159 ex: ExecutorPtr,
160 ) -> Result<()> {
161 loop {
162 let channel = channel_sub.receive().await?;
163
164 ex.spawn(self.clone().setup_channel(index, channel, ex.clone())).detach();
167 }
168 }
169
170 async fn setup_channel(self: Arc<Self>, index: usize, channel: ChannelPtr, ex: ExecutorPtr) {
173 verbose!(
174 target: "net::inbound_session::setup_channel",
175 "[P2P] Connected Inbound #{index} [{}]", channel.display_address()
176 );
177
178 dnetev!(self, InboundConnected, {
179 addr: channel.info.connect_addr.clone(),
180 channel_id: channel.info.id,
181 });
182
183 let stop_sub = channel.subscribe_stop().await;
184
185 match self.register_channel(channel.clone(), ex.clone()).await {
186 Ok(()) => {
187 if let Ok(stop_sub) = stop_sub {
188 stop_sub.receive().await;
190
191 debug!(
192 target: "net::inbound_session::setup_channel",
193 "Received stop_sub, channel removed from P2P",
194 );
195 }
196 }
197 Err(e) => {
198 warn!(
199 target: "net::inbound_session::setup_channel",
200 "Channel setup failed! Err={e}"
201 );
202 }
203 }
204
205 dnetev!(self, InboundDisconnected, {
206 addr: channel.info.connect_addr.clone(),
207 channel_id: channel.info.id,
208 });
209 }
210}
211
212#[async_trait]
213impl Session for InboundSession {
214 fn p2p(&self) -> P2pPtr {
215 self.p2p.upgrade().unwrap()
216 }
217
218 fn type_id(&self) -> SessionBitFlag {
219 SESSION_INBOUND
220 }
221
222 async fn reload(self: Arc<Self>) {}
223}