objectstore_server/extractors/
body.rs

1//! Axum extractor for bandwidth-metered request bodies.
2
3use std::convert::Infallible;
4use std::io;
5
6use axum::extract::{FromRequest, FromRequestParts, Path, Request};
7use futures_util::{StreamExt, TryStreamExt};
8use objectstore_service::PayloadStream;
9use objectstore_service::id::ObjectContext;
10
11use super::id::ContextParams;
12use crate::state::ServiceState;
13
14/// An extractor that converts the request body into a metered [`PayloadStream`].
15///
16/// Extracts the [`ObjectContext`] from the request path to attribute bandwidth to the correct
17/// per-usecase and per-scope accumulators in addition to the global accumulator.
18pub struct MeteredBody(pub PayloadStream);
19
20impl std::fmt::Debug for MeteredBody {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("MeteredBody").finish()
23    }
24}
25
26impl FromRequest<ServiceState> for MeteredBody {
27    type Rejection = Infallible;
28
29    async fn from_request(request: Request, state: &ServiceState) -> Result<Self, Self::Rejection> {
30        let (mut parts, body) = request.into_parts();
31        let Path(params) =
32            <Path<ContextParams> as FromRequestParts<ServiceState>>::from_request_parts(
33                &mut parts, state,
34            )
35            .await
36            .expect("MeteredBody must be used on routes with {usecase} and {scopes} path params");
37        let context = ObjectContext {
38            usecase: params.usecase,
39            scopes: params.scopes,
40        };
41        let stream = body.into_data_stream().map_err(io::Error::other).boxed();
42        let stream = state.meter_stream(stream, &context);
43        Ok(Self(stream))
44    }
45}