darkfi/geode/mod.rs
/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

//! Chunk-based file storage implementation.
//! This is a building block for a DHT or something similar.
//!
//! The API supports file/directory insertion and retrieval. There is
//! intentionally no `remove` support. File removal should be handled
//! externally; afterwards it is only necessary to run `garbage_collect()`
//! to clean things up.
//!
//! The hash of a file is the BLAKE3 hash of its chunk hashes, in the
//! correct order.
//! The hash of a directory is the BLAKE3 hash of its chunk hashes, in the
//! correct order, followed by the ordered list of (file path, file size)
//! pairs.
//! All hashes (file, directory, chunk) are 32 bytes long, and are encoded in
//! base58 whenever necessary.
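//!
//! As an illustrative sketch (not part of the API), a hash could be derived
//! like this, assuming `chunk_hashes` and the sorted `relative_files` list
//! are already known:
//! ```ignore
//! let mut hasher = blake3::Hasher::new();
//! for chunk_hash in &chunk_hashes {
//!     hasher.update(chunk_hash.as_bytes());
//! }
//! // Only directories have (path, size) pairs; for plain files this loop is empty.
//! for (path, size) in &relative_files {
//!     hasher.update(path.to_string_lossy().as_bytes());
//!     hasher.update(&size.to_le_bytes());
//! }
//! let hash = hasher.finalize();
//! ```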
//!
//! The filesystem hierarchy contains a `files` directory, which holds metadata
//! about each full file, and a `directories` directory, which holds metadata
//! about all files in a directory (all subdirectories included).
//! The filename of a file in `files` or `directories` is the hash of the
//! file/directory as defined above.
//! Inside a file in `files` is the ordered list of the chunks making up the
//! full file.
//! Inside a file in `directories` is the ordered list of the chunks making up
//! each full file, and the (relative) file path and size of all files in the
//! directory.
//!
//! To get the chunks, split the full file into `MAX_CHUNK_SIZE` sized slices;
//! only the last chunk may be smaller than that.
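//!
//! For example (illustrative numbers), a 600,000 byte file would be split
//! into three chunks:
//! ```text
//! chunk 0: bytes 0..262144       (262144 bytes)
//! chunk 1: bytes 262144..524288  (262144 bytes)
//! chunk 2: bytes 524288..600000  (75712 bytes, the smaller last chunk)
//! ```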
//!
//! The hierarchy might look like the following:
//! ```text
//! /files/B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX
//! /files/8nA3ndjFFee3n5wMPLZampLpGaMJi3od4MSyaXPDoF91
//! /files/...
//! /directories/FXDduPcEohVzsSxtNVSFU64qtYxEVEHBMkF4k5cBvt3B
//! /directories/AHjU1LizfGqsGnF8VSa9kphSQ5pqS4YjmPqme5RZajsj
//! /directories/...
//! ```
//!
//! Inside a file metadata file (a file in `files`) is the ordered list of
//! chunk hashes, for example:
//! ```text
//! 2bQPxSR8Frz7S7JW3DRAzEtkrHfLXB1CN65V7az77pUp
//! CvjvN6MfWQYK54DgKNR7MPgFSZqsCgpWKF2p8ot66CCP
//! ```
//!
//! Inside a directory metadata file (a file in `directories`) is, in addition
//! to the chunk hashes, the path and size of each file in the directory. For
//! example:
//! ```text
//! 8Kb55jeqJsq7WTBN93gvBzh2zmXAXVPh111VqD3Hi42V
//! GLiBqpLPTbpJhSMYfzi3s7WivrTViov7ShX7uso6fG5s
//! picture.jpg 312948
//! ```
//!
//! Chunks of a directory can include multiple files, if multiple files fit
//! into `MAX_CHUNK_SIZE`. The chunks are computed as if all the files were
//! concatenated into a single big file, to minimize the number of chunks.
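//!
//! For instance (illustrative numbers, assuming the files are ordered by
//! their sorted relative paths), a directory with `a.txt` (100,000 bytes)
//! and `b.txt` (200,000 bytes) is chunked as their 300,000 byte
//! concatenation:
//! ```text
//! chunk 0: bytes 0..262144       (all of a.txt plus the first 162,144 bytes of b.txt)
//! chunk 1: bytes 262144..300000  (the remaining 37,856 bytes of b.txt)
//! ```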
//!
//! The full file is not copied, and individual chunks are not stored by
//! Geode. Additionally, it does not keep track of the full file's path.

use std::{
    collections::HashSet,
    path::{Path, PathBuf},
};

use futures::{AsyncRead, AsyncSeek};
use smol::{
    fs::{self, File},
    io::{
        AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor, ErrorKind,
        SeekFrom,
    },
    stream::StreamExt,
};
use tracing::{debug, info, warn};

use crate::{Error, Result};

mod chunked_storage;
pub use chunked_storage::{Chunk, ChunkedStorage};

mod file_sequence;
pub use file_sequence::FileSequence;

mod util;
pub use util::{hash_to_string, read_until_filled};

/// Defined maximum size of a stored chunk (256 KiB)
pub const MAX_CHUNK_SIZE: usize = 262_144;

/// Path prefix where file metadata is stored
const FILES_PATH: &str = "files";

/// Path prefix where directory metadata is stored
const DIRS_PATH: &str = "directories";

/// Chunk-based file storage interface.
pub struct Geode {
    /// Path to the filesystem directory where file metadata is stored
    pub files_path: PathBuf,
    /// Path to the filesystem directory where directory metadata is stored
    pub dirs_path: PathBuf,
}

impl Geode {
    /// Instantiate a new [`Geode`] object.
    /// `base_path` defines the root directory where Geode will store its
    /// file and directory metadata.
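    /// A minimal usage sketch (hypothetical path, assuming an async context):
    /// ```ignore
    /// let geode = Geode::new(&PathBuf::from("/tmp/geode")).await?;
    /// ```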
    pub async fn new(base_path: &PathBuf) -> Result<Self> {
        let mut files_path: PathBuf = base_path.into();
        files_path.push(FILES_PATH);
        let mut dirs_path: PathBuf = base_path.into();
        dirs_path.push(DIRS_PATH);

        // Create necessary directory structure if needed
        fs::create_dir_all(&files_path).await?;
        fs::create_dir_all(&dirs_path).await?;

        Ok(Self { files_path, dirs_path })
    }

    /// Attempt to read chunk hashes and files metadata from a given metadata path.
    /// This works for both file metadata and directory metadata.
    /// Returns (chunk hashes, [(file path, file size)]).
    async fn read_metadata(path: &PathBuf) -> Result<(Vec<blake3::Hash>, Vec<(PathBuf, u64)>)> {
        debug!(target: "geode::read_metadata", "Reading metadata from {path:?}");

        let mut chunk_hashes = vec![];
        let mut files = vec![];

        let fd = File::open(path).await?;
        let mut lines = BufReader::new(fd).lines();

        while let Some(line) = lines.next().await {
            let line = line?;
            let line = line.trim();

            if line.is_empty() {
                continue; // Skip empty lines
            }

            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() == 2 {
                // File entry: "<relative path> <size>"
                let file_path = PathBuf::from(parts[0]);
                if file_path.is_absolute() {
                    return Err(Error::Custom(format!(
                        "Path of file {} is absolute, which is not allowed",
                        parts[0]
                    )))
                }

                // Check for `..` in the path components
                for component in file_path.components() {
                    if component == std::path::Component::ParentDir {
                        return Err(Error::Custom(format!(
                            "Path of file {} contains a reference to the parent dir, which is not allowed",
                            parts[0]
                        )))
                    }
                }

                let file_size = parts[1].parse::<u64>()?;
                files.push((file_path, file_size));
            } else if parts.len() == 1 {
                // Chunk entry: a single base58-encoded BLAKE3 hash
                let chunk_hash_str = parts[0].trim();
                if chunk_hash_str.is_empty() {
                    break; // Stop reading chunk hashes on empty line
                }
                let mut hash_buf = [0u8; 32];
                bs58::decode(chunk_hash_str).onto(&mut hash_buf)?;
                let chunk_hash = blake3::Hash::from_bytes(hash_buf);
                chunk_hashes.push(chunk_hash);
            } else {
                // Invalid format
                return Err(Error::Custom("Invalid metadata format".to_string()));
            }
        }

        Ok((chunk_hashes, files))
    }

    /// Perform garbage collection over the filesystem hierarchy.
    /// Returns the set of hashes whose metadata was deleted.
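    /// Usage sketch (assuming an async context; removal of the actual files
    /// happens externally, as described in the module docs):
    /// ```ignore
    /// let deleted: HashSet<blake3::Hash> = geode.garbage_collect().await?;
    /// ```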
    pub async fn garbage_collect(&self) -> Result<HashSet<blake3::Hash>> {
        info!(target: "geode::garbage_collect", "[Geode] Performing garbage collection");
        // We track corrupt files here.
        let mut deleted_files = HashSet::new();

        // Perform health check over metadata. For now we just ensure they
        // have the correct format.
        let file_paths = fs::read_dir(&self.files_path).await?;
        let dir_paths = fs::read_dir(&self.dirs_path).await?;
        let mut paths = file_paths.chain(dir_paths);
        while let Some(file) = paths.next().await {
            let Ok(entry) = file else { continue };
            let path = entry.path();

            // Skip if we're not a plain file
            if !path.is_file() {
                continue
            }

            // Make sure that the filename is a BLAKE3 hash
            let file_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(v) => v,
                None => continue,
            };
            let mut hash_buf = [0u8; 32];
            let hash = match bs58::decode(file_name).onto(&mut hash_buf) {
                Ok(_) => blake3::Hash::from_bytes(hash_buf),
                Err(_) => continue,
            };

            // The filename is a BLAKE3 hash. It should contain a newline-separated
            // list of chunks which represent the full file. If that is not the case
            // we will consider it a corrupted file and delete it.
            if Self::read_metadata(&path).await.is_err() {
                if let Err(e) = fs::remove_file(path).await {
                    warn!(
                        target: "geode::garbage_collect",
                        "[Geode] Garbage collect failed to remove corrupted metadata: {e}"
                    );
                }

                deleted_files.insert(hash);
                continue
            }
        }

        info!(target: "geode::garbage_collect", "[Geode] Garbage collection finished");
        Ok(deleted_files)
    }

    /// Chunk a stream.
    /// Returns a hasher (containing the chunk hashes), and the list of chunk hashes.
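    /// Illustrative sketch (hypothetical `data` buffer, assuming an async context):
    /// ```ignore
    /// let (hasher, chunk_hashes) = geode.chunk_stream(Cursor::new(&data)).await?;
    /// // For a plain file, finalizing the hasher yields the file hash.
    /// let file_hash = hasher.finalize();
    /// ```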
    pub async fn chunk_stream(
        &self,
        mut stream: impl AsyncRead + Unpin,
    ) -> Result<(blake3::Hasher, Vec<blake3::Hash>)> {
        let mut hasher = blake3::Hasher::new();
        let mut chunk_hashes = vec![];

        loop {
            let mut buf = vec![0u8; MAX_CHUNK_SIZE];
            let bytes_read = stream.read(&mut buf).await?;
            if bytes_read == 0 {
                break
            }

            let chunk_hash = blake3::hash(&buf[..bytes_read]);
            hasher.update(chunk_hash.as_bytes());
            chunk_hashes.push(chunk_hash);
        }

        Ok((hasher, chunk_hashes))
    }

    /// Sorts files by their PathBuf.
    pub fn sort_files(&self, files: &mut [(PathBuf, u64)]) {
        files.sort_by(|(a, _), (b, _)| a.to_string_lossy().cmp(&b.to_string_lossy()));
    }

    /// Add chunk hashes to `hasher`.
    pub fn hash_chunks_metadata(&self, hasher: &mut blake3::Hasher, chunk_hashes: &[blake3::Hash]) {
        for chunk in chunk_hashes {
            hasher.update(chunk.as_bytes());
        }
    }

    /// Add files metadata to `hasher`.
    /// You must sort the files using `sort_files` beforehand.
    pub fn hash_files_metadata(
        &self,
        hasher: &mut blake3::Hasher,
        relative_files: &[(PathBuf, u64)],
    ) {
        for file in relative_files {
            hasher.update(file.0.to_string_lossy().to_string().as_bytes());
            hasher.update(&file.1.to_le_bytes());
        }
    }

    /// Create and insert file or directory metadata into Geode.
    /// Always overwrites any existing metadata file.
    /// Verifies that the metadata is valid.
    /// The `relative_files` slice is empty for files.
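    /// Illustrative flow for inserting a plain file (hypothetical `data` buffer,
    /// assuming an async context):
    /// ```ignore
    /// let (hasher, chunk_hashes) = geode.chunk_stream(Cursor::new(&data)).await?;
    /// let hash = hasher.finalize();
    /// geode.insert_metadata(&hash, &chunk_hashes, &[]).await?;
    /// ```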
    pub async fn insert_metadata(
        &self,
        hash: &blake3::Hash,
        chunk_hashes: &[blake3::Hash],
        relative_files: &[(PathBuf, u64)],
    ) -> Result<()> {
        info!(target: "geode::insert_metadata", "[Geode] Inserting metadata");

        // Verify the metadata
        if !self.verify_metadata(hash, chunk_hashes, relative_files) {
            return Err(Error::GeodeNeedsGc)
        }

        // Write the metadata file
        let mut file_path = match relative_files.is_empty() {
            true => self.files_path.clone(),
            false => self.dirs_path.clone(),
        };
        file_path.push(hash_to_string(hash).as_str());
        let mut file_fd = File::create(&file_path).await?;

        for ch in chunk_hashes {
            file_fd.write_all(format!("{}\n", hash_to_string(ch).as_str()).as_bytes()).await?;
        }
        for file in relative_files {
            file_fd
                .write_all(format!("{} {}\n", file.0.to_string_lossy(), file.1).as_bytes())
                .await?;
        }
        file_fd.flush().await?;

        Ok(())
    }

    /// Write a single chunk given a stream.
    /// The file must be inserted into Geode before calling this method.
    /// Always overwrites any existing chunk. Returns the chunk hash and
    /// the number of bytes written to the file system.
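    /// Illustrative usage (assuming `chunked` was obtained from [`Geode::get`] and a
    /// hypothetical `chunk_bytes` buffer holds the chunk content):
    /// ```ignore
    /// let (chunk_hash, bytes_written) = geode.write_chunk(&mut chunked, &chunk_bytes).await?;
    /// ```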
    pub async fn write_chunk(
        &self,
        chunked: &mut ChunkedStorage,
        stream: impl AsRef<[u8]>,
    ) -> Result<(blake3::Hash, usize)> {
        info!(target: "geode::write_chunk", "[Geode] Writing single chunk");

        let mut cursor = Cursor::new(&stream);
        let mut chunk = vec![0u8; MAX_CHUNK_SIZE];

        // Read the stream to get the chunk content
        let chunk_slice = read_until_filled(&mut cursor, &mut chunk).await?;

        // Get the chunk hash from the content
        let chunk_hash = blake3::hash(chunk_slice);

        // Get the chunk index in the file/directory from the chunk hash
        let chunk_index = match chunked.iter().position(|c| c.hash == chunk_hash) {
            Some(index) => index,
            None => {
                return Err(Error::GeodeNeedsGc);
            }
        };

        // Compute the byte position from the chunk index and the chunk size
        let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);

        // Seek to the correct position
        let fileseq = &mut chunked.get_fileseq_mut();
        fileseq.seek(SeekFrom::Start(position)).await?;

        // This will write the chunk, and truncate files if `chunked` is a directory.
        let bytes_written = fileseq.write(chunk_slice).await?;

        // If it's the last chunk of a file (and it's *not* a directory),
        // truncate the file to the correct length.
        // This is because, contrary to directories, we do not know the exact
        // file size from its metadata, we only know the number of chunks.
        // Therefore we only know the exact size once we know the size of the
        // last chunk.
        // We also update the `FileSequence` to the exact size.
        if !chunked.is_dir() && chunk_index == chunked.len() - 1 {
            let exact_file_size =
                chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - chunk_slice.len());
            if let Some(file) = &chunked.get_fileseq_mut().get_current_file() {
                let _ = file.set_len(exact_file_size as u64);
            }
            chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
        }

        Ok((chunk_hash, bytes_written))
    }

    /// Fetch file/directory metadata from Geode. Returns a [`ChunkedStorage`]. Returns an error if
    /// the read failed in any way (which could also mean the file does not exist).
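    /// Usage sketch (hypothetical hash and output path, assuming an async context):
    /// ```ignore
    /// let mut chunked = geode.get(&file_hash, Path::new("/tmp/output.bin")).await?;
    /// ```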
    pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<ChunkedStorage> {
        let hash_str = hash_to_string(hash);
        info!(target: "geode::get", "[Geode] Getting chunks for {hash_str}...");

        // Try to read the file or dir metadata. If it's corrupt, return an error signalling
        // that garbage collection needs to run.
        let metadata_paths = [self.files_path.join(&hash_str), self.dirs_path.join(&hash_str)];
        for metadata_path in metadata_paths {
            match Self::read_metadata(&metadata_path).await {
                Ok((chunk_hashes, files)) => {
                    return self.create_chunked_storage(hash, path, &chunk_hashes, &files).await
                }
                Err(e) => {
                    if !matches!(e, Error::Io(ErrorKind::NotFound)) {
                        return Err(Error::GeodeNeedsGc)
                    }
                }
            };
        }

        Err(Error::GeodeFileNotFound)
    }

    /// Create a [`ChunkedStorage`] from metadata.
    /// `hash` is the hash of the file or directory.
    async fn create_chunked_storage(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        chunk_hashes: &[blake3::Hash],
        relative_files: &[(PathBuf, u64)], // Only used by directories
    ) -> Result<ChunkedStorage> {
        // Make sure the file or directory is valid
        if !self.verify_metadata(hash, chunk_hashes, relative_files) {
            return Err(Error::GeodeNeedsGc);
        }

        let chunked = if relative_files.is_empty() {
            // File
            let file_size = (chunk_hashes.len() * MAX_CHUNK_SIZE) as u64; // Upper bound, not actual file size
            ChunkedStorage::new(chunk_hashes, &[(path.to_path_buf(), file_size)], false)
        } else {
            // Directory
            let files: Vec<_> = relative_files
                .iter()
                .map(|(file_path, size)| (path.join(file_path), *size))
                .collect();
            ChunkedStorage::new(chunk_hashes, &files, true)
        };

        Ok(chunked)
    }

    /// Fetch a single chunk from Geode. Returns a Vec containing the chunk content
    /// if it is found.
    /// The returned chunk is NOT verified.
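    /// Since the chunk is not verified here, callers may want to check it themselves
    /// (illustrative sketch, assuming an async context):
    /// ```ignore
    /// let chunk = geode.get_chunk(&mut chunked, &chunk_hash).await?;
    /// if !geode.verify_chunk(&chunk_hash, &chunk) {
    ///     // Handle missing or corrupted chunk content
    /// }
    /// ```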
    pub async fn get_chunk(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hash: &blake3::Hash,
    ) -> Result<Vec<u8>> {
        info!(target: "geode::get_chunk", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));

        // Get the chunk index in the file from the chunk hash
        let chunk_index = match chunked.iter().position(|c| c.hash == *chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeChunkNotFound),
        };

        // Read the file to get the chunk content
        let chunk = self.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await?;

        Ok(chunk)
    }

    /// Read from `stream` to get the chunk with index `chunk_index`.
    /// Returns the chunk content in a Vec.
    pub async fn read_chunk(
        &self,
        mut stream: impl AsyncRead + Unpin + AsyncSeek,
        chunk_index: &usize,
    ) -> Result<Vec<u8>> {
        let position = (*chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
        let mut buf = vec![0u8; MAX_CHUNK_SIZE];
        stream.seek(SeekFrom::Start(position)).await?;
        let bytes_read = stream.read(&mut buf).await?;
        Ok(buf[..bytes_read].to_vec())
    }

    /// Verifies that the file/directory hash matches the chunk hashes and files metadata.
    pub fn verify_metadata(
        &self,
        hash: &blake3::Hash,
        chunk_hashes: &[blake3::Hash],
        files: &[(PathBuf, u64)],
    ) -> bool {
        info!(target: "geode::verify_metadata", "[Geode] Verifying metadata for {}", hash_to_string(hash));
        let mut hasher = blake3::Hasher::new();
        self.hash_chunks_metadata(&mut hasher, chunk_hashes);
        self.hash_files_metadata(&mut hasher, files);
        *hash == hasher.finalize()
    }

    /// Verifies that the chunk hash matches the content.
    pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool {
        info!(target: "geode::verify_chunk", "[Geode] Verifying chunk {}", hash_to_string(chunk_hash));
        blake3::hash(chunk_slice) == *chunk_hash
    }
}