1use std::collections::{BTreeMap, HashMap};
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
11
12use objectstore_types::range::ByteRange;
13
14use bytes::{Bytes, BytesMut};
15use futures_util::TryStreamExt;
16use objectstore_types::metadata::Metadata;
17
18use super::common::{
19 DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet,
20 TieredMetadata, TieredWrite, Tombstone,
21};
22use crate::error::{Error, Result};
23use crate::id::ObjectId;
24use crate::multipart::{
25 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
26 ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
27};
28use crate::stream::ClientStream;
29
30#[derive(Clone, Debug)]
32enum StoreEntry {
33 Object(Metadata, Bytes),
34 Tombstone(Tombstone),
35}
36
37type Store = HashMap<ObjectId, StoreEntry>;
38
39#[derive(Clone, Debug)]
40struct MultipartUpload {
41 metadata: Metadata,
42 parts: BTreeMap<PartNumber, UploadedPart>,
43}
44
45#[derive(Clone, Debug)]
46struct UploadedPart {
47 etag: String,
48 data: Bytes,
49 uploaded_at: SystemTime,
50}
51
52type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>;
53
54#[derive(Debug, Clone)]
60pub struct InMemoryBackend {
61 name: &'static str,
62 store: Arc<Mutex<Store>>,
63 multipart_store: Arc<Mutex<MultipartStore>>,
64}
65
66impl InMemoryBackend {
67 pub fn new(name: &'static str) -> Self {
69 Self {
70 name,
71 store: Arc::new(Mutex::new(HashMap::new())),
72 multipart_store: Arc::new(Mutex::new(HashMap::new())),
73 }
74 }
75
76 pub fn get(&self, id: &ObjectId) -> Entry {
78 match self.store.lock().unwrap().get(id).cloned() {
79 None => Entry::NotFound,
80 Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
81 Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
82 }
83 }
84
85 pub fn contains(&self, id: &ObjectId) -> bool {
87 self.store.lock().unwrap().contains_key(id)
88 }
89
90 pub fn is_empty(&self) -> bool {
92 self.store.lock().unwrap().is_empty()
93 }
94
95 pub fn remove(&self, id: &ObjectId) {
99 self.store.lock().unwrap().remove(id);
100 }
101}
102
103#[async_trait::async_trait]
104impl super::common::Backend for InMemoryBackend {
105 fn name(&self) -> &'static str {
106 self.name
107 }
108
109 fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
110 Ok(self)
111 }
112
113 async fn put_object(
114 &self,
115 id: &ObjectId,
116 metadata: &Metadata,
117 stream: ClientStream,
118 ) -> Result<PutResponse> {
119 let bytes: BytesMut = stream.try_collect().await?;
120 self.store.lock().unwrap().insert(
121 id.clone(),
122 StoreEntry::Object(metadata.clone(), bytes.freeze()),
123 );
124 Ok(())
125 }
126
127 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
128 let entry = self.store.lock().unwrap().get(id).cloned();
129 match entry {
130 None => Ok(None),
131 Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
132 Some(StoreEntry::Object(mut metadata, bytes)) => {
133 let total = bytes.len() as u64;
134 metadata.size = Some(bytes.len());
135 let (content_range, payload) = match range {
136 Some(range) => {
137 let content_range = range
138 .resolve(total)
139 .ok_or(Error::RangeNotSatisfiable { total })?;
140 let sliced =
141 bytes.slice(content_range.start as usize..=content_range.end as usize);
142 (Some(content_range), sliced)
143 }
144 None => (None, bytes),
145 };
146 Ok(Some((
147 metadata,
148 content_range,
149 crate::stream::single(payload),
150 )))
151 }
152 }
153 }
154
155 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
156 self.store.lock().unwrap().remove(id);
157 Ok(())
158 }
159}
160
161#[async_trait::async_trait]
162impl HighVolumeBackend for InMemoryBackend {
163 async fn put_non_tombstone(
164 &self,
165 id: &ObjectId,
166 metadata: &Metadata,
167 payload: Bytes,
168 ) -> Result<Option<Tombstone>> {
169 let mut store = self.store.lock().unwrap();
170 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
171 return Ok(Some(tombstone));
172 }
173
174 let mut metadata = metadata.clone();
175 metadata.size = Some(payload.len());
176 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
177 Ok(None)
178 }
179
180 async fn get_tiered_object(
181 &self,
182 id: &ObjectId,
183 range: Option<ByteRange>,
184 ) -> Result<TieredGet> {
185 let entry = self.store.lock().unwrap().get(id).cloned();
186 Ok(match entry {
187 None => TieredGet::NotFound,
188 Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
189 Some(StoreEntry::Object(mut metadata, bytes)) => {
190 let total = bytes.len() as u64;
191 metadata.size = Some(bytes.len());
192 let (content_range, payload) = match range {
193 Some(range) => {
194 let content_range = range
195 .resolve(total)
196 .ok_or(Error::RangeNotSatisfiable { total })?;
197 let sliced =
198 bytes.slice(content_range.start as usize..=content_range.end as usize);
199 (Some(content_range), sliced)
200 }
201 None => (None, bytes),
202 };
203 TieredGet::Object(metadata, content_range, crate::stream::single(payload))
204 }
205 })
206 }
207
208 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
209 let entry = self.store.lock().unwrap().get(id).cloned();
210 Ok(match entry {
211 None => TieredMetadata::NotFound,
212 Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
213 Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
214 })
215 }
216
217 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
218 let mut store = self.store.lock().unwrap();
219 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
220 return Ok(Some(tombstone));
221 }
222
223 store.remove(id);
224 Ok(None)
225 }
226
227 async fn compare_and_write(
228 &self,
229 id: &ObjectId,
230 current: Option<&ObjectId>,
231 write: TieredWrite,
232 ) -> Result<bool> {
233 let mut store = self.store.lock().unwrap();
234
235 let actual = store.get(id);
236 let matches_current = matches_redirect(actual, current);
237 let matches_next = matches_redirect(actual, write.target());
238
239 if matches_current {
240 match write {
241 TieredWrite::Tombstone(tombstone) => {
242 store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
243 }
244 TieredWrite::Object(metadata, payload) => {
245 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
246 }
247 TieredWrite::Delete => {
248 store.remove(id);
249 }
250 }
251 }
252
253 Ok(matches_current || matches_next)
254 }
255}
256
257#[async_trait::async_trait]
258impl MultipartUploadBackend for InMemoryBackend {
259 async fn initiate_multipart(
260 &self,
261 id: &ObjectId,
262 metadata: &Metadata,
263 ) -> Result<InitiateMultipartResponse> {
264 let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
265 let upload = MultipartUpload {
266 metadata: metadata.clone(),
267 parts: BTreeMap::new(),
268 };
269 self.multipart_store
270 .lock()
271 .unwrap()
272 .insert((id.clone(), upload_id.clone()), upload);
273 Ok(upload_id)
274 }
275
276 async fn upload_part(
277 &self,
278 id: &ObjectId,
279 upload_id: &UploadId,
280 part_number: PartNumber,
281 _content_length: u64,
282 _content_md5: Option<&str>,
283 body: ClientStream,
284 ) -> Result<UploadPartResponse> {
285 let data: BytesMut = body.try_collect().await?;
286 let data = data.freeze();
287 let etag = format!("\"etag-{part_number}-{}\"", data.len());
288
289 let mut store = self.multipart_store.lock().unwrap();
290 let upload = store
291 .get_mut(&(id.clone(), upload_id.clone()))
292 .ok_or_else(|| Error::generic("multipart upload not found"))?;
293
294 upload.parts.insert(
295 part_number,
296 UploadedPart {
297 etag: etag.clone(),
298 data,
299 uploaded_at: SystemTime::now(),
300 },
301 );
302
303 Ok(etag)
304 }
305
306 async fn list_parts(
307 &self,
308 id: &ObjectId,
309 upload_id: &UploadId,
310 max_parts: Option<u32>,
311 part_number_marker: Option<PartNumber>,
312 ) -> Result<ListPartsResponse> {
313 let store = self.multipart_store.lock().unwrap();
314 let upload = store
315 .get(&(id.clone(), upload_id.clone()))
316 .ok_or_else(|| Error::generic("multipart upload not found"))?;
317
318 let iter = upload
319 .parts
320 .iter()
321 .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker));
322
323 let max = max_parts.unwrap_or(u32::MAX) as usize;
324 let all: Vec<_> = iter.collect();
325 let is_truncated = all.len() > max;
326 let page: Vec<_> = all.into_iter().take(max).collect();
327
328 let next_part_number_marker = if is_truncated {
329 page.last().map(|(pn, _)| **pn)
330 } else {
331 None
332 };
333
334 let parts = page
335 .into_iter()
336 .map(|(pn, part)| Part {
337 part_number: *pn,
338 etag: part.etag.clone(),
339 last_modified: part.uploaded_at,
340 size: part.data.len() as u64,
341 })
342 .collect();
343
344 Ok(ListPartsResponse {
345 parts,
346 is_truncated,
347 next_part_number_marker,
348 })
349 }
350
351 async fn abort_multipart(
352 &self,
353 id: &ObjectId,
354 upload_id: &UploadId,
355 ) -> Result<AbortMultipartResponse> {
356 self.multipart_store
357 .lock()
358 .unwrap()
359 .remove(&(id.clone(), upload_id.clone()));
360 Ok(())
361 }
362
363 async fn complete_multipart(
364 &self,
365 id: &ObjectId,
366 upload_id: &UploadId,
367 parts: Vec<CompletedPart>,
368 ) -> Result<CompleteMultipartResponse> {
369 let key = (id.clone(), upload_id.clone());
370
371 let assembled = {
378 let store = self.multipart_store.lock().unwrap();
379 let upload = store
380 .get(&key)
381 .ok_or_else(|| Error::generic("multipart upload not found"))?;
382
383 for completed in &parts {
384 match upload.parts.get(&completed.part_number) {
385 None => {
386 return Ok(Some(crate::multipart::CompleteMultipartError {
387 code: "InvalidPart".into(),
388 message: format!(
389 "part number {} was not uploaded",
390 completed.part_number
391 ),
392 }));
393 }
394 Some(stored) if stored.etag != completed.etag => {
395 return Ok(Some(crate::multipart::CompleteMultipartError {
396 code: "InvalidPart".into(),
397 message: format!(
398 "etag mismatch for part {}: expected {}, got {}",
399 completed.part_number, stored.etag, completed.etag
400 ),
401 }));
402 }
403 _ => {}
404 }
405 }
406
407 let mut payload = BytesMut::new();
408 for completed in &parts {
409 let stored = &upload.parts[&completed.part_number];
410 payload.extend_from_slice(&stored.data);
411 }
412
413 let mut metadata = upload.metadata.clone();
414 metadata.size = Some(payload.len());
415
416 (metadata, payload.freeze())
417 };
418
419 self.store
420 .lock()
421 .unwrap()
422 .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1));
423
424 self.multipart_store.lock().unwrap().remove(&key);
425
426 Ok(None)
427 }
428}
429
430fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
435 match expected {
436 None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
437 Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
438 }
439}
440
441#[derive(Clone, Debug)]
443pub enum Entry {
444 NotFound,
446 Object(Metadata, Bytes),
448 Tombstone(Tombstone),
450}
451
452impl Entry {
453 pub fn is_not_found(&self) -> bool {
455 matches!(self, Entry::NotFound)
456 }
457
458 pub fn is_object(&self) -> bool {
460 matches!(self, Entry::Object(_, _))
461 }
462
463 pub fn is_tombstone(&self) -> bool {
465 matches!(self, Entry::Tombstone(_))
466 }
467
468 pub fn expect_not_found(&self) {
470 match self {
471 Entry::NotFound => (),
472 _ => panic!("expected not found entry, got {self:?}"),
473 }
474 }
475
476 pub fn expect_object(&self) -> (Metadata, Bytes) {
478 match self {
479 Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
480 _ => panic!("expected object entry, got {self:?}"),
481 }
482 }
483
484 pub fn expect_tombstone(&self) -> Tombstone {
486 match self {
487 Entry::Tombstone(tombstone) => tombstone.clone(),
488 _ => panic!("expected tombstone entry, got {self:?}"),
489 }
490 }
491}
492
493#[cfg(test)]
494mod tests {
495 use std::num::NonZeroU32;
496 use std::time::Duration;
497
498 use objectstore_types::metadata::ExpirationPolicy;
499 use objectstore_types::scope::{Scope, Scopes};
500
501 use super::*;
502 use crate::backend::common::Backend;
503 use crate::id::ObjectContext;
504 use crate::stream;
505
506 fn make_id() -> ObjectId {
507 ObjectId::random(ObjectContext {
508 usecase: "testing".into(),
509 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
510 })
511 }
512
513 #[tokio::test]
514 async fn multipart_single_part() {
515 let backend = InMemoryBackend::new("test");
516 let id = make_id();
517 let metadata = Metadata {
518 content_type: "text/plain".into(),
519 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_hours(1)),
520 origin: Some("203.0.113.42".into()),
521 custom: [("foo".into(), "bar".into())].into(),
522 ..Default::default()
523 };
524
525 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
526
527 let data = b"hello, multipart world!";
528 let etag = backend
529 .upload_part(
530 &id,
531 &upload_id,
532 NonZeroU32::new(1).unwrap(),
533 data.len() as u64,
534 None,
535 stream::single(data.to_vec()),
536 )
537 .await
538 .unwrap();
539
540 let result = backend
541 .complete_multipart(
542 &id,
543 &upload_id,
544 vec![CompletedPart {
545 part_number: NonZeroU32::new(1).unwrap(),
546 etag,
547 }],
548 )
549 .await
550 .unwrap();
551 assert!(result.is_none(), "expected no error on complete");
552
553 let (meta, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
554 let payload = stream::read_to_vec(body).await.unwrap();
555 assert_eq!(payload, data);
556 assert_eq!(meta.content_type, "text/plain".to_string());
557 assert_eq!(
558 meta.expiration_policy,
559 ExpirationPolicy::TimeToIdle(Duration::from_hours(1))
560 );
561 assert_eq!(meta.origin, Some("203.0.113.42".into()));
562 assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
563 }
564
565 #[tokio::test]
566 async fn multipart_multiple_parts() {
567 let backend = InMemoryBackend::new("test");
568 let id = make_id();
569 let metadata = Metadata::default();
570
571 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
572
573 let part1 = b"aaaa".to_vec();
574 let part2 = b"bbbb".to_vec();
575 let part3 = b"cc".to_vec();
576
577 let etag1 = backend
578 .upload_part(
579 &id,
580 &upload_id,
581 NonZeroU32::new(1).unwrap(),
582 part1.len() as u64,
583 None,
584 stream::single(part1.clone()),
585 )
586 .await
587 .unwrap();
588 let etag2 = backend
589 .upload_part(
590 &id,
591 &upload_id,
592 NonZeroU32::new(2).unwrap(),
593 part2.len() as u64,
594 None,
595 stream::single(part2.clone()),
596 )
597 .await
598 .unwrap();
599 let etag3 = backend
600 .upload_part(
601 &id,
602 &upload_id,
603 NonZeroU32::new(3).unwrap(),
604 part3.len() as u64,
605 None,
606 stream::single(part3.clone()),
607 )
608 .await
609 .unwrap();
610
611 let result = backend
612 .complete_multipart(
613 &id,
614 &upload_id,
615 vec![
616 CompletedPart {
617 part_number: NonZeroU32::new(1).unwrap(),
618 etag: etag1,
619 },
620 CompletedPart {
621 part_number: NonZeroU32::new(2).unwrap(),
622 etag: etag2,
623 },
624 CompletedPart {
625 part_number: NonZeroU32::new(3).unwrap(),
626 etag: etag3,
627 },
628 ],
629 )
630 .await
631 .unwrap();
632 assert!(result.is_none());
633
634 let (_, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
635 let payload = stream::read_to_vec(body).await.unwrap();
636 assert_eq!(payload, b"aaaabbbbcc");
637 }
638
639 #[tokio::test]
640 async fn multipart_list_parts() {
641 let backend = InMemoryBackend::new("test");
642 let id = make_id();
643 let metadata = Metadata::default();
644
645 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
646
647 let etag1 = backend
648 .upload_part(
649 &id,
650 &upload_id,
651 NonZeroU32::new(1).unwrap(),
652 3,
653 None,
654 stream::single(b"aaa".to_vec()),
655 )
656 .await
657 .unwrap();
658 let etag2 = backend
659 .upload_part(
660 &id,
661 &upload_id,
662 NonZeroU32::new(2).unwrap(),
663 3,
664 None,
665 stream::single(b"bbb".to_vec()),
666 )
667 .await
668 .unwrap();
669
670 let list = backend
671 .list_parts(&id, &upload_id, None, None)
672 .await
673 .unwrap();
674 assert_eq!(list.parts.len(), 2);
675 assert_eq!(list.parts[0].part_number.get(), 1);
676 assert_eq!(list.parts[0].etag, etag1);
677 assert_eq!(list.parts[0].size, 3);
678 assert_eq!(list.parts[1].part_number.get(), 2);
679 assert_eq!(list.parts[1].etag, etag2);
680 assert_eq!(list.parts[1].size, 3);
681
682 let page1 = backend
684 .list_parts(&id, &upload_id, Some(1), None)
685 .await
686 .unwrap();
687 assert_eq!(page1.parts.len(), 1);
688 assert_eq!(page1.parts[0].part_number.get(), 1);
689 assert!(page1.is_truncated);
690 assert!(page1.next_part_number_marker.is_some());
691
692 let page2 = backend
693 .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
694 .await
695 .unwrap();
696 assert_eq!(page2.parts.len(), 1);
697 assert_eq!(page2.parts[0].part_number.get(), 2);
698
699 backend.abort_multipart(&id, &upload_id).await.unwrap();
700 }
701
702 #[tokio::test]
703 async fn multipart_abort() {
704 let backend = InMemoryBackend::new("test");
705 let id = make_id();
706 let metadata = Metadata::default();
707
708 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
709
710 backend
711 .upload_part(
712 &id,
713 &upload_id,
714 NonZeroU32::new(1).unwrap(),
715 5,
716 None,
717 stream::single(b"hello".to_vec()),
718 )
719 .await
720 .unwrap();
721
722 backend.abort_multipart(&id, &upload_id).await.unwrap();
723
724 let result = backend.get_object(&id, None).await.unwrap();
725 assert!(result.is_none(), "object should not exist after abort");
726 }
727
728 #[tokio::test]
729 async fn multipart_invalid_etag() {
730 let backend = InMemoryBackend::new("test");
731 let id = make_id();
732 let metadata = Metadata::default();
733
734 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
735
736 let etag = backend
737 .upload_part(
738 &id,
739 &upload_id,
740 NonZeroU32::new(1).unwrap(),
741 5,
742 None,
743 stream::single(b"hello".to_vec()),
744 )
745 .await
746 .unwrap();
747
748 let result = backend
749 .complete_multipart(
750 &id,
751 &upload_id,
752 vec![CompletedPart {
753 part_number: NonZeroU32::new(1).unwrap(),
754 etag: "wrong-etag".into(),
755 }],
756 )
757 .await
758 .unwrap();
759 assert!(result.is_some(), "expected error for bad etag");
760 assert_eq!(result.unwrap().code, "InvalidPart");
761
762 let result = backend
764 .complete_multipart(
765 &id,
766 &upload_id,
767 vec![CompletedPart {
768 part_number: NonZeroU32::new(1).unwrap(),
769 etag,
770 }],
771 )
772 .await
773 .unwrap();
774 assert!(result.is_none(), "retry with correct etag should succeed");
775 }
776
777 #[tokio::test]
778 async fn multipart_missing_part() {
779 let backend = InMemoryBackend::new("test");
780 let id = make_id();
781 let metadata = Metadata::default();
782
783 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
784
785 let etag = backend
786 .upload_part(
787 &id,
788 &upload_id,
789 NonZeroU32::new(1).unwrap(),
790 5,
791 None,
792 stream::single(b"hello".to_vec()),
793 )
794 .await
795 .unwrap();
796
797 let result = backend
798 .complete_multipart(
799 &id,
800 &upload_id,
801 vec![CompletedPart {
802 part_number: NonZeroU32::new(99).unwrap(),
803 etag: "whatever".into(),
804 }],
805 )
806 .await
807 .unwrap();
808 assert!(result.is_some(), "expected error for missing part");
809 assert_eq!(result.unwrap().code, "InvalidPart");
810
811 let result = backend
813 .complete_multipart(
814 &id,
815 &upload_id,
816 vec![CompletedPart {
817 part_number: NonZeroU32::new(1).unwrap(),
818 etag,
819 }],
820 )
821 .await
822 .unwrap();
823 assert!(result.is_none(), "retry with correct part should succeed");
824 }
825}