objectstore_service/backend/
in_memory.rs1use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11use bytes::{Bytes, BytesMut};
12use futures_util::TryStreamExt;
13use objectstore_types::metadata::Metadata;
14
15use super::common::{
16 DeleteResponse, GetResponse, HighVolumeBackend, PutResponse, TieredGet, TieredMetadata,
17 TieredWrite, Tombstone,
18};
19use crate::error::{Error, Result};
20use crate::id::ObjectId;
21use crate::stream::ClientStream;
22
23#[derive(Clone, Debug)]
25enum StoreEntry {
26 Object(Metadata, Bytes),
27 Tombstone(Tombstone),
28}
29
30type Store = HashMap<ObjectId, StoreEntry>;
31
32#[derive(Debug, Clone)]
38pub struct InMemoryBackend {
39 name: &'static str,
40 store: Arc<Mutex<Store>>,
41}
42
43impl InMemoryBackend {
44 pub fn new(name: &'static str) -> Self {
46 Self {
47 name,
48 store: Arc::new(Mutex::new(HashMap::new())),
49 }
50 }
51
52 pub fn get(&self, id: &ObjectId) -> Entry {
54 match self.store.lock().unwrap().get(id).cloned() {
55 None => Entry::NotFound,
56 Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
57 Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
58 }
59 }
60
61 pub fn contains(&self, id: &ObjectId) -> bool {
63 self.store.lock().unwrap().contains_key(id)
64 }
65
66 pub fn is_empty(&self) -> bool {
68 self.store.lock().unwrap().is_empty()
69 }
70
71 pub fn remove(&self, id: &ObjectId) {
75 self.store.lock().unwrap().remove(id);
76 }
77}
78
79#[async_trait::async_trait]
80impl super::common::Backend for InMemoryBackend {
81 fn name(&self) -> &'static str {
82 self.name
83 }
84
85 async fn put_object(
86 &self,
87 id: &ObjectId,
88 metadata: &Metadata,
89 stream: ClientStream,
90 ) -> Result<PutResponse> {
91 let bytes: BytesMut = stream.try_collect().await?;
92 self.store.lock().unwrap().insert(
93 id.clone(),
94 StoreEntry::Object(metadata.clone(), bytes.freeze()),
95 );
96 Ok(())
97 }
98
99 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
100 let entry = self.store.lock().unwrap().get(id).cloned();
101 match entry {
102 None => Ok(None),
103 Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
104 Some(StoreEntry::Object(mut metadata, bytes)) => {
105 metadata.size = Some(bytes.len());
106 let stream = crate::stream::single(bytes);
107 Ok(Some((metadata, stream)))
108 }
109 }
110 }
111
112 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
113 self.store.lock().unwrap().remove(id);
114 Ok(())
115 }
116}
117
118#[async_trait::async_trait]
119impl HighVolumeBackend for InMemoryBackend {
120 async fn put_non_tombstone(
121 &self,
122 id: &ObjectId,
123 metadata: &Metadata,
124 payload: Bytes,
125 ) -> Result<Option<Tombstone>> {
126 let mut store = self.store.lock().unwrap();
127 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
128 return Ok(Some(tombstone));
129 }
130
131 let mut metadata = metadata.clone();
132 metadata.size = Some(payload.len());
133 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
134 Ok(None)
135 }
136
137 async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
138 let entry = self.store.lock().unwrap().get(id).cloned();
139 Ok(match entry {
140 None => TieredGet::NotFound,
141 Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
142 Some(StoreEntry::Object(mut metadata, bytes)) => {
143 metadata.size = Some(bytes.len());
144 TieredGet::Object(metadata, crate::stream::single(bytes))
145 }
146 })
147 }
148
149 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
150 let entry = self.store.lock().unwrap().get(id).cloned();
151 Ok(match entry {
152 None => TieredMetadata::NotFound,
153 Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
154 Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
155 })
156 }
157
158 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
159 let mut store = self.store.lock().unwrap();
160 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
161 return Ok(Some(tombstone));
162 }
163
164 store.remove(id);
165 Ok(None)
166 }
167
168 async fn compare_and_write(
169 &self,
170 id: &ObjectId,
171 current: Option<&ObjectId>,
172 write: TieredWrite,
173 ) -> Result<bool> {
174 let mut store = self.store.lock().unwrap();
175
176 let actual = store.get(id);
177 let matches_current = matches_redirect(actual, current);
178 let matches_next = matches_redirect(actual, write.target());
179
180 if matches_current {
181 match write {
182 TieredWrite::Tombstone(tombstone) => {
183 store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
184 }
185 TieredWrite::Object(metadata, payload) => {
186 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
187 }
188 TieredWrite::Delete => {
189 store.remove(id);
190 }
191 }
192 }
193
194 Ok(matches_current || matches_next)
195 }
196}
197
198fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
203 match expected {
204 None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
205 Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
206 }
207}
208
209#[derive(Clone, Debug)]
211pub enum Entry {
212 NotFound,
214 Object(Metadata, Bytes),
216 Tombstone(Tombstone),
218}
219
220impl Entry {
221 pub fn is_not_found(&self) -> bool {
223 matches!(self, Entry::NotFound)
224 }
225
226 pub fn is_object(&self) -> bool {
228 matches!(self, Entry::Object(_, _))
229 }
230
231 pub fn is_tombstone(&self) -> bool {
233 matches!(self, Entry::Tombstone(_))
234 }
235
236 pub fn expect_not_found(&self) {
238 match self {
239 Entry::NotFound => (),
240 _ => panic!("expected not found entry, got {:?}", self),
241 }
242 }
243
244 pub fn expect_object(&self) -> (Metadata, Bytes) {
246 match self {
247 Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
248 _ => panic!("expected object entry, got {:?}", self),
249 }
250 }
251
252 pub fn expect_tombstone(&self) -> Tombstone {
254 match self {
255 Entry::Tombstone(tombstone) => tombstone.clone(),
256 _ => panic!("expected tombstone entry, got {:?}", self),
257 }
258 }
259}