relay_server/endpoints/
envelope.rs1use std::convert::Infallible;
4
5use axum::extract::rejection::BytesRejection;
6use axum::extract::{DefaultBodyLimit, FromRequest, Request};
7use axum::response::IntoResponse;
8use axum::routing::{post, MethodRouter};
9use axum::{Json, RequestExt};
10use bytes::Bytes;
11use relay_config::Config;
12use relay_event_schema::protocol::EventId;
13use serde::Serialize;
14
15use crate::endpoints::common::{self, BadStoreRequest};
16use crate::envelope::Envelope;
17use crate::extractors::{BadEventMeta, PartialMeta, RequestMeta};
18use crate::service::ServiceState;
19
20#[derive(Debug)]
22enum BadEnvelopeParams {
23 EventMeta(BadEventMeta),
24 InvalidBody(BytesRejection),
25}
26
27impl From<BadEventMeta> for BadEnvelopeParams {
28 fn from(value: BadEventMeta) -> Self {
29 Self::EventMeta(value)
30 }
31}
32
33impl From<BytesRejection> for BadEnvelopeParams {
34 fn from(value: BytesRejection) -> Self {
35 Self::InvalidBody(value)
36 }
37}
38
39impl From<Infallible> for BadEnvelopeParams {
40 fn from(value: Infallible) -> Self {
41 match value {}
42 }
43}
44
45impl IntoResponse for BadEnvelopeParams {
46 fn into_response(self) -> axum::response::Response {
47 match self {
48 BadEnvelopeParams::EventMeta(inner) => inner.into_response(),
49 BadEnvelopeParams::InvalidBody(inner) => inner.into_response(),
50 }
51 }
52}
53
54#[derive(Debug)]
55struct EnvelopeParams {
56 meta: RequestMeta,
57 body: Bytes,
58}
59
60impl EnvelopeParams {
61 fn extract_envelope(self) -> Result<Box<Envelope>, BadStoreRequest> {
62 let Self { meta, body } = self;
63
64 if body.is_empty() {
65 return Err(BadStoreRequest::EmptyBody);
66 }
67
68 Ok(Envelope::parse_request(body, meta)?)
69 }
70}
71
72impl FromRequest<ServiceState> for EnvelopeParams {
73 type Rejection = BadEnvelopeParams;
74
75 async fn from_request(
76 mut request: Request,
77 state: &ServiceState,
78 ) -> Result<Self, Self::Rejection> {
79 let result = request.extract_parts_with_state(state).await;
80
81 if !matches!(result, Err(BadEventMeta::MissingAuth)) {
82 return Ok(Self {
83 meta: result?,
84 body: request.extract().await?,
85 });
86 }
87
88 let partial_meta: PartialMeta = request.extract_parts_with_state(state).await?;
89 let body: Bytes = request.extract().await?;
90
91 let line = body
92 .splitn(2, |b| *b == b'\n')
93 .next()
94 .ok_or(BadEventMeta::MissingAuth)?;
95
96 let request_meta = serde_json::from_slice(line).map_err(BadEventMeta::BadEnvelopeAuth)?;
97
98 Ok(Self {
99 meta: partial_meta.copy_to(request_meta),
100 body,
101 })
102 }
103}
104
105#[derive(Serialize)]
106struct StoreResponse {
107 #[serde(skip_serializing_if = "Option::is_none")]
108 id: Option<EventId>,
109}
110
111async fn handle(
113 state: ServiceState,
114 params: EnvelopeParams,
115) -> Result<impl IntoResponse, BadStoreRequest> {
116 let envelope = params.extract_envelope()?;
117 let id = common::handle_envelope(&state, envelope).await?;
118 Ok(Json(StoreResponse { id }))
119}
120
121pub fn route(config: &Config) -> MethodRouter<ServiceState> {
122 post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size()))
123}