1use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::backend::common::Backend;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50
51#[derive(Debug)]
53pub struct Insert {
54 pub key: Option<ObjectKey>,
56 pub metadata: Metadata,
58 pub payload: bytes::Bytes,
60}
61
62#[derive(Debug)]
64pub struct Get {
65 pub key: ObjectKey,
67}
68
69#[derive(Debug)]
71pub struct Delete {
72 pub key: ObjectKey,
74}
75
76#[derive(Debug)]
78pub struct Head {
79 pub key: ObjectKey,
81}
82
83#[derive(Debug)]
85pub enum Operation {
86 Insert(Insert),
88 Get(Get),
90 Delete(Delete),
92 Head(Head),
94}
95
96impl Operation {
97 pub fn key(&self) -> Option<&ObjectKey> {
99 match self {
100 Operation::Insert(op) => op.key.as_ref(),
101 Operation::Get(op) => Some(&op.key),
102 Operation::Delete(op) => Some(&op.key),
103 Operation::Head(op) => Some(&op.key),
104 }
105 }
106
107 pub fn permission(&self) -> objectstore_types::auth::Permission {
109 match self {
110 Operation::Get(_) | Operation::Head(_) => {
111 objectstore_types::auth::Permission::ObjectRead
112 }
113 Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
114 Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
115 }
116 }
117
118 pub fn kind(&self) -> &'static str {
120 match self {
121 Operation::Insert(_) => "insert",
122 Operation::Get(_) => "get",
123 Operation::Delete(_) => "delete",
124 Operation::Head(_) => "head",
125 }
126 }
127}
128
129pub enum OpResponse {
134 Inserted {
136 id: ObjectId,
138 },
139 Got {
141 key: ObjectKey,
143 response: GetResponse,
145 },
146 Deleted {
148 key: ObjectKey,
150 },
151 Head {
153 key: ObjectKey,
155 metadata: Option<Metadata>,
157 },
158}
159
160impl OpResponse {
161 pub fn kind(&self) -> &'static str {
163 match self {
164 OpResponse::Inserted { .. } => "insert",
165 OpResponse::Got { .. } => "get",
166 OpResponse::Deleted { .. } => "delete",
167 OpResponse::Head { .. } => "head",
168 }
169 }
170
171 pub fn key(&self) -> &ObjectKey {
173 match self {
174 OpResponse::Inserted { id } => &id.key,
175 OpResponse::Got { key, .. } => key,
176 OpResponse::Deleted { key } => key,
177 OpResponse::Head { key, .. } => key,
178 }
179 }
180}
181
182impl std::fmt::Debug for OpResponse {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 match self {
185 OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
186 OpResponse::Got {
187 key,
188 response: Some(_),
189 } => f
190 .debug_struct("Got")
191 .field("key", key)
192 .field("response", &format_args!("Some(<stream>)"))
193 .finish(),
194 OpResponse::Got {
195 key,
196 response: None,
197 } => f
198 .debug_struct("Got")
199 .field("key", key)
200 .field("response", &format_args!("None"))
201 .finish(),
202 OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
203 OpResponse::Head { key, metadata } => f
204 .debug_struct("Head")
205 .field("key", key)
206 .field("metadata", &metadata.is_some())
207 .finish(),
208 }
209 }
210}
211
212#[derive(Debug)]
220pub struct StreamExecutor {
221 pub(crate) backend: Arc<dyn Backend>,
222 pub(crate) window: usize,
223 pub(crate) reservation: ConcurrencyPermit,
224}
225
226impl StreamExecutor {
227 pub fn window(&self) -> usize {
229 self.window
230 }
231
232 pub fn execute<E>(
244 self,
245 context: ObjectContext,
246 operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
247 ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
248 where
249 E: From<Error> + Send + 'static,
250 {
251 let StreamExecutor {
252 backend,
253 window,
254 reservation,
255 } = self;
256
257 let reservation = Arc::new(reservation);
261
262 operations
263 .map(move |(idx, item)| {
264 let permit = Arc::clone(&reservation);
265 let backend = Arc::clone(&backend);
266 let context = context.clone();
267 async move {
268 let op = match item {
269 Ok(op) => op,
270 Err(e) => return (idx, Err(e)),
271 };
272
273 let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
274 execute_operation(backend, context, op).await
275 });
276
277 (idx, spawn.await.map_err(E::from))
278 }
279 })
280 .buffer_unordered(window)
281 }
282}
283
284async fn execute_operation(
285 backend: Arc<dyn Backend>,
286 context: ObjectContext,
287 op: Operation,
288) -> Result<OpResponse> {
289 match op {
290 Operation::Get(get) => {
291 let id = ObjectId::new(context, get.key);
292 let response = backend.get_object(&id).await?;
293 Ok(OpResponse::Got {
294 key: id.key,
295 response,
296 })
297 }
298 Operation::Insert(insert) => {
299 let id = ObjectId::optional(context, insert.key);
300 let stream = crate::stream::single(insert.payload);
301 backend.put_object(&id, &insert.metadata, stream).await?;
302 Ok(OpResponse::Inserted { id })
303 }
304 Operation::Delete(delete) => {
305 let id = ObjectId::new(context, delete.key);
306 backend.delete_object(&id).await?;
307 Ok(OpResponse::Deleted { key: id.key })
308 }
309 Operation::Head(head) => {
310 let id = ObjectId::new(context, head.key);
311 let metadata = backend.get_metadata(&id).await?;
312 Ok(OpResponse::Head {
313 key: id.key,
314 metadata,
315 })
316 }
317 }
318}
319
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use bytes::Bytes;
    use futures_util::StreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::PutResponse;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::error::Error;
    use crate::service::StorageService;
    use crate::stream::{self, ClientStream};

    /// Builds the object context shared by all tests.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Builds an in-memory service with the given concurrency limit.
    fn make_service_with_limit(limit: usize) -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
            .with_concurrency_limit(limit)
    }

    /// Builds an in-memory service with a limit of 500.
    fn make_service() -> StorageService {
        make_service_with_limit(500)
    }

    /// Wraps each operation in `Ok` and tags it with its position in `ops`.
    fn indexed_ok(
        ops: Vec<Operation>,
    ) -> impl futures_util::Stream<Item = (usize, Result<Operation, Error>)> {
        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
    }

    /// With a limit of zero, no executor can be obtained.
    #[test]
    fn at_capacity_when_no_permits() {
        let service = make_service_with_limit(0);
        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
    }

    /// Pins the window for a range of limits; the expected values correspond
    /// to one tenth of the limit with a floor of one.
    #[test]
    fn window_computation() {
        let s = make_service_with_limit(1);
        assert_eq!(s.stream().unwrap().window(), 1);

        let s = make_service_with_limit(10);
        assert_eq!(s.stream().unwrap().window(), 1);

        let s = make_service_with_limit(100);
        assert_eq!(s.stream().unwrap().window(), 10);

        let s = make_service_with_limit(500);
        assert_eq!(s.stream().unwrap().window(), 50);

        let s = make_service_with_limit(1000);
        assert_eq!(s.stream().unwrap().window(), 100);
    }

    /// An empty input stream yields an empty output stream.
    #[tokio::test]
    async fn execute_empty_stream() {
        let service = make_service();
        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor
            .execute(
                make_context(),
                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
            )
            .collect()
            .await;
        assert!(outcomes.is_empty());
    }

    /// A mixed batch produces exactly one successful outcome per operation.
    #[tokio::test]
    async fn execute_runs_all_operations() {
        let service = make_service();
        let context = make_context();

        // Seed an object so the batch has something to get and delete.
        service
            .insert_object(
                context.clone(),
                Some("key1".into()),
                Metadata::default(),
                stream::single("hello"),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Get(Get { key: "key1".into() }),
            Operation::Get(Get {
                key: "nonexistent".into(),
            }),
            Operation::Insert(Insert {
                key: Some("key2".into()),
                metadata: Metadata::default(),
                payload: Bytes::from("world"),
            }),
            Operation::Delete(Delete { key: "key1".into() }),
        ];

        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;

        assert_eq!(outcomes.len(), 4);

        // Operations run concurrently, so only order-independent properties are checked.
        for (_, result) in &outcomes {
            let response = result
                .as_ref()
                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
            assert!(
                !response.key().as_str().is_empty(),
                "response must have a non-empty key"
            );
        }
    }

    /// Head returns metadata for an existing object and `None` for a missing one.
    #[tokio::test]
    async fn execute_head_operation() {
        let service = make_service();
        let context = make_context();

        service
            .insert_object(
                context.clone(),
                Some("exists".into()),
                Metadata::default(),
                stream::single("data"),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Head(Head {
                key: "exists".into(),
            }),
            Operation::Head(Head {
                key: "missing".into(),
            }),
        ];

        let executor = service.stream().unwrap();
        let mut outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;
        // Outcomes arrive in completion order; restore input order before asserting.
        outcomes.sort_by_key(|(idx, _)| *idx);

        assert_eq!(outcomes.len(), 2);

        match &outcomes[0].1 {
            Ok(OpResponse::Head {
                key,
                metadata: Some(_),
            }) => assert_eq!(key.as_str(), "exists"),
            other => panic!("expected Head with metadata, got: {other:?}"),
        }

        match &outcomes[1].1 {
            Ok(OpResponse::Head {
                key,
                metadata: None,
            }) => assert_eq!(key.as_str(), "missing"),
            other => panic!("expected Head with None, got: {other:?}"),
        }
    }

    /// Backend hook that parks every `put_object` call until the test resumes it.
    struct GateOnPut {
        /// Receives one message each time a put reaches the gate.
        paused_tx: tokio::sync::mpsc::Sender<()>,
        /// Releases all parked puts when notified.
        resume: Arc<tokio::sync::Notify>,
        /// Number of puts currently inside the hook.
        in_flight: Arc<AtomicUsize>,
    }

    impl std::fmt::Debug for GateOnPut {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("GateOnPut").finish()
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            self.in_flight.fetch_add(1, Ordering::SeqCst);

            // Create the `Notified` future BEFORE announcing the pause.
            // `notify_waiters` stores no permit, so a wake-up fired between the
            // announcement and the creation of the future would be lost and this
            // call would hang. A `Notified` future receives `notify_waiters`
            // wake-ups from the moment it is created, even before its first poll.
            let resumed = self.resume.notified();
            let _ = self.paused_tx.send(()).await;
            resumed.await;

            let result = inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }
    }

    /// All operations within the window are in flight at the same time.
    #[tokio::test]
    async fn concurrent_execution() {
        // Capacity exceeds the number of operations, so hook sends never block.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        // Exactly as many operations as the window allows.
        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait until every put has reached the gate.
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // Release all parked puts at once.
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {:?}",
                result
            );
        }
    }

    /// While another request holds the only permit, no executor can be obtained.
    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(1);

        // Occupy the single permit with an insert that parks inside the hook.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    stream::single("x"),
                )
                .await;
        });
        paused_rx.recv().await.unwrap();

        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        // Let the blocked insert finish.
        resume.notify_waiters();
    }
}