darkfi/rpc/
client.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use smol::{channel, io::BufReader, Executor};
22use tinyjson::JsonValue;
23use tracing::{debug, error};
24use url::Url;
25
26use super::{
27    common::{
28        http_read_from_stream_response, http_write_to_stream, read_from_stream, write_to_stream,
29        INIT_BUF_SIZE, READ_TIMEOUT,
30    },
31    jsonrpc::*,
32};
33use crate::{
34    net::transport::{Dialer, PtStream},
35    system::{io_timeout, PublisherPtr, StoppableTask, StoppableTaskPtr},
36    Error, Result,
37};
38
39/// JSON-RPC client implementation using asynchronous channels.
40pub struct RpcClient {
41    /// The channel used to send JSON-RPC request objects.
42    /// The `bool` marks if we should have a reply read timeout.
43    req_send: channel::Sender<(JsonRequest, bool)>,
44    /// The channel used to read the JSON-RPC response object.
45    rep_recv: channel::Receiver<JsonResult>,
46    /// The channel used to skip waiting for a JSON-RPC client request
47    req_skip_send: channel::Sender<()>,
48    /// The stoppable task pointer, used on [`RpcClient::stop()`]
49    task: StoppableTaskPtr,
50}
51
52impl RpcClient {
53    /// Instantiate a new JSON-RPC client that connects to the given endpoint.
54    /// The function takes an `Executor` object, which is needed to start the
55    /// `StoppableTask` which represents the client-server connection.
56    pub async fn new(endpoint: Url, ex: Arc<Executor<'_>>) -> Result<Self> {
57        // Instantiate communication channels
58        let (req_send, req_recv) = channel::unbounded();
59        let (rep_send, rep_recv) = channel::unbounded();
60        let (req_skip_send, req_skip_recv) = channel::unbounded();
61
62        // Figure out if we're using HTTP and rewrite the URL accordingly.
63        let mut dialer_url = endpoint.clone();
64        if endpoint.scheme().starts_with("http+") {
65            let scheme = endpoint.scheme().strip_prefix("http+").unwrap();
66            let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme);
67            dialer_url = url_str.parse()?;
68        }
69        let use_http = endpoint.scheme().starts_with("http+");
70
71        // Instantiate Dialer and dial the server
72        // TODO: Could add a timeout here
73        let dialer = Dialer::new(dialer_url, None, None).await?;
74        let stream = dialer.dial(None).await?;
75
76        // Create the StoppableTask running the request-reply loop.
77        // This represents the actual connection, which can be stopped
78        // using `RpcClient::stop()`.
79        let task = StoppableTask::new();
80        task.clone().start(
81            Self::reqrep_loop(use_http, stream, rep_send, req_recv, req_skip_recv),
82            |res| async move {
83                match res {
84                    Ok(()) | Err(Error::RpcClientStopped) => {}
85                    Err(e) => error!(target: "rpc::client", "[RPC] Client error: {e}"),
86                }
87            },
88            Error::RpcClientStopped,
89            ex.clone(),
90        );
91
92        Ok(Self { req_send, rep_recv, task, req_skip_send })
93    }
94
95    /// Stop the JSON-RPC client. This will trigger `stop()` on the inner
96    /// `StoppableTaskPtr` resulting in stopping the internal reqrep loop
97    /// and therefore closing the connection.
98    pub async fn stop(&self) {
99        self.task.stop().await;
100    }
101
102    /// Internal function that loops on a given stream and multiplexes the data
103    async fn reqrep_loop(
104        use_http: bool,
105        stream: Box<dyn PtStream>,
106        rep_send: channel::Sender<JsonResult>,
107        req_recv: channel::Receiver<(JsonRequest, bool)>,
108        req_skip_recv: channel::Receiver<()>,
109    ) -> Result<()> {
110        debug!(target: "rpc::client::reqrep_loop", "Starting reqrep loop");
111
112        let (reader, mut writer) = smol::io::split(stream);
113        let mut reader = BufReader::new(reader);
114
115        loop {
116            let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
117            let mut with_timeout = false;
118
119            // Read an incoming client request, or skip it if triggered from
120            // a JSONRPC notification subscriber
121            smol::future::or(
122                async {
123                    let (request, timeout) = req_recv.recv().await?;
124                    with_timeout = timeout;
125
126                    let request = JsonResult::Request(request);
127                    if use_http {
128                        http_write_to_stream(&mut writer, &request).await?;
129                    } else {
130                        write_to_stream(&mut writer, &request).await?;
131                    }
132                    Ok::<(), crate::Error>(())
133                },
134                async {
135                    req_skip_recv.recv().await?;
136                    Ok::<(), crate::Error>(())
137                },
138            )
139            .await?;
140
141            if with_timeout {
142                if use_http {
143                    let _ = io_timeout(
144                        READ_TIMEOUT,
145                        http_read_from_stream_response(&mut reader, &mut buf),
146                    )
147                    .await?;
148                } else {
149                    let _ =
150                        io_timeout(READ_TIMEOUT, read_from_stream(&mut reader, &mut buf)).await?;
151                }
152            } else {
153                #[allow(clippy::collapsible_else_if)]
154                if use_http {
155                    let _ = http_read_from_stream_response(&mut reader, &mut buf).await?;
156                } else {
157                    let _ = read_from_stream(&mut reader, &mut buf).await?;
158                }
159            }
160
161            let val: JsonValue = String::from_utf8(buf)?.parse()?;
162            let rep = JsonResult::try_from_value(&val)?;
163            rep_send.send(rep).await?;
164        }
165    }
166
167    /// Send a given JSON-RPC request over the instantiated client and
168    /// return a possible result. If the response is an error, returns
169    /// a `JsonRpcError`.
170    pub async fn request(&self, req: JsonRequest) -> Result<JsonValue> {
171        let req_id = req.id;
172        debug!(target: "rpc::client", "--> {}", req.stringify()?);
173
174        // If the connection is closed, the sender will get an error
175        // for sending to a closed channel.
176        self.req_send.send((req, true)).await?;
177
178        // If the connection is closed, the receiver will get an error
179        // for waiting on a closed channel.
180        let reply = self.rep_recv.recv().await?;
181
182        // Handle the response
183        match reply {
184            JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => {
185                debug!(target: "rpc::client", "<-- {}", rep.stringify()?);
186
187                // Check if the IDs match
188                if req_id != rep.id {
189                    let e = JsonError::new(ErrorCode::IdMismatch, None, rep.id);
190                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
191                }
192
193                Ok(rep.result)
194            }
195
196            JsonResult::Error(e) => {
197                debug!(target: "rpc::client", "<-- {}", e.stringify()?);
198                Err(Error::JsonRpcError((e.error.code, e.error.message)))
199            }
200
201            JsonResult::Notification(n) => {
202                debug!(target: "rpc::client", "<-- {}", n.stringify()?);
203                let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
204                Err(Error::JsonRpcError((e.error.code, e.error.message)))
205            }
206
207            JsonResult::Request(r) => {
208                debug!(target: "rpc::client", "<-- {}", r.stringify()?);
209                let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
210                Err(Error::JsonRpcError((e.error.code, e.error.message)))
211            }
212
213            JsonResult::Subscriber(_) => {
214                // When?
215                let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
216                Err(Error::JsonRpcError((e.error.code, e.error.message)))
217            }
218        }
219    }
220
221    /// Oneshot send a given JSON-RPC request over the instantiated client
222    /// and immediately close the channels upon receiving a reply.
223    pub async fn oneshot_request(&self, req: JsonRequest) -> Result<JsonValue> {
224        let rep = match self.request(req).await {
225            Ok(v) => v,
226            Err(e) => {
227                self.stop().await;
228                return Err(e)
229            }
230        };
231
232        self.stop().await;
233        Ok(rep)
234    }
235
236    /// Listen instantiated client for notifications.
237    /// NOTE: Subscriber listeners must perform response handling.
238    pub async fn subscribe(
239        &self,
240        req: JsonRequest,
241        publisher: PublisherPtr<JsonResult>,
242    ) -> Result<()> {
243        // Perform initial request
244        debug!(target: "rpc::client", "--> {}", req.stringify()?);
245        let req_id = req.id;
246
247        // If the connection is closed, the sender will get an error for
248        // sending to a closed channel.
249        self.req_send.send((req, false)).await?;
250
251        // Now loop and listen to notifications
252        loop {
253            // If the connection is closed, the receiver will get an error
254            // for waiting on a closed channel.
255            let notification = self.rep_recv.recv().await?;
256
257            // Handle the response
258            match notification {
259                JsonResult::Notification(ref n) => {
260                    debug!(target: "rpc::client", "<-- {}", n.stringify()?);
261                    self.req_skip_send.send(()).await?;
262                    publisher.notify(notification.clone()).await;
263                    continue
264                }
265
266                JsonResult::Error(e) => {
267                    debug!(target: "rpc::client", "<-- {}", e.stringify()?);
268                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
269                }
270
271                JsonResult::Response(r) | JsonResult::SubscriberWithReply(_, r) => {
272                    debug!(target: "rpc::client", "<-- {}", r.stringify()?);
273                    let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
274                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
275                }
276
277                JsonResult::Request(r) => {
278                    debug!(target: "rpc::client", "<-- {}", r.stringify()?);
279                    let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
280                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
281                }
282
283                JsonResult::Subscriber(_) => {
284                    // When?
285                    let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
286                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
287                }
288            }
289        }
290    }
291}
292
293/// Highly experimental JSON-RPC client implementation using asynchronous channels,
294/// with each new request canceling waiting for the previous one. All requests are
295/// executed without a timeout.
296pub struct RpcChadClient {
297    /// The channel used to send JSON-RPC request objects
298    req_send: channel::Sender<JsonRequest>,
299    /// The channel used to read the JSON-RPC response object
300    rep_recv: channel::Receiver<JsonResult>,
301    /// The stoppable task pointer, used on [`RpcChadClient::stop()`]
302    task: StoppableTaskPtr,
303}
304
305impl RpcChadClient {
306    /// Instantiate a new JSON-RPC client that connects to the given endpoint.
307    /// The function takes an `Executor` object, which is needed to start the
308    /// `StoppableTask` which represents the client-server connection.
309    pub async fn new(endpoint: Url, ex: Arc<Executor<'_>>) -> Result<Self> {
310        // Instantiate communication channels
311        let (req_send, req_recv) = channel::unbounded();
312        let (rep_send, rep_recv) = channel::unbounded();
313
314        // Figure out if we're using HTTP and rewrite the URL accordingly.
315        let mut dialer_url = endpoint.clone();
316        if endpoint.scheme().starts_with("http+") {
317            let scheme = endpoint.scheme().strip_prefix("http+").unwrap();
318            let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme);
319            dialer_url = url_str.parse()?;
320        }
321        let use_http = endpoint.scheme().starts_with("http+");
322
323        // Instantiate Dialer and dial the server
324        // TODO: Could add a timeout here
325        let dialer = Dialer::new(dialer_url, None, None).await?;
326        let stream = dialer.dial(None).await?;
327
328        // Create the StoppableTask running the request-reply loop.
329        // This represents the actual connection, which can be stopped
330        // using `RpcChadClient::stop()`.
331        let task = StoppableTask::new();
332        task.clone().start(
333            Self::reqrep_loop(use_http, stream, rep_send, req_recv),
334            |res| async move {
335                match res {
336                    Ok(()) | Err(Error::RpcClientStopped) => {}
337                    Err(e) => error!(target: "rpc::chad_client", "[RPC] Client error: {e}"),
338                }
339            },
340            Error::RpcClientStopped,
341            ex.clone(),
342        );
343
344        Ok(Self { req_send, rep_recv, task })
345    }
346
347    /// Stop the JSON-RPC client. This will trigger `stop()` on the inner
348    /// `StoppableTaskPtr` resulting in stopping the internal reqrep loop
349    /// and therefore closing the connection.
350    pub async fn stop(&self) {
351        self.task.stop().await;
352    }
353
354    /// Internal function that loops on a given stream and multiplexes the data
355    async fn reqrep_loop(
356        use_http: bool,
357        stream: Box<dyn PtStream>,
358        rep_send: channel::Sender<JsonResult>,
359        req_recv: channel::Receiver<JsonRequest>,
360    ) -> Result<()> {
361        debug!(target: "rpc::chad_client::reqrep_loop", "Starting reqrep loop");
362
363        let (reader, mut writer) = smol::io::split(stream);
364        let mut reader = BufReader::new(reader);
365
366        loop {
367            let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
368
369            // Read an incoming client request, or wait for a response
370            smol::future::or(
371                async {
372                    let request = req_recv.recv().await?;
373                    let request = JsonResult::Request(request);
374                    if use_http {
375                        http_write_to_stream(&mut writer, &request).await?;
376                    } else {
377                        write_to_stream(&mut writer, &request).await?;
378                    }
379                    Ok::<(), crate::Error>(())
380                },
381                async {
382                    if use_http {
383                        let _ = http_read_from_stream_response(&mut reader, &mut buf).await?;
384                    } else {
385                        let _ = read_from_stream(&mut reader, &mut buf).await?;
386                    }
387                    let val: JsonValue = String::from_utf8(buf)?.parse()?;
388                    let rep = JsonResult::try_from_value(&val)?;
389                    rep_send.send(rep).await?;
390                    Ok::<(), crate::Error>(())
391                },
392            )
393            .await?;
394        }
395    }
396
397    /// Send a given JSON-RPC request over the instantiated client and
398    /// return a possible result. If the response is an error, returns
399    /// a `JsonRpcError`.
400    pub async fn request(&self, req: JsonRequest) -> Result<JsonValue> {
401        // Perform request
402        let req_id = req.id;
403        debug!(target: "rpc::chad_client", "--> {}", req.stringify()?);
404
405        // If the connection is closed, the sender will get an error
406        // for sending to a closed channel.
407        self.req_send.send(req).await?;
408
409        // Now loop until we receive our response
410        loop {
411            // If the connection is closed, the receiver will get an error
412            // for waiting on a closed channel.
413            let reply = self.rep_recv.recv().await?;
414
415            // Handle the response
416            match reply {
417                JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => {
418                    debug!(target: "rpc::chad_client", "<-- {}", rep.stringify()?);
419
420                    // Check if the IDs match
421                    if req_id != rep.id {
422                        debug!(target: "rpc::chad_client", "Skipping response for request {} as its not our latest({req_id})", rep.id);
423                        continue
424                    }
425
426                    return Ok(rep.result)
427                }
428
429                JsonResult::Error(e) => {
430                    debug!(target: "rpc::chad_client", "<-- {}", e.stringify()?);
431
432                    // Check if the IDs match
433                    if req_id != e.id {
434                        debug!(target: "rpc::chad_client", "Skipping response for request {} as its not our latest({req_id})", e.id);
435                        continue
436                    }
437
438                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
439                }
440
441                JsonResult::Notification(n) => {
442                    debug!(target: "rpc::chad_client", "<-- {}", n.stringify()?);
443                    let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
444                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
445                }
446
447                JsonResult::Request(r) => {
448                    debug!(target: "rpc::chad_client", "<-- {}", r.stringify()?);
449                    let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
450                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
451                }
452
453                JsonResult::Subscriber(_) => {
454                    // When?
455                    let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
456                    return Err(Error::JsonRpcError((e.error.code, e.error.message)))
457                }
458            }
459        }
460    }
461}