objectstore_service/backend/
counting.rs1use std::sync::Arc;
18
19use objectstore_types::metadata::Metadata;
20use objectstore_types::range::ByteRange;
21
22use crate::backend::common::{
23 Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend, PutResponse,
24};
25use crate::error::Result;
26use crate::id::ObjectId;
27use crate::multipart::{
28 AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
29 ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
30};
31use crate::stream::ClientStream;
32
33fn count(usecase: &str) {
38 objectstore_metrics::count!("cogs.usage" += 1, app_feature = usecase.to_owned());
39}
40
41#[derive(Debug)]
50pub struct CountingBackend {
51 inner: Arc<dyn Backend>,
52}
53
54impl CountingBackend {
55 pub fn new(inner: Box<dyn Backend>) -> Self {
58 let inner: Arc<dyn Backend> = Arc::from(inner);
59 Self { inner }
60 }
61}
62
63#[async_trait::async_trait]
64impl Backend for CountingBackend {
65 fn name(&self) -> &'static str {
66 self.inner.name()
67 }
68
69 async fn put_object(
70 &self,
71 id: &ObjectId,
72 metadata: &Metadata,
73 stream: ClientStream,
74 ) -> Result<PutResponse> {
75 count(&id.context.usecase);
76 self.inner.put_object(id, metadata, stream).await
77 }
78
79 async fn get_object(&self, id: &ObjectId, range: Option<ByteRange>) -> Result<GetResponse> {
80 count(&id.context.usecase);
81 self.inner.get_object(id, range).await
82 }
83
84 async fn get_metadata(&self, id: &ObjectId) -> Result<MetadataResponse> {
85 count(&id.context.usecase);
86 self.inner.get_metadata(id).await
87 }
88
89 async fn delete_object(&self, id: &ObjectId) -> Result<DeleteResponse> {
90 count(&id.context.usecase);
91 self.inner.delete_object(id).await
92 }
93
94 async fn join(&self) {
95 self.inner.join().await;
96 }
97
98 fn as_multipart_upload_backend(&self) -> Result<&dyn MultipartUploadBackend> {
99 self.inner.as_multipart_upload_backend()?;
100 Ok(self)
101 }
102}
103
104#[async_trait::async_trait]
105impl MultipartUploadBackend for CountingBackend {
106 async fn initiate_multipart(
107 &self,
108 id: &ObjectId,
109 metadata: &Metadata,
110 ) -> Result<InitiateMultipartResponse> {
111 count(&id.context.usecase);
112 self.inner
113 .as_multipart_upload_backend()?
114 .initiate_multipart(id, metadata)
115 .await
116 }
117
118 async fn upload_part(
119 &self,
120 id: &ObjectId,
121 upload_id: &UploadId,
122 part_number: PartNumber,
123 content_length: u64,
124 content_md5: Option<&str>,
125 body: ClientStream,
126 ) -> Result<UploadPartResponse> {
127 count(&id.context.usecase);
128 self.inner
129 .as_multipart_upload_backend()?
130 .upload_part(
131 id,
132 upload_id,
133 part_number,
134 content_length,
135 content_md5,
136 body,
137 )
138 .await
139 }
140
141 async fn list_parts(
142 &self,
143 id: &ObjectId,
144 upload_id: &UploadId,
145 max_parts: Option<u32>,
146 part_number_marker: Option<PartNumber>,
147 ) -> Result<ListPartsResponse> {
148 count(&id.context.usecase);
149 self.inner
150 .as_multipart_upload_backend()?
151 .list_parts(id, upload_id, max_parts, part_number_marker)
152 .await
153 }
154
155 async fn abort_multipart(
156 &self,
157 id: &ObjectId,
158 upload_id: &UploadId,
159 ) -> Result<AbortMultipartResponse> {
160 count(&id.context.usecase);
161 self.inner
162 .as_multipart_upload_backend()?
163 .abort_multipart(id, upload_id)
164 .await
165 }
166
167 async fn complete_multipart(
168 &self,
169 id: &ObjectId,
170 upload_id: &UploadId,
171 parts: Vec<CompletedPart>,
172 ) -> Result<CompleteMultipartResponse> {
173 count(&id.context.usecase);
174 self.inner
175 .as_multipart_upload_backend()?
176 .complete_multipart(id, upload_id, parts)
177 .await
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use objectstore_types::scope::{Scope, Scopes};
184
185 use super::*;
186 use crate::backend::in_memory::InMemoryBackend;
187 use crate::id::ObjectContext;
188 use crate::stream;
189
190 fn object_id(usecase: &str) -> ObjectId {
191 ObjectId::new(
192 ObjectContext {
193 usecase: usecase.into(),
194 scopes: Scopes::from_iter([Scope::create("org", "1").unwrap()]),
195 },
196 "key".into(),
197 )
198 }
199
200 fn capture(f: impl std::future::Future<Output = ()>) -> Vec<String> {
205 objectstore_metrics::with_capturing_test_client(|| {
206 tokio::runtime::Builder::new_current_thread()
207 .enable_all()
208 .build()
209 .unwrap()
210 .block_on(f);
211 })
212 }
213
214 #[test]
215 fn counts_each_core_operation_once() {
216 let captured = capture(async {
217 let backend = CountingBackend::new(Box::new(InMemoryBackend::new("in-memory")));
218 let id = object_id("attachments");
219
220 backend
221 .put_object(&id, &Metadata::default(), stream::single("hi"))
222 .await
223 .unwrap();
224 backend.get_object(&id, None).await.unwrap();
225 backend.get_metadata(&id).await.unwrap();
226 backend.delete_object(&id).await.unwrap();
227 });
228
229 let cogs = captured
230 .iter()
231 .filter(|m| m.starts_with("cogs.usage"))
232 .count();
233 assert_eq!(
234 cogs, 4,
235 "expected one count per operation, captured: {captured:?}"
236 );
237 assert!(
238 captured
239 .iter()
240 .all(|m| !m.starts_with("cogs.usage")
241 || m == "cogs.usage:+1|c|#app_feature:attachments"),
242 "captured: {captured:?}"
243 );
244 }
245
246 #[test]
247 fn counts_missing_reads_on_dispatch() {
248 let captured = capture(async {
249 let backend = CountingBackend::new(Box::new(InMemoryBackend::new("in-memory")));
250 let result = backend
252 .get_object(&object_id("attachments"), None)
253 .await
254 .unwrap();
255 assert!(result.is_none());
256 });
257
258 assert_eq!(
259 captured
260 .iter()
261 .filter(|m| m.starts_with("cogs.usage"))
262 .count(),
263 1,
264 "captured: {captured:?}"
265 );
266 }
267
268 #[test]
269 fn new_usecase_is_app_feature() {
270 let captured = capture(async {
271 let backend = CountingBackend::new(Box::new(InMemoryBackend::new("in-memory")));
272 backend
273 .get_object(&object_id("new_usecase"), None)
274 .await
275 .unwrap();
276 });
277
278 assert!(
279 captured
280 .iter()
281 .any(|m| m == "cogs.usage:+1|c|#app_feature:new_usecase"),
282 "captured: {captured:?}"
283 );
284 }
285
286 #[test]
287 fn counts_each_multipart_operation() {
288 let captured = capture(async {
289 let backend: Arc<dyn Backend> = Arc::new(CountingBackend::new(Box::new(
290 InMemoryBackend::new("in-memory"),
291 )));
292 let multipart = backend.as_multipart_upload_backend().unwrap();
293 let id = object_id("attachments");
294
295 let upload_id = multipart
296 .initiate_multipart(&id, &Metadata::default())
297 .await
298 .unwrap();
299 multipart
300 .upload_part(
301 &id,
302 &upload_id,
303 PartNumber::new(1).unwrap(),
304 2,
305 None,
306 stream::single("hi"),
307 )
308 .await
309 .unwrap();
310 multipart
311 .list_parts(&id, &upload_id, None, None)
312 .await
313 .unwrap();
314 multipart
315 .complete_multipart(&id, &upload_id, vec![])
316 .await
317 .unwrap();
318 let _ = multipart.abort_multipart(&id, &upload_id).await;
321 });
322
323 assert_eq!(
324 captured
325 .iter()
326 .filter(|m| m == &"cogs.usage:+1|c|#app_feature:attachments")
327 .count(),
328 5,
329 "expected one count per multipart operation, captured: {captured:?}"
330 );
331 }
332}