darkfi/net/session/
mod.rs1use std::{
20 sync::{Arc, Weak},
21 time::UNIX_EPOCH,
22};
23
24use async_trait::async_trait;
25use smol::Executor;
26use tracing::{debug, error, trace};
27
28use super::{
29 channel::ChannelPtr,
30 dnet::{self, dnetev, DnetEvent},
31 hosts::HostColor,
32 p2p::P2pPtr,
33 protocol::ProtocolVersion,
34};
35use crate::{system::Subscription, util::logger::verbose, Error, Result};
36
37pub mod inbound_session;
38pub use inbound_session::{InboundSession, InboundSessionPtr};
39pub mod manual_session;
40pub use manual_session::{ManualSession, ManualSessionPtr};
41pub mod outbound_session;
42pub use outbound_session::{OutboundSession, OutboundSessionPtr};
43pub mod seedsync_session;
44pub use seedsync_session::{SeedSyncSession, SeedSyncSessionPtr};
45pub mod refine_session;
46pub use refine_session::{RefineSession, RefineSessionPtr};
47pub mod direct_session;
48pub use direct_session::{DirectSession, DirectSessionPtr};
49
50pub type SessionBitFlag = u32;
52pub const SESSION_INBOUND: SessionBitFlag = 0b000001;
53pub const SESSION_OUTBOUND: SessionBitFlag = 0b000010;
54pub const SESSION_MANUAL: SessionBitFlag = 0b000100;
55pub const SESSION_SEED: SessionBitFlag = 0b001000;
56pub const SESSION_REFINE: SessionBitFlag = 0b010000;
57pub const SESSION_DIRECT: SessionBitFlag = 0b100000;
58
59pub const SESSION_DEFAULT: SessionBitFlag = 0b100111;
60pub const SESSION_ALL: SessionBitFlag = 0b111111;
61
62pub type SessionWeakPtr = Weak<dyn Session + Send + Sync + 'static>;
63
64pub async fn remove_sub_on_stop(
67 p2p: P2pPtr,
68 channel: ChannelPtr,
69 type_id: SessionBitFlag,
70 stop_sub: Subscription<Error>,
71) {
72 debug!(target: "net::session::remove_sub_on_stop", "[START]");
73 let hosts = p2p.hosts();
74 let addr = channel.address();
75
76 stop_sub.receive().await;
77
78 debug!(
79 target: "net::session::remove_sub_on_stop",
80 "Received stop event. Removing channel {}",
81 channel.display_address()
82 );
83
84 if type_id & (SESSION_OUTBOUND | SESSION_DIRECT) != 0 {
86 debug!(
87 target: "net::session::remove_sub_on_stop",
88 "Downgrading {}",
89 channel.display_address()
90 );
91
92 match hosts.fetch_last_seen(addr) {
96 Some(last_seen) => {
97 if let Err(e) = hosts.move_host(addr, last_seen, HostColor::Grey).await {
98 error!(target: "net::session::remove_sub_on_stop",
99 "Failed to move host {} to Greylist! Err={e}", channel.display_address());
100 }
101 }
102 None => {
103 error!(target: "net::session::remove_sub_on_stop",
104 "Failed to fetch last seen for {}", channel.display_address());
105 }
106 }
107 }
108
109 if type_id & SESSION_REFINE == 0 {
114 if let Err(e) = hosts.unregister(channel.address()) {
115 error!(target: "net::session::remove_sub_on_stop", "Error while unregistering addr={}, err={e}", channel.display_address());
116 }
117 }
118
119 if type_id & SESSION_DIRECT != 0 {
120 dnetev!(p2p.session_direct(), DirectDisconnected, {
121 connect_addr: channel.info.connect_addr.clone(),
122 err: "Channel stopped".to_string()
123 });
124 verbose!(
125 target: "net::direct_session",
126 "[P2P] Direct outbound disconnected [{}]",
127 channel.display_address()
128 );
129 }
130
131 if !p2p.is_connected() {
132 hosts.disconnect_publisher.notify(Error::NetworkNotConnected).await;
133 }
134 debug!(target: "net::session::remove_sub_on_stop", "[END]");
135}
136
137#[async_trait]
141pub trait Session: Sync {
142 async fn register_channel(
151 &self,
152 channel: ChannelPtr,
153 executor: Arc<Executor<'_>>,
154 ) -> Result<()> {
155 trace!(target: "net::session::register_channel", "[START]");
156
157 let p2p = self.p2p();
162 let protocols =
163 p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
164
165 let protocol_version = ProtocolVersion::new(channel.clone(), p2p.settings().clone()).await;
167 debug!(
168 target: "net::session::register_channel",
169 "Performing handshake protocols {}", channel.clone().display_address(),
170 );
171
172 let handshake_task =
173 self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
174
175 channel.clone().start(executor.clone());
177
178 match handshake_task.await {
180 Ok(()) => {
181 debug!(target: "net::session::register_channel",
182 "Handshake successful {}", channel.clone().display_address());
183 }
184 Err(e) => {
185 debug!(target: "net::session::register_channel",
186 "Handshake error {e} {}", channel.clone().display_address());
187
188 return Err(e)
189 }
190 }
191
192 debug!(target: "net::session::register_channel", "Session handshake complete");
194 debug!(target: "net::session::register_channel", "Activating remaining protocols");
195
196 for protocol in protocols {
199 protocol.start(executor.clone()).await?;
200 }
201
202 trace!(target: "net::session::register_channel", "[END]");
203
204 Ok(())
205 }
206
207 async fn perform_handshake_protocols(
211 &self,
212 protocol_version: Arc<ProtocolVersion>,
213 channel: ChannelPtr,
214 executor: Arc<Executor<'_>>,
215 ) -> Result<()> {
216 let stop_sub = channel.clone().subscribe_stop().await?;
218
219 match protocol_version.run(executor.clone()).await {
221 Ok(()) => {
222 if self.type_id() & (SESSION_OUTBOUND | SESSION_DIRECT) != 0 {
224 debug!(
225 target: "net::session::perform_handshake_protocols",
226 "Upgrading {}", channel.display_address(),
227 );
228
229 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
230
231 self.p2p()
232 .hosts()
233 .move_host(channel.address(), last_seen, HostColor::Gold)
234 .await?;
235 }
236
237 self.p2p().hosts().register_channel(channel.clone()).await;
239
240 executor
242 .spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id(), stop_sub))
243 .detach();
244
245 Ok(())
247 }
248 Err(e) => return Err(e),
249 }
250 }
251
252 fn p2p(&self) -> P2pPtr;
254
255 fn type_id(&self) -> SessionBitFlag;
257
258 async fn reload(self: Arc<Self>);
260}