objectstore_server/
endpoints.rs

1//! Contains all HTTP endpoint handlers.
2
3use std::io;
4use std::time::SystemTime;
5
6use anyhow::Context;
7use axum::body::Body;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, Method, StatusCode};
10use axum::response::{IntoResponse, Response};
11use axum::routing;
12use axum::{Json, Router};
13use futures_util::{StreamExt, TryStreamExt};
14use objectstore_service::{ObjectPath, OptionalObjectPath};
15use objectstore_types::Metadata;
16use serde::Serialize;
17
18use crate::error::ApiResult;
19use crate::state::ServiceState;
20
21pub fn routes() -> Router<ServiceState> {
22    let service_routes = Router::new().route(
23        "/{*path}",
24        routing::post(insert_object)
25            .put(insert_object)
26            .get(get_object)
27            .delete(delete_object),
28    );
29
30    Router::new()
31        .route("/health", routing::get(health))
32        .nest("/v1/", service_routes)
33}
34
35async fn health() -> impl IntoResponse {
36    "OK"
37}
38
39#[derive(Debug, Serialize)]
40struct PutBlobResponse {
41    key: String,
42}
43
44async fn insert_object(
45    State(state): State<ServiceState>,
46    Path(path): Path<OptionalObjectPath>,
47    method: Method,
48    headers: HeaderMap,
49    body: Body,
50) -> ApiResult<Response> {
51    let (expected_method, response_status) = match path.key {
52        Some(_) => (Method::PUT, StatusCode::OK),
53        None => (Method::POST, StatusCode::CREATED),
54    };
55
56    // TODO: For now allow PUT everywhere. Remove the second condition when all clients are updated.
57    if method != expected_method && method == Method::POST {
58        return Ok(StatusCode::METHOD_NOT_ALLOWED.into_response());
59    }
60
61    let path = path.create_key();
62    populate_sentry_scope(&path);
63
64    let mut metadata =
65        Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
66    metadata.time_created = Some(SystemTime::now());
67
68    let stream = body.into_data_stream().map_err(io::Error::other).boxed();
69    let response_path = state.service.put_object(path, &metadata, stream).await?;
70    let response = Json(PutBlobResponse {
71        key: response_path.key.to_string(),
72    });
73
74    Ok((response_status, response).into_response())
75}
76
77async fn get_object(
78    State(state): State<ServiceState>,
79    Path(path): Path<ObjectPath>,
80) -> ApiResult<Response> {
81    populate_sentry_scope(&path);
82    let Some((metadata, stream)) = state.service.get_object(&path).await? else {
83        return Ok(StatusCode::NOT_FOUND.into_response());
84    };
85
86    let headers = metadata
87        .to_headers("", false)
88        .context("extracting metadata from headers")?;
89    Ok((headers, Body::from_stream(stream)).into_response())
90}
91
92async fn delete_object(
93    State(state): State<ServiceState>,
94    Path(path): Path<ObjectPath>,
95) -> ApiResult<impl IntoResponse> {
96    populate_sentry_scope(&path);
97
98    state.service.delete_object(&path).await?;
99
100    Ok(StatusCode::NO_CONTENT)
101}
102
103fn populate_sentry_scope(path: &ObjectPath) {
104    sentry::configure_scope(|s| {
105        s.set_tag("usecase", path.usecase.clone());
106        s.set_extra("scope", path.scope.clone().into());
107        s.set_extra("key", path.key.clone().into());
108    });
109}