Skip to main content

objectstore_client/
many.rs

1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::io;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures_util::{Stream, StreamExt as _};
9use multer::Field;
10use objectstore_types::metadata::{Compression, Metadata};
11use percent_encoding::NON_ALPHANUMERIC;
12use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
13use reqwest::multipart::Part;
14
15use crate::error::Error;
16use crate::put::PutBody;
17use crate::{
18    DeleteBuilder, DeleteResponse, GetBuilder, GetResponse, HeadBuilder, HeadResponse, ObjectKey,
19    PutBuilder, PutResponse, Session, get, put,
20};
21
// Header names of the multipart batch protocol.
//
// They are handed to `HeaderName::from_static` when building requests, so they must be valid,
// all-lowercase header names.

/// Carries the percent-encoded key of the object an operation refers to.
const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key";
/// Names the kind of operation a request part represents (`get`, `insert`, `delete`, `head`).
const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind";
/// Identifies, on a response part, the operation that the part answers.
const HEADER_BATCH_OPERATION_INDEX: &str = "x-sn-batch-operation-index";
/// Carries the status of a single operation on a response part, e.g. `200 OK`.
const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status";
26
/// Upper bound on the number of operations placed into one batch request.
const MAX_BATCH_OPS: usize = 1_000;

/// Largest body, in bytes, that a single part of a batch request may carry (1 MiB).
const MAX_BATCH_PART_SIZE: u32 = 1 << 20;

/// Number of individual (non-batch) requests allowed in flight at once by default.
///
/// Can be overridden via [`ManyBuilder::max_individual_concurrency`].
const DEFAULT_INDIVIDUAL_CONCURRENCY: usize = 5;

/// Number of batch requests allowed in flight at once by default.
///
/// Can be overridden via [`ManyBuilder::max_batch_concurrency`].
const DEFAULT_BATCH_CONCURRENCY: usize = 3;

/// Largest total body size, in bytes, of one batch request (100 MiB).
///
/// The total is the sum of the estimated, post-compression sizes of all parts.
const MAX_BATCH_BODY_SIZE: u64 = 100 << 20;
45
46/// A builder that can be used to enqueue multiple operations.
47///
48/// The client can optionally execute the operations as batch requests, leading to
49/// reduced network overhead.
50#[derive(Debug)]
51pub struct ManyBuilder {
52    session: Session,
53    operations: Vec<BatchOperation>,
54    max_individual_concurrency: Option<usize>,
55    max_batch_concurrency: Option<usize>,
56}
57
58impl Session {
59    /// Creates a [`ManyBuilder`] associated with this session.
60    ///
61    /// A [`ManyBuilder`] can be used to enqueue multiple operations, which the client can choose to
62    /// send as batch requests via a dedicated endpoint, minimizing network overhead.
63    pub fn many(&self) -> ManyBuilder {
64        ManyBuilder {
65            session: self.clone(),
66            operations: vec![],
67            max_individual_concurrency: None,
68            max_batch_concurrency: None,
69        }
70    }
71}
72
73#[derive(Debug)]
74#[allow(clippy::large_enum_variant)]
75enum BatchOperation {
76    Get {
77        key: ObjectKey,
78        decompress: bool,
79        accept_encoding: Vec<Compression>,
80    },
81    Insert {
82        key: Option<ObjectKey>,
83        metadata: Metadata,
84        body: PutBody,
85    },
86    Delete {
87        key: ObjectKey,
88    },
89    Head {
90        key: ObjectKey,
91    },
92}
93
94impl From<GetBuilder> for BatchOperation {
95    fn from(value: GetBuilder) -> Self {
96        let GetBuilder {
97            key,
98            decompress,
99            accept_encoding,
100            session: _session,
101        } = value;
102        BatchOperation::Get {
103            key,
104            decompress,
105            accept_encoding,
106        }
107    }
108}
109
110impl From<PutBuilder> for BatchOperation {
111    fn from(value: PutBuilder) -> Self {
112        let PutBuilder {
113            key,
114            metadata,
115            body,
116            session: _session,
117        } = value;
118        BatchOperation::Insert {
119            key,
120            metadata,
121            body,
122        }
123    }
124}
125
126impl From<DeleteBuilder> for BatchOperation {
127    fn from(value: DeleteBuilder) -> Self {
128        let DeleteBuilder {
129            key,
130            session: _session,
131        } = value;
132        BatchOperation::Delete { key }
133    }
134}
135
136impl From<HeadBuilder> for BatchOperation {
137    fn from(value: HeadBuilder) -> Self {
138        let HeadBuilder {
139            key,
140            session: _session,
141        } = value;
142        BatchOperation::Head { key }
143    }
144}
145
146impl BatchOperation {
147    async fn into_part(self) -> crate::Result<Part> {
148        match self {
149            BatchOperation::Get { key, .. } => {
150                let headers = operation_headers("get", Some(&key));
151                Ok(Part::text("").headers(headers))
152            }
153            BatchOperation::Insert {
154                key,
155                metadata,
156                body,
157            } => {
158                let mut headers = operation_headers("insert", key.as_deref());
159                headers.extend(metadata.to_headers("")?);
160
161                let body = put::maybe_compress(body, metadata.compression).await?;
162                Ok(Part::stream(body).headers(headers))
163            }
164            BatchOperation::Delete { key } => {
165                let headers = operation_headers("delete", Some(&key));
166                Ok(Part::text("").headers(headers))
167            }
168            BatchOperation::Head { key } => {
169                let headers = operation_headers("head", Some(&key));
170                Ok(Part::text("").headers(headers))
171            }
172        }
173    }
174}
175
176fn operation_headers(operation: &str, key: Option<&str>) -> HeaderMap {
177    let mut headers = HeaderMap::new();
178    headers.insert(
179        HeaderName::from_static(HEADER_BATCH_OPERATION_KIND),
180        HeaderValue::from_str(operation).expect("operation kind is always a valid header value"),
181    );
182    if let Some(key) = key {
183        let encoded =
184            percent_encoding::percent_encode(key.as_bytes(), NON_ALPHANUMERIC).to_string();
185        headers.insert(
186            HeaderName::from_static(HEADER_BATCH_OPERATION_KEY),
187            HeaderValue::try_from(encoded)
188                .expect("percent-encoded string is always a valid header value"),
189        );
190    }
191    headers
192}
193
194/// The result of an individual operation.
195#[derive(Debug)]
196pub enum OperationResult {
197    /// The result of a get operation.
198    ///
199    /// Returns `Ok(None)` if the object was not found.
200    Get(ObjectKey, Result<Option<GetResponse>, Error>),
201    /// The result of a put operation.
202    Put(ObjectKey, Result<PutResponse, Error>),
203    /// The result of a delete operation.
204    Delete(ObjectKey, Result<DeleteResponse, Error>),
205    /// The result of a head (metadata-only) operation.
206    ///
207    /// Returns `Ok(None)` if the object was not found.
208    Head(ObjectKey, Result<HeadResponse, Error>),
209    /// An error occurred while parsing or correlating a response part.
210    ///
211    /// This makes it impossible to attribute the error to a specific operation.
212    /// It can happen if the response contains malformed or missing headers, references
213    /// unknown operation indices, or if a network error occurs while reading a response part.
214    Error(Error),
215}
216
217/// Context for an operation, used to map a response part to a proper `OperationResult`.
218enum OperationContext {
219    Get {
220        key: ObjectKey,
221        decompress: bool,
222        accept_encoding: Vec<Compression>,
223    },
224    Insert {
225        key: Option<ObjectKey>,
226    },
227    Delete {
228        key: ObjectKey,
229    },
230    Head {
231        key: ObjectKey,
232    },
233}
234
235impl From<&BatchOperation> for OperationContext {
236    fn from(op: &BatchOperation) -> Self {
237        match op {
238            BatchOperation::Get {
239                key,
240                decompress,
241                accept_encoding,
242            } => OperationContext::Get {
243                key: key.clone(),
244                decompress: *decompress,
245                accept_encoding: accept_encoding.clone(),
246            },
247            BatchOperation::Insert { key, .. } => OperationContext::Insert { key: key.clone() },
248            BatchOperation::Delete { key } => OperationContext::Delete { key: key.clone() },
249            BatchOperation::Head { key } => OperationContext::Head { key: key.clone() },
250        }
251    }
252}
253
254impl OperationContext {
255    fn key(&self) -> Option<&str> {
256        match self {
257            OperationContext::Get { key, .. }
258            | OperationContext::Delete { key }
259            | OperationContext::Head { key } => Some(key),
260            OperationContext::Insert { key } => key.as_deref(),
261        }
262    }
263}
264
265/// The result of classifying a single operation for batch processing.
266#[derive(Debug)]
267enum Classified {
268    /// The operation can be included in a batch request, with its estimated body size in bytes.
269    Batchable(BatchOperation, u64),
270    /// The operation must be executed as an individual request (e.g., oversized file body).
271    Individual(BatchOperation),
272    /// An error was encountered during classification.
273    Failed(OperationResult),
274}
275
276/// Creates a typed error [`OperationResult`] for the given operation context.
277fn error_result(ctx: OperationContext, error: Error) -> OperationResult {
278    let key = ctx.key().unwrap_or("<unknown>").to_owned();
279    match ctx {
280        OperationContext::Get { .. } => OperationResult::Get(key, Err(error)),
281        OperationContext::Insert { .. } => OperationResult::Put(key, Err(error)),
282        OperationContext::Delete { .. } => OperationResult::Delete(key, Err(error)),
283        OperationContext::Head { .. } => OperationResult::Head(key, Err(error)),
284    }
285}
286
287impl OperationResult {
288    async fn from_field(
289        field: Field<'_>,
290        context_map: &HashMap<usize, OperationContext>,
291    ) -> (Option<usize>, Self) {
292        match Self::try_from_field(field, context_map).await {
293            Ok((index, result)) => (Some(index), result),
294            Err(e) => (None, OperationResult::Error(e)),
295        }
296    }
297
298    async fn try_from_field(
299        field: Field<'_>,
300        context_map: &HashMap<usize, OperationContext>,
301    ) -> Result<(usize, Self), Error> {
302        let mut headers = field.headers().clone();
303
304        let index: usize = headers
305            .remove(HEADER_BATCH_OPERATION_INDEX)
306            .and_then(|v| v.to_str().ok().and_then(|s| s.parse().ok()))
307            .ok_or_else(|| {
308                Error::MalformedResponse(format!(
309                    "missing or invalid {HEADER_BATCH_OPERATION_INDEX} header"
310                ))
311            })?;
312
313        let status: u16 = headers
314            .remove(HEADER_BATCH_OPERATION_STATUS)
315            .and_then(|v| {
316                v.to_str().ok().and_then(|s| {
317                    // Status header format is "code reason" (e.g., "200 OK")
318                    // Split on first space and parse the code
319                    s.split_once(' ')
320                        .map(|(code, _)| code)
321                        .unwrap_or(s)
322                        .parse()
323                        .ok()
324                })
325            })
326            .ok_or_else(|| {
327                Error::MalformedResponse(format!(
328                    "missing or invalid {HEADER_BATCH_OPERATION_STATUS} header"
329                ))
330            })?;
331
332        let ctx = context_map.get(&index).ok_or_else(|| {
333            Error::MalformedResponse(format!(
334                "response references unknown operation index {index}"
335            ))
336        })?;
337
338        // Prioritize the server-provided key, fall back to the one from context.
339        let key = headers
340            .remove(HEADER_BATCH_OPERATION_KEY)
341            .and_then(|v| {
342                v.to_str()
343                    .ok()
344                    .and_then(|encoded| {
345                        percent_encoding::percent_decode_str(encoded)
346                            .decode_utf8()
347                            .ok()
348                    })
349                    .map(|s| s.into_owned())
350            })
351            .or_else(|| ctx.key().map(str::to_owned));
352
353        let body = field.bytes().await?;
354
355        let is_error = status >= 400
356            && !(matches!(
357                ctx,
358                OperationContext::Get { .. } | OperationContext::Head { .. }
359            ) && status == 404);
360
361        // For error responses, the key may be absent (e.g., server-generated key inserts
362        // that fail before execution — the server never generated a key and the client
363        // never provided one). Use a sentinel fallback since there is no key to report.
364        // For success responses, the key is always required.
365        let key = match key {
366            Some(key) => key,
367            None if is_error => "<unknown>".to_owned(),
368            None => {
369                return Err(Error::MalformedResponse(format!(
370                    "missing or invalid {HEADER_BATCH_OPERATION_KEY} header"
371                )));
372            }
373        };
374        if is_error {
375            let message = String::from_utf8_lossy(&body).into_owned();
376            let error = Error::OperationFailure { status, message };
377
378            return Ok((
379                index,
380                match ctx {
381                    OperationContext::Get { .. } => OperationResult::Get(key, Err(error)),
382                    OperationContext::Insert { .. } => OperationResult::Put(key, Err(error)),
383                    OperationContext::Delete { .. } => OperationResult::Delete(key, Err(error)),
384                    OperationContext::Head { .. } => OperationResult::Head(key, Err(error)),
385                },
386            ));
387        }
388
389        let result = match ctx {
390            OperationContext::Get {
391                decompress,
392                accept_encoding,
393                ..
394            } => {
395                if status == 404 {
396                    OperationResult::Get(key, Ok(None))
397                } else {
398                    let mut metadata = Metadata::from_headers(&headers, "")?;
399
400                    let stream =
401                        futures_util::stream::once(async move { Ok::<_, io::Error>(body) }).boxed();
402                    let stream =
403                        get::maybe_decompress(stream, &mut metadata, *decompress, accept_encoding);
404
405                    OperationResult::Get(key, Ok(Some(GetResponse { metadata, stream })))
406                }
407            }
408            OperationContext::Insert { .. } => {
409                OperationResult::Put(key.clone(), Ok(PutResponse { key }))
410            }
411            OperationContext::Delete { .. } => OperationResult::Delete(key, Ok(())),
412            OperationContext::Head { .. } => {
413                if status == 404 {
414                    OperationResult::Head(key, Ok(None))
415                } else {
416                    let metadata = Metadata::from_headers(&headers, "")?;
417                    OperationResult::Head(key, Ok(Some(metadata)))
418                }
419            }
420        };
421        Ok((index, result))
422    }
423}
424
425/// Container for the results of all operations in a many request.
426pub struct OperationResults(Pin<Box<dyn Stream<Item = OperationResult> + Send>>);
427
428impl fmt::Debug for OperationResults {
429    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430        f.write_str("OperationResults([Stream])")
431    }
432}
433
434impl Stream for OperationResults {
435    type Item = OperationResult;
436
437    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
438        self.0.as_mut().poll_next(cx)
439    }
440}
441
442impl OperationResults {
443    /// Drains the stream and collects any per-operation errors.
444    ///
445    /// Returns an error containing an iterator of all individual errors for the operations
446    /// that failed, if any.
447    pub async fn error_for_failures(mut self) -> crate::Result<(), impl Iterator<Item = Error>> {
448        let mut errs = Vec::new();
449        while let Some(res) = self.next().await {
450            match res {
451                OperationResult::Get(_, get) => {
452                    if let Err(e) = get {
453                        errs.push(e);
454                    }
455                }
456                OperationResult::Put(_, put) => {
457                    if let Err(e) = put {
458                        errs.push(e);
459                    }
460                }
461                OperationResult::Delete(_, delete) => {
462                    if let Err(e) = delete {
463                        errs.push(e);
464                    }
465                }
466                OperationResult::Head(_, head) => {
467                    if let Err(e) = head {
468                        errs.push(e);
469                    }
470                }
471                OperationResult::Error(error) => errs.push(error),
472            }
473        }
474        if errs.is_empty() {
475            return Ok(());
476        }
477        Err(errs.into_iter())
478    }
479}
480
481async fn send_batch(
482    session: &Session,
483    operations: Vec<BatchOperation>,
484) -> crate::Result<Vec<OperationResult>> {
485    let mut context_map: HashMap<usize, OperationContext> = operations
486        .iter()
487        .enumerate()
488        .map(|(idx, op)| (idx, OperationContext::from(op)))
489        .collect();
490    let num_operations = operations.len();
491
492    let mut form = reqwest::multipart::Form::new();
493    for op in operations {
494        let part = op.into_part().await?;
495        form = form.part("part", part);
496    }
497
498    let request = session.batch_request()?.multipart(form);
499    let response = request.send().await?.error_for_status()?;
500
501    let boundary = response
502        .headers()
503        .get(CONTENT_TYPE)
504        .and_then(|v| v.to_str().ok())
505        .ok_or_else(|| Error::MalformedResponse("missing Content-Type header".to_owned()))
506        .map(multer::parse_boundary)??;
507
508    let byte_stream = response.bytes_stream().map(|r| r.map_err(io::Error::other));
509    let mut multipart = multer::Multipart::new(byte_stream, boundary);
510
511    let mut results = Vec::new();
512    let mut seen_indices = HashSet::new();
513    while let Some(field) = multipart.next_field().await? {
514        let (index, result) = OperationResult::from_field(field, &context_map).await;
515        if let Some(idx) = index {
516            seen_indices.insert(idx);
517        }
518        results.push(result);
519    }
520
521    for idx in 0..num_operations {
522        if !seen_indices.contains(&idx) {
523            let error = Error::MalformedResponse(format!(
524                "server did not return a response for operation at index {idx}"
525            ));
526            let result = match context_map.remove(&idx) {
527                Some(ctx) => error_result(ctx, error),
528                None => OperationResult::Error(error),
529            };
530            results.push(result);
531        }
532    }
533
534    Ok(results)
535}
536
537fn classify_fail(key: Option<ObjectKey>, error: Error) -> Classified {
538    Classified::Failed(OperationResult::Put(
539        key.unwrap_or_else(|| "<unknown>".to_owned()),
540        Err(error),
541    ))
542}
543
544/// Classifies a single operation for batch processing.
545///
546/// Insert operations whose body exceeds [`MAX_BATCH_PART_SIZE`] are marked as
547/// [`Classified::Individual`]. Everything else is [`Classified::Batchable`].
548async fn classify(op: BatchOperation) -> Classified {
549    match op {
550        BatchOperation::Insert {
551            key,
552            metadata,
553            body,
554        } => {
555            let size = match &body {
556                PutBody::Buffer(bytes) => Some(bytes.len() as u64),
557                PutBody::File(file) => match file.metadata().await {
558                    Ok(meta) => Some(meta.len()),
559                    Err(err) => return classify_fail(key, err.into()),
560                },
561                PutBody::Path(path) => match tokio::fs::metadata(path).await {
562                    Ok(meta) => Some(meta.len()),
563                    Err(err) => return classify_fail(key, err.into()),
564                },
565                // Streams have unknown size and must not go through the batch endpoint.
566                PutBody::Stream(_) => None,
567            };
568
569            let size = match (metadata.compression, size) {
570                (Some(Compression::Zstd), Some(size)) => {
571                    usize::try_from(size).ok().map(zstd_safe::compress_bound)
572                }
573                (None, Some(size)) => usize::try_from(size).ok(),
574                (_, None) => None,
575            };
576
577            let op = BatchOperation::Insert {
578                key,
579                metadata,
580                body,
581            };
582
583            match size {
584                Some(s) if s <= MAX_BATCH_PART_SIZE as usize => Classified::Batchable(op, s as u64),
585                _ => Classified::Individual(op),
586            }
587        }
588        other => Classified::Batchable(other, 0),
589    }
590}
591
592/// Classifies all operations, partitioning them into batchable, individual, and failed.
593///
594/// Classification is parallelized since it may involve FS I/O (e.g., stat calls).
595async fn partition(
596    operations: Vec<BatchOperation>,
597) -> (
598    Vec<(BatchOperation, u64)>,
599    Vec<BatchOperation>,
600    Vec<OperationResult>,
601) {
602    let classified = futures_util::future::join_all(operations.into_iter().map(classify)).await;
603    let mut batchable = Vec::new();
604    let mut individual = Vec::new();
605    let mut failed = Vec::new();
606    for item in classified {
607        match item {
608            Classified::Batchable(op, size) => batchable.push((op, size)),
609            Classified::Individual(op) => individual.push(op),
610            Classified::Failed(result) => failed.push(result),
611        }
612    }
613    (batchable, individual, failed)
614}
615
616/// Executes a single operation as an individual (non-batch) request.
617async fn execute_individual(op: BatchOperation, session: &Session) -> OperationResult {
618    match op {
619        BatchOperation::Get {
620            key,
621            decompress,
622            accept_encoding,
623        } => {
624            let get = GetBuilder {
625                session: session.clone(),
626                key: key.clone(),
627                decompress,
628                accept_encoding,
629            };
630            OperationResult::Get(key, get.send().await)
631        }
632        BatchOperation::Insert {
633            key,
634            metadata,
635            body,
636        } => {
637            let error_key = key.clone().unwrap_or_else(|| "<unknown>".to_owned());
638            let put = PutBuilder {
639                session: session.clone(),
640                metadata,
641                key,
642                body,
643            };
644            match put.send().await {
645                Ok(response) => OperationResult::Put(response.key.clone(), Ok(response)),
646                Err(err) => OperationResult::Put(error_key, Err(err)),
647            }
648        }
649        BatchOperation::Delete { key } => {
650            let delete = DeleteBuilder {
651                session: session.clone(),
652                key: key.clone(),
653            };
654            OperationResult::Delete(key, delete.send().await)
655        }
656        BatchOperation::Head { key } => {
657            let head = HeadBuilder {
658                session: session.clone(),
659                key: key.clone(),
660            };
661            OperationResult::Head(key, head.send().await)
662        }
663    }
664}
665
666/// Sends a chunk of operations as a single batch request.
667///
668/// On batch-level failure, produces per-operation error results.
669async fn execute_batch(operations: Vec<BatchOperation>, session: &Session) -> Vec<OperationResult> {
670    let contexts: Vec<_> = operations.iter().map(OperationContext::from).collect();
671    match send_batch(session, operations).await {
672        Ok(results) => results,
673        Err(e) => {
674            let shared = Arc::new(e);
675            contexts
676                .into_iter()
677                .map(|ctx| error_result(ctx, Error::Batch(shared.clone())))
678                .collect()
679        }
680    }
681}
682
683/// Returns a lazy iterator over batches of operations.
684///
685/// Each batch respects both the operation-count limit ([`MAX_BATCH_OPS`]) and the total body-size
686/// limit ([`MAX_BATCH_BODY_SIZE`]).
687fn iter_batches(ops: Vec<(BatchOperation, u64)>) -> impl Iterator<Item = Vec<BatchOperation>> {
688    let mut remaining = ops.into_iter().peekable();
689
690    std::iter::from_fn(move || {
691        remaining.peek()?;
692        let mut batch_size = 0;
693        let mut batch = Vec::new();
694
695        while let Some((_, op_size)) = remaining.peek() {
696            if batch.len() >= MAX_BATCH_OPS
697                || (!batch.is_empty() && batch_size + op_size > MAX_BATCH_BODY_SIZE)
698            {
699                break;
700            }
701
702            let (op, op_size) = remaining.next().expect("peeked above");
703            batch_size += op_size;
704            batch.push(op);
705        }
706
707        Some(batch)
708    })
709}
710
711impl ManyBuilder {
712    /// Consumes this builder, returning a lazy stream over all the enqueued operations' results.
713    ///
714    /// The results are not guaranteed to be in the order they were originally enqueued in.
715    pub async fn send(self) -> OperationResults {
716        let session = self.session;
717        let individual_concurrency = self
718            .max_individual_concurrency
719            .unwrap_or(DEFAULT_INDIVIDUAL_CONCURRENCY)
720            .max(1);
721        let batch_concurrency = self
722            .max_batch_concurrency
723            .unwrap_or(DEFAULT_BATCH_CONCURRENCY)
724            .max(1);
725
726        // Classify all operations
727        let (batchable, individual, failed) = partition(self.operations).await;
728
729        // Execute individual requests for items that are too large, concurrently
730        let individual_results = futures_util::stream::iter(individual)
731            .map({
732                let session = session.clone();
733                move |op| {
734                    let session = session.clone();
735                    async move { execute_individual(op, &session).await }
736                }
737            })
738            .buffer_unordered(individual_concurrency);
739
740        // Chunk batchable operations and execute as batch requests, concurrently
741        let batch_results = futures_util::stream::iter(iter_batches(batchable))
742            .map(move |chunk| {
743                let session = session.clone();
744                async move { execute_batch(chunk, &session).await }
745            })
746            .buffer_unordered(batch_concurrency)
747            .flat_map(futures_util::stream::iter);
748
749        let results = futures_util::stream::iter(failed)
750            .chain(individual_results)
751            .chain(batch_results);
752
753        OperationResults(results.boxed())
754    }
755
756    /// Sets the maximum number of concurrent individual (non-batch) requests.
757    ///
758    /// Operations that exceed the per-part size limit are sent as individual requests.
759    /// This controls how many such requests can be in-flight simultaneously.
760    /// Defaults to 5 if not set.
761    pub fn max_individual_concurrency(mut self, concurrency: usize) -> Self {
762        self.max_individual_concurrency = Some(concurrency);
763        self
764    }
765
766    /// Sets the maximum number of concurrent batch requests.
767    ///
768    /// Batchable operations are grouped into chunks and sent as multipart batch requests.
769    /// This controls how many such batch requests can be in-flight simultaneously.
770    /// Defaults to 3 if not set.
771    pub fn max_batch_concurrency(mut self, concurrency: usize) -> Self {
772        self.max_batch_concurrency = Some(concurrency);
773        self
774    }
775
776    /// Enqueues an operation.
777    ///
778    /// This method takes a [`GetBuilder`]/[`PutBuilder`]/[`DeleteBuilder`], which you can
779    /// construct using [`Session::get`]/[`Session::put`]/[`Session::delete`].
780    ///
781    /// **Important**: All pushed builders must originate from the same [`Session`] that was used
782    /// to create this [`ManyBuilder`]. Mixing builders from different sessions is not supported
783    /// and will result in all operations being executed against this [`ManyBuilder`]'s session,
784    /// silently ignoring the original builder's session.
785    #[allow(private_bounds)]
786    pub fn push<B: Into<BatchOperation>>(mut self, builder: B) -> Self {
787        self.operations.push(builder.into());
788        self
789    }
790}
791
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a placeholder operation paired with the given estimated size.
    fn op(size: u64) -> (BatchOperation, u64) {
        let delete = BatchOperation::Delete {
            key: "k".to_owned(),
        };
        (delete, size)
    }

    /// Builds `count` placeholder operations that all share the same estimated size.
    fn ops(count: usize, size: u64) -> Vec<(BatchOperation, u64)> {
        std::iter::repeat_with(|| op(size)).take(count).collect()
    }

    /// Runs `iter_batches` and reports how many operations ended up in each batch.
    fn batch_sizes(ops: Vec<(BatchOperation, u64)>) -> Vec<usize> {
        iter_batches(ops).map(|batch| batch.len()).collect()
    }

    /// Builds a Zstd-compressed insert with an in-memory body of `size` zero bytes.
    fn put_with_zstd(size: usize) -> BatchOperation {
        let metadata = Metadata {
            compression: Some(Compression::Zstd),
            ..Default::default()
        };
        BatchOperation::Insert {
            key: Some("k".to_owned()),
            metadata,
            body: PutBody::Buffer(vec![0; size].into()),
        }
    }

    #[tokio::test]
    async fn zstd_put_at_limit_is_batchable() {
        // Largest input whose worst-case compressed size still equals the part limit.
        let size = 1_044_496;
        let post_compression = zstd_safe::compress_bound(size);
        assert_eq!(post_compression, MAX_BATCH_PART_SIZE as usize);

        core::assert_matches!(
            classify(put_with_zstd(size)).await,
            Classified::Batchable(_, s) if s == post_compression as u64
        );
    }

    #[tokio::test]
    async fn zstd_put_above_limit_is_individual() {
        // One byte more pushes the worst-case compressed size past the part limit.
        let size = 1_044_497;
        let post_compression = zstd_safe::compress_bound(size);
        assert!(post_compression > MAX_BATCH_PART_SIZE as usize);

        core::assert_matches!(
            classify(put_with_zstd(size)).await,
            Classified::Individual(_)
        );
    }

    #[test]
    fn iter_batches_empty() {
        assert!(iter_batches(Vec::new()).next().is_none());
    }

    #[test]
    fn iter_batches_single_batch_count_limit() {
        // 1000 tiny ops → exactly one batch
        assert_eq!(batch_sizes(ops(1000, 1)), vec![1000]);
    }

    #[test]
    fn iter_batches_splits_on_count_limit() {
        // 1001 tiny ops → two batches: 1000 + 1
        assert_eq!(batch_sizes(ops(1001, 1)), vec![1000, 1]);
    }

    #[test]
    fn iter_batches_exactly_at_size_limit() {
        // 100 ops of 1 MB each = exactly 100 MB → one batch
        assert_eq!(batch_sizes(ops(100, 1024 * 1024)), vec![100]);
    }

    #[test]
    fn iter_batches_splits_on_size_limit() {
        // 101 ops of 1 MB each = 101 MB → two batches: 100 + 1
        assert_eq!(batch_sizes(ops(101, 1024 * 1024)), vec![100, 1]);
    }

    #[test]
    fn iter_batches_size_limit_hits_before_count_limit() {
        // 200 ops of ~600 KB each → size limit triggers before the 1000-op count limit
        let op_size: u64 = 600 * 1024;
        let sizes = batch_sizes(ops(200, op_size));
        // Each batch holds floor(100 MB / 600 KB) ops
        let per_batch = (MAX_BATCH_BODY_SIZE / op_size) as usize;
        assert!(sizes.len() > 1, "expected multiple batches");
        let (_last, full_batches) = sizes.split_last().expect("checked to be non-empty above");
        for &len in full_batches {
            assert_eq!(len, per_batch);
        }
    }
}