#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
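//! A storage service that stores objects in one of two configured backends,
//! routing each object based on its size and on existing redirect tombstones.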

mod backend;
mod path;

use bytes::BytesMut;
use futures_util::{StreamExt, TryStreamExt};
use objectstore_types::Metadata;

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use crate::backend::common::{BackendStream, BoxedBackend};

pub use path::*;

/// Objects larger than this threshold (in bytes) are stored in the long-term backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024;

/// The backend an object gets routed to.
enum BackendChoice {
    HighVolume,
    LongTerm,
}

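/// A service that reads and writes objects, transparently choosing between a
/// high-volume and a long-term backend for each object.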
#[derive(Clone, Debug)]
pub struct StorageService(Arc<StorageServiceInner>);

#[derive(Debug)]
struct StorageServiceInner {
    high_volume_backend: BoxedBackend,
    long_term_backend: BoxedBackend,
}

/// Configuration for a single storage backend.
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Local filesystem storage, primarily useful for development and testing.
    FileSystem {
        /// The root directory in which objects are stored.
        path: &'a Path,
    },
    /// An S3-compatible object store.
    S3Compatible {
        /// The endpoint of the S3-compatible service.
        endpoint: &'a str,
        /// The bucket in which objects are stored.
        bucket: &'a str,
    },
    /// Google Cloud Storage.
    Gcs {
        /// An optional custom endpoint to connect to.
        endpoint: Option<&'a str>,
        /// The bucket in which objects are stored.
        bucket: &'a str,
    },
    /// Google Cloud Bigtable.
    BigTable {
        /// An optional custom endpoint to connect to.
        endpoint: Option<&'a str>,
        /// The Google Cloud project ID.
        project_id: &'a str,
        /// The Bigtable instance name.
        instance_name: &'a str,
        /// The Bigtable table name.
        table_name: &'a str,
        /// The number of connections to use, if set.
        connections: Option<usize>,
    },
}

impl StorageService {
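    /// Creates a new [`StorageService`] with the given high-volume and long-term
    /// backend configurations.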
    pub async fn new(
        high_volume_config: StorageConfig<'_>,
        long_term_config: StorageConfig<'_>,
    ) -> anyhow::Result<Self> {
        let high_volume_backend = create_backend(high_volume_config).await?;
        let long_term_backend = create_backend(long_term_config).await?;

        let inner = StorageServiceInner {
            high_volume_backend,
            long_term_backend,
        };
        Ok(Self(Arc::new(inner)))
    }

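    /// Stores an object at the given `path`.
    ///
    /// Small payloads go to the high-volume backend. Payloads exceeding the size
    /// threshold are stored in the long-term backend, and a redirect tombstone is
    /// written to the high-volume backend in their place.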
    pub async fn put_object(
        &self,
        path: ObjectPath,
        metadata: &Metadata,
        mut stream: BackendStream,
    ) -> anyhow::Result<ObjectPath> {
        let start = Instant::now();

        // Buffer the head of the stream: if it ends before exceeding the size
        // threshold, the whole payload fits into the high-volume backend.
        let mut first_chunk = BytesMut::new();
        let mut backend = BackendChoice::HighVolume;
        while let Some(chunk) = stream.try_next().await? {
            first_chunk.extend_from_slice(&chunk);

            if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
                backend = BackendChoice::LongTerm;
                break;
            }
        }

        // An existing redirect tombstone means the object currently lives in the
        // long-term backend, so keep writing it there.
        let previously_stored_object = self.0.high_volume_backend.get_object(&path).await?;
        if is_tombstoned(&previously_stored_object) {
            backend = BackendChoice::LongTerm;
        }

        let (backend_choice, backend_ty, stored_size) = match backend {
            BackendChoice::HighVolume => {
                // The whole payload fit into the buffer; store it directly.
                let stored_size = first_chunk.len() as u64;
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) }).boxed();

                self.0
                    .high_volume_backend
                    .put_object(&path, metadata, stream)
                    .await?;
                (
                    "high-volume",
                    self.0.high_volume_backend.name(),
                    stored_size,
                )
            }
            BackendChoice::LongTerm => {
                // Re-chain the buffered head with the rest of the stream and count
                // the bytes as they are written out.
                let stored_size = Arc::new(AtomicU64::new(0));
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
                    .chain(stream)
                    .inspect({
                        let stored_size = Arc::clone(&stored_size);
                        move |res| {
                            if let Ok(chunk) = res {
                                stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed);
                            }
                        }
                    })
                    .boxed();

                self.0
                    .long_term_backend
                    .put_object(&path, metadata, stream)
                    .await?;

                // Leave a redirect tombstone in the high-volume backend so that
                // reads and deletes know to look in the long-term backend.
                let redirect_metadata = Metadata {
                    is_redirect_tombstone: Some(true),
                    expiration_policy: metadata.expiration_policy,
                    ..Default::default()
                };
                let redirect_stream = futures_util::stream::empty().boxed();
                let redirect_request = self.0.high_volume_backend.put_object(
                    &path,
                    &redirect_metadata,
                    redirect_stream,
                );

                // If writing the tombstone fails, clean up the long-term copy
                // before propagating the error.
                let redirect_result = redirect_request.await;
                if redirect_result.is_err() {
                    self.0.long_term_backend.delete_object(&path).await?;
                }
                redirect_result?;

                (
                    "long-term",
                    self.0.long_term_backend.name(),
                    stored_size.load(Ordering::Acquire),
                )
            }
        };

        merni::distribution!(
            "put.latency"@s: start.elapsed(),
            "usecase" => path.usecase,
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );
        merni::distribution!(
            "put.size"@b: stored_size,
            "usecase" => path.usecase,
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );

        Ok(path)
    }

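    /// Fetches the metadata and contents of the object stored at `path`, following
    /// a redirect tombstone to the long-term backend if necessary.
    ///
    /// Returns `None` if no object is stored at `path`.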
    pub async fn get_object(
        &self,
        path: &ObjectPath,
    ) -> anyhow::Result<Option<(Metadata, BackendStream)>> {
        let start = Instant::now();

        let mut backend_choice = "high-volume";
        let mut backend_type = self.0.high_volume_backend.name();
        let mut result = self.0.high_volume_backend.get_object(path).await?;

        // A tombstone in the high-volume backend redirects to the long-term backend.
        if is_tombstoned(&result) {
            result = self.0.long_term_backend.get_object(path).await?;
            backend_choice = "long-term";
            backend_type = self.0.long_term_backend.name();
        }

        merni::distribution!(
            "get.latency.pre-response"@s: start.elapsed(),
            "usecase" => path.usecase,
            "backend_choice" => backend_choice,
            "backend_type" => backend_type
        );

        if let Some((metadata, _stream)) = &result {
            if let Some(size) = metadata.size {
                merni::distribution!(
                    "get.size"@b: size,
                    "usecase" => path.usecase,
                    "backend_choice" => backend_choice,
                    "backend_type" => backend_type
                );
            } else {
                tracing::warn!(?backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

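    /// Deletes the object stored at `path`, including a long-term copy referenced
    /// by a redirect tombstone.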
    pub async fn delete_object(&self, path: &ObjectPath) -> anyhow::Result<()> {
        let start = Instant::now();

        if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(path).await? {
            // A redirect tombstone means the payload itself lives in the
            // long-term backend; delete it there as well.
            if metadata.is_redirect_tombstone == Some(true) {
                self.0.long_term_backend.delete_object(path).await?;
            }
            self.0.high_volume_backend.delete_object(path).await?;
        }

        merni::distribution!(
            "delete.latency"@s: start.elapsed(),
            "usecase" => path.usecase
        );

        Ok(())
    }
}

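/// Returns `true` if the stored entry is a redirect tombstone, which indicates
/// that the actual object lives in the long-term backend.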
fn is_tombstoned(result: &Option<(Metadata, BackendStream)>) -> bool {
    matches!(
        result,
        Some((
            Metadata {
                is_redirect_tombstone: Some(true),
                ..
            },
            _
        ))
    )
}

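/// Instantiates the storage backend described by the given [`StorageConfig`].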
async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
    Ok(match config {
        StorageConfig::FileSystem { path } => {
            Box::new(backend::local_fs::LocalFsBackend::new(path))
        }
        StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
            backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
        ),
        StorageConfig::Gcs { endpoint, bucket } => {
            Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?)
        }
        StorageConfig::BigTable {
            endpoint,
            project_id,
            instance_name,
            table_name,
            connections,
        } => Box::new(
            backend::bigtable::BigTableBackend::new(
                endpoint,
                project_id,
                instance_name,
                table_name,
                connections,
            )
            .await?,
        ),
    })
}

#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use futures_util::{StreamExt, TryStreamExt};

    use super::*;

    fn make_stream(contents: &[u8]) -> BackendStream {
        tokio_stream::once(Ok(contents.to_vec().into())).boxed()
    }

    fn make_path() -> ObjectPath {
        ObjectPath {
            usecase: "testing".into(),
            scope: "testing".into(),
            key: "testing".into(),
        }
    }

    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .put_object(make_path(), &Default::default(), make_stream(b"oh hai!"))
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

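    // A sketch of an additional test for the large-object path: a payload larger
    // than BACKEND_SIZE_THRESHOLD should be routed to the long-term backend,
    // leaving a redirect tombstone in the high-volume backend, and still round-trip
    // through the same public API. This assumes the filesystem backend round-trips
    // metadata (including the redirect tombstone flag) and accepts empty streams.
    #[tokio::test]
    async fn stores_large_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        // A 2 MiB payload exceeds BACKEND_SIZE_THRESHOLD during buffering,
        // so it is routed to the long-term backend.
        let payload = vec![0xAB_u8; 2 * BACKEND_SIZE_THRESHOLD];
        let key = service
            .put_object(make_path(), &Default::default(), make_stream(&payload))
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), payload.as_slice());
    }
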
    // This test expects a GCS-compatible server listening on http://localhost:8087.
    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket",
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let key = service
            .put_object(make_path(), &Default::default(), make_stream(b"oh hai!"))
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }
}