objectstore_service/
streaming.rs

1//! Streaming operation types and concurrent executor.
2//!
3//! [`StreamExecutor`] processes a stream of `(idx, Result<`[`Operation`]`, E>)` tuples
4//! concurrently within a bounded window. Errors in the input stream pass through
5//! unchanged; successful operations are executed against the backend directly,
6//! with [`tokio::spawn`] for panic isolation and run-to-completion guarantees.
7//!
8//! ## Window and Permit Reservation
9//!
10//! The concurrency window is derived from the service's available permits at the time
11//! [`StorageService::stream`](crate::service::StorageService::stream) is called: `ceil(tasks_available × 0.10)`.
12//! The executor pre-acquires exactly `window` permits from the service's
13//! `ConcurrencyLimiter` as a single bulk reservation. The reservation is shared
14//! (via `Arc`) with every spawned task, so permits are released only after every
15//! in-flight task completes — even if the output stream is dropped early.
16//!
17//! This means:
18//! - If the service is at capacity, [`StorageService::stream`](crate::service::StorageService::stream) fails immediately with
19//!   [`Error::AtCapacity`] before any operations are read.
20//! - During execution, operations call the storage backend directly without acquiring
21//!   additional per-operation permits.
22//!
23//! ## Concurrency Model
24//!
25//! [`StreamExecutor::execute`] uses `buffer_unordered` to drive up to `window`
26//! operations concurrently. The input stream is pulled lazily — at most `window`
27//! operations are in-flight at once, bounding memory to roughly
28//! `window × max_operation_size`. Results are yielded in completion order.
29//!
30//! Each operation is wrapped in a [`tokio::spawn`] for panic isolation: a panic in
31//! one operation surfaces as [`Error::Panic`] for that item and does not affect the
32//! others.
33//!
34//! ## Future Scope
35//!
36//! The window fraction (10%) is hard-coded. Configurable fractions, adaptive window
37//! sizing, and backend-level optimizations (e.g. BigTable multi-read, GCS batch API)
38//! are out of scope for the current implementation.
39
40use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::backend::common::Backend;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50
/// An insert operation: stores an object at the given key.
///
/// Executed via the backend's `put_object`. When `key` is `None`, the
/// identifier is built with `ObjectId::optional`, which lets the service
/// generate a key for the object.
#[derive(Debug)]
pub struct Insert {
    /// The key to store the object under. When `None`, the service generates a key.
    pub key: Option<ObjectKey>,
    /// Metadata for the object.
    pub metadata: Metadata,
    /// The object payload. Batch inserts are fully buffered (≤1 MiB).
    pub payload: bytes::Bytes,
}
61
/// A get operation: retrieves an existing object by key.
///
/// A missing object is not an error: the resulting [`OpResponse::Got`]
/// carries `None` in its `response` field instead.
#[derive(Debug)]
pub struct Get {
    /// The key of the object to retrieve.
    pub key: ObjectKey,
}
68
/// A delete operation: removes an object by key.
///
/// On success the executor yields [`OpResponse::Deleted`] with the deleted
/// key; backend failures surface as the stream item's `Err` variant.
#[derive(Debug)]
pub struct Delete {
    /// The key of the object to delete.
    pub key: ObjectKey,
}
75
/// A single streaming operation.
///
/// One item of the input stream consumed by [`StreamExecutor::execute`];
/// each variant wraps the payload struct for that operation kind.
#[derive(Debug)]
pub enum Operation {
    /// Insert a new object.
    Insert(Insert),
    /// Get an existing object.
    Get(Get),
    /// Delete an object.
    Delete(Delete),
}
86
87impl Operation {
88    /// Returns the key for this operation, if one was provided.
89    pub fn key(&self) -> Option<&ObjectKey> {
90        match self {
91            Operation::Insert(op) => op.key.as_ref(),
92            Operation::Get(op) => Some(&op.key),
93            Operation::Delete(op) => Some(&op.key),
94        }
95    }
96
97    /// Returns the permission required to perform this operation.
98    pub fn permission(&self) -> objectstore_types::auth::Permission {
99        match self {
100            Operation::Get(_) => objectstore_types::auth::Permission::ObjectRead,
101            Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
102            Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
103        }
104    }
105
106    /// Returns the kind name for this operation.
107    pub fn kind(&self) -> &'static str {
108        match self {
109            Operation::Insert(_) => "insert",
110            Operation::Get(_) => "get",
111            Operation::Delete(_) => "delete",
112        }
113    }
114}
115
/// The response of a single executed streaming operation.
///
/// Each variant carries the fields needed to render a response part.
/// The kind (`"insert"`, `"get"`, `"delete"`) is derivable via [`OpResponse::kind`],
/// and the affected key via [`OpResponse::key`].
// NOTE: `Debug` is implemented manually (not derived) because the `Got`
// variant's `response` field may contain a non-`Debug` byte stream.
pub enum OpResponse {
    /// An insert completed successfully.
    Inserted {
        /// The fully-qualified identifier assigned to the inserted object.
        id: ObjectId,
    },
    /// A get completed.
    Got {
        /// The key that was looked up.
        key: ObjectKey,
        /// The object content, or `None` if the object was not found.
        response: GetResponse,
    },
    /// A delete completed successfully.
    Deleted {
        /// The key that was deleted.
        key: ObjectKey,
    },
}
139
140impl OpResponse {
141    /// Returns the operation kind name.
142    pub fn kind(&self) -> &'static str {
143        match self {
144            OpResponse::Inserted { .. } => "insert",
145            OpResponse::Got { .. } => "get",
146            OpResponse::Deleted { .. } => "delete",
147        }
148    }
149
150    /// Returns the object key for this response.
151    pub fn key(&self) -> &ObjectKey {
152        match self {
153            OpResponse::Inserted { id } => &id.key,
154            OpResponse::Got { key, .. } => key,
155            OpResponse::Deleted { key } => key,
156        }
157    }
158}
159
160impl std::fmt::Debug for OpResponse {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
164            OpResponse::Got {
165                key,
166                response: Some(_),
167            } => f
168                .debug_struct("Got")
169                .field("key", key)
170                .field("response", &format_args!("Some(<stream>)"))
171                .finish(),
172            OpResponse::Got {
173                key,
174                response: None,
175            } => f
176                .debug_struct("Got")
177                .field("key", key)
178                .field("response", &format_args!("None"))
179                .finish(),
180            OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
181        }
182    }
183}
184
/// Executes streaming operations with bounded concurrency.
///
/// Construct via [`StorageService::stream`](crate::service::StorageService::stream),
/// which pre-acquires the concurrency window from the service's available permits.
///
/// See the [module documentation](self) for a full description of the window
/// calculation, permit reservation, and concurrency model.
#[derive(Debug)]
pub struct StreamExecutor {
    // Storage backend the operations run against directly (no additional
    // per-operation permits are acquired during execution).
    pub(crate) backend: Arc<dyn Backend>,
    // Maximum number of operations driven concurrently by `execute`.
    pub(crate) window: usize,
    // Bulk reservation of `window` permits taken at construction; held until
    // every spawned task has completed.
    pub(crate) reservation: ConcurrencyPermit,
}
198
199impl StreamExecutor {
200    /// Returns the concurrency window computed at construction.
201    pub fn window(&self) -> usize {
202        self.window
203    }
204
205    /// Executes the operations stream with bounded concurrency.
206    ///
207    /// Each item is a `(index, Result<Operation, E>)` tuple where `index` is the
208    /// 0-based position of the operation in the original request. Error items pass
209    /// through immediately; successful items are executed concurrently up to `window`
210    /// at a time, each in an isolated [`tokio::spawn`].
211    ///
212    /// Results are yielded in completion order (not submission order). The permit
213    /// reservation is held until every spawned task has completed — if the stream
214    /// is dropped early, in-flight tasks run to completion before the permits are
215    /// released.
216    pub fn execute<E>(
217        self,
218        context: ObjectContext,
219        operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
220    ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
221    where
222        E: From<Error> + Send + 'static,
223    {
224        let StreamExecutor {
225            backend,
226            window,
227            reservation,
228        } = self;
229
230        // Arc-wrap so each spawned task can hold a clone. Permits are released
231        // only when the last clone is dropped — i.e. after every spawned task
232        // completes, even if the output stream is dropped early.
233        let reservation = Arc::new(reservation);
234
235        operations
236            .map(move |(idx, item)| {
237                let permit = Arc::clone(&reservation);
238                let backend = Arc::clone(&backend);
239                let context = context.clone();
240                async move {
241                    let op = match item {
242                        Ok(op) => op,
243                        Err(e) => return (idx, Err(e)),
244                    };
245
246                    let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
247                        execute_operation(backend, context, op).await
248                    });
249
250                    (idx, spawn.await.map_err(E::from))
251                }
252            })
253            .buffer_unordered(window)
254    }
255}
256
257async fn execute_operation(
258    backend: Arc<dyn Backend>,
259    context: ObjectContext,
260    op: Operation,
261) -> Result<OpResponse> {
262    match op {
263        Operation::Get(get) => {
264            let id = ObjectId::new(context, get.key);
265            let response = backend.get_object(&id).await?;
266            Ok(OpResponse::Got {
267                key: id.key,
268                response,
269            })
270        }
271        Operation::Insert(insert) => {
272            let id = ObjectId::optional(context, insert.key);
273            let stream = crate::stream::single(insert.payload);
274            backend.put_object(&id, &insert.metadata, stream).await?;
275            Ok(OpResponse::Inserted { id })
276        }
277        Operation::Delete(delete) => {
278            let id = ObjectId::new(context, delete.key);
279            backend.delete_object(&id).await?;
280            Ok(OpResponse::Deleted { key: id.key })
281        }
282    }
283}
284
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use bytes::Bytes;
    use futures_util::StreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::PutResponse;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::error::Error;
    use crate::service::StorageService;
    use crate::stream::{self, ClientStream};

    // Minimal object context: a "testing" usecase with a single scope.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    // In-memory service with an explicit concurrency limit.
    fn make_service_with_limit(limit: usize) -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
            .with_concurrency_limit(limit)
    }

    // Default service: limit 500 gives a stream window of ceil(500 × 0.10) = 50.
    fn make_service() -> StorageService {
        make_service_with_limit(500)
    }

    // Wraps a plain `Vec<Operation>` as an indexed `Ok`-stream for `execute`.
    fn indexed_ok(
        ops: Vec<Operation>,
    ) -> impl futures_util::Stream<Item = (usize, Result<Operation, Error>)> {
        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
    }

    // --- StreamExecutor window and capacity tests ---

    #[test]
    fn at_capacity_when_no_permits() {
        let service = make_service_with_limit(0);
        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
    }

    // Verifies the ceil(available × 0.10) window formula across limits,
    // including the small-limit case that rounds up to 1.
    #[test]
    fn window_computation() {
        // ceil(1 × 0.10) = 1
        let s = make_service_with_limit(1);
        assert_eq!(s.stream().unwrap().window(), 1);

        // ceil(10 × 0.10) = 1
        let s = make_service_with_limit(10);
        assert_eq!(s.stream().unwrap().window(), 1);

        // ceil(100 × 0.10) = 10
        let s = make_service_with_limit(100);
        assert_eq!(s.stream().unwrap().window(), 10);

        // ceil(500 × 0.10) = 50
        let s = make_service_with_limit(500);
        assert_eq!(s.stream().unwrap().window(), 50);

        // ceil(1000 × 0.10) = 100
        let s = make_service_with_limit(1000);
        assert_eq!(s.stream().unwrap().window(), 100);
    }

    // --- StreamExecutor::execute() correctness tests ---

    #[tokio::test]
    async fn execute_empty_stream() {
        let service = make_service();
        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor
            .execute(
                make_context(),
                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
            )
            .collect()
            .await;
        assert!(outcomes.is_empty());
    }

    // Mixed batch: get (hit), get (miss — still Ok per the Got/None contract),
    // insert, and delete must all complete without error.
    #[tokio::test]
    async fn execute_runs_all_operations() {
        let service = make_service();
        let context = make_context();

        // Seed an object to retrieve and delete.
        service
            .insert_object(
                context.clone(),
                Some("key1".into()),
                Metadata::default(),
                stream::single("hello"),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Get(Get { key: "key1".into() }),
            Operation::Get(Get {
                key: "nonexistent".into(),
            }),
            Operation::Insert(Insert {
                key: Some("key2".into()),
                metadata: Metadata::default(),
                payload: Bytes::from("world"),
            }),
            Operation::Delete(Delete { key: "key1".into() }),
        ];

        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;

        assert_eq!(outcomes.len(), 4);

        for (_, result) in &outcomes {
            let response = result
                .as_ref()
                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
            assert!(
                !response.key().as_str().is_empty(),
                "response must have a non-empty key"
            );
        }
    }

    // --- Service-level concurrent execution and capacity tests ---

    // Test hook that gates every put: on entry it signals `paused_tx`, then
    // blocks until `resume` is notified, so a test can observe how many puts
    // are in flight simultaneously via `in_flight`.
    struct GateOnPut {
        // Receives one message per put that has entered the backend.
        paused_tx: tokio::sync::mpsc::Sender<()>,
        // Notified by the test to let all paused puts proceed.
        resume: Arc<tokio::sync::Notify>,
        // Count of puts currently inside the hook.
        in_flight: Arc<AtomicUsize>,
    }

    impl std::fmt::Debug for GateOnPut {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("GateOnPut").finish()
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            // Announce arrival, then park until the test releases us.
            self.in_flight.fetch_add(1, Ordering::SeqCst);
            let _ = self.paused_tx.send(()).await;
            self.resume.notified().await;
            let result = inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }
    }

    // All `window` operations must be driven concurrently: with 10 gated
    // inserts and window=10, every insert pauses inside the backend at once.
    #[tokio::test]
    async fn concurrent_execution() {
        // Window = ceil(100 × 0.10) = 10.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        // Submit 10 inserts. With window=10, all should be in-flight simultaneously.
        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait for all 10 operations to pause inside the backend.
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // Release all.
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {:?}",
                result
            );
        }
    }

    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        // Service with limit=1. One background insert holds the only permit.
        // service.stream() must fail with AtCapacity.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(1);

        // Hold the only permit via a blocking insert.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    stream::single("x"),
                )
                .await;
        });
        paused_rx.recv().await.unwrap();

        // Permit is held — stream() must fail immediately with AtCapacity.
        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        // Unblock the background insert so its task can finish.
        resume.notify_waiters();
    }
}
541}