darkfi/rpc/
common.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{io, time::Duration};
20
21use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
22use tracing::error;
23
24use super::jsonrpc::*;
25use crate::net::transport::PtStream;
26
27pub(super) const INIT_BUF_SIZE: usize = 4096; // 4K, slack added past the bytes read so far whenever a read buffer is resized
28pub(super) const MAX_BUF_SIZE: usize = 1024 * 1024 * 16; // 16M, cap on bytes read per message and on an accepted Content-Length
29pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30); // 30 seconds; not referenced by the functions below, presumably applied by callers in the parent module (TODO confirm)
30
31/// Internal read function that reads from the active stream into a buffer.
32/// Performs HTTP POST request parsing. Returns the request body length.
33pub(super) async fn http_read_from_stream_request(
34    reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
35    buf: &mut Vec<u8>,
36) -> io::Result<usize> {
37    let mut total_read = 0;
38
39    // Intermediate buffer we use to read byte-by-byte.
40    let mut tmpbuf = [0_u8];
41
42    while total_read < MAX_BUF_SIZE {
43        buf.resize(total_read + INIT_BUF_SIZE, 0u8);
44
45        match reader.read(&mut tmpbuf).await {
46            Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
47            Ok(0) => break, // Finished reading
48            Ok(_) => {
49                // Copy the read byte to the destination buffer.
50                buf[total_read] = tmpbuf[0];
51                total_read += 1;
52
53                // In HTTP, when we reach '\r\n\r\n' we know we've read the headers.
54                // The rest is the body. Headers should contain Content-Length which
55                // tells us the remaining amount of bytes to read.
56                if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
57                {
58                    break
59                }
60            }
61
62            Err(e) => return Err(e),
63        }
64    }
65
66    // Here we parse the HTTP for correctness and find Content-Length
67    let mut headers = [httparse::EMPTY_HEADER; 8];
68    let mut req = httparse::Request::new(&mut headers);
69    match req.parse(buf) {
70        Ok(httparse::Status::Complete(_)) => { /* Do nothing */ }
71        Ok(httparse::Status::Partial) => {
72            error!("[RPC] Failed parsing HTTP request: partial headers");
73            return Err(io::ErrorKind::InvalidData.into())
74        }
75        Err(e) => {
76            error!("[RPC] Failed parsing HTTP request: {e}");
77            return Err(io::ErrorKind::InvalidData.into())
78        }
79    }
80
81    let mut content_length: usize = 0;
82    for header in headers {
83        if header.name.to_lowercase() == "content-length" {
84            let s = String::from_utf8_lossy(header.value);
85            content_length = match s.parse() {
86                Ok(v) => v,
87                Err(_) => return Err(io::ErrorKind::InvalidData.into()),
88            };
89        }
90    }
91
92    if content_length == 0 || content_length > MAX_BUF_SIZE {
93        return Err(io::ErrorKind::InvalidData.into())
94    }
95
96    // Now we know the request body size. Read it into the buffer.
97    buf.clear();
98    buf.resize(content_length, 0_u8);
99    reader.read(buf).await?;
100
101    assert!(buf.len() == content_length);
102    Ok(content_length)
103}
104
105/// Internal read function that reads from the active stream into a buffer.
106/// Performs HTTP POST response parsing. Returns the response body length.
107pub(super) async fn http_read_from_stream_response(
108    reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
109    buf: &mut Vec<u8>,
110) -> io::Result<usize> {
111    let mut total_read = 0;
112
113    // Intermediate buffer we use to read byte-by-byte.
114    let mut tmpbuf = [0_u8];
115
116    while total_read < MAX_BUF_SIZE {
117        buf.resize(total_read + INIT_BUF_SIZE, 0u8);
118
119        match reader.read(&mut tmpbuf).await {
120            Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
121            Ok(0) => break, // Finished reading
122            Ok(_) => {
123                // Copy the read byte to the destination buffer.
124                buf[total_read] = tmpbuf[0];
125                total_read += 1;
126
127                // In HTTP, when we reach '\r\n\r\n' we know we've read the headers.
128                // The rest is the body. Headers should contain Content-Length which
129                // tells us the remaining amount of bytes to read.
130                if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
131                {
132                    break
133                }
134            }
135
136            Err(e) => return Err(e),
137        }
138    }
139
140    // Here we parse the HTTP for correctness and find Content-Length
141    let mut headers = [httparse::EMPTY_HEADER; 8];
142    let mut resp = httparse::Response::new(&mut headers);
143    match resp.parse(buf) {
144        Ok(httparse::Status::Complete(_)) => { /* Do nothing */ }
145        Ok(httparse::Status::Partial) => {
146            error!("[RPC] Failed parsing HTTP response: partial headers");
147            return Err(io::ErrorKind::InvalidData.into())
148        }
149        Err(e) => {
150            error!("[RPC] Failed parsing HTTP response: {e}");
151            return Err(io::ErrorKind::InvalidData.into())
152        }
153    }
154
155    let mut content_length: usize = 0;
156    for header in headers {
157        if header.name.to_lowercase() == "content-length" {
158            let s = String::from_utf8_lossy(header.value);
159            content_length = match s.parse() {
160                Ok(v) => v,
161                Err(_) => return Err(io::ErrorKind::InvalidData.into()),
162            };
163        }
164    }
165
166    if content_length == 0 || content_length > MAX_BUF_SIZE {
167        return Err(io::ErrorKind::InvalidData.into())
168    }
169
170    // Now we know the response body size. Read it into the buffer.
171    buf.clear();
172    buf.resize(content_length, 0_u8);
173    reader.read(buf).await?;
174
175    assert!(buf.len() == content_length);
176    Ok(content_length)
177}
178
179/// Internal read function that reads from the active stream into a buffer.
180/// Reading stops upon reaching CRLF or LF, or when `MAX_BUF_SIZE` is reached.
181pub(super) async fn read_from_stream(
182    reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
183    buf: &mut Vec<u8>,
184) -> io::Result<usize> {
185    let mut total_read = 0;
186
187    // Intermediate buffer we use to read byte-by-byte.
188    let mut tmpbuf = [0_u8];
189
190    while total_read < MAX_BUF_SIZE {
191        buf.resize(total_read + INIT_BUF_SIZE, 0u8);
192
193        match reader.read(&mut tmpbuf).await {
194            Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
195            Ok(0) => break, // Finished reading
196            Ok(_) => {
197                // When we reach '\n', pop a possible '\r' from the buffer and bail.
198                if tmpbuf[0] == b'\n' {
199                    if buf[total_read - 1] == b'\r' {
200                        buf.pop();
201                        total_read -= 1;
202                    }
203                    break
204                }
205
206                // Copy the read byte to the destination buffer.
207                buf[total_read] = tmpbuf[0];
208                total_read += 1;
209            }
210
211            Err(e) => return Err(e),
212        }
213    }
214
215    // Truncate buffer to actual data size
216    buf.truncate(total_read);
217    Ok(total_read)
218}
219
220/// Internal write function that writes a JSON-RPC object to the active stream.
221/// Sent as an HTTP response.
222pub(super) async fn http_write_to_stream(
223    writer: &mut WriteHalf<Box<dyn PtStream>>,
224    object: &JsonResult,
225) -> io::Result<()> {
226    let (status_line, object_str) = match object {
227        JsonResult::Notification(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
228        JsonResult::Response(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
229        JsonResult::Error(v) => ("HTTP/1.1 400 Bad Request", v.stringify().unwrap()),
230        JsonResult::Request(v) => ("POST /json_rpc HTTP/1.1", v.stringify().unwrap()),
231        _ => unreachable!(),
232    };
233
234    let length = object_str.len();
235    let data = format!("{status_line}\r\nContent-Length: {length}\r\nContent-Type: application/json\r\n\r\n{object_str}");
236
237    writer.write_all(data.as_bytes()).await?;
238    writer.flush().await?;
239
240    Ok(())
241}
242
243/// Internal write function that writes a JSON-RPC object to the active stream.
244pub(super) async fn write_to_stream(
245    writer: &mut WriteHalf<Box<dyn PtStream>>,
246    object: &JsonResult,
247) -> io::Result<()> {
248    let object_str = match object {
249        JsonResult::Notification(v) => v.stringify().unwrap(),
250        JsonResult::Response(v) => v.stringify().unwrap(),
251        JsonResult::Error(v) => v.stringify().unwrap(),
252        JsonResult::Request(v) => v.stringify().unwrap(),
253        _ => unreachable!(),
254    };
255
256    // As we're a line-based protocol, we append CRLF to the end of the JSON string.
257    for i in [object_str.as_bytes(), b"\r\n"] {
258        writer.write_all(i).await?
259    }
260
261    writer.flush().await?;
262
263    Ok(())
264}