darkfi/net/session/
manual_session.rs1use std::sync::{Arc, Weak};
35
36use async_trait::async_trait;
37use futures::stream::{FuturesUnordered, StreamExt};
38use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
39use tracing::{debug, error, warn};
40use url::Url;
41
42use super::{
43 super::{
44 connector::Connector,
45 p2p::{P2p, P2pPtr},
46 },
47 Session, SessionBitFlag, SESSION_MANUAL,
48};
49use crate::{
50 net::{hosts::HostState, settings::Settings},
51 system::{sleep, StoppableTask, StoppableTaskPtr},
52 util::logger::verbose,
53 Error, Result,
54};
55
56pub type ManualSessionPtr = Arc<ManualSession>;
57
58pub struct ManualSession {
60 pub(in crate::net) p2p: Weak<P2p>,
61 slots: AsyncMutex<Vec<Arc<Slot>>>,
62}
63
64impl ManualSession {
65 pub fn new(p2p: Weak<P2p>) -> ManualSessionPtr {
67 Arc::new(Self { p2p, slots: AsyncMutex::new(Vec::new()) })
68 }
69
70 pub(crate) async fn start(self: Arc<Self>) {
71 let mut slots = self.slots.lock().await;
73
74 let mut futures = FuturesUnordered::new();
75
76 let self_ = Arc::downgrade(&self);
77
78 for peer in &self.p2p().settings().read().await.peers {
81 let slot = Slot::new(self_.clone(), peer.clone(), self.p2p().settings());
82 futures.push(slot.clone().start());
83 slots.push(slot);
84 }
85
86 while (futures.next().await).is_some() {}
87 }
88
89 pub async fn stop(&self) {
91 let slots = &*self.slots.lock().await;
92 let mut futures = FuturesUnordered::new();
93
94 for slot in slots {
95 futures.push(slot.stop());
96 }
97
98 while (futures.next().await).is_some() {}
99 }
100}
101
102#[async_trait]
103impl Session for ManualSession {
104 fn p2p(&self) -> P2pPtr {
105 self.p2p.upgrade().unwrap()
106 }
107
108 fn type_id(&self) -> SessionBitFlag {
109 SESSION_MANUAL
110 }
111
112 async fn reload(self: Arc<Self>) {}
113}
114
115struct Slot {
116 addr: Url,
117 process: StoppableTaskPtr,
118 session: Weak<ManualSession>,
119 connector: Connector,
120}
121
122impl Slot {
123 fn new(
124 session: Weak<ManualSession>,
125 addr: Url,
126 settings: Arc<AsyncRwLock<Settings>>,
127 ) -> Arc<Self> {
128 Arc::new(Self {
129 addr,
130 process: StoppableTask::new(),
131 session: session.clone(),
132 connector: Connector::new(settings, session),
133 })
134 }
135
136 async fn start(self: Arc<Self>) {
137 let ex = self.p2p().executor();
138
139 self.process.clone().start(
140 self.run(),
141 |res| async {
142 match res {
143 Ok(()) | Err(Error::NetworkServiceStopped) => {}
144 Err(e) => error!("net::manual_session {e}"),
145 }
146 },
147 Error::NetworkServiceStopped,
148 ex,
149 );
150 }
151
152 async fn run(self: Arc<Self>) -> Result<()> {
154 let ex = self.p2p().executor();
155
156 let mut attempts = 0;
157 loop {
158 attempts += 1;
159
160 verbose!(
161 target: "net::manual_session",
162 "[P2P] Connecting to manual outbound [{}] (attempt #{})",
163 self.addr, attempts
164 );
165
166 let settings = self.p2p().settings().read_arc().await;
167 let seeds = settings.seeds.clone();
168 let outbound_connect_timeout = settings.outbound_connect_timeout(self.addr.scheme());
169 drop(settings);
170
171 if seeds.contains(&self.addr) {
174 error!(
175 target: "net::manual_session",
176 "[P2P] Suspending manual connection to seed [{}]", self.addr.clone(),
177 );
178 return Ok(())
179 }
180
181 if let Err(e) = self.p2p().hosts().try_register(self.addr.clone(), HostState::Connect) {
182 debug!(target: "net::manual_session",
183 "Cannot connect to manual={}, err={e}", &self.addr);
184
185 sleep(outbound_connect_timeout).await;
186
187 continue
188 }
189
190 match self.connector.connect(&self.addr).await {
191 Ok((_, channel)) => {
192 verbose!(
193 target: "net::manual_session",
194 "[P2P] Manual outbound connected [{}]",
195 channel.display_address()
196 );
197
198 let stop_sub = channel.subscribe_stop().await?;
199
200 match self.session().register_channel(channel.clone(), ex.clone()).await {
204 Ok(()) => {
205 stop_sub.receive().await;
207
208 verbose!(
209 target: "net::manual_session",
210 "[P2P] Manual outbound disconnected [{}]",
211 channel.display_address()
212 );
213 }
214 Err(e) => {
215 warn!(
216 target: "net::manual_session",
217 "[P2P] Unable to connect to manual outbound [{}]: {e}",
218 channel.display_address(),
219 );
220
221 if let Err(e) = self.p2p().hosts().unregister(channel.address()) {
223 warn!(target: "net::manual_session", "[P2P] Error while unregistering addr={}, err={e}", channel.display_address());
224 }
225 }
226 }
227 }
228 Err(e) => {
229 warn!(
230 target: "net::manual_session",
231 "[P2P] Unable to connect to manual outbound: {e}",
232 );
233
234 if let Err(e) = self.p2p().hosts().unregister(&self.addr) {
236 warn!(target: "net::manual_session", "[P2P] Error while unregistering addr={}, err={e}", self.addr);
237 }
238 }
239 }
240
241 verbose!(
242 target: "net::manual_session",
243 "[P2P] Waiting {outbound_connect_timeout} seconds until next manual outbound connection attempt [{}]",
244 self.addr,
245 );
246
247 sleep(outbound_connect_timeout).await;
248 }
249 }
250
251 fn session(&self) -> ManualSessionPtr {
252 self.session.upgrade().unwrap()
253 }
254
255 fn p2p(&self) -> P2pPtr {
256 self.session().p2p()
257 }
258
259 async fn stop(&self) {
260 self.connector.stop();
261 self.process.stop().await;
262 }
263}