objectstore_service/backend/
in_memory.rs

1//! In-memory backend for tests.
2//!
3//! This provides a [`Backend`](super::common::Backend) backed by a `HashMap`,
4//! removing the need for filesystem tempdir management in unit tests. The
5//! backend is [`Clone`] so tests can hold a handle for direct inspection while
6//! the service owns a boxed copy.
7
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11use bytes::{Bytes, BytesMut};
12use futures_util::TryStreamExt;
13use objectstore_types::metadata::Metadata;
14
15use super::common::{
16    DeleteResponse, GetResponse, HighVolumeBackend, PutResponse, TieredGet, TieredMetadata,
17    TieredWrite, Tombstone,
18};
19use crate::error::{Error, Result};
20use crate::id::ObjectId;
21use crate::stream::ClientStream;
22
23/// An entry in the in-memory store.
24#[derive(Clone, Debug)]
25enum StoreEntry {
26    Object(Metadata, Bytes),
27    Tombstone(Tombstone),
28}
29
30type Store = HashMap<ObjectId, StoreEntry>;
31
32/// In-memory [`Backend`](super::common::Backend) backed by a `HashMap`.
33///
34/// Removes the need for filesystem tempdir management in unit tests. The
35/// backend is [`Clone`] so tests can hold a handle for direct inspection while
36/// the service owns a boxed copy.
37#[derive(Debug, Clone)]
38pub struct InMemoryBackend {
39    name: &'static str,
40    store: Arc<Mutex<Store>>,
41}
42
43impl InMemoryBackend {
44    /// Creates a new `InMemoryBackend` with the given diagnostic `name`.
45    pub fn new(name: &'static str) -> Self {
46        Self {
47            name,
48            store: Arc::new(Mutex::new(HashMap::new())),
49        }
50    }
51
52    /// Returns the stored entry for `id`, for direct inspection in tests.
53    pub fn get(&self, id: &ObjectId) -> Entry {
54        match self.store.lock().unwrap().get(id).cloned() {
55            None => Entry::NotFound,
56            Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
57            Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
58        }
59    }
60
61    /// Returns `true` if the backend contains an entry for the given id.
62    pub fn contains(&self, id: &ObjectId) -> bool {
63        self.store.lock().unwrap().contains_key(id)
64    }
65
66    /// Returns `true` if the backend has no stored objects.
67    pub fn is_empty(&self) -> bool {
68        self.store.lock().unwrap().is_empty()
69    }
70
71    /// Removes an entry directly, bypassing the `Backend` trait.
72    ///
73    /// Useful for simulating partial failures (e.g. orphan tombstones).
74    pub fn remove(&self, id: &ObjectId) {
75        self.store.lock().unwrap().remove(id);
76    }
77}
78
79#[async_trait::async_trait]
80impl super::common::Backend for InMemoryBackend {
81    fn name(&self) -> &'static str {
82        self.name
83    }
84
85    async fn put_object(
86        &self,
87        id: &ObjectId,
88        metadata: &Metadata,
89        stream: ClientStream,
90    ) -> Result<PutResponse> {
91        let bytes: BytesMut = stream.try_collect().await?;
92        self.store.lock().unwrap().insert(
93            id.clone(),
94            StoreEntry::Object(metadata.clone(), bytes.freeze()),
95        );
96        Ok(())
97    }
98
99    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
100        let entry = self.store.lock().unwrap().get(id).cloned();
101        match entry {
102            None => Ok(None),
103            Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
104            Some(StoreEntry::Object(mut metadata, bytes)) => {
105                metadata.size = Some(bytes.len());
106                let stream = crate::stream::single(bytes);
107                Ok(Some((metadata, stream)))
108            }
109        }
110    }
111
112    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
113        self.store.lock().unwrap().remove(id);
114        Ok(())
115    }
116}
117
118#[async_trait::async_trait]
119impl HighVolumeBackend for InMemoryBackend {
120    async fn put_non_tombstone(
121        &self,
122        id: &ObjectId,
123        metadata: &Metadata,
124        payload: Bytes,
125    ) -> Result<Option<Tombstone>> {
126        let mut store = self.store.lock().unwrap();
127        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
128            return Ok(Some(tombstone));
129        }
130
131        let mut metadata = metadata.clone();
132        metadata.size = Some(payload.len());
133        store.insert(id.clone(), StoreEntry::Object(metadata, payload));
134        Ok(None)
135    }
136
137    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
138        let entry = self.store.lock().unwrap().get(id).cloned();
139        Ok(match entry {
140            None => TieredGet::NotFound,
141            Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
142            Some(StoreEntry::Object(mut metadata, bytes)) => {
143                metadata.size = Some(bytes.len());
144                TieredGet::Object(metadata, crate::stream::single(bytes))
145            }
146        })
147    }
148
149    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
150        let entry = self.store.lock().unwrap().get(id).cloned();
151        Ok(match entry {
152            None => TieredMetadata::NotFound,
153            Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
154            Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
155        })
156    }
157
158    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
159        let mut store = self.store.lock().unwrap();
160        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
161            return Ok(Some(tombstone));
162        }
163
164        store.remove(id);
165        Ok(None)
166    }
167
168    async fn compare_and_write(
169        &self,
170        id: &ObjectId,
171        current: Option<&ObjectId>,
172        write: TieredWrite,
173    ) -> Result<bool> {
174        let mut store = self.store.lock().unwrap();
175
176        let actual = store.get(id);
177        let matches_current = matches_redirect(actual, current);
178        let matches_next = matches_redirect(actual, write.target());
179
180        if matches_current {
181            match write {
182                TieredWrite::Tombstone(tombstone) => {
183                    store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
184                }
185                TieredWrite::Object(metadata, payload) => {
186                    store.insert(id.clone(), StoreEntry::Object(metadata, payload));
187                }
188                TieredWrite::Delete => {
189                    store.remove(id);
190                }
191            }
192        }
193
194        Ok(matches_current || matches_next)
195    }
196}
197
198/// Returns `true` if `entry` matches the expected tombstone redirect state.
199///
200/// - `expected = None`: matches any non-tombstone (absent or inline object).
201/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`.
202fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
203    match expected {
204        None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
205        Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
206    }
207}
208
209/// Type returned by [`InMemoryBackend::get`] for direct inspection of stored entries.
210#[derive(Clone, Debug)]
211pub enum Entry {
212    /// No entry exists at this key.
213    NotFound,
214    /// A real object with its metadata and payload bytes.
215    Object(Metadata, Bytes),
216    /// A redirect tombstone indicating the real object lives in the long-term backend.
217    Tombstone(Tombstone),
218}
219
220impl Entry {
221    /// Returns `true` if the entry is [`Entry::NotFound`].
222    pub fn is_not_found(&self) -> bool {
223        matches!(self, Entry::NotFound)
224    }
225
226    /// Returns `true` if the entry is [`Entry::Object`].
227    pub fn is_object(&self) -> bool {
228        matches!(self, Entry::Object(_, _))
229    }
230
231    /// Returns `true` if the entry is [`Entry::Tombstone`].
232    pub fn is_tombstone(&self) -> bool {
233        matches!(self, Entry::Tombstone(_))
234    }
235
236    /// Panics unless the entry is [`Entry::NotFound`].
237    pub fn expect_not_found(&self) {
238        match self {
239            Entry::NotFound => (),
240            _ => panic!("expected not found entry, got {:?}", self),
241        }
242    }
243
244    /// Returns the metadata and payload bytes, panicking if the entry is not [`Entry::Object`].
245    pub fn expect_object(&self) -> (Metadata, Bytes) {
246        match self {
247            Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
248            _ => panic!("expected object entry, got {:?}", self),
249        }
250    }
251
252    /// Returns the tombstone, panicking if the entry is not [`Entry::Tombstone`].
253    pub fn expect_tombstone(&self) -> Tombstone {
254        match self {
255            Entry::Tombstone(tombstone) => tombstone.clone(),
256            _ => panic!("expected tombstone entry, got {:?}", self),
257        }
258    }
259}