darkfi/net/session/
seedsync_session.rs1use std::sync::{
47 atomic::{AtomicBool, Ordering::SeqCst},
48 Arc, Weak,
49};
50
51use async_trait::async_trait;
52use futures::stream::{FuturesUnordered, StreamExt};
53use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
54use tracing::{debug, warn};
55use url::Url;
56
57use super::{
58 super::{
59 connector::Connector,
60 hosts::HostColor,
61 p2p::{P2p, P2pPtr},
62 settings::Settings,
63 },
64 Session, SessionBitFlag, SESSION_SEED,
65};
66use crate::{
67 net::hosts::HostState,
68 system::{CondVar, StoppableTask, StoppableTaskPtr},
69 util::logger::verbose,
70 Error,
71};
72
73pub type SeedSyncSessionPtr = Arc<SeedSyncSession>;
74
75pub struct SeedSyncSession {
77 pub(in crate::net) p2p: Weak<P2p>,
78 slots: AsyncMutex<Vec<Arc<Slot>>>,
79}
80
81impl SeedSyncSession {
82 pub(crate) fn new(p2p: Weak<P2p>) -> SeedSyncSessionPtr {
84 Arc::new(Self { p2p, slots: AsyncMutex::new(Vec::new()) })
85 }
86
87 pub(crate) async fn start(self: Arc<Self>) {
90 let mut slots = self.slots.lock().await;
92
93 let mut futures = FuturesUnordered::new();
94
95 let self_ = Arc::downgrade(&self);
96
97 for seed in &self.p2p().settings().read().await.seeds {
100 let slot = Slot::new(self_.clone(), seed.clone(), self.p2p().settings());
101 futures.push(slot.clone().start());
102 slots.push(slot);
103 }
104
105 while (futures.next().await).is_some() {}
106 }
107
108 pub(crate) async fn notify(&self) {
111 let slots = &*self.slots.lock().await;
112
113 for slot in slots {
114 slot.notify();
115 }
116 }
117
118 pub(crate) async fn stop(&self) {
120 debug!(target: "net::seedsync_session", "Stopping seed sync session...");
121 let slots = &*self.slots.lock().await;
122 let mut futures = FuturesUnordered::new();
123
124 for slot in slots {
125 futures.push(slot.clone().stop());
126 }
127
128 while (futures.next().await).is_some() {}
129 debug!(target: "net::seedsync_session", "Seed sync session stopped!");
130 }
131
132 async fn _failed(&self) -> bool {
134 let slots = &*self.slots.lock().await;
135 slots.iter().all(|s| s._failed())
136 }
137}
138
139#[async_trait]
140impl Session for SeedSyncSession {
141 fn p2p(&self) -> P2pPtr {
142 self.p2p.upgrade().unwrap()
143 }
144
145 fn type_id(&self) -> SessionBitFlag {
146 SESSION_SEED
147 }
148
149 async fn reload(self: Arc<Self>) {}
150}
151
152struct Slot {
153 addr: Url,
154 process: StoppableTaskPtr,
155 wakeup_self: CondVar,
156 session: Weak<SeedSyncSession>,
157 connector: Connector,
158 failed: AtomicBool,
159}
160
161impl Slot {
162 fn new(
163 session: Weak<SeedSyncSession>,
164 addr: Url,
165 settings: Arc<AsyncRwLock<Settings>>,
166 ) -> Arc<Self> {
167 Arc::new(Self {
168 addr,
169 process: StoppableTask::new(),
170 wakeup_self: CondVar::new(),
171 session: session.clone(),
172 connector: Connector::new(settings, session),
173 failed: AtomicBool::new(false),
174 })
175 }
176
177 async fn start(self: Arc<Self>) {
178 let ex = self.p2p().executor();
179
180 self.process.clone().start(
181 async move {
182 self.run().await;
183 unreachable!();
184 },
185 |_| async {},
187 Error::NetworkServiceStopped,
188 ex,
189 );
190 }
191
192 async fn run(self: Arc<Self>) {
197 let ex = self.p2p().executor();
198 let hosts = self.p2p().hosts();
199
200 loop {
201 self.wait().await;
203
204 debug!(
205 target: "net::session::seedsync_session", "SeedSyncSession::start_seed() [START]",
206 );
207
208 if let Err(e) = hosts.try_register(self.addr.clone(), HostState::Connect) {
209 debug!(target: "net::session::seedsync_session",
210 "Cannot connect to seed={}, err={e}", &self.addr);
211
212 self.reset();
214
215 continue
216 }
217
218 match self.connector.connect(&self.addr).await {
219 Ok((_, ch)) => {
220 verbose!(
221 target: "net::session::seedsync_session",
222 "[P2P] Connected seed [{}]",
223 ch.display_address()
224 );
225
226 match self.session().register_channel(ch.clone(), ex.clone()).await {
227 Ok(()) => {
228 self.failed.store(false, SeqCst);
229
230 verbose!(
231 target: "net::session::seedsync_session",
232 "[P2P] Disconnecting from seed [{}]",
233 ch.display_address()
234 );
235 ch.stop().await;
236
237 if hosts.container.is_empty(HostColor::Grey) {
239 warn!(target: "net::session::seedsync_session",
240 "[P2P] Greylist empty after seeding");
241 }
242
243 self.reset();
245 }
246
247 Err(e) => {
248 warn!(
249 target: "net::session::seedsync_session",
250 "[P2P] Unable to connect to seed [{}]: {e}",
251 ch.display_address()
252 );
253 self.handle_failure(ch.address());
254
255 continue
256 }
257 }
258 }
259
260 Err(e) => {
261 warn!(
262 target: "net::session::seedsync_session",
263 "[P2P] Unable to connect to seed: {e}",
264 );
265 self.handle_failure(&self.addr);
266
267 continue
268 }
269 }
270 debug!(
271 target: "net::session::seedsync_session",
272 "SeedSyncSession::start_seed() [END]",
273 );
274 }
275 }
276
277 fn handle_failure(&self, addr: &Url) {
278 self.failed.store(true, SeqCst);
279
280 if let Err(e) = self.p2p().hosts().unregister(addr) {
282 warn!(target: "net::session::seedsync_session", "[P2P] Error while unregistering addr={addr}, err={e}");
283 }
284
285 self.reset();
287 }
288
289 fn _failed(&self) -> bool {
290 self.failed.load(SeqCst)
291 }
292
293 fn session(&self) -> SeedSyncSessionPtr {
294 self.session.upgrade().unwrap()
295 }
296
297 fn p2p(&self) -> P2pPtr {
298 self.session().p2p()
299 }
300
301 async fn wait(&self) {
302 self.wakeup_self.wait().await;
303 }
304
305 fn reset(&self) {
306 self.wakeup_self.reset()
307 }
308
309 fn notify(&self) {
310 self.wakeup_self.notify()
311 }
312
313 async fn stop(self: Arc<Self>) {
314 self.connector.stop();
315 self.process.stop().await;
316 }
317}