1use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6use std::sync::Arc;
7use std::time::SystemTime;
8
9use futures_util::StreamExt;
10use objectstore_types::metadata::Metadata;
11use tokio::fs::OpenOptions;
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15use crate::backend::common::{
16 Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse,
17};
18use crate::error::{Error, Result};
19use crate::id::ObjectId;
20use crate::multipart::{
21 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
22 ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
23};
24use crate::stream::{self, ClientStream};
25
/// Configuration for the local filesystem backend.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct FileSystemConfig {
    /// Root directory of the backend. Object files are created beneath this
    /// path, and in-progress multipart uploads are staged in a
    /// `__multipart__` directory inside it.
    pub path: PathBuf,
}
55
/// Storage backend that persists objects as files on the local filesystem.
///
/// Each object is a single file whose first line is the JSON-serialized
/// metadata, followed by the raw payload bytes.
#[derive(Debug)]
pub struct LocalFsBackend {
    /// Root directory that all object and multipart paths are joined onto.
    path: PathBuf,
}
61
62impl LocalFsBackend {
63 pub fn new(config: FileSystemConfig) -> Self {
65 Self { path: config.path }
66 }
67}
68
69#[async_trait::async_trait]
70impl Backend for LocalFsBackend {
    /// Returns the static identifier of this backend, `"local-fs"`.
    fn name(&self) -> &'static str {
        "local-fs"
    }
74
75 fn as_multipart_upload_backend(self: Arc<Self>) -> Result<Arc<dyn MultipartUploadBackend>> {
76 Ok(self)
77 }
78
79 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
80 async fn put_object(
81 &self,
82 id: &ObjectId,
83 metadata: &Metadata,
84 stream: ClientStream,
85 ) -> Result<PutResponse> {
86 let path = self.path.join(id.as_storage_path().to_string());
87 objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
88 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
89 let file = OpenOptions::new()
90 .create(true)
91 .write(true)
92 .truncate(true)
93 .open(path)
94 .await?;
95
96 let mut reader = pin!(StreamReader::new(stream));
97 let mut writer = BufWriter::new(file);
98
99 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
100 context: "failed to serialize metadata".to_string(),
101 cause,
102 })?;
103 writer.write_all(metadata_json.as_bytes()).await?;
104 writer.write_all(b"\n").await?;
105
106 tokio::io::copy(&mut reader, &mut writer)
107 .await
108 .map_err(|e| match stream::unpack_client_error(&e) {
109 Some(ce) => Error::Client(ce),
110 None => e.into(),
111 })?;
112
113 writer.flush().await?;
114 let file = writer.into_inner();
115 file.sync_data().await?;
116 drop(file);
117
118 Ok(())
119 }
120
121 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
123 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
124 objectstore_log::debug!("Reading from local_fs backend");
125 let path = self.path.join(id.as_storage_path().to_string());
126 let file = match OpenOptions::new().read(true).open(path).await {
127 Ok(file) => file,
128 Err(err) if err.kind() == ErrorKind::NotFound => {
129 objectstore_log::debug!("Object not found");
130 return Ok(None);
131 }
132 err => err?,
133 };
134
135 let mut reader = BufReader::new(file);
136 let mut metadata_line = String::new();
137 reader.read_line(&mut metadata_line).await?;
138 let file_len = reader.get_ref().metadata().await?.len();
139 let mut metadata: Metadata =
140 serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
141 context: "failed to deserialize metadata".to_string(),
142 cause,
143 })?;
144 let payload_size = file_len
145 .checked_sub(metadata_line.len() as u64)
146 .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
147 metadata.size = Some(payload_size as usize);
148
149 let stream = ReaderStream::new(reader);
150 Ok(Some((metadata, stream.boxed())))
151 }
152
153 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
154 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
155 objectstore_log::debug!("Deleting from local_fs backend");
156 let path = self.path.join(id.as_storage_path().to_string());
157 let result = tokio::fs::remove_file(path).await;
158 if let Err(e) = &result
159 && e.kind() == ErrorKind::NotFound
160 {
161 objectstore_log::debug!("Object not found");
162 }
163 Ok(result?)
164 }
165}
166
167impl LocalFsBackend {
168 fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf {
169 self.path
170 .join("__multipart__")
171 .join(id.as_storage_path().to_string())
172 .join(upload_id.as_str())
173 }
174}
175
176#[async_trait::async_trait]
177impl MultipartUploadBackend for LocalFsBackend {
178 async fn initiate_multipart(
179 &self,
180 id: &ObjectId,
181 metadata: &Metadata,
182 ) -> Result<InitiateMultipartResponse> {
183 let upload_id = UploadId::new(uuid::Uuid::now_v7().to_string())?;
184 let dir = self.multipart_dir(id, &upload_id);
185 tokio::fs::create_dir_all(&dir).await?;
186
187 let meta_path = dir.join("metadata.json");
188 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
189 context: "failed to serialize multipart metadata".to_string(),
190 cause,
191 })?;
192 tokio::fs::write(meta_path, metadata_json).await?;
193
194 Ok(upload_id)
195 }
196
197 async fn upload_part(
198 &self,
199 id: &ObjectId,
200 upload_id: &UploadId,
201 part_number: PartNumber,
202 content_length: u64,
203 _content_md5: Option<&str>,
204 body: ClientStream,
205 ) -> Result<UploadPartResponse> {
206 let dir = self.multipart_dir(id, upload_id);
207 if !tokio::fs::try_exists(&dir).await? {
208 return Err(Error::generic("multipart upload not found"));
209 }
210
211 let etag = format!("\"etag-{part_number}-{content_length}\"");
212
213 let header = serde_json::json!({
214 "etag": etag,
215 "uploaded_at": SystemTime::now(),
216 "size": content_length,
217 });
218 let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde {
219 context: "failed to serialize part header".to_string(),
220 cause,
221 })?;
222
223 let part_path = dir.join(format!("{part_number}.part"));
224 let file = OpenOptions::new()
225 .create(true)
226 .write(true)
227 .truncate(true)
228 .open(part_path)
229 .await?;
230
231 let mut reader = pin!(StreamReader::new(body));
232 let mut writer = BufWriter::new(file);
233 writer.write_all(header_line.as_bytes()).await?;
234 writer.write_all(b"\n").await?;
235
236 let _bytes_copied = tokio::io::copy(&mut reader, &mut writer)
237 .await
238 .map_err(|e| match stream::unpack_client_error(&e) {
239 Some(ce) => Error::Client(ce),
240 None => e.into(),
241 })?;
242
243 writer.flush().await?;
248 let file = writer.into_inner();
249 file.sync_data().await?;
250 drop(file);
251
252 Ok(etag)
253 }
254
255 async fn list_parts(
256 &self,
257 id: &ObjectId,
258 upload_id: &UploadId,
259 max_parts: Option<u32>,
260 part_number_marker: Option<PartNumber>,
261 ) -> Result<ListPartsResponse> {
262 let dir = self.multipart_dir(id, upload_id);
263 if !tokio::fs::try_exists(&dir).await? {
264 return Err(Error::generic("multipart upload not found"));
265 }
266
267 let mut entries = tokio::fs::read_dir(&dir).await?;
268 let mut parts = Vec::new();
269
270 while let Some(entry) = entries.next_entry().await? {
271 let name = entry.file_name();
272 let name_str = name.to_string_lossy();
273 let Some(pn_str) = name_str.strip_suffix(".part") else {
274 continue;
275 };
276 let Ok(pn) = pn_str.parse::<PartNumber>() else {
277 continue;
278 };
279
280 if part_number_marker.is_some_and(|marker| pn <= marker) {
281 continue;
282 }
283
284 let file = tokio::fs::File::open(entry.path()).await?;
285 let mut reader = BufReader::new(file);
286 let mut header_line = String::new();
287 reader.read_line(&mut header_line).await?;
288 let header: serde_json::Value =
289 serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
290 context: "failed to deserialize part header".to_string(),
291 cause,
292 })?;
293
294 parts.push(Part {
295 part_number: pn,
296 etag: header["etag"].as_str().unwrap_or("").to_string(),
297 last_modified: serde_json::from_value(header["uploaded_at"].clone())
298 .unwrap_or(SystemTime::UNIX_EPOCH),
299 size: header["size"].as_u64().unwrap_or(0),
300 });
301 }
302
303 parts.sort_by_key(|p| p.part_number);
304
305 let max = max_parts.unwrap_or(u32::MAX) as usize;
306 let is_truncated = parts.len() > max;
307 parts.truncate(max);
308
309 let next_part_number_marker = if is_truncated {
310 parts.last().map(|p| p.part_number)
311 } else {
312 None
313 };
314
315 Ok(ListPartsResponse {
316 parts,
317 is_truncated,
318 next_part_number_marker,
319 })
320 }
321
322 async fn abort_multipart(
323 &self,
324 id: &ObjectId,
325 upload_id: &UploadId,
326 ) -> Result<AbortMultipartResponse> {
327 let dir = self.multipart_dir(id, upload_id);
328 if tokio::fs::try_exists(&dir).await? {
329 tokio::fs::remove_dir_all(dir).await?;
330 }
331 Ok(())
332 }
333
334 async fn complete_multipart(
335 &self,
336 id: &ObjectId,
337 upload_id: &UploadId,
338 parts: Vec<CompletedPart>,
339 ) -> Result<CompleteMultipartResponse> {
340 let dir = self.multipart_dir(id, upload_id);
341 if !tokio::fs::try_exists(&dir).await? {
342 return Err(Error::generic("multipart upload not found"));
343 }
344
345 let meta_path = dir.join("metadata.json");
347 let meta_bytes = tokio::fs::read(&meta_path).await?;
348 let metadata: Metadata =
349 serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde {
350 context: "failed to deserialize multipart metadata".to_string(),
351 cause,
352 })?;
353
354 for completed in &parts {
359 let part_path = dir.join(format!("{}.part", completed.part_number));
360 if !tokio::fs::try_exists(&part_path).await? {
361 return Ok(Some(crate::multipart::CompleteMultipartError {
362 code: "InvalidPart".into(),
363 message: format!("part number {} was not uploaded", completed.part_number),
364 }));
365 }
366
367 let file = tokio::fs::File::open(&part_path).await?;
368 let mut reader = BufReader::new(file);
369 let mut header_line = String::new();
370 reader.read_line(&mut header_line).await?;
371 let header: serde_json::Value =
372 serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
373 context: "failed to deserialize part header".to_string(),
374 cause,
375 })?;
376
377 let stored_etag = header["etag"].as_str().unwrap_or("");
378 if stored_etag != completed.etag {
379 return Ok(Some(crate::multipart::CompleteMultipartError {
380 code: "InvalidPart".into(),
381 message: format!(
382 "etag mismatch for part {}: expected {}, got {}",
383 completed.part_number, stored_etag, completed.etag
384 ),
385 }));
386 }
387 }
388
389 let path = self.path.join(id.as_storage_path().to_string());
391 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
392 let file = OpenOptions::new()
393 .create(true)
394 .write(true)
395 .truncate(true)
396 .open(path)
397 .await?;
398 let mut writer = BufWriter::new(file);
399
400 let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde {
401 context: "failed to serialize metadata".to_string(),
402 cause,
403 })?;
404 writer.write_all(metadata_json.as_bytes()).await?;
405 writer.write_all(b"\n").await?;
406
407 for completed in &parts {
408 let part_path = dir.join(format!("{}.part", completed.part_number));
409 let file = tokio::fs::File::open(&part_path).await?;
410 let mut reader = BufReader::new(file);
411 let mut header_line = String::new();
412 reader.read_line(&mut header_line).await?;
413 tokio::io::copy(&mut reader, &mut writer).await?;
414 }
415
416 writer.flush().await?;
417 let file = writer.into_inner();
418 file.sync_data().await?;
419 drop(file);
420
421 tokio::fs::remove_dir_all(dir).await?;
423
424 Ok(None)
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use std::num::NonZeroU32;
431 use std::time::{Duration, SystemTime};
432
433 use bytes::BytesMut;
434 use futures_util::TryStreamExt;
435 use objectstore_types::metadata::{Compression, ExpirationPolicy};
436 use objectstore_types::scope::{Scope, Scopes};
437
438 use super::*;
439 use crate::id::ObjectContext;
440 use crate::stream;
441
442 #[tokio::test]
443 async fn stores_metadata() {
444 let tempdir = tempfile::tempdir().unwrap();
445 let backend = LocalFsBackend::new(FileSystemConfig {
446 path: tempdir.path().to_path_buf(),
447 });
448
449 let id = ObjectId::random(ObjectContext {
450 usecase: "testing".into(),
451 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
452 });
453
454 let metadata = Metadata {
455 content_type: "text/plain".into(),
456 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
457 time_created: Some(SystemTime::now()),
458 time_expires: None,
459 compression: Some(Compression::Zstd),
460 origin: Some("203.0.113.42".into()),
461 custom: [("foo".into(), "bar".into())].into(),
462 size: None,
463 };
464 backend
465 .put_object(&id, &metadata, stream::single("oh hai!"))
466 .await
467 .unwrap();
468
469 let (read_metadata, stream) = backend.get_object(&id).await.unwrap().unwrap();
470 let file_contents: BytesMut = stream.try_collect().await.unwrap();
471
472 assert_eq!(
473 read_metadata,
474 Metadata {
475 size: Some(file_contents.len()),
476 ..metadata
477 }
478 );
479 assert_eq!(file_contents.as_ref(), b"oh hai!");
480 }
481
482 #[tokio::test]
483 async fn get_metadata_returns_metadata() {
484 let tempdir = tempfile::tempdir().unwrap();
485 let backend = LocalFsBackend::new(FileSystemConfig {
486 path: tempdir.path().to_path_buf(),
487 });
488
489 let id = ObjectId::random(ObjectContext {
490 usecase: "testing".into(),
491 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
492 });
493
494 let metadata = Metadata {
495 content_type: "text/plain".into(),
496 compression: Some(Compression::Zstd),
497 origin: Some("203.0.113.42".into()),
498 custom: [("foo".into(), "bar".into())].into(),
499 ..Default::default()
500 };
501 backend
502 .put_object(&id, &metadata, stream::single("oh hai!"))
503 .await
504 .unwrap();
505
506 let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
507 assert_eq!(
508 read_metadata,
509 Metadata {
510 size: Some(7),
511 ..metadata
512 }
513 );
514 }
515
516 #[tokio::test]
517 async fn get_metadata_nonexistent() {
518 let tempdir = tempfile::tempdir().unwrap();
519 let backend = LocalFsBackend::new(FileSystemConfig {
520 path: tempdir.path().to_path_buf(),
521 });
522
523 let id = ObjectId::random(ObjectContext {
524 usecase: "testing".into(),
525 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
526 });
527
528 let result = backend.get_metadata(&id).await.unwrap();
529 assert!(result.is_none());
530 }
531
532 fn make_id() -> ObjectId {
533 ObjectId::random(ObjectContext {
534 usecase: "testing".into(),
535 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
536 })
537 }
538
539 fn make_backend() -> (tempfile::TempDir, LocalFsBackend) {
540 let tempdir = tempfile::tempdir().unwrap();
541 let backend = LocalFsBackend::new(FileSystemConfig {
542 path: tempdir.path().to_path_buf(),
543 });
544 (tempdir, backend)
545 }
546
547 #[tokio::test]
548 async fn multipart_single_part() {
549 let (_tempdir, backend) = make_backend();
550 let id = make_id();
551 let metadata = Metadata {
552 content_type: "text/plain".into(),
553 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
554 origin: Some("203.0.113.42".into()),
555 custom: [("foo".into(), "bar".into())].into(),
556 ..Default::default()
557 };
558
559 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
560
561 let data = b"hello, multipart world!";
562 let etag = backend
563 .upload_part(
564 &id,
565 &upload_id,
566 NonZeroU32::new(1).unwrap(),
567 data.len() as u64,
568 None,
569 stream::single(data.to_vec()),
570 )
571 .await
572 .unwrap();
573
574 let result = backend
575 .complete_multipart(
576 &id,
577 &upload_id,
578 vec![crate::multipart::CompletedPart {
579 part_number: NonZeroU32::new(1).unwrap(),
580 etag,
581 }],
582 )
583 .await
584 .unwrap();
585 assert!(result.is_none(), "expected no error on complete");
586
587 let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
588 let payload: BytesMut = body.try_collect().await.unwrap();
589 assert_eq!(payload.as_ref(), data);
590 assert_eq!(meta.content_type, "text/plain".to_string());
591 assert_eq!(
592 meta.expiration_policy,
593 ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
594 );
595 assert_eq!(meta.origin, Some("203.0.113.42".into()));
596 assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
597 }
598
599 #[tokio::test]
600 async fn multipart_multiple_parts() {
601 let (_tempdir, backend) = make_backend();
602 let id = make_id();
603 let metadata = Metadata::default();
604
605 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
606
607 let part1 = b"aaaa".to_vec();
608 let part2 = b"bbbb".to_vec();
609 let part3 = b"cc".to_vec();
610
611 let etag1 = backend
612 .upload_part(
613 &id,
614 &upload_id,
615 NonZeroU32::new(1).unwrap(),
616 part1.len() as u64,
617 None,
618 stream::single(part1.clone()),
619 )
620 .await
621 .unwrap();
622 let etag2 = backend
623 .upload_part(
624 &id,
625 &upload_id,
626 NonZeroU32::new(2).unwrap(),
627 part2.len() as u64,
628 None,
629 stream::single(part2.clone()),
630 )
631 .await
632 .unwrap();
633 let etag3 = backend
634 .upload_part(
635 &id,
636 &upload_id,
637 NonZeroU32::new(3).unwrap(),
638 part3.len() as u64,
639 None,
640 stream::single(part3.clone()),
641 )
642 .await
643 .unwrap();
644
645 let result = backend
646 .complete_multipart(
647 &id,
648 &upload_id,
649 vec![
650 crate::multipart::CompletedPart {
651 part_number: NonZeroU32::new(1).unwrap(),
652 etag: etag1,
653 },
654 crate::multipart::CompletedPart {
655 part_number: NonZeroU32::new(2).unwrap(),
656 etag: etag2,
657 },
658 crate::multipart::CompletedPart {
659 part_number: NonZeroU32::new(3).unwrap(),
660 etag: etag3,
661 },
662 ],
663 )
664 .await
665 .unwrap();
666 assert!(result.is_none());
667
668 let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
669 let payload: BytesMut = body.try_collect().await.unwrap();
670 assert_eq!(payload.as_ref(), b"aaaabbbbcc");
671 }
672
673 #[tokio::test]
674 async fn multipart_list_parts() {
675 let (_tempdir, backend) = make_backend();
676 let id = make_id();
677 let metadata = Metadata::default();
678
679 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
680
681 let etag1 = backend
682 .upload_part(
683 &id,
684 &upload_id,
685 NonZeroU32::new(1).unwrap(),
686 3,
687 None,
688 stream::single(b"aaa".to_vec()),
689 )
690 .await
691 .unwrap();
692 let etag2 = backend
693 .upload_part(
694 &id,
695 &upload_id,
696 NonZeroU32::new(2).unwrap(),
697 3,
698 None,
699 stream::single(b"bbb".to_vec()),
700 )
701 .await
702 .unwrap();
703
704 let list = backend
705 .list_parts(&id, &upload_id, None, None)
706 .await
707 .unwrap();
708 assert_eq!(list.parts.len(), 2);
709 assert_eq!(list.parts[0].part_number.get(), 1);
710 assert_eq!(list.parts[0].etag, etag1);
711 assert_eq!(list.parts[0].size, 3);
712 assert_eq!(list.parts[1].part_number.get(), 2);
713 assert_eq!(list.parts[1].etag, etag2);
714 assert_eq!(list.parts[1].size, 3);
715
716 let page1 = backend
718 .list_parts(&id, &upload_id, Some(1), None)
719 .await
720 .unwrap();
721 assert_eq!(page1.parts.len(), 1);
722 assert_eq!(page1.parts[0].part_number.get(), 1);
723 assert!(page1.is_truncated);
724 assert!(page1.next_part_number_marker.is_some());
725
726 let page2 = backend
727 .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
728 .await
729 .unwrap();
730 assert_eq!(page2.parts.len(), 1);
731 assert_eq!(page2.parts[0].part_number.get(), 2);
732
733 backend.abort_multipart(&id, &upload_id).await.unwrap();
734 }
735
736 #[tokio::test]
737 async fn multipart_abort() {
738 let (_tempdir, backend) = make_backend();
739 let id = make_id();
740 let metadata = Metadata::default();
741
742 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
743
744 backend
745 .upload_part(
746 &id,
747 &upload_id,
748 NonZeroU32::new(1).unwrap(),
749 5,
750 None,
751 stream::single(b"hello".to_vec()),
752 )
753 .await
754 .unwrap();
755
756 backend.abort_multipart(&id, &upload_id).await.unwrap();
757
758 let result = backend.get_object(&id).await.unwrap();
759 assert!(result.is_none(), "object should not exist after abort");
760 }
761
762 #[tokio::test]
763 async fn multipart_invalid_etag() {
764 let (_tempdir, backend) = make_backend();
765 let id = make_id();
766 let metadata = Metadata::default();
767
768 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
769
770 let etag = backend
771 .upload_part(
772 &id,
773 &upload_id,
774 NonZeroU32::new(1).unwrap(),
775 5,
776 None,
777 stream::single(b"hello".to_vec()),
778 )
779 .await
780 .unwrap();
781
782 let result = backend
783 .complete_multipart(
784 &id,
785 &upload_id,
786 vec![crate::multipart::CompletedPart {
787 part_number: NonZeroU32::new(1).unwrap(),
788 etag: "wrong-etag".into(),
789 }],
790 )
791 .await
792 .unwrap();
793 assert!(result.is_some(), "expected error for bad etag");
794 assert_eq!(result.unwrap().code, "InvalidPart");
795
796 let result = backend
798 .complete_multipart(
799 &id,
800 &upload_id,
801 vec![crate::multipart::CompletedPart {
802 part_number: NonZeroU32::new(1).unwrap(),
803 etag,
804 }],
805 )
806 .await
807 .unwrap();
808 assert!(result.is_none(), "retry with correct etag should succeed");
809 }
810
811 #[tokio::test]
812 async fn multipart_missing_part() {
813 let (_tempdir, backend) = make_backend();
814 let id = make_id();
815 let metadata = Metadata::default();
816
817 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
818
819 let etag = backend
820 .upload_part(
821 &id,
822 &upload_id,
823 NonZeroU32::new(1).unwrap(),
824 5,
825 None,
826 stream::single(b"hello".to_vec()),
827 )
828 .await
829 .unwrap();
830
831 let result = backend
832 .complete_multipart(
833 &id,
834 &upload_id,
835 vec![crate::multipart::CompletedPart {
836 part_number: NonZeroU32::new(99).unwrap(),
837 etag: "whatever".into(),
838 }],
839 )
840 .await
841 .unwrap();
842 assert!(result.is_some(), "expected error for missing part");
843 assert_eq!(result.unwrap().code, "InvalidPart");
844
845 let result = backend
847 .complete_multipart(
848 &id,
849 &upload_id,
850 vec![crate::multipart::CompletedPart {
851 part_number: NonZeroU32::new(1).unwrap(),
852 etag,
853 }],
854 )
855 .await
856 .unwrap();
857 assert!(result.is_none(), "retry with correct part should succeed");
858 }
859}