darkfi/geode/
file_sequence.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2026 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::HashSet,
21    fs::{File, OpenOptions},
22    io::{Read, Seek, Write},
23    path::PathBuf,
24    pin::Pin,
25};
26
27use futures::{
28    task::{Context, Poll},
29    AsyncRead, AsyncSeek, AsyncWrite,
30};
31use smol::io::{self, SeekFrom};
32
/// `FileSequence` exposes an ordered list of (file path, file size) entries
/// as one continuous stream by implementing `AsyncRead`, `AsyncSeek`, and
/// `AsyncWrite`.
///
/// Reading and writing move on to the next file of the list once the current
/// one is exhausted, so callers can treat the whole list as a single file
/// and never have to handle the individual files themselves.
///
/// Entries of `files` that do not exist on the filesystem are skipped rather
/// than reported as errors. Files are opened without being created, so every
/// file you want to read, write, or seek to must exist before the
/// `FileSequence` is used.
#[derive(Debug)]
pub struct FileSequence {
    /// Ordered list of (file path, file size). The sizes are the sizes we
    /// want the files to have, which are not necessarily the sizes they
    /// currently have on the filesystem.
    files: Vec<(PathBuf, u64)>,
    /// Handle of the file that is currently open, if any
    current_file: Option<File>,
    /// Index in `files` of the file that was opened (or attempted) last
    current_file_index: Option<usize>,
    /// Byte offset of the cursor, counted from the start of the first file
    position: u64,
    /// When `true`, a file is resized on the filesystem to the size recorded
    /// for it in `files` once a write to it is finalized
    auto_set_len: bool,
}
61
62impl FileSequence {
63    pub fn new(files: &[(PathBuf, u64)], auto_set_len: bool) -> Self {
64        Self {
65            files: files.to_vec(),
66            current_file: None,
67            current_file_index: None,
68            position: 0,
69            auto_set_len,
70        }
71    }
72
73    /// Update a single file size.
74    pub fn set_file_size(&mut self, file_index: usize, file_size: u64) {
75        self.files[file_index].1 = file_size;
76    }
77
78    /// Return `current_file`.
79    pub fn get_current_file(&self) -> &Option<File> {
80        &self.current_file
81    }
82
83    /// Return `files`.
84    pub fn get_files(&self) -> &Vec<(PathBuf, u64)> {
85        &self.files
86    }
87
88    /// Return the combined file size of all files.
89    pub fn len(&self) -> u64 {
90        self.files.iter().map(|(_, size)| size).sum()
91    }
92
93    /// Return `true` if the `FileSequence` contains no file.
94    pub fn is_empty(&self) -> bool {
95        self.files.is_empty()
96    }
97
98    /// Return the combined file size of all files.
99    pub fn subset_len(&self, files: HashSet<PathBuf>) -> u64 {
100        self.files.iter().filter(|(path, _)| files.contains(path)).map(|(_, size)| size).sum()
101    }
102
103    /// Compute the starting position of the file (in bytes) by suming up
104    /// the size of the previous files.
105    pub fn get_file_position(&self, file_index: usize) -> u64 {
106        let mut pos = 0;
107        for i in 0..file_index {
108            pos += self.files[i].1;
109        }
110        pos
111    }
112
113    /// Open the file at (`current_file_index` + 1).
114    /// If no file is currently open (`current_file_index` is None), it opens
115    /// the first file.
116    fn open_next_file(&mut self) -> io::Result<()> {
117        self.current_file = None;
118        self.current_file_index = match self.current_file_index {
119            Some(i) => Some(i + 1),
120            None => Some(0),
121        };
122        if self.current_file_index.unwrap() >= self.files.len() {
123            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "No more files to open"))
124        }
125        let file = OpenOptions::new()
126            .read(true)
127            .write(true)
128            .create(false)
129            .open(self.files[self.current_file_index.unwrap()].0.clone())?;
130        self.current_file = Some(file);
131        Ok(())
132    }
133
134    /// Open the file at `file_index`.
135    fn open_file(&mut self, file_index: usize) -> io::Result<()> {
136        self.current_file = None;
137        self.current_file_index = Some(file_index);
138        let file = OpenOptions::new()
139            .read(true)
140            .write(true)
141            .create(false)
142            .open(self.files[file_index].0.clone())?;
143        self.current_file = Some(file);
144        Ok(())
145    }
146}
147
148impl AsyncRead for FileSequence {
149    fn poll_read(
150        self: Pin<&mut Self>,
151        _: &mut Context<'_>,
152        buf: &mut [u8],
153    ) -> Poll<io::Result<usize>> {
154        let this = self.get_mut();
155        let mut total_read = 0;
156
157        while total_read < buf.len() {
158            if this.current_file.is_none() {
159                if let Some(file_index) = this.current_file_index {
160                    // Stop if there are no more files to read
161                    if file_index >= this.files.len() - 1 {
162                        return Poll::Ready(Ok(total_read));
163                    }
164                    let start_pos = this.get_file_position(file_index);
165                    let file_size = this.files[file_index].1 as usize;
166                    let file_pos = this.position - start_pos;
167                    let space_left = file_size - file_pos as usize;
168                    let skip_bytes = (buf.len() - total_read).min(space_left);
169                    this.position += skip_bytes as u64;
170                    total_read += skip_bytes;
171                }
172
173                // Open the next file
174                match this.open_next_file() {
175                    Ok(_) => {}
176                    Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
177                        return Poll::Ready(Ok(total_read));
178                    }
179                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
180                        this.current_file = None;
181                        continue; // Skip to next file
182                    }
183                    Err(e) => return Poll::Ready(Err(e)),
184                }
185            }
186
187            // Read from the current file
188            let file = this.current_file.as_mut().unwrap();
189            match file.read(&mut buf[total_read..]) {
190                Ok(bytes_read) => {
191                    if bytes_read == 0 {
192                        this.current_file = None; // Move to the next file
193                    } else {
194                        total_read += bytes_read;
195                        this.position += bytes_read as u64;
196                    }
197                }
198                Err(e) => return Poll::Ready(Err(e)),
199            }
200        }
201
202        Poll::Ready(Ok(total_read))
203    }
204}
205
206impl AsyncSeek for FileSequence {
207    fn poll_seek(
208        self: Pin<&mut Self>,
209        _: &mut Context<'_>,
210        pos: SeekFrom,
211    ) -> Poll<io::Result<u64>> {
212        let this = self.get_mut();
213
214        let abs_pos = match pos {
215            SeekFrom::Start(offset) => offset,
216            _ => todo!(), // TODO
217        };
218
219        // Determine which file to seek in
220        let mut file_index = 0;
221        let mut bytes_offset = 0;
222
223        while file_index < this.files.len() {
224            if bytes_offset + this.files[file_index].1 >= abs_pos {
225                break;
226            }
227            bytes_offset += this.files[file_index].1;
228            file_index += 1;
229        }
230
231        if file_index >= this.files.len() {
232            return Poll::Ready(Err(io::Error::new(
233                io::ErrorKind::InvalidInput,
234                "Seek position out of bounds",
235            )))
236        }
237
238        this.position = abs_pos; // Update FileSequence position
239
240        // Open the file
241        if this.current_file.is_none() ||
242            this.current_file_index.is_some() && this.current_file_index.unwrap() != file_index
243        {
244            match this.open_file(file_index) {
245                Ok(_) => {}
246                Err(e) if e.kind() == io::ErrorKind::NotFound => {
247                    // If the file does not exist, return without actually seeking it
248                    return Poll::Ready(Ok(this.position));
249                }
250                Err(e) => return Poll::Ready(Err(e)),
251            };
252        }
253
254        let file = this.current_file.as_mut().unwrap();
255        let file_pos = abs_pos - bytes_offset;
256
257        // Seek in the current file
258        match file.seek(SeekFrom::Start(file_pos)) {
259            Ok(_) => Poll::Ready(Ok(this.position)),
260            Err(e) => Poll::Ready(Err(e)),
261        }
262    }
263}
264
265impl AsyncWrite for FileSequence {
266    fn poll_write(
267        self: Pin<&mut Self>,
268        _: &mut Context<'_>,
269        buf: &[u8],
270    ) -> Poll<io::Result<usize>> {
271        let this = self.get_mut();
272        let mut total_bytes_written = 0;
273        let mut remaining_buf = buf;
274        let auto_set_len = this.auto_set_len;
275
276        let finalize_current_file = |file: &mut File, max_size: u64| {
277            if auto_set_len {
278                file.set_len(max_size)?;
279            }
280            file.flush()?;
281            Ok(())
282        };
283
284        loop {
285            // Ensure the current file is open
286            if this.current_file.is_none() {
287                if let Some(file_index) = this.current_file_index {
288                    if file_index >= this.files.len() - 1 {
289                        break; // No more files
290                    }
291                    if remaining_buf.is_empty() {
292                        break; // No more data to write
293                    }
294                    let start_pos = this.get_file_position(file_index);
295                    let file_size = this.files[file_index].1 as usize;
296                    let file_pos = this.position - start_pos;
297                    let space_left = file_size - file_pos as usize;
298                    let skip_bytes = remaining_buf.len().min(space_left);
299                    this.position += skip_bytes as u64;
300                    remaining_buf = &remaining_buf[skip_bytes..]; // Update the remaining buffer
301                }
302
303                // Switch to the next file
304                match this.open_next_file() {
305                    Ok(_) => {}
306                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
307                        this.current_file = None;
308                        continue; // Skip to next file
309                    }
310                    Err(e) => return Poll::Ready(Err(e)),
311                }
312            }
313
314            let file = this.current_file.as_mut().unwrap();
315            let max_size = this.files[this.current_file_index.unwrap()].1;
316
317            // Check how much space is left in the current file
318            let current_position = file.stream_position()?;
319            let space_left = max_size - current_position;
320            let bytes_to_write = remaining_buf.len().min(space_left as usize);
321
322            if bytes_to_write == 0 {
323                // Continue to the next iteration to check the new file
324                if let Err(e) = finalize_current_file(file, max_size) {
325                    return Poll::Ready(Err(e));
326                }
327                this.current_file = None;
328                continue;
329            }
330
331            // Write to the current file
332            match file.write(&remaining_buf[..bytes_to_write]) {
333                Ok(bytes_written) => {
334                    total_bytes_written += bytes_written;
335                    this.position += bytes_written as u64;
336                    remaining_buf = &remaining_buf[bytes_written..]; // Update the remaining buffer
337                    if remaining_buf.is_empty() {
338                        if let Err(e) = finalize_current_file(file, max_size) {
339                            return Poll::Ready(Err(e));
340                        }
341                        break; // No more data to write
342                    }
343
344                    // We wrote to the end of this file, use new file on next iteration
345                    if bytes_written == bytes_to_write {
346                        if let Err(e) = finalize_current_file(file, max_size) {
347                            return Poll::Ready(Err(e));
348                        }
349                        this.current_file = None;
350                    }
351                }
352                Err(e) => return Poll::Ready(Err(e)), // Return error if write fails
353            }
354        }
355
356        Poll::Ready(Ok(total_bytes_written))
357    }
358
359    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
360        Poll::Ready(Ok(())) // TODO
361    }
362
363    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
364        let this = self.get_mut();
365        if let Some(file) = this.current_file.take() {
366            match file.sync_all() {
367                Ok(()) => Poll::Ready(Ok(())),
368                Err(e) => Poll::Ready(Err(e)),
369            }
370        } else {
371            Poll::Ready(Ok(())) // No file to close
372        }
373    }
374}
375
376impl Clone for FileSequence {
377    fn clone(&self) -> Self {
378        Self {
379            files: self.files.clone(),
380            current_file: None,
381            current_file_index: None,
382            position: 0,
383            auto_set_len: self.auto_set_len,
384        }
385    }
386}