objectstore_server/
endpoints.rs1use std::io;
4use std::time::SystemTime;
5
6use anyhow::Context;
7use axum::body::Body;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, Method, StatusCode};
10use axum::response::{IntoResponse, Response};
11use axum::routing;
12use axum::{Json, Router};
13use futures_util::{StreamExt, TryStreamExt};
14use objectstore_service::{ObjectPath, OptionalObjectPath};
15use objectstore_types::Metadata;
16use serde::Serialize;
17
18use crate::error::ApiResult;
19use crate::state::ServiceState;
20
21pub fn routes() -> Router<ServiceState> {
22 let service_routes = Router::new().route(
23 "/{*path}",
24 routing::post(insert_object)
25 .put(insert_object)
26 .get(get_object)
27 .delete(delete_object),
28 );
29
30 Router::new()
31 .route("/health", routing::get(health))
32 .nest("/v1/", service_routes)
33}
34
35async fn health() -> impl IntoResponse {
36 "OK"
37}
38
39#[derive(Debug, Serialize)]
40struct PutBlobResponse {
41 key: String,
42}
43
44async fn insert_object(
45 State(state): State<ServiceState>,
46 Path(path): Path<OptionalObjectPath>,
47 method: Method,
48 headers: HeaderMap,
49 body: Body,
50) -> ApiResult<Response> {
51 let (expected_method, response_status) = match path.key {
52 Some(_) => (Method::PUT, StatusCode::OK),
53 None => (Method::POST, StatusCode::CREATED),
54 };
55
56 if method != expected_method && method == Method::POST {
58 return Ok(StatusCode::METHOD_NOT_ALLOWED.into_response());
59 }
60
61 let path = path.create_key();
62 populate_sentry_scope(&path);
63
64 let mut metadata =
65 Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
66 metadata.time_created = Some(SystemTime::now());
67
68 let stream = body.into_data_stream().map_err(io::Error::other).boxed();
69 let response_path = state.service.put_object(path, &metadata, stream).await?;
70 let response = Json(PutBlobResponse {
71 key: response_path.key.to_string(),
72 });
73
74 Ok((response_status, response).into_response())
75}
76
77async fn get_object(
78 State(state): State<ServiceState>,
79 Path(path): Path<ObjectPath>,
80) -> ApiResult<Response> {
81 populate_sentry_scope(&path);
82 let Some((metadata, stream)) = state.service.get_object(&path).await? else {
83 return Ok(StatusCode::NOT_FOUND.into_response());
84 };
85
86 let headers = metadata
87 .to_headers("", false)
88 .context("extracting metadata from headers")?;
89 Ok((headers, Body::from_stream(stream)).into_response())
90}
91
92async fn delete_object(
93 State(state): State<ServiceState>,
94 Path(path): Path<ObjectPath>,
95) -> ApiResult<impl IntoResponse> {
96 populate_sentry_scope(&path);
97
98 state.service.delete_object(&path).await?;
99
100 Ok(StatusCode::NO_CONTENT)
101}
102
103fn populate_sentry_scope(path: &ObjectPath) {
104 sentry::configure_scope(|s| {
105 s.set_tag("usecase", path.usecase.clone());
106 s.set_extra("scope", path.scope.clone().into());
107 s.set_extra("key", path.key.clone().into());
108 });
109}