objectstore_service/backend/
local_fs.rs

1//! Local filesystem backend for development and testing.
2
3use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6
7use futures_util::StreamExt;
8use objectstore_types::metadata::Metadata;
9use tokio::fs::OpenOptions;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
11use tokio_util::io::{ReaderStream, StreamReader};
12
13use crate::backend::common::{Backend, DeleteResponse, GetResponse, PutResponse};
14use crate::error::{Error, Result};
15use crate::id::ObjectId;
16use crate::stream::{self, ClientStream};
17
18/// Configuration for [`LocalFsBackend`].
19///
20/// Stores objects as files on the local filesystem. Suitable for development, testing,
21/// and single-server deployments.
22///
23/// # Example
24///
25/// ```yaml
26/// storage:
27///   type: filesystem
28///   path: /data
29/// ```
30#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
31pub struct FileSystemConfig {
32    /// Directory path for storing objects.
33    ///
34    /// The directory will be created if it doesn't exist. Relative paths are resolved from
35    /// the server's working directory.
36    ///
37    /// # Default
38    ///
39    /// `"data"` (relative to the server's working directory)
40    ///
41    /// # Environment Variables
42    ///
43    /// - `OS__STORAGE__TYPE=filesystem`
44    /// - `OS__STORAGE__PATH=/path/to/storage`
45    pub path: PathBuf,
46}
47
/// Storage backend that keeps objects as files on the local filesystem.
///
/// Meant for development and testing.
#[derive(Debug)]
pub struct LocalFsBackend {
    /// Root directory that all object storage paths are joined onto.
    path: PathBuf,
}
53
54impl LocalFsBackend {
55    /// Creates a new [`LocalFsBackend`] rooted at the directory in `config`.
56    pub fn new(config: FileSystemConfig) -> Self {
57        Self { path: config.path }
58    }
59}
60
61#[async_trait::async_trait]
62impl Backend for LocalFsBackend {
63    fn name(&self) -> &'static str {
64        "local-fs"
65    }
66
67    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
68    async fn put_object(
69        &self,
70        id: &ObjectId,
71        metadata: &Metadata,
72        stream: ClientStream,
73    ) -> Result<PutResponse> {
74        let path = self.path.join(id.as_storage_path().to_string());
75        objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
76        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
77        let file = OpenOptions::new()
78            .create(true)
79            .write(true)
80            .truncate(true)
81            .open(path)
82            .await?;
83
84        let mut reader = pin!(StreamReader::new(stream));
85        let mut writer = BufWriter::new(file);
86
87        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
88            context: "failed to serialize metadata".to_string(),
89            cause,
90        })?;
91        writer.write_all(metadata_json.as_bytes()).await?;
92        writer.write_all(b"\n").await?;
93
94        tokio::io::copy(&mut reader, &mut writer)
95            .await
96            .map_err(|e| match stream::unpack_client_error(&e) {
97                Some(ce) => Error::Client(ce),
98                None => e.into(),
99            })?;
100
101        writer.flush().await?;
102        let file = writer.into_inner();
103        file.sync_data().await?;
104        drop(file);
105
106        Ok(())
107    }
108
109    // TODO: Return `Ok(None)` if object is found but past expiry
110    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
111    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
112        objectstore_log::debug!("Reading from local_fs backend");
113        let path = self.path.join(id.as_storage_path().to_string());
114        let file = match OpenOptions::new().read(true).open(path).await {
115            Ok(file) => file,
116            Err(err) if err.kind() == ErrorKind::NotFound => {
117                objectstore_log::debug!("Object not found");
118                return Ok(None);
119            }
120            err => err?,
121        };
122
123        let mut reader = BufReader::new(file);
124        let mut metadata_line = String::new();
125        reader.read_line(&mut metadata_line).await?;
126        let file_len = reader.get_ref().metadata().await?.len();
127        let mut metadata: Metadata =
128            serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
129                context: "failed to deserialize metadata".to_string(),
130                cause,
131            })?;
132        let payload_size = file_len
133            .checked_sub(metadata_line.len() as u64)
134            .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
135        metadata.size = Some(payload_size as usize);
136
137        let stream = ReaderStream::new(reader);
138        Ok(Some((metadata, stream.boxed())))
139    }
140
141    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
142    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
143        objectstore_log::debug!("Deleting from local_fs backend");
144        let path = self.path.join(id.as_storage_path().to_string());
145        let result = tokio::fs::remove_file(path).await;
146        if let Err(e) = &result
147            && e.kind() == ErrorKind::NotFound
148        {
149            objectstore_log::debug!("Object not found");
150        }
151        Ok(result?)
152    }
153}
154
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::{Compression, ExpirationPolicy};
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Creates a backend rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned together with the backend so that each test
    /// keeps it bound for as long as the backend is in use.
    fn temp_backend() -> (tempfile::TempDir, LocalFsBackend) {
        let dir = tempfile::tempdir().unwrap();
        let config = FileSystemConfig {
            path: dir.path().to_path_buf(),
        };
        (dir, LocalFsBackend::new(config))
    }

    /// Generates a random object id in the `testing` usecase with a single scope.
    fn random_id() -> ObjectId {
        let scopes = Scopes::from_iter([Scope::create("testing", "value").unwrap()]);
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes,
        })
    }

    /// A full put/get round trip returns the stored metadata and payload.
    #[tokio::test]
    async fn stores_metadata() {
        let (_dir, backend) = temp_backend();
        let id = random_id();

        let written = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            size: None,
        };
        backend
            .put_object(&id, &written, stream::single("oh hai!"))
            .await
            .unwrap();

        let (loaded, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload: BytesMut = body.try_collect().await.unwrap();

        // The backend fills in `size` on read; everything else must round-trip.
        let expected = Metadata {
            size: Some(payload.len()),
            ..written
        };
        assert_eq!(loaded, expected);
        assert_eq!(payload.as_ref(), b"oh hai!");
    }

    /// `get_metadata` returns the stored metadata with the payload size filled in.
    #[tokio::test]
    async fn get_metadata_returns_metadata() {
        let (_dir, backend) = temp_backend();
        let id = random_id();

        let written = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };
        backend
            .put_object(&id, &written, stream::single("oh hai!"))
            .await
            .unwrap();

        let loaded = backend.get_metadata(&id).await.unwrap().unwrap();
        // "oh hai!" is seven bytes long.
        let expected = Metadata {
            size: Some(7),
            ..written
        };
        assert_eq!(loaded, expected);
    }

    /// `get_metadata` yields `None` for an id that was never written.
    #[tokio::test]
    async fn get_metadata_nonexistent() {
        let (_dir, backend) = temp_backend();
        let id = random_id();

        let lookup = backend.get_metadata(&id).await.unwrap();
        assert!(lookup.is_none());
    }
}
257}