1use std::collections::{BTreeMap, HashMap};
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
11
12use bytes::{Bytes, BytesMut};
13use futures_util::TryStreamExt;
14use objectstore_types::metadata::Metadata;
15
16use super::common::{
17 DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet,
18 TieredMetadata, TieredWrite, Tombstone,
19};
20use crate::error::{Error, Result};
21use crate::id::ObjectId;
22use crate::multipart::{
23 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
24 ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
25};
26use crate::stream::ClientStream;
27
/// One slot in the primary store: either a live object or a tombstone.
#[derive(Clone, Debug)]
enum StoreEntry {
    /// A fully materialized object: its metadata plus the raw payload bytes.
    Object(Metadata, Bytes),
    /// A tombstone record; carries a `target` id used by redirect matching
    /// (see `matches_redirect`).
    Tombstone(Tombstone),
}
34
/// Primary object store: maps object ids to live objects or tombstones.
type Store = HashMap<ObjectId, StoreEntry>;
36
/// State of one in-flight multipart upload: the metadata captured at
/// initiation plus every part uploaded so far.
#[derive(Clone, Debug)]
struct MultipartUpload {
    /// Metadata supplied to `initiate_multipart`; applied when the upload
    /// is completed.
    metadata: Metadata,
    /// Uploaded parts keyed by part number; `BTreeMap` keeps them sorted,
    /// which `list_parts` relies on for ordered pagination.
    parts: BTreeMap<PartNumber, UploadedPart>,
}
42
/// A single uploaded part buffered in memory.
#[derive(Clone, Debug)]
struct UploadedPart {
    /// Synthetic etag derived from the part number and payload length
    /// (see `upload_part`).
    etag: String,
    /// The part's raw payload.
    data: Bytes,
    /// Wall-clock time of the upload; reported as `last_modified` by
    /// `list_parts`.
    uploaded_at: SystemTime,
}
49
/// In-flight multipart uploads, keyed by `(object id, upload id)`.
type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>;
51
/// A process-local, thread-safe backend that keeps objects and in-flight
/// multipart uploads in `HashMap`s behind mutexes.
///
/// Cloning is cheap and shares the same underlying stores via `Arc`.
#[derive(Debug, Clone)]
pub struct InMemoryBackend {
    /// Static backend name reported by `Backend::name`.
    name: &'static str,
    /// Completed objects and tombstones, keyed by object id.
    store: Arc<Mutex<Store>>,
    /// In-flight multipart uploads, keyed by `(object id, upload id)`.
    multipart_store: Arc<Mutex<MultipartStore>>,
}
63
64impl InMemoryBackend {
65 pub fn new(name: &'static str) -> Self {
67 Self {
68 name,
69 store: Arc::new(Mutex::new(HashMap::new())),
70 multipart_store: Arc::new(Mutex::new(HashMap::new())),
71 }
72 }
73
74 pub fn get(&self, id: &ObjectId) -> Entry {
76 match self.store.lock().unwrap().get(id).cloned() {
77 None => Entry::NotFound,
78 Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
79 Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
80 }
81 }
82
83 pub fn contains(&self, id: &ObjectId) -> bool {
85 self.store.lock().unwrap().contains_key(id)
86 }
87
88 pub fn is_empty(&self) -> bool {
90 self.store.lock().unwrap().is_empty()
91 }
92
93 pub fn remove(&self, id: &ObjectId) {
97 self.store.lock().unwrap().remove(id);
98 }
99}
100
101#[async_trait::async_trait]
102impl super::common::Backend for InMemoryBackend {
103 fn name(&self) -> &'static str {
104 self.name
105 }
106
107 async fn put_object(
108 &self,
109 id: &ObjectId,
110 metadata: &Metadata,
111 stream: ClientStream,
112 ) -> Result<PutResponse> {
113 let bytes: BytesMut = stream.try_collect().await?;
114 self.store.lock().unwrap().insert(
115 id.clone(),
116 StoreEntry::Object(metadata.clone(), bytes.freeze()),
117 );
118 Ok(())
119 }
120
121 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
122 let entry = self.store.lock().unwrap().get(id).cloned();
123 match entry {
124 None => Ok(None),
125 Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
126 Some(StoreEntry::Object(mut metadata, bytes)) => {
127 metadata.size = Some(bytes.len());
128 let stream = crate::stream::single(bytes);
129 Ok(Some((metadata, stream)))
130 }
131 }
132 }
133
134 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
135 self.store.lock().unwrap().remove(id);
136 Ok(())
137 }
138}
139
140#[async_trait::async_trait]
141impl HighVolumeBackend for InMemoryBackend {
142 async fn put_non_tombstone(
143 &self,
144 id: &ObjectId,
145 metadata: &Metadata,
146 payload: Bytes,
147 ) -> Result<Option<Tombstone>> {
148 let mut store = self.store.lock().unwrap();
149 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
150 return Ok(Some(tombstone));
151 }
152
153 let mut metadata = metadata.clone();
154 metadata.size = Some(payload.len());
155 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
156 Ok(None)
157 }
158
159 async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
160 let entry = self.store.lock().unwrap().get(id).cloned();
161 Ok(match entry {
162 None => TieredGet::NotFound,
163 Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
164 Some(StoreEntry::Object(mut metadata, bytes)) => {
165 metadata.size = Some(bytes.len());
166 TieredGet::Object(metadata, crate::stream::single(bytes))
167 }
168 })
169 }
170
171 async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
172 let entry = self.store.lock().unwrap().get(id).cloned();
173 Ok(match entry {
174 None => TieredMetadata::NotFound,
175 Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
176 Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
177 })
178 }
179
180 async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
181 let mut store = self.store.lock().unwrap();
182 if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
183 return Ok(Some(tombstone));
184 }
185
186 store.remove(id);
187 Ok(None)
188 }
189
190 async fn compare_and_write(
191 &self,
192 id: &ObjectId,
193 current: Option<&ObjectId>,
194 write: TieredWrite,
195 ) -> Result<bool> {
196 let mut store = self.store.lock().unwrap();
197
198 let actual = store.get(id);
199 let matches_current = matches_redirect(actual, current);
200 let matches_next = matches_redirect(actual, write.target());
201
202 if matches_current {
203 match write {
204 TieredWrite::Tombstone(tombstone) => {
205 store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
206 }
207 TieredWrite::Object(metadata, payload) => {
208 store.insert(id.clone(), StoreEntry::Object(metadata, payload));
209 }
210 TieredWrite::Delete => {
211 store.remove(id);
212 }
213 }
214 }
215
216 Ok(matches_current || matches_next)
217 }
218}
219
#[async_trait::async_trait]
impl MultipartUploadBackend for InMemoryBackend {
    /// Starts a multipart upload for `id` and returns a fresh upload id.
    ///
    /// The metadata is captured now and applied to the final object when the
    /// upload is completed.
    async fn initiate_multipart(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse> {
        let upload_id = uuid::Uuid::now_v7().to_string();
        let upload = MultipartUpload {
            metadata: metadata.clone(),
            parts: BTreeMap::new(),
        };
        self.multipart_store
            .lock()
            .unwrap()
            .insert((id.clone(), upload_id.clone()), upload);
        Ok(upload_id)
    }

    /// Buffers one part in memory and returns its synthetic etag.
    ///
    /// Re-uploading the same part number replaces the previous data. The
    /// content-length and content-md5 hints are ignored by this backend.
    async fn upload_part(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        _content_length: u64,
        _content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse> {
        let data: BytesMut = body.try_collect().await?;
        let data = data.freeze();
        // Synthetic etag: deterministic in part number and payload length.
        let etag = format!("\"etag-{part_number}-{}\"", data.len());

        let mut store = self.multipart_store.lock().unwrap();
        let upload = store
            .get_mut(&(id.clone(), upload_id.clone()))
            .ok_or_else(|| Error::generic("multipart upload not found"))?;

        upload.parts.insert(
            part_number,
            UploadedPart {
                etag: etag.clone(),
                data,
                uploaded_at: SystemTime::now(),
            },
        );

        Ok(etag)
    }

    /// Lists uploaded parts in part-number order, with optional pagination:
    /// parts strictly after `part_number_marker`, at most `max_parts` entries.
    async fn list_parts(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        max_parts: Option<u32>,
        part_number_marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse> {
        let store = self.multipart_store.lock().unwrap();
        let upload = store
            .get(&(id.clone(), upload_id.clone()))
            .ok_or_else(|| Error::generic("multipart upload not found"))?;

        // BTreeMap iteration yields parts in ascending part-number order.
        let iter = upload
            .parts
            .iter()
            .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker));

        let max = max_parts.unwrap_or(u32::MAX) as usize;
        let all: Vec<_> = iter.collect();
        let is_truncated = all.len() > max;
        let page: Vec<_> = all.into_iter().take(max).collect();

        // The marker for the next page is the last part number returned.
        let next_part_number_marker = if is_truncated {
            page.last().map(|(pn, _)| **pn)
        } else {
            None
        };

        let parts = page
            .into_iter()
            .map(|(pn, part)| Part {
                part_number: *pn,
                etag: part.etag.clone(),
                last_modified: part.uploaded_at,
                size: part.data.len() as u64,
            })
            .collect();

        Ok(ListPartsResponse {
            parts,
            is_truncated,
            next_part_number_marker,
        })
    }

    /// Discards the upload and all buffered parts; aborting an unknown
    /// upload is a no-op.
    async fn abort_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
    ) -> Result<AbortMultipartResponse> {
        self.multipart_store
            .lock()
            .unwrap()
            .remove(&(id.clone(), upload_id.clone()));
        Ok(())
    }

    /// Validates the supplied part list against what was uploaded, then
    /// concatenates the parts in the given order and promotes the result to
    /// a regular object.
    ///
    /// Validation failures return `Ok(Some(..))` with an `InvalidPart` error
    /// and leave the upload intact, so the caller can retry completion.
    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        let key = (id.clone(), upload_id.clone());

        let assembled = {
            // Validation and assembly happen under one guard so the part set
            // cannot change in between.
            let store = self.multipart_store.lock().unwrap();
            let upload = store
                .get(&key)
                .ok_or_else(|| Error::generic("multipart upload not found"))?;

            for completed in &parts {
                match upload.parts.get(&completed.part_number) {
                    None => {
                        return Ok(Some(crate::multipart::CompleteMultipartError {
                            code: "InvalidPart".into(),
                            message: format!(
                                "part number {} was not uploaded",
                                completed.part_number
                            ),
                        }));
                    }
                    Some(stored) if stored.etag != completed.etag => {
                        return Ok(Some(crate::multipart::CompleteMultipartError {
                            code: "InvalidPart".into(),
                            message: format!(
                                "etag mismatch for part {}: expected {}, got {}",
                                completed.part_number, stored.etag, completed.etag
                            ),
                        }));
                    }
                    _ => {}
                }
            }

            let mut payload = BytesMut::new();
            for completed in &parts {
                // Indexing is safe: every part number was validated above.
                let stored = &upload.parts[&completed.part_number];
                payload.extend_from_slice(&stored.data);
            }

            let mut metadata = upload.metadata.clone();
            metadata.size = Some(payload.len());

            (metadata, payload.freeze())
        };

        // The object store and multipart store are locked separately below;
        // there is no cross-store atomicity here.
        self.store
            .lock()
            .unwrap()
            .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1));

        self.multipart_store.lock().unwrap().remove(&key);

        Ok(None)
    }
}
392
393fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
398 match expected {
399 None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
400 Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
401 }
402}
403
/// Owned snapshot of a store lookup, as returned by `InMemoryBackend::get`.
#[derive(Clone, Debug)]
pub enum Entry {
    /// No entry exists for the id.
    NotFound,
    /// A stored object: metadata plus payload bytes.
    Object(Metadata, Bytes),
    /// A tombstone record stored under the id.
    Tombstone(Tombstone),
}
414
415impl Entry {
416 pub fn is_not_found(&self) -> bool {
418 matches!(self, Entry::NotFound)
419 }
420
421 pub fn is_object(&self) -> bool {
423 matches!(self, Entry::Object(_, _))
424 }
425
426 pub fn is_tombstone(&self) -> bool {
428 matches!(self, Entry::Tombstone(_))
429 }
430
431 pub fn expect_not_found(&self) {
433 match self {
434 Entry::NotFound => (),
435 _ => panic!("expected not found entry, got {:?}", self),
436 }
437 }
438
439 pub fn expect_object(&self) -> (Metadata, Bytes) {
441 match self {
442 Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
443 _ => panic!("expected object entry, got {:?}", self),
444 }
445 }
446
447 pub fn expect_tombstone(&self) -> Tombstone {
449 match self {
450 Entry::Tombstone(tombstone) => tombstone.clone(),
451 _ => panic!("expected tombstone entry, got {:?}", self),
452 }
453 }
454}
455
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use objectstore_types::metadata::ExpirationPolicy;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Builds a random object id in a fixed "testing" context.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    /// Completing an upload with a single part stores the payload and
    /// preserves every metadata field captured at initiation.
    #[tokio::test]
    async fn multipart_single_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let data = b"hello, multipart world!";
        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "expected no error on complete");

        let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
    }

    /// Parts are concatenated in the order given to `complete_multipart`.
    #[tokio::test]
    async fn multipart_multiple_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let part1 = b"aaaa".to_vec();
        let part2 = b"bbbb".to_vec();
        let part3 = b"cc".to_vec();

        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await
            .unwrap();
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                2,
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await
            .unwrap();
        let etag3 = backend
            .upload_part(
                &id,
                &upload_id,
                3,
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    CompletedPart {
                        part_number: 1,
                        etag: etag1,
                    },
                    CompletedPart {
                        part_number: 2,
                        etag: etag2,
                    },
                    CompletedPart {
                        part_number: 3,
                        etag: etag3,
                    },
                ],
            )
            .await
            .unwrap();
        assert!(result.is_none());

        let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, b"aaaabbbbcc");
    }

    /// `list_parts` returns parts in order and paginates via `max_parts` and
    /// the returned `next_part_number_marker`.
    #[tokio::test]
    async fn multipart_list_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = backend
            .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec()))
            .await
            .unwrap();
        let etag2 = backend
            .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec()))
            .await
            .unwrap();

        // No pagination: both parts, in part-number order.
        let list = backend
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number, 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number, 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);

        // Page size 1: first page is truncated and yields a marker.
        let page1 = backend
            .list_parts(&id, &upload_id, Some(1), None)
            .await
            .unwrap();
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number, 1);
        assert!(page1.is_truncated);
        assert!(page1.next_part_number_marker.is_some());

        // Second page resumes after the marker.
        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await
            .unwrap();
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number, 2);

        backend.abort_multipart(&id, &upload_id).await.unwrap();
    }

    /// Aborting an upload discards its parts; no object is ever created.
    #[tokio::test]
    async fn multipart_abort() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await
            .unwrap();

        backend.abort_multipart(&id, &upload_id).await.unwrap();

        let result = backend.get_object(&id).await.unwrap();
        assert!(result.is_none(), "object should not exist after abort");
    }

    /// A wrong etag fails completion with `InvalidPart` but leaves the
    /// upload intact, so a retry with the correct etag succeeds.
    #[tokio::test]
    async fn multipart_invalid_etag() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag: "wrong-etag".into(),
                }],
            )
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for bad etag");
        assert_eq!(result.unwrap().code, "InvalidPart");

        // Retry with the etag actually returned by upload_part.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct etag should succeed");
    }

    /// Referencing a never-uploaded part number fails with `InvalidPart`;
    /// the upload stays usable for a corrected retry.
    #[tokio::test]
    async fn multipart_missing_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 99,
                    etag: "whatever".into(),
                }],
            )
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for missing part");
        assert_eq!(result.unwrap().code, "InvalidPart");

        // Retry naming the part that really exists.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct part should succeed");
    }
}