objectstore_service/backend/
local_fs.rs1use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6
7use futures_util::StreamExt;
8use objectstore_types::metadata::Metadata;
9use tokio::fs::OpenOptions;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
11use tokio_util::io::{ReaderStream, StreamReader};
12
13use crate::backend::common::{Backend, DeleteResponse, GetResponse, PutResponse};
14use crate::error::{Error, Result};
15use crate::id::ObjectId;
16use crate::stream::{self, ClientStream};
17
18#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
31pub struct FileSystemConfig {
32 pub path: PathBuf,
46}
47
/// Storage backend that persists objects as files below a root directory.
#[derive(Debug)]
pub struct LocalFsBackend {
    /// Root directory onto which each object's storage path is joined.
    path: PathBuf,
}
53
54impl LocalFsBackend {
55 pub fn new(config: FileSystemConfig) -> Self {
57 Self { path: config.path }
58 }
59}
60
61#[async_trait::async_trait]
62impl Backend for LocalFsBackend {
63 fn name(&self) -> &'static str {
64 "local-fs"
65 }
66
67 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
68 async fn put_object(
69 &self,
70 id: &ObjectId,
71 metadata: &Metadata,
72 stream: ClientStream,
73 ) -> Result<PutResponse> {
74 let path = self.path.join(id.as_storage_path().to_string());
75 objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
76 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
77 let file = OpenOptions::new()
78 .create(true)
79 .write(true)
80 .truncate(true)
81 .open(path)
82 .await?;
83
84 let mut reader = pin!(StreamReader::new(stream));
85 let mut writer = BufWriter::new(file);
86
87 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
88 context: "failed to serialize metadata".to_string(),
89 cause,
90 })?;
91 writer.write_all(metadata_json.as_bytes()).await?;
92 writer.write_all(b"\n").await?;
93
94 tokio::io::copy(&mut reader, &mut writer)
95 .await
96 .map_err(|e| match stream::unpack_client_error(&e) {
97 Some(ce) => Error::Client(ce),
98 None => e.into(),
99 })?;
100
101 writer.flush().await?;
102 let file = writer.into_inner();
103 file.sync_data().await?;
104 drop(file);
105
106 Ok(())
107 }
108
109 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
111 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
112 objectstore_log::debug!("Reading from local_fs backend");
113 let path = self.path.join(id.as_storage_path().to_string());
114 let file = match OpenOptions::new().read(true).open(path).await {
115 Ok(file) => file,
116 Err(err) if err.kind() == ErrorKind::NotFound => {
117 objectstore_log::debug!("Object not found");
118 return Ok(None);
119 }
120 err => err?,
121 };
122
123 let mut reader = BufReader::new(file);
124 let mut metadata_line = String::new();
125 reader.read_line(&mut metadata_line).await?;
126 let file_len = reader.get_ref().metadata().await?.len();
127 let mut metadata: Metadata =
128 serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
129 context: "failed to deserialize metadata".to_string(),
130 cause,
131 })?;
132 let payload_size = file_len
133 .checked_sub(metadata_line.len() as u64)
134 .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
135 metadata.size = Some(payload_size as usize);
136
137 let stream = ReaderStream::new(reader);
138 Ok(Some((metadata, stream.boxed())))
139 }
140
141 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
142 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
143 objectstore_log::debug!("Deleting from local_fs backend");
144 let path = self.path.join(id.as_storage_path().to_string());
145 let result = tokio::fs::remove_file(path).await;
146 if let Err(e) = &result
147 && e.kind() == ErrorKind::NotFound
148 {
149 objectstore_log::debug!("Object not found");
150 }
151 Ok(result?)
152 }
153}
154
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::{Compression, ExpirationPolicy};
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Creates a backend rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the backend so the caller keeps it alive
    /// for the duration of the test.
    fn temp_backend() -> (tempfile::TempDir, LocalFsBackend) {
        let root = tempfile::tempdir().unwrap();
        let config = FileSystemConfig {
            path: root.path().to_path_buf(),
        };
        (root, LocalFsBackend::new(config))
    }

    /// Generates a random object id in the `testing` usecase with a single scope.
    fn random_id() -> ObjectId {
        let scope = Scope::create("testing", "value").unwrap();
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([scope]),
        })
    }

    /// A put followed by a get returns the stored metadata (plus size) and the payload.
    #[tokio::test]
    async fn stores_metadata() {
        let (_root, backend) = temp_backend();
        let id = random_id();

        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            size: None,
        };
        let put_result = backend
            .put_object(&id, &metadata, stream::single("oh hai!"))
            .await;
        put_result.unwrap();

        let (read_metadata, payload) = backend.get_object(&id).await.unwrap().unwrap();
        let file_contents: BytesMut = payload.try_collect().await.unwrap();

        let expected = Metadata {
            size: Some(file_contents.len()),
            ..metadata
        };
        assert_eq!(read_metadata, expected);
        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    /// `get_metadata` reports the stored metadata with the payload size filled in.
    #[tokio::test]
    async fn get_metadata_returns_metadata() {
        let (_root, backend) = temp_backend();
        let id = random_id();

        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };
        let put_result = backend
            .put_object(&id, &metadata, stream::single("oh hai!"))
            .await;
        put_result.unwrap();

        let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
        let expected = Metadata {
            size: Some(7),
            ..metadata
        };
        assert_eq!(read_metadata, expected);
    }

    /// `get_metadata` yields `None` for an id that was never written.
    #[tokio::test]
    async fn get_metadata_nonexistent() {
        let (_root, backend) = temp_backend();
        let id = random_id();

        let lookup = backend.get_metadata(&id).await.unwrap();
        assert!(lookup.is_none());
    }
}