1#![warn(missing_docs)]
6#![warn(missing_debug_implementations)]
7
8mod backend;
10mod error;
11pub use error::{ServiceError, ServiceResult};
12pub mod id;
13
14use std::path::Path;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
19use bytes::{Bytes, BytesMut};
20use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
21use objectstore_types::Metadata;
22
23use crate::backend::common::BoxedBackend;
24use crate::id::{ObjectContext, ObjectId};
25
/// Payloads up to this size (in bytes) are stored in the high-volume backend;
/// anything larger is routed to the long-term backend.
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024;

/// The backend selected to store a particular object.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BackendChoice {
    /// The backend intended for small objects and redirect tombstones.
    HighVolume,
    /// The backend intended for large objects.
    LongTerm,
}
33
/// The response to a GET request: `None` if the object does not exist,
/// otherwise the object's [`Metadata`] together with its payload stream.
pub type GetResponse = Option<(Metadata, PayloadStream)>;
/// The response to an insert (PUT) request: the [`ObjectId`] under which the
/// object was stored.
pub type InsertResponse = ObjectId;
/// The response to a DELETE request.
pub type DeleteResponse = ();
40
/// The high-level storage service, routing objects between a "high-volume"
/// and a "long-term" storage backend.
///
/// Cheap to clone: all state is shared behind an [`Arc`].
#[derive(Clone, Debug)]
pub struct StorageService(Arc<StorageServiceInner>);
45
/// Shared state of [`StorageService`]: the two concrete storage backends.
#[derive(Debug)]
struct StorageServiceInner {
    /// Backend holding small objects and redirect tombstones.
    high_volume_backend: BoxedBackend,
    /// Backend holding large objects.
    long_term_backend: BoxedBackend,
}
51
52pub type PayloadStream = BoxStream<'static, std::io::Result<Bytes>>;
54
/// Configuration describing a single storage backend.
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Local file-system storage.
    FileSystem {
        /// Root directory under which objects are stored.
        path: &'a Path,
    },
    /// S3-compatible object storage.
    S3Compatible {
        /// Endpoint URL of the S3-compatible service.
        endpoint: &'a str,
        /// Name of the bucket to store objects in.
        bucket: &'a str,
    },
    /// Google Cloud Storage.
    Gcs {
        /// Custom endpoint, e.g. for a local emulator; `None` uses the
        /// service's default endpoint.
        endpoint: Option<&'a str>,
        /// Name of the bucket to store objects in.
        bucket: &'a str,
    },
    /// Google Cloud BigTable.
    BigTable {
        /// Custom endpoint, e.g. for a local emulator; `None` uses the
        /// service's default endpoint.
        endpoint: Option<&'a str>,
        /// The GCP project id.
        project_id: &'a str,
        /// The BigTable instance name.
        instance_name: &'a str,
        /// The BigTable table name.
        table_name: &'a str,
        /// Number of connections to open; `None` uses the backend's default.
        connections: Option<usize>,
    },
}
97
impl StorageService {
    /// Creates a new [`StorageService`] from the two given backend configurations.
    ///
    /// `high_volume_config` describes the backend used for small objects and
    /// redirect tombstones; `long_term_config` describes the backend used for
    /// large objects (see [`Self::insert_object`] for the routing rules).
    ///
    /// # Errors
    ///
    /// Returns an error if either backend fails to initialize.
    pub async fn new(
        high_volume_config: StorageConfig<'_>,
        long_term_config: StorageConfig<'_>,
    ) -> anyhow::Result<Self> {
        let high_volume_backend = create_backend(high_volume_config).await?;
        let long_term_backend = create_backend(long_term_config).await?;

        let inner = StorageServiceInner {
            high_volume_backend,
            long_term_backend,
        };
        Ok(Self(Arc::new(inner)))
    }

    /// Stores the payload `stream` and returns the [`ObjectId`] it was stored
    /// under.
    ///
    /// If `key` is `None`, an id is generated within `context` (see
    /// [`ObjectId::optional`]).
    ///
    /// Payloads of up to [`BACKEND_SIZE_THRESHOLD`] bytes are written to the
    /// high-volume backend. Larger payloads are written to the long-term
    /// backend, plus a redirect "tombstone" entry in the high-volume backend
    /// so that [`Self::get_object`] knows where to look.
    pub async fn insert_object(
        &self,
        context: ObjectContext,
        key: Option<String>,
        metadata: &Metadata,
        mut stream: PayloadStream,
    ) -> ServiceResult<InsertResponse> {
        let start = Instant::now();

        // Buffer the head of the stream until we either reach its end or
        // exceed the size threshold; this decides which backend gets the data.
        let mut first_chunk = BytesMut::new();
        let mut backend = BackendChoice::HighVolume;
        while let Some(chunk) = stream.try_next().await? {
            first_chunk.extend_from_slice(&chunk);

            if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
                backend = BackendChoice::LongTerm;
                break;
            }
        }

        let has_key = key.is_some();
        let id = ObjectId::optional(context, key);

        if has_key {
            // When overwriting an explicitly keyed object whose high-volume
            // entry is currently a redirect tombstone, force the long-term
            // backend even for small payloads — presumably so the existing
            // long-term copy is overwritten rather than orphaned.
            // NOTE(review): confirm this intent against the redirect design.
            let previously_stored_object = self.0.high_volume_backend.get_object(&id).await?;
            if is_tombstoned(&previously_stored_object) {
                backend = BackendChoice::LongTerm;
            }
        };

        let (backend_choice, backend_ty, stored_size) = match backend {
            BackendChoice::HighVolume => {
                // The full payload fit into `first_chunk`; store it directly.
                let stored_size = first_chunk.len() as u64;
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) }).boxed();

                self.0
                    .high_volume_backend
                    .put_object(&id, metadata, stream)
                    .await?;
                (
                    "high-volume",
                    self.0.high_volume_backend.name(),
                    stored_size,
                )
            }
            BackendChoice::LongTerm => {
                // Re-chain the buffered head with the remainder of the
                // stream, counting the bytes flowing through so the total
                // stored size can be reported in the metric below.
                let stored_size = Arc::new(AtomicU64::new(0));
                let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
                    .chain(stream)
                    .inspect({
                        let stored_size = Arc::clone(&stored_size);
                        move |res| {
                            if let Ok(chunk) = res {
                                stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed);
                            }
                        }
                    })
                    .boxed();

                self.0
                    .long_term_backend
                    .put_object(&id, metadata, stream)
                    .await?;

                // Write a redirect tombstone into the high-volume backend so
                // subsequent reads of this id are forwarded to the long-term
                // backend. It carries the same expiration policy but no data.
                let redirect_metadata = Metadata {
                    is_redirect_tombstone: Some(true),
                    expiration_policy: metadata.expiration_policy,
                    ..Default::default()
                };
                let redirect_stream = futures_util::stream::empty().boxed();
                let redirect_request =
                    self.0
                        .high_volume_backend
                        .put_object(&id, &redirect_metadata, redirect_stream);

                let redirect_result = redirect_request.await;
                if redirect_result.is_err() {
                    // Compensate: remove the freshly written long-term copy so
                    // a failed redirect write does not leak an unreachable
                    // object. NOTE(review): if this delete fails as well, its
                    // error masks the original redirect error via `?`.
                    self.0.long_term_backend.delete_object(&id).await?;
                }
                redirect_result?;

                (
                    "long-term",
                    self.0.long_term_backend.name(),
                    stored_size.load(Ordering::Acquire),
                )
            }
        };

        merni::distribution!(
            "put.latency"@s: start.elapsed(),
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );
        merni::distribution!(
            "put.size"@b: stored_size,
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_ty
        );

        Ok(id)
    }

    /// Retrieves the object stored under `id`, or `None` if it does not exist.
    ///
    /// Looks in the high-volume backend first; if the entry found there is a
    /// redirect tombstone, the actual object is fetched from the long-term
    /// backend instead.
    pub async fn get_object(&self, id: &ObjectId) -> ServiceResult<GetResponse> {
        let start = Instant::now();

        let mut backend_choice = "high-volume";
        let mut backend_type = self.0.high_volume_backend.name();
        let mut result = self.0.high_volume_backend.get_object(id).await?;

        if is_tombstoned(&result) {
            // Follow the redirect to the long-term backend.
            result = self.0.long_term_backend.get_object(id).await?;
            backend_choice = "long-term";
            backend_type = self.0.long_term_backend.name();
        }

        merni::distribution!(
            "get.latency.pre-response"@s: start.elapsed(),
            "usecase" => id.usecase(),
            "backend_choice" => backend_choice,
            "backend_type" => backend_type
        );

        // Report the object size when the backend provided it; a missing size
        // is unexpected and only logged.
        if let Some((metadata, _stream)) = &result {
            if let Some(size) = metadata.size {
                merni::distribution!(
                    "get.size"@b: size,
                    "usecase" => id.usecase(),
                    "backend_choice" => backend_choice,
                    "backend_type" => backend_type
                );
            } else {
                tracing::warn!(?backend_type, "Missing object size");
            }
        }

        Ok(result)
    }

    /// Deletes the object stored under `id`.
    ///
    /// If the high-volume entry for `id` is a redirect tombstone, the actual
    /// payload in the long-term backend is deleted as well as the tombstone.
    pub async fn delete_object(&self, id: &ObjectId) -> ServiceResult<DeleteResponse> {
        let start = Instant::now();

        if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? {
            if metadata.is_redirect_tombstone == Some(true) {
                self.0.long_term_backend.delete_object(id).await?;
            }
            self.0.high_volume_backend.delete_object(id).await?;
        }

        merni::distribution!(
            "delete.latency"@s: start.elapsed(),
            "usecase" => id.usecase()
        );

        Ok(())
    }
}
284
285fn is_tombstoned(result: &GetResponse) -> bool {
286 matches!(
287 result,
288 Some((
289 Metadata {
290 is_redirect_tombstone: Some(true),
291 ..
292 },
293 _
294 ))
295 )
296}
297
298async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
299 Ok(match config {
300 StorageConfig::FileSystem { path } => {
301 Box::new(backend::local_fs::LocalFsBackend::new(path))
302 }
303 StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
304 backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
305 ),
306 StorageConfig::Gcs { endpoint, bucket } => {
307 Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?)
308 }
309 StorageConfig::BigTable {
310 endpoint,
311 project_id,
312 instance_name,
313 table_name,
314 connections,
315 } => Box::new(
316 backend::bigtable::BigTableBackend::new(
317 endpoint,
318 project_id,
319 instance_name,
320 table_name,
321 connections,
322 )
323 .await?,
324 ),
325 })
326}
327
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use futures_util::{StreamExt, TryStreamExt};

    use objectstore_types::scope::{Scope, Scopes};

    use super::*;

    /// Wraps a byte slice into a single-chunk [`PayloadStream`].
    fn make_stream(contents: &[u8]) -> PayloadStream {
        let chunk: Bytes = contents.to_vec().into();
        tokio_stream::once(Ok(chunk)).boxed()
    }

    /// Builds a minimal [`ObjectContext`] for test inserts.
    fn make_context() -> ObjectContext {
        let scope = Scope::create("testing", "value").unwrap();
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([scope]),
        }
    }

    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        // Use the same file-system backend for both roles.
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let payload = b"oh hai!";
        let id = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(payload),
            )
            .await
            .unwrap();

        // Reading the object back should yield the exact payload.
        let (_metadata, stream) = service.get_object(&id).await.unwrap().unwrap();
        let contents: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(contents.as_ref(), payload);
    }

    #[tokio::test]
    async fn works_with_gcs() {
        // Talks to a locally running GCS emulator on port 8087.
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket",
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        let payload = b"oh hai!";
        let id = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(payload),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&id).await.unwrap().unwrap();
        let contents: BytesMut = stream.try_collect().await.unwrap();
        assert_eq!(contents.as_ref(), payload);
    }
}