Skip to main content

objectstore_service/backend/
counting.rs

1//! Defines [`CountingBackend`], a decorator for the [`Backend`] trait that emits Cost of Goods Sold
2//! (COGS) compute usage metrics to the `cogs.usage` counter.
3//!
4//! [`CountingBackend`] is meant to wrap the outer-most [`Backend`] implementation owned by
5//! [`StorageService`] so that every tracked backend operation, whether a single-object operation
6//! called by [`StorageService`] or a batched operation streamed by [`StreamExecutor`], is counted
7//! once. Notably, any operation that fails before it gets to [`StorageService`] (e.g. an auth or
8//! rate limit failure at a higher layer) is not counted.
9//!
10//! For COGS purposes we use operation count as a proxy for compute cost under the assumption that
11//! each request we serve has a basically flat CPU cost. Large payloads take longer, but they can be
12//! streamed in the background while other requests are served so they don't really cost more.
13//!
14//! [`StorageService`]: crate::service::StorageService
15//! [`StreamExecutor`]: crate::streaming::StreamExecutor
16
17use std::sync::Arc;
18
19use objectstore_types::metadata::Metadata;
20use objectstore_types::range::ByteRange;
21
22use crate::backend::common::{
23    Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend, PutResponse,
24};
25use crate::error::Result;
26use crate::id::ObjectId;
27use crate::multipart::{
28    AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
29    ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
30};
31use crate::stream::ClientStream;
32
33/// Increments `cogs.usage` by one operation for the given `usecase`.
34///
35/// Under the hood, the `usecase` is used as the `app_feature`. This allows to identify distinct
36/// products and map them in the for the COGs pipeline.
37fn count(usecase: &str) {
38    objectstore_metrics::count!("cogs.usage" += 1, app_feature = usecase.to_owned());
39}
40
41/// A [`Backend`] decorator that counts each operation performed for COGS. Also implements
42/// [`MultipartUploadBackend`]. See the [module documentation](self) for how it should be used.
43///
44/// [`CountingBackend`]'s implementation clashes with how the [`MultipartUploadBackend`] trait is
45/// connected to the [`Backend`] trait. The workaround is to give `CountingBackend` (up to) two
46/// `Arc`s that point to the inner backend:
47/// - `inner: Arc<dyn Backend>`
48/// - `inner_multipart: Option<Arc<dyn MultipartUploadBackend>>` if `inner` supports it
49#[derive(Debug)]
50pub struct CountingBackend {
51    inner: Arc<dyn Backend>,
52}
53
54impl CountingBackend {
55    /// Creates a [`CountingBackend`] that wraps `inner` and increments `cogs.usage`
56    /// before delegating operations to it.
57    pub fn new(inner: Box<dyn Backend>) -> Self {
58        let inner: Arc<dyn Backend> = Arc::from(inner);
59        Self { inner }
60    }
61}
62
63#[async_trait::async_trait]
64impl Backend for CountingBackend {
65    fn name(&self) -> &'static str {
66        self.inner.name()
67    }
68
69    async fn put_object(
70        &self,
71        id: &ObjectId,
72        metadata: &Metadata,
73        stream: ClientStream,
74    ) -> Result<PutResponse> {
75        count(&id.context.usecase);
76        self.inner.put_object(id, metadata, stream).await
77    }
78
79    async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
80        count(&id.context.usecase);
81        self.inner.get_object(id, range).await
82    }
83
84    async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
85        count(&id.context.usecase);
86        self.inner.get_metadata(id).await
87    }
88
89    async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
90        count(&id.context.usecase);
91        self.inner.delete_object(id).await
92    }
93
94    async fn join(&self) {
95        self.inner.join().await;
96    }
97
98    fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
99        self.inner.as_multipart_upload_backend()?;
100        Ok(self)
101    }
102}
103
104#[async_trait::async_trait]
105impl MultipartUploadBackend for CountingBackend {
106    async fn initiate_multipart(
107        &self,
108        id: &ObjectId,
109        metadata: &Metadata,
110    ) -> Result<InitiateMultipartResponse> {
111        count(&id.context.usecase);
112        self.inner
113            .as_multipart_upload_backend()?
114            .initiate_multipart(id, metadata)
115            .await
116    }
117
118    async fn upload_part(
119        &self,
120        id: &ObjectId,
121        upload_id: &UploadId,
122        part_number: PartNumber,
123        content_length: u64,
124        content_md5: Option<&str>,
125        body: ClientStream,
126    ) -> Result<UploadPartResponse> {
127        count(&id.context.usecase);
128        self.inner
129            .as_multipart_upload_backend()?
130            .upload_part(
131                id,
132                upload_id,
133                part_number,
134                content_length,
135                content_md5,
136                body,
137            )
138            .await
139    }
140
141    async fn list_parts(
142        &self,
143        id: &ObjectId,
144        upload_id: &UploadId,
145        max_parts: Option<u32>,
146        part_number_marker: Option<PartNumber>,
147    ) -> Result<ListPartsResponse> {
148        count(&id.context.usecase);
149        self.inner
150            .as_multipart_upload_backend()?
151            .list_parts(id, upload_id, max_parts, part_number_marker)
152            .await
153    }
154
155    async fn abort_multipart(
156        &self,
157        id: &ObjectId,
158        upload_id: &UploadId,
159    ) -> Result<AbortMultipartResponse> {
160        count(&id.context.usecase);
161        self.inner
162            .as_multipart_upload_backend()?
163            .abort_multipart(id, upload_id)
164            .await
165    }
166
167    async fn complete_multipart(
168        &self,
169        id: &ObjectId,
170        upload_id: &UploadId,
171        parts: Vec<CompletedPart>,
172    ) -> Result<CompleteMultipartResponse> {
173        count(&id.context.usecase);
174        self.inner
175            .as_multipart_upload_backend()?
176            .complete_multipart(id, upload_id, parts)
177            .await
178    }
179}
180
#[cfg(test)]
mod tests {
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::backend::in_memory::InMemoryBackend;
    use crate::id::ObjectContext;
    use crate::stream;

    /// Metric line expected for one counted operation under the `attachments` usecase.
    const ATTACHMENTS_LINE: &str = "cogs.usage:+1|c|#app_feature:attachments";

    /// Builds an object id under `usecase`, with a fixed `org` scope and key.
    fn object_id(usecase: &str) -> ObjectId {
        let context = ObjectContext {
            usecase: usecase.into(),
            scopes: Scopes::from_iter([Scope::create("org", "1").unwrap()]),
        };
        ObjectId::new(context, "key".into())
    }

    /// Builds a [`CountingBackend`] around a fresh in-memory backend.
    fn in_memory_counting_backend() -> CountingBackend {
        CountingBackend::new(Box::new(InMemoryBackend::new("in-memory")))
    }

    /// Returns the captured metric lines that belong to the `cogs.usage` counter.
    fn cogs_lines(captured: &[String]) -> Vec<&str> {
        captured
            .iter()
            .map(String::as_str)
            .filter(|line| line.starts_with("cogs.usage"))
            .collect()
    }

    /// Drives `f` to completion on a current-thread runtime and returns the metrics it emitted.
    ///
    /// The capturing client is thread-local, so the future has to be polled on the same thread
    /// that installs the client.
    fn capture(f: impl std::future::Future<Output = ()>) -> Vec<String> {
        objectstore_metrics::with_capturing_test_client(|| {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            runtime.block_on(f);
        })
    }

    #[test]
    fn counts_each_core_operation_once() {
        let captured = capture(async {
            let backend = in_memory_counting_backend();
            let id = object_id("attachments");
            let metadata = Metadata::default();

            backend
                .put_object(&id, &metadata, stream::single("hi"))
                .await
                .unwrap();
            backend.get_object(&id, None).await.unwrap();
            backend.get_metadata(&id).await.unwrap();
            backend.delete_object(&id).await.unwrap();
        });

        let cogs = cogs_lines(&captured);
        assert_eq!(
            cogs.len(),
            4,
            "expected one count per operation, captured: {captured:?}"
        );
        // Every `cogs.usage` line must carry the usecase as its `app_feature` tag.
        assert!(
            cogs.iter().all(|line| *line == ATTACHMENTS_LINE),
            "captured: {captured:?}"
        );
    }

    #[test]
    fn counts_missing_reads_on_dispatch() {
        let captured = capture(async {
            let backend = in_memory_counting_backend();
            let id = object_id("attachments");

            // Nothing stored: the read returns `None` but is still billed.
            let result = backend.get_object(&id, None).await.unwrap();
            assert!(result.is_none());
        });

        let cogs = cogs_lines(&captured);
        assert_eq!(cogs.len(), 1, "captured: {captured:?}");
    }

    #[test]
    fn new_usecase_is_app_feature() {
        let captured = capture(async {
            let backend = in_memory_counting_backend();
            let id = object_id("new_usecase");

            backend.get_object(&id, None).await.unwrap();
        });

        let expected = "cogs.usage:+1|c|#app_feature:new_usecase";
        assert!(
            captured.iter().any(|line| line == expected),
            "captured: {captured:?}"
        );
    }

    #[test]
    fn counts_each_multipart_operation() {
        let captured = capture(async {
            let backend: Arc<dyn Backend> = Arc::new(in_memory_counting_backend());
            let multipart = backend.as_multipart_upload_backend().unwrap();
            let id = object_id("attachments");
            let metadata = Metadata::default();

            let upload_id = multipart
                .initiate_multipart(&id, &metadata)
                .await
                .unwrap();

            let part_number = PartNumber::new(1).unwrap();
            multipart
                .upload_part(&id, &upload_id, part_number, 2, None, stream::single("hi"))
                .await
                .unwrap();
            multipart
                .list_parts(&id, &upload_id, None, None)
                .await
                .unwrap();
            multipart
                .complete_multipart(&id, &upload_id, vec![])
                .await
                .unwrap();

            // The upload was completed above, so aborting it is a no-op; counting
            // happens on dispatch regardless, which is what this asserts.
            let _ = multipart.abort_multipart(&id, &upload_id).await;
        });

        let matching = captured
            .iter()
            .filter(|line| line.as_str() == ATTACHMENTS_LINE)
            .count();
        assert_eq!(
            matching, 5,
            "expected one count per multipart operation, captured: {captured:?}"
        );
    }
}