lilith/
main.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    process::exit,
22    sync::Arc,
23    time::UNIX_EPOCH,
24};
25
26use async_trait::async_trait;
27use semver::Version;
28use smol::{
29    lock::{Mutex, MutexGuard},
30    stream::StreamExt,
31    Executor,
32};
33use structopt::StructOpt;
34use structopt_toml::StructOptToml;
35use tinyjson::JsonValue;
36use toml::Value;
37use tracing::{debug, error, info, warn};
38use url::Url;
39
40use darkfi::{
41    async_daemonize, cli_desc,
42    net::{
43        self,
44        hosts::HostColor,
45        settings::{BanPolicy, MagicBytes, NetworkProfile},
46        P2p, P2pPtr,
47    },
48    rpc::{
49        jsonrpc::*,
50        server::{listen_and_serve, RequestHandler},
51        settings::{RpcSettings, RpcSettingsOpt},
52    },
53    system::{sleep, StoppableTask, StoppableTaskPtr},
54    util::path::get_config_path,
55    Error, Result,
56};
57
58const CONFIG_FILE: &str = "lilith_config.toml";
59const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
60
61#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
62#[serde(default)]
63#[structopt(name = "lilith", about = cli_desc!())]
64struct Args {
65    #[structopt(flatten)]
66    /// JSON-RPC settings
67    rpc: RpcSettingsOpt,
68
69    #[structopt(short, long)]
70    /// Configuration file to use
71    config: Option<String>,
72
73    #[structopt(short, long)]
74    /// Set log file to ouput into
75    log: Option<String>,
76
77    #[structopt(short, parse(from_occurrences))]
78    /// Increase verbosity (-vvv supported)
79    verbose: u8,
80
81    #[structopt(long, default_value = "120")]
82    /// Interval after which to check whitelist peers
83    whitelist_refinery_interval: u64,
84}
85
86/// Struct representing a spawned P2P network
87struct Spawn {
88    /// String identifier,
89    pub name: String,
90    /// P2P pointer
91    pub p2p: P2pPtr,
92}
93
94impl Spawn {
95    async fn get_whitelist(&self) -> Vec<JsonValue> {
96        self.p2p
97            .hosts()
98            .container
99            .fetch_all(HostColor::White)
100            .iter()
101            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
102            .collect()
103    }
104
105    async fn get_greylist(&self) -> Vec<JsonValue> {
106        self.p2p
107            .hosts()
108            .container
109            .fetch_all(HostColor::Grey)
110            .iter()
111            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
112            .collect()
113    }
114
115    async fn get_goldlist(&self) -> Vec<JsonValue> {
116        self.p2p
117            .hosts()
118            .container
119            .fetch_all(HostColor::Gold)
120            .iter()
121            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
122            .collect()
123    }
124
125    async fn info(&self) -> JsonValue {
126        let mut addr_vec = vec![];
127        for addr in &self.p2p.settings().read().await.inbound_addrs {
128            addr_vec.push(JsonValue::String(addr.as_ref().to_string()));
129        }
130
131        JsonValue::Object(HashMap::from([
132            ("name".to_string(), JsonValue::String(self.name.clone())),
133            ("urls".to_string(), JsonValue::Array(addr_vec)),
134            ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)),
135            ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)),
136            ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)),
137        ]))
138    }
139}
140
141/// Defines the network-specific settings
142#[derive(Clone)]
143struct NetInfo {
144    /// Accept addresses the network will use
145    pub accept_addrs: Vec<Url>,
146    /// Other seeds to connect to
147    pub seeds: Vec<Url>,
148    /// Manual peers to connect to
149    pub peers: Vec<Url>,
150    /// Supported network version
151    pub version: Version,
152    /// App Identifier for the app running on the network
153    pub app_name: String,
154    /// Enable localnet hosts
155    pub localnet: bool,
156    /// Path to P2P datastore
157    pub datastore: String,
158    /// Path to hostlist
159    pub hostlist: String,
160    /// Magic bytes used to distinguish the p2p network
161    pub magic_bytes: MagicBytes,
162}
163
164/// Struct representing the daemon
165struct Lilith {
166    /// Spawned networks
167    pub networks: Vec<Spawn>,
168    /// JSON-RPC connection tracker
169    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
170}
171
172impl Lilith {
173    /// Since `Lilith` does not make outbound connections, if a peer is
174    /// upgraded to whitelist it will remain on the whitelist even if the
175    /// give peer is no longer online.
176    ///
177    /// To protect `Lilith` from sharing potentially offline nodes,
178    /// `whitelist_refinery` periodically ping nodes on the whitelist. If they
179    /// are reachable, we update their last seen field. Otherwise, we downgrade
180    /// them to the greylist.
181    ///
182    /// Note: if `Lilith` loses connectivity this method will delete peers from
183    /// the whitelist, meaning `Lilith` will need to rebuild its hostlist when
184    /// it comes back online.
185    async fn whitelist_refinery(
186        network_name: String,
187        p2p: P2pPtr,
188        refinery_interval: u64,
189    ) -> Result<()> {
190        debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\"");
191
192        let hosts = p2p.hosts();
193
194        loop {
195            sleep(refinery_interval).await;
196
197            match hosts.container.fetch_last(HostColor::White) {
198                Some(entry) => {
199                    let url = &entry.0;
200                    let last_seen = &entry.1;
201
202                    if !hosts.refinable(url.clone()) {
203                        debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!",
204                       url.clone());
205
206                        continue
207                    }
208
209                    if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
210                        debug!(target: "net::refinery:::whitelist_refinery",
211                       "Host {url} is not responsive. Downgrading from whitelist");
212
213                        hosts.greylist_host(url, *last_seen).await?;
214
215                        continue
216                    }
217
218                    debug!(target: "net::refinery::whitelist_refinery",
219                   "Peer {url} is responsive. Updating last_seen");
220
221                    // This node is active. Update the last seen field.
222                    let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
223
224                    hosts.whitelist_host(url, last_seen).await?;
225                }
226                None => {
227                    debug!(target: "net::refinery::whitelist_refinery",
228                              "Whitelist is empty! Cannot start refinery process");
229
230                    continue
231                }
232            }
233        }
234    }
235    // RPCAPI:
236    // Returns all spawned networks names with their node addresses.
237    // --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42}
238    // <-- {"jsonrpc": "2.0", "result": {"spawns": spawns_info}, "id": 42}
239    async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult {
240        let mut spawns = vec![];
241        for spawn in &self.networks {
242            spawns.push(spawn.info().await);
243        }
244
245        let json =
246            JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))]));
247
248        JsonResponse::new(json, id).into()
249    }
250}
251
252#[async_trait]
253impl RequestHandler<()> for Lilith {
254    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
255        return match req.method.as_str() {
256            "ping" => self.pong(req.id, req.params).await,
257            "spawns" => self.spawns(req.id, req.params).await,
258            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
259        }
260    }
261
262    async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
263        self.rpc_connections.lock().await
264    }
265}
266
267/// Parse a TOML string for any configured network and return a map containing
268/// said configurations.
269fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
270    let mut ret = HashMap::new();
271
272    if let Value::Table(map) = toml::from_str(data)? {
273        if map.contains_key("network") && map["network"].is_table() {
274            for net in map["network"].as_table().unwrap() {
275                info!(target: "lilith", "Found configuration for network: {}", net.0);
276                let table = net.1.as_table().unwrap();
277                if !table.contains_key("accept_addrs") {
278                    warn!(target: "lilith", "Network accept addrs are mandatory, skipping network.");
279                    continue
280                }
281
282                if !table.contains_key("hostlist") {
283                    error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
284                    exit(1)
285                }
286
287                let name = net.0.to_string();
288                let accept_addrs: Vec<Url> = table["accept_addrs"]
289                    .as_array()
290                    .unwrap()
291                    .iter()
292                    .map(|x| Url::parse(x.as_str().unwrap()).unwrap())
293                    .collect();
294
295                let mut seeds = vec![];
296                if table.contains_key("seeds") {
297                    if let Some(s) = table["seeds"].as_array() {
298                        for seed in s {
299                            if let Some(u) = seed.as_str() {
300                                if let Ok(url) = Url::parse(u) {
301                                    seeds.push(url);
302                                }
303                            }
304                        }
305                    }
306                }
307
308                let mut peers = vec![];
309                if table.contains_key("peers") {
310                    if let Some(p) = table["peers"].as_array() {
311                        for peer in p {
312                            if let Some(u) = peer.as_str() {
313                                if let Ok(url) = Url::parse(u) {
314                                    peers.push(url);
315                                }
316                            }
317                        }
318                    }
319                }
320
321                let localnet = if table.contains_key("localnet") {
322                    table["localnet"].as_bool().unwrap()
323                } else {
324                    false
325                };
326
327                let version = if table.contains_key("version") {
328                    semver::Version::parse(table["version"].as_str().unwrap())?
329                } else {
330                    semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
331                };
332
333                let app_name = if table.contains_key("app_name") {
334                    table["app_name"].as_str().unwrap().to_string()
335                } else {
336                    String::new()
337                };
338
339                let datastore: String = table["datastore"].as_str().unwrap().to_string();
340
341                let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
342
343                let magic_bytes: [u8; 4] = table["magic_bytes"]
344                    .as_array()
345                    .unwrap()
346                    .iter()
347                    .map(|v| v.as_integer().unwrap() as u8)
348                    .collect::<Vec<u8>>()
349                    .try_into()
350                    .expect("Wrong magic bytes value");
351
352                let net_info = NetInfo {
353                    accept_addrs,
354                    seeds,
355                    peers,
356                    version,
357                    app_name,
358                    localnet,
359                    datastore,
360                    hostlist,
361                    magic_bytes: MagicBytes(magic_bytes),
362                };
363                ret.insert(name, net_info);
364            }
365        }
366    }
367
368    Ok(ret)
369}
370
371async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
372    let mut listen_urls = vec![];
373
374    // Configure listen addrs for this network
375    for url in &info.accept_addrs {
376        listen_urls.push(url.clone());
377    }
378
379    let mut profiles = HashMap::new();
380    profiles.insert("tcp+tls".to_string(), NetworkProfile::default());
381    profiles.insert("tcp".to_string(), NetworkProfile::default());
382    profiles.insert("tor".to_string(), NetworkProfile::tor_default());
383    profiles.insert("i2p".to_string(), NetworkProfile::tor_default());
384    profiles.insert("tor+tls".to_string(), NetworkProfile::tor_default());
385    profiles.insert("i2p+tls".to_string(), NetworkProfile::tor_default());
386
387    // P2P network settings
388    let settings = net::Settings {
389        magic_bytes: info.magic_bytes.clone(),
390        inbound_addrs: listen_urls.clone(),
391        seeds: info.seeds.clone(),
392        peers: info.peers.clone(),
393        outbound_connections: 0,
394        inbound_connections: 512,
395        app_version: info.version.clone(),
396        app_name: info.app_name.clone(),
397        localnet: info.localnet,
398        p2p_datastore: Some(info.datastore.clone()),
399        hostlist: Some(info.hostlist.clone()),
400        active_profiles: vec![
401            "tcp".to_string(),
402            "tcp+tls".to_string(),
403            "tor".to_string(),
404            "tor+tls".to_string(),
405            "nym".to_string(),
406            "nym+tls".to_string(),
407            "i2p".to_string(),
408            "i2p+tls".to_string(),
409        ],
410        ban_policy: BanPolicy::Relaxed,
411        profiles,
412        ..Default::default()
413    };
414
415    // Create P2P instance
416    let p2p = P2p::new(settings, ex.clone()).await?;
417
418    let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
419    info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}");
420    p2p.clone().start().await?;
421
422    let spawn = Spawn { name, p2p };
423    Ok(spawn)
424}
425
426async_daemonize!(realmain);
427async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
428    // Pick up network settings from the TOML config
429    let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
430    let toml_contents = std::fs::read_to_string(cfg_path)?;
431    let configured_nets = parse_configured_networks(&toml_contents)?;
432
433    if configured_nets.is_empty() {
434        error!(target: "lilith", "No networks are enabled in config");
435        exit(1);
436    }
437
438    // Spawn configured networks
439    let mut networks = vec![];
440    for (name, info) in &configured_nets {
441        match spawn_net(name.to_string(), info, ex.clone()).await {
442            Ok(spawn) => networks.push(spawn),
443            Err(e) => {
444                error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}");
445                exit(1);
446            }
447        }
448    }
449
450    // Set up main daemon and background refinery_tasks
451    let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
452    let mut refinery_tasks = HashMap::new();
453    for network in &lilith.networks {
454        let name = network.name.clone();
455        let task = StoppableTask::new();
456        task.clone().start(
457            Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval),
458            |res| async move {
459                match res {
460                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
461                    Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"),
462                }
463            },
464            Error::DetachedTaskStopped,
465            ex.clone(),
466        );
467        refinery_tasks.insert(network.name.clone(), task);
468    }
469
470    // JSON-RPC server
471    let rpc_settings: RpcSettings = args.rpc.into();
472    info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen);
473    let lilith_ = lilith.clone();
474    let rpc_task = StoppableTask::new();
475    rpc_task.clone().start(
476        listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()),
477        |res| async move {
478            match res {
479                Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
480                Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"),
481            }
482        },
483        Error::RpcServerStopped,
484        ex.clone(),
485    );
486
487    // Signal handling for graceful termination.
488    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
489    signals_handler.wait_termination(signals_task).await?;
490    info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");
491
492    info!(target: "lilith", "Stopping JSON-RPC server...");
493    rpc_task.stop().await;
494
495    // Cleanly stop p2p networks
496    for spawn in &lilith.networks {
497        info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
498        refinery_tasks.get(&spawn.name).unwrap().stop().await;
499        info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
500        spawn.p2p.stop().await;
501    }
502
503    info!(target: "lilith", "Bye!");
504    Ok(())
505}