Skip to main content

objectstore_service/
streaming.rs

1//! Streaming operation types and concurrent executor.
2//!
3//! [`StreamExecutor`] processes a stream of `(idx, Result<`[`Operation`]`, E>)` tuples
4//! concurrently within a bounded window. Errors in the input stream pass through
5//! unchanged; successful operations are executed against the backend directly,
6//! with [`tokio::spawn`] for panic isolation and run-to-completion guarantees.
7//!
8//! ## Window and Permit Reservation
9//!
10//! The concurrency window is derived from the service's available permits at the time
11//! [`StorageService::stream`](crate::service::StorageService::stream) is called: `ceil(tasks_available × 0.10)`.
12//! The executor pre-acquires exactly `window` permits from the service's
13//! `ConcurrencyLimiter` as a single bulk reservation. The reservation is shared
14//! (via `Arc`) with every spawned task, so permits are released only after every
15//! in-flight task completes — even if the output stream is dropped early.
16//!
17//! This means:
18//! - If the service is at capacity, [`StorageService::stream`](crate::service::StorageService::stream) fails immediately with
19//!   [`Error::AtCapacity`] before any operations are read.
20//! - During execution, operations call the storage backend directly without acquiring
21//!   additional per-operation permits.
22//!
23//! ## Concurrency Model
24//!
25//! [`StreamExecutor::execute`] uses `buffer_unordered` to drive up to `window`
26//! operations concurrently. The input stream is pulled lazily — at most `window`
27//! operations are in-flight at once, bounding memory to roughly
28//! `window × max_operation_size`. Results are yielded in completion order.
29//!
30//! Each operation is wrapped in a [`tokio::spawn`] for panic isolation: a panic in
31//! one operation surfaces as [`Error::Panic`] for that item and does not affect the
32//! others.
33//!
34//! ## Future Scope
35//!
36//! The window fraction (10%) is hard-coded. Configurable fractions, adaptive window
37//! sizing, and backend-level optimizations (e.g. BigTable multi-read, GCS batch API)
38//! are out of scope for the current implementation.
39
40use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::backend::common::Backend;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50
/// An insert operation: stores an object at the given key.
///
/// The payload is carried as a single in-memory [`bytes::Bytes`] buffer rather
/// than as a stream.
#[derive(Debug)]
pub struct Insert {
    /// The key to store the object under. When `None`, the service generates a key.
    pub key: Option<ObjectKey>,
    /// Metadata to store alongside the object.
    pub metadata: Metadata,
    /// The object payload. Batch inserts are fully buffered (≤1 MiB).
    pub payload: bytes::Bytes,
}
61
/// A get operation: retrieves an existing object by key.
///
/// Unlike [`Insert`], the key is mandatory.
#[derive(Debug)]
pub struct Get {
    /// The key of the object to retrieve.
    pub key: ObjectKey,
}
68
/// A delete operation: removes an object by key.
///
/// Unlike [`Insert`], the key is mandatory.
#[derive(Debug)]
pub struct Delete {
    /// The key of the object to delete.
    pub key: ObjectKey,
}
75
/// A head (metadata-only) operation: checks existence and retrieves metadata by key.
///
/// Unlike [`Insert`], the key is mandatory.
#[derive(Debug)]
pub struct Head {
    /// The key of the object to check.
    pub key: ObjectKey,
}
82
/// A single streaming operation.
///
/// Each variant wraps the request struct of the same name, which carries the
/// inputs for that kind of operation.
#[derive(Debug)]
pub enum Operation {
    /// Insert a new object (see [`Insert`]).
    Insert(Insert),
    /// Get an existing object (see [`Get`]).
    Get(Get),
    /// Delete an object (see [`Delete`]).
    Delete(Delete),
    /// Head (metadata-only) check for an object (see [`Head`]).
    Head(Head),
}
95
96impl Operation {
97    /// Returns the key for this operation, if one was provided.
98    pub fn key(&self) -> Option<&ObjectKey> {
99        match self {
100            Operation::Insert(op) => op.key.as_ref(),
101            Operation::Get(op) => Some(&op.key),
102            Operation::Delete(op) => Some(&op.key),
103            Operation::Head(op) => Some(&op.key),
104        }
105    }
106
107    /// Returns the permission required to perform this operation.
108    pub fn permission(&self) -> objectstore_types::auth::Permission {
109        match self {
110            Operation::Get(_) | Operation::Head(_) => {
111                objectstore_types::auth::Permission::ObjectRead
112            }
113            Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
114            Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
115        }
116    }
117
118    /// Returns the kind name for this operation.
119    pub fn kind(&self) -> &'static str {
120        match self {
121            Operation::Insert(_) => "insert",
122            Operation::Get(_) => "get",
123            Operation::Delete(_) => "delete",
124            Operation::Head(_) => "head",
125        }
126    }
127}
128
/// The response of a single executed streaming operation.
///
/// Each variant carries the fields needed to render a response part.
/// The kind (`"insert"`, `"get"`, `"delete"`, `"head"`) is derivable via
/// [`OpResponse::kind`].
pub enum OpResponse {
    /// An insert completed successfully.
    Inserted {
        /// The fully-qualified identifier assigned to the inserted object.
        id: ObjectId,
    },
    /// A get completed.
    Got {
        /// The key that was looked up.
        key: ObjectKey,
        /// The object content, or `None` if the object was not found.
        response: GetResponse,
    },
    /// A delete completed successfully.
    Deleted {
        /// The key that was deleted.
        key: ObjectKey,
    },
    /// A head (metadata-only) check completed.
    Head {
        /// The key that was checked.
        key: ObjectKey,
        /// The metadata, or `None` if the object was not found.
        metadata: Option<Metadata>,
    },
}
159
160impl OpResponse {
161    /// Returns the operation kind name.
162    pub fn kind(&self) -> &'static str {
163        match self {
164            OpResponse::Inserted { .. } => "insert",
165            OpResponse::Got { .. } => "get",
166            OpResponse::Deleted { .. } => "delete",
167            OpResponse::Head { .. } => "head",
168        }
169    }
170
171    /// Returns the object key for this response.
172    pub fn key(&self) -> &ObjectKey {
173        match self {
174            OpResponse::Inserted { id } => &id.key,
175            OpResponse::Got { key, .. } => key,
176            OpResponse::Deleted { key } => key,
177            OpResponse::Head { key, .. } => key,
178        }
179    }
180}
181
182impl std::fmt::Debug for OpResponse {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        match self {
185            OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
186            OpResponse::Got {
187                key,
188                response: Some(_),
189            } => f
190                .debug_struct("Got")
191                .field("key", key)
192                .field("response", &format_args!("Some(<stream>)"))
193                .finish(),
194            OpResponse::Got {
195                key,
196                response: None,
197            } => f
198                .debug_struct("Got")
199                .field("key", key)
200                .field("response", &format_args!("None"))
201                .finish(),
202            OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
203            OpResponse::Head { key, metadata } => f
204                .debug_struct("Head")
205                .field("key", key)
206                .field("metadata", &metadata.is_some())
207                .finish(),
208        }
209    }
210}
211
/// Executes streaming operations with bounded concurrency.
///
/// Construct via [`StorageService::stream`](crate::service::StorageService::stream),
/// which pre-acquires the concurrency window from the service's available permits.
///
/// See the [module documentation](self) for a full description of the window
/// calculation, permit reservation, and concurrency model.
#[derive(Debug)]
pub struct StreamExecutor {
    /// Backend that every operation is executed against.
    pub(crate) backend: Arc<dyn Backend>,
    /// Maximum number of operations driven concurrently by `execute`.
    pub(crate) window: usize,
    /// Permit reservation; `execute` wraps it in an `Arc` and hands a clone to
    /// every spawned operation.
    pub(crate) reservation: ConcurrencyPermit,
}
225
226impl StreamExecutor {
227    /// Returns the concurrency window computed at construction.
228    pub fn window(&self) -> usize {
229        self.window
230    }
231
232    /// Executes the operations stream with bounded concurrency.
233    ///
234    /// Each item is a `(index, Result<Operation, E>)` tuple where `index` is the
235    /// 0-based position of the operation in the original request. Error items pass
236    /// through immediately; successful items are executed concurrently up to `window`
237    /// at a time, each in an isolated [`tokio::spawn`].
238    ///
239    /// Results are yielded in completion order (not submission order). The permit
240    /// reservation is held until every spawned task has completed — if the stream
241    /// is dropped early, in-flight tasks run to completion before the permits are
242    /// released.
243    pub fn execute<E>(
244        self,
245        context: ObjectContext,
246        operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
247    ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
248    where
249        E: From<Error> + Send + 'static,
250    {
251        let StreamExecutor {
252            backend,
253            window,
254            reservation,
255        } = self;
256
257        // Arc-wrap so each spawned task can hold a clone. Permits are released
258        // only when the last clone is dropped — i.e. after every spawned task
259        // completes, even if the output stream is dropped early.
260        let reservation = Arc::new(reservation);
261
262        operations
263            .map(move |(idx, item)| {
264                let permit = Arc::clone(&reservation);
265                let backend = Arc::clone(&backend);
266                let context = context.clone();
267                async move {
268                    let op = match item {
269                        Ok(op) => op,
270                        Err(e) => return (idx, Err(e)),
271                    };
272
273                    let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
274                        execute_operation(backend, context, op).await
275                    });
276
277                    (idx, spawn.await.map_err(E::from))
278                }
279            })
280            .buffer_unordered(window)
281    }
282}
283
284async fn execute_operation(
285    backend: Arc<dyn Backend>,
286    context: ObjectContext,
287    op: Operation,
288) -> Result<OpResponse> {
289    match op {
290        Operation::Get(get) => {
291            let id = ObjectId::new(context, get.key);
292            let response = backend.get_object(&id, None).await?;
293            Ok(OpResponse::Got {
294                key: id.key,
295                response,
296            })
297        }
298        Operation::Insert(insert) => {
299            let id = ObjectId::optional(context, insert.key);
300            let stream = crate::stream::single(insert.payload);
301            backend.put_object(&id, &insert.metadata, stream).await?;
302            Ok(OpResponse::Inserted { id })
303        }
304        Operation::Delete(delete) => {
305            let id = ObjectId::new(context, delete.key);
306            backend.delete_object(&id).await?;
307            Ok(OpResponse::Deleted { key: id.key })
308        }
309        Operation::Head(head) => {
310            let id = ObjectId::new(context, head.key);
311            let metadata = backend.get_metadata(&id).await?;
312            Ok(OpResponse::Head {
313                key: id.key,
314                metadata,
315            })
316        }
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use std::sync::Arc;
323    use std::sync::atomic::{AtomicUsize, Ordering};
324
325    use bytes::Bytes;
326    use futures_util::StreamExt;
327    use objectstore_types::metadata::Metadata;
328    use objectstore_types::scope::{Scope, Scopes};
329
330    use super::*;
331    use crate::backend::common::PutResponse;
332    use crate::backend::in_memory::InMemoryBackend;
333    use crate::backend::testing::{Hooks, TestBackend};
334    use crate::error::Error;
335    use crate::service::StorageService;
336    use crate::stream::{self, ClientStream};
337
338    fn make_context() -> ObjectContext {
339        ObjectContext {
340            usecase: "testing".into(),
341            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
342        }
343    }
344
345    fn make_service_with_limit(limit: usize) -> StorageService {
346        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
347            .with_concurrency_limit(limit)
348    }
349
350    fn make_service() -> StorageService {
351        make_service_with_limit(500)
352    }
353
354    // Wraps a plain `Vec<Operation>` as an indexed `Ok`-stream for `execute`.
355    fn indexed_ok(ops: Vec<Operation>) -> impl Stream<Item = (usize, Result<Operation, Error>)> {
356        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
357    }
358
359    // --- StreamExecutor window and capacity tests ---
360
361    #[test]
362    fn at_capacity_when_no_permits() {
363        let service = make_service_with_limit(0);
364        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
365    }
366
367    #[test]
368    fn window_computation() {
369        // ceil(1 × 0.10) = 1
370        let s = make_service_with_limit(1);
371        assert_eq!(s.stream().unwrap().window(), 1);
372
373        // ceil(10 × 0.10) = 1
374        let s = make_service_with_limit(10);
375        assert_eq!(s.stream().unwrap().window(), 1);
376
377        // ceil(100 × 0.10) = 10
378        let s = make_service_with_limit(100);
379        assert_eq!(s.stream().unwrap().window(), 10);
380
381        // ceil(500 × 0.10) = 50
382        let s = make_service_with_limit(500);
383        assert_eq!(s.stream().unwrap().window(), 50);
384
385        // ceil(1000 × 0.10) = 100
386        let s = make_service_with_limit(1000);
387        assert_eq!(s.stream().unwrap().window(), 100);
388    }
389
390    // --- StreamExecutor::execute() correctness tests ---
391
392    #[tokio::test]
393    async fn execute_empty_stream() {
394        let service = make_service();
395        let executor = service.stream().unwrap();
396        let outcomes: Vec<_> = executor
397            .execute(
398                make_context(),
399                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
400            )
401            .collect()
402            .await;
403        assert!(outcomes.is_empty());
404    }
405
406    #[tokio::test]
407    async fn execute_runs_all_operations() {
408        let service = make_service();
409        let context = make_context();
410
411        // Seed an object to retrieve and delete.
412        service
413            .insert_object(
414                context.clone(),
415                Some("key1".into()),
416                Metadata::default(),
417                stream::single("hello"),
418            )
419            .await
420            .unwrap();
421
422        let ops = vec![
423            Operation::Get(Get { key: "key1".into() }),
424            Operation::Get(Get {
425                key: "nonexistent".into(),
426            }),
427            Operation::Insert(Insert {
428                key: Some("key2".into()),
429                metadata: Metadata::default(),
430                payload: Bytes::from("world"),
431            }),
432            Operation::Delete(Delete { key: "key1".into() }),
433        ];
434
435        let executor = service.stream().unwrap();
436        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;
437
438        assert_eq!(outcomes.len(), 4);
439
440        for (_, result) in &outcomes {
441            let response = result
442                .as_ref()
443                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
444            assert!(
445                !response.key().as_str().is_empty(),
446                "response must have a non-empty key"
447            );
448        }
449    }
450
451    #[tokio::test]
452    async fn execute_head_operation() {
453        let service = make_service();
454        let context = make_context();
455
456        service
457            .insert_object(
458                context.clone(),
459                Some("exists".into()),
460                Metadata::default(),
461                stream::single("data"),
462            )
463            .await
464            .unwrap();
465
466        let ops = vec![
467            Operation::Head(Head {
468                key: "exists".into(),
469            }),
470            Operation::Head(Head {
471                key: "missing".into(),
472            }),
473        ];
474
475        let executor = service.stream().unwrap();
476        let mut outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;
477        outcomes.sort_by_key(|(idx, _)| *idx);
478
479        assert_eq!(outcomes.len(), 2);
480
481        match &outcomes[0].1 {
482            Ok(OpResponse::Head {
483                key,
484                metadata: Some(_),
485            }) => assert_eq!(key.as_str(), "exists"),
486            other => panic!("expected Head with metadata, got: {other:?}"),
487        }
488
489        match &outcomes[1].1 {
490            Ok(OpResponse::Head {
491                key,
492                metadata: None,
493            }) => assert_eq!(key.as_str(), "missing"),
494            other => panic!("expected Head with None, got: {other:?}"),
495        }
496    }
497
498    // --- Service-level concurrent execution and capacity tests ---
499
    /// Test hook that parks every `put_object` call until the test releases it.
    struct GateOnPut {
        /// Receives one `()` per `put_object` call, sent before the call waits.
        paused_tx: tokio::sync::mpsc::Sender<()>,
        /// Notification that waiting `put_object` calls block on.
        resume: Arc<tokio::sync::Notify>,
        /// Counter incremented when a `put_object` call enters and decremented
        /// after the inner backend call returns.
        in_flight: Arc<AtomicUsize>,
    }
505
506    impl std::fmt::Debug for GateOnPut {
507        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
508            f.debug_struct("GateOnPut").finish()
509        }
510    }
511
512    #[async_trait::async_trait]
513    impl Hooks for GateOnPut {
514        async fn put_object(
515            &self,
516            inner: &InMemoryBackend,
517            id: &ObjectId,
518            metadata: &Metadata,
519            stream: ClientStream,
520        ) -> Result<PutResponse> {
521            self.in_flight.fetch_add(1, Ordering::SeqCst);
522            let _ = self.paused_tx.send(()).await;
523            self.resume.notified().await;
524            let result = inner.put_object(id, metadata, stream).await;
525            self.in_flight.fetch_sub(1, Ordering::SeqCst);
526            result
527        }
528    }
529
530    #[tokio::test]
531    async fn concurrent_execution() {
532        // Window = ceil(100 × 0.10) = 10.
533        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
534        let resume = Arc::new(tokio::sync::Notify::new());
535        let in_flight = Arc::new(AtomicUsize::new(0));
536
537        let gated = TestBackend::new(GateOnPut {
538            paused_tx,
539            resume: Arc::clone(&resume),
540            in_flight: Arc::clone(&in_flight),
541        });
542        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(100);
543
544        let executor = service.stream().unwrap();
545        assert_eq!(executor.window(), 10);
546
547        // Submit 10 inserts. With window=10, all should be in-flight simultaneously.
548        let ops: Vec<Operation> = (0..10)
549            .map(|i| {
550                Operation::Insert(Insert {
551                    key: Some(format!("key{i}")),
552                    metadata: Metadata::default(),
553                    payload: Bytes::from(format!("data{i}")),
554                })
555            })
556            .collect();
557
558        let exec_handle = tokio::spawn(async move {
559            executor
560                .execute(make_context(), indexed_ok(ops))
561                .collect::<Vec<_>>()
562                .await
563        });
564
565        // Wait for all 10 operations to pause inside the backend.
566        for _ in 0..10 {
567            paused_rx.recv().await.unwrap();
568        }
569        assert_eq!(in_flight.load(Ordering::SeqCst), 10);
570
571        // Release all.
572        resume.notify_waiters();
573
574        let outcomes = exec_handle.await.unwrap();
575        assert_eq!(outcomes.len(), 10);
576        for (_, result) in &outcomes {
577            assert!(
578                matches!(result, Ok(OpResponse::Inserted { .. })),
579                "unexpected result: {result:?}",
580            );
581        }
582    }
583
584    #[tokio::test]
585    async fn batch_rejected_when_permits_exhausted() {
586        // Service with limit=1. One background insert holds the only permit.
587        // service.stream() must fail with AtCapacity.
588        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
589        let resume = Arc::new(tokio::sync::Notify::new());
590
591        let gated = TestBackend::new(GateOnPut {
592            paused_tx,
593            resume: Arc::clone(&resume),
594            in_flight: Arc::new(AtomicUsize::new(0)),
595        });
596        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(1);
597
598        // Hold the only permit via a blocking insert.
599        let svc = service.clone();
600        tokio::spawn(async move {
601            let _ = svc
602                .insert_object(
603                    make_context(),
604                    Some("blocker".into()),
605                    Metadata::default(),
606                    stream::single("x"),
607                )
608                .await;
609        });
610        paused_rx.recv().await.unwrap();
611
612        // Permit is held — stream() must fail immediately with AtCapacity.
613        assert!(
614            matches!(service.stream(), Err(Error::AtCapacity)),
615            "expected AtCapacity when all permits are held"
616        );
617
618        resume.notify_waiters();
619    }
620}