Skip to main content

objectstore_service/backend/
in_memory.rs

1//! In-memory backend for tests.
2//!
3//! This provides a [`Backend`](super::common::Backend) backed by a `HashMap`,
4//! removing the need for filesystem tempdir management in unit tests. The
5//! backend is [`Clone`] so tests can hold a handle for direct inspection while
6//! the service owns a boxed copy.
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
11
12use objectstore_types::range::ByteRange;
13
14use bytes::{Bytes, BytesMut};
15use futures_util::TryStreamExt;
16use objectstore_types::metadata::Metadata;
17
18use super::common::{
19    DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet,
20    TieredMetadata, TieredWrite, Tombstone,
21};
22use crate::error::{Error, Result};
23use crate::id::ObjectId;
24use crate::multipart::{
25    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
26    ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
27};
28use crate::stream::ClientStream;
29
30/// An entry in the in-memory store.
31#[derive(Clone, Debug)]
32enum StoreEntry {
33    Object(Metadata, Bytes),
34    Tombstone(Tombstone),
35}
36
37type Store = HashMap<ObjectId, StoreEntry>;
38
39#[derive(Clone, Debug)]
40struct MultipartUpload {
41    metadata: Metadata,
42    parts: BTreeMap<PartNumber, UploadedPart>,
43}
44
45#[derive(Clone, Debug)]
46struct UploadedPart {
47    etag: String,
48    data: Bytes,
49    uploaded_at: SystemTime,
50}
51
52type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>;
53
54/// In-memory [`Backend`](super::common::Backend) backed by a `HashMap`.
55///
56/// Removes the need for filesystem tempdir management in unit tests. The
57/// backend is [`Clone`] so tests can hold a handle for direct inspection while
58/// the service owns a boxed copy.
59#[derive(Debug, Clone)]
60pub struct InMemoryBackend {
61    name: &'static str,
62    store: Arc<Mutex<Store>>,
63    multipart_store: Arc<Mutex<MultipartStore>>,
64}
65
66impl InMemoryBackend {
67    /// Creates a new `InMemoryBackend` with the given diagnostic `name`.
68    pub fn new(name: &'static str) -> Self {
69        Self {
70            name,
71            store: Arc::new(Mutex::new(HashMap::new())),
72            multipart_store: Arc::new(Mutex::new(HashMap::new())),
73        }
74    }
75
76    /// Returns the stored entry for `id`, for direct inspection in tests.
77    pub fn get(&self, id: &ObjectId) -> Entry {
78        match self.store.lock().unwrap().get(id).cloned() {
79            None => Entry::NotFound,
80            Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
81            Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
82        }
83    }
84
85    /// Returns `true` if the backend contains an entry for the given id.
86    pub fn contains(&self, id: &ObjectId) -> bool {
87        self.store.lock().unwrap().contains_key(id)
88    }
89
90    /// Returns `true` if the backend has no stored objects.
91    pub fn is_empty(&self) -> bool {
92        self.store.lock().unwrap().is_empty()
93    }
94
95    /// Removes an entry directly, bypassing the `Backend` trait.
96    ///
97    /// Useful for simulating partial failures (e.g. orphan tombstones).
98    pub fn remove(&self, id: &ObjectId) {
99        self.store.lock().unwrap().remove(id);
100    }
101}
102
103#[async_trait::async_trait]
104impl super::common::Backend for InMemoryBackend {
105    fn name(&self) -> &'static str {
106        self.name
107    }
108
109    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
110        Ok(self)
111    }
112
113    async fn put_object(
114        &self,
115        id: &ObjectId,
116        metadata: &Metadata,
117        stream: ClientStream,
118    ) -> Result<PutResponse> {
119        let bytes: BytesMut = stream.try_collect().await?;
120        self.store.lock().unwrap().insert(
121            id.clone(),
122            StoreEntry::Object(metadata.clone(), bytes.freeze()),
123        );
124        Ok(())
125    }
126
127    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
128        let entry = self.store.lock().unwrap().get(id).cloned();
129        match entry {
130            None => Ok(None),
131            Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
132            Some(StoreEntry::Object(mut metadata, bytes)) => {
133                let total = bytes.len() as u64;
134                metadata.size = Some(bytes.len());
135                let (content_range, payload) = match range {
136                    Some(range) => {
137                        let content_range = range
138                            .resolve(total)
139                            .ok_or(Error::RangeNotSatisfiable { total })?;
140                        let sliced =
141                            bytes.slice(content_range.start as usize..=content_range.end as usize);
142                        (Some(content_range), sliced)
143                    }
144                    None => (None, bytes),
145                };
146                Ok(Some((
147                    metadata,
148                    content_range,
149                    crate::stream::single(payload),
150                )))
151            }
152        }
153    }
154
155    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
156        self.store.lock().unwrap().remove(id);
157        Ok(())
158    }
159}
160
161#[async_trait::async_trait]
162impl HighVolumeBackend for InMemoryBackend {
163    async fn put_non_tombstone(
164        &self,
165        id: &ObjectId,
166        metadata: &Metadata,
167        payload: Bytes,
168    ) -> Result<Option<Tombstone>> {
169        let mut store = self.store.lock().unwrap();
170        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
171            return Ok(Some(tombstone));
172        }
173
174        let mut metadata = metadata.clone();
175        metadata.size = Some(payload.len());
176        store.insert(id.clone(), StoreEntry::Object(metadata, payload));
177        Ok(None)
178    }
179
180    async fn get_tiered_object(
181        &self,
182        id: &ObjectId,
183        range: Option<ByteRange>,
184    ) -> Result<TieredGet> {
185        let entry = self.store.lock().unwrap().get(id).cloned();
186        Ok(match entry {
187            None => TieredGet::NotFound,
188            Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
189            Some(StoreEntry::Object(mut metadata, bytes)) => {
190                let total = bytes.len() as u64;
191                metadata.size = Some(bytes.len());
192                let (content_range, payload) = match range {
193                    Some(range) => {
194                        let content_range = range
195                            .resolve(total)
196                            .ok_or(Error::RangeNotSatisfiable { total })?;
197                        let sliced =
198                            bytes.slice(content_range.start as usize..=content_range.end as usize);
199                        (Some(content_range), sliced)
200                    }
201                    None => (None, bytes),
202                };
203                TieredGet::Object(metadata, content_range, crate::stream::single(payload))
204            }
205        })
206    }
207
208    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
209        let entry = self.store.lock().unwrap().get(id).cloned();
210        Ok(match entry {
211            None => TieredMetadata::NotFound,
212            Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
213            Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
214        })
215    }
216
217    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
218        let mut store = self.store.lock().unwrap();
219        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
220            return Ok(Some(tombstone));
221        }
222
223        store.remove(id);
224        Ok(None)
225    }
226
227    async fn compare_and_write(
228        &self,
229        id: &ObjectId,
230        current: Option<&ObjectId>,
231        write: TieredWrite,
232    ) -> Result<bool> {
233        let mut store = self.store.lock().unwrap();
234
235        let actual = store.get(id);
236        let matches_current = matches_redirect(actual, current);
237        let matches_next = matches_redirect(actual, write.target());
238
239        if matches_current {
240            match write {
241                TieredWrite::Tombstone(tombstone) => {
242                    store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
243                }
244                TieredWrite::Object(metadata, payload) => {
245                    store.insert(id.clone(), StoreEntry::Object(metadata, payload));
246                }
247                TieredWrite::Delete => {
248                    store.remove(id);
249                }
250            }
251        }
252
253        Ok(matches_current || matches_next)
254    }
255}
256
257#[async_trait::async_trait]
258impl MultipartUploadBackend for InMemoryBackend {
259    async fn initiate_multipart(
260        &self,
261        id: &ObjectId,
262        metadata: &Metadata,
263    ) -> Result<InitiateMultipartResponse> {
264        let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
265        let upload = MultipartUpload {
266            metadata: metadata.clone(),
267            parts: BTreeMap::new(),
268        };
269        self.multipart_store
270            .lock()
271            .unwrap()
272            .insert((id.clone(), upload_id.clone()), upload);
273        Ok(upload_id)
274    }
275
276    async fn upload_part(
277        &self,
278        id: &ObjectId,
279        upload_id: &UploadId,
280        part_number: PartNumber,
281        _content_length: u64,
282        _content_md5: Option<&str>,
283        body: ClientStream,
284    ) -> Result<UploadPartResponse> {
285        let data: BytesMut = body.try_collect().await?;
286        let data = data.freeze();
287        let etag = format!("\"etag-{part_number}-{}\"", data.len());
288
289        let mut store = self.multipart_store.lock().unwrap();
290        let upload = store
291            .get_mut(&(id.clone(), upload_id.clone()))
292            .ok_or_else(|| Error::generic("multipart upload not found"))?;
293
294        upload.parts.insert(
295            part_number,
296            UploadedPart {
297                etag: etag.clone(),
298                data,
299                uploaded_at: SystemTime::now(),
300            },
301        );
302
303        Ok(etag)
304    }
305
306    async fn list_parts(
307        &self,
308        id: &ObjectId,
309        upload_id: &UploadId,
310        max_parts: Option<u32>,
311        part_number_marker: Option<PartNumber>,
312    ) -> Result<ListPartsResponse> {
313        let store = self.multipart_store.lock().unwrap();
314        let upload = store
315            .get(&(id.clone(), upload_id.clone()))
316            .ok_or_else(|| Error::generic("multipart upload not found"))?;
317
318        let iter = upload
319            .parts
320            .iter()
321            .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker));
322
323        let max = max_parts.unwrap_or(u32::MAX) as usize;
324        let all: Vec<_> = iter.collect();
325        let is_truncated = all.len() > max;
326        let page: Vec<_> = all.into_iter().take(max).collect();
327
328        let next_part_number_marker = if is_truncated {
329            page.last().map(|(pn, _)| **pn)
330        } else {
331            None
332        };
333
334        let parts = page
335            .into_iter()
336            .map(|(pn, part)| Part {
337                part_number: *pn,
338                etag: part.etag.clone(),
339                last_modified: part.uploaded_at,
340                size: part.data.len() as u64,
341            })
342            .collect();
343
344        Ok(ListPartsResponse {
345            parts,
346            is_truncated,
347            next_part_number_marker,
348        })
349    }
350
351    async fn abort_multipart(
352        &self,
353        id: &ObjectId,
354        upload_id: &UploadId,
355    ) -> Result<AbortMultipartResponse> {
356        self.multipart_store
357            .lock()
358            .unwrap()
359            .remove(&(id.clone(), upload_id.clone()));
360        Ok(())
361    }
362
363    async fn complete_multipart(
364        &self,
365        id: &ObjectId,
366        upload_id: &UploadId,
367        parts: Vec<CompletedPart>,
368    ) -> Result<CompleteMultipartResponse> {
369        let key = (id.clone(), upload_id.clone());
370
371        // TODO: validate that parts are in ascending part_number order and reject with
372        // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant.
373
374        // Validate and assemble while holding the multipart lock, but don't
375        // remove the upload yet — a failed validation must leave it intact so
376        // the client can retry.
377        let assembled = {
378            let store = self.multipart_store.lock().unwrap();
379            let upload = store
380                .get(&key)
381                .ok_or_else(|| Error::generic("multipart upload not found"))?;
382
383            for completed in &parts {
384                match upload.parts.get(&completed.part_number) {
385                    None => {
386                        return Ok(Some(crate::multipart::CompleteMultipartError {
387                            code: "InvalidPart".into(),
388                            message: format!(
389                                "part number {} was not uploaded",
390                                completed.part_number
391                            ),
392                        }));
393                    }
394                    Some(stored) if stored.etag != completed.etag => {
395                        return Ok(Some(crate::multipart::CompleteMultipartError {
396                            code: "InvalidPart".into(),
397                            message: format!(
398                                "etag mismatch for part {}: expected {}, got {}",
399                                completed.part_number, stored.etag, completed.etag
400                            ),
401                        }));
402                    }
403                    _ => {}
404                }
405            }
406
407            let mut payload = BytesMut::new();
408            for completed in &parts {
409                let stored = &upload.parts[&completed.part_number];
410                payload.extend_from_slice(&stored.data);
411            }
412
413            let mut metadata = upload.metadata.clone();
414            metadata.size = Some(payload.len());
415
416            (metadata, payload.freeze())
417        };
418
419        self.store
420            .lock()
421            .unwrap()
422            .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1));
423
424        self.multipart_store.lock().unwrap().remove(&key);
425
426        Ok(None)
427    }
428}
429
430/// Returns `true` if `entry` matches the expected tombstone redirect state.
431///
432/// - `expected = None`: matches any non-tombstone (absent or inline object).
433/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`.
434fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
435    match expected {
436        None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
437        Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
438    }
439}
440
441/// Type returned by [`InMemoryBackend::get`] for direct inspection of stored entries.
442#[derive(Clone, Debug)]
443pub enum Entry {
444    /// No entry exists at this key.
445    NotFound,
446    /// A real object with its metadata and payload bytes.
447    Object(Metadata, Bytes),
448    /// A redirect tombstone indicating the real object lives in the long-term backend.
449    Tombstone(Tombstone),
450}
451
452impl Entry {
453    /// Returns `true` if the entry is [`Entry::NotFound`].
454    pub fn is_not_found(&self) -> bool {
455        matches!(self, Entry::NotFound)
456    }
457
458    /// Returns `true` if the entry is [`Entry::Object`].
459    pub fn is_object(&self) -> bool {
460        matches!(self, Entry::Object(_, _))
461    }
462
463    /// Returns `true` if the entry is [`Entry::Tombstone`].
464    pub fn is_tombstone(&self) -> bool {
465        matches!(self, Entry::Tombstone(_))
466    }
467
468    /// Panics unless the entry is [`Entry::NotFound`].
469    pub fn expect_not_found(&self) {
470        match self {
471            Entry::NotFound => (),
472            _ => panic!("expected not found entry, got {self:?}"),
473        }
474    }
475
476    /// Returns the metadata and payload bytes, panicking if the entry is not [`Entry::Object`].
477    pub fn expect_object(&self) -> (Metadata, Bytes) {
478        match self {
479            Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
480            _ => panic!("expected object entry, got {self:?}"),
481        }
482    }
483
484    /// Returns the tombstone, panicking if the entry is not [`Entry::Tombstone`].
485    pub fn expect_tombstone(&self) -> Tombstone {
486        match self {
487            Entry::Tombstone(tombstone) => tombstone.clone(),
488            _ => panic!("expected tombstone entry, got {self:?}"),
489        }
490    }
491}
492
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::time::Duration;

    use objectstore_types::metadata::ExpirationPolicy;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::Backend;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Creates a random object id in the `testing` usecase.
    fn make_id() -> ObjectId {
        let scopes = Scopes::from_iter([Scope::create("testing", "value").unwrap()]);
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes,
        })
    }

    /// Builds the part number `n`; `n` must be non-zero.
    fn nth(n: u32) -> NonZeroU32 {
        NonZeroU32::new(n).unwrap()
    }

    /// Uploads `data` as part `number` of the given upload and returns its etag.
    async fn upload(
        backend: &InMemoryBackend,
        id: &ObjectId,
        upload_id: &UploadId,
        number: u32,
        data: &[u8],
    ) -> String {
        backend
            .upload_part(
                id,
                upload_id,
                nth(number),
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await
            .unwrap()
    }

    /// Builds the completion record for part `number` with the given etag.
    fn completed(number: u32, etag: String) -> CompletedPart {
        CompletedPart {
            part_number: nth(number),
            etag,
        }
    }

    #[tokio::test]
    async fn multipart_single_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_hours(1)),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let data = b"hello, multipart world!";
        let etag = upload(&backend, &id, &upload_id, 1, data).await;

        let outcome = backend
            .complete_multipart(&id, &upload_id, vec![completed(1, etag)])
            .await
            .unwrap();
        assert!(outcome.is_none(), "expected no error on complete");

        // The assembled object must carry the payload and the initiation metadata.
        let (meta, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToIdle(Duration::from_hours(1))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
    }

    #[tokio::test]
    async fn multipart_multiple_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = upload(&backend, &id, &upload_id, 1, b"aaaa").await;
        let etag2 = upload(&backend, &id, &upload_id, 2, b"bbbb").await;
        let etag3 = upload(&backend, &id, &upload_id, 3, b"cc").await;

        let listed = vec![
            completed(1, etag1),
            completed(2, etag2),
            completed(3, etag3),
        ];
        let outcome = backend
            .complete_multipart(&id, &upload_id, listed)
            .await
            .unwrap();
        assert!(outcome.is_none());

        let (_, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
        let payload = stream::read_to_vec(body).await.unwrap();
        assert_eq!(payload, b"aaaabbbbcc");
    }

    #[tokio::test]
    async fn multipart_list_parts() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = upload(&backend, &id, &upload_id, 1, b"aaa").await;
        let etag2 = upload(&backend, &id, &upload_id, 2, b"bbb").await;

        // Unpaginated listing returns both parts in order.
        let list = backend
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number.get(), 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number.get(), 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);

        // Pagination: one part per page, continuing from the returned marker.
        let first_page = backend
            .list_parts(&id, &upload_id, Some(1), None)
            .await
            .unwrap();
        assert_eq!(first_page.parts.len(), 1);
        assert_eq!(first_page.parts[0].part_number.get(), 1);
        assert!(first_page.is_truncated);
        assert!(first_page.next_part_number_marker.is_some());

        let second_page = backend
            .list_parts(
                &id,
                &upload_id,
                Some(1),
                first_page.next_part_number_marker,
            )
            .await
            .unwrap();
        assert_eq!(second_page.parts.len(), 1);
        assert_eq!(second_page.parts[0].part_number.get(), 2);

        backend.abort_multipart(&id, &upload_id).await.unwrap();
    }

    #[tokio::test]
    async fn multipart_abort() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        upload(&backend, &id, &upload_id, 1, b"hello").await;

        backend.abort_multipart(&id, &upload_id).await.unwrap();

        let found = backend.get_object(&id, None).await.unwrap();
        assert!(found.is_none(), "object should not exist after abort");
    }

    #[tokio::test]
    async fn multipart_invalid_etag() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let rejected = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![completed(1, "wrong-etag".into())],
            )
            .await
            .unwrap();
        assert!(rejected.is_some(), "expected error for bad etag");
        assert_eq!(rejected.unwrap().code, "InvalidPart");

        // Upload must survive a failed complete so the client can retry.
        let retried = backend
            .complete_multipart(&id, &upload_id, vec![completed(1, etag)])
            .await
            .unwrap();
        assert!(retried.is_none(), "retry with correct etag should succeed");
    }

    #[tokio::test]
    async fn multipart_missing_part() {
        let backend = InMemoryBackend::new("test");
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let rejected = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![completed(99, "whatever".into())],
            )
            .await
            .unwrap();
        assert!(rejected.is_some(), "expected error for missing part");
        assert_eq!(rejected.unwrap().code, "InvalidPart");

        // Upload must survive a failed complete so the client can retry.
        let retried = backend
            .complete_multipart(&id, &upload_id, vec![completed(1, etag)])
            .await
            .unwrap();
        assert!(retried.is_none(), "retry with correct part should succeed");
    }
}