1use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6use std::time::SystemTime;
7
8use futures_util::StreamExt;
9use objectstore_types::metadata::Metadata;
10use objectstore_types::range::ByteRange;
11use tokio::fs::OpenOptions;
12use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15use crate::backend::common::{
16 Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse,
17};
18use crate::error::{Error, Result};
19use crate::id::ObjectId;
20use crate::multipart::{
21 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
22 ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
23};
24use crate::stream::{self, ClientStream};
25
26#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
39pub struct FileSystemConfig {
40 pub path: PathBuf,
54}
55
56#[derive(Debug)]
58pub struct LocalFsBackend {
59 path: PathBuf,
60}
61
62impl LocalFsBackend {
63 pub fn new(config: FileSystemConfig) -> Self {
65 Self { path: config.path }
66 }
67}
68
69#[async_trait::async_trait]
70impl Backend for LocalFsBackend {
71 fn name(&self) -> &'static str {
72 "local-fs"
73 }
74
75 fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
76 Ok(self)
77 }
78
79 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
80 async fn put_object(
81 &self,
82 id: &ObjectId,
83 metadata: &Metadata,
84 stream: ClientStream,
85 ) -> Result<PutResponse> {
86 let path = self.path.join(id.as_storage_path().to_string());
87 objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
88 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
89 let file = OpenOptions::new()
90 .create(true)
91 .write(true)
92 .truncate(true)
93 .open(path)
94 .await?;
95
96 let mut reader = pin!(StreamReader::new(stream));
97 let mut writer = BufWriter::new(file);
98
99 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
100 context: "failed to serialize metadata".to_string(),
101 cause,
102 })?;
103 writer.write_all(metadata_json.as_bytes()).await?;
104 writer.write_all(b"\n").await?;
105
106 tokio::io::copy(&mut reader, &mut writer)
107 .await
108 .map_err(|e| match stream::unpack_client_error(&e) {
109 Some(ce) => Error::Client(ce),
110 None => e.into(),
111 })?;
112
113 writer.flush().await?;
114 let file = writer.into_inner();
115 file.sync_data().await?;
116 drop(file);
117
118 Ok(())
119 }
120
121 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
123 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
124 objectstore_log::debug!("Reading from local_fs backend");
125 let path = self.path.join(id.as_storage_path().to_string());
126 let file = match OpenOptions::new().read(true).open(path).await {
127 Ok(file) => file,
128 Err(err) if err.kind() == ErrorKind::NotFound => {
129 objectstore_log::debug!("Object not found");
130 return Ok(None);
131 }
132 err => err?,
133 };
134
135 let mut reader = BufReader::new(file);
136 let mut metadata_line = String::new();
137 reader.read_line(&mut metadata_line).await?;
138 let file_len = reader.get_ref().metadata().await?.len();
139 let mut metadata: Metadata =
140 serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
141 context: "failed to deserialize metadata".to_string(),
142 cause,
143 })?;
144 let payload_size = file_len
145 .checked_sub(metadata_line.len() as u64)
146 .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
147 metadata.size = Some(payload_size as usize);
148
149 let (content_range, stream) = match range {
150 Some(byte_range) => {
151 let content_range =
152 byte_range
153 .resolve(payload_size)
154 .ok_or(Error::RangeNotSatisfiable {
155 total: payload_size,
156 })?;
157 reader
158 .seek(std::io::SeekFrom::Current(content_range.start as i64))
159 .await?;
160 let limited = reader.take(content_range.len());
161 (Some(content_range), ReaderStream::new(limited).boxed())
162 }
163 None => (None, ReaderStream::new(reader).boxed()),
164 };
165 Ok(Some((metadata, content_range, stream)))
166 }
167
168 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
169 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
170 objectstore_log::debug!("Deleting from local_fs backend");
171 let path = self.path.join(id.as_storage_path().to_string());
172 let result = tokio::fs::remove_file(path).await;
173 if let Err(e) = &result
174 && e.kind() == ErrorKind::NotFound
175 {
176 objectstore_log::debug!("Object not found");
177 }
178 Ok(result?)
179 }
180}
181
182impl LocalFsBackend {
183 fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf {
184 self.path
185 .join("__multipart__")
186 .join(id.as_storage_path().to_string())
187 .join(upload_id.as_str())
188 }
189}
190
191#[async_trait::async_trait]
192impl MultipartUploadBackend for LocalFsBackend {
193 async fn initiate_multipart(
194 &self,
195 id: &ObjectId,
196 metadata: &Metadata,
197 ) -> Result<InitiateMultipartResponse> {
198 let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
199 let dir = self.multipart_dir(id, &upload_id);
200 tokio::fs::create_dir_all(&dir).await?;
201
202 let meta_path = dir.join("metadata.json");
203 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
204 context: "failed to serialize multipart metadata".to_string(),
205 cause,
206 })?;
207 tokio::fs::write(meta_path, metadata_json).await?;
208
209 Ok(upload_id)
210 }
211
212 async fn upload_part(
213 &self,
214 id: &ObjectId,
215 upload_id: &UploadId,
216 part_number: PartNumber,
217 content_length: u64,
218 _content_md5: Option<&str>,
219 body: ClientStream,
220 ) -> Result<UploadPartResponse> {
221 let dir = self.multipart_dir(id, upload_id);
222 if !tokio::fs::try_exists(&dir).await? {
223 return Err(Error::generic("multipart upload not found"));
224 }
225
226 let etag = format!("\"etag-{part_number}-{content_length}\"");
227
228 let header = serde_json::json!({
229 "etag": etag,
230 "uploaded_at": SystemTime::now(),
231 "size": content_length,
232 });
233 let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde {
234 context: "failed to serialize part header".to_string(),
235 cause,
236 })?;
237
238 let part_path = dir.join(format!("{part_number}.part"));
239 let file = OpenOptions::new()
240 .create(true)
241 .write(true)
242 .truncate(true)
243 .open(part_path)
244 .await?;
245
246 let mut reader = pin!(StreamReader::new(body));
247 let mut writer = BufWriter::new(file);
248 writer.write_all(header_line.as_bytes()).await?;
249 writer.write_all(b"\n").await?;
250
251 let _bytes_copied = tokio::io::copy(&mut reader, &mut writer)
252 .await
253 .map_err(|e| match stream::unpack_client_error(&e) {
254 Some(ce) => Error::Client(ce),
255 None => e.into(),
256 })?;
257
258 writer.flush().await?;
263 let file = writer.into_inner();
264 file.sync_data().await?;
265 drop(file);
266
267 Ok(etag)
268 }
269
270 async fn list_parts(
271 &self,
272 id: &ObjectId,
273 upload_id: &UploadId,
274 max_parts: Option<u32>,
275 part_number_marker: Option<PartNumber>,
276 ) -> Result<ListPartsResponse> {
277 let dir = self.multipart_dir(id, upload_id);
278 if !tokio::fs::try_exists(&dir).await? {
279 return Err(Error::generic("multipart upload not found"));
280 }
281
282 let mut entries = tokio::fs::read_dir(&dir).await?;
283 let mut parts = Vec::new();
284
285 while let Some(entry) = entries.next_entry().await? {
286 let name = entry.file_name();
287 let name_str = name.to_string_lossy();
288 let Some(pn_str) = name_str.strip_suffix(".part") else {
289 continue;
290 };
291 let Ok(pn) = pn_str.parse::<PartNumber>() else {
292 continue;
293 };
294
295 if part_number_marker.is_some_and(|marker| pn <= marker) {
296 continue;
297 }
298
299 let file = tokio::fs::File::open(entry.path()).await?;
300 let mut reader = BufReader::new(file);
301 let mut header_line = String::new();
302 reader.read_line(&mut header_line).await?;
303 let header: serde_json::Value =
304 serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
305 context: "failed to deserialize part header".to_string(),
306 cause,
307 })?;
308
309 parts.push(Part {
310 part_number: pn,
311 etag: header["etag"].as_str().unwrap_or("").to_string(),
312 last_modified: serde_json::from_value(header["uploaded_at"].clone())
313 .unwrap_or(SystemTime::UNIX_EPOCH),
314 size: header["size"].as_u64().unwrap_or(0),
315 });
316 }
317
318 parts.sort_by_key(|p| p.part_number);
319
320 let max = max_parts.unwrap_or(u32::MAX) as usize;
321 let is_truncated = parts.len() > max;
322 parts.truncate(max);
323
324 let next_part_number_marker = if is_truncated {
325 parts.last().map(|p| p.part_number)
326 } else {
327 None
328 };
329
330 Ok(ListPartsResponse {
331 parts,
332 is_truncated,
333 next_part_number_marker,
334 })
335 }
336
337 async fn abort_multipart(
338 &self,
339 id: &ObjectId,
340 upload_id: &UploadId,
341 ) -> Result<AbortMultipartResponse> {
342 let dir = self.multipart_dir(id, upload_id);
343 if tokio::fs::try_exists(&dir).await? {
344 tokio::fs::remove_dir_all(dir).await?;
345 }
346 Ok(())
347 }
348
349 async fn complete_multipart(
350 &self,
351 id: &ObjectId,
352 upload_id: &UploadId,
353 parts: Vec<CompletedPart>,
354 ) -> Result<CompleteMultipartResponse> {
355 let dir = self.multipart_dir(id, upload_id);
356 if !tokio::fs::try_exists(&dir).await? {
357 return Err(Error::generic("multipart upload not found"));
358 }
359
360 let meta_path = dir.join("metadata.json");
362 let meta_bytes = tokio::fs::read(&meta_path).await?;
363 let metadata: Metadata =
364 serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde {
365 context: "failed to deserialize multipart metadata".to_string(),
366 cause,
367 })?;
368
369 for completed in &parts {
374 let part_path = dir.join(format!("{}.part", completed.part_number));
375 if !tokio::fs::try_exists(&part_path).await? {
376 return Ok(Some(crate::multipart::CompleteMultipartError {
377 code: "InvalidPart".into(),
378 message: format!("part number {} was not uploaded", completed.part_number),
379 }));
380 }
381
382 let file = tokio::fs::File::open(&part_path).await?;
383 let mut reader = BufReader::new(file);
384 let mut header_line = String::new();
385 reader.read_line(&mut header_line).await?;
386 let header: serde_json::Value =
387 serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
388 context: "failed to deserialize part header".to_string(),
389 cause,
390 })?;
391
392 let stored_etag = header["etag"].as_str().unwrap_or("");
393 if stored_etag != completed.etag {
394 return Ok(Some(crate::multipart::CompleteMultipartError {
395 code: "InvalidPart".into(),
396 message: format!(
397 "etag mismatch for part {}: expected {}, got {}",
398 completed.part_number, stored_etag, completed.etag
399 ),
400 }));
401 }
402 }
403
404 let path = self.path.join(id.as_storage_path().to_string());
406 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
407 let file = OpenOptions::new()
408 .create(true)
409 .write(true)
410 .truncate(true)
411 .open(path)
412 .await?;
413 let mut writer = BufWriter::new(file);
414
415 let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde {
416 context: "failed to serialize metadata".to_string(),
417 cause,
418 })?;
419 writer.write_all(metadata_json.as_bytes()).await?;
420 writer.write_all(b"\n").await?;
421
422 for completed in &parts {
423 let part_path = dir.join(format!("{}.part", completed.part_number));
424 let file = tokio::fs::File::open(&part_path).await?;
425 let mut reader = BufReader::new(file);
426 let mut header_line = String::new();
427 reader.read_line(&mut header_line).await?;
428 tokio::io::copy(&mut reader, &mut writer).await?;
429 }
430
431 writer.flush().await?;
432 let file = writer.into_inner();
433 file.sync_data().await?;
434 drop(file);
435
436 tokio::fs::remove_dir_all(dir).await?;
438
439 Ok(None)
440 }
441}
442
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::time::{Duration, SystemTime};

    use bytes::BytesMut;
    use futures_util::TryStreamExt;
    use objectstore_types::metadata::{Compression, ExpirationPolicy};
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Payload shared by the byte-range tests.
    const RANGE_PAYLOAD: &[u8] = b"Hello, range requests!";

    /// Creates a random object id in the `testing` usecase.
    fn make_id() -> ObjectId {
        ObjectId::random(ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        })
    }

    /// Creates a backend rooted in a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the backend is used.
    fn make_backend() -> (tempfile::TempDir, LocalFsBackend) {
        let tempdir = tempfile::tempdir().unwrap();
        let backend = LocalFsBackend::new(FileSystemConfig {
            path: tempdir.path().to_path_buf(),
        });
        (tempdir, backend)
    }

    /// Shorthand for a non-zero part number.
    fn part(number: u32) -> NonZeroU32 {
        NonZeroU32::new(number).unwrap()
    }

    /// Uploads `data` as part `number`, declaring its exact length, and returns the ETag.
    async fn upload(
        backend: &LocalFsBackend,
        id: &ObjectId,
        upload_id: &UploadId,
        number: u32,
        data: &[u8],
    ) -> UploadPartResponse {
        backend
            .upload_part(
                id,
                upload_id,
                part(number),
                data.len() as u64,
                None,
                stream::single(data.to_vec()),
            )
            .await
            .unwrap()
    }

    /// Creates a backend holding one object whose payload is `RANGE_PAYLOAD`.
    async fn backend_with_range_payload() -> (tempfile::TempDir, LocalFsBackend, ObjectId) {
        let (tempdir, backend) = make_backend();
        let id = make_id();
        backend
            .put_object(
                &id,
                &Metadata::default(),
                stream::single(RANGE_PAYLOAD.to_vec()),
            )
            .await
            .unwrap();
        (tempdir, backend, id)
    }

    #[tokio::test]
    async fn stores_metadata() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();

        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_hours(1)),
            time_created: Some(SystemTime::now()),
            time_expires: None,
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            size: None,
        };
        backend
            .put_object(&id, &metadata, stream::single("oh hai!"))
            .await
            .unwrap();

        let (read_metadata, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
        let file_contents: BytesMut = body.try_collect().await.unwrap();

        // The backend fills in the size from the stored payload.
        let expected = Metadata {
            size: Some(file_contents.len()),
            ..metadata
        };
        assert_eq!(read_metadata, expected);
        assert_eq!(&file_contents[..], b"oh hai!");
    }

    #[tokio::test]
    async fn get_metadata_returns_metadata() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();

        let metadata = Metadata {
            content_type: "text/plain".into(),
            compression: Some(Compression::Zstd),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };
        backend
            .put_object(&id, &metadata, stream::single("oh hai!"))
            .await
            .unwrap();

        let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
        let expected = Metadata {
            size: Some(7),
            ..metadata
        };
        assert_eq!(read_metadata, expected);
    }

    #[tokio::test]
    async fn get_metadata_nonexistent() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();

        let lookup = backend.get_metadata(&id).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn multipart_single_part() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata {
            content_type: "text/plain".into(),
            expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_hours(1)),
            origin: Some("203.0.113.42".into()),
            custom: [("foo".into(), "bar".into())].into(),
            ..Default::default()
        };

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let data = b"hello, multipart world!";
        let etag = upload(&backend, &id, &upload_id, 1, data).await;

        let outcome = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: part(1),
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(outcome.is_none(), "expected no error on complete");

        let (meta, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
        let payload: BytesMut = body.try_collect().await.unwrap();
        assert_eq!(&payload[..], data);
        assert_eq!(meta.content_type, "text/plain".to_string());
        assert_eq!(
            meta.expiration_policy,
            ExpirationPolicy::TimeToIdle(Duration::from_hours(1))
        );
        assert_eq!(meta.origin, Some("203.0.113.42".into()));
        assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
    }

    #[tokio::test]
    async fn multipart_multiple_parts() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        // Upload the chunks one after another as parts 1, 2 and 3.
        let chunks: [&[u8]; 3] = [b"aaaa", b"bbbb", b"cc"];
        let mut completed = Vec::new();
        for (number, chunk) in (1u32..).zip(chunks) {
            let etag = upload(&backend, &id, &upload_id, number, chunk).await;
            completed.push(CompletedPart {
                part_number: part(number),
                etag,
            });
        }

        let outcome = backend
            .complete_multipart(&id, &upload_id, completed)
            .await
            .unwrap();
        assert!(outcome.is_none());

        let (_, _, body) = backend.get_object(&id, None).await.unwrap().unwrap();
        let payload: BytesMut = body.try_collect().await.unwrap();
        assert_eq!(&payload[..], b"aaaabbbbcc");
    }

    #[tokio::test]
    async fn multipart_list_parts() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag1 = upload(&backend, &id, &upload_id, 1, b"aaa").await;
        let etag2 = upload(&backend, &id, &upload_id, 2, b"bbb").await;

        // Unpaginated listing returns both parts in order.
        let list = backend
            .list_parts(&id, &upload_id, None, None)
            .await
            .unwrap();
        assert_eq!(list.parts.len(), 2);
        assert_eq!(list.parts[0].part_number.get(), 1);
        assert_eq!(list.parts[0].etag, etag1);
        assert_eq!(list.parts[0].size, 3);
        assert_eq!(list.parts[1].part_number.get(), 2);
        assert_eq!(list.parts[1].etag, etag2);
        assert_eq!(list.parts[1].size, 3);

        // Paginated listing: one part per page.
        let page1 = backend
            .list_parts(&id, &upload_id, Some(1), None)
            .await
            .unwrap();
        assert_eq!(page1.parts.len(), 1);
        assert_eq!(page1.parts[0].part_number.get(), 1);
        assert!(page1.is_truncated);
        assert!(page1.next_part_number_marker.is_some());

        let page2 = backend
            .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
            .await
            .unwrap();
        assert_eq!(page2.parts.len(), 1);
        assert_eq!(page2.parts[0].part_number.get(), 2);

        backend.abort_multipart(&id, &upload_id).await.unwrap();
    }

    #[tokio::test]
    async fn get_object_range_bounded() {
        let (_tempdir, backend, id) = backend_with_range_payload().await;

        let (_, content_range, body) = backend
            .get_object(&id, Some(ByteRange::Bounded(7, 11)))
            .await
            .unwrap()
            .unwrap();
        let data: BytesMut = body.try_collect().await.unwrap();

        assert_eq!(&data[..], b"range");
        let content_range = content_range.unwrap();
        assert_eq!(content_range.start, 7);
        assert_eq!(content_range.end, 11);
        assert_eq!(content_range.total, RANGE_PAYLOAD.len() as u64);
    }

    #[tokio::test]
    async fn get_object_range_from() {
        let (_tempdir, backend, id) = backend_with_range_payload().await;

        let (_, content_range, body) = backend
            .get_object(&id, Some(ByteRange::From(7)))
            .await
            .unwrap()
            .unwrap();
        let data: BytesMut = body.try_collect().await.unwrap();

        assert_eq!(&data[..], b"range requests!");
        let content_range = content_range.unwrap();
        assert_eq!(content_range.start, 7);
        assert_eq!(content_range.end, 21);
        assert_eq!(content_range.total, RANGE_PAYLOAD.len() as u64);
    }

    #[tokio::test]
    async fn get_object_range_last() {
        let (_tempdir, backend, id) = backend_with_range_payload().await;

        let (_, content_range, body) = backend
            .get_object(&id, Some(ByteRange::Last(9)))
            .await
            .unwrap()
            .unwrap();
        let data: BytesMut = body.try_collect().await.unwrap();

        assert_eq!(&data[..], b"requests!");
        let content_range = content_range.unwrap();
        assert_eq!(content_range.start, 13);
        assert_eq!(content_range.end, 21);
        assert_eq!(content_range.total, RANGE_PAYLOAD.len() as u64);
    }

    #[tokio::test]
    async fn get_object_range_unsatisfiable() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        backend
            .put_object(&id, &metadata, stream::single(b"short".to_vec()))
            .await
            .unwrap();

        let Err(err) = backend.get_object(&id, Some(ByteRange::From(100))).await else {
            panic!("expected RangeNotSatisfiable, got Ok");
        };
        assert!(
            matches!(err, Error::RangeNotSatisfiable { total: 5 }),
            "expected RangeNotSatisfiable, got: {err:?}"
        );
    }

    #[tokio::test]
    async fn multipart_abort() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        upload(&backend, &id, &upload_id, 1, b"hello").await;

        backend.abort_multipart(&id, &upload_id).await.unwrap();

        let lookup = backend.get_object(&id, None).await.unwrap();
        assert!(lookup.is_none(), "object should not exist after abort");
    }

    #[tokio::test]
    async fn multipart_invalid_etag() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let rejected = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: part(1),
                    etag: "wrong-etag".into(),
                }],
            )
            .await
            .unwrap();
        assert!(rejected.is_some(), "expected error for bad etag");
        assert_eq!(rejected.unwrap().code, "InvalidPart");

        // A rejected completion must leave the upload intact for a retry.
        let retried = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: part(1),
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(retried.is_none(), "retry with correct etag should succeed");
    }

    #[tokio::test]
    async fn multipart_missing_part() {
        let (_tempdir, backend) = make_backend();
        let id = make_id();
        let metadata = Metadata::default();

        let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();

        let etag = upload(&backend, &id, &upload_id, 1, b"hello").await;

        let rejected = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: part(99),
                    etag: "whatever".into(),
                }],
            )
            .await
            .unwrap();
        assert!(rejected.is_some(), "expected error for missing part");
        assert_eq!(rejected.unwrap().code, "InvalidPart");

        // A rejected completion must leave the upload intact for a retry.
        let retried = backend
            .complete_multipart(
                &id,
                &upload_id,
                vec![CompletedPart {
                    part_number: part(1),
                    etag,
                }],
            )
            .await
            .unwrap();
        assert!(retried.is_none(), "retry with correct part should succeed");
    }
}
973}