1use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::PayloadStream;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50use crate::tiered::TieredStorage;
51
/// A request to store one object, with the full payload held in memory.
#[derive(Debug)]
pub struct Insert {
    /// Destination key; `None` defers key selection to the storage layer
    /// (the resulting [`ObjectId`] is reported back via `OpResponse::Inserted`).
    pub key: Option<ObjectKey>,
    /// Metadata persisted alongside the payload.
    pub metadata: Metadata,
    /// The complete object payload.
    pub payload: bytes::Bytes,
}
62
/// A request to fetch one object by key.
#[derive(Debug)]
pub struct Get {
    /// Key of the object to read.
    pub key: ObjectKey,
}
69
/// A request to remove one object by key.
#[derive(Debug)]
pub struct Delete {
    /// Key of the object to delete.
    pub key: ObjectKey,
}
76
/// A single operation within a streamed/batched request.
#[derive(Debug)]
pub enum Operation {
    /// Store an object.
    Insert(Insert),
    /// Fetch an object.
    Get(Get),
    /// Remove an object.
    Delete(Delete),
}
87
88impl Operation {
89 pub fn key(&self) -> Option<&ObjectKey> {
91 match self {
92 Operation::Insert(op) => op.key.as_ref(),
93 Operation::Get(op) => Some(&op.key),
94 Operation::Delete(op) => Some(&op.key),
95 }
96 }
97
98 pub fn permission(&self) -> objectstore_types::auth::Permission {
100 match self {
101 Operation::Get(_) => objectstore_types::auth::Permission::ObjectRead,
102 Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
103 Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
104 }
105 }
106
107 pub fn kind(&self) -> &'static str {
109 match self {
110 Operation::Insert(_) => "insert",
111 Operation::Get(_) => "get",
112 Operation::Delete(_) => "delete",
113 }
114 }
115}
116
/// The successful outcome of one [`Operation`].
pub enum OpResponse {
    /// An object was stored; carries the id it was stored under.
    Inserted {
        id: ObjectId,
    },
    /// A read completed; `response` is `None` when no object was found.
    Got {
        key: ObjectKey,
        response: GetResponse,
    },
    /// A delete completed for the given key.
    Deleted {
        key: ObjectKey,
    },
}
140
141impl OpResponse {
142 pub fn kind(&self) -> &'static str {
144 match self {
145 OpResponse::Inserted { .. } => "insert",
146 OpResponse::Got { .. } => "get",
147 OpResponse::Deleted { .. } => "delete",
148 }
149 }
150
151 pub fn key(&self) -> &ObjectKey {
153 match self {
154 OpResponse::Inserted { id } => &id.key,
155 OpResponse::Got { key, .. } => key,
156 OpResponse::Deleted { key } => key,
157 }
158 }
159}
160
161impl std::fmt::Debug for OpResponse {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 match self {
164 OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
165 OpResponse::Got {
166 key,
167 response: Some(_),
168 } => f
169 .debug_struct("Got")
170 .field("key", key)
171 .field("response", &format_args!("Some(<stream>)"))
172 .finish(),
173 OpResponse::Got {
174 key,
175 response: None,
176 } => f
177 .debug_struct("Got")
178 .field("key", key)
179 .field("response", &format_args!("None"))
180 .finish(),
181 OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
182 }
183 }
184}
185
/// Executes a stream of operations against tiered storage with bounded
/// concurrency.
///
/// Constructed by the storage service; holds a [`ConcurrencyPermit`]
/// reservation that is shared by all operation tasks for the batch's
/// lifetime.
#[derive(Debug)]
pub struct StreamExecutor {
    // Tiered storage the operations run against.
    pub(crate) tiered: Arc<TieredStorage>,
    // Maximum number of operations in flight at once.
    pub(crate) window: usize,
    // Permit reserved up front; wrapped in an `Arc` and shared by spawned tasks.
    pub(crate) reservation: ConcurrencyPermit,
}
199
200impl StreamExecutor {
201 pub fn window(&self) -> usize {
203 self.window
204 }
205
206 pub fn execute<E>(
218 self,
219 context: ObjectContext,
220 operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
221 ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
222 where
223 E: From<Error> + Send + 'static,
224 {
225 let StreamExecutor {
226 tiered,
227 window,
228 reservation,
229 } = self;
230
231 let reservation = Arc::new(reservation);
235
236 operations
237 .map(move |(idx, item)| {
238 let permit = Arc::clone(&reservation);
239 let tiered = Arc::clone(&tiered);
240 let context = context.clone();
241 async move {
242 let op = match item {
243 Ok(op) => op,
244 Err(e) => return (idx, Err(e)),
245 };
246
247 let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
248 execute_operation(tiered, context, op).await
249 });
250
251 (idx, spawn.await.map_err(E::from))
252 }
253 })
254 .buffer_unordered(window)
255 }
256}
257
258async fn execute_operation(
259 tiered: Arc<TieredStorage>,
260 context: ObjectContext,
261 op: Operation,
262) -> Result<OpResponse> {
263 match op {
264 Operation::Get(get) => {
265 let id = ObjectId::new(context, get.key);
266 let response = tiered.get_object(&id).await?;
267 Ok(OpResponse::Got {
268 key: id.key,
269 response,
270 })
271 }
272 Operation::Insert(insert) => {
273 let stream: PayloadStream =
274 futures_util::stream::once(futures_util::future::ready(Ok(insert.payload))).boxed();
275 let id = tiered
276 .insert_object(context, insert.key, &insert.metadata, stream)
277 .await?;
278 Ok(OpResponse::Inserted { id })
279 }
280 Operation::Delete(delete) => {
281 let id = ObjectId::new(context, delete.key);
282 tiered.delete_object(&id).await?;
283 Ok(OpResponse::Deleted { key: id.key })
284 }
285 }
286}
287
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use bytes::Bytes;
    use futures_util::StreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::error::Error;

    // Minimal object context used by all tests.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    // Builds a service over two in-memory backends with the given
    // concurrency limit.
    fn make_service_with_limit(limit: usize) -> crate::service::StorageService {
        let hv = InMemoryBackend::new("in-memory-hv");
        let lt = InMemoryBackend::new("in-memory-lt");
        crate::service::StorageService::from_backends(Box::new(hv), Box::new(lt))
            .with_concurrency_limit(limit)
    }

    // Default service with a limit large enough to never block the tests.
    fn make_service() -> crate::service::StorageService {
        make_service_with_limit(500)
    }

    // Wraps operations into the `(index, Ok(op))` stream shape that
    // `StreamExecutor::execute` consumes.
    fn indexed_ok(
        ops: Vec<Operation>,
    ) -> impl futures_util::Stream<Item = (usize, Result<Operation, Error>)> {
        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
    }

    // A zero-permit service must refuse to hand out a stream executor.
    #[test]
    fn at_capacity_when_no_permits() {
        let service = make_service_with_limit(0);
        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
    }

    // NOTE(review): these expectations suggest window = limit / 10, clamped
    // to at least 1 — confirm against the `stream()` implementation.
    #[test]
    fn window_computation() {
        let s = make_service_with_limit(1);
        assert_eq!(s.stream().unwrap().window(), 1);

        let s = make_service_with_limit(10);
        assert_eq!(s.stream().unwrap().window(), 1);

        let s = make_service_with_limit(100);
        assert_eq!(s.stream().unwrap().window(), 10);

        let s = make_service_with_limit(500);
        assert_eq!(s.stream().unwrap().window(), 50);

        let s = make_service_with_limit(1000);
        assert_eq!(s.stream().unwrap().window(), 100);
    }

    // An empty input stream yields an empty output stream.
    #[tokio::test]
    async fn execute_empty_stream() {
        let service = make_service();
        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor
            .execute(
                make_context(),
                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
            )
            .collect()
            .await;
        assert!(outcomes.is_empty());
    }

    // One of each operation kind (plus a get for a missing key) all succeed
    // and every response carries a key.
    #[tokio::test]
    async fn execute_runs_all_operations() {
        let service = make_service();
        let context = make_context();

        // Seed "key1" so the get and delete below have something to hit.
        service
            .insert_object(
                context.clone(),
                Some("key1".into()),
                Metadata::default(),
                futures_util::stream::once(async { Ok(Bytes::from("hello")) }).boxed(),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Get(Get { key: "key1".into() }),
            Operation::Get(Get {
                key: "nonexistent".into(),
            }),
            Operation::Insert(Insert {
                key: Some("key2".into()),
                metadata: Metadata::default(),
                payload: Bytes::from("world"),
            }),
            Operation::Delete(Delete { key: "key1".into() }),
        ];

        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;

        assert_eq!(outcomes.len(), 4);

        for (_, result) in &outcomes {
            let response = result
                .as_ref()
                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
            assert!(
                !response.key().as_str().is_empty(),
                "response must have a non-empty key"
            );
        }
    }

    // Backend wrapper whose `put_object` parks until told to resume:
    // it announces each paused put on `paused_tx`, then waits on `resume`,
    // tracking how many puts are currently in flight.
    struct GatedBackend {
        inner: InMemoryBackend,
        paused_tx: tokio::sync::mpsc::Sender<()>,
        resume: Arc<tokio::sync::Notify>,
        in_flight: Arc<AtomicUsize>,
    }

    impl std::fmt::Debug for GatedBackend {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("GatedBackend").finish()
        }
    }

    #[async_trait::async_trait]
    impl crate::backend::common::Backend for GatedBackend {
        fn name(&self) -> &'static str {
            self.inner.name()
        }

        // Signal "paused", block until resumed, then delegate to the inner
        // backend; only puts are gated — gets/deletes pass straight through.
        async fn put_object(
            &self,
            id: &ObjectId,
            metadata: &Metadata,
            stream: crate::PayloadStream,
        ) -> Result<()> {
            self.in_flight.fetch_add(1, Ordering::SeqCst);
            let _ = self.paused_tx.send(()).await;
            self.resume.notified().await;
            let result = self.inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }

        async fn get_object(
            &self,
            id: &ObjectId,
        ) -> Result<Option<(Metadata, crate::PayloadStream)>> {
            self.inner.get_object(id).await
        }

        async fn delete_object(&self, id: &ObjectId) -> Result<()> {
            self.inner.delete_object(id).await
        }
    }

    // With window = 10 and a gated backend, all 10 inserts must be in
    // flight simultaneously before any of them completes.
    #[tokio::test]
    async fn concurrent_execution() {
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = GatedBackend {
            inner: InMemoryBackend::new("gated-hv"),
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        };
        let lt = InMemoryBackend::new("in-memory-lt");
        let service = crate::service::StorageService::from_backends(Box::new(gated), Box::new(lt))
            .with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait until every insert has reported itself paused...
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        // ...which proves all 10 ran concurrently rather than serially.
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // All 10 tasks are parked in `notified()` at this point, so a single
        // `notify_waiters` releases them all.
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {:?}",
                result
            );
        }
    }

    // Holding the only permit with a blocked insert must make `stream()`
    // report `AtCapacity`.
    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = GatedBackend {
            inner: InMemoryBackend::new("gated-cap"),
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        };
        let lt = InMemoryBackend::new("in-memory-lt");
        let service = crate::service::StorageService::from_backends(Box::new(gated), Box::new(lt))
            .with_concurrency_limit(1);

        // Spawn an insert that grabs the single permit and then parks in
        // the gated backend.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    futures_util::stream::once(async { Ok(Bytes::from("x")) }).boxed(),
                )
                .await;
        });
        // Don't assert until the blocker has definitely taken the permit.
        paused_rx.recv().await.unwrap();

        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        // Let the parked insert finish so the task doesn't linger.
        resume.notify_waiters();
    }
}