1use std::{
20 collections::{HashMap, HashSet},
21 process::exit,
22 sync::Arc,
23 time::UNIX_EPOCH,
24};
25
26use async_trait::async_trait;
27use semver::Version;
28use smol::{
29 lock::{Mutex, MutexGuard},
30 stream::StreamExt,
31 Executor,
32};
33use structopt::StructOpt;
34use structopt_toml::StructOptToml;
35use tinyjson::JsonValue;
36use toml::Value;
37use tracing::{debug, error, info, warn};
38use url::Url;
39
40use darkfi::{
41 async_daemonize, cli_desc,
42 net::{
43 self,
44 hosts::HostColor,
45 settings::{BanPolicy, MagicBytes, NetworkProfile},
46 P2p, P2pPtr,
47 },
48 rpc::{
49 jsonrpc::*,
50 server::{listen_and_serve, RequestHandler},
51 settings::{RpcSettings, RpcSettingsOpt},
52 },
53 system::{sleep, StoppableTask, StoppableTaskPtr},
54 util::path::get_config_path,
55 Error, Result,
56};
57
/// Default configuration file name; handed to `get_config_path` in `realmain`
/// together with the optional `--config` argument.
const CONFIG_FILE: &str = "lilith_config.toml";
/// Contents of `../lilith_config.toml`, embedded into the binary at compile time.
/// NOTE(review): not referenced by name anywhere in this file — presumably
/// consumed by the `async_daemonize!` expansion; confirm.
const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
60
// Command-line / TOML arguments of the lilith daemon.
//
// NOTE: plain `//` comments are used in this struct on purpose: structopt
// turns `///` doc comments into `--help` text, which would change what the
// program prints.
#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "lilith", about = cli_desc!())]
struct Args {
    // JSON-RPC options, flattened into the top-level option set. Converted
    // into `RpcSettings` in `realmain` before the RPC server is started.
    #[structopt(flatten)]
    rpc: RpcSettingsOpt,

    // Optional configuration file path (`-c` / `--config`); passed to
    // `get_config_path` along with `CONFIG_FILE`.
    #[structopt(short, long)]
    config: Option<String>,

    // Optional log file path (`-l` / `--log`).
    // NOTE(review): not read in this file — presumably handled by the
    // `async_daemonize!` expansion; confirm.
    #[structopt(short, long)]
    log: Option<String>,

    // Verbosity level: the number of times `-v` was given.
    // NOTE(review): not read in this file — presumably handled by the
    // `async_daemonize!` expansion; confirm.
    #[structopt(short, parse(from_occurrences))]
    verbose: u8,

    // Pause between two whitelist refinery passes; forwarded unchanged to
    // `Lilith::whitelist_refinery`, which hands it to `sleep`. Defaults to 120.
    // NOTE(review): unit is presumably seconds — confirm against `sleep`.
    #[structopt(long, default_value = "120")]
    whitelist_refinery_interval: u64,
}
85
/// One seed network run by this daemon: the configured network name paired
/// with the P2P instance created for it by `spawn_net`.
struct Spawn {
    /// Name of the network, taken from the key of its configuration entry.
    pub name: String,
    /// Handle to the P2P instance serving this network.
    pub p2p: P2pPtr,
}
93
94impl Spawn {
95 async fn get_whitelist(&self) -> Vec<JsonValue> {
96 self.p2p
97 .hosts()
98 .container
99 .fetch_all(HostColor::White)
100 .iter()
101 .map(|(addr, _url)| JsonValue::String(addr.to_string()))
102 .collect()
103 }
104
105 async fn get_greylist(&self) -> Vec<JsonValue> {
106 self.p2p
107 .hosts()
108 .container
109 .fetch_all(HostColor::Grey)
110 .iter()
111 .map(|(addr, _url)| JsonValue::String(addr.to_string()))
112 .collect()
113 }
114
115 async fn get_goldlist(&self) -> Vec<JsonValue> {
116 self.p2p
117 .hosts()
118 .container
119 .fetch_all(HostColor::Gold)
120 .iter()
121 .map(|(addr, _url)| JsonValue::String(addr.to_string()))
122 .collect()
123 }
124
125 async fn info(&self) -> JsonValue {
126 let mut addr_vec = vec![];
127 for addr in &self.p2p.settings().read().await.inbound_addrs {
128 addr_vec.push(JsonValue::String(addr.as_ref().to_string()));
129 }
130
131 JsonValue::Object(HashMap::from([
132 ("name".to_string(), JsonValue::String(self.name.clone())),
133 ("urls".to_string(), JsonValue::Array(addr_vec)),
134 ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)),
135 ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)),
136 ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)),
137 ]))
138 }
139}
140
/// Per-network configuration, as produced by `parse_configured_networks` and
/// consumed by `spawn_net` to build the P2P settings.
#[derive(Clone)]
struct NetInfo {
    /// URLs from the `accept_addrs` key; used as the P2P inbound addresses.
    pub accept_addrs: Vec<Url>,
    /// URLs from the optional `seeds` key; forwarded to the P2P settings.
    pub seeds: Vec<Url>,
    /// URLs from the optional `peers` key; forwarded to the P2P settings.
    pub peers: Vec<Url>,
    /// Value of the `version` key, or the crate version (`0.0.0` if unknown)
    /// when the key is absent; used as the P2P `app_version`.
    pub version: Version,
    /// Value of the `app_name` key, empty when the key is absent.
    pub app_name: String,
    /// Value of the `localnet` key, `false` when the key is absent.
    pub localnet: bool,
    /// Value of the `datastore` key; used as the P2P `p2p_datastore`.
    pub datastore: String,
    /// Value of the `hostlist` key; used as the P2P `hostlist`.
    pub hostlist: String,
    /// The four bytes from the `magic_bytes` key.
    pub magic_bytes: MagicBytes,
}
163
/// Daemon state shared with the JSON-RPC server.
struct Lilith {
    /// Every network spawned at startup, in the order they were started.
    pub networks: Vec<Spawn>,
    /// Set of RPC connection tasks; exposed to the RPC server through
    /// `RequestHandler::connections_mut`.
    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
171
172impl Lilith {
173 async fn whitelist_refinery(
186 network_name: String,
187 p2p: P2pPtr,
188 refinery_interval: u64,
189 ) -> Result<()> {
190 debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\"");
191
192 let hosts = p2p.hosts();
193
194 loop {
195 sleep(refinery_interval).await;
196
197 match hosts.container.fetch_last(HostColor::White) {
198 Some(entry) => {
199 let url = &entry.0;
200 let last_seen = &entry.1;
201
202 if !hosts.refinable(url) {
203 debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!",
204 url.clone());
205
206 continue
207 }
208
209 if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
210 debug!(target: "net::refinery:::whitelist_refinery",
211 "Host {url} is not responsive. Downgrading from whitelist");
212
213 if let Err(e) = hosts.greylist_host(url, *last_seen).await {
214 error!(target: "net::refinery::whitelist_refinery", "Could not send {url} to the greylist: {e}");
215 }
216
217 continue
218 }
219
220 debug!(target: "net::refinery::whitelist_refinery",
221 "Peer {url} is responsive. Updating last_seen");
222
223 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
225 if let Err(e) = hosts.whitelist_host(url, last_seen).await {
226 error!(target: "net::refinery::whitelist_refinery", "Could not send {url} to the whitelist: {e}");
227 }
228 }
229 None => {
230 debug!(target: "net::refinery::whitelist_refinery",
231 "Whitelist is empty! Cannot start refinery process");
232
233 continue
234 }
235 }
236 }
237 }
238 async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult {
243 let mut spawns = vec![];
244 for spawn in &self.networks {
245 spawns.push(spawn.info().await);
246 }
247
248 let json =
249 JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))]));
250
251 JsonResponse::new(json, id).into()
252 }
253}
254
255#[async_trait]
256impl RequestHandler<()> for Lilith {
257 async fn handle_request(&self, req: JsonRequest) -> JsonResult {
258 return match req.method.as_str() {
259 "ping" => self.pong(req.id, req.params).await,
260 "spawns" => self.spawns(req.id, req.params).await,
261 _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
262 }
263 }
264
265 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
266 self.rpc_connections.lock().await
267 }
268}
269
270fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
273 let mut ret = HashMap::new();
274
275 if let Value::Table(map) = toml::from_str(data)? {
276 if map.contains_key("network") && map["network"].is_table() {
277 for net in map["network"].as_table().unwrap() {
278 info!(target: "lilith", "Found configuration for network: {}", net.0);
279 let table = net.1.as_table().unwrap();
280 if !table.contains_key("accept_addrs") {
281 warn!(target: "lilith", "Network accept addrs are mandatory, skipping network.");
282 continue
283 }
284
285 if !table.contains_key("hostlist") {
286 error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
287 exit(1)
288 }
289
290 let name = net.0.to_string();
291 let accept_addrs: Vec<Url> = table["accept_addrs"]
292 .as_array()
293 .unwrap()
294 .iter()
295 .map(|x| Url::parse(x.as_str().unwrap()).unwrap())
296 .collect();
297
298 let mut seeds = vec![];
299 if table.contains_key("seeds") {
300 if let Some(s) = table["seeds"].as_array() {
301 for seed in s {
302 if let Some(u) = seed.as_str() {
303 if let Ok(url) = Url::parse(u) {
304 seeds.push(url);
305 }
306 }
307 }
308 }
309 }
310
311 let mut peers = vec![];
312 if table.contains_key("peers") {
313 if let Some(p) = table["peers"].as_array() {
314 for peer in p {
315 if let Some(u) = peer.as_str() {
316 if let Ok(url) = Url::parse(u) {
317 peers.push(url);
318 }
319 }
320 }
321 }
322 }
323
324 let localnet = if table.contains_key("localnet") {
325 table["localnet"].as_bool().unwrap()
326 } else {
327 false
328 };
329
330 let version = if table.contains_key("version") {
331 semver::Version::parse(table["version"].as_str().unwrap())?
332 } else {
333 semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
334 };
335
336 let app_name = if table.contains_key("app_name") {
337 table["app_name"].as_str().unwrap().to_string()
338 } else {
339 String::new()
340 };
341
342 let datastore: String = table["datastore"].as_str().unwrap().to_string();
343
344 let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
345
346 let magic_bytes: [u8; 4] = table["magic_bytes"]
347 .as_array()
348 .unwrap()
349 .iter()
350 .map(|v| v.as_integer().unwrap() as u8)
351 .collect::<Vec<u8>>()
352 .try_into()
353 .expect("Wrong magic bytes value");
354
355 let net_info = NetInfo {
356 accept_addrs,
357 seeds,
358 peers,
359 version,
360 app_name,
361 localnet,
362 datastore,
363 hostlist,
364 magic_bytes: MagicBytes(magic_bytes),
365 };
366 ret.insert(name, net_info);
367 }
368 }
369 }
370
371 Ok(ret)
372}
373
374async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
375 let mut listen_urls = vec![];
376
377 for url in &info.accept_addrs {
379 listen_urls.push(url.clone());
380 }
381
382 let mut profiles = HashMap::new();
383 profiles.insert("tcp+tls".to_string(), NetworkProfile::default());
384 profiles.insert("tcp".to_string(), NetworkProfile::default());
385 profiles.insert("tor".to_string(), NetworkProfile::tor_default());
386 profiles.insert("i2p".to_string(), NetworkProfile::tor_default());
387 profiles.insert("tor+tls".to_string(), NetworkProfile::tor_default());
388 profiles.insert("i2p+tls".to_string(), NetworkProfile::tor_default());
389
390 let settings = net::Settings {
392 magic_bytes: info.magic_bytes.clone(),
393 inbound_addrs: listen_urls.clone(),
394 seeds: info.seeds.clone(),
395 peers: info.peers.clone(),
396 outbound_connections: 0,
397 inbound_connections: 512,
398 app_version: info.version.clone(),
399 app_name: info.app_name.clone(),
400 localnet: info.localnet,
401 p2p_datastore: Some(info.datastore.clone()),
402 hostlist: Some(info.hostlist.clone()),
403 active_profiles: vec![
404 "tcp".to_string(),
405 "tcp+tls".to_string(),
406 "tor".to_string(),
407 "tor+tls".to_string(),
408 "nym".to_string(),
409 "nym+tls".to_string(),
410 "i2p".to_string(),
411 "i2p+tls".to_string(),
412 ],
413 ban_policy: BanPolicy::Relaxed,
414 profiles,
415 ..Default::default()
416 };
417
418 let p2p = P2p::new(settings, ex.clone()).await?;
420
421 let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
422 info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}");
423 p2p.clone().start().await?;
424
425 let spawn = Spawn { name, p2p };
426 Ok(spawn)
427}
428
// NOTE(review): `async_daemonize!` presumably generates the program entry
// point that parses `Args`, sets up logging and the executor, and then calls
// `realmain`; `SignalHandler` used below is not imported in this file and is
// presumably provided by the same expansion — confirm.
async_daemonize!(realmain);
/// Daemon body: loads the configuration, starts one P2P seed node and one
/// whitelist refinery task per configured network, serves JSON-RPC, then
/// waits for a termination signal and shuts everything down.
///
/// Exits the process with code 1 when no network is configured or when any
/// network fails to start.
///
/// # Errors
///
/// Returns an error when the configuration path cannot be resolved, the file
/// cannot be read or parsed, or the signal handler cannot be set up or fails
/// while waiting.
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
    // Resolve and load the configuration file.
    let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
    let toml_contents = std::fs::read_to_string(cfg_path)?;
    let configured_nets = parse_configured_networks(&toml_contents)?;

    if configured_nets.is_empty() {
        error!(target: "lilith", "No networks are enabled in config");
        exit(1);
    }

    // Start a P2P seed node for every configured network; any failure is fatal.
    let mut networks = vec![];
    for (name, info) in &configured_nets {
        match spawn_net(name.to_string(), info, ex.clone()).await {
            Ok(spawn) => networks.push(spawn),
            Err(e) => {
                error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}");
                exit(1);
            }
        }
    }

    let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
    // One whitelist refinery task per network, keyed by network name so the
    // task can be looked up again at shutdown.
    let mut refinery_tasks = HashMap::new();
    for network in &lilith.networks {
        let name = network.name.clone();
        let task = StoppableTask::new();
        task.clone().start(
            Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval),
            |res| async move {
                match res {
                    // A deliberate stop is reported as `DetachedTaskStopped` and is not an error.
                    Ok(()) | Err(Error::DetachedTaskStopped) => { }
                    Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"),
                }
            },
            Error::DetachedTaskStopped,
            ex.clone(),
        );
        refinery_tasks.insert(network.name.clone(), task);
    }

    // Start the JSON-RPC server with `lilith` as the request handler.
    let rpc_settings: RpcSettings = args.rpc.into();
    info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen);
    let lilith_ = lilith.clone();
    let rpc_task = StoppableTask::new();
    rpc_task.clone().start(
        listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()),
        |res| async move {
            match res {
                // When the server ends normally or is stopped, also stop its open connections.
                Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
                Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"),
            }
        },
        Error::RpcServerStopped,
        ex.clone(),
    );

    // Block until a termination signal arrives.
    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
    signals_handler.wait_termination(signals_task).await?;
    info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");

    // Shutdown order: RPC server first, then each network's refinery task
    // followed by its P2P instance.
    info!(target: "lilith", "Stopping JSON-RPC server...");
    rpc_task.stop().await;

    for spawn in &lilith.networks {
        info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
        // A task was inserted above for every entry of `lilith.networks`, so the lookup cannot fail.
        refinery_tasks.get(&spawn.name).unwrap().stop().await;
        info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
        spawn.p2p.stop().await;
    }

    info!(target: "lilith", "Bye!");
    Ok(())
}