1use std::{sync::Arc, time::UNIX_EPOCH};
20use tracing::{error, info, warn};
21
22use crate::{
23 dht::{event::DhtEvent, ChannelCacheItem, DhtHandler, DhtNode, SESSION_MANUAL},
24 net::{
25 hosts::HostColor,
26 session::{SESSION_DIRECT, SESSION_INBOUND, SESSION_OUTBOUND},
27 },
28 system::sleep,
29 util::time::Timestamp,
30 Result,
31};
32
33pub async fn events_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
35 let dht = handler.dht();
36 let sub = dht.event_publisher.clone().subscribe().await;
37 loop {
38 let event = sub.receive().await;
39
40 match event {
41 DhtEvent::PingReceived { from, .. } => {
43 let channel_cache_lock = dht.channel_cache.clone();
44 let mut channel_cache = channel_cache_lock.write().await;
45 if let Some(cached) = channel_cache.get_mut(&from.info.id) {
46 cached.ping_received = true;
47 }
48 }
49 DhtEvent::PingSent { to, .. } => {
51 let channel_cache_lock = dht.channel_cache.clone();
52 let mut channel_cache = channel_cache_lock.write().await;
53 if let Some(cached) = channel_cache.get_mut(&to.info.id) {
54 cached.ping_sent = true;
55 }
56 }
57 _ => {}
58 }
59 }
60}
61
62pub async fn channel_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
65 let dht = handler.dht();
66 let p2p = dht.p2p.clone();
67 let channel_sub = p2p.hosts().subscribe_channel().await;
68 loop {
69 let res = channel_sub.receive().await;
70 if res.is_err() {
71 continue;
72 }
73 let channel = res.unwrap();
74
75 let channel_cache_lock = dht.channel_cache.clone();
76 let mut channel_cache = channel_cache_lock.write().await;
77
78 if channel_cache.keys().any(|&k| k == channel.info.id) {
80 continue;
81 }
82
83 channel_cache.insert(
84 channel.info.id,
85 ChannelCacheItem {
86 node: None,
87 last_used: Timestamp::current_time(),
88 ping_received: false,
89 ping_sent: false,
90 },
91 );
92 drop(channel_cache);
93
94 if channel.session_type_id() & SESSION_MANUAL != 0 {
96 let ping_res = dht.ping(channel.clone()).await;
97
98 if let Err(e) = ping_res {
99 warn!(target: "dht::channel_task", "Error while pinging manual connection (requesting node id) {}: {e}", channel.display_address());
100 continue;
101 }
102 }
103
104 if channel.session_type_id() & SESSION_OUTBOUND != 0 {
106 let _ = dht.ping(channel.clone()).await;
107 continue;
108 }
109
110 if channel.session_type_id() & SESSION_DIRECT != 0 {
112 p2p.session_direct().inc_channel_usage(&channel, 1).await;
113 let _ = dht.ping(channel.clone()).await;
114 dht.cleanup_channel(channel).await;
115 continue;
116 }
117 }
118}
119
120pub async fn dht_refinery_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
126 let interval = 60; let min_ping_interval = 10 * 60; let dht = handler.dht();
129 let hosts = dht.p2p.hosts();
130
131 loop {
132 let mut hostlist = hosts.container.fetch_all(HostColor::Gold);
133 hostlist.extend(hosts.container.fetch_all(HostColor::White));
134
135 if !handler.dht().is_bootstrapped().await {
137 hostlist.extend(hosts.container.fetch_all(HostColor::Grey));
138 }
139
140 for entry in &hostlist {
141 let url = &entry.0;
142 let host_cache = dht.host_cache.read().await;
143 let last_ping = host_cache.get(url).map(|h| h.last_ping.inner());
144 if last_ping.is_some() &&
145 last_ping.unwrap() > Timestamp::current_time().inner() - min_ping_interval
146 {
147 continue
148 }
149 drop(host_cache);
150
151 let res = dht.create_channel(url).await;
152 if res.is_err() {
153 continue
154 }
155 let (channel, _) = res.unwrap();
156 dht.cleanup_channel(channel).await;
157
158 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
159 if let Err(e) = hosts.whitelist_host(url, last_seen).await {
160 error!(target: "dht::tasks::whitelist_refinery_task", "Could not send {url} to the whitelist: {e}");
161 }
162 break
163 }
164
165 match hostlist.is_empty() {
166 true => sleep(5).await,
167 false => sleep(interval).await,
168 }
169 }
170}
171
172pub async fn add_node_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
180 let dht = handler.dht();
181 loop {
182 let (node, channel) = dht.add_node_rx.recv().await.unwrap();
183
184 let self_node = handler.node().await;
185 if self_node.is_err() {
186 continue;
187 }
188 let self_node = self_node.unwrap();
189
190 let bucket_index = dht.get_bucket_index(&self_node.id(), &node.id()).await;
191 let buckets_lock = dht.buckets.clone();
192 let mut buckets = buckets_lock.write().await;
193 let bucket = &mut buckets[bucket_index];
194
195 if node.id() == self_node.id() {
197 dht.cleanup_channel(channel).await;
198 continue;
199 }
200
201 let node_addresses = node.addresses();
203 if self_node.addresses().iter().any(|addr| node_addresses.contains(addr)) {
204 dht.cleanup_channel(channel).await;
205 continue;
206 }
207
208 if node.addresses().is_empty() {
210 dht.cleanup_channel(channel).await;
211 continue;
212 }
213
214 if let Some(node_index) = bucket.nodes.iter().position(|n| n.id() == node.id()) {
216 bucket.nodes.remove(node_index);
217 bucket.nodes.push(node);
218 dht.cleanup_channel(channel).await;
219 continue;
220 }
221
222 if bucket.nodes.len() >= dht.settings.k {
224 if let Ok((channel2, node)) = dht.get_channel(&bucket.nodes[0]).await {
226 let n = bucket.nodes.remove(0);
228 bucket.nodes.push(n);
229 drop(buckets);
230 dht.on_new_node(&node.clone(), channel2.clone()).await;
231 dht.cleanup_channel(channel2).await;
232 dht.cleanup_channel(channel).await;
233 continue;
234 }
235
236 bucket.nodes.remove(0);
238 bucket.nodes.push(node.clone());
239 drop(buckets);
240 dht.on_new_node(&node.clone(), channel.clone()).await;
241 dht.cleanup_channel(channel).await;
242 continue;
243 }
244
245 bucket.nodes.push(node.clone());
247 drop(buckets);
248 dht.on_new_node(&node.clone(), channel.clone()).await;
249 dht.cleanup_channel(channel).await;
250 }
251}
252
253pub async fn disconnect_inbounds_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
255 let interval = 10; let dht = handler.dht();
257
258 loop {
259 sleep(interval).await;
260
261 let min_last_used = Timestamp::current_time().inner() - dht.settings.inbound_timeout;
262
263 let channel_cache_lock = dht.channel_cache.clone();
264 let mut channel_cache = channel_cache_lock.write().await;
265
266 for (channel_id, cached) in channel_cache.clone() {
267 if cached.last_used.inner() >= min_last_used {
270 continue;
271 }
272 let channel = dht.p2p.get_channel(channel_id);
274 if channel.is_none() {
275 channel_cache.remove(&channel_id);
276 continue;
277 }
278 let channel = channel.unwrap();
279 if channel.session_type_id() & SESSION_INBOUND == 0 {
281 continue;
282 }
283
284 info!(target: "dht::disconnect_inbounds_task", "Closing expired inbound channel [{}]", channel.display_address());
286 channel.stop().await;
287 channel_cache.remove(&channel.info.id);
288 }
289 }
290}
291
292pub async fn cleanup_channels_task<H: DhtHandler>(handler: Arc<H>) -> Result<()> {
295 let interval = 60; let dht = handler.dht();
297
298 loop {
299 sleep(interval).await;
300
301 let channel_cache_lock = dht.channel_cache.clone();
302 let mut channel_cache = channel_cache_lock.write().await;
303
304 for (channel_id, _) in channel_cache.clone() {
305 match dht.p2p.get_channel(channel_id) {
306 Some(channel) => {
307 if channel.is_stopped() {
308 channel_cache.remove(&channel_id);
309 }
310 }
311 None => {
312 channel_cache.remove(&channel_id);
313 }
314 }
315 }
316 }
317}