Skip to main content

objectstore_service/backend/
local_fs.rs

1//! Local filesystem backend for development and testing.
2
3use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6use std::time::SystemTime;
7
8use futures_util::StreamExt;
9use objectstore_types::metadata::Metadata;
10use objectstore_types::range::ByteRange;
11use tokio::fs::OpenOptions;
12use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15use crate::backend::common::{
16    Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse,
17};
18use crate::error::{Error, Result};
19use crate::id::ObjectId;
20use crate::multipart::{
21    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
22    ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
23};
24use crate::stream::{self, ClientStream};
25
26/// Configuration for [`LocalFsBackend`].
27///
28/// Stores objects as files on the local filesystem. Suitable for development, testing,
29/// and single-server deployments.
30///
31/// # Example
32///
33/// ```yaml
34/// storage:
35///   type: filesystem
36///   path: /data
37/// ```
38#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
39pub struct FileSystemConfig {
40    /// Directory path for storing objects.
41    ///
42    /// The directory will be created if it doesn't exist. Relative paths are resolved from
43    /// the server's working directory.
44    ///
45    /// # Default
46    ///
47    /// `"data"` (relative to the server's working directory)
48    ///
49    /// # Environment Variables
50    ///
51    /// - `OS__STORAGE__TYPE=filesystem`
52    /// - `OS__STORAGE__PATH=/path/to/storage`
53    pub path: PathBuf,
54}
55
/// Storage backend that keeps each object in a file on the local filesystem.
///
/// Intended for development and testing.
#[derive(Debug)]
pub struct LocalFsBackend {
    /// Root directory that all object and multipart paths are joined onto.
    path: PathBuf,
}
61
62impl LocalFsBackend {
63    /// Creates a new [`LocalFsBackend`] rooted at the directory in `config`.
64    pub fn new(config: FileSystemConfig) -> Self {
65        Self { path: config.path }
66    }
67}
68
69#[async_trait::async_trait]
70impl Backend for LocalFsBackend {
71    fn name(&self) -> &'static str {
72        "local-fs"
73    }
74
75    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
76        Ok(self)
77    }
78
79    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
80    async fn put_object(
81        &self,
82        id: &ObjectId,
83        metadata: &Metadata,
84        stream: ClientStream,
85    ) -> Result<PutResponse> {
86        let path = self.path.join(id.as_storage_path().to_string());
87        objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
88        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
89        let file = OpenOptions::new()
90            .create(true)
91            .write(true)
92            .truncate(true)
93            .open(path)
94            .await?;
95
96        let mut reader = pin!(StreamReader::new(stream));
97        let mut writer = BufWriter::new(file);
98
99        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
100            context: "failed to serialize metadata".to_string(),
101            cause,
102        })?;
103        writer.write_all(metadata_json.as_bytes()).await?;
104        writer.write_all(b"\n").await?;
105
106        tokio::io::copy(&mut reader, &mut writer)
107            .await
108            .map_err(|e| match stream::unpack_client_error(&e) {
109                Some(ce) => Error::Client(ce),
110                None => e.into(),
111            })?;
112
113        writer.flush().await?;
114        let file = writer.into_inner();
115        file.sync_data().await?;
116        drop(file);
117
118        Ok(())
119    }
120
121    // TODO: Return `Ok(None)` if object is found but past expiry
122    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
123    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
124        objectstore_log::debug!("Reading from local_fs backend");
125        let path = self.path.join(id.as_storage_path().to_string());
126        let file = match OpenOptions::new().read(true).open(path).await {
127            Ok(file) => file,
128            Err(err) if err.kind() == ErrorKind::NotFound => {
129                objectstore_log::debug!("Object not found");
130                return Ok(None);
131            }
132            err => err?,
133        };
134
135        let mut reader = BufReader::new(file);
136        let mut metadata_line = String::new();
137        reader.read_line(&mut metadata_line).await?;
138        let file_len = reader.get_ref().metadata().await?.len();
139        let mut metadata: Metadata =
140            serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
141                context: "failed to deserialize metadata".to_string(),
142                cause,
143            })?;
144        let payload_size = file_len
145            .checked_sub(metadata_line.len() as u64)
146            .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
147        metadata.size = Some(payload_size as usize);
148
149        let (content_range, stream) = match range {
150            Some(byte_range) => {
151                let content_range =
152                    byte_range
153                        .resolve(payload_size)
154                        .ok_or(Error::RangeNotSatisfiable {
155                            total: payload_size,
156                        })?;
157                reader
158                    .seek(std::io::SeekFrom::Current(content_range.start as i64))
159                    .await?;
160                let limited = reader.take(content_range.len());
161                (Some(content_range), ReaderStream::new(limited).boxed())
162            }
163            None => (None, ReaderStream::new(reader).boxed()),
164        };
165        Ok(Some((metadata, content_range, stream)))
166    }
167
168    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
169    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
170        objectstore_log::debug!("Deleting from local_fs backend");
171        let path = self.path.join(id.as_storage_path().to_string());
172        let result = tokio::fs::remove_file(path).await;
173        if let Err(e) = &result
174            && e.kind() == ErrorKind::NotFound
175        {
176            objectstore_log::debug!("Object not found");
177        }
178        Ok(result?)
179    }
180}
181
182impl LocalFsBackend {
183    fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf {
184        self.path
185            .join("__multipart__")
186            .join(id.as_storage_path().to_string())
187            .join(upload_id.as_str())
188    }
189}
190
191#[async_trait::async_trait]
192impl MultipartUploadBackend for LocalFsBackend {
193    async fn initiate_multipart(
194        &self,
195        id: &ObjectId,
196        metadata: &Metadata,
197    ) -> Result<InitiateMultipartResponse> {
198        let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
199        let dir = self.multipart_dir(id, &upload_id);
200        tokio::fs::create_dir_all(&dir).await?;
201
202        let meta_path = dir.join("metadata.json");
203        let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
204            context: "failed to serialize multipart metadata".to_string(),
205            cause,
206        })?;
207        tokio::fs::write(meta_path, metadata_json).await?;
208
209        Ok(upload_id)
210    }
211
212    async fn upload_part(
213        &self,
214        id: &ObjectId,
215        upload_id: &UploadId,
216        part_number: PartNumber,
217        content_length: u64,
218        _content_md5: Option<&str>,
219        body: ClientStream,
220    ) -> Result<UploadPartResponse> {
221        let dir = self.multipart_dir(id, upload_id);
222        if !tokio::fs::try_exists(&dir).await? {
223            return Err(Error::generic("multipart upload not found"));
224        }
225
226        let etag = format!("\"etag-{part_number}-{content_length}\"");
227
228        let header = serde_json::json!({
229            "etag": etag,
230            "uploaded_at": SystemTime::now(),
231            "size": content_length,
232        });
233        let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde {
234            context: "failed to serialize part header".to_string(),
235            cause,
236        })?;
237
238        let part_path = dir.join(format!("{part_number}.part"));
239        let file = OpenOptions::new()
240            .create(true)
241            .write(true)
242            .truncate(true)
243            .open(part_path)
244            .await?;
245
246        let mut reader = pin!(StreamReader::new(body));
247        let mut writer = BufWriter::new(file);
248        writer.write_all(header_line.as_bytes()).await?;
249        writer.write_all(b"\n").await?;
250
251        let _bytes_copied = tokio::io::copy(&mut reader, &mut writer)
252            .await
253            .map_err(|e| match stream::unpack_client_error(&e) {
254                Some(ce) => Error::Client(ce),
255                None => e.into(),
256            })?;
257
258        // TODO: validate bytes_copied against content_length and return a BadRequest-style
259        // error. Needs a service-layer error variant that maps to HTTP 400 without abusing
260        // ClientError (which is meant for stream errors).
261
262        writer.flush().await?;
263        let file = writer.into_inner();
264        file.sync_data().await?;
265        drop(file);
266
267        Ok(etag)
268    }
269
270    async fn list_parts(
271        &self,
272        id: &ObjectId,
273        upload_id: &UploadId,
274        max_parts: Option<u32>,
275        part_number_marker: Option<PartNumber>,
276    ) -> Result<ListPartsResponse> {
277        let dir = self.multipart_dir(id, upload_id);
278        if !tokio::fs::try_exists(&dir).await? {
279            return Err(Error::generic("multipart upload not found"));
280        }
281
282        let mut entries = tokio::fs::read_dir(&dir).await?;
283        let mut parts = Vec::new();
284
285        while let Some(entry) = entries.next_entry().await? {
286            let name = entry.file_name();
287            let name_str = name.to_string_lossy();
288            let Some(pn_str) = name_str.strip_suffix(".part") else {
289                continue;
290            };
291            let Ok(pn) = pn_str.parse::<PartNumber>() else {
292                continue;
293            };
294
295            if part_number_marker.is_some_and(|marker| pn <= marker) {
296                continue;
297            }
298
299            let file = tokio::fs::File::open(entry.path()).await?;
300            let mut reader = BufReader::new(file);
301            let mut header_line = String::new();
302            reader.read_line(&mut header_line).await?;
303            let header: serde_json::Value =
304                serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
305                    context: "failed to deserialize part header".to_string(),
306                    cause,
307                })?;
308
309            parts.push(Part {
310                part_number: pn,
311                etag: header["etag"].as_str().unwrap_or("").to_string(),
312                last_modified: serde_json::from_value(header["uploaded_at"].clone())
313                    .unwrap_or(SystemTime::UNIX_EPOCH),
314                size: header["size"].as_u64().unwrap_or(0),
315            });
316        }
317
318        parts.sort_by_key(|p| p.part_number);
319
320        let max = max_parts.unwrap_or(u32::MAX) as usize;
321        let is_truncated = parts.len() > max;
322        parts.truncate(max);
323
324        let next_part_number_marker = if is_truncated {
325            parts.last().map(|p| p.part_number)
326        } else {
327            None
328        };
329
330        Ok(ListPartsResponse {
331            parts,
332            is_truncated,
333            next_part_number_marker,
334        })
335    }
336
337    async fn abort_multipart(
338        &self,
339        id: &ObjectId,
340        upload_id: &UploadId,
341    ) -> Result<AbortMultipartResponse> {
342        let dir = self.multipart_dir(id, upload_id);
343        if tokio::fs::try_exists(&dir).await? {
344            tokio::fs::remove_dir_all(dir).await?;
345        }
346        Ok(())
347    }
348
349    async fn complete_multipart(
350        &self,
351        id: &ObjectId,
352        upload_id: &UploadId,
353        parts: Vec<CompletedPart>,
354    ) -> Result<CompleteMultipartResponse> {
355        let dir = self.multipart_dir(id, upload_id);
356        if !tokio::fs::try_exists(&dir).await? {
357            return Err(Error::generic("multipart upload not found"));
358        }
359
360        // Read metadata
361        let meta_path = dir.join("metadata.json");
362        let meta_bytes = tokio::fs::read(&meta_path).await?;
363        let metadata: Metadata =
364            serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde {
365                context: "failed to deserialize multipart metadata".to_string(),
366                cause,
367            })?;
368
369        // TODO: validate that parts are in ascending part_number order and reject with
370        // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant.
371
372        // Validate all parts (headers only) before writing anything
373        for completed in &parts {
374            let part_path = dir.join(format!("{}.part", completed.part_number));
375            if !tokio::fs::try_exists(&part_path).await? {
376                return Ok(Some(crate::multipart::CompleteMultipartError {
377                    code: "InvalidPart".into(),
378                    message: format!("part number {} was not uploaded", completed.part_number),
379                }));
380            }
381
382            let file = tokio::fs::File::open(&part_path).await?;
383            let mut reader = BufReader::new(file);
384            let mut header_line = String::new();
385            reader.read_line(&mut header_line).await?;
386            let header: serde_json::Value =
387                serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
388                    context: "failed to deserialize part header".to_string(),
389                    cause,
390                })?;
391
392            let stored_etag = header["etag"].as_str().unwrap_or("");
393            if stored_etag != completed.etag {
394                return Ok(Some(crate::multipart::CompleteMultipartError {
395                    code: "InvalidPart".into(),
396                    message: format!(
397                        "etag mismatch for part {}: expected {}, got {}",
398                        completed.part_number, stored_etag, completed.etag
399                    ),
400                }));
401            }
402        }
403
404        // Stream parts directly to the final object file
405        let path = self.path.join(id.as_storage_path().to_string());
406        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
407        let file = OpenOptions::new()
408            .create(true)
409            .write(true)
410            .truncate(true)
411            .open(path)
412            .await?;
413        let mut writer = BufWriter::new(file);
414
415        let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde {
416            context: "failed to serialize metadata".to_string(),
417            cause,
418        })?;
419        writer.write_all(metadata_json.as_bytes()).await?;
420        writer.write_all(b"\n").await?;
421
422        for completed in &parts {
423            let part_path = dir.join(format!("{}.part", completed.part_number));
424            let file = tokio::fs::File::open(&part_path).await?;
425            let mut reader = BufReader::new(file);
426            let mut header_line = String::new();
427            reader.read_line(&mut header_line).await?;
428            tokio::io::copy(&mut reader, &mut writer).await?;
429        }
430
431        writer.flush().await?;
432        let file = writer.into_inner();
433        file.sync_data().await?;
434        drop(file);
435
436        // Clean up multipart state
437        tokio::fs::remove_dir_all(dir).await?;
438
439        Ok(None)
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use std::num::NonZeroU32;
446    use std::time::{Duration, SystemTime};
447
448    use bytes::BytesMut;
449    use futures_util::TryStreamExt;
450    use objectstore_types::metadata::{Compression, ExpirationPolicy};
451    use objectstore_types::scope::{Scope, Scopes};
452
453    use super::*;
454    use crate::id::ObjectContext;
455    use crate::stream;
456
457    #[tokio::test]
458    async fn stores_metadata() {
459        let tempdir = tempfile::tempdir().unwrap();
460        let backend = LocalFsBackend::new(FileSystemConfig {
461            path: tempdir.path().to_path_buf(),
462        });
463
464        let id = ObjectId::random(ObjectContext {
465            usecase: "testing".into(),
466            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
467        });
468
469        let metadata = Metadata {
470            content_type: "text/plain".into(),
471            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_hours(1)),
472            time_created: Some(SystemTime::now()),
473            time_expires: None,
474            compression: Some(Compression::Zstd),
475            origin: Some("203.0.113.42".into()),
476            custom: [("foo".into(), "bar".into())].into(),
477            size: None,
478        };
479        backend
480            .put_object(&id, &metadata, stream::single("oh hai!"))
481            .await
482            .unwrap();
483
484        let (read_metadata, _, stream) = backend.get_object(&id, None).await.unwrap().unwrap();
485        let file_contents: BytesMut = stream.try_collect().await.unwrap();
486
487        assert_eq!(
488            read_metadata,
489            Metadata {
490                size: Some(file_contents.len()),
491                ..metadata
492            }
493        );
494        assert_eq!(file_contents.as_ref(), b"oh hai!");
495    }
496
497    #[tokio::test]
498    async fn get_metadata_returns_metadata() {
499        let tempdir = tempfile::tempdir().unwrap();
500        let backend = LocalFsBackend::new(FileSystemConfig {
501            path: tempdir.path().to_path_buf(),
502        });
503
504        let id = ObjectId::random(ObjectContext {
505            usecase: "testing".into(),
506            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
507        });
508
509        let metadata = Metadata {
510            content_type: "text/plain".into(),
511            compression: Some(Compression::Zstd),
512            origin: Some("203.0.113.42".into()),
513            custom: [("foo".into(), "bar".into())].into(),
514            ..Default::default()
515        };
516        backend
517            .put_object(&id, &metadata, stream::single("oh hai!"))
518            .await
519            .unwrap();
520
521        let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
522        assert_eq!(
523            read_metadata,
524            Metadata {
525                size: Some(7),
526                ..metadata
527            }
528        );
529    }
530
531    #[tokio::test]
532    async fn get_metadata_nonexistent() {
533        let tempdir = tempfile::tempdir().unwrap();
534        let backend = LocalFsBackend::new(FileSystemConfig {
535            path: tempdir.path().to_path_buf(),
536        });
537
538        let id = ObjectId::random(ObjectContext {
539            usecase: "testing".into(),
540            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
541        });
542
543        let result = backend.get_metadata(&id).await.unwrap();
544        assert!(result.is_none());
545    }
546
547    fn make_id() -> ObjectId {
548        ObjectId::random(ObjectContext {
549            usecase: "testing".into(),
550            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
551        })
552    }
553
554    fn make_backend() -> (tempfile::TempDir, LocalFsBackend) {
555        let tempdir = tempfile::tempdir().unwrap();
556        let backend = LocalFsBackend::new(FileSystemConfig {
557            path: tempdir.path().to_path_buf(),
558        });
559        (tempdir, backend)
560    }
561
562    #[tokio::test]
563    async fn multipart_single_part() {
564        let (_tempdir, backend) = make_backend();
565        let id = make_id();
566        let metadata = Metadata {
567            content_type: "text/plain".into(),
568            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_hours(1)),
569            origin: Some("203.0.113.42".into()),
570            custom: [("foo".into(), "bar".into())].into(),
571            ..Default::default()
572        };
573
574        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
575
576        let data = b"hello, multipart world!";
577        let etag = backend
578            .upload_part(
579                &id,
580                &upload_id,
581                NonZeroU32::new(1).unwrap(),
582                data.len() as u64,
583                None,
584                stream::single(data.to_vec()),
585            )
586            .await
587            .unwrap();
588
589        let result = backend
590            .complete_multipart(
591                &id,
592                &upload_id,
593                vec![CompletedPart {
594                    part_number: NonZeroU32::new(1).unwrap(),
595                    etag,
596                }],
597            )
598            .await
599            .unwrap();
600        assert!(result.is_none(), "expected no error on complete");
601
602        let (meta, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
603        let payload: BytesMut = body.try_collect().await.unwrap();
604        assert_eq!(payload.as_ref(), data);
605        assert_eq!(meta.content_type, "text/plain".to_string());
606        assert_eq!(
607            meta.expiration_policy,
608            ExpirationPolicy::TimeToIdle(Duration::from_hours(1))
609        );
610        assert_eq!(meta.origin, Some("203.0.113.42".into()));
611        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
612    }
613
614    #[tokio::test]
615    async fn multipart_multiple_parts() {
616        let (_tempdir, backend) = make_backend();
617        let id = make_id();
618        let metadata = Metadata::default();
619
620        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
621
622        let part1 = b"aaaa".to_vec();
623        let part2 = b"bbbb".to_vec();
624        let part3 = b"cc".to_vec();
625
626        let etag1 = backend
627            .upload_part(
628                &id,
629                &upload_id,
630                NonZeroU32::new(1).unwrap(),
631                part1.len() as u64,
632                None,
633                stream::single(part1.clone()),
634            )
635            .await
636            .unwrap();
637        let etag2 = backend
638            .upload_part(
639                &id,
640                &upload_id,
641                NonZeroU32::new(2).unwrap(),
642                part2.len() as u64,
643                None,
644                stream::single(part2.clone()),
645            )
646            .await
647            .unwrap();
648        let etag3 = backend
649            .upload_part(
650                &id,
651                &upload_id,
652                NonZeroU32::new(3).unwrap(),
653                part3.len() as u64,
654                None,
655                stream::single(part3.clone()),
656            )
657            .await
658            .unwrap();
659
660        let result = backend
661            .complete_multipart(
662                &id,
663                &upload_id,
664                vec![
665                    CompletedPart {
666                        part_number: NonZeroU32::new(1).unwrap(),
667                        etag: etag1,
668                    },
669                    CompletedPart {
670                        part_number: NonZeroU32::new(2).unwrap(),
671                        etag: etag2,
672                    },
673                    CompletedPart {
674                        part_number: NonZeroU32::new(3).unwrap(),
675                        etag: etag3,
676                    },
677                ],
678            )
679            .await
680            .unwrap();
681        assert!(result.is_none());
682
683        let (_, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
684        let payload: BytesMut = body.try_collect().await.unwrap();
685        assert_eq!(payload.as_ref(), b"aaaabbbbcc");
686    }
687
688    #[tokio::test]
689    async fn multipart_list_parts() {
690        let (_tempdir, backend) = make_backend();
691        let id = make_id();
692        let metadata = Metadata::default();
693
694        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
695
696        let etag1 = backend
697            .upload_part(
698                &id,
699                &upload_id,
700                NonZeroU32::new(1).unwrap(),
701                3,
702                None,
703                stream::single(b"aaa".to_vec()),
704            )
705            .await
706            .unwrap();
707        let etag2 = backend
708            .upload_part(
709                &id,
710                &upload_id,
711                NonZeroU32::new(2).unwrap(),
712                3,
713                None,
714                stream::single(b"bbb".to_vec()),
715            )
716            .await
717            .unwrap();
718
719        let list = backend
720            .list_parts(&id, &upload_id, None, None)
721            .await
722            .unwrap();
723        assert_eq!(list.parts.len(), 2);
724        assert_eq!(list.parts[0].part_number.get(), 1);
725        assert_eq!(list.parts[0].etag, etag1);
726        assert_eq!(list.parts[0].size, 3);
727        assert_eq!(list.parts[1].part_number.get(), 2);
728        assert_eq!(list.parts[1].etag, etag2);
729        assert_eq!(list.parts[1].size, 3);
730
731        // Pagination
732        let page1 = backend
733            .list_parts(&id, &upload_id, Some(1), None)
734            .await
735            .unwrap();
736        assert_eq!(page1.parts.len(), 1);
737        assert_eq!(page1.parts[0].part_number.get(), 1);
738        assert!(page1.is_truncated);
739        assert!(page1.next_part_number_marker.is_some());
740
741        let page2 = backend
742            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
743            .await
744            .unwrap();
745        assert_eq!(page2.parts.len(), 1);
746        assert_eq!(page2.parts[0].part_number.get(), 2);
747
748        backend.abort_multipart(&id, &upload_id).await.unwrap();
749    }
750
751    #[tokio::test]
752    async fn get_object_range_bounded() {
753        let (_tempdir, backend) = make_backend();
754        let id = make_id();
755        let metadata = Metadata::default();
756
757        let payload = b"Hello, range requests!";
758        backend
759            .put_object(&id, &metadata, stream::single(payload.to_vec()))
760            .await
761            .unwrap();
762
763        // Request bytes 7-11 → "range"
764        let (_, content_range, body) = backend
765            .get_object(&id, Some(ByteRange::Bounded(7, 11)))
766            .await
767            .unwrap()
768            .unwrap();
769        let data: BytesMut = body.try_collect().await.unwrap();
770
771        assert_eq!(data.as_ref(), b"range");
772        let content_range = content_range.unwrap();
773        assert_eq!(content_range.start, 7);
774        assert_eq!(content_range.end, 11);
775        assert_eq!(content_range.total, payload.len() as u64);
776    }
777
778    #[tokio::test]
779    async fn get_object_range_from() {
780        let (_tempdir, backend) = make_backend();
781        let id = make_id();
782        let metadata = Metadata::default();
783
784        let payload = b"Hello, range requests!";
785        backend
786            .put_object(&id, &metadata, stream::single(payload.to_vec()))
787            .await
788            .unwrap();
789
790        // Request bytes 7- → "range requests!"
791        let (_, content_range, body) = backend
792            .get_object(&id, Some(ByteRange::From(7)))
793            .await
794            .unwrap()
795            .unwrap();
796        let data: BytesMut = body.try_collect().await.unwrap();
797
798        assert_eq!(data.as_ref(), b"range requests!");
799        let content_range = content_range.unwrap();
800        assert_eq!(content_range.start, 7);
801        assert_eq!(content_range.end, 21);
802        assert_eq!(content_range.total, payload.len() as u64);
803    }
804
805    #[tokio::test]
806    async fn get_object_range_last() {
807        let (_tempdir, backend) = make_backend();
808        let id = make_id();
809        let metadata = Metadata::default();
810
811        let payload = b"Hello, range requests!";
812        backend
813            .put_object(&id, &metadata, stream::single(payload.to_vec()))
814            .await
815            .unwrap();
816
817        // Request last 9 bytes → "requests!"
818        let (_, content_range, body) = backend
819            .get_object(&id, Some(ByteRange::Last(9)))
820            .await
821            .unwrap()
822            .unwrap();
823        let data: BytesMut = body.try_collect().await.unwrap();
824
825        assert_eq!(data.as_ref(), b"requests!");
826        let content_range = content_range.unwrap();
827        assert_eq!(content_range.start, 13);
828        assert_eq!(content_range.end, 21);
829        assert_eq!(content_range.total, payload.len() as u64);
830    }
831
832    #[tokio::test]
833    async fn get_object_range_unsatisfiable() {
834        let (_tempdir, backend) = make_backend();
835        let id = make_id();
836        let metadata = Metadata::default();
837
838        backend
839            .put_object(&id, &metadata, stream::single(b"short".to_vec()))
840            .await
841            .unwrap();
842
843        match backend.get_object(&id, Some(ByteRange::From(100))).await {
844            Err(Error::RangeNotSatisfiable { total: 5 }) => {}
845            Err(other) => panic!("expected RangeNotSatisfiable, got: {other:?}"),
846            Ok(_) => panic!("expected RangeNotSatisfiable, got Ok"),
847        }
848    }
849
850    #[tokio::test]
851    async fn multipart_abort() {
852        let (_tempdir, backend) = make_backend();
853        let id = make_id();
854        let metadata = Metadata::default();
855
856        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
857
858        backend
859            .upload_part(
860                &id,
861                &upload_id,
862                NonZeroU32::new(1).unwrap(),
863                5,
864                None,
865                stream::single(b"hello".to_vec()),
866            )
867            .await
868            .unwrap();
869
870        backend.abort_multipart(&id, &upload_id).await.unwrap();
871
872        let result = backend.get_object(&id, None).await.unwrap();
873        assert!(result.is_none(), "object should not exist after abort");
874    }
875
876    #[tokio::test]
877    async fn multipart_invalid_etag() {
878        let (_tempdir, backend) = make_backend();
879        let id = make_id();
880        let metadata = Metadata::default();
881
882        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
883
884        let etag = backend
885            .upload_part(
886                &id,
887                &upload_id,
888                NonZeroU32::new(1).unwrap(),
889                5,
890                None,
891                stream::single(b"hello".to_vec()),
892            )
893            .await
894            .unwrap();
895
896        let result = backend
897            .complete_multipart(
898                &id,
899                &upload_id,
900                vec![CompletedPart {
901                    part_number: NonZeroU32::new(1).unwrap(),
902                    etag: "wrong-etag".into(),
903                }],
904            )
905            .await
906            .unwrap();
907        assert!(result.is_some(), "expected error for bad etag");
908        assert_eq!(result.unwrap().code, "InvalidPart");
909
910        // Upload must survive a failed complete so the client can retry.
911        let result = backend
912            .complete_multipart(
913                &id,
914                &upload_id,
915                vec![CompletedPart {
916                    part_number: NonZeroU32::new(1).unwrap(),
917                    etag,
918                }],
919            )
920            .await
921            .unwrap();
922        assert!(result.is_none(), "retry with correct etag should succeed");
923    }
924
925    #[tokio::test]
926    async fn multipart_missing_part() {
927        let (_tempdir, backend) = make_backend();
928        let id = make_id();
929        let metadata = Metadata::default();
930
931        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
932
933        let etag = backend
934            .upload_part(
935                &id,
936                &upload_id,
937                NonZeroU32::new(1).unwrap(),
938                5,
939                None,
940                stream::single(b"hello".to_vec()),
941            )
942            .await
943            .unwrap();
944
945        let result = backend
946            .complete_multipart(
947                &id,
948                &upload_id,
949                vec![CompletedPart {
950                    part_number: NonZeroU32::new(99).unwrap(),
951                    etag: "whatever".into(),
952                }],
953            )
954            .await
955            .unwrap();
956        assert!(result.is_some(), "expected error for missing part");
957        assert_eq!(result.unwrap().code, "InvalidPart");
958
959        // Upload must survive a failed complete so the client can retry.
960        let result = backend
961            .complete_multipart(
962                &id,
963                &upload_id,
964                vec![CompletedPart {
965                    part_number: NonZeroU32::new(1).unwrap(),
966                    etag,
967                }],
968            )
969            .await
970            .unwrap();
971        assert!(result.is_none(), "retry with correct part should succeed");
972    }
973}