Skip to main content

objectstore_service/backend/
in_memory.rs

1//! In-memory backend for tests.
2//!
3//! This provides a [`Backend`](super::common::Backend) backed by a `HashMap`,
4//! removing the need for filesystem tempdir management in unit tests. The
5//! backend is [`Clone`] so tests can hold a handle for direct inspection while
6//! the service owns a boxed copy.
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
11
12use bytes::{Bytes, BytesMut};
13use futures_util::TryStreamExt;
14use objectstore_types::metadata::Metadata;
15
16use super::common::{
17    DeleteResponse, GetResponse, HighVolumeBackend, MultipartUploadBackend, PutResponse, TieredGet,
18    TieredMetadata, TieredWrite, Tombstone,
19};
20use crate::error::{Error, Result};
21use crate::id::ObjectId;
22use crate::multipart::{
23    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
24    ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
25};
26use crate::stream::ClientStream;
27
28/// An entry in the in-memory store.
29#[derive(Clone, Debug)]
30enum StoreEntry {
31    Object(Metadata, Bytes),
32    Tombstone(Tombstone),
33}
34
35type Store = HashMap<ObjectId, StoreEntry>;
36
37#[derive(Clone, Debug)]
38struct MultipartUpload {
39    metadata: Metadata,
40    parts: BTreeMap<PartNumber, UploadedPart>,
41}
42
43#[derive(Clone, Debug)]
44struct UploadedPart {
45    etag: String,
46    data: Bytes,
47    uploaded_at: SystemTime,
48}
49
50type MultipartStore = HashMap<(ObjectId, UploadId), MultipartUpload>;
51
52/// In-memory [`Backend`](super::common::Backend) backed by a `HashMap`.
53///
54/// Removes the need for filesystem tempdir management in unit tests. The
55/// backend is [`Clone`] so tests can hold a handle for direct inspection while
56/// the service owns a boxed copy.
57#[derive(Debug, Clone)]
58pub struct InMemoryBackend {
59    name: &'static str,
60    store: Arc<Mutex<Store>>,
61    multipart_store: Arc<Mutex<MultipartStore>>,
62}
63
64impl InMemoryBackend {
65    /// Creates a new `InMemoryBackend` with the given diagnostic `name`.
66    pub fn new(name: &'static str) -> Self {
67        Self {
68            name,
69            store: Arc::new(Mutex::new(HashMap::new())),
70            multipart_store: Arc::new(Mutex::new(HashMap::new())),
71        }
72    }
73
74    /// Returns the stored entry for `id`, for direct inspection in tests.
75    pub fn get(&self, id: &ObjectId) -> Entry {
76        match self.store.lock().unwrap().get(id).cloned() {
77            None => Entry::NotFound,
78            Some(StoreEntry::Tombstone(tombstone)) => Entry::Tombstone(tombstone),
79            Some(StoreEntry::Object(metadata, bytes)) => Entry::Object(metadata, bytes),
80        }
81    }
82
83    /// Returns `true` if the backend contains an entry for the given id.
84    pub fn contains(&self, id: &ObjectId) -> bool {
85        self.store.lock().unwrap().contains_key(id)
86    }
87
88    /// Returns `true` if the backend has no stored objects.
89    pub fn is_empty(&self) -> bool {
90        self.store.lock().unwrap().is_empty()
91    }
92
93    /// Removes an entry directly, bypassing the `Backend` trait.
94    ///
95    /// Useful for simulating partial failures (e.g. orphan tombstones).
96    pub fn remove(&self, id: &ObjectId) {
97        self.store.lock().unwrap().remove(id);
98    }
99}
100
101#[async_trait::async_trait]
102impl super::common::Backend for InMemoryBackend {
103    fn name(&self) -> &'static str {
104        self.name
105    }
106
107    fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
108        Ok(self)
109    }
110
111    async fn put_object(
112        &self,
113        id: &ObjectId,
114        metadata: &Metadata,
115        stream: ClientStream,
116    ) -> Result<PutResponse> {
117        let bytes: BytesMut = stream.try_collect().await?;
118        self.store.lock().unwrap().insert(
119            id.clone(),
120            StoreEntry::Object(metadata.clone(), bytes.freeze()),
121        );
122        Ok(())
123    }
124
125    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
126        let entry = self.store.lock().unwrap().get(id).cloned();
127        match entry {
128            None => Ok(None),
129            Some(StoreEntry::Tombstone(_)) => Err(Error::UnexpectedTombstone),
130            Some(StoreEntry::Object(mut metadata, bytes)) => {
131                metadata.size = Some(bytes.len());
132                let stream = crate::stream::single(bytes);
133                Ok(Some((metadata, stream)))
134            }
135        }
136    }
137
138    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
139        self.store.lock().unwrap().remove(id);
140        Ok(())
141    }
142}
143
144#[async_trait::async_trait]
145impl HighVolumeBackend for InMemoryBackend {
146    async fn put_non_tombstone(
147        &self,
148        id: &ObjectId,
149        metadata: &Metadata,
150        payload: Bytes,
151    ) -> Result<Option<Tombstone>> {
152        let mut store = self.store.lock().unwrap();
153        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
154            return Ok(Some(tombstone));
155        }
156
157        let mut metadata = metadata.clone();
158        metadata.size = Some(payload.len());
159        store.insert(id.clone(), StoreEntry::Object(metadata, payload));
160        Ok(None)
161    }
162
163    async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> {
164        let entry = self.store.lock().unwrap().get(id).cloned();
165        Ok(match entry {
166            None => TieredGet::NotFound,
167            Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone),
168            Some(StoreEntry::Object(mut metadata, bytes)) => {
169                metadata.size = Some(bytes.len());
170                TieredGet::Object(metadata, crate::stream::single(bytes))
171            }
172        })
173    }
174
175    async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> {
176        let entry = self.store.lock().unwrap().get(id).cloned();
177        Ok(match entry {
178            None => TieredMetadata::NotFound,
179            Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone),
180            Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata),
181        })
182    }
183
184    async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> {
185        let mut store = self.store.lock().unwrap();
186        if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() {
187            return Ok(Some(tombstone));
188        }
189
190        store.remove(id);
191        Ok(None)
192    }
193
194    async fn compare_and_write(
195        &self,
196        id: &ObjectId,
197        current: Option<&ObjectId>,
198        write: TieredWrite,
199    ) -> Result<bool> {
200        let mut store = self.store.lock().unwrap();
201
202        let actual = store.get(id);
203        let matches_current = matches_redirect(actual, current);
204        let matches_next = matches_redirect(actual, write.target());
205
206        if matches_current {
207            match write {
208                TieredWrite::Tombstone(tombstone) => {
209                    store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
210                }
211                TieredWrite::Object(metadata, payload) => {
212                    store.insert(id.clone(), StoreEntry::Object(metadata, payload));
213                }
214                TieredWrite::Delete => {
215                    store.remove(id);
216                }
217            }
218        }
219
220        Ok(matches_current || matches_next)
221    }
222}
223
224#[async_trait::async_trait]
225impl MultipartUploadBackend for InMemoryBackend {
226    async fn initiate_multipart(
227        &self,
228        id: &ObjectId,
229        metadata: &Metadata,
230    ) -> Result<InitiateMultipartResponse> {
231        let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
232        let upload = MultipartUpload {
233            metadata: metadata.clone(),
234            parts: BTreeMap::new(),
235        };
236        self.multipart_store
237            .lock()
238            .unwrap()
239            .insert((id.clone(), upload_id.clone()), upload);
240        Ok(upload_id)
241    }
242
243    async fn upload_part(
244        &self,
245        id: &ObjectId,
246        upload_id: &UploadId,
247        part_number: PartNumber,
248        _content_length: u64,
249        _content_md5: Option<&str>,
250        body: ClientStream,
251    ) -> Result<UploadPartResponse> {
252        let data: BytesMut = body.try_collect().await?;
253        let data = data.freeze();
254        let etag = format!("\"etag-{part_number}-{}\"", data.len());
255
256        let mut store = self.multipart_store.lock().unwrap();
257        let upload = store
258            .get_mut(&(id.clone(), upload_id.clone()))
259            .ok_or_else(|| Error::generic("multipart upload not found"))?;
260
261        upload.parts.insert(
262            part_number,
263            UploadedPart {
264                etag: etag.clone(),
265                data,
266                uploaded_at: SystemTime::now(),
267            },
268        );
269
270        Ok(etag)
271    }
272
273    async fn list_parts(
274        &self,
275        id: &ObjectId,
276        upload_id: &UploadId,
277        max_parts: Option<u32>,
278        part_number_marker: Option<PartNumber>,
279    ) -> Result<ListPartsResponse> {
280        let store = self.multipart_store.lock().unwrap();
281        let upload = store
282            .get(&(id.clone(), upload_id.clone()))
283            .ok_or_else(|| Error::generic("multipart upload not found"))?;
284
285        let iter = upload
286            .parts
287            .iter()
288            .filter(|(pn, _)| part_number_marker.is_none_or(|marker| **pn > marker));
289
290        let max = max_parts.unwrap_or(u32::MAX) as usize;
291        let all: Vec<_> = iter.collect();
292        let is_truncated = all.len() > max;
293        let page: Vec<_> = all.into_iter().take(max).collect();
294
295        let next_part_number_marker = if is_truncated {
296            page.last().map(|(pn, _)| **pn)
297        } else {
298            None
299        };
300
301        let parts = page
302            .into_iter()
303            .map(|(pn, part)| Part {
304                part_number: *pn,
305                etag: part.etag.clone(),
306                last_modified: part.uploaded_at,
307                size: part.data.len() as u64,
308            })
309            .collect();
310
311        Ok(ListPartsResponse {
312            parts,
313            is_truncated,
314            next_part_number_marker,
315        })
316    }
317
318    async fn abort_multipart(
319        &self,
320        id: &ObjectId,
321        upload_id: &UploadId,
322    ) -> Result<AbortMultipartResponse> {
323        self.multipart_store
324            .lock()
325            .unwrap()
326            .remove(&(id.clone(), upload_id.clone()));
327        Ok(())
328    }
329
330    async fn complete_multipart(
331        &self,
332        id: &ObjectId,
333        upload_id: &UploadId,
334        parts: Vec<CompletedPart>,
335    ) -> Result<CompleteMultipartResponse> {
336        let key = (id.clone(), upload_id.clone());
337
338        // TODO: validate that parts are in ascending part_number order and reject with
339        // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant.
340
341        // Validate and assemble while holding the multipart lock, but don't
342        // remove the upload yet — a failed validation must leave it intact so
343        // the client can retry.
344        let assembled = {
345            let store = self.multipart_store.lock().unwrap();
346            let upload = store
347                .get(&key)
348                .ok_or_else(|| Error::generic("multipart upload not found"))?;
349
350            for completed in &parts {
351                match upload.parts.get(&completed.part_number) {
352                    None => {
353                        return Ok(Some(crate::multipart::CompleteMultipartError {
354                            code: "InvalidPart".into(),
355                            message: format!(
356                                "part number {} was not uploaded",
357                                completed.part_number
358                            ),
359                        }));
360                    }
361                    Some(stored) if stored.etag != completed.etag => {
362                        return Ok(Some(crate::multipart::CompleteMultipartError {
363                            code: "InvalidPart".into(),
364                            message: format!(
365                                "etag mismatch for part {}: expected {}, got {}",
366                                completed.part_number, stored.etag, completed.etag
367                            ),
368                        }));
369                    }
370                    _ => {}
371                }
372            }
373
374            let mut payload = BytesMut::new();
375            for completed in &parts {
376                let stored = &upload.parts[&completed.part_number];
377                payload.extend_from_slice(&stored.data);
378            }
379
380            let mut metadata = upload.metadata.clone();
381            metadata.size = Some(payload.len());
382
383            (metadata, payload.freeze())
384        };
385
386        self.store
387            .lock()
388            .unwrap()
389            .insert(id.clone(), StoreEntry::Object(assembled.0, assembled.1));
390
391        self.multipart_store.lock().unwrap().remove(&key);
392
393        Ok(None)
394    }
395}
396
397/// Returns `true` if `entry` matches the expected tombstone redirect state.
398///
399/// - `expected = None`: matches any non-tombstone (absent or inline object).
400/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`.
401fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
402    match expected {
403        None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
404        Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
405    }
406}
407
408/// Type returned by [`InMemoryBackend::get`] for direct inspection of stored entries.
409#[derive(Clone, Debug)]
410pub enum Entry {
411    /// No entry exists at this key.
412    NotFound,
413    /// A real object with its metadata and payload bytes.
414    Object(Metadata, Bytes),
415    /// A redirect tombstone indicating the real object lives in the long-term backend.
416    Tombstone(Tombstone),
417}
418
419impl Entry {
420    /// Returns `true` if the entry is [`Entry::NotFound`].
421    pub fn is_not_found(&self) -> bool {
422        matches!(self, Entry::NotFound)
423    }
424
425    /// Returns `true` if the entry is [`Entry::Object`].
426    pub fn is_object(&self) -> bool {
427        matches!(self, Entry::Object(_, _))
428    }
429
430    /// Returns `true` if the entry is [`Entry::Tombstone`].
431    pub fn is_tombstone(&self) -> bool {
432        matches!(self, Entry::Tombstone(_))
433    }
434
435    /// Panics unless the entry is [`Entry::NotFound`].
436    pub fn expect_not_found(&self) {
437        match self {
438            Entry::NotFound => (),
439            _ => panic!("expected not found entry, got {:?}", self),
440        }
441    }
442
443    /// Returns the metadata and payload bytes, panicking if the entry is not [`Entry::Object`].
444    pub fn expect_object(&self) -> (Metadata, Bytes) {
445        match self {
446            Entry::Object(metadata, bytes) => (metadata.clone(), bytes.clone()),
447            _ => panic!("expected object entry, got {:?}", self),
448        }
449    }
450
451    /// Returns the tombstone, panicking if the entry is not [`Entry::Tombstone`].
452    pub fn expect_tombstone(&self) -> Tombstone {
453        match self {
454            Entry::Tombstone(tombstone) => tombstone.clone(),
455            _ => panic!("expected tombstone entry, got {:?}", self),
456        }
457    }
458}
459
460#[cfg(test)]
461mod tests {
462    use std::num::NonZeroU32;
463    use std::time::Duration;
464
465    use objectstore_types::metadata::ExpirationPolicy;
466    use objectstore_types::scope::{Scope, Scopes};
467
468    use super::*;
469    use crate::backend::common::Backend;
470    use crate::id::ObjectContext;
471    use crate::stream;
472
473    fn make_id() -> ObjectId {
474        ObjectId::random(ObjectContext {
475            usecase: "testing".into(),
476            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
477        })
478    }
479
480    #[tokio::test]
481    async fn multipart_single_part() {
482        let backend = InMemoryBackend::new("test");
483        let id = make_id();
484        let metadata = Metadata {
485            content_type: "text/plain".into(),
486            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
487            origin: Some("203.0.113.42".into()),
488            custom: [("foo".into(), "bar".into())].into(),
489            ..Default::default()
490        };
491
492        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
493
494        let data = b"hello, multipart world!";
495        let etag = backend
496            .upload_part(
497                &id,
498                &upload_id,
499                NonZeroU32::new(1).unwrap(),
500                data.len() as u64,
501                None,
502                stream::single(data.to_vec()),
503            )
504            .await
505            .unwrap();
506
507        let result = backend
508            .complete_multipart(
509                &id,
510                &upload_id,
511                vec![CompletedPart {
512                    part_number: NonZeroU32::new(1).unwrap(),
513                    etag,
514                }],
515            )
516            .await
517            .unwrap();
518        assert!(result.is_none(), "expected no error on complete");
519
520        let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
521        let payload = stream::read_to_vec(body).await.unwrap();
522        assert_eq!(payload, data);
523        assert_eq!(meta.content_type, "text/plain".to_string());
524        assert_eq!(
525            meta.expiration_policy,
526            ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
527        );
528        assert_eq!(meta.origin, Some("203.0.113.42".into()));
529        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
530    }
531
532    #[tokio::test]
533    async fn multipart_multiple_parts() {
534        let backend = InMemoryBackend::new("test");
535        let id = make_id();
536        let metadata = Metadata::default();
537
538        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
539
540        let part1 = b"aaaa".to_vec();
541        let part2 = b"bbbb".to_vec();
542        let part3 = b"cc".to_vec();
543
544        let etag1 = backend
545            .upload_part(
546                &id,
547                &upload_id,
548                NonZeroU32::new(1).unwrap(),
549                part1.len() as u64,
550                None,
551                stream::single(part1.clone()),
552            )
553            .await
554            .unwrap();
555        let etag2 = backend
556            .upload_part(
557                &id,
558                &upload_id,
559                NonZeroU32::new(2).unwrap(),
560                part2.len() as u64,
561                None,
562                stream::single(part2.clone()),
563            )
564            .await
565            .unwrap();
566        let etag3 = backend
567            .upload_part(
568                &id,
569                &upload_id,
570                NonZeroU32::new(3).unwrap(),
571                part3.len() as u64,
572                None,
573                stream::single(part3.clone()),
574            )
575            .await
576            .unwrap();
577
578        let result = backend
579            .complete_multipart(
580                &id,
581                &upload_id,
582                vec![
583                    CompletedPart {
584                        part_number: NonZeroU32::new(1).unwrap(),
585                        etag: etag1,
586                    },
587                    CompletedPart {
588                        part_number: NonZeroU32::new(2).unwrap(),
589                        etag: etag2,
590                    },
591                    CompletedPart {
592                        part_number: NonZeroU32::new(3).unwrap(),
593                        etag: etag3,
594                    },
595                ],
596            )
597            .await
598            .unwrap();
599        assert!(result.is_none());
600
601        let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
602        let payload = stream::read_to_vec(body).await.unwrap();
603        assert_eq!(payload, b"aaaabbbbcc");
604    }
605
606    #[tokio::test]
607    async fn multipart_list_parts() {
608        let backend = InMemoryBackend::new("test");
609        let id = make_id();
610        let metadata = Metadata::default();
611
612        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
613
614        let etag1 = backend
615            .upload_part(
616                &id,
617                &upload_id,
618                NonZeroU32::new(1).unwrap(),
619                3,
620                None,
621                stream::single(b"aaa".to_vec()),
622            )
623            .await
624            .unwrap();
625        let etag2 = backend
626            .upload_part(
627                &id,
628                &upload_id,
629                NonZeroU32::new(2).unwrap(),
630                3,
631                None,
632                stream::single(b"bbb".to_vec()),
633            )
634            .await
635            .unwrap();
636
637        let list = backend
638            .list_parts(&id, &upload_id, None, None)
639            .await
640            .unwrap();
641        assert_eq!(list.parts.len(), 2);
642        assert_eq!(list.parts[0].part_number.get(), 1);
643        assert_eq!(list.parts[0].etag, etag1);
644        assert_eq!(list.parts[0].size, 3);
645        assert_eq!(list.parts[1].part_number.get(), 2);
646        assert_eq!(list.parts[1].etag, etag2);
647        assert_eq!(list.parts[1].size, 3);
648
649        // Pagination
650        let page1 = backend
651            .list_parts(&id, &upload_id, Some(1), None)
652            .await
653            .unwrap();
654        assert_eq!(page1.parts.len(), 1);
655        assert_eq!(page1.parts[0].part_number.get(), 1);
656        assert!(page1.is_truncated);
657        assert!(page1.next_part_number_marker.is_some());
658
659        let page2 = backend
660            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
661            .await
662            .unwrap();
663        assert_eq!(page2.parts.len(), 1);
664        assert_eq!(page2.parts[0].part_number.get(), 2);
665
666        backend.abort_multipart(&id, &upload_id).await.unwrap();
667    }
668
669    #[tokio::test]
670    async fn multipart_abort() {
671        let backend = InMemoryBackend::new("test");
672        let id = make_id();
673        let metadata = Metadata::default();
674
675        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
676
677        backend
678            .upload_part(
679                &id,
680                &upload_id,
681                NonZeroU32::new(1).unwrap(),
682                5,
683                None,
684                stream::single(b"hello".to_vec()),
685            )
686            .await
687            .unwrap();
688
689        backend.abort_multipart(&id, &upload_id).await.unwrap();
690
691        let result = backend.get_object(&id).await.unwrap();
692        assert!(result.is_none(), "object should not exist after abort");
693    }
694
695    #[tokio::test]
696    async fn multipart_invalid_etag() {
697        let backend = InMemoryBackend::new("test");
698        let id = make_id();
699        let metadata = Metadata::default();
700
701        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
702
703        let etag = backend
704            .upload_part(
705                &id,
706                &upload_id,
707                NonZeroU32::new(1).unwrap(),
708                5,
709                None,
710                stream::single(b"hello".to_vec()),
711            )
712            .await
713            .unwrap();
714
715        let result = backend
716            .complete_multipart(
717                &id,
718                &upload_id,
719                vec![CompletedPart {
720                    part_number: NonZeroU32::new(1).unwrap(),
721                    etag: "wrong-etag".into(),
722                }],
723            )
724            .await
725            .unwrap();
726        assert!(result.is_some(), "expected error for bad etag");
727        assert_eq!(result.unwrap().code, "InvalidPart");
728
729        // Upload must survive a failed complete so the client can retry.
730        let result = backend
731            .complete_multipart(
732                &id,
733                &upload_id,
734                vec![CompletedPart {
735                    part_number: NonZeroU32::new(1).unwrap(),
736                    etag,
737                }],
738            )
739            .await
740            .unwrap();
741        assert!(result.is_none(), "retry with correct etag should succeed");
742    }
743
744    #[tokio::test]
745    async fn multipart_missing_part() {
746        let backend = InMemoryBackend::new("test");
747        let id = make_id();
748        let metadata = Metadata::default();
749
750        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
751
752        let etag = backend
753            .upload_part(
754                &id,
755                &upload_id,
756                NonZeroU32::new(1).unwrap(),
757                5,
758                None,
759                stream::single(b"hello".to_vec()),
760            )
761            .await
762            .unwrap();
763
764        let result = backend
765            .complete_multipart(
766                &id,
767                &upload_id,
768                vec![CompletedPart {
769                    part_number: NonZeroU32::new(99).unwrap(),
770                    etag: "whatever".into(),
771                }],
772            )
773            .await
774            .unwrap();
775        assert!(result.is_some(), "expected error for missing part");
776        assert_eq!(result.unwrap().code, "InvalidPart");
777
778        // Upload must survive a failed complete so the client can retry.
779        let result = backend
780            .complete_multipart(
781                &id,
782                &upload_id,
783                vec![CompletedPart {
784                    part_number: NonZeroU32::new(1).unwrap(),
785                    etag,
786                }],
787            )
788            .await
789            .unwrap();
790        assert!(result.is_none(), "retry with correct part should succeed");
791    }
792}