objectstore_service/
streaming.rs

1//! Streaming operation types and concurrent executor.
2//!
3//! [`StreamExecutor`] processes a stream of `(idx, Result<`[`Operation`]`, E>)` tuples
4//! concurrently within a bounded window. Errors in the input stream pass through
5//! unchanged; successful operations are executed against `TieredStorage` directly,
6//! with [`tokio::spawn`] for panic isolation and run-to-completion guarantees.
7//!
8//! ## Window and Permit Reservation
9//!
10//! The concurrency window is derived from the service's available permits at the time
11//! [`StorageService::stream`](crate::service::StorageService::stream) is called: `ceil(tasks_available × 0.10)`.
12//! The executor pre-acquires exactly `window` permits from the service's
13//! `ConcurrencyLimiter` as a single bulk reservation. The reservation is shared
14//! (via `Arc`) with every spawned task, so permits are released only after every
15//! in-flight task completes — even if the output stream is dropped early.
16//!
17//! This means:
18//! - If the service is at capacity, [`StorageService::stream`](crate::service::StorageService::stream) fails immediately with
19//!   [`Error::AtCapacity`] before any operations are read.
20//! - During execution, operations call the storage backend directly without acquiring
21//!   additional per-operation permits.
22//!
23//! ## Concurrency Model
24//!
25//! [`StreamExecutor::execute`] uses `buffer_unordered` to drive up to `window`
26//! operations concurrently. The input stream is pulled lazily — at most `window`
27//! operations are in-flight at once, bounding memory to roughly
28//! `window × max_operation_size`. Results are yielded in completion order.
29//!
30//! Each operation is wrapped in a [`tokio::spawn`] for panic isolation: a panic in
31//! one operation surfaces as [`Error::Panic`] for that item and does not affect the
32//! others.
33//!
34//! ## Future Scope
35//!
36//! The window fraction (10%) is hard-coded. Configurable fractions, adaptive window
37//! sizing, and backend-level optimizations (e.g. BigTable multi-read, GCS batch API)
38//! are out of scope for the current implementation.
39
40use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::PayloadStream;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50use crate::tiered::TieredStorage;
51
/// An insert operation: stores an object at the given key.
#[derive(Debug)]
pub struct Insert {
    /// The key to store the object under. When `None`, the service generates a key.
    pub key: Option<ObjectKey>,
    /// Metadata for the object.
    pub metadata: Metadata,
    /// The object payload. Batch inserts are fully buffered in memory (≤1 MiB)
    /// and adapted to the streaming storage API as a single-chunk stream.
    pub payload: bytes::Bytes,
}
62
/// A get operation: retrieves an existing object by key.
///
/// A missing object is not an execution error: it surfaces as an
/// [`OpResponse::Got`] whose response is `None`.
#[derive(Debug)]
pub struct Get {
    /// The key of the object to retrieve.
    pub key: ObjectKey,
}
69
/// A delete operation: removes an object by key.
///
/// Executed via [`StreamExecutor::execute`]; yields [`OpResponse::Deleted`]
/// on success.
#[derive(Debug)]
pub struct Delete {
    /// The key of the object to delete.
    pub key: ObjectKey,
}
76
/// A single streaming operation.
///
/// Operations are executed concurrently, up to the executor's window, by
/// [`StreamExecutor::execute`].
#[derive(Debug)]
pub enum Operation {
    /// Insert a new object (see [`Insert`]).
    Insert(Insert),
    /// Get an existing object (see [`Get`]).
    Get(Get),
    /// Delete an object (see [`Delete`]).
    Delete(Delete),
}
87
88impl Operation {
89    /// Returns the key for this operation, if one was provided.
90    pub fn key(&self) -> Option<&ObjectKey> {
91        match self {
92            Operation::Insert(op) => op.key.as_ref(),
93            Operation::Get(op) => Some(&op.key),
94            Operation::Delete(op) => Some(&op.key),
95        }
96    }
97
98    /// Returns the permission required to perform this operation.
99    pub fn permission(&self) -> objectstore_types::auth::Permission {
100        match self {
101            Operation::Get(_) => objectstore_types::auth::Permission::ObjectRead,
102            Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
103            Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
104        }
105    }
106
107    /// Returns the kind name for this operation.
108    pub fn kind(&self) -> &'static str {
109        match self {
110            Operation::Insert(_) => "insert",
111            Operation::Get(_) => "get",
112            Operation::Delete(_) => "delete",
113        }
114    }
115}
116
/// The response of a single executed streaming operation.
///
/// Each variant carries the fields needed to render a response part.
/// The kind (`"insert"`, `"get"`, `"delete"`) is derivable via [`OpResponse::kind`].
pub enum OpResponse {
    /// An insert completed successfully.
    Inserted {
        /// The fully-qualified identifier assigned to the inserted object.
        id: ObjectId,
    },
    /// A get completed, whether or not the object exists.
    Got {
        /// The key that was looked up.
        key: ObjectKey,
        /// The object content, or `None` if the object was not found.
        response: GetResponse,
    },
    /// A delete completed successfully.
    Deleted {
        /// The key that was deleted.
        key: ObjectKey,
    },
}
140
141impl OpResponse {
142    /// Returns the operation kind name.
143    pub fn kind(&self) -> &'static str {
144        match self {
145            OpResponse::Inserted { .. } => "insert",
146            OpResponse::Got { .. } => "get",
147            OpResponse::Deleted { .. } => "delete",
148        }
149    }
150
151    /// Returns the object key for this response.
152    pub fn key(&self) -> &ObjectKey {
153        match self {
154            OpResponse::Inserted { id } => &id.key,
155            OpResponse::Got { key, .. } => key,
156            OpResponse::Deleted { key } => key,
157        }
158    }
159}
160
161impl std::fmt::Debug for OpResponse {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        match self {
164            OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
165            OpResponse::Got {
166                key,
167                response: Some(_),
168            } => f
169                .debug_struct("Got")
170                .field("key", key)
171                .field("response", &format_args!("Some(<stream>)"))
172                .finish(),
173            OpResponse::Got {
174                key,
175                response: None,
176            } => f
177                .debug_struct("Got")
178                .field("key", key)
179                .field("response", &format_args!("None"))
180                .finish(),
181            OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
182        }
183    }
184}
185
/// Executes streaming operations with bounded concurrency.
///
/// Construct via [`StorageService::stream`](crate::service::StorageService::stream),
/// which pre-acquires the concurrency window from the service's available permits.
///
/// See the [module documentation](self) for a full description of the window
/// calculation, permit reservation, and concurrency model.
#[derive(Debug)]
pub struct StreamExecutor {
    /// Tiered storage that operations are executed against directly
    /// (no additional per-operation permits are acquired).
    pub(crate) tiered: Arc<TieredStorage>,
    /// Maximum number of operations driven concurrently by [`Self::execute`].
    pub(crate) window: usize,
    /// The bulk reservation of `window` permits taken at construction;
    /// released only after every task sharing it has completed.
    pub(crate) reservation: ConcurrencyPermit,
}
199
200impl StreamExecutor {
201    /// Returns the concurrency window computed at construction.
202    pub fn window(&self) -> usize {
203        self.window
204    }
205
206    /// Executes the operations stream with bounded concurrency.
207    ///
208    /// Each item is a `(index, Result<Operation, E>)` tuple where `index` is the
209    /// 0-based position of the operation in the original request. Error items pass
210    /// through immediately; successful items are executed concurrently up to `window`
211    /// at a time, each in an isolated [`tokio::spawn`].
212    ///
213    /// Results are yielded in completion order (not submission order). The permit
214    /// reservation is held until every spawned task has completed — if the stream
215    /// is dropped early, in-flight tasks run to completion before the permits are
216    /// released.
217    pub fn execute<E>(
218        self,
219        context: ObjectContext,
220        operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
221    ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
222    where
223        E: From<Error> + Send + 'static,
224    {
225        let StreamExecutor {
226            tiered,
227            window,
228            reservation,
229        } = self;
230
231        // Arc-wrap so each spawned task can hold a clone. Permits are released
232        // only when the last clone is dropped — i.e. after every spawned task
233        // completes, even if the output stream is dropped early.
234        let reservation = Arc::new(reservation);
235
236        operations
237            .map(move |(idx, item)| {
238                let permit = Arc::clone(&reservation);
239                let tiered = Arc::clone(&tiered);
240                let context = context.clone();
241                async move {
242                    let op = match item {
243                        Ok(op) => op,
244                        Err(e) => return (idx, Err(e)),
245                    };
246
247                    let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
248                        execute_operation(tiered, context, op).await
249                    });
250
251                    (idx, spawn.await.map_err(E::from))
252                }
253            })
254            .buffer_unordered(window)
255    }
256}
257
258async fn execute_operation(
259    tiered: Arc<TieredStorage>,
260    context: ObjectContext,
261    op: Operation,
262) -> Result<OpResponse> {
263    match op {
264        Operation::Get(get) => {
265            let id = ObjectId::new(context, get.key);
266            let response = tiered.get_object(&id).await?;
267            Ok(OpResponse::Got {
268                key: id.key,
269                response,
270            })
271        }
272        Operation::Insert(insert) => {
273            let stream: PayloadStream =
274                futures_util::stream::once(futures_util::future::ready(Ok(insert.payload))).boxed();
275            let id = tiered
276                .insert_object(context, insert.key, &insert.metadata, stream)
277                .await?;
278            Ok(OpResponse::Inserted { id })
279        }
280        Operation::Delete(delete) => {
281            let id = ObjectId::new(context, delete.key);
282            tiered.delete_object(&id).await?;
283            Ok(OpResponse::Deleted { key: id.key })
284        }
285    }
286}
287
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use bytes::Bytes;
    use futures_util::StreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::error::Error;

    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    fn make_service_with_limit(limit: usize) -> crate::service::StorageService {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        crate::service::StorageService::from_backends(Box::new(hv), Box::new(lt))
            .with_concurrency_limit(limit)
    }

    fn make_service() -> crate::service::StorageService {
        make_service_with_limit(500)
    }

    // Wraps a plain `Vec<Operation>` as an indexed `Ok`-stream for `execute`.
    fn indexed_ok(
        ops: Vec<Operation>,
    ) -> impl futures_util::Stream<Item = (usize, Result<Operation, Error>)> {
        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
    }

    // --- StreamExecutor window and capacity tests ---

    #[test]
    fn at_capacity_when_no_permits() {
        let service = make_service_with_limit(0);
        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
    }

    #[test]
    fn window_computation() {
        // ceil(1 × 0.10) = 1
        let s = make_service_with_limit(1);
        assert_eq!(s.stream().unwrap().window(), 1);

        // ceil(10 × 0.10) = 1
        let s = make_service_with_limit(10);
        assert_eq!(s.stream().unwrap().window(), 1);

        // ceil(100 × 0.10) = 10
        let s = make_service_with_limit(100);
        assert_eq!(s.stream().unwrap().window(), 10);

        // ceil(500 × 0.10) = 50
        let s = make_service_with_limit(500);
        assert_eq!(s.stream().unwrap().window(), 50);

        // ceil(1000 × 0.10) = 100
        let s = make_service_with_limit(1000);
        assert_eq!(s.stream().unwrap().window(), 100);
    }

    // --- StreamExecutor::execute() correctness tests ---

    #[tokio::test]
    async fn execute_empty_stream() {
        let service = make_service();
        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor
            .execute(
                make_context(),
                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
            )
            .collect()
            .await;
        assert!(outcomes.is_empty());
    }

    #[tokio::test]
    async fn execute_runs_all_operations() {
        let service = make_service();
        let context = make_context();

        // Seed an object to retrieve and delete.
        service
            .insert_object(
                context.clone(),
                Some("key1".into()),
                Metadata::default(),
                futures_util::stream::once(async { Ok(Bytes::from("hello")) }).boxed(),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Get(Get { key: "key1".into() }),
            Operation::Get(Get {
                key: "nonexistent".into(),
            }),
            Operation::Insert(Insert {
                key: Some("key2".into()),
                metadata: Metadata::default(),
                payload: Bytes::from("world"),
            }),
            Operation::Delete(Delete { key: "key1".into() }),
        ];

        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;

        assert_eq!(outcomes.len(), 4);

        for (_, result) in &outcomes {
            let response = result
                .as_ref()
                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
            assert!(
                !response.key().as_str().is_empty(),
                "response must have a non-empty key"
            );
        }
    }

    // --- Service-level concurrent execution and capacity tests ---

    /// A backend that pauses on `put_object` for testing concurrency.
    ///
    /// Signals via a channel, then waits for a shared `Notify` before completing.
    struct GatedBackend {
        inner: InMemoryBackend,
        paused_tx: tokio::sync::mpsc::Sender<()>,
        resume: Arc<tokio::sync::Notify>,
        in_flight: Arc<AtomicUsize>,
    }

    impl std::fmt::Debug for GatedBackend {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("GatedBackend").finish()
        }
    }

    #[async_trait::async_trait]
    impl crate::backend::common::Backend for GatedBackend {
        fn name(&self) -> &'static str {
            self.inner.name()
        }

        async fn put_object(
            &self,
            id: &ObjectId,
            metadata: &Metadata,
            stream: crate::PayloadStream,
        ) -> Result<()> {
            self.in_flight.fetch_add(1, Ordering::SeqCst);
            // Register interest in `resume` *before* signaling the pause.
            // `Notify::notify_waiters` wakes only already-registered waiters
            // and stores no permit, so without `enable()` a `notify_waiters()`
            // issued between our send and our await would be missed. That race
            // is masked by the current-thread test runtime today, but would
            // hang these tests under a multi-thread runtime.
            let resumed = self.resume.notified();
            tokio::pin!(resumed);
            resumed.as_mut().enable();
            let _ = self.paused_tx.send(()).await;
            resumed.await;
            let result = self.inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }

        async fn get_object(
            &self,
            id: &ObjectId,
        ) -> Result<Option<(Metadata, crate::PayloadStream)>> {
            self.inner.get_object(id).await
        }

        async fn delete_object(&self, id: &ObjectId) -> Result<()> {
            self.inner.delete_object(id).await
        }
    }

    #[tokio::test]
    async fn concurrent_execution() {
        // Window = ceil(100 × 0.10) = 10.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = GatedBackend {
            inner: InMemoryBackend::new("gated-hv"),
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        };
        let lt = InMemoryBackend::new("in-memory-lt");
        let service = crate::service::StorageService::from_backends(Box::new(gated), Box::new(lt))
            .with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        // Submit 10 inserts. With window=10, all should be in-flight simultaneously.
        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait for all 10 operations to pause inside the backend.
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // Release all. Safe because every paused task pre-registered its
        // waiter before signaling (see `GatedBackend::put_object`).
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {:?}",
                result
            );
        }
    }

    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        // Service with limit=1. One background insert holds the only permit.
        // service.stream() must fail with AtCapacity.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = GatedBackend {
            inner: InMemoryBackend::new("gated-cap"),
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        };
        let lt = InMemoryBackend::new("in-memory-lt");
        let service = crate::service::StorageService::from_backends(Box::new(gated), Box::new(lt))
            .with_concurrency_limit(1);

        // Hold the only permit via a blocking insert.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    futures_util::stream::once(async { Ok(Bytes::from("x")) }).boxed(),
                )
                .await;
        });
        paused_rx.recv().await.unwrap();

        // Permit is held — stream() must fail immediately with AtCapacity.
        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        resume.notify_waiters();
    }
}
566}