Skip to main content

objectstore_service/
streaming.rs

1//! Streaming operation types and concurrent executor.
2//!
3//! [`StreamExecutor`] processes a stream of `(idx, Result<`[`Operation`]`, E>)` tuples
4//! concurrently within a bounded window. Errors in the input stream pass through
5//! unchanged; successful operations are executed against the backend directly,
6//! with [`tokio::spawn`] for panic isolation and run-to-completion guarantees.
7//!
8//! ## Window and Permit Reservation
9//!
10//! The concurrency window is derived from the service's available permits at the time
11//! [`StorageService::stream`](crate::service::StorageService::stream) is called: `ceil(tasks_available × 0.10)`.
12//! The executor pre-acquires exactly `window` permits from the service's
13//! `ConcurrencyLimiter` as a single bulk reservation. The reservation is shared
14//! (via `Arc`) with every spawned task, so permits are released only after every
15//! in-flight task completes — even if the output stream is dropped early.
16//!
17//! This means:
18//! - If the service is at capacity, [`StorageService::stream`](crate::service::StorageService::stream) fails immediately with
19//!   [`Error::AtCapacity`] before any operations are read.
20//! - During execution, operations call the storage backend directly without acquiring
21//!   additional per-operation permits.
22//!
23//! ## Concurrency Model
24//!
25//! [`StreamExecutor::execute`] uses `buffer_unordered` to drive up to `window`
26//! operations concurrently. The input stream is pulled lazily — at most `window`
27//! operations are in-flight at once, bounding memory to roughly
28//! `window × max_operation_size`. Results are yielded in completion order.
29//!
30//! Each operation is wrapped in a [`tokio::spawn`] for panic isolation: a panic in
31//! one operation surfaces as [`Error::Panic`] for that item and does not affect the
32//! others.
33//!
34//! ## Future Scope
35//!
36//! The window fraction (10%) is hard-coded. Configurable fractions, adaptive window
37//! sizing, and backend-level optimizations (e.g. BigTable multi-read, GCS batch API)
38//! are out of scope for the current implementation.
39
40use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::backend::common::Backend;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50
51/// An insert operation: stores an object at the given key.
52#[derive(Debug)]
53pub struct Insert {
54    /// The key to store the object under. When `None`, the service generates a key.
55    pub key: Option<ObjectKey>,
56    /// Metadata for the object.
57    pub metadata: Metadata,
58    /// The object payload. Batch inserts are fully buffered (≤1 MiB).
59    pub payload: bytes::Bytes,
60}
61
62/// A get operation: retrieves an existing object by key.
63#[derive(Debug)]
64pub struct Get {
65    /// The key of the object to retrieve.
66    pub key: ObjectKey,
67}
68
69/// A delete operation: removes an object by key.
70#[derive(Debug)]
71pub struct Delete {
72    /// The key of the object to delete.
73    pub key: ObjectKey,
74}
75
76/// A head (metadata-only) operation: checks existence and retrieves metadata by key.
77#[derive(Debug)]
78pub struct Head {
79    /// The key of the object to check.
80    pub key: ObjectKey,
81}
82
83/// A single streaming operation.
84#[derive(Debug)]
85pub enum Operation {
86    /// Insert a new object.
87    Insert(Insert),
88    /// Get an existing object.
89    Get(Get),
90    /// Delete an object.
91    Delete(Delete),
92    /// Head (metadata-only) check for an object.
93    Head(Head),
94}
95
96impl Operation {
97    /// Returns the key for this operation, if one was provided.
98    pub fn key(&self) -> Option<&ObjectKey> {
99        match self {
100            Operation::Insert(op) => op.key.as_ref(),
101            Operation::Get(op) => Some(&op.key),
102            Operation::Delete(op) => Some(&op.key),
103            Operation::Head(op) => Some(&op.key),
104        }
105    }
106
107    /// Returns the permission required to perform this operation.
108    pub fn permission(&self) -> objectstore_types::auth::Permission {
109        match self {
110            Operation::Get(_) | Operation::Head(_) => {
111                objectstore_types::auth::Permission::ObjectRead
112            }
113            Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
114            Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
115        }
116    }
117
118    /// Returns the kind name for this operation.
119    pub fn kind(&self) -> &'static str {
120        match self {
121            Operation::Insert(_) => "insert",
122            Operation::Get(_) => "get",
123            Operation::Delete(_) => "delete",
124            Operation::Head(_) => "head",
125        }
126    }
127}
128
129/// The response of a single executed streaming operation.
130///
131/// Each variant carries the fields needed to render a response part.
132/// The kind (`"insert"`, `"get"`, `"delete"`) is derivable via [`OpResponse::kind`].
133pub enum OpResponse {
134    /// An insert completed successfully.
135    Inserted {
136        /// The fully-qualified identifier assigned to the inserted object.
137        id: ObjectId,
138    },
139    /// A get completed.
140    Got {
141        /// The key that was looked up.
142        key: ObjectKey,
143        /// The object content, or `None` if the object was not found.
144        response: GetResponse,
145    },
146    /// A delete completed successfully.
147    Deleted {
148        /// The key that was deleted.
149        key: ObjectKey,
150    },
151    /// A head (metadata-only) check completed.
152    Head {
153        /// The key that was checked.
154        key: ObjectKey,
155        /// The metadata, or `None` if the object was not found.
156        metadata: Option<Metadata>,
157    },
158}
159
160impl OpResponse {
161    /// Returns the operation kind name.
162    pub fn kind(&self) -> &'static str {
163        match self {
164            OpResponse::Inserted { .. } => "insert",
165            OpResponse::Got { .. } => "get",
166            OpResponse::Deleted { .. } => "delete",
167            OpResponse::Head { .. } => "head",
168        }
169    }
170
171    /// Returns the object key for this response.
172    pub fn key(&self) -> &ObjectKey {
173        match self {
174            OpResponse::Inserted { id } => &id.key,
175            OpResponse::Got { key, .. } => key,
176            OpResponse::Deleted { key } => key,
177            OpResponse::Head { key, .. } => key,
178        }
179    }
180}
181
182impl std::fmt::Debug for OpResponse {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        match self {
185            OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
186            OpResponse::Got {
187                key,
188                response: Some(_),
189            } => f
190                .debug_struct("Got")
191                .field("key", key)
192                .field("response", &format_args!("Some(<stream>)"))
193                .finish(),
194            OpResponse::Got {
195                key,
196                response: None,
197            } => f
198                .debug_struct("Got")
199                .field("key", key)
200                .field("response", &format_args!("None"))
201                .finish(),
202            OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
203            OpResponse::Head { key, metadata } => f
204                .debug_struct("Head")
205                .field("key", key)
206                .field("metadata", &metadata.is_some())
207                .finish(),
208        }
209    }
210}
211
212/// Executes streaming operations with bounded concurrency.
213///
214/// Construct via [`StorageService::stream`](crate::service::StorageService::stream),
215/// which pre-acquires the concurrency window from the service's available permits.
216///
217/// See the [module documentation](self) for a full description of the window
218/// calculation, permit reservation, and concurrency model.
219#[derive(Debug)]
220pub struct StreamExecutor {
221    pub(crate) backend: Arc<dyn Backend>,
222    pub(crate) window: usize,
223    pub(crate) reservation: ConcurrencyPermit,
224}
225
226impl StreamExecutor {
227    /// Returns the concurrency window computed at construction.
228    pub fn window(&self) -> usize {
229        self.window
230    }
231
232    /// Executes the operations stream with bounded concurrency.
233    ///
234    /// Each item is a `(index, Result<Operation, E>)` tuple where `index` is the
235    /// 0-based position of the operation in the original request. Error items pass
236    /// through immediately; successful items are executed concurrently up to `window`
237    /// at a time, each in an isolated [`tokio::spawn`].
238    ///
239    /// Results are yielded in completion order (not submission order). The permit
240    /// reservation is held until every spawned task has completed — if the stream
241    /// is dropped early, in-flight tasks run to completion before the permits are
242    /// released.
243    pub fn execute<E>(
244        self,
245        context: ObjectContext,
246        operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
247    ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
248    where
249        E: From<Error> + Send + 'static,
250    {
251        let StreamExecutor {
252            backend,
253            window,
254            reservation,
255        } = self;
256
257        // Arc-wrap so each spawned task can hold a clone. Permits are released
258        // only when the last clone is dropped — i.e. after every spawned task
259        // completes, even if the output stream is dropped early.
260        let reservation = Arc::new(reservation);
261
262        operations
263            .map(move |(idx, item)| {
264                let permit = Arc::clone(&reservation);
265                let backend = Arc::clone(&backend);
266                let context = context.clone();
267                async move {
268                    let op = match item {
269                        Ok(op) => op,
270                        Err(e) => return (idx, Err(e)),
271                    };
272
273                    let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
274                        execute_operation(backend, context, op).await
275                    });
276
277                    (idx, spawn.await.map_err(E::from))
278                }
279            })
280            .buffer_unordered(window)
281    }
282}
283
284async fn execute_operation(
285    backend: Arc<dyn Backend>,
286    context: ObjectContext,
287    op: Operation,
288) -> Result<OpResponse> {
289    match op {
290        Operation::Get(get) => {
291            let id = ObjectId::new(context, get.key);
292            let response = backend.get_object(&id).await?;
293            Ok(OpResponse::Got {
294                key: id.key,
295                response,
296            })
297        }
298        Operation::Insert(insert) => {
299            let id = ObjectId::optional(context, insert.key);
300            let stream = crate::stream::single(insert.payload);
301            backend.put_object(&id, &insert.metadata, stream).await?;
302            Ok(OpResponse::Inserted { id })
303        }
304        Operation::Delete(delete) => {
305            let id = ObjectId::new(context, delete.key);
306            backend.delete_object(&id).await?;
307            Ok(OpResponse::Deleted { key: id.key })
308        }
309        Operation::Head(head) => {
310            let id = ObjectId::new(context, head.key);
311            let metadata = backend.get_metadata(&id).await?;
312            Ok(OpResponse::Head {
313                key: id.key,
314                metadata,
315            })
316        }
317    }
318}
319
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use bytes::Bytes;
    use futures_util::StreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::PutResponse;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::error::Error;
    use crate::service::StorageService;
    use crate::stream::{self, ClientStream};

    /// Builds the object context shared by all tests in this module.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Builds an in-memory service whose concurrency limit is `limit`.
    fn make_service_with_limit(limit: usize) -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
            .with_concurrency_limit(limit)
    }

    /// Builds an in-memory service with a concurrency limit of 500.
    fn make_service() -> StorageService {
        make_service_with_limit(500)
    }

    /// Wraps a plain `Vec<Operation>` as an indexed `Ok`-stream for `execute`.
    fn indexed_ok(
        ops: Vec<Operation>,
    ) -> impl futures_util::Stream<Item = (usize, Result<Operation, Error>)> {
        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
    }

    // --- StreamExecutor window and capacity tests ---

    /// A service with no permits at all must refuse to create an executor.
    #[test]
    fn at_capacity_when_no_permits() {
        let service = make_service_with_limit(0);
        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
    }

    /// The window is `ceil(limit × 0.10)` for a range of limits.
    #[test]
    fn window_computation() {
        // ceil(1 × 0.10) = 1
        let s = make_service_with_limit(1);
        assert_eq!(s.stream().unwrap().window(), 1);

        // ceil(10 × 0.10) = 1
        let s = make_service_with_limit(10);
        assert_eq!(s.stream().unwrap().window(), 1);

        // ceil(100 × 0.10) = 10
        let s = make_service_with_limit(100);
        assert_eq!(s.stream().unwrap().window(), 10);

        // ceil(500 × 0.10) = 50
        let s = make_service_with_limit(500);
        assert_eq!(s.stream().unwrap().window(), 50);

        // ceil(1000 × 0.10) = 100
        let s = make_service_with_limit(1000);
        assert_eq!(s.stream().unwrap().window(), 100);
    }

    // --- StreamExecutor::execute() correctness tests ---

    /// An empty input stream yields an empty output stream.
    #[tokio::test]
    async fn execute_empty_stream() {
        let service = make_service();
        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor
            .execute(
                make_context(),
                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
            )
            .collect()
            .await;
        assert!(outcomes.is_empty());
    }

    /// A mixed batch of get/insert/delete operations all complete without error.
    #[tokio::test]
    async fn execute_runs_all_operations() {
        let service = make_service();
        let context = make_context();

        // Seed an object to retrieve and delete.
        service
            .insert_object(
                context.clone(),
                Some("key1".into()),
                Metadata::default(),
                stream::single("hello"),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Get(Get { key: "key1".into() }),
            Operation::Get(Get {
                key: "nonexistent".into(),
            }),
            Operation::Insert(Insert {
                key: Some("key2".into()),
                metadata: Metadata::default(),
                payload: Bytes::from("world"),
            }),
            Operation::Delete(Delete { key: "key1".into() }),
        ];

        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;

        assert_eq!(outcomes.len(), 4);

        for (_, result) in &outcomes {
            let response = result
                .as_ref()
                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
            assert!(
                !response.key().as_str().is_empty(),
                "response must have a non-empty key"
            );
        }
    }

    /// `Head` returns metadata for an existing object and `None` for a missing one.
    #[tokio::test]
    async fn execute_head_operation() {
        let service = make_service();
        let context = make_context();

        service
            .insert_object(
                context.clone(),
                Some("exists".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Head(Head {
                key: "exists".into(),
            }),
            Operation::Head(Head {
                key: "missing".into(),
            }),
        ];

        let executor = service.stream().unwrap();
        let mut outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;
        // Results arrive in completion order; restore submission order first.
        outcomes.sort_by_key(|(idx, _)| *idx);

        assert_eq!(outcomes.len(), 2);

        match &outcomes[0].1 {
            Ok(OpResponse::Head {
                key,
                metadata: Some(_),
            }) => assert_eq!(key.as_str(), "exists"),
            other => panic!("expected Head with metadata, got: {other:?}"),
        }

        match &outcomes[1].1 {
            Ok(OpResponse::Head {
                key,
                metadata: None,
            }) => assert_eq!(key.as_str(), "missing"),
            other => panic!("expected Head with None, got: {other:?}"),
        }
    }

    // --- Service-level concurrent execution and capacity tests ---

    /// Backend hook that parks every `put_object` call until the test resumes it.
    ///
    /// Each call increments `in_flight`, reports on `paused_tx` that it has
    /// reached the gate, and then waits on `resume` before forwarding to the
    /// inner backend.
    struct GateOnPut {
        /// Receives one message per call that has reached the gate.
        paused_tx: tokio::sync::mpsc::Sender<()>,
        /// Notified by the test (via `notify_waiters`) to release gated calls.
        resume: Arc<tokio::sync::Notify>,
        /// Number of calls currently inside `put_object`.
        in_flight: Arc<AtomicUsize>,
    }

    impl std::fmt::Debug for GateOnPut {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("GateOnPut").finish()
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            self.in_flight.fetch_add(1, Ordering::SeqCst);

            // Create the `Notified` future *before* announcing the pause.
            // `notify_waiters` stores no permit and only wakes `Notified`
            // futures that already exist, so creating it after the send would
            // leave a window in which the test's wakeup is lost and this call
            // hangs forever.
            let resumed = self.resume.notified();
            let _ = self.paused_tx.send(()).await;
            resumed.await;

            let result = inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }
    }

    /// With a window of 10, ten inserts are all in flight at the same time.
    #[tokio::test]
    async fn concurrent_execution() {
        // Window = ceil(100 × 0.10) = 10.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        // Submit 10 inserts. With window=10, all should be in-flight simultaneously.
        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait for all 10 operations to pause inside the backend.
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // Release all.
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {:?}",
                result
            );
        }
    }

    /// `stream()` fails with `AtCapacity` while another call holds the only permit.
    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        // Service with limit=1. One background insert holds the only permit.
        // service.stream() must fail with AtCapacity.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(1);

        // Hold the only permit via a blocking insert.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    stream::single("x"),
                )
                .await;
        });
        // Proceed only once the blocker is parked inside the backend.
        paused_rx.recv().await.unwrap();

        // Permit is held — stream() must fail immediately with AtCapacity.
        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        resume.notify_waiters();
    }
}