1use std::sync::Arc;
41
42use futures_util::{Stream, StreamExt};
43use objectstore_types::metadata::Metadata;
44
45use crate::backend::common::Backend;
46use crate::concurrency::ConcurrencyPermit;
47use crate::error::{Error, Result};
48use crate::id::{ObjectContext, ObjectId, ObjectKey};
49use crate::service::GetResponse;
50
/// Request to store a new object as part of a streamed batch.
///
/// When executed, `key` is handed to `ObjectId::optional`, and `metadata` plus the
/// payload are forwarded to the backend's `put_object`.
#[derive(Debug)]
pub struct Insert {
    /// Key to store the object under. `None` is forwarded as-is to
    /// `ObjectId::optional` (presumably a key is generated in that case — confirm
    /// against `ObjectId`).
    pub key: Option<ObjectKey>,
    /// Metadata passed to the backend together with the payload.
    pub metadata: Metadata,
    /// Complete object contents; wrapped into a stream via `crate::stream::single`
    /// before being handed to the backend.
    pub payload: bytes::Bytes,
}
61
/// Request to read an object as part of a streamed batch.
#[derive(Debug)]
pub struct Get {
    /// Key of the object to read; combined with the batch's `ObjectContext` into an
    /// `ObjectId` when the operation is executed.
    pub key: ObjectKey,
}
68
/// Request to delete an object as part of a streamed batch.
#[derive(Debug)]
pub struct Delete {
    /// Key of the object to delete; combined with the batch's `ObjectContext` into an
    /// `ObjectId` when the operation is executed.
    pub key: ObjectKey,
}
75
/// A single operation within a streamed batch.
///
/// Each variant wraps the request payload for that kind of operation.
#[derive(Debug)]
pub enum Operation {
    /// Store a new object.
    Insert(Insert),
    /// Read an existing object.
    Get(Get),
    /// Delete an object.
    Delete(Delete),
}
86
87impl Operation {
88 pub fn key(&self) -> Option<&ObjectKey> {
90 match self {
91 Operation::Insert(op) => op.key.as_ref(),
92 Operation::Get(op) => Some(&op.key),
93 Operation::Delete(op) => Some(&op.key),
94 }
95 }
96
97 pub fn permission(&self) -> objectstore_types::auth::Permission {
99 match self {
100 Operation::Get(_) => objectstore_types::auth::Permission::ObjectRead,
101 Operation::Insert(_) => objectstore_types::auth::Permission::ObjectWrite,
102 Operation::Delete(_) => objectstore_types::auth::Permission::ObjectDelete,
103 }
104 }
105
106 pub fn kind(&self) -> &'static str {
108 match self {
109 Operation::Insert(_) => "insert",
110 Operation::Get(_) => "get",
111 Operation::Delete(_) => "delete",
112 }
113 }
114}
115
/// Successful outcome of a single [`Operation`].
///
/// `Debug` is implemented by hand for this type so that the `Got` response is
/// summarized rather than printed in full.
pub enum OpResponse {
    /// An object was stored.
    Inserted {
        /// Full identifier of the stored object, including its key.
        id: ObjectId,
    },
    /// A read completed.
    Got {
        /// Key that was requested.
        key: ObjectKey,
        /// Backend response for the read. It is optional (the `Debug` impl matches
        /// it against `Some`/`None`); `None` presumably means the object does not
        /// exist — confirm against `GetResponse`.
        response: GetResponse,
    },
    /// A delete completed.
    Deleted {
        /// Key that was deleted.
        key: ObjectKey,
    },
}
139
140impl OpResponse {
141 pub fn kind(&self) -> &'static str {
143 match self {
144 OpResponse::Inserted { .. } => "insert",
145 OpResponse::Got { .. } => "get",
146 OpResponse::Deleted { .. } => "delete",
147 }
148 }
149
150 pub fn key(&self) -> &ObjectKey {
152 match self {
153 OpResponse::Inserted { id } => &id.key,
154 OpResponse::Got { key, .. } => key,
155 OpResponse::Deleted { key } => key,
156 }
157 }
158}
159
160impl std::fmt::Debug for OpResponse {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 OpResponse::Inserted { id } => f.debug_struct("Inserted").field("id", id).finish(),
164 OpResponse::Got {
165 key,
166 response: Some(_),
167 } => f
168 .debug_struct("Got")
169 .field("key", key)
170 .field("response", &format_args!("Some(<stream>)"))
171 .finish(),
172 OpResponse::Got {
173 key,
174 response: None,
175 } => f
176 .debug_struct("Got")
177 .field("key", key)
178 .field("response", &format_args!("None"))
179 .finish(),
180 OpResponse::Deleted { key } => f.debug_struct("Deleted").field("key", key).finish(),
181 }
182 }
183}
184
/// Executes a stream of [`Operation`]s against a backend with bounded concurrency.
///
/// `execute` consumes the executor, so each instance drives exactly one stream.
#[derive(Debug)]
pub struct StreamExecutor {
    /// Backend that every operation is executed against.
    pub(crate) backend: Arc<dyn Backend>,
    /// Maximum number of operations buffered concurrently by `execute`
    /// (used as the `buffer_unordered` limit).
    pub(crate) window: usize,
    /// Concurrency permit held for this executor; `execute` shares it (via `Arc`)
    /// with every operation it spawns.
    pub(crate) reservation: ConcurrencyPermit,
}
198
199impl StreamExecutor {
200 pub fn window(&self) -> usize {
202 self.window
203 }
204
205 pub fn execute<E>(
217 self,
218 context: ObjectContext,
219 operations: impl Stream<Item = (usize, Result<Operation, E>)> + Send + 'static,
220 ) -> impl Stream<Item = (usize, Result<OpResponse, E>)> + Send + 'static
221 where
222 E: From<Error> + Send + 'static,
223 {
224 let StreamExecutor {
225 backend,
226 window,
227 reservation,
228 } = self;
229
230 let reservation = Arc::new(reservation);
234
235 operations
236 .map(move |(idx, item)| {
237 let permit = Arc::clone(&reservation);
238 let backend = Arc::clone(&backend);
239 let context = context.clone();
240 async move {
241 let op = match item {
242 Ok(op) => op,
243 Err(e) => return (idx, Err(e)),
244 };
245
246 let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move {
247 execute_operation(backend, context, op).await
248 });
249
250 (idx, spawn.await.map_err(E::from))
251 }
252 })
253 .buffer_unordered(window)
254 }
255}
256
257async fn execute_operation(
258 backend: Arc<dyn Backend>,
259 context: ObjectContext,
260 op: Operation,
261) -> Result<OpResponse> {
262 match op {
263 Operation::Get(get) => {
264 let id = ObjectId::new(context, get.key);
265 let response = backend.get_object(&id).await?;
266 Ok(OpResponse::Got {
267 key: id.key,
268 response,
269 })
270 }
271 Operation::Insert(insert) => {
272 let id = ObjectId::optional(context, insert.key);
273 let stream = crate::stream::single(insert.payload);
274 backend.put_object(&id, &insert.metadata, stream).await?;
275 Ok(OpResponse::Inserted { id })
276 }
277 Operation::Delete(delete) => {
278 let id = ObjectId::new(context, delete.key);
279 backend.delete_object(&id).await?;
280 Ok(OpResponse::Deleted { key: id.key })
281 }
282 }
283}
284
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use bytes::Bytes;
    use futures_util::StreamExt;
    use objectstore_types::metadata::Metadata;
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::common::PutResponse;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::backend::testing::{Hooks, TestBackend};
    use crate::error::Error;
    use crate::service::StorageService;
    use crate::stream::{self, ClientStream};

    /// Builds the object context shared by all tests in this module.
    fn make_context() -> ObjectContext {
        ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        }
    }

    /// Builds an in-memory service with the given concurrency limit.
    fn make_service_with_limit(limit: usize) -> StorageService {
        StorageService::new(Box::new(InMemoryBackend::new("in-memory")))
            .with_concurrency_limit(limit)
    }

    /// Builds an in-memory service with a limit large enough for every test here.
    fn make_service() -> StorageService {
        make_service_with_limit(500)
    }

    /// Turns a list of operations into the indexed, all-`Ok` stream that
    /// `StreamExecutor::execute` expects.
    fn indexed_ok(
        ops: Vec<Operation>,
    ) -> impl futures_util::Stream<Item = (usize, Result<Operation, Error>)> {
        futures_util::stream::iter(ops.into_iter().enumerate().map(|(i, op)| (i, Ok(op))))
    }

    /// A service without any permits cannot hand out a stream executor.
    #[test]
    fn at_capacity_when_no_permits() {
        let service = make_service_with_limit(0);
        assert!(matches!(service.stream(), Err(Error::AtCapacity)));
    }

    /// Pins the window size derived from the concurrency limit.
    ///
    /// The expected values are consistent with "one tenth of the limit, but at
    /// least 1"; the actual formula lives in `StorageService::stream`.
    #[test]
    fn window_computation() {
        let s = make_service_with_limit(1);
        assert_eq!(s.stream().unwrap().window(), 1);

        let s = make_service_with_limit(10);
        assert_eq!(s.stream().unwrap().window(), 1);

        let s = make_service_with_limit(100);
        assert_eq!(s.stream().unwrap().window(), 10);

        let s = make_service_with_limit(500);
        assert_eq!(s.stream().unwrap().window(), 50);

        let s = make_service_with_limit(1000);
        assert_eq!(s.stream().unwrap().window(), 100);
    }

    /// An empty input stream yields an empty output stream.
    #[tokio::test]
    async fn execute_empty_stream() {
        let service = make_service();
        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor
            .execute(
                make_context(),
                futures_util::stream::empty::<(usize, Result<Operation, Error>)>(),
            )
            .collect()
            .await;
        assert!(outcomes.is_empty());
    }

    /// A mixed batch of gets, an insert and a delete all complete successfully.
    #[tokio::test]
    async fn execute_runs_all_operations() {
        let service = make_service();
        let context = make_context();

        // Seed one object so that the batch has something to read and delete.
        service
            .insert_object(
                context.clone(),
                Some("key1".into()),
                Metadata::default(),
                stream::single("hello"),
            )
            .await
            .unwrap();

        let ops = vec![
            Operation::Get(Get { key: "key1".into() }),
            Operation::Get(Get {
                key: "nonexistent".into(),
            }),
            Operation::Insert(Insert {
                key: Some("key2".into()),
                metadata: Metadata::default(),
                payload: Bytes::from("world"),
            }),
            Operation::Delete(Delete { key: "key1".into() }),
        ];

        let executor = service.stream().unwrap();
        let outcomes: Vec<_> = executor.execute(context, indexed_ok(ops)).collect().await;

        assert_eq!(outcomes.len(), 4);

        // Results may arrive in any order, so only per-item invariants are checked.
        for (_, result) in &outcomes {
            let response = result
                .as_ref()
                .unwrap_or_else(|e| panic!("unexpected error: {e:?}"));
            assert!(
                !response.key().as_str().is_empty(),
                "response must have a non-empty key"
            );
        }
    }

    /// Backend hook that parks every `put_object` call until the test releases it.
    struct GateOnPut {
        /// Signals the test that a put has reached the gate.
        paused_tx: tokio::sync::mpsc::Sender<()>,
        /// Released by the test (via `notify_waiters`) to let parked puts continue.
        resume: Arc<tokio::sync::Notify>,
        /// Number of puts currently between entering the hook and finishing the write.
        in_flight: Arc<AtomicUsize>,
    }

    impl std::fmt::Debug for GateOnPut {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("GateOnPut").finish()
        }
    }

    #[async_trait::async_trait]
    impl Hooks for GateOnPut {
        async fn put_object(
            &self,
            inner: &InMemoryBackend,
            id: &ObjectId,
            metadata: &Metadata,
            stream: ClientStream,
        ) -> Result<PutResponse> {
            self.in_flight.fetch_add(1, Ordering::SeqCst);

            // Create the `Notified` future *before* announcing the pause.
            // `notify_waiters` stores no permit: it only wakes `Notified` futures
            // that already exist. If the test observed the pause signal and called
            // `notify_waiters` before this future was created, the put would wait
            // forever.
            let resumed = self.resume.notified();
            let _ = self.paused_tx.send(()).await;
            resumed.await;

            let result = inner.put_object(id, metadata, stream).await;
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            result
        }
    }

    /// With a window of 10, ten inserts are all in flight at the same time.
    #[tokio::test]
    async fn concurrent_execution() {
        // Capacity comfortably above the number of puts so `send` never blocks.
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(20);
        let resume = Arc::new(tokio::sync::Notify::new());
        let in_flight = Arc::new(AtomicUsize::new(0));

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::clone(&in_flight),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(100);

        let executor = service.stream().unwrap();
        assert_eq!(executor.window(), 10);

        // Exactly one window's worth of inserts.
        let ops: Vec<Operation> = (0..10)
            .map(|i| {
                Operation::Insert(Insert {
                    key: Some(format!("key{i}")),
                    metadata: Metadata::default(),
                    payload: Bytes::from(format!("data{i}")),
                })
            })
            .collect();

        let exec_handle = tokio::spawn(async move {
            executor
                .execute(make_context(), indexed_ok(ops))
                .collect::<Vec<_>>()
                .await
        });

        // Wait until every insert has reached the gate.
        for _ in 0..10 {
            paused_rx.recv().await.unwrap();
        }
        assert_eq!(in_flight.load(Ordering::SeqCst), 10);

        // Release all parked puts at once.
        resume.notify_waiters();

        let outcomes = exec_handle.await.unwrap();
        assert_eq!(outcomes.len(), 10);
        for (_, result) in &outcomes {
            assert!(
                matches!(result, Ok(OpResponse::Inserted { .. })),
                "unexpected result: {:?}",
                result
            );
        }
    }

    /// While the only permit is held by another request, no stream executor can be
    /// created.
    #[tokio::test]
    async fn batch_rejected_when_permits_exhausted() {
        let (paused_tx, mut paused_rx) = tokio::sync::mpsc::channel::<()>(2);
        let resume = Arc::new(tokio::sync::Notify::new());

        let gated = TestBackend::new(GateOnPut {
            paused_tx,
            resume: Arc::clone(&resume),
            in_flight: Arc::new(AtomicUsize::new(0)),
        });
        let service = StorageService::new(Box::new(gated)).with_concurrency_limit(1);

        // Occupy the single permit with an insert that parks at the gate.
        let svc = service.clone();
        tokio::spawn(async move {
            let _ = svc
                .insert_object(
                    make_context(),
                    Some("blocker".into()),
                    Metadata::default(),
                    stream::single("x"),
                )
                .await;
        });
        paused_rx.recv().await.unwrap();

        assert!(
            matches!(service.stream(), Err(Error::AtCapacity)),
            "expected AtCapacity when all permits are held"
        );

        // Let the blocked insert finish so the spawned task does not linger.
        resume.notify_waiters();
    }
}