1use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::backend::common::Backend;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50
/// Request to store a new object from a payload that is already in memory.
#[derive(Debug)]
pub struct Insert {
    /// Key to store the object under.
    ///
    /// When `None`, `execute_operation` forwards the missing key to
    /// `ObjectId::optional`; presumably that assigns a key — TODO confirm
    /// against `ObjectId`.
    pub key: Option<ObjectKey>,
    /// Metadata handed to the backend together with the payload.
    pub metadata: Metadata,
    /// Complete object body.
    pub payload: bytes::Bytes,
}
61
/// Request to read an object's contents.
#[derive(Debug)]
pub struct Get {
    /// Key of the object to read.
    pub key: ObjectKey,
}
68
/// Request to delete an object.
#[derive(Debug)]
pub struct Delete {
    /// Key of the object to delete.
    pub key: ObjectKey,
}
75
/// Request to fetch an object's metadata without its contents.
#[derive(Debug)]
pub struct Head {
    /// Key of the object whose metadata is requested.
    pub key: ObjectKey,
}
82
/// A single operation that can be submitted to [`StreamExecutor::execute`].
#[derive(Debug)]
pub enum Operation {
    /// Store a new object.
    Insert(Insert),
    /// Read an object's contents.
    Get(Get),
    /// Delete an object.
    Delete(Delete),
    /// Read an object's metadata only.
    Head(Head),
}
95
96impl Operation {
97 pub fn key(&self) -> Option<&ObjectKey> {
99 match self {
100 Operation::Insert(op) => op.key.as_ref(),
101 Operation::Get(op) => Some(&op.key),
102 Operation::Delete(op) => Some(&op.key),
103 Operation::Head(op) => Some(&op.key),
104 }
105 }
106
107 pub fn permission(&self) -> objectstore_types::auth::Permission {
109 match self {
110 Operation::Get(_) | Operation::Head(_) => {
111 objectstore_types::auth::Permission::ObjectRead
112 }
113 Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
114 Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
115 }
116 }
117
118 pub fn kind(&self) -> &'static str {
120 match self {
121 Operation::Insert(_) => "insert",
122 Operation::Get(_) => "get",
123 Operation::Delete(_) => "delete",
124 Operation::Head(_) => "head",
125 }
126 }
127}
128
/// Successful outcome of a single [`Operation`], produced by `execute_operation`.
///
/// `Debug` is implemented by hand further down in this file rather than
/// derived.
pub enum OpResponse {
    /// An object was stored.
    Inserted {
        /// Full identifier of the stored object, including its key.
        id: ObjectId,
    },
    /// A read completed.
    Got {
        /// Key that was read.
        key: ObjectKey,
        /// Value returned by the backend's `get_object`; it is an `Option`,
        /// and `None` presumably means the object does not exist — TODO confirm.
        response: GetResponse,
    },
    /// A delete completed.
    Deleted {
        /// Key that was deleted.
        key: ObjectKey,
    },
    /// A metadata lookup completed.
    Head {
        /// Key that was looked up.
        key: ObjectKey,
        /// Metadata returned by the backend's `get_metadata`. The tests in this
        /// file observe `None` for a key that was never inserted.
        metadata: Option<Metadata>,
    },
}
159
160impl OpResponse {
161 pub fn kind(&self) -> &'static str {
163 match self {
164 OpResponse::Inserted { .. } => "insert",
165 OpResponse::Got { .. } => "get",
166 OpResponse::Deleted { .. } => "delete",
167 OpResponse::Head { .. } => "head",
168 }
169 }
170
171 pub fn key(&self) -> &ObjectKey {
173 match self {
174 OpResponse::Inserted { id } => &id.key,
175 OpResponse::Got { key, .. } => key,
176 OpResponse::Deleted { key } => key,
177 OpResponse::Head { key, .. } => key,
178 }
179 }
180}
181
182impl std::fmt::Debug for OpResponse {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 match self {
185 OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
186 OpResponse::Got {
187 key,
188 response: Some(_),
189 } => f
190 .debug_struct("Got")
191 .field("key", key)
192 .field("response", &format_args!("Some(<stream>)"))
193 .finish(),
194 OpResponse::Got {
195 key,
196 response: None,
197 } => f
198 .debug_struct("Got")
199 .field("key", key)
200 .field("response", &format_args!("None"))
201 .finish(),
202 OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
203 OpResponse::Head { key, metadata } => f
204 .debug_struct("Head")
205 .field("key", key)
206 .field("metadata", &metadata.is_some())
207 .finish(),
208 }
209 }
210}
211
/// Executes a stream of [`Operation`]s against a backend with bounded
/// concurrency.
///
/// The tests in this file obtain an executor from `StorageService::stream`.
#[derive(Debug)]
pub struct StreamExecutor {
    /// Backend every operation is executed against.
    pub(crate) backend: Arc<dyn Backend>,
    /// Maximum number of operations `execute` keeps in flight at once.
    pub(crate) window: usize,
    /// Concurrency reservation; `execute` shares it with every operation it
    /// spawns.
    pub(crate) reservation: ConcurrencyPermit,
}
225
226impl StreamExecutor {
227 pub fn window(&self) -> usize {
229 self.window
230 }
231
232 pub fn execute<E>(
244 self,
245 context: ObjectContext,
246 operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
247 ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
248 where
249 E: From<Error> + Send + 'static,
250 {
251 let StreamExecutor {
252 backend,
253 window,
254 reservation,
255 } = self;
256
257 let reservation = Arc::new(reservation);
261
262 operations
263 .map(move |(idx, item)| {
264 let permit = Arc::clone(&reservation);
265 let backend = Arc::clone(&backend);
266 let context = context.clone();
267 async move {
268 let op = match item {
269 Ok(op) => op,
270 Err(e) => return (idx, Err(e)),
271 };
272
273 let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
274 execute_operation(backend, context, op).await
275 });
276
277 (idx, spawn.await.map_err(E::from))
278 }
279 })
280 .buffer_unordered(window)
281 }
282}
283
284async fn execute_operation(
285 backend: Arc<dyn Backend>,
286 context: ObjectContext,
287 op: Operation,
288) -> Result<OpResponse> {
289 match op {
290 Operation::Get(get) => {
291 let id = ObjectId::new(context, get.key);
292 let response = backend.get_object(&id, None).await?;
293 Ok(OpResponse::Got {
294 key: id.key,
295 response,
296 })
297 }
298 Operation::Insert(insert) => {
299 let id = ObjectId::optional(context, insert.key);
300 let stream = crate::stream::single(insert.payload);
301 backend.put_object(&id, &insert.metadata, stream).await?;
302 Ok(OpResponse::Inserted { id })
303 }
304 Operation::Delete(delete) => {
305 let id = ObjectId::new(context, delete.key);
306 backend.delete_object(&id).await?;
307 Ok(OpResponse::Deleted { key: id.key })
308 }
309 Operation::Head(head) => {
310 let id = ObjectId::new(context, head.key);
311 let metadata = backend.get_metadata(&id).await?;
312 Ok(OpResponse::Head {
313 key: id.key,
314 metadata,
315 })
316 }
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use std::sync::Arc;
323 use std::sync::atomic::{AtomicUsize, Ordering};
324
325 use bytes::Bytes;
326 use futures_util::StreamExt;
327 use objectstore_types::metadata::Metadata;
328 use objectstore_types::scope::{Scope, Scopes};
329
330 use super::*;
331 use crate::backend::common::PutResponse;
332 use crate::backend::in_memory::InMemoryBackend;
333 use crate::backend::testing::{Hooks, TestBackend};
334 use crate::error::Error;
335 use crate::service::StorageService;
336 use crate::stream::{self, ClientStream};
337
338 fn make_context() -> ObjectContext {
339 ObjectContext {
340 usecase: "testing".into(),
341 scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
342 }
343 }
344
345 fn make_service_with_limit(limit: usize) -> StorageService {
346 StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
347 .with_concurrency_limit(limit)
348 }
349
350 fn make_service() -> StorageService {
351 make_service_with_limit(500)
352 }
353
354 fn indexed_ok(ops: Vec<Operation>) -> impl Stream<Item = (usize, Result<Operation, Error>)> {
356 futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
357 }
358
359 #[test]
362 fn at_capacity_when_no_permits() {
363 let service = make_service_with_limit(0);
364 assert!(matches!(service.stream(), Err(Error::AtCapacity)));
365 }
366
367 #[test]
368 fn window_computation() {
369 let s = make_service_with_limit(1);
371 assert_eq!(s.stream().unwrap().window(), 1);
372
373 let s = make_service_with_limit(10);
375 assert_eq!(s.stream().unwrap().window(), 1);
376
377 let s = make_service_with_limit(100);
379 assert_eq!(s.stream().unwrap().window(), 10);
380
381 let s = make_service_with_limit(500);
383 assert_eq!(s.stream().unwrap().window(), 50);
384
385 let s = make_service_with_limit(1000);
387 assert_eq!(s.stream().unwrap().window(), 100);
388 }
389
390 #[tokio::test]
393 async fn execute_empty_stream() {
394 let service = make_service();
395 let executor = service.stream().unwrap();
396 let outcomes: Vec<_> = executor
397 .execute(
398 make_context(),
399 futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
400 )
401 .collect()
402 .await;
403 assert!(outcomes.is_empty());
404 }
405
406 #[tokio::test]
407 async fn execute_runs_all_operations() {
408 let service = make_service();
409 let context = make_context();
410
411 service
413 .insert_object(
414 context.clone(),
415 Some("key1".into()),
416 Metadata::default(),
417 stream::single("hello"),
418 )
419 .await
420 .unwrap();
421
422 let ops = vec![
423 Operation::Get(Get { key: "key1".into() }),
424 Operation::Get(Get {
425 key: "nonexistent".into(),
426 }),
427 Operation::Insert(Insert {
428 key: Some("key2".into()),
429 metadata: Metadata::default(),
430 payload: Bytes::from("world"),
431 }),
432 Operation::Delete(Delete { key: "key1".into() }),
433 ];
434
435 let executor = service.stream().unwrap();
436 let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;
437
438 assert_eq!(outcomes.len(), 4);
439
440 for (_, result) in &outcomes {
441 let response = result
442 .as_ref()
443 .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
444 assert!(
445 !response.key().as_str().is_empty(),
446 "response must have a non-empty key"
447 );
448 }
449 }
450
451 #[tokio::test]
452 async fn execute_head_operation() {
453 let service = make_service();
454 let context = make_context();
455
456 service
457 .insert_object(
458 context.clone(),
459 Some("exists".into()),
460 Metadata::default(),
461 stream::single("data"),
462 )
463 .await
464 .unwrap();
465
466 let ops = vec![
467 Operation::Head(Head {
468 key: "exists".into(),
469 }),
470 Operation::Head(Head {
471 key: "missing".into(),
472 }),
473 ];
474
475 let executor = service.stream().unwrap();
476 let mut outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;
477 outcomes.sort_by_key(|(idx, _)| *idx);
478
479 assert_eq!(outcomes.len(), 2);
480
481 match &outcomes[0].1 {
482 Ok(OpResponse::Head {
483 key,
484 metadata: Some(_),
485 }) => assert_eq!(key.as_str(), "exists"),
486 other => panic!("expected Head with metadata, got: {other:?}"),
487 }
488
489 match &outcomes[1].1 {
490 Ok(OpResponse::Head {
491 key,
492 metadata: None,
493 }) => assert_eq!(key.as_str(), "missing"),
494 other => panic!("expected Head with None, got: {other:?}"),
495 }
496 }
497
    /// Test hook that pauses every `put_object` call until the test releases it.
    struct GateOnPut {
        /// Receives one `()` each time a `put_object` call reaches the gate.
        paused_tx: tokio::sync::mpsc::Sender<()>,
        /// Notified by the test to let paused calls proceed.
        resume: Arc<tokio::sync::Notify>,
        /// Number of `put_object` calls currently inside the hook.
        in_flight: Arc<AtomicUsize>,
    }
505
506 impl std::fmt::Debug for GateOnPut {
507 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
508 f.debug_struct("GateOnPut").finish()
509 }
510 }
511
    /// Gates `put_object`: announces arrival, waits for the test's go-ahead, then
    /// delegates to the real in-memory backend.
    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            // Count this call as in flight before signalling, so the test sees
            // the updated counter once it has received the signal.
            self.in_flight.fetch_add(1, Ordering::SeqCst);
            // Tell the test this call has reached the gate; a send failure is
            // deliberately ignored.
            let _ = self.paused_tx.send(()).await;
            // Block until the test notifies `resume`.
            self.resume.notified().await;
            let result = inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }
    }
529
    /// Ten inserts submitted to an executor with a window of ten must all be in
    /// flight at the same time.
    #[tokio::test]
    async fn concurrent_execution() {
        // Capacity 20 exceeds the 10 signals sent, so the hook's `send` never
        // has to wait for room.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        // Exactly as many operations as the window allows.
        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        // Drive the executor on its own task so this test can observe the
        // paused state while the operations are blocked in the hook.
        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait until all ten inserts have reached the gate.
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // Release every paused insert.
        // NOTE(review): this relies on each hook having reached
        // `resume.notified()` by the time this runs; presumably guaranteed by
        // the default single-threaded `#[tokio::test]` runtime — confirm.
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {result:?}",
            );
        }
    }
583
    /// While another operation is still in progress on a service with a
    /// concurrency limit of one, requesting a stream executor must fail with
    /// `AtCapacity`.
    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(1);

        // Start an insert that blocks inside the gated backend; presumably it
        // holds the service's only permit while paused — confirm against
        // `StorageService::insert_object`.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    stream::single("x"),
                )
                .await;
        });
        // Wait until the blocking insert has reached the gate.
        paused_rx.recv().await.unwrap();

        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        // Unblock the background insert; its task is not awaited.
        resume.notify_waiters();
    }
620}