Skip to main content

objectstore_service/backend/
local_fs.rs

1//! Local filesystem backend for development and testing.
2
3use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::SystemTime;
8
9use futures_util::StreamExt;
10use objectstore_types::metadata::Metadata;
11use tokio::fs::OpenOptions;
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15use crate::backend::common::{
16    Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse,
17};
18use crate::error::{Error, Result};
19use crate::id::ObjectId;
20use crate::multipart::{
21    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
22    ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
23};
24use crate::stream::{self, ClientStream};
25
26/// Configuration for [`LocalFsBackend`].
27///
28/// Stores objects as files on the local filesystem. Suitable for development, testing,
29/// and single-server deployments.
30///
31/// # Example
32///
33/// ```yaml
34/// storage:
35///   type: filesystem
36///   path: /data
37/// ```
38#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
39pub struct FileSystemConfig {
40    /// Directory path for storing objects.
41    ///
42    /// The directory will be created if it doesn't exist. Relative paths are resolved from
43    /// the server's working directory.
44    ///
45    /// # Default
46    ///
47    /// `"data"` (relative to the server's working directory)
48    ///
49    /// # Environment Variables
50    ///
51    /// - `OS__STORAGE__TYPE=filesystem`
52    /// - `OS__STORAGE__PATH=/path/to/storage`
53    pub path: PathBuf,
54}
55
56/// Local filesystem backend for development and testing.
57#[derive(Debug)]
58pub struct LocalFsBackend {
59    path: PathBuf,
60}
61
62impl LocalFsBackend {
63    /// Creates a new [`LocalFsBackend`] rooted at the directory in `config`.
64    pub fn new(config: FileSystemConfig) -> Self {
65        Self { path: config.path }
66    }
67}
68
69#[async_trait::async_trait]
70impl Backend for LocalFsBackend {
71    fn name(&self) -> &'static str {
72        "local-fs"
73    }
74
75    fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
76        Ok(self)
77    }
78
79    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
80    async fn put_object(
81        &self,
82        id: &ObjectId,
83        metadata: &Metadata,
84        stream: ClientStream,
85    ) -> Result<PutResponse> {
86        let path = self.path.join(id.as_storage_path().to_string());
87        objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
88        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
89        let file = OpenOptions::new()
90            .create(true)
91            .write(true)
92            .truncate(true)
93            .open(path)
94            .await?;
95
96        let mut reader = pin!(StreamReader::new(stream));
97        let mut writer = BufWriter::new(file);
98
99        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
100            context: "failed to serialize metadata".to_string(),
101            cause,
102        })?;
103        writer.write_all(metadata_json.as_bytes()).await?;
104        writer.write_all(b"\n").await?;
105
106        tokio::io::copy(&mut reader, &mut writer)
107            .await
108            .map_err(|e| match stream::unpack_client_error(&e) {
109                Some(ce) => Error::Client(ce),
110                None => e.into(),
111            })?;
112
113        writer.flush().await?;
114        let file = writer.into_inner();
115        file.sync_data().await?;
116        drop(file);
117
118        Ok(())
119    }
120
121    // TODO: Return `Ok(None)` if object is found but past expiry
122    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
123    async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
124        objectstore_log::debug!("Reading from local_fs backend");
125        let path = self.path.join(id.as_storage_path().to_string());
126        let file = match OpenOptions::new().read(true).open(path).await {
127            Ok(file) => file,
128            Err(err) if err.kind() == ErrorKind::NotFound => {
129                objectstore_log::debug!("Object not found");
130                return Ok(None);
131            }
132            err => err?,
133        };
134
135        let mut reader = BufReader::new(file);
136        let mut metadata_line = String::new();
137        reader.read_line(&mut metadata_line).await?;
138        let file_len = reader.get_ref().metadata().await?.len();
139        let mut metadata: Metadata =
140            serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
141                context: "failed to deserialize metadata".to_string(),
142                cause,
143            })?;
144        let payload_size = file_len
145            .checked_sub(metadata_line.len() as u64)
146            .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
147        metadata.size = Some(payload_size as usize);
148
149        let stream = ReaderStream::new(reader);
150        Ok(Some((metadata, stream.boxed())))
151    }
152
153    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
154    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
155        objectstore_log::debug!("Deleting from local_fs backend");
156        let path = self.path.join(id.as_storage_path().to_string());
157        let result = tokio::fs::remove_file(path).await;
158        if let Err(e) = &result
159            && e.kind() == ErrorKind::NotFound
160        {
161            objectstore_log::debug!("Object not found");
162        }
163        Ok(result?)
164    }
165}
166
167impl LocalFsBackend {
168    fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf {
169        self.path
170            .join("__multipart__")
171            .join(id.as_storage_path().to_string())
172            .join(upload_id.as_str())
173    }
174}
175
176#[async_trait::async_trait]
177impl MultipartUploadBackend for LocalFsBackend {
178    async fn initiate_multipart(
179        &self,
180        id: &ObjectId,
181        metadata: &Metadata,
182    ) -> Result<InitiateMultipartResponse> {
183        let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
184        let dir = self.multipart_dir(id, &upload_id);
185        tokio::fs::create_dir_all(&dir).await?;
186
187        let meta_path = dir.join("metadata.json");
188        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
189            context: "failed to serialize multipart metadata".to_string(),
190            cause,
191        })?;
192        tokio::fs::write(meta_path, metadata_json).await?;
193
194        Ok(upload_id)
195    }
196
197    async fn upload_part(
198        &self,
199        id: &ObjectId,
200        upload_id: &UploadId,
201        part_number: PartNumber,
202        content_length: u64,
203        _content_md5: Option<&str>,
204        body: ClientStream,
205    ) -> Result<UploadPartResponse> {
206        let dir = self.multipart_dir(id, upload_id);
207        if !tokio::fs::try_exists(&dir).await? {
208            return Err(Error::generic("multipart upload not found"));
209        }
210
211        let etag = format!("\"etag-{part_number}-{content_length}\"");
212
213        let header = serde_json::json!({
214            "etag": etag,
215            "uploaded_at": SystemTime::now(),
216            "size": content_length,
217        });
218        let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde {
219            context: "failed to serialize part header".to_string(),
220            cause,
221        })?;
222
223        let part_path = dir.join(format!("{part_number}.part"));
224        let file = OpenOptions::new()
225            .create(true)
226            .write(true)
227            .truncate(true)
228            .open(part_path)
229            .await?;
230
231        let mut reader = pin!(StreamReader::new(body));
232        let mut writer = BufWriter::new(file);
233        writer.write_all(header_line.as_bytes()).await?;
234        writer.write_all(b"\n").await?;
235
236        let _bytes_copied = tokio::io::copy(&mut reader, &mut writer)
237            .await
238            .map_err(|e| match stream::unpack_client_error(&e) {
239                Some(ce) => Error::Client(ce),
240                None => e.into(),
241            })?;
242
243        // TODO: validate bytes_copied against content_length and return a BadRequest-style
244        // error. Needs a service-layer error variant that maps to HTTP 400 without abusing
245        // ClientError (which is meant for stream errors).
246
247        writer.flush().await?;
248        let file = writer.into_inner();
249        file.sync_data().await?;
250        drop(file);
251
252        Ok(etag)
253    }
254
255    async fn list_parts(
256        &self,
257        id: &ObjectId,
258        upload_id: &UploadId,
259        max_parts: Option<u32>,
260        part_number_marker: Option<PartNumber>,
261    ) -> Result<ListPartsResponse> {
262        let dir = self.multipart_dir(id, upload_id);
263        if !tokio::fs::try_exists(&dir).await? {
264            return Err(Error::generic("multipart upload not found"));
265        }
266
267        let mut entries = tokio::fs::read_dir(&dir).await?;
268        let mut parts = Vec::new();
269
270        while let Some(entry) = entries.next_entry().await? {
271            let name = entry.file_name();
272            let name_str = name.to_string_lossy();
273            let Some(pn_str) = name_str.strip_suffix(".part") else {
274                continue;
275            };
276            let Ok(pn) = pn_str.parse::<PartNumber>() else {
277                continue;
278            };
279
280            if part_number_marker.is_some_and(|marker| pn <= marker) {
281                continue;
282            }
283
284            let file = tokio::fs::File::open(entry.path()).await?;
285            let mut reader = BufReader::new(file);
286            let mut header_line = String::new();
287            reader.read_line(&mut header_line).await?;
288            let header: serde_json::Value =
289                serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
290                    context: "failed to deserialize part header".to_string(),
291                    cause,
292                })?;
293
294            parts.push(Part {
295                part_number: pn,
296                etag: header["etag"].as_str().unwrap_or("").to_string(),
297                last_modified: serde_json::from_value(header["uploaded_at"].clone())
298                    .unwrap_or(SystemTime::UNIX_EPOCH),
299                size: header["size"].as_u64().unwrap_or(0),
300            });
301        }
302
303        parts.sort_by_key(|p| p.part_number);
304
305        let max = max_parts.unwrap_or(u32::MAX) as usize;
306        let is_truncated = parts.len() > max;
307        parts.truncate(max);
308
309        let next_part_number_marker = if is_truncated {
310            parts.last().map(|p| p.part_number)
311        } else {
312            None
313        };
314
315        Ok(ListPartsResponse {
316            parts,
317            is_truncated,
318            next_part_number_marker,
319        })
320    }
321
322    async fn abort_multipart(
323        &self,
324        id: &ObjectId,
325        upload_id: &UploadId,
326    ) -> Result<AbortMultipartResponse> {
327        let dir = self.multipart_dir(id, upload_id);
328        if tokio::fs::try_exists(&dir).await? {
329            tokio::fs::remove_dir_all(dir).await?;
330        }
331        Ok(())
332    }
333
334    async fn complete_multipart(
335        &self,
336        id: &ObjectId,
337        upload_id: &UploadId,
338        parts: Vec<CompletedPart>,
339    ) -> Result<CompleteMultipartResponse> {
340        let dir = self.multipart_dir(id, upload_id);
341        if !tokio::fs::try_exists(&dir).await? {
342            return Err(Error::generic("multipart upload not found"));
343        }
344
345        // Read metadata
346        let meta_path = dir.join("metadata.json");
347        let meta_bytes = tokio::fs::read(&meta_path).await?;
348        let metadata: Metadata =
349            serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde {
350                context: "failed to deserialize multipart metadata".to_string(),
351                cause,
352            })?;
353
354        // TODO: validate that parts are in ascending part_number order and reject with
355        // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant.
356
357        // Validate all parts (headers only) before writing anything
358        for completed in &parts {
359            let part_path = dir.join(format!("{}.part", completed.part_number));
360            if !tokio::fs::try_exists(&part_path).await? {
361                return Ok(Some(crate::multipart::CompleteMultipartError {
362                    code: "InvalidPart".into(),
363                    message: format!("part number {} was not uploaded", completed.part_number),
364                }));
365            }
366
367            let file = tokio::fs::File::open(&part_path).await?;
368            let mut reader = BufReader::new(file);
369            let mut header_line = String::new();
370            reader.read_line(&mut header_line).await?;
371            let header: serde_json::Value =
372                serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
373                    context: "failed to deserialize part header".to_string(),
374                    cause,
375                })?;
376
377            let stored_etag = header["etag"].as_str().unwrap_or("");
378            if stored_etag != completed.etag {
379                return Ok(Some(crate::multipart::CompleteMultipartError {
380                    code: "InvalidPart".into(),
381                    message: format!(
382                        "etag mismatch for part {}: expected {}, got {}",
383                        completed.part_number, stored_etag, completed.etag
384                    ),
385                }));
386            }
387        }
388
389        // Stream parts directly to the final object file
390        let path = self.path.join(id.as_storage_path().to_string());
391        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
392        let file = OpenOptions::new()
393            .create(true)
394            .write(true)
395            .truncate(true)
396            .open(path)
397            .await?;
398        let mut writer = BufWriter::new(file);
399
400        let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde {
401            context: "failed to serialize metadata".to_string(),
402            cause,
403        })?;
404        writer.write_all(metadata_json.as_bytes()).await?;
405        writer.write_all(b"\n").await?;
406
407        for completed in &parts {
408            let part_path = dir.join(format!("{}.part", completed.part_number));
409            let file = tokio::fs::File::open(&part_path).await?;
410            let mut reader = BufReader::new(file);
411            let mut header_line = String::new();
412            reader.read_line(&mut header_line).await?;
413            tokio::io::copy(&mut reader, &mut writer).await?;
414        }
415
416        writer.flush().await?;
417        let file = writer.into_inner();
418        file.sync_data().await?;
419        drop(file);
420
421        // Clean up multipart state
422        tokio::fs::remove_dir_all(dir).await?;
423
424        Ok(None)
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use std::num::NonZeroU32;
431    use std::time::{Duration, SystemTime};
432
433    use bytes::BytesMut;
434    use futures_util::TryStreamExt;
435    use objectstore_types::metadata::{Compression, ExpirationPolicy};
436    use objectstore_types::scope::{Scope, Scopes};
437
438    use super::*;
439    use crate::id::ObjectContext;
440    use crate::stream;
441
442    #[tokio::test]
443    async fn stores_metadata() {
444        let tempdir = tempfile::tempdir().unwrap();
445        let backend = LocalFsBackend::new(FileSystemConfig {
446            path: tempdir.path().to_path_buf(),
447        });
448
449        let id = ObjectId::random(ObjectContext {
450            usecase: "testing".into(),
451            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
452        });
453
454        let metadata = Metadata {
455            content_type: "text/plain".into(),
456            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
457            time_created: Some(SystemTime::now()),
458            time_expires: None,
459            compression: Some(Compression::Zstd),
460            origin: Some("203.0.113.42".into()),
461            custom: [("foo".into(), "bar".into())].into(),
462            size: None,
463        };
464        backend
465            .put_object(&id, &metadata, stream::single("oh hai!"))
466            .await
467            .unwrap();
468
469        let (read_metadata, stream) = backend.get_object(&id).await.unwrap().unwrap();
470        let file_contents: BytesMut = stream.try_collect().await.unwrap();
471
472        assert_eq!(
473            read_metadata,
474            Metadata {
475                size: Some(file_contents.len()),
476                ..metadata
477            }
478        );
479        assert_eq!(file_contents.as_ref(), b"oh hai!");
480    }
481
482    #[tokio::test]
483    async fn get_metadata_returns_metadata() {
484        let tempdir = tempfile::tempdir().unwrap();
485        let backend = LocalFsBackend::new(FileSystemConfig {
486            path: tempdir.path().to_path_buf(),
487        });
488
489        let id = ObjectId::random(ObjectContext {
490            usecase: "testing".into(),
491            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
492        });
493
494        let metadata = Metadata {
495            content_type: "text/plain".into(),
496            compression: Some(Compression::Zstd),
497            origin: Some("203.0.113.42".into()),
498            custom: [("foo".into(), "bar".into())].into(),
499            ..Default::default()
500        };
501        backend
502            .put_object(&id, &metadata, stream::single("oh hai!"))
503            .await
504            .unwrap();
505
506        let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
507        assert_eq!(
508            read_metadata,
509            Metadata {
510                size: Some(7),
511                ..metadata
512            }
513        );
514    }
515
516    #[tokio::test]
517    async fn get_metadata_nonexistent() {
518        let tempdir = tempfile::tempdir().unwrap();
519        let backend = LocalFsBackend::new(FileSystemConfig {
520            path: tempdir.path().to_path_buf(),
521        });
522
523        let id = ObjectId::random(ObjectContext {
524            usecase: "testing".into(),
525            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
526        });
527
528        let result = backend.get_metadata(&id).await.unwrap();
529        assert!(result.is_none());
530    }
531
532    fn make_id() -> ObjectId {
533        ObjectId::random(ObjectContext {
534            usecase: "testing".into(),
535            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
536        })
537    }
538
539    fn make_backend() -> (tempfile::TempDir, LocalFsBackend) {
540        let tempdir = tempfile::tempdir().unwrap();
541        let backend = LocalFsBackend::new(FileSystemConfig {
542            path: tempdir.path().to_path_buf(),
543        });
544        (tempdir, backend)
545    }
546
547    #[tokio::test]
548    async fn multipart_single_part() {
549        let (_tempdir, backend) = make_backend();
550        let id = make_id();
551        let metadata = Metadata {
552            content_type: "text/plain".into(),
553            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
554            origin: Some("203.0.113.42".into()),
555            custom: [("foo".into(), "bar".into())].into(),
556            ..Default::default()
557        };
558
559        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
560
561        let data = b"hello, multipart world!";
562        let etag = backend
563            .upload_part(
564                &id,
565                &upload_id,
566                NonZeroU32::new(1).unwrap(),
567                data.len() as u64,
568                None,
569                stream::single(data.to_vec()),
570            )
571            .await
572            .unwrap();
573
574        let result = backend
575            .complete_multipart(
576                &id,
577                &upload_id,
578                vec![crate::multipart::CompletedPart {
579                    part_number: NonZeroU32::new(1).unwrap(),
580                    etag,
581                }],
582            )
583            .await
584            .unwrap();
585        assert!(result.is_none(), "expected no error on complete");
586
587        let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
588        let payload: BytesMut = body.try_collect().await.unwrap();
589        assert_eq!(payload.as_ref(), data);
590        assert_eq!(meta.content_type, "text/plain".to_string());
591        assert_eq!(
592            meta.expiration_policy,
593            ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
594        );
595        assert_eq!(meta.origin, Some("203.0.113.42".into()));
596        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
597    }
598
599    #[tokio::test]
600    async fn multipart_multiple_parts() {
601        let (_tempdir, backend) = make_backend();
602        let id = make_id();
603        let metadata = Metadata::default();
604
605        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
606
607        let part1 = b"aaaa".to_vec();
608        let part2 = b"bbbb".to_vec();
609        let part3 = b"cc".to_vec();
610
611        let etag1 = backend
612            .upload_part(
613                &id,
614                &upload_id,
615                NonZeroU32::new(1).unwrap(),
616                part1.len() as u64,
617                None,
618                stream::single(part1.clone()),
619            )
620            .await
621            .unwrap();
622        let etag2 = backend
623            .upload_part(
624                &id,
625                &upload_id,
626                NonZeroU32::new(2).unwrap(),
627                part2.len() as u64,
628                None,
629                stream::single(part2.clone()),
630            )
631            .await
632            .unwrap();
633        let etag3 = backend
634            .upload_part(
635                &id,
636                &upload_id,
637                NonZeroU32::new(3).unwrap(),
638                part3.len() as u64,
639                None,
640                stream::single(part3.clone()),
641            )
642            .await
643            .unwrap();
644
645        let result = backend
646            .complete_multipart(
647                &id,
648                &upload_id,
649                vec![
650                    crate::multipart::CompletedPart {
651                        part_number: NonZeroU32::new(1).unwrap(),
652                        etag: etag1,
653                    },
654                    crate::multipart::CompletedPart {
655                        part_number: NonZeroU32::new(2).unwrap(),
656                        etag: etag2,
657                    },
658                    crate::multipart::CompletedPart {
659                        part_number: NonZeroU32::new(3).unwrap(),
660                        etag: etag3,
661                    },
662                ],
663            )
664            .await
665            .unwrap();
666        assert!(result.is_none());
667
668        let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
669        let payload: BytesMut = body.try_collect().await.unwrap();
670        assert_eq!(payload.as_ref(), b"aaaabbbbcc");
671    }
672
673    #[tokio::test]
674    async fn multipart_list_parts() {
675        let (_tempdir, backend) = make_backend();
676        let id = make_id();
677        let metadata = Metadata::default();
678
679        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
680
681        let etag1 = backend
682            .upload_part(
683                &id,
684                &upload_id,
685                NonZeroU32::new(1).unwrap(),
686                3,
687                None,
688                stream::single(b"aaa".to_vec()),
689            )
690            .await
691            .unwrap();
692        let etag2 = backend
693            .upload_part(
694                &id,
695                &upload_id,
696                NonZeroU32::new(2).unwrap(),
697                3,
698                None,
699                stream::single(b"bbb".to_vec()),
700            )
701            .await
702            .unwrap();
703
704        let list = backend
705            .list_parts(&id, &upload_id, None, None)
706            .await
707            .unwrap();
708        assert_eq!(list.parts.len(), 2);
709        assert_eq!(list.parts[0].part_number.get(), 1);
710        assert_eq!(list.parts[0].etag, etag1);
711        assert_eq!(list.parts[0].size, 3);
712        assert_eq!(list.parts[1].part_number.get(), 2);
713        assert_eq!(list.parts[1].etag, etag2);
714        assert_eq!(list.parts[1].size, 3);
715
716        // Pagination
717        let page1 = backend
718            .list_parts(&id, &upload_id, Some(1), None)
719            .await
720            .unwrap();
721        assert_eq!(page1.parts.len(), 1);
722        assert_eq!(page1.parts[0].part_number.get(), 1);
723        assert!(page1.is_truncated);
724        assert!(page1.next_part_number_marker.is_some());
725
726        let page2 = backend
727            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
728            .await
729            .unwrap();
730        assert_eq!(page2.parts.len(), 1);
731        assert_eq!(page2.parts[0].part_number.get(), 2);
732
733        backend.abort_multipart(&id, &upload_id).await.unwrap();
734    }
735
736    #[tokio::test]
737    async fn multipart_abort() {
738        let (_tempdir, backend) = make_backend();
739        let id = make_id();
740        let metadata = Metadata::default();
741
742        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
743
744        backend
745            .upload_part(
746                &id,
747                &upload_id,
748                NonZeroU32::new(1).unwrap(),
749                5,
750                None,
751                stream::single(b"hello".to_vec()),
752            )
753            .await
754            .unwrap();
755
756        backend.abort_multipart(&id, &upload_id).await.unwrap();
757
758        let result = backend.get_object(&id).await.unwrap();
759        assert!(result.is_none(), "object should not exist after abort");
760    }
761
762    #[tokio::test]
763    async fn multipart_invalid_etag() {
764        let (_tempdir, backend) = make_backend();
765        let id = make_id();
766        let metadata = Metadata::default();
767
768        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
769
770        let etag = backend
771            .upload_part(
772                &id,
773                &upload_id,
774                NonZeroU32::new(1).unwrap(),
775                5,
776                None,
777                stream::single(b"hello".to_vec()),
778            )
779            .await
780            .unwrap();
781
782        let result = backend
783            .complete_multipart(
784                &id,
785                &upload_id,
786                vec![crate::multipart::CompletedPart {
787                    part_number: NonZeroU32::new(1).unwrap(),
788                    etag: "wrong-etag".into(),
789                }],
790            )
791            .await
792            .unwrap();
793        assert!(result.is_some(), "expected error for bad etag");
794        assert_eq!(result.unwrap().code, "InvalidPart");
795
796        // Upload must survive a failed complete so the client can retry.
797        let result = backend
798            .complete_multipart(
799                &id,
800                &upload_id,
801                vec![crate::multipart::CompletedPart {
802                    part_number: NonZeroU32::new(1).unwrap(),
803                    etag,
804                }],
805            )
806            .await
807            .unwrap();
808        assert!(result.is_none(), "retry with correct etag should succeed");
809    }
810
811    #[tokio::test]
812    async fn multipart_missing_part() {
813        let (_tempdir, backend) = make_backend();
814        let id = make_id();
815        let metadata = Metadata::default();
816
817        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
818
819        let etag = backend
820            .upload_part(
821                &id,
822                &upload_id,
823                NonZeroU32::new(1).unwrap(),
824                5,
825                None,
826                stream::single(b"hello".to_vec()),
827            )
828            .await
829            .unwrap();
830
831        let result = backend
832            .complete_multipart(
833                &id,
834                &upload_id,
835                vec![crate::multipart::CompletedPart {
836                    part_number: NonZeroU32::new(99).unwrap(),
837                    etag: "whatever".into(),
838                }],
839            )
840            .await
841            .unwrap();
842        assert!(result.is_some(), "expected error for missing part");
843        assert_eq!(result.unwrap().code, "InvalidPart");
844
845        // Upload must survive a failed complete so the client can retry.
846        let result = backend
847            .complete_multipart(
848                &id,
849                &upload_id,
850                vec![crate::multipart::CompletedPart {
851                    part_number: NonZeroU32::new(1).unwrap(),
852                    etag,
853                }],
854            )
855            .await
856            .unwrap();
857        assert!(result.is_none(), "retry with correct part should succeed");
858    }
859}