relay_server/endpoints/
envelope.rs1use std::convert::Infallible;
4
5use axum::extract::rejection::BytesRejection;
6use axum::extract::{DefaultBodyLimit, FromRequest, Request};
7use axum::response::IntoResponse;
8use axum::routing::{MethodRouter, post};
9use axum::{Json, RequestExt};
10use bytes::Bytes;
11use relay_config::Config;
12use relay_event_schema::protocol::EventId;
13use serde::Serialize;
14
15use crate::endpoints::common::{self, BadStoreRequest};
16use crate::envelope::Envelope;
17use crate::extractors::{BadEventMeta, PartialMeta, RequestMeta};
18use crate::service::ServiceState;
19
20#[derive(Debug)]
22enum BadEnvelopeParams {
23 EventMeta(BadEventMeta),
24 InvalidBody(BytesRejection),
25}
26
27impl From<BadEventMeta> for BadEnvelopeParams {
28 fn from(value: BadEventMeta) -> Self {
29 Self::EventMeta(value)
30 }
31}
32
33impl From<BytesRejection> for BadEnvelopeParams {
34 fn from(value: BytesRejection) -> Self {
35 Self::InvalidBody(value)
36 }
37}
38
39impl From<Infallible> for BadEnvelopeParams {
40 fn from(value: Infallible) -> Self {
41 match value {}
42 }
43}
44
45impl IntoResponse for BadEnvelopeParams {
46 fn into_response(self) -> axum::response::Response {
47 match self {
48 BadEnvelopeParams::EventMeta(inner) => inner.into_response(),
49 BadEnvelopeParams::InvalidBody(inner) => inner.into_response(),
50 }
51 }
52}
53
54#[derive(Debug)]
55struct EnvelopeParams {
56 meta: RequestMeta,
57 body: Bytes,
58}
59
60impl EnvelopeParams {
61 fn extract_envelope(self) -> Result<Box<Envelope>, BadStoreRequest> {
62 let Self { meta, body } = self;
63
64 if body.is_empty() {
65 return Err(BadStoreRequest::EmptyBody);
66 }
67
68 let envelope = Envelope::parse_request(body, meta)?;
69 if envelope.is_internal() {
70 return Err(BadStoreRequest::InternalEnvelope);
71 }
72 Ok(envelope)
73 }
74}
75
76impl FromRequest<ServiceState> for EnvelopeParams {
77 type Rejection = BadEnvelopeParams;
78
79 async fn from_request(
80 mut request: Request,
81 state: &ServiceState,
82 ) -> Result<Self, Self::Rejection> {
83 let result = request.extract_parts_with_state(state).await;
84
85 if !matches!(result, Err(BadEventMeta::MissingAuth)) {
86 return Ok(Self {
87 meta: result?,
88 body: request.extract().await?,
89 });
90 }
91
92 let partial_meta: PartialMeta = request.extract_parts_with_state(state).await?;
93 let body: Bytes = request.extract().await?;
94
95 let line = body
96 .splitn(2, |b| *b == b'\n')
97 .next()
98 .ok_or(BadEventMeta::MissingAuth)?;
99
100 let request_meta = serde_json::from_slice(line).map_err(BadEventMeta::BadEnvelopeAuth)?;
101
102 Ok(Self {
103 meta: partial_meta.copy_to(request_meta),
104 body,
105 })
106 }
107}
108
109#[derive(Serialize)]
110struct StoreResponse {
111 #[serde(skip_serializing_if = "Option::is_none")]
112 id: Option<EventId>,
113}
114
115async fn handle(
117 state: ServiceState,
118 params: EnvelopeParams,
119) -> Result<impl IntoResponse, BadStoreRequest> {
120 let envelope = params.extract_envelope()?;
121 let id = common::handle_envelope(&state, envelope).await?;
122 Ok(Json(StoreResponse { id }))
123}
124
125pub fn route(config: &Config) -> MethodRouter<ServiceState> {
126 post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size()))
127}