1use std::collections::{BTreeMap, HashMap};
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
11
12use bytes::{Bytes, BytesMut};
13use futures_util::TryStreamExt;
14use objectstore_types::metadata::Metadata;
15
16use super::common::{
17 DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet,
18 TieredMetadata, TieredWrite, Tombstone,
19};
20use crate::error::{Error, Result};
21use crate::id::ObjectId;
22use crate::multipart::{
23 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
24 ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
25};
26use crate::stream::ClientStream;
27
/// A single value held in the in-memory object map.
#[derive(Clone, Debug)]
enum StoreEntry {
    /// A stored object: its metadata together with the full payload.
    Object(Metadata, Bytes),
    /// A tombstone stored in place of an object.
    Tombstone(Tombstone),
}
34
/// Map from object id to the entry currently stored for it.
type Store = HashMap<ObjectId, StoreEntry>;
36
/// State of one in-progress multipart upload.
#[derive(Clone, Debug)]
struct MultipartUpload {
    /// Metadata supplied when the upload was initiated; applied to the object on completion.
    metadata: Metadata,
    /// Parts uploaded so far, keyed (and therefore iterated) by ascending part number.
    parts: BTreeMap<PartNumber, UploadedPart>,
}
42
/// One uploaded part of a multipart upload.
#[derive(Clone, Debug)]
struct UploadedPart {
    /// Etag handed back from `upload_part`; completion requests must echo it exactly.
    etag: String,
    /// The part's payload.
    data: Bytes,
    /// Wall-clock time at which the part was stored; reported as `last_modified` when listing.
    uploaded_at: SystemTime,
}
49
/// Map from `(object id, upload id)` to the state of that in-progress multipart upload.
type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>;
51
/// Backend that keeps all objects and multipart uploads in process memory.
///
/// Both maps live behind `Arc<Mutex<_>>`, so clones of a backend share the same contents.
#[derive(Debug, Clone)]
pub struct InMemoryBackend {
    /// Name reported through `Backend::name`.
    name: &'static str,
    /// Completed objects and tombstones, keyed by object id.
    store: Arc<Mutex<Store>>,
    /// Multipart uploads that have been initiated but not yet completed or aborted.
    multipart_store: Arc<Mutex<MultipartStore>>,
}
63
64impl InMemoryBackend {
65 pub fn new(name: &'static str) -> Self {
67 Self {
68 name,
69 store: Arc::new(Mutex::new(HashMap::new())),
70 multipart_store: Arc::new(Mutex::new(HashMap::new())),
71 }
72 }
73
74 pub fn get(&self, id: &ObjectId) -> Entry {
76 match self.store.lock().unwrap().get(id).cloned() {
77 None => Entry::NotFound,
78 Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
79 Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
80 }
81 }
82
83 pub fn contains(&self, id: &ObjectId) -> bool {
85 self.store.lock().unwrap().contains_key(id)
86 }
87
88 pub fn is_empty(&self) -> bool {
90 self.store.lock().unwrap().is_empty()
91 }
92
93 pub fn remove(&self, id: &ObjectId) {
97 self.store.lock().unwrap().remove(id);
98 }
99}
100
101#[async_trait::async_trait]
102impl super::common::Backend for InMemoryBackend {
103 fn name(&self) -> &'static str {
104 self.name
105 }
106
107 fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
108 Ok(self)
109 }
110
111 async fn put_object(
112 &self,
113 id: &ObjectId,
114 metadata: &Metadata,
115 stream: ClientStream,
116 ) -> Result<PutResponse> {
117 let bytes: BytesMut = stream.try_collect().await?;
118 self.store.lock().unwrap().insert(
119 id.clone(),
120 StoreEntry::Object(metadata.clone(), bytes.freeze()),
121 );
122 Ok(())
123 }
124
125 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
126 let entry = self.store.lock().unwrap().get(id).cloned();
127 match entry {
128 None => Ok(None),
129 Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
130 Some(StoreEntry::Object(mut metadata, bytes)) => {
131 metadata.size = Some(bytes.len());
132 let stream = crate::stream::single(bytes);
133 Ok(Some((metadata, stream)))
134 }
135 }
136 }
137
138 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
139 self.store.lock().unwrap().remove(id);
140 Ok(())
141 }
142}
143
144#[async_trait::async_trait]
145impl HighVolumeBackend for InMemoryBackend {
146 async fn put_non_tombstone(
147 &self,
148 id: &ObjectId,
149 metadata: &Metadata,
150 payload: Bytes,
151 ) -> Result<Option<Tombstone>> {
152 let mut store = self.store.lock().unwrap();
153 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
154 return Ok(Some(tombstone));
155 }
156
157 let mut metadata = metadata.clone();
158 metadata.size = Some(payload.len());
159 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
160 Ok(None)
161 }
162
163 async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
164 let entry = self.store.lock().unwrap().get(id).cloned();
165 Ok(match entry {
166 None => TieredGet::NotFound,
167 Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
168 Some(StoreEntry::Object(mut metadata, bytes)) => {
169 metadata.size = Some(bytes.len());
170 TieredGet::Object(metadata, crate::stream::single(bytes))
171 }
172 })
173 }
174
175 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
176 let entry = self.store.lock().unwrap().get(id).cloned();
177 Ok(match entry {
178 None => TieredMetadata::NotFound,
179 Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
180 Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
181 })
182 }
183
184 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
185 let mut store = self.store.lock().unwrap();
186 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
187 return Ok(Some(tombstone));
188 }
189
190 store.remove(id);
191 Ok(None)
192 }
193
194 async fn compare_and_write(
195 &self,
196 id: &ObjectId,
197 current: Option<&ObjectId>,
198 write: TieredWrite,
199 ) -> Result<bool> {
200 let mut store = self.store.lock().unwrap();
201
202 let actual = store.get(id);
203 let matches_current = matches_redirect(actual, current);
204 let matches_next = matches_redirect(actual, write.target());
205
206 if matches_current {
207 match write {
208 TieredWrite::Tombstone(tombstone) => {
209 store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
210 }
211 TieredWrite::Object(metadata, payload) => {
212 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
213 }
214 TieredWrite::Delete => {
215 store.remove(id);
216 }
217 }
218 }
219
220 Ok(matches_current || matches_next)
221 }
222}
223
224#[async_trait::async_trait]
225impl MultipartUploadBackend for InMemoryBackend {
226 async fn initiate_multipart(
227 &self,
228 id: &ObjectId,
229 metadata: &Metadata,
230 ) -> Result<InitiateMultipartResponse> {
231 let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
232 let upload = MultipartUpload {
233 metadata: metadata.clone(),
234 parts: BTreeMap::new(),
235 };
236 self.multipart_store
237 .lock()
238 .unwrap()
239 .insert((id.clone(), upload_id.clone()), upload);
240 Ok(upload_id)
241 }
242
243 async fn upload_part(
244 &self,
245 id: &ObjectId,
246 upload_id: &UploadId,
247 part_number: PartNumber,
248 _content_length: u64,
249 _content_md5: Option<&str>,
250 body: ClientStream,
251 ) -> Result<UploadPartResponse> {
252 let data: BytesMut = body.try_collect().await?;
253 let data = data.freeze();
254 let etag = format!("\"etag-{part_number}-{}\"", data.len());
255
256 let mut store = self.multipart_store.lock().unwrap();
257 let upload = store
258 .get_mut(&(id.clone(), upload_id.clone()))
259 .ok_or_else(|| Error::generic("multipart upload not found"))?;
260
261 upload.parts.insert(
262 part_number,
263 UploadedPart {
264 etag: etag.clone(),
265 data,
266 uploaded_at: SystemTime::now(),
267 },
268 );
269
270 Ok(etag)
271 }
272
273 async fn list_parts(
274 &self,
275 id: &ObjectId,
276 upload_id: &UploadId,
277 max_parts: Option<u32>,
278 part_number_marker: Option<PartNumber>,
279 ) -> Result<ListPartsResponse> {
280 let store = self.multipart_store.lock().unwrap();
281 let upload = store
282 .get(&(id.clone(), upload_id.clone()))
283 .ok_or_else(|| Error::generic("multipart upload not found"))?;
284
285 let iter = upload
286 .parts
287 .iter()
288 .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker));
289
290 let max = max_parts.unwrap_or(u32::MAX) as usize;
291 let all: Vec<_> = iter.collect();
292 let is_truncated = all.len() > max;
293 let page: Vec<_> = all.into_iter().take(max).collect();
294
295 let next_part_number_marker = if is_truncated {
296 page.last().map(|(pn, _)| **pn)
297 } else {
298 None
299 };
300
301 let parts = page
302 .into_iter()
303 .map(|(pn, part)| Part {
304 part_number: *pn,
305 etag: part.etag.clone(),
306 last_modified: part.uploaded_at,
307 size: part.data.len() as u64,
308 })
309 .collect();
310
311 Ok(ListPartsResponse {
312 parts,
313 is_truncated,
314 next_part_number_marker,
315 })
316 }
317
318 async fn abort_multipart(
319 &self,
320 id: &ObjectId,
321 upload_id: &UploadId,
322 ) -> Result<AbortMultipartResponse> {
323 self.multipart_store
324 .lock()
325 .unwrap()
326 .remove(&(id.clone(), upload_id.clone()));
327 Ok(())
328 }
329
330 async fn complete_multipart(
331 &self,
332 id: &ObjectId,
333 upload_id: &UploadId,
334 parts: Vec<CompletedPart>,
335 ) -> Result<CompleteMultipartResponse> {
336 let key = (id.clone(), upload_id.clone());
337
338 let assembled = {
345 let store = self.multipart_store.lock().unwrap();
346 let upload = store
347 .get(&key)
348 .ok_or_else(|| Error::generic("multipart upload not found"))?;
349
350 for completed in &parts {
351 match upload.parts.get(&completed.part_number) {
352 None => {
353 return Ok(Some(crate::multipart::CompleteMultipartError {
354 code: "InvalidPart".into(),
355 message: format!(
356 "part number {} was not uploaded",
357 completed.part_number
358 ),
359 }));
360 }
361 Some(stored) if stored.etag != completed.etag => {
362 return Ok(Some(crate::multipart::CompleteMultipartError {
363 code: "InvalidPart".into(),
364 message: format!(
365 "etag mismatch for part {}: expected {}, got {}",
366 completed.part_number, stored.etag, completed.etag
367 ),
368 }));
369 }
370 _ => {}
371 }
372 }
373
374 let mut payload = BytesMut::new();
375 for completed in &parts {
376 let stored = &upload.parts[&completed.part_number];
377 payload.extend_from_slice(&stored.data);
378 }
379
380 let mut metadata = upload.metadata.clone();
381 metadata.size = Some(payload.len());
382
383 (metadata, payload.freeze())
384 };
385
386 self.store
387 .lock()
388 .unwrap()
389 .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1));
390
391 self.multipart_store.lock().unwrap().remove(&key);
392
393 Ok(None)
394 }
395}
396
397fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
402 match expected {
403 None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
404 Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
405 }
406}
407
/// Owned copy of what `InMemoryBackend::get` found for an object id.
#[derive(Clone, Debug)]
pub enum Entry {
    /// Nothing is stored under the id.
    NotFound,
    /// An object: its stored metadata and full payload.
    Object(Metadata, Bytes),
    /// A tombstone is stored under the id.
    Tombstone(Tombstone),
}
418
419impl Entry {
420 pub fn is_not_found(&self) -> bool {
422 matches!(self, Entry::NotFound)
423 }
424
425 pub fn is_object(&self) -> bool {
427 matches!(self, Entry::Object(_, _))
428 }
429
430 pub fn is_tombstone(&self) -> bool {
432 matches!(self, Entry::Tombstone(_))
433 }
434
435 pub fn expect_not_found(&self) {
437 match self {
438 Entry::NotFound => (),
439 _ => panic!("expected not found entry, got {:?}", self),
440 }
441 }
442
443 pub fn expect_object(&self) -> (Metadata, Bytes) {
445 match self {
446 Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
447 _ => panic!("expected object entry, got {:?}", self),
448 }
449 }
450
451 pub fn expect_tombstone(&self) -> Tombstone {
453 match self {
454 Entry::Tombstone(tombstone) => tombstone.clone(),
455 _ => panic!("expected tombstone entry, got {:?}", self),
456 }
457 }
458}
459
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::time::Duration;

    use objectstore_types::metadata::ExpirationPolicy;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Builds a random object id in a fixed testing usecase and scope.
    fn make_id() -> ObjectId {
        let scopes = Scopes::from_iter([Scope::create("testing", "value").unwrap()]);
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes,
        })
    }

    /// Shorthand for a non-zero part number.
    fn part(number: u32) -> NonZeroU32 {
        NonZeroU32::new(number).unwrap()
    }

    /// Uploads `data` as part `number` of the given upload and returns the resulting etag.
    async fn upload(
        backend: &InMemoryBackend,
        id: &ObjectId,
        upload_id: &UploadId,
        number: u32,
        data: &[u8],
    ) -> UploadPartResponse {
        backend
            .upload_part(
                id,
                upload_id,
                part(number),
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await
            .unwrap()
    }

    /// A single-part upload round-trips both the payload and the initiation metadata.
    #[tokio::test]
    async fn multipart_single_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let data = b"hello, multipart world!";
        let etag = upload(&backend, &id, &upload_id, 1, data).await;

        let completed = vec![CompletedPart {
            part_number: part(1),
            etag,
        }];
        let result = backend
            .complete_multipart(&id, &upload_id, completed)
            .await
            .unwrap();
        assert!(result.is_none(), "expected no error on complete");

        let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
    }

    /// Several parts are concatenated in the order listed on completion.
    #[tokio::test]
    async fn multipart_multiple_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let mut completed = Vec::new();
        for (number, chunk) in [(1, &b"aaaa"[..]), (2, &b"bbbb"[..]), (3, &b"cc"[..])] {
            let etag = upload(&backend, &id, &upload_id, number, chunk).await;
            completed.push(CompletedPart {
                part_number: part(number),
                etag,
            });
        }

        let result = backend
            .complete_multipart(&id, &upload_id, completed)
            .await
            .unwrap();
        assert!(result.is_none());

        let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, b"aaaabbbbcc");
    }

    /// Listing returns every part, and pagination walks them one page at a time.
    #[tokio::test]
    async fn multipart_list_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = upload(&backend, &id, &upload_id, 1, b"aaa").await;
        let etag2 = upload(&backend, &id, &upload_id, 2, b"bbb").await;

        // Unpaginated listing returns both parts in ascending order.
        let list = backend
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(list.parts.len(), 2);
        for (listed, (number, etag)) in list.parts.iter().zip([(1, &etag1), (2, &etag2)]) {
            assert_eq!(listed.part_number.get(), number);
            assert_eq!(listed.etag, *etag);
            assert_eq!(listed.size, 3);
        }

        // First page of size one is truncated and yields a continuation marker.
        let page1 = backend
            .list_parts(&id, &upload_id, Some(1), None)
            .await
            .unwrap();
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number.get(), 1);
        assert!(page1.is_truncated);
        assert!(page1.next_part_number_marker.is_some());

        // Resuming from the marker yields the remaining part.
        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await
            .unwrap();
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number.get(), 2);

        backend.abort_multipart(&id, &upload_id).await.unwrap();
    }

    /// Aborting an upload never materialises an object.
    #[tokio::test]
    async fn multipart_abort() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        upload(&backend, &id, &upload_id, 1, b"hello").await;

        backend.abort_multipart(&id, &upload_id).await.unwrap();

        let result = backend.get_object(&id).await.unwrap();
        assert!(result.is_none(), "object should not exist after abort");
    }

    /// A wrong etag is rejected with `InvalidPart` and the upload remains retryable.
    #[tokio::test]
    async fn multipart_invalid_etag() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let bad = vec![CompletedPart {
            part_number: part(1),
            etag: "wrong-etag".into(),
        }];
        let result = backend
            .complete_multipart(&id, &upload_id, bad)
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for bad etag");
        assert_eq!(result.unwrap().code, "InvalidPart");

        // The failed attempt must not have consumed the upload.
        let good = vec![CompletedPart {
            part_number: part(1),
            etag,
        }];
        let result = backend
            .complete_multipart(&id, &upload_id, good)
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct etag should succeed");
    }

    /// Referencing a part that was never uploaded is rejected and remains retryable.
    #[tokio::test]
    async fn multipart_missing_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let missing = vec![CompletedPart {
            part_number: part(99),
            etag: "whatever".into(),
        }];
        let result = backend
            .complete_multipart(&id, &upload_id, missing)
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for missing part");
        assert_eq!(result.unwrap().code, "InvalidPart");

        // The failed attempt must not have consumed the upload.
        let good = vec![CompletedPart {
            part_number: part(1),
            etag,
        }];
        let result = backend
            .complete_multipart(&id, &upload_id, good)
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct part should succeed");
    }
}