use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
use relay_config::RelayMode;
use relay_event_schema::protocol::{EventId, EventType};
use relay_quotas::RateLimits;
use relay_statsd::metric;
use serde::Deserialize;
use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::service::ServiceState;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};
/// Error signalling that the service is overloaded.
///
/// Within this file it can be created from a `relay_system::SendError` and is rendered as a
/// `503 Service Unavailable` response.
#[derive(Clone, Copy, Debug, thiserror::Error)]
#[error("the service is overloaded")]
pub struct ServiceUnavailable;
impl From<relay_system::SendError> for ServiceUnavailable {
fn from(_: relay_system::SendError) -> Self {
Self
}
}
impl IntoResponse for ServiceUnavailable {
    /// Renders this error as a `503 Service Unavailable` response whose body is produced by
    /// `ApiErrorResponse::from_error`.
    fn into_response(self) -> axum::response::Response {
        let body = ApiErrorResponse::from_error(&self);
        (StatusCode::SERVICE_UNAVAILABLE, body).into_response()
    }
}
/// Errors that cause a store/envelope request to be answered with an error response.
///
/// The `IntoResponse` implementation in this file maps `RateLimited` to `429`, `QueueFailed` to
/// `503`, `EventRejected` to `403` and every other variant to `400`.
#[derive(Debug, thiserror::Error)]
pub enum BadStoreRequest {
/// The request body was empty.
#[error("empty request body")]
EmptyBody,
/// The request body was invalid; carries the underlying I/O error as source.
#[error("invalid request body")]
InvalidBody(#[source] std::io::Error),
/// The payload is not valid JSON; the `serde_json` error is included in the message.
#[error("invalid JSON data: {0:?}")]
InvalidJson(#[source] serde_json::Error),
/// The payload is not valid MessagePack.
#[error("invalid messagepack data")]
InvalidMsgpack(#[source] rmp_serde::decode::Error),
/// The envelope is invalid; converted automatically from `EnvelopeError`.
#[error("invalid event envelope")]
InvalidEnvelope(#[from] EnvelopeError),
/// The multipart data is invalid; converted automatically from `multer::Error`.
#[error("invalid multipart data")]
InvalidMultipart(#[from] multer::Error),
/// The submitted minidump is invalid.
#[error("invalid minidump")]
InvalidMinidump,
/// A minidump was expected but is missing.
#[error("missing minidump")]
MissingMinidump,
/// The compression container is invalid; carries the underlying I/O error as source.
#[error("invalid compression container")]
InvalidCompressionContainer(#[source] std::io::Error),
/// An event ID was supplied but could not be parsed.
#[error("invalid event id")]
InvalidEventId,
/// The envelope could not be queued. In this file it is returned when the memory check
/// reports an exceeded limit or when the envelope buffer has no capacity.
#[error("failed to queue envelope")]
QueueFailed,
/// The envelope exceeded the size limits; carries the offending item type.
#[error(
"envelope exceeded size limits for type '{0}' (https://develop.sentry.dev/sdk/envelopes/#size-limits)"
)]
Overflow(ItemType),
/// A quota or rate limit applies; carries the rate limits that are reported back to the
/// client through response headers.
#[error(
"Sentry dropped data due to a quota or internal rate limit being reached. This will not affect your application. See https://docs.sentry.io/product/accounts/quotas/ for more information."
)]
RateLimited(RateLimits),
/// The submission was rejected while checking the envelope; carries the discard reason.
#[error("event submission rejected with_reason: {0:?}")]
EventRejected(DiscardReason),
}
impl IntoResponse for BadStoreRequest {
    /// Converts the error into an HTTP response and records the rejection.
    ///
    /// Status mapping:
    ///  - `RateLimited`: `429 Too Many Requests`, with the retry-after and rate limits headers.
    ///  - `QueueFailed`: `503 Service Unavailable`.
    ///  - `EventRejected`: `403 Forbidden`.
    ///  - Any other variant: `400 Bad Request`.
    ///
    /// Every conversion increments `RelayCounters::EnvelopeRejected`. Server errors are logged
    /// at error level, client errors at debug level.
    fn into_response(self) -> axum::response::Response {
        let payload = ApiErrorResponse::from_error(&self);

        let response = if let Self::RateLimited(limits) = &self {
            // Without a longest limit the retry-after header is still sent, with an empty value.
            let retry_after = match limits.longest() {
                Some(limit) => limit.retry_after.remaining_seconds().to_string(),
                None => String::new(),
            };
            let formatted_limits = utils::format_rate_limits(limits);

            let headers = [
                (header::RETRY_AFTER.as_str(), retry_after),
                (utils::RATE_LIMITS_HEADER, formatted_limits),
            ];
            (StatusCode::TOO_MANY_REQUESTS, headers, payload).into_response()
        } else {
            let status = match &self {
                Self::QueueFailed => StatusCode::SERVICE_UNAVAILABLE,
                Self::EventRejected(_) => StatusCode::FORBIDDEN,
                _ => StatusCode::BAD_REQUEST,
            };
            (status, payload).into_response()
        };

        metric!(counter(RelayCounters::EnvelopeRejected) += 1);

        let status = response.status();
        if status.is_server_error() {
            relay_log::error!(
                error = &self as &dyn std::error::Error,
                "error handling request"
            );
        } else if status.is_client_error() {
            relay_log::debug!(
                error = &self as &dyn std::error::Error,
                "error handling request"
            );
        }

        response
    }
}
/// Reduced view of an event payload that only deserializes the event ID and the event type.
///
/// Both fields fall back to their defaults when absent from the payload.
#[derive(Debug, Deserialize, PartialEq)]
pub struct MinimalEvent {
/// The event ID, read from the `event_id` key; `None` when the key is missing.
#[serde(default, rename = "event_id")]
pub id: Option<EventId>,
/// The event type, read from the `type` key; the default event type when the key is missing.
#[serde(default, rename = "type")]
pub ty: EventType,
}
pub fn minimal_event_from_json(data: &[u8]) -> Result<MinimalEvent, BadStoreRequest> {
serde_json::from_slice(data).map_err(BadStoreRequest::InvalidJson)
}
pub fn event_id_from_json(data: &[u8]) -> Result<Option<EventId>, BadStoreRequest> {
minimal_event_from_json(data).map(|event| event.id)
}
pub fn event_id_from_msgpack(data: &[u8]) -> Result<Option<EventId>, BadStoreRequest> {
rmp_serde::from_slice(data)
.map(|MinimalEvent { id, .. }| id)
.map_err(BadStoreRequest::InvalidMsgpack)
}
/// Extracts the optional event ID from form data.
///
/// Entries are visited in order and the first entry with one of these keys decides the result:
///  - `sentry`: the value is parsed as a JSON event payload.
///  - `sentry[event_id]`: the value is parsed directly as an event ID.
///
/// Returns `Ok(None)` if no such entry exists.
///
/// # Errors
///
/// Returns the JSON parsing error for a `sentry` entry, or
/// [`BadStoreRequest::InvalidEventId`] if a `sentry[event_id]` value cannot be parsed.
pub fn event_id_from_formdata(data: &[u8]) -> Result<Option<EventId>, BadStoreRequest> {
    for entry in FormDataIter::new(data) {
        let key = entry.key();

        if key == "sentry" {
            let payload = entry.value();
            return event_id_from_json(payload.as_bytes());
        }

        if key == "sentry[event_id]" {
            return match entry.value().parse::<EventId>() {
                Ok(event_id) => Ok(Some(event_id)),
                Err(_) => Err(BadStoreRequest::InvalidEventId),
            };
        }
    }

    Ok(None)
}
/// Searches a list of envelope items for an event ID.
///
/// Sources are consulted in this order, and the first one that yields an ID wins:
///  1. The first item of type `Event`, parsed as JSON.
///  2. The first item whose attachment type is `EventPayload`, parsed as MessagePack.
///  3. The first item of type `FormData`.
///
/// Returns `Ok(None)` if none of them provides an ID.
///
/// # Errors
///
/// Parsing errors from the event item or the event payload attachment are returned to the
/// caller. Errors from the form data item are deliberately ignored.
pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreRequest> {
    let event_item = items
        .iter()
        .find(|item| matches!(item.ty(), ItemType::Event));
    if let Some(item) = event_item {
        let found = event_id_from_json(&item.payload())?;
        if found.is_some() {
            return Ok(found);
        }
    }

    let payload_attachment = items
        .iter()
        .find(|item| item.attachment_type() == Some(&AttachmentType::EventPayload));
    if let Some(item) = payload_attachment {
        let found = event_id_from_msgpack(&item.payload())?;
        if found.is_some() {
            return Ok(found);
        }
    }

    // A form data item that fails to parse counts as "no event ID" instead of failing the
    // whole lookup.
    let from_form = items
        .iter()
        .find(|item| matches!(item.ty(), ItemType::FormData))
        .and_then(|item| event_id_from_formdata(&item.payload()).ok().flatten());

    Ok(from_form)
}
/// Hands the contents of a checked envelope over to the processor and the envelope buffer.
///
/// Unless Relay runs in proxy mode, `Statsd` and `MetricBuckets` items are removed from the
/// envelope and sent to the processor as raw metric data. The remaining envelope is split into
/// processing groups, and each resulting envelope is pushed to the buffer selected by its
/// project key pair. Finally, the original (now emptied) managed envelope is accepted.
///
/// # Errors
///
/// Returns [`BadStoreRequest::QueueFailed`] as soon as a selected buffer reports that it has no
/// capacity. Envelopes pushed in earlier iterations are not taken back.
fn queue_envelope(
    state: &ServiceState,
    mut managed_envelope: ManagedEnvelope,
) -> Result<(), BadStoreRequest> {
    let inner = managed_envelope.envelope_mut();

    let mode = state.config().relay_mode();
    if mode != RelayMode::Proxy {
        let raw_metrics = inner.take_items_by(|item: &Item| {
            matches!(item.ty(), ItemType::Statsd | ItemType::MetricBuckets)
        });

        if !raw_metrics.is_empty() {
            relay_log::trace!("sending metrics into processing queue");
            state.processor().send(ProcessMetrics {
                data: MetricData::Raw(raw_metrics.into_vec()),
                received_at: inner.received_at(),
                sent_at: inner.sent_at(),
                project_key: inner.meta().public_key(),
                source: BucketSource::from_meta(inner.meta()),
            });
        }
    }

    // The scoping is read before the envelope is taken out, and re-applied to every split part.
    let scoping = managed_envelope.scoping();
    let whole = *managed_envelope.take_envelope();

    for (group, part) in ProcessingGroup::split_envelope(whole) {
        let mut grouped = ManagedEnvelope::new(
            part,
            state.outcome_aggregator().clone(),
            state.test_store().clone(),
            group,
        );
        grouped.scope(scoping);

        let key_pair = ProjectKeyPair::from_envelope(grouped.envelope());
        let buffer = state.envelope_buffer(key_pair);
        if !buffer.has_capacity() {
            return Err(BadStoreRequest::QueueFailed);
        }

        relay_log::trace!("Pushing envelope to V2 buffer");
        let push = EnvelopeBuffer::Push(grouped.into_envelope());
        buffer.addr().send(push);
    }

    // All contents were taken out for the split above, so only the empty shell is accepted.
    managed_envelope.accept();

    Ok(())
}
/// Validates an incoming envelope and queues it for processing.
///
/// The steps, in order:
///  1. Emit per-item envelope metrics.
///  2. Fail with `QueueFailed` if the memory check reports an exceeded limit.
///  3. Run `utils::remove_unknown_items`; if the envelope is empty afterwards, reject it with
///     the `EmptyEnvelope` discard reason and still return `Ok` with the envelope's event ID.
///  4. Trigger a fetch for the sampling project if its key differs from the envelope's own key.
///  5. Check the envelope against the project. If no envelope is left after the check, fail
///     with the reported rate limits.
///  6. Enforce the configured size limits, rejecting the envelope as `TooLarge` on overflow.
///  7. Queue the envelope.
///
/// Returns the event ID read from the envelope after step 3.
///
/// # Errors
///
/// Besides the failures listed above, `RateLimited` is also returned after a successful queue
/// operation if the check reported active rate limits, and `EventRejected` is returned if the
/// project check rejects the envelope.
pub async fn handle_envelope(
    state: &ServiceState,
    envelope: Box<Envelope>,
) -> Result<Option<EventId>, BadStoreRequest> {
    emit_envelope_metrics(&envelope);

    if state.memory_checker().check_memory().is_exceeded() {
        return Err(BadStoreRequest::QueueFailed);
    }

    // The processing group is not decided here; the envelope is regrouped when it is split
    // during queueing.
    let mut managed_envelope = ManagedEnvelope::new(
        envelope,
        state.outcome_aggregator().clone(),
        state.test_store().clone(),
        ProcessingGroup::Ungrouped,
    );

    utils::remove_unknown_items(state.config(), &mut managed_envelope);

    let event_id = managed_envelope.envelope().event_id();
    if managed_envelope.envelope().is_empty() {
        managed_envelope.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope));
        return Ok(event_id);
    }

    let project_key = managed_envelope.envelope().meta().public_key();

    // Only a sampling key that differs from the envelope's own key triggers a separate fetch.
    let foreign_sampling_key = managed_envelope
        .envelope()
        .sampling_key()
        .filter(|sampling_key| *sampling_key != project_key);
    if let Some(sampling_key) = foreign_sampling_key {
        state.project_cache_handle().fetch(sampling_key);
    }

    let checked = state
        .project_cache_handle()
        .get(project_key)
        .check_envelope(managed_envelope)
        .map_err(BadStoreRequest::EventRejected)?;

    let mut managed_envelope = match checked.envelope {
        Some(remaining) => remaining,
        // Nothing is left to queue, so only the rate limits are reported.
        None => return Err(BadStoreRequest::RateLimited(checked.rate_limits)),
    };

    let size_check =
        utils::check_envelope_size_limits(state.config(), managed_envelope.envelope());
    if let Err(offender) = size_check {
        managed_envelope.reject(Outcome::Invalid(DiscardReason::TooLarge));
        return Err(BadStoreRequest::Overflow(offender));
    }

    queue_envelope(state, managed_envelope)?;

    if !checked.rate_limits.is_limited() {
        return Ok(event_id);
    }

    // Items were queued, but active rate limits are still communicated to the client.
    Err(BadStoreRequest::RateLimited(checked.rate_limits))
}
/// Emits statsd metrics describing the items of an envelope.
///
/// For every item this records the payload size histogram, an item counter and a payload byte
/// counter, tagged with the item type name (the two counters are also tagged with the client
/// name). If the envelope contains both a transaction item and an attachment item, an
/// additional counter is incremented and an info-level message tagged with the project key is
/// captured.
fn emit_envelope_metrics(envelope: &Envelope) {
    let client_name = envelope.meta().client_name();
    let (mut saw_transaction, mut saw_attachment) = (false, false);

    for item in envelope.items() {
        if matches!(item.ty(), ItemType::Transaction) {
            saw_transaction = true;
        }
        if matches!(item.ty(), ItemType::Attachment) {
            saw_attachment = true;
        }

        let type_name = item.ty().name();
        let payload_size = item.payload().len() as u64;

        metric!(
            histogram(RelayHistograms::EnvelopeItemSize) = payload_size,
            item_type = type_name
        );
        metric!(
            counter(RelayCounters::EnvelopeItems) += 1,
            item_type = type_name,
            sdk = client_name.name(),
        );
        metric!(
            counter(RelayCounters::EnvelopeItemBytes) += payload_size,
            item_type = type_name,
            sdk = client_name.name(),
        );
    }

    if !(saw_transaction && saw_attachment) {
        return;
    }

    metric!(
        counter(RelayCounters::TransactionsWithAttachments) += 1,
        sdk = client_name.name(),
    );

    relay_log::with_scope(
        |scope| {
            scope.set_tag(
                "project_key",
                envelope.meta().get_partial_scoping().project_key,
            );
        },
        || {
            relay_log::sentry::capture_message(
                "transaction with attachment",
                relay_log::sentry::Level::Info,
            );
        },
    );
}
/// Response wrapper around an optional event ID.
///
/// Its `IntoResponse` implementation in this file renders the ID as a hyphenated UUID string.
#[derive(Debug)]
pub struct TextResponse(pub Option<EventId>);
impl IntoResponse for TextResponse {
    /// Renders the wrapped event ID as a hyphenated UUID in the response body.
    ///
    /// A missing ID falls back to the default `EventId`. Debug builds assert that the resulting
    /// UUID is not nil.
    fn into_response(self) -> axum::response::Response {
        let Self(event_id) = self;
        let uuid = event_id.unwrap_or_default().0;
        debug_assert!(!uuid.is_nil());

        uuid.as_hyphenated().to_string().into_response()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json` into a [`MinimalEvent`], panicking if the payload is rejected.
    fn parse(json: &str) -> MinimalEvent {
        minimal_event_from_json(json.as_bytes()).unwrap()
    }

    /// An empty object yields no ID and the default event type.
    #[test]
    fn test_minimal_empty_event() {
        let expected = MinimalEvent {
            id: None,
            ty: EventType::Default,
        };
        assert_eq!(parse(r#"{}"#), expected);
    }

    /// The `event_id` key is parsed into the event ID.
    #[test]
    fn test_minimal_event_id() {
        let expected = MinimalEvent {
            id: Some("037af9ac1b49494bacd7ec5114f801d9".parse().unwrap()),
            ty: EventType::Default,
        };
        assert_eq!(
            parse(r#"{"event_id": "037af9ac1b49494bacd7ec5114f801d9"}"#),
            expected
        );
    }

    /// A known `type` value is parsed into the matching event type.
    #[test]
    fn test_minimal_event_type() {
        let expected = MinimalEvent {
            id: None,
            ty: EventType::ExpectCt,
        };
        assert_eq!(parse(r#"{"type": "expectct"}"#), expected);
    }

    /// An unknown `type` value falls back to the default event type instead of failing.
    #[test]
    fn test_minimal_event_invalid_type() {
        let expected = MinimalEvent {
            id: None,
            ty: EventType::Default,
        };
        assert_eq!(parse(r#"{"type": "invalid"}"#), expected);
    }
}