1use std::{
25 collections::hash_map::DefaultHasher,
26 hash::{Hash, Hasher},
27 sync::Arc,
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use oxy_upnp_igd::{add_port_mapping_lazy, Protocol, RenewalHandle};
33use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
34use tracing::error;
35use url::Url;
36
37use crate::{
38 net::settings::Settings,
39 system::{sleep, ExecutorPtr, StoppableTask, StoppableTaskPtr},
40 util::logger::verbose,
41 Error, Result,
42};
43
44pub trait PortMapping: Send + Sync {
52 fn start(
54 self: Arc<Self>,
55 settings: Arc<AsyncRwLock<Settings>>,
56 executor: ExecutorPtr,
57 ) -> Result<()>;
58
59 fn stop(self: Arc<Self>);
61}
62
63#[derive(Clone, Debug)]
65pub struct UpnpConfig {
66 pub lease_duration: u32,
68 pub discovery_timeout_secs: u64,
70 pub mapping_description: String,
72 pub ext_addr_refresh: u64,
74 pub retry_interval_secs: u64,
76}
77
78impl Default for UpnpConfig {
79 fn default() -> Self {
80 Self {
81 lease_duration: 300,
82 discovery_timeout_secs: 3,
83 mapping_description: "DarkFi".to_string(),
84 ext_addr_refresh: 120,
85 retry_interval_secs: 60,
86 }
87 }
88}
89
90pub struct UpnpPortMapping {
98 config: UpnpConfig,
99 internal_endpoint: Url,
100 handle: AsyncMutex<Option<RenewalHandle>>,
101 task: StoppableTaskPtr,
102}
103
104impl UpnpPortMapping {
105 pub fn new(config: UpnpConfig, internal_endpoint: Url) -> Self {
107 Self {
108 config,
109 internal_endpoint,
110 handle: AsyncMutex::new(None),
111 task: StoppableTask::new(),
112 }
113 }
114
115 async fn run(&self, settings: Arc<AsyncRwLock<Settings>>, ex: &ExecutorPtr) -> Result<()> {
117 loop {
118 if self.try_create_mapping(ex).await.is_err() {
119 verbose!(
120 target: "net::upnp",
121 "[P2P] UPnP: Gateway discovery failed, retrying in {}s",
122 self.config.retry_interval_secs
123 );
124 sleep(self.config.retry_interval_secs).await;
125 continue;
126 }
127
128 verbose!(
129 target: "net::upnp",
130 "[P2P] UPnP: Gateway discovered, mapping active for {}",
131 self.internal_endpoint
132 );
133
134 if self.run_refresh_loop(settings.clone()).await.is_err() {
135 verbose!(
136 target: "net::upnp",
137 "[P2P] UPnP: Gateway lost, retrying discovery in {}s",
138 self.config.retry_interval_secs
139 );
140 sleep(self.config.retry_interval_secs).await;
141 continue;
142 }
143
144 unreachable!("UPnP refresh loop should never complete normally");
145 }
146 }
147
148 async fn try_create_mapping(&self, ex: &ExecutorPtr) -> Result<()> {
150 let protocol = match self.internal_endpoint.scheme() {
151 "tcp" | "tcp+tls" => Protocol::TCP,
152 "quic" => Protocol::UDP,
153 s => {
154 verbose!(
155 target: "net::upnp",
156 "[P2P] UPnP: Unsupported scheme '{s}', skipping"
157 );
158 return Err(Error::NetworkServiceStopped);
159 }
160 };
161
162 let is_ipv4 = match self.internal_endpoint.host() {
164 Some(url::Host::Ipv4(_)) => true,
165 Some(url::Host::Ipv6(_)) => false,
166 Some(url::Host::Domain(_)) => true,
168 None => false,
169 };
170
171 if !is_ipv4 {
172 verbose!(
173 target: "net::upnp",
174 "[P2P] UPnP: Skipping IPv6 endpoint {} (IGD pinhole not implemented)",
175 self.internal_endpoint
176 );
177 return Err(Error::NetworkServiceStopped);
178 }
179
180 let internal_port = match self.internal_endpoint.port() {
181 Some(port) => port,
182 None => {
183 verbose!(
184 target: "net::upnp",
185 "[P2P] UPnP: Invalid endpoint (missing port): {}",
186 self.internal_endpoint
187 );
188 return Err(Error::NetworkServiceStopped);
189 }
190 };
191
192 let timeout = Duration::from_secs(self.config.discovery_timeout_secs);
193
194 verbose!(
195 target: "net::upnp",
196 "[P2P] UPnP: Attempting port mapping for internal port {}",
197 internal_port
198 );
199
200 let handle = add_port_mapping_lazy(
202 ex.clone(),
203 internal_port,
204 protocol,
205 &self.config.mapping_description,
206 self.config.lease_duration,
207 timeout,
208 )
209 .await?;
210
211 *self.handle.lock().await = Some(handle);
212 Ok(())
213 }
214
215 async fn run_refresh_loop(&self, settings: Arc<AsyncRwLock<Settings>>) -> Result<()> {
217 loop {
218 sleep(self.config.ext_addr_refresh).await;
219
220 let Some(external_url) = self.get_external_address().await else {
221 verbose!(
222 target: "net::upnp",
223 "[P2P] UPnP: Gateway no longer available"
224 );
225 return Err(Error::NetworkServiceStopped);
226 };
227
228 let mut settings = settings.write().await;
230
231 let internal_id = format_address_id(&self.internal_endpoint, "upnp");
233 settings.external_addrs.retain(|addr: &Url| {
234 if let Some(query) = addr.query() {
235 !query.contains(internal_id.as_str())
236 } else {
237 true }
239 });
240
241 settings.external_addrs.push(external_url.clone());
243
244 verbose!(
245 target: "net::upnp",
246 "[P2P] UPnP: Updated external address: {}",
247 external_url
248 );
249 }
250 }
251
252 async fn get_external_address(&self) -> Option<Url> {
254 let handle = self.handle.lock().await;
255 let handle = handle.as_ref()?;
256
257 let external_ip = handle.external_ip().await;
258 if external_ip.is_unspecified() {
259 return None;
260 }
261
262 let external_port = handle.external_port();
263 if external_port == 0 {
264 return None;
265 }
266
267 let scheme = self.internal_endpoint.scheme();
268 let internal_id = format_address_id(&self.internal_endpoint, "upnp");
269
270 Url::parse(&format!(
271 "{}://{}:{}?source=upnp&{}",
272 scheme, external_ip, external_port, internal_id
273 ))
274 .ok()
275 }
276}
277
278#[async_trait]
279impl PortMapping for UpnpPortMapping {
280 fn start(self: Arc<Self>, settings: Arc<AsyncRwLock<Settings>>, ex: ExecutorPtr) -> Result<()> {
281 let self_ = self.clone();
282 let settings_ = settings.clone();
283 let ex_ = ex.clone();
284 self.task.clone().start(
285 async move { self_.run(settings_, &ex_).await },
286 |result| async move {
287 match result {
288 Ok(()) => {
289 error!("[P2P] UPnP task completed unexpectedly");
291 }
292 Err(Error::NetworkServiceStopped) => {
293 }
295 Err(e) => {
296 error!("[P2P] UPnP task error: {e}");
297 }
298 }
299 },
300 Error::NetworkServiceStopped,
301 ex,
302 );
303 Ok(())
304 }
305
306 fn stop(self: Arc<Self>) {
307 self.task.stop_nowait();
309 verbose!(
311 target: "net::upnp",
312 "[P2P] UPnP: Stopped port mapping for {}",
313 self.internal_endpoint
314 );
315 }
316}
317
318pub fn format_address_id(endpoint: &Url, protocol: &str) -> String {
323 let mut hasher = DefaultHasher::new();
325 endpoint.hash(&mut hasher);
326 let hash = hasher.finish();
327
328 format!("{}_cookie={:016x}", protocol, hash)
329}
330
331pub fn create_upnp_from_url(url: &Url) -> Option<Arc<dyn PortMapping>> {
333 if !url.query_pairs().any(|(key, value)| key == "upnp_igd" && value == "true") {
335 return None;
336 }
337
338 let mut config = UpnpConfig::default();
340
341 for (key, value) in url.query_pairs() {
342 match key.as_ref() {
343 "upnp_igd_lease_duration" => {
344 if let Ok(val) = value.parse::<u32>() {
345 config.lease_duration = val;
346 }
347 }
348 "upnp_igd_timeout" => {
349 if let Ok(val) = value.parse::<u64>() {
350 config.discovery_timeout_secs = val;
351 }
352 }
353 "upnp_igd_description" => {
354 config.mapping_description = value.into_owned();
355 }
356 "upnp_igd_ext_addr_refresh" => {
357 if let Ok(val) = value.parse::<u64>() {
358 config.ext_addr_refresh = val;
359 }
360 }
361 _ => {}
362 }
363 }
364
365 Some(Arc::new(UpnpPortMapping::new(config, url.clone())))
366}
367
368pub fn setup_port_mappings(
396 actual_endpoint: &Url,
397 settings: Arc<AsyncRwLock<Settings>>,
398 ex: ExecutorPtr,
399) -> Vec<Arc<dyn PortMapping>> {
400 let Some(mapping) = create_upnp_from_url(actual_endpoint) else { return vec![] };
401
402 if let Err(e) = Arc::clone(&mapping).start(settings.clone(), ex.clone()) {
403 error!(
404 target: "net::upnp",
405 "[P2P] UPnP port mapping: Failed to start for {}: {e}",
406 actual_endpoint
407 );
408 return vec![]
409 }
410
411 verbose!(
412 target: "net::upnp",
413 "[P2P] UPnP: Port mapping started for {}",
414 actual_endpoint
415 );
416 vec![mapping]
417
418 }