1use std::io::ErrorKind;
4use std::path::PathBuf;
5use std::pin::pin;
6use std::time::SystemTime;
7
8use futures_util::StreamExt;
9use objectstore_types::metadata::Metadata;
10use tokio::fs::OpenOptions;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
12use tokio_util::io::{ReaderStream, StreamReader};
13
14use crate::backend::common::{
15 Backend, DeleteResponse, GetResponse, MultipartUploadBackend, PutResponse,
16};
17use crate::error::{Error, Result};
18use crate::id::ObjectId;
19use crate::multipart::{
20 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
21 ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse,
22};
23use crate::stream::{self, ClientStream};
24
25#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
38pub struct FileSystemConfig {
39 pub path: PathBuf,
53}
54
/// Storage backend that keeps each object as one file on the local filesystem.
#[derive(Debug)]
pub struct LocalFsBackend {
    /// Root directory that object paths and multipart staging paths are joined onto.
    path: PathBuf,
}
60
61impl LocalFsBackend {
62 pub fn new(config: FileSystemConfig) -> Self {
64 Self { path: config.path }
65 }
66}
67
68#[async_trait::async_trait]
69impl Backend for LocalFsBackend {
70 fn name(&self) -> &'static str {
71 "local-fs"
72 }
73
74 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
75 async fn put_object(
76 &self,
77 id: &ObjectId,
78 metadata: &Metadata,
79 stream: ClientStream,
80 ) -> Result<PutResponse> {
81 let path = self.path.join(id.as_storage_path().to_string());
82 objectstore_log::debug!(path=%path.display(), "Writing to local_fs backend");
83 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
84 let file = OpenOptions::new()
85 .create(true)
86 .write(true)
87 .truncate(true)
88 .open(path)
89 .await?;
90
91 let mut reader = pin!(StreamReader::new(stream));
92 let mut writer = BufWriter::new(file);
93
94 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
95 context: "failed to serialize metadata".to_string(),
96 cause,
97 })?;
98 writer.write_all(metadata_json.as_bytes()).await?;
99 writer.write_all(b"\n").await?;
100
101 tokio::io::copy(&mut reader, &mut writer)
102 .await
103 .map_err(|e| match stream::unpack_client_error(&e) {
104 Some(ce) => Error::Client(ce),
105 None => e.into(),
106 })?;
107
108 writer.flush().await?;
109 let file = writer.into_inner();
110 file.sync_data().await?;
111 drop(file);
112
113 Ok(())
114 }
115
116 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
118 async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
119 objectstore_log::debug!("Reading from local_fs backend");
120 let path = self.path.join(id.as_storage_path().to_string());
121 let file = match OpenOptions::new().read(true).open(path).await {
122 Ok(file) => file,
123 Err(err) if err.kind() == ErrorKind::NotFound => {
124 objectstore_log::debug!("Object not found");
125 return Ok(None);
126 }
127 err => err?,
128 };
129
130 let mut reader = BufReader::new(file);
131 let mut metadata_line = String::new();
132 reader.read_line(&mut metadata_line).await?;
133 let file_len = reader.get_ref().metadata().await?.len();
134 let mut metadata: Metadata =
135 serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde {
136 context: "failed to deserialize metadata".to_string(),
137 cause,
138 })?;
139 let payload_size = file_len
140 .checked_sub(metadata_line.len() as u64)
141 .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?;
142 metadata.size = Some(payload_size as usize);
143
144 let stream = ReaderStream::new(reader);
145 Ok(Some((metadata, stream.boxed())))
146 }
147
148 #[tracing::instrument(level = "trace", fields(?id), skip_all)]
149 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
150 objectstore_log::debug!("Deleting from local_fs backend");
151 let path = self.path.join(id.as_storage_path().to_string());
152 let result = tokio::fs::remove_file(path).await;
153 if let Err(e) = &result
154 && e.kind() == ErrorKind::NotFound
155 {
156 objectstore_log::debug!("Object not found");
157 }
158 Ok(result?)
159 }
160}
161
162impl LocalFsBackend {
163 fn multipart_dir(&self, id: &ObjectId, upload_id: &UploadId) -> PathBuf {
164 self.path
165 .join("__multipart__")
166 .join(id.as_storage_path().to_string())
167 .join(upload_id)
168 }
169}
170
171#[async_trait::async_trait]
172impl MultipartUploadBackend for LocalFsBackend {
173 async fn initiate_multipart(
174 &self,
175 id: &ObjectId,
176 metadata: &Metadata,
177 ) -> Result<InitiateMultipartResponse> {
178 let upload_id = uuid::Uuid::now_v7().to_string();
179 let dir = self.multipart_dir(id, &upload_id);
180 tokio::fs::create_dir_all(&dir).await?;
181
182 let meta_path = dir.join("metadata.json");
183 let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde {
184 context: "failed to serialize multipart metadata".to_string(),
185 cause,
186 })?;
187 tokio::fs::write(meta_path, metadata_json).await?;
188
189 Ok(upload_id)
190 }
191
192 async fn upload_part(
193 &self,
194 id: &ObjectId,
195 upload_id: &UploadId,
196 part_number: PartNumber,
197 content_length: u64,
198 _content_md5: Option<&str>,
199 body: ClientStream,
200 ) -> Result<UploadPartResponse> {
201 let dir = self.multipart_dir(id, upload_id);
202 if !tokio::fs::try_exists(&dir).await? {
203 return Err(Error::generic("multipart upload not found"));
204 }
205
206 let etag = format!("\"etag-{part_number}-{content_length}\"");
207
208 let header = serde_json::json!({
209 "etag": etag,
210 "uploaded_at": SystemTime::now(),
211 "size": content_length,
212 });
213 let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde {
214 context: "failed to serialize part header".to_string(),
215 cause,
216 })?;
217
218 let part_path = dir.join(format!("{part_number}.part"));
219 let file = OpenOptions::new()
220 .create(true)
221 .write(true)
222 .truncate(true)
223 .open(part_path)
224 .await?;
225
226 let mut reader = pin!(StreamReader::new(body));
227 let mut writer = BufWriter::new(file);
228 writer.write_all(header_line.as_bytes()).await?;
229 writer.write_all(b"\n").await?;
230
231 let _bytes_copied = tokio::io::copy(&mut reader, &mut writer)
232 .await
233 .map_err(|e| match stream::unpack_client_error(&e) {
234 Some(ce) => Error::Client(ce),
235 None => e.into(),
236 })?;
237
238 writer.flush().await?;
243 let file = writer.into_inner();
244 file.sync_data().await?;
245 drop(file);
246
247 Ok(etag)
248 }
249
250 async fn list_parts(
251 &self,
252 id: &ObjectId,
253 upload_id: &UploadId,
254 max_parts: Option<u32>,
255 part_number_marker: Option<PartNumber>,
256 ) -> Result<ListPartsResponse> {
257 let dir = self.multipart_dir(id, upload_id);
258 if !tokio::fs::try_exists(&dir).await? {
259 return Err(Error::generic("multipart upload not found"));
260 }
261
262 let mut entries = tokio::fs::read_dir(&dir).await?;
263 let mut parts = Vec::new();
264
265 while let Some(entry) = entries.next_entry().await? {
266 let name = entry.file_name();
267 let name_str = name.to_string_lossy();
268 let Some(pn_str) = name_str.strip_suffix(".part") else {
269 continue;
270 };
271 let Ok(pn) = pn_str.parse::<PartNumber>() else {
272 continue;
273 };
274
275 if part_number_marker.is_some_and(|marker| pn <= marker) {
276 continue;
277 }
278
279 let file = tokio::fs::File::open(entry.path()).await?;
280 let mut reader = BufReader::new(file);
281 let mut header_line = String::new();
282 reader.read_line(&mut header_line).await?;
283 let header: serde_json::Value =
284 serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
285 context: "failed to deserialize part header".to_string(),
286 cause,
287 })?;
288
289 parts.push(Part {
290 part_number: pn,
291 etag: header["etag"].as_str().unwrap_or("").to_string(),
292 last_modified: serde_json::from_value(header["uploaded_at"].clone())
293 .unwrap_or(SystemTime::UNIX_EPOCH),
294 size: header["size"].as_u64().unwrap_or(0),
295 });
296 }
297
298 parts.sort_by_key(|p| p.part_number);
299
300 let max = max_parts.unwrap_or(u32::MAX) as usize;
301 let is_truncated = parts.len() > max;
302 parts.truncate(max);
303
304 let next_part_number_marker = if is_truncated {
305 parts.last().map(|p| p.part_number)
306 } else {
307 None
308 };
309
310 Ok(ListPartsResponse {
311 parts,
312 is_truncated,
313 next_part_number_marker,
314 })
315 }
316
317 async fn abort_multipart(
318 &self,
319 id: &ObjectId,
320 upload_id: &UploadId,
321 ) -> Result<AbortMultipartResponse> {
322 let dir = self.multipart_dir(id, upload_id);
323 if tokio::fs::try_exists(&dir).await? {
324 tokio::fs::remove_dir_all(dir).await?;
325 }
326 Ok(())
327 }
328
329 async fn complete_multipart(
330 &self,
331 id: &ObjectId,
332 upload_id: &UploadId,
333 parts: Vec<CompletedPart>,
334 ) -> Result<CompleteMultipartResponse> {
335 let dir = self.multipart_dir(id, upload_id);
336 if !tokio::fs::try_exists(&dir).await? {
337 return Err(Error::generic("multipart upload not found"));
338 }
339
340 let meta_path = dir.join("metadata.json");
342 let meta_bytes = tokio::fs::read(&meta_path).await?;
343 let metadata: Metadata =
344 serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde {
345 context: "failed to deserialize multipart metadata".to_string(),
346 cause,
347 })?;
348
349 for completed in &parts {
354 let part_path = dir.join(format!("{}.part", completed.part_number));
355 if !tokio::fs::try_exists(&part_path).await? {
356 return Ok(Some(crate::multipart::CompleteMultipartError {
357 code: "InvalidPart".into(),
358 message: format!("part number {} was not uploaded", completed.part_number),
359 }));
360 }
361
362 let file = tokio::fs::File::open(&part_path).await?;
363 let mut reader = BufReader::new(file);
364 let mut header_line = String::new();
365 reader.read_line(&mut header_line).await?;
366 let header: serde_json::Value =
367 serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde {
368 context: "failed to deserialize part header".to_string(),
369 cause,
370 })?;
371
372 let stored_etag = header["etag"].as_str().unwrap_or("");
373 if stored_etag != completed.etag {
374 return Ok(Some(crate::multipart::CompleteMultipartError {
375 code: "InvalidPart".into(),
376 message: format!(
377 "etag mismatch for part {}: expected {}, got {}",
378 completed.part_number, stored_etag, completed.etag
379 ),
380 }));
381 }
382 }
383
384 let path = self.path.join(id.as_storage_path().to_string());
386 tokio::fs::create_dir_all(path.parent().unwrap()).await?;
387 let file = OpenOptions::new()
388 .create(true)
389 .write(true)
390 .truncate(true)
391 .open(path)
392 .await?;
393 let mut writer = BufWriter::new(file);
394
395 let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde {
396 context: "failed to serialize metadata".to_string(),
397 cause,
398 })?;
399 writer.write_all(metadata_json.as_bytes()).await?;
400 writer.write_all(b"\n").await?;
401
402 for completed in &parts {
403 let part_path = dir.join(format!("{}.part", completed.part_number));
404 let file = tokio::fs::File::open(&part_path).await?;
405 let mut reader = BufReader::new(file);
406 let mut header_line = String::new();
407 reader.read_line(&mut header_line).await?;
408 tokio::io::copy(&mut reader, &mut writer).await?;
409 }
410
411 writer.flush().await?;
412 let file = writer.into_inner();
413 file.sync_data().await?;
414 drop(file);
415
416 tokio::fs::remove_dir_all(dir).await?;
418
419 Ok(None)
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 use std::time::{Duration, SystemTime};
426
427 use bytes::BytesMut;
428 use futures_util::TryStreamExt;
429 use objectstore_types::metadata::{Compression, ExpirationPolicy};
430 use objectstore_types::scope::{Scope, Scopes};
431
432 use super::*;
433 use crate::id::ObjectContext;
434 use crate::stream;
435
436 #[tokio::test]
437 async fn stores_metadata() {
438 let tempdir = tempfile::tempdir().unwrap();
439 let backend = LocalFsBackend::new(FileSystemConfig {
440 path: tempdir.path().to_path_buf(),
441 });
442
443 let id = ObjectId::random(ObjectContext {
444 usecase: "testing".into(),
445 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
446 });
447
448 let metadata = Metadata {
449 content_type: "text/plain".into(),
450 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
451 time_created: Some(SystemTime::now()),
452 time_expires: None,
453 compression: Some(Compression::Zstd),
454 origin: Some("203.0.113.42".into()),
455 custom: [("foo".into(), "bar".into())].into(),
456 size: None,
457 };
458 backend
459 .put_object(&id, &metadata, stream::single("oh hai!"))
460 .await
461 .unwrap();
462
463 let (read_metadata, stream) = backend.get_object(&id).await.unwrap().unwrap();
464 let file_contents: BytesMut = stream.try_collect().await.unwrap();
465
466 assert_eq!(
467 read_metadata,
468 Metadata {
469 size: Some(file_contents.len()),
470 ..metadata
471 }
472 );
473 assert_eq!(file_contents.as_ref(), b"oh hai!");
474 }
475
476 #[tokio::test]
477 async fn get_metadata_returns_metadata() {
478 let tempdir = tempfile::tempdir().unwrap();
479 let backend = LocalFsBackend::new(FileSystemConfig {
480 path: tempdir.path().to_path_buf(),
481 });
482
483 let id = ObjectId::random(ObjectContext {
484 usecase: "testing".into(),
485 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
486 });
487
488 let metadata = Metadata {
489 content_type: "text/plain".into(),
490 compression: Some(Compression::Zstd),
491 origin: Some("203.0.113.42".into()),
492 custom: [("foo".into(), "bar".into())].into(),
493 ..Default::default()
494 };
495 backend
496 .put_object(&id, &metadata, stream::single("oh hai!"))
497 .await
498 .unwrap();
499
500 let read_metadata = backend.get_metadata(&id).await.unwrap().unwrap();
501 assert_eq!(
502 read_metadata,
503 Metadata {
504 size: Some(7),
505 ..metadata
506 }
507 );
508 }
509
510 #[tokio::test]
511 async fn get_metadata_nonexistent() {
512 let tempdir = tempfile::tempdir().unwrap();
513 let backend = LocalFsBackend::new(FileSystemConfig {
514 path: tempdir.path().to_path_buf(),
515 });
516
517 let id = ObjectId::random(ObjectContext {
518 usecase: "testing".into(),
519 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
520 });
521
522 let result = backend.get_metadata(&id).await.unwrap();
523 assert!(result.is_none());
524 }
525
526 fn make_id() -> ObjectId {
527 ObjectId::random(ObjectContext {
528 usecase: "testing".into(),
529 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
530 })
531 }
532
533 fn make_backend() -> (tempfile::TempDir, LocalFsBackend) {
534 let tempdir = tempfile::tempdir().unwrap();
535 let backend = LocalFsBackend::new(FileSystemConfig {
536 path: tempdir.path().to_path_buf(),
537 });
538 (tempdir, backend)
539 }
540
541 #[tokio::test]
542 async fn multipart_single_part() {
543 let (_tempdir, backend) = make_backend();
544 let id = make_id();
545 let metadata = Metadata {
546 content_type: "text/plain".into(),
547 expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
548 origin: Some("203.0.113.42".into()),
549 custom: [("foo".into(), "bar".into())].into(),
550 ..Default::default()
551 };
552
553 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
554
555 let data = b"hello, multipart world!";
556 let etag = backend
557 .upload_part(
558 &id,
559 &upload_id,
560 1,
561 data.len() as u64,
562 None,
563 stream::single(data.to_vec()),
564 )
565 .await
566 .unwrap();
567
568 let result = backend
569 .complete_multipart(
570 &id,
571 &upload_id,
572 vec![crate::multipart::CompletedPart {
573 part_number: 1,
574 etag,
575 }],
576 )
577 .await
578 .unwrap();
579 assert!(result.is_none(), "expected no error on complete");
580
581 let (meta, body) = backend.get_object(&id).await.unwrap().unwrap();
582 let payload: BytesMut = body.try_collect().await.unwrap();
583 assert_eq!(payload.as_ref(), data);
584 assert_eq!(meta.content_type, "text/plain".to_string());
585 assert_eq!(
586 meta.expiration_policy,
587 ExpirationPolicy::TimeToIdle(Duration::from_secs(3600))
588 );
589 assert_eq!(meta.origin, Some("203.0.113.42".into()));
590 assert_eq!(meta.custom, [("foo".into(), "bar".into())].into());
591 }
592
593 #[tokio::test]
594 async fn multipart_multiple_parts() {
595 let (_tempdir, backend) = make_backend();
596 let id = make_id();
597 let metadata = Metadata::default();
598
599 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
600
601 let part1 = b"aaaa".to_vec();
602 let part2 = b"bbbb".to_vec();
603 let part3 = b"cc".to_vec();
604
605 let etag1 = backend
606 .upload_part(
607 &id,
608 &upload_id,
609 1,
610 part1.len() as u64,
611 None,
612 stream::single(part1.clone()),
613 )
614 .await
615 .unwrap();
616 let etag2 = backend
617 .upload_part(
618 &id,
619 &upload_id,
620 2,
621 part2.len() as u64,
622 None,
623 stream::single(part2.clone()),
624 )
625 .await
626 .unwrap();
627 let etag3 = backend
628 .upload_part(
629 &id,
630 &upload_id,
631 3,
632 part3.len() as u64,
633 None,
634 stream::single(part3.clone()),
635 )
636 .await
637 .unwrap();
638
639 let result = backend
640 .complete_multipart(
641 &id,
642 &upload_id,
643 vec![
644 crate::multipart::CompletedPart {
645 part_number: 1,
646 etag: etag1,
647 },
648 crate::multipart::CompletedPart {
649 part_number: 2,
650 etag: etag2,
651 },
652 crate::multipart::CompletedPart {
653 part_number: 3,
654 etag: etag3,
655 },
656 ],
657 )
658 .await
659 .unwrap();
660 assert!(result.is_none());
661
662 let (_, body) = backend.get_object(&id).await.unwrap().unwrap();
663 let payload: BytesMut = body.try_collect().await.unwrap();
664 assert_eq!(payload.as_ref(), b"aaaabbbbcc");
665 }
666
667 #[tokio::test]
668 async fn multipart_list_parts() {
669 let (_tempdir, backend) = make_backend();
670 let id = make_id();
671 let metadata = Metadata::default();
672
673 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
674
675 let etag1 = backend
676 .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec()))
677 .await
678 .unwrap();
679 let etag2 = backend
680 .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec()))
681 .await
682 .unwrap();
683
684 let list = backend
685 .list_parts(&id, &upload_id, None, None)
686 .await
687 .unwrap();
688 assert_eq!(list.parts.len(), 2);
689 assert_eq!(list.parts[0].part_number, 1);
690 assert_eq!(list.parts[0].etag, etag1);
691 assert_eq!(list.parts[0].size, 3);
692 assert_eq!(list.parts[1].part_number, 2);
693 assert_eq!(list.parts[1].etag, etag2);
694 assert_eq!(list.parts[1].size, 3);
695
696 let page1 = backend
698 .list_parts(&id, &upload_id, Some(1), None)
699 .await
700 .unwrap();
701 assert_eq!(page1.parts.len(), 1);
702 assert_eq!(page1.parts[0].part_number, 1);
703 assert!(page1.is_truncated);
704 assert!(page1.next_part_number_marker.is_some());
705
706 let page2 = backend
707 .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker)
708 .await
709 .unwrap();
710 assert_eq!(page2.parts.len(), 1);
711 assert_eq!(page2.parts[0].part_number, 2);
712
713 backend.abort_multipart(&id, &upload_id).await.unwrap();
714 }
715
716 #[tokio::test]
717 async fn multipart_abort() {
718 let (_tempdir, backend) = make_backend();
719 let id = make_id();
720 let metadata = Metadata::default();
721
722 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
723
724 backend
725 .upload_part(
726 &id,
727 &upload_id,
728 1,
729 5,
730 None,
731 stream::single(b"hello".to_vec()),
732 )
733 .await
734 .unwrap();
735
736 backend.abort_multipart(&id, &upload_id).await.unwrap();
737
738 let result = backend.get_object(&id).await.unwrap();
739 assert!(result.is_none(), "object should not exist after abort");
740 }
741
742 #[tokio::test]
743 async fn multipart_invalid_etag() {
744 let (_tempdir, backend) = make_backend();
745 let id = make_id();
746 let metadata = Metadata::default();
747
748 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
749
750 let etag = backend
751 .upload_part(
752 &id,
753 &upload_id,
754 1,
755 5,
756 None,
757 stream::single(b"hello".to_vec()),
758 )
759 .await
760 .unwrap();
761
762 let result = backend
763 .complete_multipart(
764 &id,
765 &upload_id,
766 vec![crate::multipart::CompletedPart {
767 part_number: 1,
768 etag: "wrong-etag".into(),
769 }],
770 )
771 .await
772 .unwrap();
773 assert!(result.is_some(), "expected error for bad etag");
774 assert_eq!(result.unwrap().code, "InvalidPart");
775
776 let result = backend
778 .complete_multipart(
779 &id,
780 &upload_id,
781 vec![crate::multipart::CompletedPart {
782 part_number: 1,
783 etag,
784 }],
785 )
786 .await
787 .unwrap();
788 assert!(result.is_none(), "retry with correct etag should succeed");
789 }
790
791 #[tokio::test]
792 async fn multipart_missing_part() {
793 let (_tempdir, backend) = make_backend();
794 let id = make_id();
795 let metadata = Metadata::default();
796
797 let upload_id = backend.initiate_multipart(&id, &metadata).await.unwrap();
798
799 let etag = backend
800 .upload_part(
801 &id,
802 &upload_id,
803 1,
804 5,
805 None,
806 stream::single(b"hello".to_vec()),
807 )
808 .await
809 .unwrap();
810
811 let result = backend
812 .complete_multipart(
813 &id,
814 &upload_id,
815 vec![crate::multipart::CompletedPart {
816 part_number: 99,
817 etag: "whatever".into(),
818 }],
819 )
820 .await
821 .unwrap();
822 assert!(result.is_some(), "expected error for missing part");
823 assert_eq!(result.unwrap().code, "InvalidPart");
824
825 let result = backend
827 .complete_multipart(
828 &id,
829 &upload_id,
830 vec![crate::multipart::CompletedPart {
831 part_number: 1,
832 etag,
833 }],
834 )
835 .await
836 .unwrap();
837 assert!(result.is_none(), "retry with correct part should succeed");
838 }
839}