Skip to main content

objectstore_service/backend/
in_memory.rs

1//! In-memory backend for tests.
2//!
3//! This provides a [`Backend`](super::common::Backend) backed by a `HashMap`,
4//! removing the need for filesystem tempdir management in unit tests. The
5//! backend is [`Clone`] so tests can hold a handle for direct inspection while
6//! the service owns a boxed copy.
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
11
12use bytes::{Bytes, BytesMut};
13use futures_util::TryStreamExt;
14use objectstore_types::metadata::Metadata;
15
16use super::common::{
17    DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet,
18    TieredMetadata, TieredWrite, Tombstone,
19};
20use crate::error::{Error, Result};
21use crate::id::ObjectId;
22use crate::multipart::{
23    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
24    ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
25};
26use crate::stream::ClientStream;
27
28/// An entry in the in-memory store.
29#[derive(Clone, Debug)]
30enum StoreEntry {
31    Object(Metadata, Bytes),
32    Tombstone(Tombstone),
33}
34
35type Store = HashMap<ObjectId, StoreEntry>;
36
37#[derive(Clone, Debug)]
38struct MultipartUpload {
39    metadata: Metadata,
40    parts: BTreeMap<PartNumber, UploadedPart>,
41}
42
43#[derive(Clone, Debug)]
44struct UploadedPart {
45    etag: String,
46    data: Bytes,
47    uploaded_at: SystemTime,
48}
49
50type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>;
51
52/// In-memory [`Backend`](super::common::Backend) backed by a `HashMap`.
53///
54/// Removes the need for filesystem tempdir management in unit tests. The
55/// backend is [`Clone`] so tests can hold a handle for direct inspection while
56/// the service owns a boxed copy.
57#[derive(Debug, Clone)]
58pub struct InMemoryBackend {
59    name: &'static str,
60    store: Arc<Mutex<Store>>,
61    multipart_store: Arc<Mutex<MultipartStore>>,
62}
63
64impl InMemoryBackend {
65    /// Creates a new `InMemoryBackend` with the given diagnostic `name`.
66    pub fn new(name: &'static str) -> Self {
67        Self {
68            name,
69            store: Arc::new(Mutex::new(HashMap::new())),
70            multipart_store: Arc::new(Mutex::new(HashMap::new())),
71        }
72    }
73
74    /// Returns the stored entry for `id`, for direct inspection in tests.
75    pub fn get(&self, id: &ObjectId) -> Entry {
76        match self.store.lock().unwrap().get(id).cloned() {
77            None => Entry::NotFound,
78            Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
79            Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
80        }
81    }
82
83    /// Returns `true` if the backend contains an entry for the given id.
84    pub fn contains(&self, id: &ObjectId) -> bool {
85        self.store.lock().unwrap().contains_key(id)
86    }
87
88    /// Returns `true` if the backend has no stored objects.
89    pub fn is_empty(&self) -> bool {
90        self.store.lock().unwrap().is_empty()
91    }
92
93    /// Removes an entry directly, bypassing the `Backend` trait.
94    ///
95    /// Useful for simulating partial failures (e.g. orphan tombstones).
96    pub fn remove(&self, id: &ObjectId) {
97        self.store.lock().unwrap().remove(id);
98    }
99}
100
101#[async_trait::async_trait]
102impl super::common::Backend for InMemoryBackend {
103    fn name(&self) -> &'static str {
104        self.name
105    }
106
107    async fn put_object(
108        &self,
109        id: &ObjectId,
110        metadata: &Metadata,
111        stream: ClientStream,
112    ) -> Result<PutResponse> {
113        let bytes: BytesMut = stream.try_collect().await?;
114        self.store.lock().unwrap().insert(
115            id.clone(),
116            StoreEntry::Object(metadata.clone(), bytes.freeze()),
117        );
118        Ok(())
119    }
120
121    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
122        let entry = self.store.lock().unwrap().get(id).cloned();
123        match entry {
124            None => Ok(None),
125            Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
126            Some(StoreEntry::Object(mut metadata, bytes)) => {
127                metadata.size = Some(bytes.len());
128                let stream = crate::stream::single(bytes);
129                Ok(Some((metadata, stream)))
130            }
131        }
132    }
133
134    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
135        self.store.lock().unwrap().remove(id);
136        Ok(())
137    }
138}
139
140#[async_trait::async_trait]
141impl HighVolumeBackend for InMemoryBackend {
142    async fn put_non_tombstone(
143        &self,
144        id: &ObjectId,
145        metadata: &Metadata,
146        payload: Bytes,
147    ) -> Result<Option<Tombstone>> {
148        let mut store = self.store.lock().unwrap();
149        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
150            return Ok(Some(tombstone));
151        }
152
153        let mut metadata = metadata.clone();
154        metadata.size = Some(payload.len());
155        store.insert(id.clone(), StoreEntry::Object(metadata, payload));
156        Ok(None)
157    }
158
159    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
160        let entry = self.store.lock().unwrap().get(id).cloned();
161        Ok(match entry {
162            None => TieredGet::NotFound,
163            Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
164            Some(StoreEntry::Object(mut metadata, bytes)) => {
165                metadata.size = Some(bytes.len());
166                TieredGet::Object(metadata, crate::stream::single(bytes))
167            }
168        })
169    }
170
171    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
172        let entry = self.store.lock().unwrap().get(id).cloned();
173        Ok(match entry {
174            None => TieredMetadata::NotFound,
175            Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
176            Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
177        })
178    }
179
180    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
181        let mut store = self.store.lock().unwrap();
182        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
183            return Ok(Some(tombstone));
184        }
185
186        store.remove(id);
187        Ok(None)
188    }
189
190    async fn compare_and_write(
191        &self,
192        id: &ObjectId,
193        current: Option<&ObjectId>,
194        write: TieredWrite,
195    ) -> Result<bool> {
196        let mut store = self.store.lock().unwrap();
197
198        let actual = store.get(id);
199        let matches_current = matches_redirect(actual, current);
200        let matches_next = matches_redirect(actual, write.target());
201
202        if matches_current {
203            match write {
204                TieredWrite::Tombstone(tombstone) => {
205                    store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
206                }
207                TieredWrite::Object(metadata, payload) => {
208                    store.insert(id.clone(), StoreEntry::Object(metadata, payload));
209                }
210                TieredWrite::Delete => {
211                    store.remove(id);
212                }
213            }
214        }
215
216        Ok(matches_current || matches_next)
217    }
218}
219
220#[async_trait::async_trait]
221impl MultipartUploadBackend for InMemoryBackend {
222    async fn initiate_multipart(
223        &self,
224        id: &ObjectId,
225        metadata: &Metadata,
226    ) -> Result<InitiateMultipartResponse> {
227        let upload_id = uuid::Uuid::now_v7().to_string();
228        let upload = MultipartUpload {
229            metadata: metadata.clone(),
230            parts: BTreeMap::new(),
231        };
232        self.multipart_store
233            .lock()
234            .unwrap()
235            .insert((id.clone(), upload_id.clone()), upload);
236        Ok(upload_id)
237    }
238
239    async fn upload_part(
240        &self,
241        id: &ObjectId,
242        upload_id: &UploadId,
243        part_number: PartNumber,
244        _content_length: u64,
245        _content_md5: Option<&str>,
246        body: ClientStream,
247    ) -> Result<UploadPartResponse> {
248        let data: BytesMut = body.try_collect().await?;
249        let data = data.freeze();
250        let etag = format!("\"etag-{part_number}-{}\"", data.len());
251
252        let mut store = self.multipart_store.lock().unwrap();
253        let upload = store
254            .get_mut(&(id.clone(), upload_id.clone()))
255            .ok_or_else(|| Error::generic("multipart upload not found"))?;
256
257        upload.parts.insert(
258            part_number,
259            UploadedPart {
260                etag: etag.clone(),
261                data,
262                uploaded_at: SystemTime::now(),
263            },
264        );
265
266        Ok(etag)
267    }
268
269    async fn list_parts(
270        &self,
271        id: &ObjectId,
272        upload_id: &UploadId,
273        max_parts: Option<u32>,
274        part_number_marker: Option<PartNumber>,
275    ) -> Result<ListPartsResponse> {
276        let store = self.multipart_store.lock().unwrap();
277        let upload = store
278            .get(&(id.clone(), upload_id.clone()))
279            .ok_or_else(|| Error::generic("multipart upload not found"))?;
280
281        let iter = upload
282            .parts
283            .iter()
284            .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker));
285
286        let max = max_parts.unwrap_or(u32::MAX) as usize;
287        let all: Vec<_> = iter.collect();
288        let is_truncated = all.len() > max;
289        let page: Vec<_> = all.into_iter().take(max).collect();
290
291        let next_part_number_marker = if is_truncated {
292            page.last().map(|(pn, _)| **pn)
293        } else {
294            None
295        };
296
297        let parts = page
298            .into_iter()
299            .map(|(pn, part)| Part {
300                part_number: *pn,
301                etag: part.etag.clone(),
302                last_modified: part.uploaded_at,
303                size: part.data.len() as u64,
304            })
305            .collect();
306
307        Ok(ListPartsResponse {
308            parts,
309            is_truncated,
310            next_part_number_marker,
311        })
312    }
313
314    async fn abort_multipart(
315        &self,
316        id: &ObjectId,
317        upload_id: &UploadId,
318    ) -> Result<AbortMultipartResponse> {
319        self.multipart_store
320            .lock()
321            .unwrap()
322            .remove(&(id.clone(), upload_id.clone()));
323        Ok(())
324    }
325
326    async fn complete_multipart(
327        &self,
328        id: &ObjectId,
329        upload_id: &UploadId,
330        parts: Vec<CompletedPart>,
331    ) -> Result<CompleteMultipartResponse> {
332        let key = (id.clone(), upload_id.clone());
333
334        // TODO: validate that parts are in ascending part_number order and reject with
335        // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant.
336
337        // Validate and assemble while holding the multipart lock, but don't
338        // remove the upload yet — a failed validation must leave it intact so
339        // the client can retry.
340        let assembled = {
341            let store = self.multipart_store.lock().unwrap();
342            let upload = store
343                .get(&key)
344                .ok_or_else(|| Error::generic("multipart upload not found"))?;
345
346            for completed in &parts {
347                match upload.parts.get(&completed.part_number) {
348                    None => {
349                        return Ok(Some(crate::multipart::CompleteMultipartError {
350                            code: "InvalidPart".into(),
351                            message: format!(
352                                "part number {} was not uploaded",
353                                completed.part_number
354                            ),
355                        }));
356                    }
357                    Some(stored) if stored.etag != completed.etag => {
358                        return Ok(Some(crate::multipart::CompleteMultipartError {
359                            code: "InvalidPart".into(),
360                            message: format!(
361                                "etag mismatch for part {}: expected {}, got {}",
362                                completed.part_number, stored.etag, completed.etag
363                            ),
364                        }));
365                    }
366                    _ => {}
367                }
368            }
369
370            let mut payload = BytesMut::new();
371            for completed in &parts {
372                let stored = &upload.parts[&completed.part_number];
373                payload.extend_from_slice(&stored.data);
374            }
375
376            let mut metadata = upload.metadata.clone();
377            metadata.size = Some(payload.len());
378
379            (metadata, payload.freeze())
380        };
381
382        self.store
383            .lock()
384            .unwrap()
385            .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1));
386
387        self.multipart_store.lock().unwrap().remove(&key);
388
389        Ok(None)
390    }
391}
392
393/// Returns `true` if `entry` matches the expected tombstone redirect state.
394///
395/// - `expected = None`: matches any non-tombstone (absent or inline object).
396/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`.
397fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
398    match expected {
399        None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
400        Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
401    }
402}
403
404/// Type returned by [`InMemoryBackend::get`] for direct inspection of stored entries.
405#[derive(Clone, Debug)]
406pub enum Entry {
407    /// No entry exists at this key.
408    NotFound,
409    /// A real object with its metadata and payload bytes.
410    Object(Metadata, Bytes),
411    /// A redirect tombstone indicating the real object lives in the long-term backend.
412    Tombstone(Tombstone),
413}
414
415impl Entry {
416    /// Returns `true` if the entry is [`Entry::NotFound`].
417    pub fn is_not_found(&self) -> bool {
418        matches!(self, Entry::NotFound)
419    }
420
421    /// Returns `true` if the entry is [`Entry::Object`].
422    pub fn is_object(&self) -> bool {
423        matches!(self, Entry::Object(_, _))
424    }
425
426    /// Returns `true` if the entry is [`Entry::Tombstone`].
427    pub fn is_tombstone(&self) -> bool {
428        matches!(self, Entry::Tombstone(_))
429    }
430
431    /// Panics unless the entry is [`Entry::NotFound`].
432    pub fn expect_not_found(&self) {
433        match self {
434            Entry::NotFound => (),
435            _ => panic!("expected not found entry, got {:?}", self),
436        }
437    }
438
439    /// Returns the metadata and payload bytes, panicking if the entry is not [`Entry::Object`].
440    pub fn expect_object(&self) -> (Metadata, Bytes) {
441        match self {
442            Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
443            _ => panic!("expected object entry, got {:?}", self),
444        }
445    }
446
447    /// Returns the tombstone, panicking if the entry is not [`Entry::Tombstone`].
448    pub fn expect_tombstone(&self) -> Tombstone {
449        match self {
450            Entry::Tombstone(tombstone) => tombstone.clone(),
451            _ => panic!("expected tombstone entry, got {:?}", self),
452        }
453    }
454}
455
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use objectstore_types::metadata::ExpirationPolicy;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Creates a random object id in a fixed testing context.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    /// Uploads `data` as part `part_number` of the given upload and returns
    /// the etag reported by the backend.
    async fn upload(
        backend: &InMemoryBackend,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        data: &[u8],
    ) -> UploadPartResponse {
        backend
            .upload_part(
                id,
                upload_id,
                part_number,
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn multipart_single_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let data = b"hello, multipart world!";
        let etag = upload(&backend, &id, &upload_id, 1, data).await;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "expected no error on complete");

        let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, data);
        assert_eq!(meta.size, Some(data.len()));
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());

        // A completed upload is consumed and can no longer be addressed.
        assert!(
            backend
                .list_parts(&id, &upload_id, None, None)
                .await
                .is_err(),
            "upload should be gone after a successful complete"
        );
    }

    #[tokio::test]
    async fn multipart_multiple_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = upload(&backend, &id, &upload_id, 1, b"aaaa").await;
        let etag2 = upload(&backend, &id, &upload_id, 2, b"bbbb").await;
        let etag3 = upload(&backend, &id, &upload_id, 3, b"cc").await;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: 1,
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: 2,
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: 3,
                        etag: etag3,
                    },
                ],
            )
            .await
            .unwrap();
        assert!(result.is_none());

        let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, b"aaaabbbbcc");
    }

    #[tokio::test]
    async fn multipart_list_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = upload(&backend, &id, &upload_id, 1, b"aaa").await;
        let etag2 = upload(&backend, &id, &upload_id, 2, b"bbb").await;

        let list = backend
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number, 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number, 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);
        assert!(!list.is_truncated);
        assert!(list.next_part_number_marker.is_none());

        // Pagination
        let page1 = backend
            .list_parts(&id, &upload_id, Some(1), None)
            .await
            .unwrap();
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number, 1);
        assert!(page1.is_truncated);
        assert_eq!(page1.next_part_number_marker, Some(1));

        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await
            .unwrap();
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number, 2);
        assert!(!page2.is_truncated, "last page must not be truncated");
        assert!(page2.next_part_number_marker.is_none());

        backend.abort_multipart(&id, &upload_id).await.unwrap();
    }

    #[tokio::test]
    async fn multipart_abort() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        upload(&backend, &id, &upload_id, 1, b"hello").await;

        backend.abort_multipart(&id, &upload_id).await.unwrap();

        let result = backend.get_object(&id).await.unwrap();
        assert!(result.is_none(), "object should not exist after abort");

        // The upload itself must be gone, not merely "not yet completed".
        assert!(
            backend
                .list_parts(&id, &upload_id, None, None)
                .await
                .is_err(),
            "upload should not be listable after abort"
        );
        assert!(
            backend
                .complete_multipart(&id, &upload_id, Vec::new())
                .await
                .is_err(),
            "upload should not be completable after abort"
        );
    }

    #[tokio::test]
    async fn multipart_invalid_etag() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag: "wrong-etag".into(),
                }],
            )
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for bad etag");
        assert_eq!(result.unwrap().code, "InvalidPart");
        assert!(
            backend.get_object(&id).await.unwrap().is_none(),
            "failed complete must not create an object"
        );

        // Upload must survive a failed complete so the client can retry.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct etag should succeed");
    }

    #[tokio::test]
    async fn multipart_missing_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 99,
                    etag: "whatever".into(),
                }],
            )
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for missing part");
        assert_eq!(result.unwrap().code, "InvalidPart");
        assert!(
            backend.get_object(&id).await.unwrap().is_none(),
            "failed complete must not create an object"
        );

        // Upload must survive a failed complete so the client can retry.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct part should succeed");
    }
}