1use std::{io, time::Duration};
20
21use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
22use tracing::error;
23
24use super::jsonrpc::*;
25use crate::net::transport::PtStream;
26
27pub(super) const INIT_BUF_SIZE: usize = 4096; pub(super) const MAX_BUF_SIZE: usize = 1024 * 1024 * 16; pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30);
30
31pub(super) async fn http_read_from_stream_request(
34 reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
35 buf: &mut Vec<u8>,
36) -> io::Result<usize> {
37 let mut total_read = 0;
38
39 let mut tmpbuf = [0_u8];
41
42 while total_read < MAX_BUF_SIZE {
43 buf.resize(total_read + INIT_BUF_SIZE, 0u8);
44
45 match reader.read(&mut tmpbuf).await {
46 Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
47 Ok(0) => break, Ok(_) => {
49 buf[total_read] = tmpbuf[0];
51 total_read += 1;
52
53 if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
57 {
58 break
59 }
60 }
61
62 Err(e) => return Err(e),
63 }
64 }
65
66 let mut headers = [httparse::EMPTY_HEADER; 8];
68 let mut req = httparse::Request::new(&mut headers);
69 match req.parse(buf) {
70 Ok(httparse::Status::Complete(_)) => { }
71 Ok(httparse::Status::Partial) => {
72 error!("[RPC] Failed parsing HTTP request: partial headers");
73 return Err(io::ErrorKind::InvalidData.into())
74 }
75 Err(e) => {
76 error!("[RPC] Failed parsing HTTP request: {e}");
77 return Err(io::ErrorKind::InvalidData.into())
78 }
79 }
80
81 let mut content_length: usize = 0;
82 for header in headers {
83 if header.name.to_lowercase() == "content-length" {
84 let s = String::from_utf8_lossy(header.value);
85 content_length = match s.parse() {
86 Ok(v) => v,
87 Err(_) => return Err(io::ErrorKind::InvalidData.into()),
88 };
89 }
90 }
91
92 if content_length == 0 || content_length > MAX_BUF_SIZE {
93 return Err(io::ErrorKind::InvalidData.into())
94 }
95
96 buf.clear();
98 buf.resize(content_length, 0_u8);
99 reader.read(buf).await?;
100
101 assert!(buf.len() == content_length);
102 Ok(content_length)
103}
104
105pub(super) async fn http_read_from_stream_response(
108 reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
109 buf: &mut Vec<u8>,
110) -> io::Result<usize> {
111 let mut total_read = 0;
112
113 let mut tmpbuf = [0_u8];
115
116 while total_read < MAX_BUF_SIZE {
117 buf.resize(total_read + INIT_BUF_SIZE, 0u8);
118
119 match reader.read(&mut tmpbuf).await {
120 Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
121 Ok(0) => break, Ok(_) => {
123 buf[total_read] = tmpbuf[0];
125 total_read += 1;
126
127 if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
131 {
132 break
133 }
134 }
135
136 Err(e) => return Err(e),
137 }
138 }
139
140 let mut headers = [httparse::EMPTY_HEADER; 8];
142 let mut resp = httparse::Response::new(&mut headers);
143 match resp.parse(buf) {
144 Ok(httparse::Status::Complete(_)) => { }
145 Ok(httparse::Status::Partial) => {
146 error!("[RPC] Failed parsing HTTP response: partial headers");
147 return Err(io::ErrorKind::InvalidData.into())
148 }
149 Err(e) => {
150 error!("[RPC] Failed parsing HTTP response: {e}");
151 return Err(io::ErrorKind::InvalidData.into())
152 }
153 }
154
155 let mut content_length: usize = 0;
156 for header in headers {
157 if header.name.to_lowercase() == "content-length" {
158 let s = String::from_utf8_lossy(header.value);
159 content_length = match s.parse() {
160 Ok(v) => v,
161 Err(_) => return Err(io::ErrorKind::InvalidData.into()),
162 };
163 }
164 }
165
166 if content_length == 0 || content_length > MAX_BUF_SIZE {
167 return Err(io::ErrorKind::InvalidData.into())
168 }
169
170 buf.clear();
172 buf.resize(content_length, 0_u8);
173 reader.read(buf).await?;
174
175 assert!(buf.len() == content_length);
176 Ok(content_length)
177}
178
179pub(super) async fn read_from_stream(
182 reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
183 buf: &mut Vec<u8>,
184) -> io::Result<usize> {
185 let mut total_read = 0;
186
187 let mut tmpbuf = [0_u8];
189
190 while total_read < MAX_BUF_SIZE {
191 buf.resize(total_read + INIT_BUF_SIZE, 0u8);
192
193 match reader.read(&mut tmpbuf).await {
194 Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
195 Ok(0) => break, Ok(_) => {
197 if tmpbuf[0] == b'\n' {
199 if buf[total_read - 1] == b'\r' {
200 buf.pop();
201 total_read -= 1;
202 }
203 break
204 }
205
206 buf[total_read] = tmpbuf[0];
208 total_read += 1;
209 }
210
211 Err(e) => return Err(e),
212 }
213 }
214
215 buf.truncate(total_read);
217 Ok(total_read)
218}
219
220pub(super) async fn http_write_to_stream(
223 writer: &mut WriteHalf<Box<dyn PtStream>>,
224 object: &JsonResult,
225) -> io::Result<()> {
226 let (status_line, object_str) = match object {
227 JsonResult::Notification(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
228 JsonResult::Response(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
229 JsonResult::Error(v) => ("HTTP/1.1 400 Bad Request", v.stringify().unwrap()),
230 JsonResult::Request(v) => ("POST /json_rpc HTTP/1.1", v.stringify().unwrap()),
231 _ => unreachable!(),
232 };
233
234 let length = object_str.len();
235 let data = format!("{status_line}\r\nContent-Length: {length}\r\nContent-Type: application/json\r\n\r\n{object_str}");
236
237 writer.write_all(data.as_bytes()).await?;
238 writer.flush().await?;
239
240 Ok(())
241}
242
243pub(super) async fn write_to_stream(
245 writer: &mut WriteHalf<Box<dyn PtStream>>,
246 object: &JsonResult,
247) -> io::Result<()> {
248 let object_str = match object {
249 JsonResult::Notification(v) => v.stringify().unwrap(),
250 JsonResult::Response(v) => v.stringify().unwrap(),
251 JsonResult::Error(v) => v.stringify().unwrap(),
252 JsonResult::Request(v) => v.stringify().unwrap(),
253 _ => unreachable!(),
254 };
255
256 for i in [object_str.as_bytes(), b"\r\n"] {
258 writer.write_all(i).await?
259 }
260
261 writer.flush().await?;
262
263 Ok(())
264}