1use std::sync::Arc;
20
21use smol::{channel, io::BufReader, Executor};
22use tinyjson::JsonValue;
23use tracing::{debug, error};
24use url::Url;
25
26use super::{
27 common::{
28 http_read_from_stream_response, http_write_to_stream, read_from_stream, write_to_stream,
29 INIT_BUF_SIZE, READ_TIMEOUT,
30 },
31 jsonrpc::*,
32};
33use crate::{
34 net::transport::{Dialer, PtStream},
35 system::{io_timeout, PublisherPtr, StoppableTask, StoppableTaskPtr},
36 Error, Result,
37};
38
39pub struct RpcClient {
41 req_send: channel::Sender<(JsonRequest, bool)>,
44 rep_recv: channel::Receiver<JsonResult>,
46 req_skip_send: channel::Sender<()>,
48 task: StoppableTaskPtr,
50}
51
52impl RpcClient {
53 pub async fn new(endpoint: Url, ex: Arc<Executor<'_>>) -> Result<Self> {
57 let (req_send, req_recv) = channel::unbounded();
59 let (rep_send, rep_recv) = channel::unbounded();
60 let (req_skip_send, req_skip_recv) = channel::unbounded();
61
62 let mut dialer_url = endpoint.clone();
64 if endpoint.scheme().starts_with("http+") {
65 let scheme = endpoint.scheme().strip_prefix("http+").unwrap();
66 let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme);
67 dialer_url = url_str.parse()?;
68 }
69 let use_http = endpoint.scheme().starts_with("http+");
70
71 let dialer = Dialer::new(dialer_url, None, None).await?;
74 let stream = dialer.dial(None).await?;
75
76 let task = StoppableTask::new();
80 task.clone().start(
81 Self::reqrep_loop(use_http, stream, rep_send, req_recv, req_skip_recv),
82 |res| async move {
83 match res {
84 Ok(()) | Err(Error::RpcClientStopped) => {}
85 Err(e) => error!(target: "rpc::client", "[RPC] Client error: {e}"),
86 }
87 },
88 Error::RpcClientStopped,
89 ex.clone(),
90 );
91
92 Ok(Self { req_send, rep_recv, task, req_skip_send })
93 }
94
95 pub async fn stop(&self) {
99 self.task.stop().await;
100 }
101
102 async fn reqrep_loop(
104 use_http: bool,
105 stream: Box<dyn PtStream>,
106 rep_send: channel::Sender<JsonResult>,
107 req_recv: channel::Receiver<(JsonRequest, bool)>,
108 req_skip_recv: channel::Receiver<()>,
109 ) -> Result<()> {
110 debug!(target: "rpc::client::reqrep_loop", "Starting reqrep loop");
111
112 let (reader, mut writer) = smol::io::split(stream);
113 let mut reader = BufReader::new(reader);
114
115 loop {
116 let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
117 let mut with_timeout = false;
118
119 smol::future::or(
122 async {
123 let (request, timeout) = req_recv.recv().await?;
124 with_timeout = timeout;
125
126 let request = JsonResult::Request(request);
127 if use_http {
128 http_write_to_stream(&mut writer, &request).await?;
129 } else {
130 write_to_stream(&mut writer, &request).await?;
131 }
132 Ok::<(), crate::Error>(())
133 },
134 async {
135 req_skip_recv.recv().await?;
136 Ok::<(), crate::Error>(())
137 },
138 )
139 .await?;
140
141 if with_timeout {
142 if use_http {
143 let _ = io_timeout(
144 READ_TIMEOUT,
145 http_read_from_stream_response(&mut reader, &mut buf),
146 )
147 .await?;
148 } else {
149 let _ =
150 io_timeout(READ_TIMEOUT, read_from_stream(&mut reader, &mut buf)).await?;
151 }
152 } else {
153 #[allow(clippy::collapsible_else_if)]
154 if use_http {
155 let _ = http_read_from_stream_response(&mut reader, &mut buf).await?;
156 } else {
157 let _ = read_from_stream(&mut reader, &mut buf).await?;
158 }
159 }
160
161 let val: JsonValue = String::from_utf8(buf)?.parse()?;
162 let rep = JsonResult::try_from_value(&val)?;
163 rep_send.send(rep).await?;
164 }
165 }
166
167 pub async fn request(&self, req: JsonRequest) -> Result<JsonValue> {
171 let req_id = req.id;
172 debug!(target: "rpc::client", "--> {}", req.stringify()?);
173
174 self.req_send.send((req, true)).await?;
177
178 let reply = self.rep_recv.recv().await?;
181
182 match reply {
184 JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => {
185 debug!(target: "rpc::client", "<-- {}", rep.stringify()?);
186
187 if req_id != rep.id {
189 let e = JsonError::new(ErrorCode::IdMismatch, None, rep.id);
190 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
191 }
192
193 Ok(rep.result)
194 }
195
196 JsonResult::Error(e) => {
197 debug!(target: "rpc::client", "<-- {}", e.stringify()?);
198 Err(Error::JsonRpcError((e.error.code, e.error.message)))
199 }
200
201 JsonResult::Notification(n) => {
202 debug!(target: "rpc::client", "<-- {}", n.stringify()?);
203 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
204 Err(Error::JsonRpcError((e.error.code, e.error.message)))
205 }
206
207 JsonResult::Request(r) => {
208 debug!(target: "rpc::client", "<-- {}", r.stringify()?);
209 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
210 Err(Error::JsonRpcError((e.error.code, e.error.message)))
211 }
212
213 JsonResult::Subscriber(_) => {
214 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
216 Err(Error::JsonRpcError((e.error.code, e.error.message)))
217 }
218 }
219 }
220
221 pub async fn oneshot_request(&self, req: JsonRequest) -> Result<JsonValue> {
224 let rep = match self.request(req).await {
225 Ok(v) => v,
226 Err(e) => {
227 self.stop().await;
228 return Err(e)
229 }
230 };
231
232 self.stop().await;
233 Ok(rep)
234 }
235
236 pub async fn subscribe(
239 &self,
240 req: JsonRequest,
241 publisher: PublisherPtr<JsonResult>,
242 ) -> Result<()> {
243 debug!(target: "rpc::client", "--> {}", req.stringify()?);
245 let req_id = req.id;
246
247 self.req_send.send((req, false)).await?;
250
251 loop {
253 let notification = self.rep_recv.recv().await?;
256
257 match notification {
259 JsonResult::Notification(ref n) => {
260 debug!(target: "rpc::client", "<-- {}", n.stringify()?);
261 self.req_skip_send.send(()).await?;
262 publisher.notify(notification.clone()).await;
263 continue
264 }
265
266 JsonResult::Error(e) => {
267 debug!(target: "rpc::client", "<-- {}", e.stringify()?);
268 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
269 }
270
271 JsonResult::Response(r) | JsonResult::SubscriberWithReply(_, r) => {
272 debug!(target: "rpc::client", "<-- {}", r.stringify()?);
273 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
274 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
275 }
276
277 JsonResult::Request(r) => {
278 debug!(target: "rpc::client", "<-- {}", r.stringify()?);
279 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
280 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
281 }
282
283 JsonResult::Subscriber(_) => {
284 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
286 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
287 }
288 }
289 }
290 }
291}
292
293pub struct RpcChadClient {
297 req_send: channel::Sender<JsonRequest>,
299 rep_recv: channel::Receiver<JsonResult>,
301 task: StoppableTaskPtr,
303}
304
305impl RpcChadClient {
306 pub async fn new(endpoint: Url, ex: Arc<Executor<'_>>) -> Result<Self> {
310 let (req_send, req_recv) = channel::unbounded();
312 let (rep_send, rep_recv) = channel::unbounded();
313
314 let mut dialer_url = endpoint.clone();
316 if endpoint.scheme().starts_with("http+") {
317 let scheme = endpoint.scheme().strip_prefix("http+").unwrap();
318 let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme);
319 dialer_url = url_str.parse()?;
320 }
321 let use_http = endpoint.scheme().starts_with("http+");
322
323 let dialer = Dialer::new(dialer_url, None, None).await?;
326 let stream = dialer.dial(None).await?;
327
328 let task = StoppableTask::new();
332 task.clone().start(
333 Self::reqrep_loop(use_http, stream, rep_send, req_recv),
334 |res| async move {
335 match res {
336 Ok(()) | Err(Error::RpcClientStopped) => {}
337 Err(e) => error!(target: "rpc::chad_client", "[RPC] Client error: {e}"),
338 }
339 },
340 Error::RpcClientStopped,
341 ex.clone(),
342 );
343
344 Ok(Self { req_send, rep_recv, task })
345 }
346
347 pub async fn stop(&self) {
351 self.task.stop().await;
352 }
353
354 async fn reqrep_loop(
356 use_http: bool,
357 stream: Box<dyn PtStream>,
358 rep_send: channel::Sender<JsonResult>,
359 req_recv: channel::Receiver<JsonRequest>,
360 ) -> Result<()> {
361 debug!(target: "rpc::chad_client::reqrep_loop", "Starting reqrep loop");
362
363 let (reader, mut writer) = smol::io::split(stream);
364 let mut reader = BufReader::new(reader);
365
366 loop {
367 let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
368
369 smol::future::or(
371 async {
372 let request = req_recv.recv().await?;
373 let request = JsonResult::Request(request);
374 if use_http {
375 http_write_to_stream(&mut writer, &request).await?;
376 } else {
377 write_to_stream(&mut writer, &request).await?;
378 }
379 Ok::<(), crate::Error>(())
380 },
381 async {
382 if use_http {
383 let _ = http_read_from_stream_response(&mut reader, &mut buf).await?;
384 } else {
385 let _ = read_from_stream(&mut reader, &mut buf).await?;
386 }
387 let val: JsonValue = String::from_utf8(buf)?.parse()?;
388 let rep = JsonResult::try_from_value(&val)?;
389 rep_send.send(rep).await?;
390 Ok::<(), crate::Error>(())
391 },
392 )
393 .await?;
394 }
395 }
396
397 pub async fn request(&self, req: JsonRequest) -> Result<JsonValue> {
401 let req_id = req.id;
403 debug!(target: "rpc::chad_client", "--> {}", req.stringify()?);
404
405 self.req_send.send(req).await?;
408
409 loop {
411 let reply = self.rep_recv.recv().await?;
414
415 match reply {
417 JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => {
418 debug!(target: "rpc::chad_client", "<-- {}", rep.stringify()?);
419
420 if req_id != rep.id {
422 debug!(target: "rpc::chad_client", "Skipping response for request {} as its not our latest({req_id})", rep.id);
423 continue
424 }
425
426 return Ok(rep.result)
427 }
428
429 JsonResult::Error(e) => {
430 debug!(target: "rpc::chad_client", "<-- {}", e.stringify()?);
431
432 if req_id != e.id {
434 debug!(target: "rpc::chad_client", "Skipping response for request {} as its not our latest({req_id})", e.id);
435 continue
436 }
437
438 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
439 }
440
441 JsonResult::Notification(n) => {
442 debug!(target: "rpc::chad_client", "<-- {}", n.stringify()?);
443 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
444 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
445 }
446
447 JsonResult::Request(r) => {
448 debug!(target: "rpc::chad_client", "<-- {}", r.stringify()?);
449 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
450 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
451 }
452
453 JsonResult::Subscriber(_) => {
454 let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
456 return Err(Error::JsonRpcError((e.error.code, e.error.message)))
457 }
458 }
459 }
460 }
461}