darkfi/geode/mod.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

//! Chunk-based file storage implementation.
//! This is a building block for a DHT or something similar.
//!
//! The API supports file/directory insertion and retrieval. There is
//! intentionally no `remove` support. File removal should be handled
//! externally; afterwards it is only necessary to run `garbage_collect()`
//! to clean things up.
//!
//! The hash of a file is the BLAKE3 hash of its chunk hashes, in the
//! correct order.
//! The hash of a directory is the BLAKE3 hash of the chunk hashes, in the
//! correct order, followed by the ordered list of (file path, file size)
//! pairs.
//! All hashes (file, directory, chunk) are 32 bytes long, and are encoded
//! in base58 whenever necessary.
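//!
//! For illustration, the file hash computation amounts to the following
//! sketch (only the `blake3` crate is assumed):
//! ```
//! // Hash each chunk, then feed the chunk hashes into the file hasher.
//! fn file_hash(chunks: &[&[u8]]) -> blake3::Hash {
//!     let mut hasher = blake3::Hasher::new();
//!     for chunk in chunks {
//!         hasher.update(blake3::hash(chunk).as_bytes());
//!     }
//!     hasher.finalize()
//! }
//! ```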
//!
//! The filesystem hierarchy contains a `files` directory holding metadata
//! about full files, and a `directories` directory holding metadata about
//! all files in a directory (all subdirectories included).
//! The filename of a file in `files` or `directories` is the hash of the
//! file/directory as defined above.
//! Inside a file in `files` is the ordered list of the chunk hashes making
//! up the full file.
//! Inside a file in `directories` is the ordered list of the chunk hashes
//! making up each full file, and the (relative) file path and size of each
//! file in the directory.
//!
//! To get the chunks, split the full file into `MAX_CHUNK_SIZE` sized
//! slices; only the last chunk can be smaller than that.
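//!
//! A splitting sketch (the hidden constant mirrors `MAX_CHUNK_SIZE` below):
//! ```
//! # const MAX_CHUNK_SIZE: usize = 262_144;
//! let data = vec![0u8; 600_000];
//! // `chunks()` yields `MAX_CHUNK_SIZE` sized slices; only the last
//! // slice may be smaller (262144 + 262144 + 75712 bytes here).
//! let chunks: Vec<&[u8]> = data.chunks(MAX_CHUNK_SIZE).collect();
//! assert_eq!(chunks.len(), 3);
//! ```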
//!
//! The hierarchy might look like the following:
//! ```text
//! /files/B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX
//! /files/8nA3ndjFFee3n5wMPLZampLpGaMJi3od4MSyaXPDoF91
//! /files/...
//! /directories/FXDduPcEohVzsSxtNVSFU64qtYxEVEHBMkF4k5cBvt3B
//! /directories/AHjU1LizfGqsGnF8VSa9kphSQ5pqS4YjmPqme5RZajsj
//! /directories/...
//! ```
//!
//! Inside a file's metadata (a file in `files`) is the ordered list of
//! chunk hashes, for example:
//! ```text
//! 2bQPxSR8Frz7S7JW3DRAzEtkrHfLXB1CN65V7az77pUp
//! CvjvN6MfWQYK54DgKNR7MPgFSZqsCgpWKF2p8ot66CCP
//! ```
//!
//! Inside a directory's metadata (a file in `directories`) is, in addition
//! to the chunk hashes, the path and size of each file in the directory.
//! For example:
//! ```text
//! 8Kb55jeqJsq7WTBN93gvBzh2zmXAXVPh111VqD3Hi42V
//! GLiBqpLPTbpJhSMYfzi3s7WivrTViov7ShX7uso6fG5s
//! picture.jpg 312948
//! ```
//! A single chunk of a directory can span multiple files, if several files
//! fit within `MAX_CHUNK_SIZE`. The chunks are computed as if all the files
//! were concatenated into a single big file, to minimize the number of
//! chunks.
//!
//! The full file is never copied, and individual chunks are not stored by
//! geode. Additionally, geode does not keep track of the full file's path.

use std::{
    collections::HashSet,
    path::{Path, PathBuf},
};

use futures::{AsyncRead, AsyncSeek};
use smol::{
    fs::{self, File},
    io::{
        AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor, ErrorKind,
        SeekFrom,
    },
    stream::StreamExt,
};
use tracing::{debug, info, warn};

use crate::{Error, Result};

mod chunked_storage;
pub use chunked_storage::{Chunk, ChunkedStorage};

mod file_sequence;
pub use file_sequence::FileSequence;

mod util;
pub use util::{hash_to_string, read_until_filled};

/// Defined maximum size of a stored chunk (256 KiB)
pub const MAX_CHUNK_SIZE: usize = 262_144;

/// Path prefix where file metadata is stored
const FILES_PATH: &str = "files";

/// Path prefix where directory metadata is stored
const DIRS_PATH: &str = "directories";

/// Chunk-based file storage interface.
pub struct Geode {
    /// Path to the filesystem directory where file metadata is stored
    pub files_path: PathBuf,
    /// Path to the filesystem directory where directory metadata is stored
    pub dirs_path: PathBuf,
}

impl Geode {
    /// Instantiate a new [`Geode`] object.
    /// `base_path` defines the root directory where Geode will store its
    /// file and directory metadata.
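    ///
    /// A minimal usage sketch (the `darkfi` crate paths are assumed):
    /// ```no_run
    /// # use std::path::PathBuf;
    /// # use darkfi::{geode::Geode, Result};
    /// # async fn example() -> Result<()> {
    /// // Creates `<base>/files` and `<base>/directories` if missing.
    /// let _geode = Geode::new(&PathBuf::from("/tmp/geode")).await?;
    /// # Ok(())
    /// # }
    /// ```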
    pub async fn new(base_path: &PathBuf) -> Result<Self> {
        let mut files_path: PathBuf = base_path.into();
        files_path.push(FILES_PATH);
        let mut dirs_path: PathBuf = base_path.into();
        dirs_path.push(DIRS_PATH);

        // Create necessary directory structure if needed
        fs::create_dir_all(&files_path).await?;
        fs::create_dir_all(&dirs_path).await?;

        Ok(Self { files_path, dirs_path })
    }

    /// Attempt to read chunk hashes and files metadata from a given metadata path.
    /// This works for both file metadata and directory metadata.
    /// Returns (chunk hashes, [(file path, file size)]).
    async fn read_metadata(path: &PathBuf) -> Result<(Vec<blake3::Hash>, Vec<(PathBuf, u64)>)> {
        debug!(target: "geode::read_metadata", "Reading metadata from {path:?}");

        let mut chunk_hashes = vec![];
        let mut files = vec![];

        let fd = File::open(path).await?;
        let mut lines = BufReader::new(fd).lines();

        while let Some(line) = lines.next().await {
            let line = line?;
            let line = line.trim();

            if line.is_empty() {
                continue; // Skip empty lines
            }

            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() == 2 {
                // File entry: `<relative path> <size>`
                let file_path = PathBuf::from(parts[0]);
                if file_path.is_absolute() {
                    return Err(Error::Custom(format!(
                        "Path of file {} is absolute, which is not allowed",
                        parts[0]
                    )))
                }

                // Check for `..` in the path components
                for component in file_path.components() {
                    if component == std::path::Component::ParentDir {
                        return Err(Error::Custom(format!(
                            "Path of file {} contains reference to parent dir, which is not allowed",
                            parts[0]
                        )))
                    }
                }

                let file_size = parts[1].parse::<u64>()?;
                files.push((file_path, file_size));
            } else if parts.len() == 1 {
                // Chunk entry: a single base58-encoded BLAKE3 hash
                let mut hash_buf = [0u8; 32];
                if bs58::decode(parts[0]).onto(&mut hash_buf)? != 32 {
                    return Err(Error::Custom("Invalid chunk hash length".to_string()));
                }
                chunk_hashes.push(blake3::Hash::from_bytes(hash_buf));
            } else {
                // Invalid format
                return Err(Error::Custom("Invalid metadata format".to_string()));
            }
        }

        Ok((chunk_hashes, files))
    }

    /// Perform garbage collection over the filesystem hierarchy.
    /// Returns the set of hashes whose metadata was deleted.
    pub async fn garbage_collect(&self) -> Result<HashSet<blake3::Hash>> {
        info!(target: "geode::garbage_collect", "[Geode] Performing garbage collection");
        // We track corrupt files here.
        let mut deleted_files = HashSet::new();

        // Perform a health check over the metadata. For now we just ensure
        // it has the correct format.
        let file_paths = fs::read_dir(&self.files_path).await?;
        let dir_paths = fs::read_dir(&self.dirs_path).await?;
        let mut paths = file_paths.chain(dir_paths);
        while let Some(file) = paths.next().await {
            let Ok(entry) = file else { continue };
            let path = entry.path();

            // Skip if we're not a plain file
            if !path.is_file() {
                continue
            }

            // Make sure that the filename is a base58-encoded BLAKE3 hash
            let file_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(v) => v,
                None => continue,
            };
            let mut hash_buf = [0u8; 32];
            let hash = match bs58::decode(file_name).onto(&mut hash_buf) {
                // A valid hash decodes to exactly 32 bytes
                Ok(32) => blake3::Hash::from_bytes(hash_buf),
                _ => continue,
            };

            // The filename is a BLAKE3 hash. The file should contain a
            // newline-separated list of chunks which represent the full file.
            // If that is not the case we consider it a corrupted file and
            // delete it.
            if Self::read_metadata(&path).await.is_err() {
                if let Err(e) = fs::remove_file(path).await {
                    warn!(
                        target: "geode::garbage_collect",
                        "[Geode] Garbage collect failed to remove corrupted metadata: {e}"
                    );
                }

                deleted_files.insert(hash);
            }
        }

        info!(target: "geode::garbage_collect", "[Geode] Garbage collection finished");
        Ok(deleted_files)
    }

    /// Chunk a stream.
    /// Returns a hasher that has been fed the chunk hashes (finalizing it
    /// yields the overall hash), and the list of chunk hashes.
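    ///
    /// A usage sketch for an in-memory buffer (the `darkfi` crate paths are
    /// assumed; `Cursor` implements `AsyncRead`):
    /// ```no_run
    /// # use darkfi::{geode::Geode, Result};
    /// # async fn example(geode: &Geode) -> Result<()> {
    /// let data = vec![0u8; 300_000]; // Spans two chunks
    /// let (hasher, chunk_hashes) = geode.chunk_stream(smol::io::Cursor::new(data)).await?;
    /// assert_eq!(chunk_hashes.len(), 2);
    /// let _file_hash = hasher.finalize();
    /// # Ok(())
    /// # }
    /// ```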
    pub async fn chunk_stream(
        &self,
        mut stream: impl AsyncRead + Unpin,
    ) -> Result<(blake3::Hasher, Vec<blake3::Hash>)> {
        let mut hasher = blake3::Hasher::new();
        let mut chunk_hashes = vec![];
        let mut buf = vec![0u8; MAX_CHUNK_SIZE];

        loop {
            // Fill the buffer completely unless EOF is reached, so that a
            // short read cannot shift the chunk boundaries.
            let chunk_slice = read_until_filled(&mut stream, &mut buf).await?;
            if chunk_slice.is_empty() {
                break
            }

            let chunk_hash = blake3::hash(chunk_slice);
            hasher.update(chunk_hash.as_bytes());
            chunk_hashes.push(chunk_hash);
        }

        Ok((hasher, chunk_hashes))
    }

    /// Sorts files by their `PathBuf` string representation.
    pub fn sort_files(&self, files: &mut [(PathBuf, u64)]) {
        files.sort_by(|(a, _), (b, _)| a.to_string_lossy().cmp(&b.to_string_lossy()));
    }

    /// Add chunk hashes to `hasher`.
    pub fn hash_chunks_metadata(&self, hasher: &mut blake3::Hasher, chunk_hashes: &[blake3::Hash]) {
        for chunk in chunk_hashes {
            hasher.update(chunk.as_bytes());
        }
    }

    /// Add files metadata to `hasher`.
    /// You must sort the files using `sort_files()` beforehand.
    pub fn hash_files_metadata(
        &self,
        hasher: &mut blake3::Hasher,
        relative_files: &[(PathBuf, u64)],
    ) {
        for file in relative_files {
            hasher.update(file.0.to_string_lossy().as_bytes());
            hasher.update(&file.1.to_le_bytes());
        }
    }

    /// Create and insert file or directory metadata into Geode.
    /// Always overwrites any existing file.
    /// Verifies that the metadata is valid.
    /// The `relative_files` slice is empty for files.
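    ///
    /// An insertion sketch for a plain file, following `chunk_stream()`
    /// (the `darkfi` crate paths are assumed):
    /// ```no_run
    /// # use darkfi::{geode::Geode, Result};
    /// # async fn example(geode: &Geode) -> Result<()> {
    /// let data = vec![1u8; 100_000];
    /// let (hasher, chunk_hashes) = geode.chunk_stream(smol::io::Cursor::new(data)).await?;
    /// // An empty `relative_files` slice marks this as file metadata.
    /// geode.insert_metadata(&hasher.finalize(), &chunk_hashes, &[]).await?;
    /// # Ok(())
    /// # }
    /// ```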
    pub async fn insert_metadata(
        &self,
        hash: &blake3::Hash,
        chunk_hashes: &[blake3::Hash],
        relative_files: &[(PathBuf, u64)],
    ) -> Result<()> {
        info!(target: "geode::insert_metadata", "[Geode] Inserting metadata");

        // Verify the metadata
        if !self.verify_metadata(hash, chunk_hashes, relative_files) {
            return Err(Error::GeodeNeedsGc)
        }

        // Write the metadata file
        let mut file_path = match relative_files.is_empty() {
            true => self.files_path.clone(),
            false => self.dirs_path.clone(),
        };
        file_path.push(hash_to_string(hash).as_str());
        let mut file_fd = File::create(&file_path).await?;

        for ch in chunk_hashes {
            file_fd.write_all(format!("{}\n", hash_to_string(ch)).as_bytes()).await?;
        }
        for file in relative_files {
            file_fd.write_all(format!("{} {}\n", file.0.to_string_lossy(), file.1).as_bytes()).await?;
        }
        file_fd.flush().await?;

        Ok(())
    }

    /// Write a single chunk given a byte buffer.
    /// The file's metadata must be inserted into Geode before calling this
    /// method. Always overwrites any existing chunk. Returns the chunk hash
    /// and the number of bytes written to the file system.
    pub async fn write_chunk(
        &self,
        chunked: &mut ChunkedStorage,
        data: impl AsRef<[u8]>,
    ) -> Result<(blake3::Hash, usize)> {
        info!(target: "geode::write_chunk", "[Geode] Writing single chunk");

        let mut cursor = Cursor::new(&data);
        let mut chunk = vec![0u8; MAX_CHUNK_SIZE];

        // Read the buffer to get the chunk content
        let chunk_slice = read_until_filled(&mut cursor, &mut chunk).await?;

        // Get the chunk hash from the content
        let chunk_hash = blake3::hash(chunk_slice);

        // Get the chunk index in the file/directory from the chunk hash
        let chunk_index = match chunked.iter().position(|c| c.hash == chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeNeedsGc),
        };

        // Compute the byte position from the chunk index and the chunk size
        let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);

        // Seek to the correct position
        let fileseq = chunked.get_fileseq_mut();
        fileseq.seek(SeekFrom::Start(position)).await?;

        // This will write the chunk, and truncate files if `chunked` is a directory.
        let bytes_written = fileseq.write(chunk_slice).await?;

        // If it's the last chunk of a file (and it's *not* a directory),
        // truncate the file to the correct length.
        // This is because, contrary to directories, we do not know the exact
        // file size from its metadata; we only know the number of chunks.
        // Therefore we only know the exact size once we know the size of the
        // last chunk.
        // We also update the `FileSequence` to the exact size.
        if !chunked.is_dir() && chunk_index == chunked.len() - 1 {
            let exact_file_size = (chunked.len() - 1) * MAX_CHUNK_SIZE + chunk_slice.len();
            if let Some(file) = &chunked.get_fileseq_mut().get_current_file() {
                let _ = file.set_len(exact_file_size as u64);
            }
            chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
        }

        Ok((chunk_hash, bytes_written))
    }

    /// Fetch file/directory metadata from Geode. Returns a [`ChunkedStorage`],
    /// or an error if the read failed in any way (including when the file
    /// does not exist).
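    ///
    /// A retrieval sketch (the `darkfi` crate paths are assumed):
    /// ```no_run
    /// # use std::path::Path;
    /// # use darkfi::{geode::Geode, Result};
    /// # async fn example(geode: &Geode, hash: &blake3::Hash) -> Result<()> {
    /// // The result lists the chunk hashes; fetch each one with `get_chunk()`.
    /// let _chunked = geode.get(hash, Path::new("/tmp/output")).await?;
    /// # Ok(())
    /// # }
    /// ```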
    pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<ChunkedStorage> {
        let hash_str = hash_to_string(hash);
        info!(target: "geode::get", "[Geode] Getting chunks for {hash_str}...");

        // Try to read the file or dir metadata. If it's corrupt, return an error signalling
        // that garbage collection needs to run.
        let metadata_paths = [self.files_path.join(&hash_str), self.dirs_path.join(&hash_str)];
        for metadata_path in metadata_paths {
            match Self::read_metadata(&metadata_path).await {
                Ok((chunk_hashes, files)) => {
                    return self.create_chunked_storage(hash, path, &chunk_hashes, &files).await
                }
                Err(e) => {
                    if !matches!(e, Error::Io(ErrorKind::NotFound)) {
                        return Err(Error::GeodeNeedsGc)
                    }
                }
            }
        }

        Err(Error::GeodeFileNotFound)
    }

    /// Create a [`ChunkedStorage`] from metadata.
    /// `hash` is the hash of the file or directory.
    async fn create_chunked_storage(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        chunk_hashes: &[blake3::Hash],
        relative_files: &[(PathBuf, u64)], // Only used by directories
    ) -> Result<ChunkedStorage> {
        // Make sure the file or directory is valid
        if !self.verify_metadata(hash, chunk_hashes, relative_files) {
            return Err(Error::GeodeNeedsGc);
        }

        let chunked = if relative_files.is_empty() {
            // File
            let file_size = (chunk_hashes.len() * MAX_CHUNK_SIZE) as u64; // Upper bound, not the actual file size
            ChunkedStorage::new(chunk_hashes, &[(path.to_path_buf(), file_size)], false)
        } else {
            // Directory
            let files: Vec<_> = relative_files
                .iter()
                .map(|(file_path, size)| (path.join(file_path), *size))
                .collect();
            ChunkedStorage::new(chunk_hashes, &files, true)
        };

        Ok(chunked)
    }

    /// Fetch a single chunk from Geode. Returns a Vec containing the chunk
    /// content if it is found.
    /// The returned chunk is NOT verified.
    pub async fn get_chunk(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hash: &blake3::Hash,
    ) -> Result<Vec<u8>> {
        info!(target: "geode::get_chunk", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));

        // Get the chunk index in the file from the chunk hash
        let chunk_index = match chunked.iter().position(|c| c.hash == *chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeChunkNotFound),
        };

        // Read the underlying file sequence to get the chunk content
        self.read_chunk(chunked.get_fileseq_mut(), &chunk_index).await
    }

    /// Read the given stream to get the chunk with index `chunk_index`.
    /// Returns the chunk content in a Vec.
    pub async fn read_chunk(
        &self,
        mut stream: impl AsyncRead + Unpin + AsyncSeek,
        chunk_index: &usize,
    ) -> Result<Vec<u8>> {
        let position = (*chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
        let mut buf = vec![0u8; MAX_CHUNK_SIZE];
        stream.seek(SeekFrom::Start(position)).await?;
        // Fill the buffer completely unless EOF is reached, so that a short
        // read cannot truncate the chunk.
        let chunk_slice = read_until_filled(&mut stream, &mut buf).await?;
        Ok(chunk_slice.to_vec())
    }

    /// Verifies that the file/directory hash matches the chunk hashes and
    /// files metadata.
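    ///
    /// A sketch of how a file hash is checked (a directory would also pass
    /// its `(path, size)` list; the `darkfi` crate paths are assumed):
    /// ```no_run
    /// # use darkfi::geode::Geode;
    /// # fn example(geode: &Geode) {
    /// let chunk_hashes = vec![blake3::hash(b"chunk one"), blake3::hash(b"chunk two")];
    /// let mut hasher = blake3::Hasher::new();
    /// geode.hash_chunks_metadata(&mut hasher, &chunk_hashes);
    /// geode.hash_files_metadata(&mut hasher, &[]); // No-op for plain files
    /// assert!(geode.verify_metadata(&hasher.finalize(), &chunk_hashes, &[]));
    /// # }
    /// ```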
    pub fn verify_metadata(
        &self,
        hash: &blake3::Hash,
        chunk_hashes: &[blake3::Hash],
        files: &[(PathBuf, u64)],
    ) -> bool {
        info!(target: "geode::verify_metadata", "[Geode] Verifying metadata for {}", hash_to_string(hash));
        let mut hasher = blake3::Hasher::new();
        self.hash_chunks_metadata(&mut hasher, chunk_hashes);
        self.hash_files_metadata(&mut hasher, files);
        *hash == hasher.finalize()
    }

    /// Verifies that the chunk hash matches the content.
    pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool {
        info!(target: "geode::verify_chunk", "[Geode] Verifying chunk {}", hash_to_string(chunk_hash));
        blake3::hash(chunk_slice) == *chunk_hash
    }
}