1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::io;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures_util::{Stream, StreamExt as _};
9use multer::Field;
10use objectstore_types::metadata::{Compression, Metadata};
11use percent_encoding::NON_ALPHANUMERIC;
12use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
13use reqwest::multipart::Part;
14
15use crate::error::Error;
16use crate::put::PutBody;
17use crate::{
18 DeleteBuilder, DeleteResponse, GetBuilder, GetResponse, HeadBuilder, HeadResponse, ObjectKey,
19 PutBuilder, PutResponse, Session, get, put,
20};
21
/// Name of the multipart part header that carries the percent-encoded object key.
const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key";
/// Name of the multipart part header that carries the operation kind
/// (`get`, `insert`, `delete` or `head`).
const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind";
/// Name of the response part header holding the index of the operation the part belongs to.
const HEADER_BATCH_OPERATION_INDEX: &str = "x-sn-batch-operation-index";
/// Name of the response part header holding the status code of the operation.
const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status";

/// Maximum number of operations placed into a single batch request.
const MAX_BATCH_OPS: usize = 1000;

/// Largest estimated insert body size, in bytes, that is still sent inside a batch.
/// Inserts with a larger or unknown size are sent as individual requests.
const MAX_BATCH_PART_SIZE: u32 = 1024 * 1024;

/// Concurrency used for individually sent operations when the builder sets no limit.
const DEFAULT_INDIVIDUAL_CONCURRENCY: usize = 5;

/// Concurrency used for batch requests when the builder sets no limit.
const DEFAULT_BATCH_CONCURRENCY: usize = 3;
42
/// Upper bound, in bytes, on the summed estimated body sizes of the operations in one batch.
const MAX_BATCH_BODY_SIZE: u64 = 100 * 1024 * 1024;

/// Builder that queues several operations and executes them together.
///
/// Created by [`Session::many`]; operations are added with [`ManyBuilder::push`] and run
/// with [`ManyBuilder::send`].
#[derive(Debug)]
pub struct ManyBuilder {
    /// Session used to issue every request.
    session: Session,
    /// Operations queued so far, in the order they were pushed.
    operations: Vec<BatchOperation>,
    /// Optional override for the number of individual requests in flight at once.
    max_individual_concurrency: Option<usize>,
    /// Optional override for the number of batch requests in flight at once.
    max_batch_concurrency: Option<usize>,
}
57
58impl Session {
59 pub fn many(&self) -> ManyBuilder {
64 ManyBuilder {
65 session: self.clone(),
66 operations: vec![],
67 max_individual_concurrency: None,
68 max_batch_concurrency: None,
69 }
70 }
71}
72
/// A single queued operation, stripped of the session its builder carried.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum BatchOperation {
    /// Fetch the object stored under `key`.
    Get {
        key: ObjectKey,
        /// Forwarded to `get::maybe_decompress` when the response is read.
        decompress: bool,
        /// Forwarded to `get::maybe_decompress` when the response is read.
        accept_encoding: Vec<Compression>,
    },
    /// Store `body` with the given `metadata`; `key` is optional.
    Insert {
        key: Option<ObjectKey>,
        metadata: Metadata,
        body: PutBody,
    },
    /// Delete the object stored under `key`.
    Delete {
        key: ObjectKey,
    },
    /// Fetch only the metadata of the object stored under `key`.
    Head {
        key: ObjectKey,
    },
}
93
94impl From<GetBuilder> for BatchOperation {
95 fn from(value: GetBuilder) -> Self {
96 let GetBuilder {
97 key,
98 decompress,
99 accept_encoding,
100 session: _session,
101 } = value;
102 BatchOperation::Get {
103 key,
104 decompress,
105 accept_encoding,
106 }
107 }
108}
109
110impl From<PutBuilder> for BatchOperation {
111 fn from(value: PutBuilder) -> Self {
112 let PutBuilder {
113 key,
114 metadata,
115 body,
116 session: _session,
117 } = value;
118 BatchOperation::Insert {
119 key,
120 metadata,
121 body,
122 }
123 }
124}
125
126impl From<DeleteBuilder> for BatchOperation {
127 fn from(value: DeleteBuilder) -> Self {
128 let DeleteBuilder {
129 key,
130 session: _session,
131 } = value;
132 BatchOperation::Delete { key }
133 }
134}
135
136impl From<HeadBuilder> for BatchOperation {
137 fn from(value: HeadBuilder) -> Self {
138 let HeadBuilder {
139 key,
140 session: _session,
141 } = value;
142 BatchOperation::Head { key }
143 }
144}
145
146impl BatchOperation {
147 async fn into_part(self) -> crate::Result<Part> {
148 match self {
149 BatchOperation::Get { key, .. } => {
150 let headers = operation_headers("get", Some(&key));
151 Ok(Part::text("").headers(headers))
152 }
153 BatchOperation::Insert {
154 key,
155 metadata,
156 body,
157 } => {
158 let mut headers = operation_headers("insert", key.as_deref());
159 headers.extend(metadata.to_headers("")?);
160
161 let body = put::maybe_compress(body, metadata.compression).await?;
162 Ok(Part::stream(body).headers(headers))
163 }
164 BatchOperation::Delete { key } => {
165 let headers = operation_headers("delete", Some(&key));
166 Ok(Part::text("").headers(headers))
167 }
168 BatchOperation::Head { key } => {
169 let headers = operation_headers("head", Some(&key));
170 Ok(Part::text("").headers(headers))
171 }
172 }
173 }
174}
175
176fn operation_headers(operation: &str, key: Option<&str>) -> HeaderMap {
177 let mut headers = HeaderMap::new();
178 headers.insert(
179 HeaderName::from_static(HEADER_BATCH_OPERATION_KIND),
180 HeaderValue::from_str(operation).expect("operation kind is always a valid header value"),
181 );
182 if let Some(key) = key {
183 let encoded =
184 percent_encoding::percent_encode(key.as_bytes(), NON_ALPHANUMERIC).to_string();
185 headers.insert(
186 HeaderName::from_static(HEADER_BATCH_OPERATION_KEY),
187 HeaderValue::try_from(encoded)
188 .expect("percent-encoded string is always a valid header value"),
189 );
190 }
191 headers
192}
193
/// Outcome of one operation submitted through a [`ManyBuilder`].
///
/// The key is the placeholder `"<unknown>"` when a failed operation has no known key.
#[derive(Debug)]
pub enum OperationResult {
    /// Outcome of a get. When parsed from a batch response, `Ok(None)` means the part
    /// reported status 404.
    Get(ObjectKey, Result<Option<GetResponse>, Error>),
    /// Outcome of an insert.
    Put(ObjectKey, Result<PutResponse, Error>),
    /// Outcome of a delete.
    Delete(ObjectKey, Result<DeleteResponse, Error>),
    /// Outcome of a head. When parsed from a batch response, `Ok(None)` means the part
    /// reported status 404.
    Head(ObjectKey, Result<HeadResponse, Error>),
    /// A failure that could not be tied to a specific operation, for example a response part
    /// whose index or status header is missing or invalid.
    Error(Error),
}
216
/// What is remembered about a submitted operation after the operation itself has been
/// consumed. It is used to interpret the matching response part and to build error results.
enum OperationContext {
    /// Context of a get, including the settings needed to post-process the response body.
    Get {
        key: ObjectKey,
        decompress: bool,
        accept_encoding: Vec<Compression>,
    },
    /// Context of an insert; the key may be absent.
    Insert {
        key: Option<ObjectKey>,
    },
    /// Context of a delete.
    Delete {
        key: ObjectKey,
    },
    /// Context of a head.
    Head {
        key: ObjectKey,
    },
}
234
235impl From<&BatchOperation> for OperationContext {
236 fn from(op: &BatchOperation) -> Self {
237 match op {
238 BatchOperation::Get {
239 key,
240 decompress,
241 accept_encoding,
242 } => OperationContext::Get {
243 key: key.clone(),
244 decompress: *decompress,
245 accept_encoding: accept_encoding.clone(),
246 },
247 BatchOperation::Insert { key, .. } => OperationContext::Insert { key: key.clone() },
248 BatchOperation::Delete { key } => OperationContext::Delete { key: key.clone() },
249 BatchOperation::Head { key } => OperationContext::Head { key: key.clone() },
250 }
251 }
252}
253
254impl OperationContext {
255 fn key(&self) -> Option<&str> {
256 match self {
257 OperationContext::Get { key, .. }
258 | OperationContext::Delete { key }
259 | OperationContext::Head { key } => Some(key),
260 OperationContext::Insert { key } => key.as_deref(),
261 }
262 }
263}
264
/// Result of deciding how an operation should be sent.
#[derive(Debug)]
enum Classified {
    /// Send as part of a batch; the second field is the estimated body size in bytes
    /// (`0` for operations other than inserts).
    Batchable(BatchOperation, u64),
    /// Send as its own request.
    Individual(BatchOperation),
    /// Could not be classified; the contained result is reported without sending anything.
    Failed(OperationResult),
}
275
276fn error_result(ctx: OperationContext, error: Error) -> OperationResult {
278 let key = ctx.key().unwrap_or("<unknown>").to_owned();
279 match ctx {
280 OperationContext::Get { .. } => OperationResult::Get(key, Err(error)),
281 OperationContext::Insert { .. } => OperationResult::Put(key, Err(error)),
282 OperationContext::Delete { .. } => OperationResult::Delete(key, Err(error)),
283 OperationContext::Head { .. } => OperationResult::Head(key, Err(error)),
284 }
285}
286
287impl OperationResult {
288 async fn from_field(
289 field: Field<'_>,
290 context_map: &HashMap<usize, OperationContext>,
291 ) -> (Option<usize>, Self) {
292 match Self::try_from_field(field, context_map).await {
293 Ok((index, result)) => (Some(index), result),
294 Err(e) => (None, OperationResult::Error(e)),
295 }
296 }
297
298 async fn try_from_field(
299 field: Field<'_>,
300 context_map: &HashMap<usize, OperationContext>,
301 ) -> Result<(usize, Self), Error> {
302 let mut headers = field.headers().clone();
303
304 let index: usize = headers
305 .remove(HEADER_BATCH_OPERATION_INDEX)
306 .and_then(|v| v.to_str().ok().and_then(|s| s.parse().ok()))
307 .ok_or_else(|| {
308 Error::MalformedResponse(format!(
309 "missing or invalid {HEADER_BATCH_OPERATION_INDEX} header"
310 ))
311 })?;
312
313 let status: u16 = headers
314 .remove(HEADER_BATCH_OPERATION_STATUS)
315 .and_then(|v| {
316 v.to_str().ok().and_then(|s| {
317 s.split_once(' ')
320 .map(|(code, _)| code)
321 .unwrap_or(s)
322 .parse()
323 .ok()
324 })
325 })
326 .ok_or_else(|| {
327 Error::MalformedResponse(format!(
328 "missing or invalid {HEADER_BATCH_OPERATION_STATUS} header"
329 ))
330 })?;
331
332 let ctx = context_map.get(&index).ok_or_else(|| {
333 Error::MalformedResponse(format!(
334 "response references unknown operation index {index}"
335 ))
336 })?;
337
338 let key = headers
340 .remove(HEADER_BATCH_OPERATION_KEY)
341 .and_then(|v| {
342 v.to_str()
343 .ok()
344 .and_then(|encoded| {
345 percent_encoding::percent_decode_str(encoded)
346 .decode_utf8()
347 .ok()
348 })
349 .map(|s| s.into_owned())
350 })
351 .or_else(|| ctx.key().map(str::to_owned));
352
353 let body = field.bytes().await?;
354
355 let is_error = status >= 400
356 && !(matches!(
357 ctx,
358 OperationContext::Get { .. } | OperationContext::Head { .. }
359 ) && status == 404);
360
361 let key = match key {
366 Some(key) => key,
367 None if is_error => "<unknown>".to_owned(),
368 None => {
369 return Err(Error::MalformedResponse(format!(
370 "missing or invalid {HEADER_BATCH_OPERATION_KEY} header"
371 )));
372 }
373 };
374 if is_error {
375 let message = String::from_utf8_lossy(&body).into_owned();
376 let error = Error::OperationFailure { status, message };
377
378 return Ok((
379 index,
380 match ctx {
381 OperationContext::Get { .. } => OperationResult::Get(key, Err(error)),
382 OperationContext::Insert { .. } => OperationResult::Put(key, Err(error)),
383 OperationContext::Delete { .. } => OperationResult::Delete(key, Err(error)),
384 OperationContext::Head { .. } => OperationResult::Head(key, Err(error)),
385 },
386 ));
387 }
388
389 let result = match ctx {
390 OperationContext::Get {
391 decompress,
392 accept_encoding,
393 ..
394 } => {
395 if status == 404 {
396 OperationResult::Get(key, Ok(None))
397 } else {
398 let mut metadata = Metadata::from_headers(&headers, "")?;
399
400 let stream =
401 futures_util::stream::once(async move { Ok::<_, io::Error>(body) }).boxed();
402 let stream =
403 get::maybe_decompress(stream, &mut metadata, *decompress, accept_encoding);
404
405 OperationResult::Get(key, Ok(Some(GetResponse { metadata, stream })))
406 }
407 }
408 OperationContext::Insert { .. } => {
409 OperationResult::Put(key.clone(), Ok(PutResponse { key }))
410 }
411 OperationContext::Delete { .. } => OperationResult::Delete(key, Ok(())),
412 OperationContext::Head { .. } => {
413 if status == 404 {
414 OperationResult::Head(key, Ok(None))
415 } else {
416 let metadata = Metadata::from_headers(&headers, "")?;
417 OperationResult::Head(key, Ok(Some(metadata)))
418 }
419 }
420 };
421 Ok((index, result))
422 }
423}
424
/// Stream of [`OperationResult`]s, as returned by [`ManyBuilder::send`].
///
/// Wraps a boxed, pinned stream so the concrete stream type stays hidden.
pub struct OperationResults(Pin<Box<dyn Stream<Item = OperationResult> + Send>>);
427
428impl fmt::Debug for OperationResults {
429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 f.write_str("OperationResults([Stream])")
431 }
432}
433
434impl Stream for OperationResults {
435 type Item = OperationResult;
436
437 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
438 self.0.as_mut().poll_next(cx)
439 }
440}
441
442impl OperationResults {
443 pub async fn error_for_failures(mut self) -> crate::Result<(), impl Iterator<Item = Error>> {
448 let mut errs = Vec::new();
449 while let Some(res) = self.next().await {
450 match res {
451 OperationResult::Get(_, get) => {
452 if let Err(e) = get {
453 errs.push(e);
454 }
455 }
456 OperationResult::Put(_, put) => {
457 if let Err(e) = put {
458 errs.push(e);
459 }
460 }
461 OperationResult::Delete(_, delete) => {
462 if let Err(e) = delete {
463 errs.push(e);
464 }
465 }
466 OperationResult::Head(_, head) => {
467 if let Err(e) = head {
468 errs.push(e);
469 }
470 }
471 OperationResult::Error(error) => errs.push(error),
472 }
473 }
474 if errs.is_empty() {
475 return Ok(());
476 }
477 Err(errs.into_iter())
478 }
479}
480
481async fn send_batch(
482 session: &Session,
483 operations: Vec<BatchOperation>,
484) -> crate::Result<Vec<OperationResult>> {
485 let mut context_map: HashMap<usize, OperationContext> = operations
486 .iter()
487 .enumerate()
488 .map(|(idx, op)| (idx, OperationContext::from(op)))
489 .collect();
490 let num_operations = operations.len();
491
492 let mut form = reqwest::multipart::Form::new();
493 for op in operations {
494 let part = op.into_part().await?;
495 form = form.part("part", part);
496 }
497
498 let request = session.batch_request()?.multipart(form);
499 let response = request.send().await?.error_for_status()?;
500
501 let boundary = response
502 .headers()
503 .get(CONTENT_TYPE)
504 .and_then(|v| v.to_str().ok())
505 .ok_or_else(|| Error::MalformedResponse("missing Content-Type header".to_owned()))
506 .map(multer::parse_boundary)??;
507
508 let byte_stream = response.bytes_stream().map(|r| r.map_err(io::Error::other));
509 let mut multipart = multer::Multipart::new(byte_stream, boundary);
510
511 let mut results = Vec::new();
512 let mut seen_indices = HashSet::new();
513 while let Some(field) = multipart.next_field().await? {
514 let (index, result) = OperationResult::from_field(field, &context_map).await;
515 if let Some(idx) = index {
516 seen_indices.insert(idx);
517 }
518 results.push(result);
519 }
520
521 for idx in 0..num_operations {
522 if !seen_indices.contains(&idx) {
523 let error = Error::MalformedResponse(format!(
524 "server did not return a response for operation at index {idx}"
525 ));
526 let result = match context_map.remove(&idx) {
527 Some(ctx) => error_result(ctx, error),
528 None => OperationResult::Error(error),
529 };
530 results.push(result);
531 }
532 }
533
534 Ok(results)
535}
536
537fn classify_fail(key: Option<ObjectKey>, error: Error) -> Classified {
538 Classified::Failed(OperationResult::Put(
539 key.unwrap_or_else(|| "<unknown>".to_owned()),
540 Err(error),
541 ))
542}
543
544async fn classify(op: BatchOperation) -> Classified {
549 match op {
550 BatchOperation::Insert {
551 key,
552 metadata,
553 body,
554 } => {
555 let size = match &body {
556 PutBody::Buffer(bytes) => Some(bytes.len() as u64),
557 PutBody::File(file) => match file.metadata().await {
558 Ok(meta) => Some(meta.len()),
559 Err(err) => return classify_fail(key, err.into()),
560 },
561 PutBody::Path(path) => match tokio::fs::metadata(path).await {
562 Ok(meta) => Some(meta.len()),
563 Err(err) => return classify_fail(key, err.into()),
564 },
565 PutBody::Stream(_) => None,
567 };
568
569 let size = match (metadata.compression, size) {
570 (Some(Compression::Zstd), Some(size)) => {
571 usize::try_from(size).ok().map(zstd_safe::compress_bound)
572 }
573 (None, Some(size)) => usize::try_from(size).ok(),
574 (_, None) => None,
575 };
576
577 let op = BatchOperation::Insert {
578 key,
579 metadata,
580 body,
581 };
582
583 match size {
584 Some(s) if s <= MAX_BATCH_PART_SIZE as usize => Classified::Batchable(op, s as u64),
585 _ => Classified::Individual(op),
586 }
587 }
588 other => Classified::Batchable(other, 0),
589 }
590}
591
592async fn partition(
596 operations: Vec<BatchOperation>,
597) -> (
598 Vec<(BatchOperation, u64)>,
599 Vec<BatchOperation>,
600 Vec<OperationResult>,
601) {
602 let classified = futures_util::future::join_all(operations.into_iter().map(classify)).await;
603 let mut batchable = Vec::new();
604 let mut individual = Vec::new();
605 let mut failed = Vec::new();
606 for item in classified {
607 match item {
608 Classified::Batchable(op, size) => batchable.push((op, size)),
609 Classified::Individual(op) => individual.push(op),
610 Classified::Failed(result) => failed.push(result),
611 }
612 }
613 (batchable, individual, failed)
614}
615
616async fn execute_individual(op: BatchOperation, session: &Session) -> OperationResult {
618 match op {
619 BatchOperation::Get {
620 key,
621 decompress,
622 accept_encoding,
623 } => {
624 let get = GetBuilder {
625 session: session.clone(),
626 key: key.clone(),
627 decompress,
628 accept_encoding,
629 };
630 OperationResult::Get(key, get.send().await)
631 }
632 BatchOperation::Insert {
633 key,
634 metadata,
635 body,
636 } => {
637 let error_key = key.clone().unwrap_or_else(|| "<unknown>".to_owned());
638 let put = PutBuilder {
639 session: session.clone(),
640 metadata,
641 key,
642 body,
643 };
644 match put.send().await {
645 Ok(response) => OperationResult::Put(response.key.clone(), Ok(response)),
646 Err(err) => OperationResult::Put(error_key, Err(err)),
647 }
648 }
649 BatchOperation::Delete { key } => {
650 let delete = DeleteBuilder {
651 session: session.clone(),
652 key: key.clone(),
653 };
654 OperationResult::Delete(key, delete.send().await)
655 }
656 BatchOperation::Head { key } => {
657 let head = HeadBuilder {
658 session: session.clone(),
659 key: key.clone(),
660 };
661 OperationResult::Head(key, head.send().await)
662 }
663 }
664}
665
666async fn execute_batch(operations: Vec<BatchOperation>, session: &Session) -> Vec<OperationResult> {
670 let contexts: Vec<_> = operations.iter().map(OperationContext::from).collect();
671 match send_batch(session, operations).await {
672 Ok(results) => results,
673 Err(e) => {
674 let shared = Arc::new(e);
675 contexts
676 .into_iter()
677 .map(|ctx| error_result(ctx, Error::Batch(shared.clone())))
678 .collect()
679 }
680 }
681}
682
683fn iter_batches(ops: Vec<(BatchOperation, u64)>) -> impl Iterator<Item = Vec<BatchOperation>> {
688 let mut remaining = ops.into_iter().peekable();
689
690 std::iter::from_fn(move || {
691 remaining.peek()?;
692 let mut batch_size = 0;
693 let mut batch = Vec::new();
694
695 while let Some((_, op_size)) = remaining.peek() {
696 if batch.len() >= MAX_BATCH_OPS
697 || (!batch.is_empty() && batch_size + op_size > MAX_BATCH_BODY_SIZE)
698 {
699 break;
700 }
701
702 let (op, op_size) = remaining.next().expect("peeked above");
703 batch_size += op_size;
704 batch.push(op);
705 }
706
707 Some(batch)
708 })
709}
710
711impl ManyBuilder {
712 pub async fn send(self) -> OperationResults {
716 let session = self.session;
717 let individual_concurrency = self
718 .max_individual_concurrency
719 .unwrap_or(DEFAULT_INDIVIDUAL_CONCURRENCY)
720 .max(1);
721 let batch_concurrency = self
722 .max_batch_concurrency
723 .unwrap_or(DEFAULT_BATCH_CONCURRENCY)
724 .max(1);
725
726 let (batchable, individual, failed) = partition(self.operations).await;
728
729 let individual_results = futures_util::stream::iter(individual)
731 .map({
732 let session = session.clone();
733 move |op| {
734 let session = session.clone();
735 async move { execute_individual(op, &session).await }
736 }
737 })
738 .buffer_unordered(individual_concurrency);
739
740 let batch_results = futures_util::stream::iter(iter_batches(batchable))
742 .map(move |chunk| {
743 let session = session.clone();
744 async move { execute_batch(chunk, &session).await }
745 })
746 .buffer_unordered(batch_concurrency)
747 .flat_map(futures_util::stream::iter);
748
749 let results = futures_util::stream::iter(failed)
750 .chain(individual_results)
751 .chain(batch_results);
752
753 OperationResults(results.boxed())
754 }
755
756 pub fn max_individual_concurrency(mut self, concurrency: usize) -> Self {
762 self.max_individual_concurrency = Some(concurrency);
763 self
764 }
765
766 pub fn max_batch_concurrency(mut self, concurrency: usize) -> Self {
772 self.max_batch_concurrency = Some(concurrency);
773 self
774 }
775
776 #[allow(private_bounds)]
786 pub fn push<B: Into<BatchOperation>>(mut self, builder: B) -> Self {
787 self.operations.push(builder.into());
788 self
789 }
790}
791
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a delete operation paired with an arbitrary size, as input for `iter_batches`.
    fn op(size: u64) -> (BatchOperation, u64) {
        (
            BatchOperation::Delete {
                key: "k".to_owned(),
            },
            size,
        )
    }

    /// Returns the number of operations in each batch.
    fn batch_sizes(batches: &[Vec<BatchOperation>]) -> Vec<usize> {
        batches.iter().map(Vec::len).collect()
    }

    /// Collects every batch produced by `iter_batches`.
    fn batches(ops: Vec<(BatchOperation, u64)>) -> Vec<Vec<BatchOperation>> {
        iter_batches(ops).collect()
    }

    /// Builds a zstd-compressed insert with an in-memory body of `size` zero bytes.
    fn put_with_zstd(size: usize) -> BatchOperation {
        BatchOperation::Insert {
            key: Some("k".to_owned()),
            metadata: Metadata {
                compression: Some(Compression::Zstd),
                ..Default::default()
            },
            body: PutBody::Buffer(vec![0; size].into()),
        }
    }

    #[tokio::test]
    async fn zstd_put_at_limit_is_batchable() {
        // Input size whose zstd compress bound is exactly the per-part limit.
        let size = 1_044_496;
        let post_compression = zstd_safe::compress_bound(size);
        assert!(post_compression == MAX_BATCH_PART_SIZE as usize);

        core::assert_matches!(
            classify(put_with_zstd(size)).await,
            Classified::Batchable(_, s) if s == post_compression as u64
        );
    }

    #[tokio::test]
    async fn zstd_put_above_limit_is_individual() {
        // One byte more than the largest input whose compress bound still fits the limit.
        let size = 1_044_497;
        let post_compression = zstd_safe::compress_bound(size);
        assert!(post_compression > MAX_BATCH_PART_SIZE as usize);

        core::assert_matches!(
            classify(put_with_zstd(size)).await,
            Classified::Individual(_)
        );
    }

    #[test]
    fn iter_batches_empty() {
        assert!(batches(vec![]).is_empty());
    }

    #[test]
    fn iter_batches_single_batch_count_limit() {
        // Exactly `MAX_BATCH_OPS` operations fit into one batch.
        let ops: Vec<_> = (0..1000).map(|_| op(1)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![1000]);
    }

    #[test]
    fn iter_batches_splits_on_count_limit() {
        // One operation more than `MAX_BATCH_OPS` spills into a second batch.
        let ops: Vec<_> = (0..1001).map(|_| op(1)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![1000, 1]);
    }

    #[test]
    fn iter_batches_exactly_at_size_limit() {
        // 100 operations of 1 MiB add up to exactly `MAX_BATCH_BODY_SIZE`.
        let ops: Vec<_> = (0..100).map(|_| op(1024 * 1024)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![100]);
    }

    #[test]
    fn iter_batches_splits_on_size_limit() {
        // The 101st 1 MiB operation would exceed `MAX_BATCH_BODY_SIZE`.
        let ops: Vec<_> = (0..101).map(|_| op(1024 * 1024)).collect();
        assert_eq!(batch_sizes(&batches(ops)), vec![100, 1]);
    }

    #[test]
    fn iter_batches_size_limit_hits_before_count_limit() {
        let op_size = 600 * 1024;
        let ops: Vec<_> = (0..200).map(|_| op(op_size)).collect();
        let result = batches(ops);
        // Number of operations of this size that fit below the body size limit.
        let per_batch = (MAX_BATCH_BODY_SIZE / op_size) as usize;
        assert!(result.len() > 1, "expected multiple batches");
        // Every batch except the last must be filled up to the size limit.
        for batch in &result[..result.len() - 1] {
            assert_eq!(batch.len(), per_batch);
        }
    }
}