1#![warn(missing_docs)]
6#![warn(missing_debug_implementations)]
7
8mod backend;
10mod error;
11pub use error::{ServiceError, ServiceResult};
12pub mod id;
13
14use std::path::Path;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
19use bytes::{Bytes, BytesMut};
20use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
21use objectstore_types::Metadata;
22
23use crate::backend::common::BoxedBackend;
24use crate::id::{ObjectContext, ObjectId};
25
/// Size limit, in bytes (1 MiB), for payloads kept in the high-volume backend.
///
/// A payload whose buffered head grows beyond this many bytes is routed to the long-term
/// backend instead.
const BACKEND_SIZE_THRESHOLD: usize = 1 << 20;

/// The backend an incoming object is written to.
enum BackendChoice {
    /// Payloads of at most `BACKEND_SIZE_THRESHOLD` bytes; this backend also holds redirect
    /// tombstones for objects stored in the long-term backend.
    HighVolume,
    /// Payloads larger than `BACKEND_SIZE_THRESHOLD` bytes, and writes to a key whose
    /// high-volume entry is already a redirect tombstone.
    LongTerm,
}
33
34#[derive(Clone, Debug)]
37pub struct StorageService(Arc<StorageServiceInner>);
38
39#[derive(Debug)]
40struct StorageServiceInner {
41 high_volume_backend: BoxedBackend,
42 long_term_backend: BoxedBackend,
43}
44
45pub type PayloadStream = BoxStream<'static, std::io::Result<Bytes>>;
47
/// Configuration for one storage backend, consumed by [`StorageService::new`].
#[derive(Debug, Clone)]
pub enum StorageConfig<'a> {
    /// Objects are stored on the local filesystem.
    FileSystem {
        /// Path handed to the local filesystem backend (presumably the directory objects
        /// are stored under — confirm against `LocalFsBackend`).
        path: &'a Path,
    },
    /// Objects are stored in an S3-compatible service.
    ///
    /// The backend is created through `S3CompatibleBackend::without_token`.
    S3Compatible {
        /// Endpoint URL of the S3-compatible service.
        endpoint: &'a str,
        /// Name of the bucket objects are stored in.
        bucket: &'a str,
    },
    /// Objects are stored in Google Cloud Storage.
    Gcs {
        /// Optional endpoint override, e.g. `"http://localhost:8087"` for a local server.
        /// `None` presumably selects the backend's default endpoint — TODO confirm.
        endpoint: Option<&'a str>,
        /// Name of the bucket objects are stored in.
        bucket: &'a str,
    },
    /// Objects are stored in a Bigtable table.
    BigTable {
        /// Optional endpoint override. `None` presumably selects the backend's default
        /// endpoint — TODO confirm.
        endpoint: Option<&'a str>,
        /// Project that owns the Bigtable instance.
        project_id: &'a str,
        /// Name of the Bigtable instance.
        instance_name: &'a str,
        /// Name of the table objects are stored in.
        table_name: &'a str,
        /// Optional number of connections for the backend to use. `None` presumably lets
        /// the backend choose — TODO confirm.
        connections: Option<usize>,
    },
}
90
91impl StorageService {
92 pub async fn new(
94 high_volume_config: StorageConfig<'_>,
95 long_term_config: StorageConfig<'_>,
96 ) -> anyhow::Result<Self> {
97 let high_volume_backend = create_backend(high_volume_config).await?;
98 let long_term_backend = create_backend(long_term_config).await?;
99
100 let inner = StorageServiceInner {
101 high_volume_backend,
102 long_term_backend,
103 };
104 Ok(Self(Arc::new(inner)))
105 }
106
107 pub async fn insert_object(
112 &self,
113 context: ObjectContext,
114 key: Option<String>,
115 metadata: &Metadata,
116 mut stream: PayloadStream,
117 ) -> ServiceResult<ObjectId> {
118 let start = Instant::now();
119
120 let mut first_chunk = BytesMut::new();
121 let mut backend = BackendChoice::HighVolume;
122 while let Some(chunk) = stream.try_next().await? {
123 first_chunk.extend_from_slice(&chunk);
124
125 if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
126 backend = BackendChoice::LongTerm;
127 break;
128 }
129 }
130
131 let has_key = key.is_some();
132 let id = ObjectId::optional(context, key);
133
134 if has_key {
136 let previously_stored_object = self.0.high_volume_backend.get_object(&id).await?;
137 if is_tombstoned(&previously_stored_object) {
138 backend = BackendChoice::LongTerm;
140 }
141 };
142
143 let (backend_choice, backend_ty, stored_size) = match backend {
144 BackendChoice::HighVolume => {
145 let stored_size = first_chunk.len() as u64;
146 let stream = futures_util::stream::once(async { Ok(first_chunk.into()) }).boxed();
147
148 self.0
149 .high_volume_backend
150 .put_object(&id, metadata, stream)
151 .await?;
152 (
153 "high-volume",
154 self.0.high_volume_backend.name(),
155 stored_size,
156 )
157 }
158 BackendChoice::LongTerm => {
159 let stored_size = Arc::new(AtomicU64::new(0));
160 let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
161 .chain(stream)
162 .inspect({
163 let stored_size = Arc::clone(&stored_size);
164 move |res| {
165 if let Ok(chunk) = res {
166 stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed);
167 }
168 }
169 })
170 .boxed();
171
172 self.0
174 .long_term_backend
175 .put_object(&id, metadata, stream)
176 .await?;
177
178 let redirect_metadata = Metadata {
179 is_redirect_tombstone: Some(true),
180 expiration_policy: metadata.expiration_policy,
181 ..Default::default()
182 };
183 let redirect_stream = futures_util::stream::empty().boxed();
184 let redirect_request =
185 self.0
186 .high_volume_backend
187 .put_object(&id, &redirect_metadata, redirect_stream);
188
189 let redirect_result = redirect_request.await;
191 if redirect_result.is_err() {
192 self.0.long_term_backend.delete_object(&id).await?;
194 }
195 redirect_result?;
196
197 (
198 "long-term",
199 self.0.long_term_backend.name(),
200 stored_size.load(Ordering::Acquire),
201 )
202 }
203 };
204
205 merni::distribution!(
206 "put.latency"@s: start.elapsed(),
207 "usecase" => id.usecase(),
208 "backend_choice" => backend_choice,
209 "backend_type" => backend_ty
210 );
211 merni::distribution!(
212 "put.size"@b: stored_size,
213 "usecase" => id.usecase(),
214 "backend_choice" => backend_choice,
215 "backend_type" => backend_ty
216 );
217
218 Ok(id)
219 }
220
221 pub async fn get_object(
223 &self,
224 id: &ObjectId,
225 ) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
226 let start = Instant::now();
227
228 let mut backend_choice = "high-volume";
229 let mut backend_type = self.0.high_volume_backend.name();
230 let mut result = self.0.high_volume_backend.get_object(id).await?;
231
232 if is_tombstoned(&result) {
233 result = self.0.long_term_backend.get_object(id).await?;
234 backend_choice = "long-term";
235 backend_type = self.0.long_term_backend.name();
236 }
237
238 merni::distribution!(
239 "get.latency.pre-response"@s: start.elapsed(),
240 "usecase" => id.usecase(),
241 "backend_choice" => backend_choice,
242 "backend_type" => backend_type
243 );
244
245 if let Some((metadata, _stream)) = &result {
246 if let Some(size) = metadata.size {
247 merni::distribution!(
248 "get.size"@b: size,
249 "usecase" => id.usecase(),
250 "backend_choice" => backend_choice,
251 "backend_type" => backend_type
252 );
253 } else {
254 tracing::warn!(?backend_type, "Missing object size");
255 }
256 }
257
258 Ok(result)
259 }
260
261 pub async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
263 let start = Instant::now();
264
265 if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? {
266 if metadata.is_redirect_tombstone == Some(true) {
267 self.0.long_term_backend.delete_object(id).await?;
268 }
269 self.0.high_volume_backend.delete_object(id).await?;
270 }
271
272 merni::distribution!(
273 "delete.latency"@s: start.elapsed(),
274 "usecase" => id.usecase()
275 );
276
277 Ok(())
278 }
279}
280
281fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool {
282 matches!(
283 result,
284 Some((
285 Metadata {
286 is_redirect_tombstone: Some(true),
287 ..
288 },
289 _
290 ))
291 )
292}
293
294async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
295 Ok(match config {
296 StorageConfig::FileSystem { path } => {
297 Box::new(backend::local_fs::LocalFsBackend::new(path))
298 }
299 StorageConfig::S3Compatible { endpoint, bucket } => Box::new(
300 backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket),
301 ),
302 StorageConfig::Gcs { endpoint, bucket } => {
303 Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?)
304 }
305 StorageConfig::BigTable {
306 endpoint,
307 project_id,
308 instance_name,
309 table_name,
310 connections,
311 } => Box::new(
312 backend::bigtable::BigTableBackend::new(
313 endpoint,
314 project_id,
315 instance_name,
316 table_name,
317 connections,
318 )
319 .await?,
320 ),
321 })
322}
323
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use futures_util::{StreamExt, TryStreamExt};

    use objectstore_types::scope::{Scope, Scopes};

    use super::*;

    /// Wraps `contents` in a single-chunk payload stream.
    fn make_stream(contents: &[u8]) -> PayloadStream {
        tokio_stream::once(Ok(contents.to_vec().into())).boxed()
    }

    /// Builds the object context shared by all tests.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Inserts `payload` under the key `"testing"`, reads it back, and asserts that the
    /// returned bytes equal `payload`.
    async fn assert_roundtrip(service: &StorageService, payload: &[u8]) {
        let id = service
            .insert_object(
                make_context(),
                Some("testing".into()),
                &Default::default(),
                make_stream(payload),
            )
            .await
            .unwrap();

        let (_metadata, stream) = service.get_object(&id).await.unwrap().unwrap();
        let contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(&contents[..], payload);
    }

    #[tokio::test]
    async fn stores_files() {
        let tempdir = tempfile::tempdir().unwrap();
        let config = StorageConfig::FileSystem {
            path: tempdir.path(),
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        assert_roundtrip(&service, b"oh hai!").await;
    }

    /// Expects a GCS-compatible server (presumably an emulator) at `localhost:8087`.
    #[tokio::test]
    async fn works_with_gcs() {
        let config = StorageConfig::Gcs {
            endpoint: Some("http://localhost:8087"),
            bucket: "test-bucket",
        };
        let service = StorageService::new(config.clone(), config).await.unwrap();

        assert_roundtrip(&service, b"oh hai!").await;
    }
}