1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use async_trait::async_trait;
25use smol::{channel::Sender, lock::RwLock};
26use tinyjson::JsonValue;
27use tracing::{debug, error};
28
29use darkfi::{
30 impl_p2p_message,
31 net::{
32 metering::MeteringConfiguration,
33 protocol::protocol_generic::{
34 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
35 },
36 session::SESSION_DEFAULT,
37 Message, P2pPtr,
38 },
39 rpc::jsonrpc::JsonSubscriber,
40 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
41 util::{
42 encoding::base64,
43 time::{NanoTimestamp, Timestamp},
44 },
45 validator::{consensus::Proposal, ValidatorPtr},
46 Error, Result,
47};
48use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
49
50use crate::{task::handle_unknown_proposals, DarkfiNodePtr};
51
52#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
54pub struct ProposalMessage(pub Proposal);
55
56impl_p2p_message!(
61 ProposalMessage,
62 "proposal",
63 0,
64 1,
65 MeteringConfiguration {
66 threshold: 50,
67 sleep_step: 500,
68 expiry_time: NanoTimestamp::from_secs(5),
69 }
70);
71
72pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
74
75pub struct ProtocolProposalHandler {
78 proposals_handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
80 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
82 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
85 unknown_proposals_handler: StoppableTaskPtr,
87}
88
89impl ProtocolProposalHandler {
90 pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
93 debug!(
94 target: "darkfid::proto::protocol_proposal::init",
95 "Adding ProtocolProposal to the protocol registry"
96 );
97
98 let proposals_handler =
99 ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
100 let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
101 let unknown_proposals_channels = Arc::new(RwLock::new(HashMap::new()));
102 let unknown_proposals_handler = StoppableTask::new();
103
104 Arc::new(Self {
105 proposals_handler,
106 unknown_proposals,
107 unknown_proposals_channels,
108 unknown_proposals_handler,
109 })
110 }
111
112 pub async fn start(&self, executor: &ExecutorPtr, node: &DarkfiNodePtr) -> Result<()> {
114 debug!(
115 target: "darkfid::proto::protocol_proposal::start",
116 "Starting ProtocolProposal handler task..."
117 );
118
119 let (sender, receiver) = smol::channel::unbounded::<(Proposal, u32)>();
121
122 self.unknown_proposals_handler.clone().start(
124 handle_unknown_proposals(receiver, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), node.clone()),
125 |res| async move {
126 match res {
127 Ok(()) | Err(Error::DetachedTaskStopped) => { }
128 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting unknown proposals handler task: {e}"),
129 }
130 },
131 Error::DetachedTaskStopped,
132 executor.clone(),
133 );
134
135 self.proposals_handler.task.clone().start(
137 handle_receive_proposal(self.proposals_handler.clone(), sender, self.unknown_proposals.clone(), self.unknown_proposals_channels.clone(), node.validator.clone(), node.subscribers.get("proposals").unwrap().clone()),
138 |res| async move {
139 match res {
140 Ok(()) | Err(Error::DetachedTaskStopped) => { }
141 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
142 }
143 },
144 Error::DetachedTaskStopped,
145 executor.clone(),
146 );
147
148 debug!(
149 target: "darkfid::proto::protocol_proposal::start",
150 "ProtocolProposal handler task started!"
151 );
152
153 Ok(())
154 }
155
156 pub async fn stop(&self) {
158 debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
159 self.unknown_proposals_handler.stop().await;
160 self.proposals_handler.task.stop().await;
161 let mut unknown_proposals = self.unknown_proposals.write().await;
162 *unknown_proposals = HashSet::new();
163 drop(unknown_proposals);
164 debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
165 }
166}
167
168async fn handle_receive_proposal(
170 handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
171 sender: Sender<(Proposal, u32)>,
172 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
173 unknown_proposals_channels: Arc<RwLock<HashMap<u32, (u8, u64)>>>,
174 validator: ValidatorPtr,
175 proposals_sub: JsonSubscriber,
176) -> Result<()> {
177 debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
178 loop {
179 let (channel, proposal) = match handler.receiver.recv().await {
181 Ok(r) => r,
182 Err(e) => {
183 debug!(
184 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
185 "recv fail: {e}"
186 );
187 continue
188 }
189 };
190
191 if !*validator.synced.read().await {
193 debug!(
194 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
195 "Node still syncing blockchain, skipping..."
196 );
197 handler.send_action(channel, ProtocolGenericAction::Skip).await;
198 continue
199 }
200
201 if unknown_proposals.read().await.is_empty() {
204 let mut unknown_proposals_channels = unknown_proposals_channels.write().await;
205 if !unknown_proposals_channels.is_empty() {
206 let now = Timestamp::current_time().inner();
207 let mut expired_channels = vec![];
208 for (channel, (_, timestamp)) in unknown_proposals_channels.iter() {
209 if now - timestamp >= 600 {
210 expired_channels.push(*channel);
211 }
212 }
213 unknown_proposals_channels.retain(|channel, _| !expired_channels.contains(channel));
214 }
215 drop(unknown_proposals_channels);
216 }
217
218 match validator.append_proposal(&proposal.0).await {
220 Ok(()) => {
221 handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
223
224 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
226 proposals_sub.notify(vec![enc_prop].into()).await;
227
228 unknown_proposals_channels.write().await.remove(&channel);
230
231 continue
232 }
233 Err(e) => {
234 debug!(
235 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
236 "append_proposal fail: {e}",
237 );
238
239 handler.send_action(channel, ProtocolGenericAction::Skip).await;
240
241 match e {
242 Error::ExtendedChainIndexNotFound => { }
243 _ => continue,
244 }
245 }
246 };
247
248 let mut lock = unknown_proposals.write().await;
251 if lock.contains(proposal.0.hash.inner()) {
252 debug!(
253 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
254 "Proposal {} is already in our unknown proposals queue.",
255 proposal.0.hash,
256 );
257 drop(lock);
258 continue
259 }
260
261 lock.insert(proposal.0.hash.0);
263 drop(lock);
264
265 if let Err(e) = sender.send((proposal.0, channel)).await {
267 debug!(
268 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
269 "Channel {channel} send fail: {e}"
270 );
271 };
272 }
273}