relay_server/endpoints/
envelope.rs1use std::convert::Infallible;
4
5use axum::extract::rejection::BytesRejection;
6use axum::extract::{DefaultBodyLimit, FromRequest, Request};
7use axum::response::IntoResponse;
8use axum::routing::{MethodRouter, post};
9use axum::{Json, RequestExt};
10use bytes::Bytes;
11use relay_config::Config;
12use relay_event_schema::protocol::EventId;
13use serde::Serialize;
14
15use crate::endpoints::common::{self, BadStoreRequest};
16use crate::envelope::Envelope;
17use crate::extractors::{BadEventMeta, PartialMeta, RequestMeta};
18use crate::service::ServiceState;
19
20#[derive(Debug)]
22enum BadEnvelopeParams {
23    EventMeta(BadEventMeta),
24    InvalidBody(BytesRejection),
25}
26
27impl From<BadEventMeta> for BadEnvelopeParams {
28    fn from(value: BadEventMeta) -> Self {
29        Self::EventMeta(value)
30    }
31}
32
33impl From<BytesRejection> for BadEnvelopeParams {
34    fn from(value: BytesRejection) -> Self {
35        Self::InvalidBody(value)
36    }
37}
38
39impl From<Infallible> for BadEnvelopeParams {
40    fn from(value: Infallible) -> Self {
41        match value {}
42    }
43}
44
45impl IntoResponse for BadEnvelopeParams {
46    fn into_response(self) -> axum::response::Response {
47        match self {
48            BadEnvelopeParams::EventMeta(inner) => inner.into_response(),
49            BadEnvelopeParams::InvalidBody(inner) => inner.into_response(),
50        }
51    }
52}
53
54#[derive(Debug)]
55struct EnvelopeParams {
56    meta: RequestMeta,
57    body: Bytes,
58}
59
60impl EnvelopeParams {
61    fn extract_envelope(self) -> Result<Box<Envelope>, BadStoreRequest> {
62        let Self { meta, body } = self;
63
64        if body.is_empty() {
65            return Err(BadStoreRequest::EmptyBody);
66        }
67
68        let envelope = Envelope::parse_request(body, meta)?;
69        if envelope.is_internal() {
70            return Err(BadStoreRequest::InternalEnvelope);
71        }
72        Ok(envelope)
73    }
74}
75
76impl FromRequest<ServiceState> for EnvelopeParams {
77    type Rejection = BadEnvelopeParams;
78
79    async fn from_request(
80        mut request: Request,
81        state: &ServiceState,
82    ) -> Result<Self, Self::Rejection> {
83        let result = request.extract_parts_with_state(state).await;
84
85        if !matches!(result, Err(BadEventMeta::MissingAuth)) {
86            return Ok(Self {
87                meta: result?,
88                body: request.extract().await?,
89            });
90        }
91
92        let partial_meta: PartialMeta = request.extract_parts_with_state(state).await?;
93        let body: Bytes = request.extract().await?;
94
95        let line = body
96            .splitn(2, |b| *b == b'\n')
97            .next()
98            .ok_or(BadEventMeta::MissingAuth)?;
99
100        let request_meta = serde_json::from_slice(line).map_err(BadEventMeta::BadEnvelopeAuth)?;
101
102        Ok(Self {
103            meta: partial_meta.copy_to(request_meta),
104            body,
105        })
106    }
107}
108
109#[derive(Serialize)]
110struct StoreResponse {
111    #[serde(skip_serializing_if = "Option::is_none")]
112    id: Option<EventId>,
113}
114
115async fn handle(
117    state: ServiceState,
118    params: EnvelopeParams,
119) -> Result<impl IntoResponse, BadStoreRequest> {
120    let envelope = params.extract_envelope()?;
121    let id = common::handle_envelope(&state, envelope).await?;
122    Ok(Json(StoreResponse { id }))
123}
124
125pub fn route(config: &Config) -> MethodRouter<ServiceState> {
126    post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size()))
127}