relay_server/endpoints/
envelope.rs

1//! Handles envelope store requests.
2
3use std::convert::Infallible;
4
5use axum::extract::rejection::BytesRejection;
6use axum::extract::{DefaultBodyLimit, FromRequest, Request};
7use axum::response::IntoResponse;
8use axum::routing::{post, MethodRouter};
9use axum::{Json, RequestExt};
10use bytes::Bytes;
11use relay_config::Config;
12use relay_event_schema::protocol::EventId;
13use serde::Serialize;
14
15use crate::endpoints::common::{self, BadStoreRequest};
16use crate::envelope::Envelope;
17use crate::extractors::{BadEventMeta, PartialMeta, RequestMeta};
18use crate::service::ServiceState;
19
20/// Aggregate rejection thrown when extracting [`EnvelopeParams`].
21#[derive(Debug)]
22enum BadEnvelopeParams {
23    EventMeta(BadEventMeta),
24    InvalidBody(BytesRejection),
25}
26
27impl From<BadEventMeta> for BadEnvelopeParams {
28    fn from(value: BadEventMeta) -> Self {
29        Self::EventMeta(value)
30    }
31}
32
33impl From<BytesRejection> for BadEnvelopeParams {
34    fn from(value: BytesRejection) -> Self {
35        Self::InvalidBody(value)
36    }
37}
38
39impl From<Infallible> for BadEnvelopeParams {
40    fn from(value: Infallible) -> Self {
41        match value {}
42    }
43}
44
45impl IntoResponse for BadEnvelopeParams {
46    fn into_response(self) -> axum::response::Response {
47        match self {
48            BadEnvelopeParams::EventMeta(inner) => inner.into_response(),
49            BadEnvelopeParams::InvalidBody(inner) => inner.into_response(),
50        }
51    }
52}
53
54#[derive(Debug)]
55struct EnvelopeParams {
56    meta: RequestMeta,
57    body: Bytes,
58}
59
60impl EnvelopeParams {
61    fn extract_envelope(self) -> Result<Box<Envelope>, BadStoreRequest> {
62        let Self { meta, body } = self;
63
64        if body.is_empty() {
65            return Err(BadStoreRequest::EmptyBody);
66        }
67
68        Ok(Envelope::parse_request(body, meta)?)
69    }
70}
71
72impl FromRequest<ServiceState> for EnvelopeParams {
73    type Rejection = BadEnvelopeParams;
74
75    async fn from_request(
76        mut request: Request,
77        state: &ServiceState,
78    ) -> Result<Self, Self::Rejection> {
79        let result = request.extract_parts_with_state(state).await;
80
81        if !matches!(result, Err(BadEventMeta::MissingAuth)) {
82            return Ok(Self {
83                meta: result?,
84                body: request.extract().await?,
85            });
86        }
87
88        let partial_meta: PartialMeta = request.extract_parts_with_state(state).await?;
89        let body: Bytes = request.extract().await?;
90
91        let line = body
92            .splitn(2, |b| *b == b'\n')
93            .next()
94            .ok_or(BadEventMeta::MissingAuth)?;
95
96        let request_meta = serde_json::from_slice(line).map_err(BadEventMeta::BadEnvelopeAuth)?;
97
98        Ok(Self {
99            meta: partial_meta.copy_to(request_meta),
100            body,
101        })
102    }
103}
104
105#[derive(Serialize)]
106struct StoreResponse {
107    #[serde(skip_serializing_if = "Option::is_none")]
108    id: Option<EventId>,
109}
110
111/// Handler for the envelope store endpoint.
112async fn handle(
113    state: ServiceState,
114    params: EnvelopeParams,
115) -> Result<impl IntoResponse, BadStoreRequest> {
116    let envelope = params.extract_envelope()?;
117    let id = common::handle_envelope(&state, envelope).await?;
118    Ok(Json(StoreResponse { id }))
119}
120
121pub fn route(config: &Config) -> MethodRouter<ServiceState> {
122    post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size()))
123}