darkfid/proto/
protocol_tx.rs1use std::sync::Arc;
20
21use tinyjson::JsonValue;
22use tracing::{debug, error};
23
24use darkfi::{
25 net::{
26 protocol::protocol_generic::{
27 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
28 },
29 session::SESSION_DEFAULT,
30 P2pPtr,
31 },
32 rpc::jsonrpc::JsonSubscriber,
33 system::ExecutorPtr,
34 tx::Transaction,
35 util::encoding::base64,
36 validator::ValidatorPtr,
37 Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::registry::DarkfiMinersRegistryStatePtr;
42
43pub type ProtocolTxHandlerPtr = Arc<ProtocolTxHandler>;
45
46pub struct ProtocolTxHandler {
48 handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
50}
51
52impl ProtocolTxHandler {
53 pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr {
56 debug!(
57 target: "darkfid::proto::protocol_tx::init",
58 "Adding ProtocolTx to the protocol registry"
59 );
60
61 let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await;
62
63 Arc::new(Self { handler })
64 }
65
66 pub async fn start(
68 &self,
69 executor: &ExecutorPtr,
70 validator: &ValidatorPtr,
71 registry_state: &DarkfiMinersRegistryStatePtr,
72 subscriber: JsonSubscriber,
73 ) -> Result<()> {
74 debug!(
75 target: "darkfid::proto::protocol_tx::start",
76 "Starting ProtocolTx handler task..."
77 );
78
79 self.handler.task.clone().start(
80 handle_receive_tx(self.handler.clone(), validator.clone(), registry_state.clone(), subscriber),
81 |res| async move {
82 match res {
83 Ok(()) | Err(Error::DetachedTaskStopped) => { }
84 Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"),
85 }
86 },
87 Error::DetachedTaskStopped,
88 executor.clone(),
89 );
90
91 debug!(
92 target: "darkfid::proto::protocol_tx::start",
93 "ProtocolTx handler task started!"
94 );
95
96 Ok(())
97 }
98
99 pub async fn stop(&self) {
101 debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task...");
102 self.handler.task.stop().await;
103 debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!");
104 }
105}
106
107async fn handle_receive_tx(
109 handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
110 validator: ValidatorPtr,
111 registry_state: DarkfiMinersRegistryStatePtr,
112 subscriber: JsonSubscriber,
113) -> Result<()> {
114 debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START");
115 loop {
116 let (channel, tx) = match handler.receiver.recv().await {
118 Ok(r) => r,
119 Err(e) => {
120 debug!(
121 target: "darkfid::proto::protocol_tx::handle_receive_tx",
122 "recv fail: {e}"
123 );
124 continue
125 }
126 };
127
128 let mut validator = validator.write().await;
130 if !validator.synced {
131 debug!(
132 target: "darkfid::proto::protocol_tx::handle_receive_tx",
133 "Node still syncing blockchain, skipping..."
134 );
135 handler.send_action(channel, ProtocolGenericAction::Skip).await;
136 continue
137 }
138
139 let result = validator.append_tx(&tx, true).await;
141
142 if let Err(e) = validator
144 .consensus
145 .purge_unreferenced_trees(&mut registry_state.read().await.new_trees())
146 .await
147 {
148 error!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "Purging unreferenced contract trees from the database failed: {e}");
149 handler.send_action(channel, ProtocolGenericAction::Skip).await;
150 continue
151 }
152
153 if let Err(e) = result {
155 debug!(
156 target: "darkfid::proto::protocol_tx::handle_receive_tx",
157 "append_tx fail: {e}"
158 );
159 handler.send_action(channel, ProtocolGenericAction::Skip).await;
160 continue
161 }
162
163 handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
165
166 let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await));
168 subscriber.notify(vec![encoded_tx].into()).await;
169 }
170}