1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use async_trait::async_trait;
25use smol::{channel::Sender, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error};
28
29use darkfi::{
30 impl_p2p_message,
31 net::{
32 metering::MeteringConfiguration,
33 protocol::protocol_generic::{
34 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
35 },
36 session::SESSION_DEFAULT,
37 Message, P2pPtr,
38 },
39 rpc::jsonrpc::JsonSubscriber,
40 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
41 util::{
42 encoding::base64,
43 time::{NanoTimestamp, Timestamp},
44 },
45 validator::{consensus::Proposal, ValidatorPtr},
46 Error, Result,
47};
48use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
49
50use crate::task::handle_unknown_proposals;
51
52#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
54pub struct ProposalMessage(pub Proposal);
55
56impl_p2p_message!(
61 ProposalMessage,
62 "proposal",
63 0,
64 1,
65 MeteringConfiguration {
66 threshold: 50,
67 sleep_step: 500,
68 expiry_time: NanoTimestamp::from_secs(5),
69 }
70);
71
72pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
74
75pub struct ProtocolProposalHandler {
78 proposals_handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
80 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
82 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
85 unknown_proposals_handler: StoppableTaskPtr,
87}
88
89impl ProtocolProposalHandler {
90 pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
93 debug!(
94 target: "darkfid::proto::protocol_proposal::init",
95 "Adding ProtocolProposal to the protocol registry"
96 );
97
98 let proposals_handler =
99 ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
100 let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
101 let unknown_proposals_channels = Arc::new(RwLock::new(HashMap::new()));
102 let unknown_proposals_handler = StoppableTask::new();
103
104 Arc::new(Self {
105 proposals_handler,
106 unknown_proposals,
107 unknown_proposals_channels,
108 unknown_proposals_handler,
109 })
110 }
111
112 pub async fn start(
114 &self,
115 executor: &ExecutorPtr,
116 validator: &ValidatorPtr,
117 p2p: &P2pPtr,
118 proposals_sub: JsonSubscriber,
119 blocks_sub: JsonSubscriber,
120 ) -> Result<()> {
121 debug!(
122 target: "darkfid::proto::protocol_proposal::start",
123 "Starting ProtocolProposal handler task..."
124 );
125
126 let (sender, receiver) = smol::channel::unbounded::<(Proposal, u32)>();
128
129 self.unknown_proposals_handler.clone().start(
131 handle_unknown_proposals(receiver, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub),
132 |res| async move {
133 match res {
134 Ok(()) | Err(Error::DetachedTaskStopped) => { }
135 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting unknown proposals handler task: {e}"),
136 }
137 },
138 Error::DetachedTaskStopped,
139 executor.clone(),
140 );
141
142 self.proposals_handler.task.clone().start(
144 handle_receive_proposal(self.proposals_handler.clone(), sender, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), validator.clone(), proposals_sub),
145 |res| async move {
146 match res {
147 Ok(()) | Err(Error::DetachedTaskStopped) => { }
148 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
149 }
150 },
151 Error::DetachedTaskStopped,
152 executor.clone(),
153 );
154
155 debug!(
156 target: "darkfid::proto::protocol_proposal::start",
157 "ProtocolProposal handler task started!"
158 );
159
160 Ok(())
161 }
162
163 pub async fn stop(&self) {
165 debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
166 self.unknown_proposals_handler.stop().await;
167 self.proposals_handler.task.stop().await;
168 let mut unknown_proposals = self.unknown_proposals.write().await;
169 *unknown_proposals = HashSet::new();
170 drop(unknown_proposals);
171 debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
172 }
173}
174
175async fn handle_receive_proposal(
177 handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
178 sender: Sender<(Proposal, u32)>,
179 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
180 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
181 validator: ValidatorPtr,
182 proposals_sub: JsonSubscriber,
183) -> Result<()> {
184 debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
185 loop {
186 let (channel, proposal) = match handler.receiver.recv().await {
188 Ok(r) => r,
189 Err(e) => {
190 debug!(
191 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
192 "recv fail: {e}"
193 );
194 continue
195 }
196 };
197
198 if !*validator.synced.read().await {
200 debug!(
201 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
202 "Node still syncing blockchain, skipping..."
203 );
204 handler.send_action(channel, ProtocolGenericAction::Skip).await;
205 continue
206 }
207
208 if unknown_proposals.read().await.is_empty() {
211 let mut unknown_proposals_channels = unknown_proposals_channels.write().await;
212 if !unknown_proposals_channels.is_empty() {
213 let now = Timestamp::current_time().inner();
214 let mut expired_channels = vec![];
215 for (channel, (_, timestamp)) in unknown_proposals_channels.iter() {
216 if now - timestamp >= 600 {
217 expired_channels.push(*channel);
218 }
219 }
220 unknown_proposals_channels.retain(|channel, _| !expired_channels.contains(channel));
221 }
222 drop(unknown_proposals_channels);
223 }
224
225 match validator.append_proposal(&proposal.0).await {
227 Ok(()) => {
228 handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
230
231 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
233 proposals_sub.notify(vec![enc_prop].into()).await;
234
235 unknown_proposals_channels.write().await.remove(&channel);
237
238 continue
239 }
240 Err(e) => {
241 debug!(
242 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
243 "append_proposal fail: {e}",
244 );
245
246 handler.send_action(channel, ProtocolGenericAction::Skip).await;
247
248 match e {
249 Error::ExtendedChainIndexNotFound => { }
250 _ => continue,
251 }
252 }
253 };
254
255 let mut lock = unknown_proposals.write().await;
258 if lock.contains(proposal.0.hash.inner()) {
259 debug!(
260 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
261 "Proposal {} is already in our unknown proposals queue.",
262 proposal.0.hash,
263 );
264 drop(lock);
265 continue
266 }
267
268 lock.insert(proposal.0.hash.0);
270 drop(lock);
271
272 if let Err(e) = sender.send((proposal.0, channel)).await {
274 debug!(
275 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
276 "Channel {channel} send fail: {e}"
277 );
278 };
279 }
280}