objectstore_server/
endpoints.rs

1//! Contains all HTTP endpoint handlers.
2
3use std::io;
4
5use anyhow::Context;
6use axum::body::Body;
7use axum::extract::{Path, Query, State};
8use axum::http::{HeaderMap, StatusCode};
9use axum::response::{IntoResponse, Response};
10use axum::routing::{get, put};
11use axum::{Json, Router};
12use futures_util::{StreamExt, TryStreamExt};
13use objectstore_service::ObjectPath;
14use objectstore_types::Metadata;
15use serde::{Deserialize, Serialize};
16
17use crate::error::ApiResult;
18use crate::state::ServiceState;
19
20pub fn routes() -> Router<ServiceState> {
21    let service_routes = Router::new().route("/", put(put_object_nokey)).route(
22        "/{*key}",
23        put(put_object).get(get_object).delete(delete_object),
24    );
25
26    Router::new()
27        .route("/health", get(health))
28        .nest("/v1/", service_routes)
29}
30
31async fn health() -> impl IntoResponse {
32    "OK"
33}
34
35#[derive(Deserialize, Debug)]
36struct ContextParams {
37    scope: String,
38    usecase: String,
39}
40
41#[derive(Debug, Serialize)]
42struct PutBlobResponse {
43    key: String,
44}
45
46async fn put_object_nokey(
47    State(state): State<ServiceState>,
48    Query(params): Query<ContextParams>,
49    headers: HeaderMap,
50    body: Body,
51) -> ApiResult<impl IntoResponse> {
52    let path = ObjectPath {
53        usecase: params.usecase,
54        scope: params.scope,
55        key: uuid::Uuid::new_v4().to_string(),
56    };
57    populate_sentry_scope(&path);
58    let metadata =
59        Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
60
61    let stream = body.into_data_stream().map_err(io::Error::other).boxed();
62    let key = state.service.put_object(path, &metadata, stream).await?;
63
64    Ok(Json(PutBlobResponse {
65        key: key.key.to_string(),
66    }))
67}
68
69async fn put_object(
70    State(state): State<ServiceState>,
71    Query(params): Query<ContextParams>,
72    Path(key): Path<String>,
73    headers: HeaderMap,
74    body: Body,
75) -> ApiResult<impl IntoResponse> {
76    let path = ObjectPath {
77        usecase: params.usecase,
78        scope: params.scope,
79        key,
80    };
81    populate_sentry_scope(&path);
82    let metadata =
83        Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
84
85    let stream = body.into_data_stream().map_err(io::Error::other).boxed();
86    let key = state.service.put_object(path, &metadata, stream).await?;
87
88    Ok(Json(PutBlobResponse {
89        key: key.key.to_string(),
90    }))
91}
92
93async fn get_object(
94    State(state): State<ServiceState>,
95    Query(params): Query<ContextParams>,
96    Path(key): Path<String>,
97) -> ApiResult<Response> {
98    let path = ObjectPath {
99        usecase: params.usecase,
100        scope: params.scope,
101        key,
102    };
103    populate_sentry_scope(&path);
104
105    let Some((metadata, stream)) = state.service.get_object(&path).await? else {
106        return Ok(StatusCode::NOT_FOUND.into_response());
107    };
108
109    let headers = metadata
110        .to_headers("", false)
111        .context("extracting metadata from headers")?;
112    Ok((headers, Body::from_stream(stream)).into_response())
113}
114
115async fn delete_object(
116    State(state): State<ServiceState>,
117    Query(params): Query<ContextParams>,
118    Path(key): Path<String>,
119) -> ApiResult<impl IntoResponse> {
120    let path = ObjectPath {
121        usecase: params.usecase,
122        scope: params.scope,
123        key,
124    };
125    populate_sentry_scope(&path);
126
127    state.service.delete_object(&path).await?;
128
129    Ok(())
130}
131
132fn populate_sentry_scope(path: &ObjectPath) {
133    sentry::configure_scope(|s| {
134        s.set_tag("usecase", path.usecase.clone());
135        s.set_extra("scope", path.scope.clone().into());
136        s.set_extra("key", path.key.clone().into());
137    });
138}