lilith/
main.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    process::exit,
22    sync::Arc,
23    time::UNIX_EPOCH,
24};
25
26use async_trait::async_trait;
27use semver::Version;
28use smol::{
29    lock::{Mutex, MutexGuard},
30    stream::StreamExt,
31    Executor,
32};
33use structopt::StructOpt;
34use structopt_toml::StructOptToml;
35use tinyjson::JsonValue;
36use toml::Value;
37use tracing::{debug, error, info, warn};
38use url::Url;
39
40use darkfi::{
41    async_daemonize, cli_desc,
42    net::{
43        self,
44        hosts::HostColor,
45        settings::{BanPolicy, MagicBytes, NetworkProfile},
46        P2p, P2pPtr,
47    },
48    rpc::{
49        jsonrpc::*,
50        server::{listen_and_serve, RequestHandler},
51        settings::{RpcSettings, RpcSettingsOpt},
52    },
53    system::{sleep, StoppableTask, StoppableTaskPtr},
54    util::path::get_config_path,
55    Error, Result,
56};
57
58const CONFIG_FILE: &str = "lilith_config.toml";
59const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
60
61#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
62#[serde(default)]
63#[structopt(name = "lilith", about = cli_desc!())]
64struct Args {
65    #[structopt(flatten)]
66    /// JSON-RPC settings
67    rpc: RpcSettingsOpt,
68
69    #[structopt(short, long)]
70    /// Configuration file to use
71    config: Option<String>,
72
73    #[structopt(short, long)]
74    /// Set log file to ouput into
75    log: Option<String>,
76
77    #[structopt(short, parse(from_occurrences))]
78    /// Increase verbosity (-vvv supported)
79    verbose: u8,
80
81    #[structopt(long, default_value = "120")]
82    /// Interval after which to check whitelist peers
83    whitelist_refinery_interval: u64,
84}
85
86/// Struct representing a spawned P2P network
87struct Spawn {
88    /// String identifier,
89    pub name: String,
90    /// P2P pointer
91    pub p2p: P2pPtr,
92}
93
94impl Spawn {
95    async fn get_whitelist(&self) -> Vec<JsonValue> {
96        self.p2p
97            .hosts()
98            .container
99            .fetch_all(HostColor::White)
100            .iter()
101            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
102            .collect()
103    }
104
105    async fn get_greylist(&self) -> Vec<JsonValue> {
106        self.p2p
107            .hosts()
108            .container
109            .fetch_all(HostColor::Grey)
110            .iter()
111            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
112            .collect()
113    }
114
115    async fn get_goldlist(&self) -> Vec<JsonValue> {
116        self.p2p
117            .hosts()
118            .container
119            .fetch_all(HostColor::Gold)
120            .iter()
121            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
122            .collect()
123    }
124
125    async fn info(&self) -> JsonValue {
126        let mut addr_vec = vec![];
127        for addr in &self.p2p.settings().read().await.inbound_addrs {
128            addr_vec.push(JsonValue::String(addr.as_ref().to_string()));
129        }
130
131        JsonValue::Object(HashMap::from([
132            ("name".to_string(), JsonValue::String(self.name.clone())),
133            ("urls".to_string(), JsonValue::Array(addr_vec)),
134            ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)),
135            ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)),
136            ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)),
137        ]))
138    }
139}
140
141/// Defines the network-specific settings
142#[derive(Clone)]
143struct NetInfo {
144    /// Accept addresses the network will use
145    pub accept_addrs: Vec<Url>,
146    /// Other seeds to connect to
147    pub seeds: Vec<Url>,
148    /// Manual peers to connect to
149    pub peers: Vec<Url>,
150    /// Supported network version
151    pub version: Version,
152    /// App Identifier for the app running on the network
153    pub app_name: String,
154    /// Enable localnet hosts
155    pub localnet: bool,
156    /// Path to P2P datastore
157    pub datastore: String,
158    /// Path to hostlist
159    pub hostlist: String,
160    /// Magic bytes used to distinguish the p2p network
161    pub magic_bytes: MagicBytes,
162}
163
164/// Struct representing the daemon
165struct Lilith {
166    /// Spawned networks
167    pub networks: Vec<Spawn>,
168    /// JSON-RPC connection tracker
169    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
170}
171
172impl Lilith {
173    /// Since `Lilith` does not make outbound connections, if a peer is
174    /// upgraded to whitelist it will remain on the whitelist even if the
175    /// give peer is no longer online.
176    ///
177    /// To protect `Lilith` from sharing potentially offline nodes,
178    /// `whitelist_refinery` periodically ping nodes on the whitelist. If they
179    /// are reachable, we update their last seen field. Otherwise, we downgrade
180    /// them to the greylist.
181    ///
182    /// Note: if `Lilith` loses connectivity this method will delete peers from
183    /// the whitelist, meaning `Lilith` will need to rebuild its hostlist when
184    /// it comes back online.
185    async fn whitelist_refinery(
186        network_name: String,
187        p2p: P2pPtr,
188        refinery_interval: u64,
189    ) -> Result<()> {
190        debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\"");
191
192        let hosts = p2p.hosts();
193
194        loop {
195            sleep(refinery_interval).await;
196
197            match hosts.container.fetch_last(HostColor::White) {
198                Some(entry) => {
199                    let url = &entry.0;
200                    let last_seen = &entry.1;
201
202                    if !hosts.refinable(url) {
203                        debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!",
204                       url.clone());
205
206                        continue
207                    }
208
209                    if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
210                        debug!(target: "net::refinery:::whitelist_refinery",
211                       "Host {url} is not responsive. Downgrading from whitelist");
212
213                        if let Err(e) = hosts.greylist_host(url, *last_seen).await {
214                            error!(target: "net::refinery::whitelist_refinery", "Could not send {url} to the greylist: {e}");
215                        }
216
217                        continue
218                    }
219
220                    debug!(target: "net::refinery::whitelist_refinery",
221                   "Peer {url} is responsive. Updating last_seen");
222
223                    // This node is active. Update the last seen field.
224                    let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
225                    if let Err(e) = hosts.whitelist_host(url, last_seen).await {
226                        error!(target: "net::refinery::whitelist_refinery", "Could not send {url} to the whitelist: {e}");
227                    }
228                }
229                None => {
230                    debug!(target: "net::refinery::whitelist_refinery",
231                              "Whitelist is empty! Cannot start refinery process");
232
233                    continue
234                }
235            }
236        }
237    }
238    // RPCAPI:
239    // Returns all spawned networks names with their node addresses.
240    // --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42}
241    // <-- {"jsonrpc": "2.0", "result": {"spawns": spawns_info}, "id": 42}
242    async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult {
243        let mut spawns = vec![];
244        for spawn in &self.networks {
245            spawns.push(spawn.info().await);
246        }
247
248        let json =
249            JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))]));
250
251        JsonResponse::new(json, id).into()
252    }
253}
254
255#[async_trait]
256impl RequestHandler<()> for Lilith {
257    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
258        return match req.method.as_str() {
259            "ping" => self.pong(req.id, req.params).await,
260            "spawns" => self.spawns(req.id, req.params).await,
261            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
262        }
263    }
264
265    async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
266        self.rpc_connections.lock().await
267    }
268}
269
270/// Parse a TOML string for any configured network and return a map containing
271/// said configurations.
272fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
273    let mut ret = HashMap::new();
274
275    if let Value::Table(map) = toml::from_str(data)? {
276        if map.contains_key("network") && map["network"].is_table() {
277            for net in map["network"].as_table().unwrap() {
278                info!(target: "lilith", "Found configuration for network: {}", net.0);
279                let table = net.1.as_table().unwrap();
280                if !table.contains_key("accept_addrs") {
281                    warn!(target: "lilith", "Network accept addrs are mandatory, skipping network.");
282                    continue
283                }
284
285                if !table.contains_key("hostlist") {
286                    error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
287                    exit(1)
288                }
289
290                let name = net.0.to_string();
291                let accept_addrs: Vec<Url> = table["accept_addrs"]
292                    .as_array()
293                    .unwrap()
294                    .iter()
295                    .map(|x| Url::parse(x.as_str().unwrap()).unwrap())
296                    .collect();
297
298                let mut seeds = vec![];
299                if table.contains_key("seeds") {
300                    if let Some(s) = table["seeds"].as_array() {
301                        for seed in s {
302                            if let Some(u) = seed.as_str() {
303                                if let Ok(url) = Url::parse(u) {
304                                    seeds.push(url);
305                                }
306                            }
307                        }
308                    }
309                }
310
311                let mut peers = vec![];
312                if table.contains_key("peers") {
313                    if let Some(p) = table["peers"].as_array() {
314                        for peer in p {
315                            if let Some(u) = peer.as_str() {
316                                if let Ok(url) = Url::parse(u) {
317                                    peers.push(url);
318                                }
319                            }
320                        }
321                    }
322                }
323
324                let localnet = if table.contains_key("localnet") {
325                    table["localnet"].as_bool().unwrap()
326                } else {
327                    false
328                };
329
330                let version = if table.contains_key("version") {
331                    semver::Version::parse(table["version"].as_str().unwrap())?
332                } else {
333                    semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
334                };
335
336                let app_name = if table.contains_key("app_name") {
337                    table["app_name"].as_str().unwrap().to_string()
338                } else {
339                    String::new()
340                };
341
342                let datastore: String = table["datastore"].as_str().unwrap().to_string();
343
344                let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
345
346                let magic_bytes: [u8; 4] = table["magic_bytes"]
347                    .as_array()
348                    .unwrap()
349                    .iter()
350                    .map(|v| v.as_integer().unwrap() as u8)
351                    .collect::<Vec<u8>>()
352                    .try_into()
353                    .expect("Wrong magic bytes value");
354
355                let net_info = NetInfo {
356                    accept_addrs,
357                    seeds,
358                    peers,
359                    version,
360                    app_name,
361                    localnet,
362                    datastore,
363                    hostlist,
364                    magic_bytes: MagicBytes(magic_bytes),
365                };
366                ret.insert(name, net_info);
367            }
368        }
369    }
370
371    Ok(ret)
372}
373
374async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
375    let mut listen_urls = vec![];
376
377    // Configure listen addrs for this network
378    for url in &info.accept_addrs {
379        listen_urls.push(url.clone());
380    }
381
382    let mut profiles = HashMap::new();
383    profiles.insert("tcp+tls".to_string(), NetworkProfile::default());
384    profiles.insert("tcp".to_string(), NetworkProfile::default());
385    profiles.insert("tor".to_string(), NetworkProfile::tor_default());
386    profiles.insert("i2p".to_string(), NetworkProfile::tor_default());
387    profiles.insert("tor+tls".to_string(), NetworkProfile::tor_default());
388    profiles.insert("i2p+tls".to_string(), NetworkProfile::tor_default());
389
390    // P2P network settings
391    let settings = net::Settings {
392        magic_bytes: info.magic_bytes.clone(),
393        inbound_addrs: listen_urls.clone(),
394        seeds: info.seeds.clone(),
395        peers: info.peers.clone(),
396        outbound_connections: 0,
397        inbound_connections: 512,
398        app_version: info.version.clone(),
399        app_name: info.app_name.clone(),
400        localnet: info.localnet,
401        p2p_datastore: Some(info.datastore.clone()),
402        hostlist: Some(info.hostlist.clone()),
403        active_profiles: vec![
404            "tcp".to_string(),
405            "tcp+tls".to_string(),
406            "tor".to_string(),
407            "tor+tls".to_string(),
408            "nym".to_string(),
409            "nym+tls".to_string(),
410            "i2p".to_string(),
411            "i2p+tls".to_string(),
412        ],
413        ban_policy: BanPolicy::Relaxed,
414        profiles,
415        ..Default::default()
416    };
417
418    // Create P2P instance
419    let p2p = P2p::new(settings, ex.clone()).await?;
420
421    let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
422    info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}");
423    p2p.clone().start().await?;
424
425    let spawn = Spawn { name, p2p };
426    Ok(spawn)
427}
428
429async_daemonize!(realmain);
430async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
431    // Pick up network settings from the TOML config
432    let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
433    let toml_contents = std::fs::read_to_string(cfg_path)?;
434    let configured_nets = parse_configured_networks(&toml_contents)?;
435
436    if configured_nets.is_empty() {
437        error!(target: "lilith", "No networks are enabled in config");
438        exit(1);
439    }
440
441    // Spawn configured networks
442    let mut networks = vec![];
443    for (name, info) in &configured_nets {
444        match spawn_net(name.to_string(), info, ex.clone()).await {
445            Ok(spawn) => networks.push(spawn),
446            Err(e) => {
447                error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}");
448                exit(1);
449            }
450        }
451    }
452
453    // Set up main daemon and background refinery_tasks
454    let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
455    let mut refinery_tasks = HashMap::new();
456    for network in &lilith.networks {
457        let name = network.name.clone();
458        let task = StoppableTask::new();
459        task.clone().start(
460            Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval),
461            |res| async move {
462                match res {
463                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
464                    Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"),
465                }
466            },
467            Error::DetachedTaskStopped,
468            ex.clone(),
469        );
470        refinery_tasks.insert(network.name.clone(), task);
471    }
472
473    // JSON-RPC server
474    let rpc_settings: RpcSettings = args.rpc.into();
475    info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen);
476    let lilith_ = lilith.clone();
477    let rpc_task = StoppableTask::new();
478    rpc_task.clone().start(
479        listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()),
480        |res| async move {
481            match res {
482                Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
483                Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"),
484            }
485        },
486        Error::RpcServerStopped,
487        ex.clone(),
488    );
489
490    // Signal handling for graceful termination.
491    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
492    signals_handler.wait_termination(signals_task).await?;
493    info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");
494
495    info!(target: "lilith", "Stopping JSON-RPC server...");
496    rpc_task.stop().await;
497
498    // Cleanly stop p2p networks
499    for spawn in &lilith.networks {
500        info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
501        refinery_tasks.get(&spawn.name).unwrap().stop().await;
502        info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
503        spawn.p2p.stop().await;
504    }
505
506    info!(target: "lilith", "Bye!");
507    Ok(())
508}