darkfi/net/session/
inbound_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! Inbound connections session. Manages the creation of inbound sessions.
20//! Used to create an inbound session and start and stop the session.
21//!
//! The session struct consists of 3 members: a weak pointer to the parent P2P
//! instance, a mutex-guarded list of acceptors, and a mutex-guarded list of
//! stoppable tasks (one per inbound address). Using a weak pointer to P2P
//! allows us to avoid circular dependencies.
25
26use std::sync::{Arc, Weak};
27
28use async_trait::async_trait;
29use smol::{lock::Mutex, Executor};
30use tracing::{debug, error, warn};
31use url::Url;
32
33use super::{
34    super::{
35        acceptor::{Acceptor, AcceptorPtr},
36        channel::ChannelPtr,
37        dnet::{self, dnetev, DnetEvent},
38        p2p::{P2p, P2pPtr},
39    },
40    Session, SessionBitFlag, SESSION_INBOUND,
41};
42use crate::{
43    system::{StoppableTask, StoppableTaskPtr, Subscription},
44    util::logger::verbose,
45    Error, Result,
46};
47
/// Shared, reference-counted handle to an [`InboundSession`].
pub type InboundSessionPtr = Arc<InboundSession>;

/// Defines inbound connections session
pub struct InboundSession {
    /// Weak reference to the parent P2P instance. It is upgraded on every
    /// `p2p()` call, so the session does not keep the parent alive.
    pub(in crate::net) p2p: Weak<P2p>,
    /// Acceptors whose listener started successfully. Entries are pushed by
    /// `start_accept_session()` and stopped by `stop()`.
    acceptors: Mutex<Vec<AcceptorPtr>>,
    /// Tasks running `channel_sub_loop()`. `start()` pushes one per inbound
    /// address and `stop()` stops them.
    accept_tasks: Mutex<Vec<StoppableTaskPtr>>,
}
56
57impl InboundSession {
58    /// Create a new inbound session
59    pub fn new(p2p: Weak<P2p>) -> InboundSessionPtr {
60        Arc::new(Self {
61            p2p,
62            acceptors: Mutex::new(Vec::new()),
63            accept_tasks: Mutex::new(Vec::new()),
64        })
65    }
66
67    /// Starts the inbound session. Begins by accepting connections and fails
68    /// if the addresses are not configured. Then runs the channel subscription
69    /// loop.
70    pub async fn start(self: Arc<Self>) -> Result<()> {
71        let inbound_addrs = self.p2p().settings().read().await.inbound_addrs.clone();
72
73        if inbound_addrs.is_empty() {
74            verbose!(target: "net::inbound_session", "[P2P] Not configured for inbound connections.");
75            return Ok(())
76        }
77
78        let ex = self.p2p().executor();
79
80        // Activate mutex lock on accept tasks.
81        let mut accept_tasks = self.accept_tasks.lock().await;
82
83        for (index, accept_addr) in inbound_addrs.iter().enumerate() {
84            // First initialize an Acceptor and its Subscriber.
85            let parent = Arc::downgrade(&self);
86            let acceptor = Acceptor::new(parent);
87
88            // Now start the Subscriber. The Subscriber will return a Channel once it has been
89            // prepared by the Acceptor.
90            let channel_sub = acceptor.clone().subscribe().await;
91
92            // Then start listening for a Channel returned by the Subscriber. Call setup_channel()
93            // to register the Channel when it has been received.
94            let task = StoppableTask::new();
95            task.clone().start(
96                self.clone().channel_sub_loop(channel_sub, index, ex.clone()),
97                // Ignore stop handler
98                |_| async {},
99                Error::NetworkServiceStopped,
100                ex.clone(),
101            );
102
103            accept_tasks.push(task);
104
105            // Finally, run the Acceptor to start accepting inbound connections. Only when
106            // the Subscriber has been set up can we safely do this.
107            self.clone()
108                .start_accept_session(index, accept_addr.clone(), acceptor, ex.clone())
109                .await?;
110        }
111
112        Ok(())
113    }
114
115    /// Stops the inbound session.
116    pub async fn stop(&self) {
117        if self.p2p().settings().read().await.inbound_addrs.is_empty() {
118            verbose!(target: "net::inbound_session", "[P2P] Stopping inbound session.");
119            return
120        }
121
122        let acceptors = &*self.acceptors.lock().await;
123        for acceptor in acceptors {
124            acceptor.stop().await;
125        }
126
127        let accept_tasks = &*self.accept_tasks.lock().await;
128        for accept_task in accept_tasks {
129            accept_task.stop().await;
130        }
131    }
132
133    /// Start accepting connections for inbound session.
134    async fn start_accept_session(
135        self: Arc<Self>,
136        index: usize,
137        accept_addr: Url,
138        acceptor: AcceptorPtr,
139        ex: Arc<Executor<'_>>,
140    ) -> Result<()> {
141        verbose!(target: "net::inbound_session", "[P2P] Starting Inbound session #{index} on {accept_addr}");
142        // Start listener
143        let result = acceptor.clone().start(accept_addr, ex).await;
144        if let Err(e) = &result {
145            error!(target: "net::inbound_session", "[P2P] Error starting listener #{index}: {e}");
146            acceptor.stop().await;
147        } else {
148            self.acceptors.lock().await.push(acceptor);
149        }
150
151        result
152    }
153
154    /// Wait for all new channels created by the acceptor and call setup_channel() on them.
155    async fn channel_sub_loop(
156        self: Arc<Self>,
157        channel_sub: Subscription<Result<ChannelPtr>>,
158        index: usize,
159        ex: Arc<Executor<'_>>,
160    ) -> Result<()> {
161        loop {
162            let channel = channel_sub.receive().await?;
163
164            // Spawn a detached task to process the channel.
165            // This will just perform the channel setup then exit.
166            ex.spawn(self.clone().setup_channel(index, channel, ex.clone())).detach();
167        }
168    }
169
170    /// Registers the channel. First performs a network handshake and starts the channel.
171    /// Then starts sending keep-alive and address messages across the channel.
172    async fn setup_channel(
173        self: Arc<Self>,
174        index: usize,
175        channel: ChannelPtr,
176        ex: Arc<Executor<'_>>,
177    ) {
178        verbose!(
179             target: "net::inbound_session::setup_channel",
180             "[P2P] Connected Inbound #{index} [{}]", channel.display_address()
181        );
182
183        dnetev!(self, InboundConnected, {
184            addr: channel.info.connect_addr.clone(),
185            channel_id: channel.info.id,
186        });
187
188        let stop_sub = channel.subscribe_stop().await;
189
190        match self.register_channel(channel.clone(), ex.clone()).await {
191            Ok(()) => {
192                if let Ok(stop_sub) = stop_sub {
193                    // Wait for a stop signal, then cleanup.
194                    stop_sub.receive().await;
195
196                    debug!(
197                        target: "net::inbound_session::setup_channel",
198                        "Received stop_sub, channel removed from P2P",
199                    );
200                }
201            }
202            Err(e) => {
203                warn!(
204                    target: "net::inbound_session::setup_channel",
205                    "Channel setup failed! Err={e}"
206                );
207            }
208        }
209
210        dnetev!(self, InboundDisconnected, {
211            addr: channel.info.connect_addr.clone(),
212            channel_id: channel.info.id,
213        });
214    }
215}
216
217#[async_trait]
218impl Session for InboundSession {
219    fn p2p(&self) -> P2pPtr {
220        self.p2p.upgrade().unwrap()
221    }
222
223    fn type_id(&self) -> SessionBitFlag {
224        SESSION_INBOUND
225    }
226
227    async fn reload(self: Arc<Self>) {}
228}