objectstore_server/
endpoints.rs1use std::io;
4
5use anyhow::Context;
6use axum::body::Body;
7use axum::extract::{Path, Query, State};
8use axum::http::{HeaderMap, StatusCode};
9use axum::response::{IntoResponse, Response};
10use axum::routing::{get, put};
11use axum::{Json, Router};
12use futures_util::{StreamExt, TryStreamExt};
13use objectstore_service::ObjectPath;
14use objectstore_types::Metadata;
15use serde::{Deserialize, Serialize};
16
17use crate::error::ApiResult;
18use crate::state::ServiceState;
19
20pub fn routes() -> Router<ServiceState> {
21 let service_routes = Router::new().route("/", put(put_object_nokey)).route(
22 "/{*key}",
23 put(put_object).get(get_object).delete(delete_object),
24 );
25
26 Router::new()
27 .route("/health", get(health))
28 .nest("/v1/", service_routes)
29}
30
31async fn health() -> impl IntoResponse {
32 "OK"
33}
34
35#[derive(Deserialize, Debug)]
36struct ContextParams {
37 scope: String,
38 usecase: String,
39}
40
41#[derive(Debug, Serialize)]
42struct PutBlobResponse {
43 key: String,
44}
45
46async fn put_object_nokey(
47 State(state): State<ServiceState>,
48 Query(params): Query<ContextParams>,
49 headers: HeaderMap,
50 body: Body,
51) -> ApiResult<impl IntoResponse> {
52 let path = ObjectPath {
53 usecase: params.usecase,
54 scope: params.scope,
55 key: uuid::Uuid::new_v4().to_string(),
56 };
57 populate_sentry_scope(&path);
58 let metadata =
59 Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
60
61 let stream = body.into_data_stream().map_err(io::Error::other).boxed();
62 let key = state.service.put_object(path, &metadata, stream).await?;
63
64 Ok(Json(PutBlobResponse {
65 key: key.key.to_string(),
66 }))
67}
68
69async fn put_object(
70 State(state): State<ServiceState>,
71 Query(params): Query<ContextParams>,
72 Path(key): Path<String>,
73 headers: HeaderMap,
74 body: Body,
75) -> ApiResult<impl IntoResponse> {
76 let path = ObjectPath {
77 usecase: params.usecase,
78 scope: params.scope,
79 key,
80 };
81 populate_sentry_scope(&path);
82 let metadata =
83 Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
84
85 let stream = body.into_data_stream().map_err(io::Error::other).boxed();
86 let key = state.service.put_object(path, &metadata, stream).await?;
87
88 Ok(Json(PutBlobResponse {
89 key: key.key.to_string(),
90 }))
91}
92
93async fn get_object(
94 State(state): State<ServiceState>,
95 Query(params): Query<ContextParams>,
96 Path(key): Path<String>,
97) -> ApiResult<Response> {
98 let path = ObjectPath {
99 usecase: params.usecase,
100 scope: params.scope,
101 key,
102 };
103 populate_sentry_scope(&path);
104
105 let Some((metadata, stream)) = state.service.get_object(&path).await? else {
106 return Ok(StatusCode::NOT_FOUND.into_response());
107 };
108
109 let headers = metadata
110 .to_headers("", false)
111 .context("extracting metadata from headers")?;
112 Ok((headers, Body::from_stream(stream)).into_response())
113}
114
115async fn delete_object(
116 State(state): State<ServiceState>,
117 Query(params): Query<ContextParams>,
118 Path(key): Path<String>,
119) -> ApiResult<impl IntoResponse> {
120 let path = ObjectPath {
121 usecase: params.usecase,
122 scope: params.scope,
123 key,
124 };
125 populate_sentry_scope(&path);
126
127 state.service.delete_object(&path).await?;
128
129 Ok(())
130}
131
132fn populate_sentry_scope(path: &ObjectPath) {
133 sentry::configure_scope(|s| {
134 s.set_tag("usecase", path.usecase.clone());
135 s.set_extra("scope", path.scope.clone().into());
136 s.set_extra("key", path.key.clone().into());
137 });
138}