darkfi/geode/
file_sequence.rs1use std::{
20 collections::HashSet,
21 fs::{File, OpenOptions},
22 io::{Read, Seek, Write},
23 path::PathBuf,
24 pin::Pin,
25};
26
27use futures::{
28 task::{Context, Poll},
29 AsyncRead, AsyncSeek, AsyncWrite,
30};
31use smol::io::{self, SeekFrom};
32
33#[derive(Debug)]
47pub struct FileSequence {
48 files: Vec<(PathBuf, u64)>,
51 current_file: Option<File>,
53 current_file_index: Option<usize>,
55
56 position: u64,
57 auto_set_len: bool,
60}
61
62impl FileSequence {
63 pub fn new(files: &[(PathBuf, u64)], auto_set_len: bool) -> Self {
64 Self {
65 files: files.to_vec(),
66 current_file: None,
67 current_file_index: None,
68 position: 0,
69 auto_set_len,
70 }
71 }
72
73 pub fn set_file_size(&mut self, file_index: usize, file_size: u64) {
75 self.files[file_index].1 = file_size;
76 }
77
78 pub fn get_current_file(&self) -> &Option<File> {
80 &self.current_file
81 }
82
83 pub fn get_files(&self) -> &Vec<(PathBuf, u64)> {
85 &self.files
86 }
87
88 pub fn len(&self) -> u64 {
90 self.files.iter().map(|(_, size)| size).sum()
91 }
92
93 pub fn is_empty(&self) -> bool {
95 self.files.is_empty()
96 }
97
98 pub fn subset_len(&self, files: HashSet<PathBuf>) -> u64 {
100 self.files.iter().filter(|(path, _)| files.contains(path)).map(|(_, size)| size).sum()
101 }
102
103 pub fn get_file_position(&self, file_index: usize) -> u64 {
106 let mut pos = 0;
107 for i in 0..file_index {
108 pos += self.files[i].1;
109 }
110 pos
111 }
112
113 fn open_next_file(&mut self) -> io::Result<()> {
117 self.current_file = None;
118 self.current_file_index = match self.current_file_index {
119 Some(i) => Some(i + 1),
120 None => Some(0),
121 };
122 if self.current_file_index.unwrap() >= self.files.len() {
123 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "No more files to open"))
124 }
125 let file = OpenOptions::new()
126 .read(true)
127 .write(true)
128 .create(false)
129 .open(self.files[self.current_file_index.unwrap()].0.clone())?;
130 self.current_file = Some(file);
131 Ok(())
132 }
133
134 fn open_file(&mut self, file_index: usize) -> io::Result<()> {
136 self.current_file = None;
137 self.current_file_index = Some(file_index);
138 let file = OpenOptions::new()
139 .read(true)
140 .write(true)
141 .create(false)
142 .open(self.files[file_index].0.clone())?;
143 self.current_file = Some(file);
144 Ok(())
145 }
146}
147
148impl AsyncRead for FileSequence {
149 fn poll_read(
150 self: Pin<&mut Self>,
151 _: &mut Context<'_>,
152 buf: &mut [u8],
153 ) -> Poll<io::Result<usize>> {
154 let this = self.get_mut();
155 let mut total_read = 0;
156
157 while total_read < buf.len() {
158 if this.current_file.is_none() {
159 if let Some(file_index) = this.current_file_index {
160 if file_index >= this.files.len() - 1 {
162 return Poll::Ready(Ok(total_read));
163 }
164 let start_pos = this.get_file_position(file_index);
165 let file_size = this.files[file_index].1 as usize;
166 let file_pos = this.position - start_pos;
167 let space_left = file_size - file_pos as usize;
168 let skip_bytes = (buf.len() - total_read).min(space_left);
169 this.position += skip_bytes as u64;
170 total_read += skip_bytes;
171 }
172
173 match this.open_next_file() {
175 Ok(_) => {}
176 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
177 return Poll::Ready(Ok(total_read));
178 }
179 Err(e) if e.kind() == io::ErrorKind::NotFound => {
180 this.current_file = None;
181 continue; }
183 Err(e) => return Poll::Ready(Err(e)),
184 }
185 }
186
187 let file = this.current_file.as_mut().unwrap();
189 match file.read(&mut buf[total_read..]) {
190 Ok(bytes_read) => {
191 if bytes_read == 0 {
192 this.current_file = None; } else {
194 total_read += bytes_read;
195 this.position += bytes_read as u64;
196 }
197 }
198 Err(e) => return Poll::Ready(Err(e)),
199 }
200 }
201
202 Poll::Ready(Ok(total_read))
203 }
204}
205
206impl AsyncSeek for FileSequence {
207 fn poll_seek(
208 self: Pin<&mut Self>,
209 _: &mut Context<'_>,
210 pos: SeekFrom,
211 ) -> Poll<io::Result<u64>> {
212 let this = self.get_mut();
213
214 let abs_pos = match pos {
215 SeekFrom::Start(offset) => offset,
216 _ => todo!(), };
218
219 let mut file_index = 0;
221 let mut bytes_offset = 0;
222
223 while file_index < this.files.len() {
224 if bytes_offset + this.files[file_index].1 >= abs_pos {
225 break;
226 }
227 bytes_offset += this.files[file_index].1;
228 file_index += 1;
229 }
230
231 if file_index >= this.files.len() {
232 return Poll::Ready(Err(io::Error::new(
233 io::ErrorKind::InvalidInput,
234 "Seek position out of bounds",
235 )))
236 }
237
238 this.position = abs_pos; if this.current_file.is_none() ||
242 this.current_file_index.is_some() && this.current_file_index.unwrap() != file_index
243 {
244 match this.open_file(file_index) {
245 Ok(_) => {}
246 Err(e) if e.kind() == io::ErrorKind::NotFound => {
247 return Poll::Ready(Ok(this.position));
249 }
250 Err(e) => return Poll::Ready(Err(e)),
251 };
252 }
253
254 let file = this.current_file.as_mut().unwrap();
255 let file_pos = abs_pos - bytes_offset;
256
257 match file.seek(SeekFrom::Start(file_pos)) {
259 Ok(_) => Poll::Ready(Ok(this.position)),
260 Err(e) => Poll::Ready(Err(e)),
261 }
262 }
263}
264
265impl AsyncWrite for FileSequence {
266 fn poll_write(
267 self: Pin<&mut Self>,
268 _: &mut Context<'_>,
269 buf: &[u8],
270 ) -> Poll<io::Result<usize>> {
271 let this = self.get_mut();
272 let mut total_bytes_written = 0;
273 let mut remaining_buf = buf;
274 let auto_set_len = this.auto_set_len;
275
276 let finalize_current_file = |file: &mut File, max_size: u64| {
277 if auto_set_len {
278 file.set_len(max_size)?;
279 }
280 file.flush()?;
281 Ok(())
282 };
283
284 loop {
285 if this.current_file.is_none() {
287 if let Some(file_index) = this.current_file_index {
288 if file_index >= this.files.len() - 1 {
289 break; }
291 if remaining_buf.is_empty() {
292 break; }
294 let start_pos = this.get_file_position(file_index);
295 let file_size = this.files[file_index].1 as usize;
296 let file_pos = this.position - start_pos;
297 let space_left = file_size - file_pos as usize;
298 let skip_bytes = remaining_buf.len().min(space_left);
299 this.position += skip_bytes as u64;
300 remaining_buf = &remaining_buf[skip_bytes..]; }
302
303 match this.open_next_file() {
305 Ok(_) => {}
306 Err(e) if e.kind() == io::ErrorKind::NotFound => {
307 this.current_file = None;
308 continue; }
310 Err(e) => return Poll::Ready(Err(e)),
311 }
312 }
313
314 let file = this.current_file.as_mut().unwrap();
315 let max_size = this.files[this.current_file_index.unwrap()].1;
316
317 let current_position = file.stream_position()?;
319 let space_left = max_size - current_position;
320 let bytes_to_write = remaining_buf.len().min(space_left as usize);
321
322 if bytes_to_write == 0 {
323 if let Err(e) = finalize_current_file(file, max_size) {
325 return Poll::Ready(Err(e));
326 }
327 this.current_file = None;
328 continue;
329 }
330
331 match file.write(&remaining_buf[..bytes_to_write]) {
333 Ok(bytes_written) => {
334 total_bytes_written += bytes_written;
335 this.position += bytes_written as u64;
336 remaining_buf = &remaining_buf[bytes_written..]; if remaining_buf.is_empty() {
338 if let Err(e) = finalize_current_file(file, max_size) {
339 return Poll::Ready(Err(e));
340 }
341 break; }
343
344 if bytes_written == bytes_to_write {
346 if let Err(e) = finalize_current_file(file, max_size) {
347 return Poll::Ready(Err(e));
348 }
349 this.current_file = None;
350 }
351 }
352 Err(e) => return Poll::Ready(Err(e)), }
354 }
355
356 Poll::Ready(Ok(total_bytes_written))
357 }
358
359 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
360 Poll::Ready(Ok(())) }
362
363 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
364 let this = self.get_mut();
365 if let Some(file) = this.current_file.take() {
366 match file.sync_all() {
367 Ok(()) => Poll::Ready(Ok(())),
368 Err(e) => Poll::Ready(Err(e)),
369 }
370 } else {
371 Poll::Ready(Ok(())) }
373 }
374}
375
376impl Clone for FileSequence {
377 fn clone(&self) -> Self {
378 Self {
379 files: self.files.clone(),
380 current_file: None,
381 current_file_index: None,
382 position: 0,
383 auto_set_len: self.auto_set_len,
384 }
385 }
386}