// objectstore_service/backend/local_fs.rs
1//! Local filesystem backend for development and testing.
2
3use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6use std::time::SystemTime;
7
8use futures_util::StreamExt;
9use objectstore_types::metadata::Metadata;
10use tokio::fs::OpenOptions;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
12use tokio_util::io::{ReaderStream, StreamReader};
13
14use crate::backend::common::{
15    Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse,
16};
17use crate::error::{Error, Result};
18use crate::id::ObjectId;
19use crate::multipart::{
20    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
21    ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
22};
23use crate::stream::{self, ClientStream};
24
25/// Configuration for [`LocalFsBackend`].
26///
27/// Stores objects as files on the local filesystem. Suitable for development, testing,
28/// and single-server deployments.
29///
30/// # Example
31///
32/// ```yaml
33/// storage:
34///   type: filesystem
35///   path: /data
36/// ```
37#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
38pub struct FileSystemConfig {
39    /// Directory path for storing objects.
40    ///
41    /// The directory will be created if it doesn't exist. Relative paths are resolved from
42    /// the server's working directory.
43    ///
44    /// # Default
45    ///
46    /// `"data"` (relative to the server's working directory)
47    ///
48    /// # Environment Variables
49    ///
50    /// - `OS__STORAGE__TYPE=filesystem`
51    /// - `OS__STORAGE__PATH=/path/to/storage`
52    pub path: PathBuf,
53}
54
/// Local filesystem backend for development and testing.
///
/// Each object is stored as a single file whose first line is the JSON-encoded
/// [`Metadata`] header and whose remaining bytes are the raw payload.
#[derive(Debug)]
pub struct LocalFsBackend {
    // Root directory under which all objects (and transient multipart state) live.
    path: PathBuf,
}
60
61impl LocalFsBackend {
62    /// Creates a new [`LocalFsBackend`] rooted at the directory in `config`.
63    pub fn new(config: FileSystemConfig) -> Self {
64        Self { path: config.path }
65    }
66}
67
#[async_trait::async_trait]
impl Backend for LocalFsBackend {
    fn name(&self) -> &'static str {
        "local-fs"
    }

    /// Writes the object as a single file: one line of JSON-encoded metadata followed
    /// by the raw payload bytes streamed from the client.
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn put_object(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
        stream: ClientStream,
    ) -> Result<PutResponse> {
        let path = self.path.join(id.as_storage_path().to_string());
        objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
        // `unwrap`: the joined path always has at least `self.path` as a parent
        // component — TODO confirm this holds for an empty configured root.
        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(path)
            .await?;

        let mut reader = pin!(StreamReader::new(stream));
        let mut writer = BufWriter::new(file);

        // Header line: serde_json never emits raw newlines, so a single '\n' cleanly
        // delimits the metadata from the payload that follows.
        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize metadata".to_string(),
            cause,
        })?;
        writer.write_all(metadata_json.as_bytes()).await?;
        writer.write_all(b"\n").await?;

        // Stream the payload; errors originating from the client stream are unwrapped
        // back into `Error::Client`, everything else is treated as an I/O error.
        tokio::io::copy(&mut reader, &mut writer)
            .await
            .map_err(|e| match stream::unpack_client_error(&e) {
                Some(ce) => Error::Client(ce),
                None => e.into(),
            })?;

        // Flush the buffer and fsync file data before reporting success.
        writer.flush().await?;
        let file = writer.into_inner();
        file.sync_data().await?;
        drop(file);

        Ok(())
    }

    // TODO: Return `Ok(None)` if object is found but past expiry
    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
        objectstore_log::debug!("Reading from local_fs backend");
        let path = self.path.join(id.as_storage_path().to_string());
        let file = match OpenOptions::new().read(true).open(path).await {
            Ok(file) => file,
            // A missing file is not an error: the object simply does not exist.
            Err(err) if err.kind() == ErrorKind::NotFound => {
                objectstore_log::debug!("Object not found");
                return Ok(None);
            }
            err => err?,
        };

        let mut reader = BufReader::new(file);
        let mut metadata_line = String::new();
        reader.read_line(&mut metadata_line).await?;
        let file_len = reader.get_ref().metadata().await?.len();
        let mut metadata: Metadata =
            serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
                context: "failed to deserialize metadata".to_string(),
                cause,
            })?;
        // Payload size = file size minus the header line. `read_line` keeps the
        // trailing '\n', so `metadata_line.len()` is the exact header byte count.
        let payload_size = file_len
            .checked_sub(metadata_line.len() as u64)
            .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
        metadata.size = Some(payload_size as usize);

        // The BufReader is already positioned past the header, so the stream yields
        // only payload bytes.
        let stream = ReaderStream::new(reader);
        Ok(Some((metadata, stream.boxed())))
    }

    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
        objectstore_log::debug!("Deleting from local_fs backend");
        let path = self.path.join(id.as_storage_path().to_string());
        let result = tokio::fs::remove_file(path).await;
        // NOTE(review): NotFound is logged here but still propagated as an error by
        // `result?` below — confirm whether delete is meant to be idempotent (i.e.
        // return Ok for a missing object) like `get_object`'s None handling.
        if let Err(e) = &result
            && e.kind() == ErrorKind::NotFound
        {
            objectstore_log::debug!("Object not found");
        }
        Ok(result?)
    }
}
161
162impl LocalFsBackend {
163    fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf {
164        self.path
165            .join("__multipart__")
166            .join(id.as_storage_path().to_string())
167            .join(upload_id)
168    }
169}
170
#[async_trait::async_trait]
impl MultipartUploadBackend for LocalFsBackend {
    /// Starts a new multipart upload: allocates a fresh upload id, creates its scratch
    /// directory, and persists the object metadata there for use at completion time.
    async fn initiate_multipart(
        &self,
        id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse> {
        // UUIDv7 is time-ordered, so upload dirs sort roughly by creation time.
        let upload_id = uuid::Uuid::now_v7().to_string();
        let dir = self.multipart_dir(id, &upload_id);
        tokio::fs::create_dir_all(&dir).await?;

        let meta_path = dir.join("metadata.json");
        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize multipart metadata".to_string(),
            cause,
        })?;
        tokio::fs::write(meta_path, metadata_json).await?;

        Ok(upload_id)
    }

    /// Stores one part as `<part_number>.part`: a JSON header line (etag, upload time,
    /// declared size) followed by the raw part bytes. Returns the part's etag.
    async fn upload_part(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        part_number: PartNumber,
        content_length: u64,
        _content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse> {
        let dir = self.multipart_dir(id, upload_id);
        if !tokio::fs::try_exists(&dir).await? {
            return Err(Error::generic("multipart upload not found"));
        }

        // Synthetic etag (not a content hash) derived from part number and declared
        // length; it only needs to match what complete_multipart reads back.
        let etag = format!("\"etag-{part_number}-{content_length}\"");

        let header = serde_json::json!({
            "etag": etag,
            "uploaded_at": SystemTime::now(),
            "size": content_length,
        });
        let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde {
            context: "failed to serialize part header".to_string(),
            cause,
        })?;

        let part_path = dir.join(format!("{part_number}.part"));
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(part_path)
            .await?;

        let mut reader = pin!(StreamReader::new(body));
        let mut writer = BufWriter::new(file);
        writer.write_all(header_line.as_bytes()).await?;
        writer.write_all(b"\n").await?;

        // Client-stream errors are surfaced as Error::Client; other I/O errors pass
        // through unchanged.
        let _bytes_copied = tokio::io::copy(&mut reader, &mut writer)
            .await
            .map_err(|e| match stream::unpack_client_error(&e) {
                Some(ce) => Error::Client(ce),
                None => e.into(),
            })?;

        // TODO: validate bytes_copied against content_length and return a BadRequest-style
        // error. Needs a service-layer error variant that maps to HTTP 400 without abusing
        // ClientError (which is meant for stream errors).

        // Durability: flush buffered bytes and fsync before acknowledging the part.
        writer.flush().await?;
        let file = writer.into_inner();
        file.sync_data().await?;
        drop(file);

        Ok(etag)
    }

    /// Lists uploaded parts, sorted by part number, with S3-style pagination via
    /// `max_parts` and an exclusive `part_number_marker`.
    async fn list_parts(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        max_parts: Option<u32>,
        part_number_marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse> {
        let dir = self.multipart_dir(id, upload_id);
        if !tokio::fs::try_exists(&dir).await? {
            return Err(Error::generic("multipart upload not found"));
        }

        let mut entries = tokio::fs::read_dir(&dir).await?;
        let mut parts = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            // Skip everything that isn't a valid `<number>.part` file (e.g. the
            // metadata.json written at initiation).
            let Some(pn_str) = name_str.strip_suffix(".part") else {
                continue;
            };
            let Ok(pn) = pn_str.parse::<PartNumber>() else {
                continue;
            };

            // Marker is exclusive: only parts strictly after it are returned.
            if part_number_marker.is_some_and(|marker| pn <= marker) {
                continue;
            }

            // Only the header line is read; the payload stays untouched.
            let file = tokio::fs::File::open(entry.path()).await?;
            let mut reader = BufReader::new(file);
            let mut header_line = String::new();
            reader.read_line(&mut header_line).await?;
            let header: serde_json::Value =
                serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
                    context: "failed to deserialize part header".to_string(),
                    cause,
                })?;

            parts.push(Part {
                part_number: pn,
                etag: header["etag"].as_str().unwrap_or("").to_string(),
                last_modified: serde_json::from_value(header["uploaded_at"].clone())
                    .unwrap_or(SystemTime::UNIX_EPOCH),
                size: header["size"].as_u64().unwrap_or(0),
            });
        }

        // Directory iteration order is unspecified; sort before paginating.
        parts.sort_by_key(|p| p.part_number);

        let max = max_parts.unwrap_or(u32::MAX) as usize;
        // Truncation must be determined before `truncate` shortens the Vec.
        let is_truncated = parts.len() > max;
        parts.truncate(max);

        let next_part_number_marker = if is_truncated {
            parts.last().map(|p| p.part_number)
        } else {
            None
        };

        Ok(ListPartsResponse {
            parts,
            is_truncated,
            next_part_number_marker,
        })
    }

    /// Discards all state for an upload. Idempotent: a missing upload is not an error.
    async fn abort_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
    ) -> Result<AbortMultipartResponse> {
        let dir = self.multipart_dir(id, upload_id);
        if tokio::fs::try_exists(&dir).await? {
            tokio::fs::remove_dir_all(dir).await?;
        }
        Ok(())
    }

    /// Assembles the final object from the listed parts.
    ///
    /// Runs in two passes: first every part is validated (exists, etag matches) without
    /// writing anything, so a failed completion leaves the upload intact and retryable;
    /// only then are the part payloads concatenated into the final object file and the
    /// scratch directory removed.
    async fn complete_multipart(
        &self,
        id: &ObjectId,
        upload_id: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        let dir = self.multipart_dir(id, upload_id);
        if !tokio::fs::try_exists(&dir).await? {
            return Err(Error::generic("multipart upload not found"));
        }

        // Read metadata
        let meta_path = dir.join("metadata.json");
        let meta_bytes = tokio::fs::read(&meta_path).await?;
        let metadata: Metadata =
            serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde {
                context: "failed to deserialize multipart metadata".to_string(),
                cause,
            })?;

        // TODO: validate that parts are in ascending part_number order and reject with
        // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant.

        // Validate all parts (headers only) before writing anything
        for completed in &parts {
            let part_path = dir.join(format!("{}.part", completed.part_number));
            if !tokio::fs::try_exists(&part_path).await? {
                return Ok(Some(crate::multipart::CompleteMultipartError {
                    code: "InvalidPart".into(),
                    message: format!("part number {} was not uploaded", completed.part_number),
                }));
            }

            let file = tokio::fs::File::open(&part_path).await?;
            let mut reader = BufReader::new(file);
            let mut header_line = String::new();
            reader.read_line(&mut header_line).await?;
            let header: serde_json::Value =
                serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
                    context: "failed to deserialize part header".to_string(),
                    cause,
                })?;

            let stored_etag = header["etag"].as_str().unwrap_or("");
            if stored_etag != completed.etag {
                return Ok(Some(crate::multipart::CompleteMultipartError {
                    code: "InvalidPart".into(),
                    message: format!(
                        "etag mismatch for part {}: expected {}, got {}",
                        completed.part_number, stored_etag, completed.etag
                    ),
                }));
            }
        }

        // Stream parts directly to the final object file
        let path = self.path.join(id.as_storage_path().to_string());
        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(path)
            .await?;
        let mut writer = BufWriter::new(file);

        // Final object uses the same format as put_object: metadata line + payload.
        let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde {
            context: "failed to serialize metadata".to_string(),
            cause,
        })?;
        writer.write_all(metadata_json.as_bytes()).await?;
        writer.write_all(b"\n").await?;

        for completed in &parts {
            let part_path = dir.join(format!("{}.part", completed.part_number));
            let file = tokio::fs::File::open(&part_path).await?;
            let mut reader = BufReader::new(file);
            // Consume the part's header line so only payload bytes are copied; the
            // BufReader then copies its remaining buffered bytes plus the rest of the file.
            let mut header_line = String::new();
            reader.read_line(&mut header_line).await?;
            tokio::io::copy(&mut reader, &mut writer).await?;
        }

        writer.flush().await?;
        let file = writer.into_inner();
        file.sync_data().await?;
        drop(file);

        // Clean up multipart state
        tokio::fs::remove_dir_all(dir).await?;

        Ok(None)
    }
}
422
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::{Compression, ExpirationPolicy};
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::stream;

    // Round-trips an object and checks that get_object returns the stored metadata
    // with `size` filled in from the payload length.
    #[tokio::test]
    async fn stores_metadata() {
        let tempdir = tempfile::tempdir().unwrap();
        let backend = LocalFsBackend::new(FileSystemConfig {
            path: tempdir.path().to_path_buf(),
        });

        let id = ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        });

        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            size: None,
        };
        backend
            .put_object(&id, &metadata, stream::single("oh hai!"))
            .await
            .unwrap();

        let (read_metadata, stream) = backend.get_object(&id).await.unwrap().unwrap();
        let file_contents: BytesMut = stream.try_collect().await.unwrap();

        assert_eq!(
            read_metadata,
            Metadata {
                size: Some(file_contents.len()),
                ..metadata
            }
        );
        assert_eq!(file_contents.as_ref(), b"oh hai!");
    }

    // get_metadata (trait-provided; defined outside this file) must report the same
    // metadata as get_object, including the computed payload size.
    #[tokio::test]
    async fn get_metadata_returns_metadata() {
        let tempdir = tempfile::tempdir().unwrap();
        let backend = LocalFsBackend::new(FileSystemConfig {
            path: tempdir.path().to_path_buf(),
        });

        let id = ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        });

        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };
        backend
            .put_object(&id, &metadata, stream::single("oh hai!"))
            .await
            .unwrap();

        let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
        assert_eq!(
            read_metadata,
            Metadata {
                size: Some(7),
                ..metadata
            }
        );
    }

    #[tokio::test]
    async fn get_metadata_nonexistent() {
        let tempdir = tempfile::tempdir().unwrap();
        let backend = LocalFsBackend::new(FileSystemConfig {
            path: tempdir.path().to_path_buf(),
        });

        let id = ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        });

        let result = backend.get_metadata(&id).await.unwrap();
        assert!(result.is_none());
    }

    // Helper: a fresh random object id in the "testing" usecase/scope.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    // Helper: a backend rooted in a fresh tempdir. The TempDir is returned so it
    // stays alive (and the directory stays on disk) for the duration of the test.
    fn make_backend() -> (tempfile::TempDir, LocalFsBackend) {
        let tempdir = tempfile::tempdir().unwrap();
        let backend = LocalFsBackend::new(FileSystemConfig {
            path: tempdir.path().to_path_buf(),
        });
        (tempdir, backend)
    }

    #[tokio::test]
    async fn multipart_single_part() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let data = b"hello, multipart world!";
        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![crate::multipart::CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "expected no error on complete");

        // The completed object must carry the metadata captured at initiation.
        let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload: BytesMut = body.try_collect().await.unwrap();
        assert_eq!(payload.as_ref(), data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
    }

    // Parts must be concatenated in part-number order into a single payload.
    #[tokio::test]
    async fn multipart_multiple_parts() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let part1 = b"aaaa".to_vec();
        let part2 = b"bbbb".to_vec();
        let part3 = b"cc".to_vec();

        let etag1 = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                part1.len() as u64,
                None,
                stream::single(part1.clone()),
            )
            .await
            .unwrap();
        let etag2 = backend
            .upload_part(
                &id,
                &upload_id,
                2,
                part2.len() as u64,
                None,
                stream::single(part2.clone()),
            )
            .await
            .unwrap();
        let etag3 = backend
            .upload_part(
                &id,
                &upload_id,
                3,
                part3.len() as u64,
                None,
                stream::single(part3.clone()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![
                    crate::multipart::CompletedPart {
                        part_number: 1,
                        etag: etag1,
                    },
                    crate::multipart::CompletedPart {
                        part_number: 2,
                        etag: etag2,
                    },
                    crate::multipart::CompletedPart {
                        part_number: 3,
                        etag: etag3,
                    },
                ],
            )
            .await
            .unwrap();
        assert!(result.is_none());

        let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
        let payload: BytesMut = body.try_collect().await.unwrap();
        assert_eq!(payload.as_ref(), b"aaaabbbbcc");
    }

    // Exercises list_parts: full listing plus max_parts/marker pagination.
    #[tokio::test]
    async fn multipart_list_parts() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = backend
            .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec()))
            .await
            .unwrap();
        let etag2 = backend
            .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec()))
            .await
            .unwrap();

        let list = backend
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number, 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number, 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);

        // Pagination
        let page1 = backend
            .list_parts(&id, &upload_id, Some(1), None)
            .await
            .unwrap();
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number, 1);
        assert!(page1.is_truncated);
        assert!(page1.next_part_number_marker.is_some());

        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await
            .unwrap();
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number, 2);

        backend.abort_multipart(&id, &upload_id).await.unwrap();
    }

    // Aborting must discard all uploaded parts; no object may materialize.
    #[tokio::test]
    async fn multipart_abort() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await
            .unwrap();

        backend.abort_multipart(&id, &upload_id).await.unwrap();

        let result = backend.get_object(&id).await.unwrap();
        assert!(result.is_none(), "object should not exist after abort");
    }

    // A wrong etag must fail completion as InvalidPart without destroying the upload.
    #[tokio::test]
    async fn multipart_invalid_etag() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![crate::multipart::CompletedPart {
                    part_number: 1,
                    etag: "wrong-etag".into(),
                }],
            )
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for bad etag");
        assert_eq!(result.unwrap().code, "InvalidPart");

        // Upload must survive a failed complete so the client can retry.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![crate::multipart::CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct etag should succeed");
    }

    // Referencing a part that was never uploaded must fail as InvalidPart and also
    // leave the upload intact for retry.
    #[tokio::test]
    async fn multipart_missing_part() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = backend
            .upload_part(
                &id,
                &upload_id,
                1,
                5,
                None,
                stream::single(b"hello".to_vec()),
            )
            .await
            .unwrap();

        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![crate::multipart::CompletedPart {
                    part_number: 99,
                    etag: "whatever".into(),
                }],
            )
            .await
            .unwrap();
        assert!(result.is_some(), "expected error for missing part");
        assert_eq!(result.unwrap().code, "InvalidPart");

        // Upload must survive a failed complete so the client can retry.
        let result = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![crate::multipart::CompletedPart {
                    part_number: 1,
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(result.is_none(), "retry with correct part should succeed");
    }
}