use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::{Arc, Once};
use std::time::Duration;
use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, NormalizationLevel, RelayMode};
use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature};
use relay_event_normalization::{
normalize_event, validate_event, ClockDriftProcessor, CombinedMeasurementsConfig,
EventValidationConfig, GeoIpLookup, MeasurementsConfig, NormalizationConfig, RawUserAgentInfo,
TransactionNameConfig,
};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::{Annotated, Empty};
use relay_quotas::{DataCategory, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{smallvec, SmallVec};
use zstd::stream::Encoder as ZstdEncoder;
use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::http;
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::event::FiltersStatus;
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::test_store::{Capture, TestStore};
use crate::services::upstream::{
SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope,
WorkerGroup,
};
use relay_base_schema::organization::OrganizationId;
#[cfg(feature = "processing")]
use {
crate::services::store::{Store, StoreEnvelope},
crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
RedisSetLimiterOptions,
},
relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups},
relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
relay_redis::{RedisPool, RedisPools},
std::iter::Chain,
std::slice::Iter,
std::time::Instant,
symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};
mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod session;
mod span;
pub use span::extract_transaction_span;
#[cfg(feature = "processing")]
mod unreal;
/// Expands a block only when the `processing` feature is compiled in *and*
/// `$config.processing_enabled()` returns `true` at runtime.
///
/// The two-branch form additionally takes an `else` block which runs when the
/// feature is compiled out, or — with the feature compiled in — when
/// processing is disabled in the config.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            // Exactly one of the two cfg blocks is compiled in.
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
/// Generates a zero-sized marker type `$ty` tied to one [`ProcessingGroup`]
/// variant:
///
/// * `From<$ty> for ProcessingGroup` always yields `ProcessingGroup::$variant`.
/// * `TryFrom<ProcessingGroup> for $ty` succeeds only for that same variant
///   and fails with [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                return Err(GroupTypeError);
            }
        }
    };
}
pub trait EventProcessing {}
/// Implemented by processing groups that participate in dynamic sampling.
pub trait Sampling {
    /// Whether dynamic sampling applies to this group for the given project
    /// configuration.
    fn supports_sampling(project_info: &ProjectInfo) -> bool;

    /// Whether reservoir sampling rules apply to this group.
    fn supports_reservoir_sampling() -> bool;
}
// Marker type for the transaction processing group.
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

impl Sampling for TransactionGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        // Transactions are sampled only when transaction metrics extraction
        // is configured and enabled for the project.
        matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
    }

    fn supports_reservoir_sampling() -> bool {
        true
    }
}
// Remaining marker types, one per `ProcessingGroup` variant.
processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(SpanGroup, Span);

impl Sampling for SpanGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        // Spans are sampled only with a usable metric extraction config.
        matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
    }

    fn supports_reservoir_sampling() -> bool {
        false
    }
}

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
/// Marker type for an envelope that has completed processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
/// The group an envelope (or part of one) is processed as.
///
/// [`ProcessingGroup::split_envelope`] partitions an incoming envelope's items
/// into these groups.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Transactions (and, per `split_envelope`, profiles attached to them).
    Transaction,
    /// Errors; also receives NEL and raw security reports.
    Error,
    /// Sessions and session aggregates.
    Session,
    /// Event-requiring items arriving without an event-creating item.
    Standalone,
    /// Client (outcome) reports.
    ClientReport,
    /// Replay events, recordings, and videos.
    Replay,
    /// Monitor check-ins.
    CheckIn,
    /// Spans (including OTel spans/traces data).
    Span,
    /// Metric items (statsd / metric buckets).
    Metrics,
    /// Profile chunks.
    ProfileChunk,
    /// Items of unknown type, forwarded as-is.
    ForwardUnknown,
    /// Anything that matched none of the groups above.
    Ungrouped,
}
impl ProcessingGroup {
    /// Splits an envelope into one envelope per processing group.
    ///
    /// Items are moved out of `envelope` in stages, each new envelope reusing
    /// a clone of the original headers. The order of the stages below is
    /// load-bearing: e.g. the standalone check runs only after NEL items were
    /// removed and before security reports and other event-requiring items
    /// are taken.
    pub fn split_envelope(mut envelope: Envelope) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];
        // Each NEL report becomes its own error envelope with a fresh event id.
        let nel_envelopes = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Nel))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(nel_envelopes);
        // All replay-related items travel together in a single envelope.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }
        // Sessions and session aggregates.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }
        // Standalone spans, including OTel representations.
        let span_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData
            )
        });
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }
        // Metric items (statsd payloads, metric buckets, ...).
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }
        // Profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }
        // If nothing left in the envelope creates an event, event-requiring
        // items (e.g. attachments) are split off as a standalone envelope.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };
        // Each raw security report becomes its own error envelope with a
        // fresh event id, mirroring the NEL handling above.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);
        // Remaining event-requiring items form one envelope; it is a
        // transaction group if any item is a transaction or profile.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };
            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }
        // Everything left gets its own single-item envelope, classified by
        // item type; unknown types are forwarded, the rest is ungrouped.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };
            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);
        grouped_envelopes
    }

    /// Returns the short name of the group, e.g. for logging or metric tags.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Span => "span",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
impl From<ProcessingGroup> for AppFeature {
fn from(value: ProcessingGroup) -> Self {
match value {
ProcessingGroup::Transaction => AppFeature::Transactions,
ProcessingGroup::Error => AppFeature::Errors,
ProcessingGroup::Session => AppFeature::Sessions,
ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
ProcessingGroup::ClientReport => AppFeature::ClientReports,
ProcessingGroup::Replay => AppFeature::Replays,
ProcessingGroup::CheckIn => AppFeature::CheckIns,
ProcessingGroup::Span => AppFeature::Spans,
ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
ProcessingGroup::ProfileChunk => AppFeature::Profiles,
ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
}
}
}
/// Errors that can occur while processing an envelope.
///
/// The corresponding outcome (if any) is derived in
/// [`ProcessingError::to_outcome`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),
    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),
    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),
    #[error("event payload too large")]
    PayloadTooLarge,
    #[error("invalid transaction event")]
    InvalidTransaction,
    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),
    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),
    #[error("failed to extract event payload")]
    NoEventPayload,
    #[error("missing project id in DSN")]
    MissingProjectId,
    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),
    #[error("unsupported security report type")]
    UnsupportedSecurityType,
    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),
    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),
    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),
    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,
    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),
    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),
    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),
    #[error("invalid processing group type")]
    InvalidProcessingGroup(#[from] InvalidProcessingGroupType),
    #[error("invalid replay")]
    InvalidReplay(DiscardReason),
    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),
}
impl ProcessingError {
    /// Maps the error to the outcome to emit for the dropped payload.
    ///
    /// Returns `None` for errors that should not produce an outcome
    /// (missing project id, filtered events, invalid processing group).
    fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge => Some(Outcome::Invalid(DiscardReason::TooLarge)),
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            // Unreal reports distinguish bad compression from other failures.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(ref err)
                if err.kind() == Unreal4ErrorKind::BadCompression =>
            {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            // Internal failures are reported as such, not blamed on the client.
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
        }
    }

    /// Returns `true` if the derived outcome is unexpected. Errors without an
    /// outcome are never considered unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .map_or(false, |outcome| outcome.is_unexpected())
    }
}
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Oversized crash reports map to [`ProcessingError::PayloadTooLarge`];
    /// all other kinds are wrapped as invalid Unreal reports.
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
impl From<ExtractMetricsError> for ProcessingError {
    /// Both timestamp-related extraction failures collapse into
    /// [`ProcessingError::InvalidTimestamp`].
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp => Self::InvalidTimestamp,
            ExtractMetricsError::InvalidTimestamp => Self::InvalidTimestamp,
        }
    }
}
type ExtractedEvent = (Annotated<Event>, usize);
/// Wrapper around [`ExtractedMetrics`] accumulated during envelope processing.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Project- and sampling-scoped buckets collected so far.
    metrics: ExtractedMetrics,
}
impl ProcessingExtractedMetrics {
    /// Creates an empty metrics collection.
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    /// Extends both project and sampling metrics with the given buckets.
    ///
    /// `sampling_decision` tags each bucket's metadata, see
    /// [`Self::extend_project_metrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        Self::extend_with(&mut self.metrics.project_metrics, buckets, sampling_decision);
    }

    /// Extends the sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        Self::extend_with(
            &mut self.metrics.sampling_metrics,
            buckets,
            sampling_decision,
        );
    }

    /// Shared implementation of the two `extend_*_metrics` methods: appends
    /// `buckets` to `target`, marking each bucket as extracted from an indexed
    /// payload exactly when the sampling decision is `Keep`.
    fn extend_with<I>(
        target: &mut Vec<Bucket>,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        target.extend(buckets.into_iter().map(|mut bucket| {
            bucket.metadata.extracted_from_indexed =
                sampling_decision == Some(SamplingDecision::Keep);
            bucket
        }));
    }

    /// Applies rate-limit enforcement to the collected metrics.
    ///
    /// For each namespace: if the total quota is active, all of its buckets
    /// are dropped; if only the indexed quota is active and enforcement was
    /// not applied consistently, `extracted_from_indexed` is reset instead.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }
        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                // Buckets without a recognized namespace are kept untouched.
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };
                if drop_namespaces.contains(&namespace) {
                    return false;
                }
                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }
                true
            });
        }
    }

    /// Runs `f` over both project and sampling buckets, dropping those for
    /// which it returns `false`.
    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

// `new` takes no arguments, so also provide `Default` (clippy:
// `new_without_default`).
impl Default for ProcessingExtractedMetrics {
    fn default() -> Self {
        Self::new()
    }
}
/// Forwards non-empty extracted metrics to the aggregator.
///
/// Project metrics are sent under the envelope's own public key; sampling
/// metrics go to the sampling project key, falling back to the own key when
/// the envelope has none.
fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, aggregator: &Addr<Aggregator>) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;
    let own_key = envelope.meta().public_key();

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key: own_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // Sampling metrics may belong to a different (root) project.
        let sampling_key = envelope.sampling_key().unwrap_or(own_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_key,
            buckets: sampling_metrics,
        });
    }
}
/// Returns the data category of the event's type, if an event is present.
fn event_category(event: &Annotated<Event>) -> Option<DataCategory> {
    let ty = event_type(event)?;
    Some(DataCategory::from(ty))
}
/// Returns the event's type, defaulting when the type field is unset.
/// Yields `None` only when there is no event at all.
fn event_type(event: &Annotated<Event>) -> Option<EventType> {
    let event = event.value()?;
    Some(event.ty.value().copied().unwrap_or_default())
}
/// Returns whether the given feature-gated filter should run.
///
/// Only managed Relays filter based on project features; all other modes
/// never filter here.
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Managed => !project_info.has_feature(feature),
        RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
    }
}
/// Records whether the envelope's event was already fully normalized.
#[derive(Copy, Clone)]
struct EventFullyNormalized(bool);

impl EventFullyNormalized {
    /// Computes the flag: `true` only when the request comes from an internal
    /// Relay and some event-creating item is marked fully normalized.
    pub fn new(envelope: &Envelope) -> Self {
        let internal = envelope.meta().is_from_internal_relay();
        // Short-circuits: items are only inspected for internal requests.
        let fully_normalized = internal
            && envelope
                .items()
                .any(|item| item.creates_event() && item.fully_normalized());
        Self(fully_normalized)
    }
}
/// Flag recording whether metrics were already extracted from the event.
#[derive(Debug, Copy, Clone)]
struct EventMetricsExtracted(bool);

/// Flag recording whether spans were already extracted from the event.
#[derive(Debug, Copy, Clone)]
struct SpansExtracted(bool);
/// Result of processing an envelope: the processed envelope plus any metrics
/// extracted along the way.
#[derive(Debug)]
struct ProcessingResult {
    managed_envelope: TypedEnvelope<Processed>,
    extracted_metrics: ProcessingExtractedMetrics,
}

impl ProcessingResult {
    /// Wraps an envelope that produced no extracted metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self {
            extracted_metrics: ProcessingExtractedMetrics::new(),
            managed_envelope,
        }
    }

    /// Decomposes the result into the envelope and the raw extracted metrics.
    fn into_inner(self) -> (TypedEnvelope<Processed>, ExtractedMetrics) {
        let Self {
            managed_envelope,
            extracted_metrics,
        } = self;
        (managed_envelope, extracted_metrics.metrics)
    }
}
/// Response to [`ProcessEnvelope`]; `None` when the envelope was consumed
/// (dropped or fully converted) during processing.
#[cfg_attr(not(feature = "processing"), allow(dead_code))]
pub struct ProcessEnvelopeResponse {
    pub envelope: Option<TypedEnvelope<Processed>>,
}
/// Message asking the processor to process a single envelope.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope together with its outcome tracking.
    pub envelope: ManagedEnvelope,
    /// State of the project that owns the envelope.
    pub project_info: Arc<ProjectInfo>,
    /// Currently cached rate limits for the project.
    pub rate_limits: Arc<RateLimits>,
    /// State of the sampling (root) project, if different and available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Shared counters used by reservoir sampling rules.
    pub reservoir_counters: ReservoirCounters,
}
/// Message carrying metrics of a single project for parsing and aggregation.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metric payload, raw envelope items or already-parsed buckets.
    pub data: MetricData,
    /// The project the metrics belong to.
    pub project_key: ProjectKey,
    /// Whether the metrics came from an internal or external source.
    pub source: BucketSource,
    /// The time the payload was received.
    pub received_at: DateTime<Utc>,
    /// The instant the payload was sent by the client, if known.
    pub sent_at: Option<DateTime<Utc>>,
}
/// A metric payload in raw or parsed form.
#[derive(Debug)]
pub enum MetricData {
    /// Raw envelope items (statsd or metric-buckets payloads), not yet parsed.
    Raw(Vec<Item>),
    /// Already-parsed metric buckets.
    Parsed(Vec<Bucket>),
}
impl MetricData {
fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
let items = match self {
Self::Parsed(buckets) => return buckets,
Self::Raw(items) => items,
};
let mut buckets = Vec::new();
for item in items {
let payload = item.payload();
if item.ty() == &ItemType::Statsd {
for bucket_result in Bucket::parse_all(&payload, timestamp) {
match bucket_result {
Ok(bucket) => buckets.push(bucket),
Err(error) => relay_log::debug!(
error = &error as &dyn Error,
"failed to parse metric bucket from statsd format",
),
}
}
} else if item.ty() == &ItemType::MetricBuckets {
match serde_json::from_slice::<Vec<Bucket>>(&payload) {
Ok(parsed_buckets) => {
if buckets.is_empty() {
buckets = parsed_buckets;
} else {
buckets.extend(parsed_buckets);
}
}
Err(error) => {
relay_log::debug!(
error = &error as &dyn Error,
"failed to parse metric bucket",
);
metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
}
}
} else {
relay_log::error!(
"invalid item of type {} passed to ProcessMetrics",
item.ty()
);
}
}
buckets
}
}
/// Message carrying a batched metrics payload spanning multiple projects.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Raw payload containing the batched buckets.
    pub payload: Bytes,
    /// Whether the payload came from an internal or external source.
    pub source: BucketSource,
    /// The time the payload was received.
    pub received_at: DateTime<Utc>,
    /// The instant the payload was sent, if known.
    pub sent_at: Option<DateTime<Utc>>,
}
/// Where a metric bucket payload originated from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Received from an internal Relay.
    Internal,
    /// Received from an external client or Relay.
    External,
}
impl BucketSource {
    /// Derives the source from the request metadata: requests from internal
    /// Relays are [`Self::Internal`], everything else is [`Self::External`].
    pub fn from_meta(meta: &RequestMeta) -> Self {
        if meta.is_from_internal_relay() {
            Self::Internal
        } else {
            Self::External
        }
    }
}
/// Message carrying a fully processed envelope for submission.
#[derive(Debug)]
pub struct SubmitEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}
/// Message carrying client reports to be submitted under the given scoping.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to send.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information attached to the reports.
    pub scoping: Scoping,
}
/// The set of messages handled by the envelope processor service.
///
/// Each variant boxes its payload to keep the enum small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitEnvelope(Box<SubmitEnvelope>),
    SubmitClientReports(Box<SubmitClientReports>),
}
impl EnvelopeProcessor {
pub fn variant(&self) -> &'static str {
match self {
EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
EnvelopeProcessor::SubmitEnvelope(_) => "SubmitEnvelope",
EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
}
}
}
impl relay_system::Interface for EnvelopeProcessor {}
impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
type Response = relay_system::NoResponse;
fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
Self::ProcessEnvelope(Box::new(message))
}
}
// Routing of the remaining processor messages into `EnvelopeProcessor`
// variants; all are fire-and-forget (`NoResponse`).
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitEnvelope> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitEnvelope, _: ()) -> Self {
        Self::SubmitEnvelope(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
/// Service handling [`EnvelopeProcessor`] messages.
///
/// Cheap to clone: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
/// Addresses of the services the envelope processor communicates with.
pub struct Addrs {
    /// Aggregator for tracked outcomes.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Upstream Relay for forwarding requests.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Test store used in capture mode.
    pub test_store: Addr<TestStore>,
    /// Store service; only available with the `processing` feature.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Metrics aggregator.
    pub aggregator: Addr<Aggregator>,
}
impl Default for Addrs {
    /// Returns a set of dummy addresses; messages sent to them go nowhere.
    fn default() -> Self {
        Self {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            test_store: Addr::dummy(),
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
        }
    }
}
/// Shared state of the envelope processor service.
struct InnerProcessor {
    /// Worker pool used to run processing tasks.
    workers: WorkerGroup,
    /// Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global config.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache (e.g. for merging rate limits back).
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    /// Redis pool used for quotas, if configured.
    #[cfg(feature = "processing")]
    quotas_pool: Option<RedisPool>,
    /// Addresses of dependent services.
    addrs: Addrs,
    /// Redis-backed rate limiter, if a quotas pool is configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<RedisRateLimiter>,
    /// GeoIP database, if configured and successfully opened.
    geoip_lookup: Option<GeoIpLookup>,
    /// Redis-backed cardinality limiter, if configured.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Outcome reporting for metrics.
    metric_outcomes: MetricOutcomes,
}
impl EnvelopeProcessorService {
    /// Creates a new envelope processor service.
    ///
    /// Opens the GeoIP database if a path is configured; on failure the error
    /// is logged and lookups are disabled rather than failing construction.
    /// With the `processing` feature, the Redis pools are wired into the rate
    /// limiter and the cardinality limiter.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: ThreadPool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisPools>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config.geoip_path().and_then(|p| {
            match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                Ok(geoip) => Some(geoip),
                Err(err) => {
                    relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                    None
                }
            }
        });
        // Split out the pools needed here; both are absent when Redis is not
        // configured.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisPools {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };
        let inner = InnerProcessor {
            workers: WorkerGroup::new(pool),
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_pool: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter: quotas
                .map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())),
            addrs,
            geoip_lookup,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            config,
        };
        Self {
            inner: Arc::new(inner),
        }
    }
#[cfg(feature = "processing")]
fn normalize_checkins(
&self,
managed_envelope: &mut TypedEnvelope<CheckInGroup>,
project_id: ProjectId,
) {
managed_envelope.retain_items(|item| {
if item.ty() != &ItemType::CheckIn {
return ItemAction::Keep;
}
match relay_monitors::process_check_in(&item.payload(), project_id) {
Ok(result) => {
item.set_routing_hint(result.routing_hint);
item.set_payload(ContentType::Json, result.payload);
ItemAction::Keep
}
Err(error) => {
relay_log::debug!(
error = &error as &dyn Error,
"dropped invalid monitor check-in"
);
ItemAction::DropSilently
}
}
})
}
#[cfg(feature = "processing")]
fn enforce_quotas<Group>(
&self,
managed_envelope: &mut TypedEnvelope<Group>,
event: Annotated<Event>,
extracted_metrics: &mut ProcessingExtractedMetrics,
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
) -> Result<Annotated<Event>, ProcessingError> {
let global_config = self.inner.global_config.current();
let rate_limiter = match self.inner.rate_limiter.as_ref() {
Some(rate_limiter) => rate_limiter,
None => return Ok(event),
};
let cached_result = RateLimiter::Cached.enforce(
managed_envelope,
event,
extracted_metrics,
&global_config,
project_info.clone(),
rate_limits.clone(),
)?;
let consistent_result = RateLimiter::Consistent(rate_limiter).enforce(
managed_envelope,
cached_result.event,
extracted_metrics,
&global_config,
project_info,
rate_limits,
)?;
if !consistent_result.rate_limits.is_empty() {
self.inner
.project_cache
.get(managed_envelope.scoping().project_key)
.rate_limits()
.merge(consistent_result.rate_limits);
}
Ok(consistent_result.event)
}
    /// Extracts metrics (and optionally spans) from a transaction event.
    ///
    /// Returns early (without extracting) when metrics were already extracted,
    /// when there is no event, or when the project/global extraction configs
    /// are unusable. On success returns `EventMetricsExtracted(true)`.
    #[allow(clippy::too_many_arguments)]
    fn extract_transaction_metrics(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        event: &mut Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        sampling_decision: SamplingDecision,
        event_metrics_extracted: EventMetricsExtracted,
        spans_extracted: SpansExtracted,
    ) -> Result<EventMetricsExtracted, ProcessingError> {
        // Do not extract twice.
        if event_metrics_extracted.0 {
            return Ok(event_metrics_extracted);
        }
        let Some(event) = event.value_mut() else {
            return Ok(event_metrics_extracted);
        };
        let global = self.inner.global_config.current();
        // Combine the project's extraction config with the global one. A
        // broken global config is an error on processing Relays (which then
        // fall back to an empty config) but only a debug log elsewhere.
        let combined_config = {
            let config = match &project_info.config.metric_extraction {
                ErrorBoundary::Ok(ref config) if config.is_supported() => config,
                _ => return Ok(event_metrics_extracted),
            };
            let global_config = match &global.metric_extraction {
                ErrorBoundary::Ok(global_config) => global_config,
                #[allow(unused_variables)]
                ErrorBoundary::Err(e) => {
                    if_processing!(self.inner.config, {
                        relay_log::error!("Failed to parse global extraction config {e}");
                        MetricExtractionGroups::EMPTY
                    } else {
                        relay_log::debug!("Failed to parse global extraction config: {e}");
                        return Ok(event_metrics_extracted);
                    })
                }
            };
            CombinedMetricExtractionConfig::new(global_config, config)
        };
        // The legacy transaction metrics config must be present and parsable.
        let tx_config = match &project_info.config.transaction_metrics {
            Some(ErrorBoundary::Ok(tx_config)) => tx_config,
            Some(ErrorBoundary::Err(e)) => {
                relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
                return Ok(event_metrics_extracted);
            }
            None => {
                relay_log::debug!("Legacy transaction metrics config is missing");
                return Ok(event_metrics_extracted);
            }
        };
        if !tx_config.is_enabled() {
            // Log the unsupported-version error at most once per process.
            static TX_CONFIG_ERROR: Once = Once::new();
            TX_CONFIG_ERROR.call_once(|| {
                if self.inner.config.processing_enabled() {
                    relay_log::error!(
                        "Processing Relay outdated, received tx config in version {}, which is not supported",
                        tx_config.version
                    );
                }
            });
            return Ok(event_metrics_extracted);
        }
        // Span extraction is gated on not having extracted already, project
        // features, and a global sample rate (default: always).
        let extract_spans = !spans_extracted.0
            && project_info.config.features.produces_spans()
            && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0));
        let metrics = crate::metrics_extraction::event::extract_metrics(
            event,
            combined_config,
            sampling_decision,
            project_id,
            self.inner
                .config
                .aggregator_config_for(MetricNamespace::Spans)
                .max_tag_value_length,
            extract_spans,
        );
        extracted_metrics.extend(metrics, Some(sampling_decision));
        // Transaction metrics are only extracted when transactions are not
        // discarded for this project.
        if !project_info.has_feature(Feature::DiscardTransaction) {
            let transaction_from_dsc = managed_envelope
                .envelope()
                .dsc()
                .and_then(|dsc| dsc.transaction.as_deref());
            let extractor = TransactionExtractor {
                config: tx_config,
                generic_config: Some(combined_config),
                transaction_from_dsc,
                sampling_decision,
                target_project_id: project_id,
            };
            extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
        }
        Ok(EventMetricsExtracted(true))
    }
/// Validates and normalizes the event payload in place.
///
/// Depending on the configured [`NormalizationLevel`] and on whether processing
/// is enabled, this runs either a lightweight or a full normalization pass.
/// Short-circuits when the envelope carries no event, or when a processing
/// relay receives an event that was already fully normalized upstream.
///
/// Returns the updated [`EventFullyNormalized`] flag, or
/// [`ProcessingError::InvalidTransaction`] if event validation fails.
fn normalize_event<Group: EventProcessing>(
    &self,
    managed_envelope: &mut TypedEnvelope<Group>,
    event: &mut Annotated<Event>,
    project_id: ProjectId,
    project_info: Arc<ProjectInfo>,
    mut event_fully_normalized: EventFullyNormalized,
) -> Result<EventFullyNormalized, ProcessingError> {
    if event.value().is_empty() {
        // Nothing to normalize; keep the flag unchanged.
        return Ok(event_fully_normalized);
    }

    let full_normalization = match self.inner.config.normalization_level() {
        NormalizationLevel::Full => true,
        NormalizationLevel::Default => {
            // A processing relay does not re-normalize events that an internal
            // relay has already fully normalized.
            if self.inner.config.processing_enabled() && event_fully_normalized.0 {
                return Ok(event_fully_normalized);
            }
            self.inner.config.processing_enabled()
        }
    };

    let request_meta = managed_envelope.envelope().meta();
    let client_ipaddr = request_meta.client_addr().map(IpAddr::from);

    let transaction_aggregator_config = self
        .inner
        .config
        .aggregator_config_for(MetricNamespace::Transactions);

    let global_config = self.inner.global_config.current();
    let ai_model_costs = global_config.ai_model_costs.clone().ok();
    let http_span_allowed_hosts = global_config.options.http_span_allowed_hosts.as_slice();

    let retention_days: i64 = project_info
        .config
        .event_retention
        .unwrap_or(DEFAULT_EVENT_RETENTION)
        .into();

    utils::log_transaction_name_metrics(event, |event| {
        let event_validation_config = EventValidationConfig {
            received_at: Some(managed_envelope.received_at()),
            // Timestamps older than the retention window would be dropped anyway.
            max_secs_in_past: Some(retention_days * 24 * 3600),
            max_secs_in_future: Some(self.inner.config.max_secs_in_future()),
            transaction_timestamp_range: Some(transaction_aggregator_config.timestamp_range()),
            is_validated: false,
        };

        let key_id = project_info
            .get_public_key_config()
            .and_then(|key| key.numeric_id.map(|id| id.to_string()));
        if full_normalization && key_id.is_none() {
            relay_log::error!(
                "project state for key {} is missing key id",
                managed_envelope.envelope().meta().public_key()
            );
        }

        let normalization_config = NormalizationConfig {
            project_id: Some(project_id.value()),
            client: request_meta.client().map(str::to_owned),
            key_id,
            protocol_version: Some(request_meta.version().to_string()),
            grouping_config: project_info.config.grouping_config.clone(),
            client_ip: client_ipaddr.as_ref(),
            client_sample_rate: managed_envelope
                .envelope()
                .dsc()
                .and_then(|ctx| ctx.sample_rate),
            user_agent: RawUserAgentInfo {
                user_agent: request_meta.user_agent(),
                client_hints: request_meta.client_hints().as_deref(),
            },
            // Reserve room for the MRI overhead so full metric names still fit.
            max_name_and_unit_len: Some(
                transaction_aggregator_config
                    .max_name_length
                    .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
            ),
            breakdowns_config: project_info.config.breakdowns_v2.as_ref(),
            performance_score: project_info.config.performance_score.as_ref(),
            normalize_user_agent: Some(true),
            transaction_name_config: TransactionNameConfig {
                rules: &project_info.config.tx_name_rules,
            },
            device_class_synthesis_config: project_info
                .has_feature(Feature::DeviceClassSynthesis),
            enrich_spans: project_info.has_feature(Feature::ExtractSpansFromEvent)
                || project_info.has_feature(Feature::ExtractCommonSpanMetricsFromEvent),
            max_tag_value_length: self
                .inner
                .config
                .aggregator_config_for(MetricNamespace::Spans)
                .max_tag_value_length,
            is_renormalize: false,
            // Only full normalization strips unknown fields and reports errors.
            remove_other: full_normalization,
            emit_event_errors: full_normalization,
            span_description_rules: project_info.config.span_description_rules.as_ref(),
            geoip_lookup: self.inner.geoip_lookup.as_ref(),
            ai_model_costs: ai_model_costs.as_ref(),
            enable_trimming: true,
            measurements: Some(CombinedMeasurementsConfig::new(
                project_info.config().measurements.as_ref(),
                global_config.measurements.as_ref(),
            )),
            normalize_spans: true,
            replay_id: managed_envelope
                .envelope()
                .dsc()
                .and_then(|ctx| ctx.replay_id),
            span_allowed_hosts: http_span_allowed_hosts,
            span_op_defaults: global_config.span_op_defaults.borrow(),
        };

        metric!(timer(RelayTimers::EventProcessingNormalization), {
            validate_event(event, &event_validation_config)
                .map_err(|_| ProcessingError::InvalidTransaction)?;
            normalize_event(event, &normalization_config);
            if full_normalization && event::has_unprintable_fields(event) {
                metric!(counter(RelayCounters::EventCorrupted) += 1);
            }
            Result::<(), ProcessingError>::Ok(())
        })
    })?;

    event_fully_normalized.0 |= full_normalization;

    Ok(event_fully_normalized)
}
/// Processes envelopes of the error group end-to-end.
///
/// Expands user reports and (in processing relays) Unreal crash reports,
/// extracts the event, finalizes and normalizes it, applies inbound filters,
/// tags the error with the dynamic sampling decision, enforces quotas, scrubs
/// PII, and serializes the event back into the envelope.
///
/// Returns the metrics extracted along the way, or a [`ProcessingError`] if
/// any stage fails.
fn process_errors(
    &self,
    managed_envelope: &mut TypedEnvelope<ErrorGroup>,
    project_id: ProjectId,
    project_info: Arc<ProjectInfo>,
    sampling_project_info: Option<Arc<ProjectInfo>>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
    let mut metrics = Metrics::default();
    #[allow(unused_mut)]
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    // Error envelopes may also carry user reports.
    report::process_user_reports(managed_envelope);

    // Unreal crash payloads are only expanded in processing relays.
    if_processing!(self.inner.config, {
        unreal::expand(managed_envelope, &self.inner.config)?;
    });

    let extraction_result = event::extract(
        managed_envelope,
        &mut metrics,
        event_fully_normalized,
        &self.inner.config,
    )?;
    let mut event = extraction_result.event;

    if_processing!(self.inner.config, {
        if let Some(inner_event_fully_normalized) =
            unreal::process(managed_envelope, &mut event)?
        {
            event_fully_normalized = inner_event_fully_normalized;
        }
        if let Some(inner_event_fully_normalized) =
            attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
        {
            event_fully_normalized = inner_event_fully_normalized;
        }
    });

    event::finalize(
        managed_envelope,
        &mut event,
        &mut metrics,
        &self.inner.config,
    )?;
    event_fully_normalized = self.normalize_event(
        managed_envelope,
        &mut event,
        project_id,
        project_info.clone(),
        event_fully_normalized,
    )?;
    let filter_run = event::filter(
        managed_envelope,
        &mut event,
        project_info.clone(),
        &self.inner.global_config.current(),
    )?;

    // Only tag with a sampling decision if filters actually ran here, or will
    // run downstream in a processing relay.
    if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
        dynamic_sampling::tag_error_with_sampling_decision(
            managed_envelope,
            &mut event,
            sampling_project_info,
            &self.inner.config,
        );
    }

    if_processing!(self.inner.config, {
        event = self.enforce_quotas(
            managed_envelope,
            event,
            &mut extracted_metrics,
            project_info.clone(),
            rate_limits,
        )?;
    });

    if event.value().is_some() {
        event::scrub(&mut event, project_info.clone())?;
        event::serialize(
            managed_envelope,
            &mut event,
            event_fully_normalized,
            // Errors never have transaction metrics or spans extracted.
            EventMetricsExtracted(false),
            SpansExtracted(false),
        )?;
        event::emit_feedback_metrics(managed_envelope.envelope());
    }

    attachment::scrub(managed_envelope, project_info);

    // Invariant: a processing relay must never forward an event it did not
    // fully normalize.
    if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
        relay_log::error!(
            tags.project = %project_id,
            tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
            "ingested event without normalizing"
        );
    }

    Ok(Some(extracted_metrics))
}
/// Processes envelopes of the transaction group end-to-end.
///
/// Extracts the event, processes attached profiles, normalizes and filters
/// the transaction, evaluates dynamic sampling, extracts metrics, optionally
/// extracts spans (processing relays only), enforces quotas, and serializes
/// the event back into the envelope.
///
/// When dynamic sampling drops the transaction, metrics are still extracted
/// with [`SamplingDecision::Drop`] before the items are discarded.
#[allow(unused_assignments)]
#[allow(clippy::too_many_arguments)]
fn process_transactions(
    &self,
    managed_envelope: &mut TypedEnvelope<TransactionGroup>,
    config: Arc<Config>,
    project_id: ProjectId,
    project_info: Arc<ProjectInfo>,
    mut sampling_project_info: Option<Arc<ProjectInfo>>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
    reservoir_counters: ReservoirCounters,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
    let mut event_metrics_extracted = EventMetricsExtracted(false);
    let mut spans_extracted = SpansExtracted(false);
    let mut metrics = Metrics::default();
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    let global_config = self.inner.global_config.current();

    let extraction_result = event::extract(
        managed_envelope,
        &mut metrics,
        event_fully_normalized,
        &self.inner.config,
    )?;

    // Carry over extraction state asserted by an upstream relay, if any.
    if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
        event_metrics_extracted = inner_event_metrics_extracted;
    }
    if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
        spans_extracted = inner_spans_extracted;
    };
    let mut event = extraction_result.event;

    let profile_id = profile::filter(
        managed_envelope,
        &event,
        config.clone(),
        project_id,
        project_info.clone(),
    );
    profile::transfer_id(&mut event, profile_id);

    event::finalize(
        managed_envelope,
        &mut event,
        &mut metrics,
        &self.inner.config,
    )?;
    event_fully_normalized = self.normalize_event(
        managed_envelope,
        &mut event,
        project_id,
        project_info.clone(),
        event_fully_normalized,
    )?;

    // The DSC must be validated before dynamic sampling runs below.
    sampling_project_info = dynamic_sampling::validate_and_set_dsc(
        managed_envelope,
        &mut event,
        project_info.clone(),
        sampling_project_info.clone(),
    );

    let filter_run = event::filter(
        managed_envelope,
        &mut event,
        project_info.clone(),
        &self.inner.global_config.current(),
    )?;

    // Don't extract metrics if the event would be filtered downstream anyway;
    // processing relays always run sampling since filters run there too.
    let run_dynamic_sampling =
        matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();

    let reservoir = self.new_reservoir_evaluator(
        managed_envelope.scoping().organization_id,
        reservoir_counters,
    );

    let sampling_result = match run_dynamic_sampling {
        true => dynamic_sampling::run(
            managed_envelope,
            &mut event,
            config.clone(),
            project_info.clone(),
            sampling_project_info,
            &reservoir,
        ),
        false => SamplingResult::Pending,
    };

    #[cfg(feature = "processing")]
    let server_sample_rate = match sampling_result {
        SamplingResult::Match(ref sampling_match) => Some(sampling_match.sample_rate()),
        SamplingResult::NoMatch | SamplingResult::Pending => None,
    };

    if let Some(outcome) = sampling_result.into_dropped_outcome() {
        // Process profiles before dropping the transaction so the profile
        // count is reflected correctly in extracted metrics.
        profile::process(
            managed_envelope,
            &mut event,
            &global_config,
            config.clone(),
            project_info.clone(),
        );
        // Extract metrics now; the event is about to be dropped.
        event_metrics_extracted = self.extract_transaction_metrics(
            managed_envelope,
            &mut event,
            &mut extracted_metrics,
            project_id,
            project_info.clone(),
            SamplingDecision::Drop,
            event_metrics_extracted,
            spans_extracted,
        )?;

        dynamic_sampling::drop_unsampled_items(managed_envelope, event, outcome);

        // At this point we only have the metrics left; the event was dropped
        // by dynamic sampling, so quotas are enforced on an empty event.
        if_processing!(self.inner.config, {
            event = self.enforce_quotas(
                managed_envelope,
                Annotated::empty(),
                &mut extracted_metrics,
                project_info.clone(),
                rate_limits,
            )?;
        });

        return Ok(Some(extracted_metrics));
    }

    event::scrub(&mut event, project_info.clone())?;
    attachment::scrub(managed_envelope, project_info.clone());

    if_processing!(self.inner.config, {
        let profile_id = profile::process(
            managed_envelope,
            &mut event,
            &global_config,
            config.clone(),
            project_info.clone(),
        );
        profile::transfer_id(&mut event, profile_id);

        // Always extract metrics in processing relays for sampled items.
        event_metrics_extracted = self.extract_transaction_metrics(
            managed_envelope,
            &mut event,
            &mut extracted_metrics,
            project_id,
            project_info.clone(),
            SamplingDecision::Keep,
            event_metrics_extracted,
            spans_extracted,
        )?;

        if project_info.has_feature(Feature::ExtractSpansFromEvent) {
            spans_extracted = span::extract_from_event(
                managed_envelope,
                &event,
                &global_config,
                config,
                project_info.clone(),
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            );
        }

        event = self.enforce_quotas(
            managed_envelope,
            event,
            &mut extracted_metrics,
            project_info.clone(),
            rate_limits,
        )?;

        event = span::maybe_discard_transaction(managed_envelope, event, project_info);
    });

    if event.value().is_some() {
        event::serialize(
            managed_envelope,
            &mut event,
            event_fully_normalized,
            event_metrics_extracted,
            spans_extracted,
        )?;
    }

    // Invariant: a processing relay must never forward an event it did not
    // fully normalize.
    if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
        relay_log::error!(
            tags.project = %project_id,
            tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
            "ingested event without normalizing"
        );
    };

    Ok(Some(extracted_metrics))
}
/// Processes standalone profile chunk envelopes.
///
/// Filters chunks based on the project configuration; in processing relays
/// the chunks are additionally expanded and validated. No metrics are
/// extracted for this group.
fn process_profile_chunks(
    &self,
    managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
    project_info: Arc<ProjectInfo>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    profile_chunk::filter(managed_envelope, project_info.clone());
    if_processing!(self.inner.config, {
        profile_chunk::process(
            managed_envelope,
            project_info,
            &self.inner.global_config.current(),
            &self.inner.config,
        );
    });
    Ok(None)
}
/// Processes standalone items that require an event but do not have one
/// attached to the envelope (e.g. standalone attachments, user reports).
///
/// Filters profiles, enforces quotas in processing relays, processes user
/// reports, and scrubs attachments.
fn process_standalone(
    &self,
    managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
    config: Arc<Config>,
    project_id: ProjectId,
    project_info: Arc<ProjectInfo>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    #[allow(unused_mut)]
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    // There is no event in this group, hence the empty `Annotated`.
    profile::filter(
        managed_envelope,
        &Annotated::empty(),
        config,
        project_id,
        project_info.clone(),
    );

    if_processing!(self.inner.config, {
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info.clone(),
            rate_limits,
        )?;
    });

    report::process_user_reports(managed_envelope);
    attachment::scrub(managed_envelope, project_info);
    Ok(Some(extracted_metrics))
}
/// Processes user sessions, extracting their metrics.
///
/// Session items are converted to metrics by `session::process`; processing
/// relays additionally enforce quotas on the result.
fn process_sessions(
    &self,
    managed_envelope: &mut TypedEnvelope<SessionGroup>,
    project_info: Arc<ProjectInfo>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    session::process(
        managed_envelope,
        &mut extracted_metrics,
        project_info.clone(),
        &self.inner.config,
    );

    if_processing!(self.inner.config, {
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info,
            rate_limits,
        )?;
    });

    Ok(Some(extracted_metrics))
}
/// Processes client report envelopes.
///
/// Enforces quotas in processing relays first, then converts the client
/// reports into outcomes via the outcome aggregator.
fn process_client_reports(
    &self,
    managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
    config: Arc<Config>,
    project_info: Arc<ProjectInfo>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    #[allow(unused_mut)]
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    if_processing!(self.inner.config, {
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info.clone(),
            rate_limits,
        )?;
    });

    report::process_client_reports(
        managed_envelope,
        config,
        project_info,
        self.inner.addrs.outcome_aggregator.clone(),
    );

    Ok(Some(extracted_metrics))
}
/// Processes replay recordings and events.
///
/// Runs replay-specific validation, scrubbing, and geo enrichment; processing
/// relays additionally enforce quotas afterwards.
fn process_replays(
    &self,
    managed_envelope: &mut TypedEnvelope<ReplayGroup>,
    config: Arc<Config>,
    project_info: Arc<ProjectInfo>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    #[allow(unused_mut)]
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    replay::process(
        managed_envelope,
        &self.inner.global_config.current(),
        config,
        project_info.clone(),
        self.inner.geoip_lookup.as_ref(),
    )?;

    if_processing!(self.inner.config, {
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info,
            rate_limits,
        )?;
    });

    Ok(Some(extracted_metrics))
}
/// Processes cron monitor check-ins.
///
/// Outside processing relays this is a no-op; in processing relays quotas are
/// enforced and check-in payloads are normalized. The parameters are only
/// used with the `processing` feature, hence the `unused_variables` allows.
fn process_checkins(
    &self,
    #[allow(unused_variables)] managed_envelope: &mut TypedEnvelope<CheckInGroup>,
    #[allow(unused_variables)] project_id: ProjectId,
    #[allow(unused_variables)] project_info: Arc<ProjectInfo>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    #[allow(unused_mut)]
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    if_processing!(self.inner.config, {
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info,
            rate_limits,
        )?;
        self.normalize_checkins(managed_envelope, project_id);
    });

    Ok(Some(extracted_metrics))
}
/// Processes standalone spans.
///
/// Applies span filters and converts OTel trace data in all relays; in
/// processing relays the spans are additionally expanded, normalized (with
/// dynamic sampling via the reservoir), and subjected to quota enforcement.
#[allow(clippy::too_many_arguments)]
fn process_standalone_spans(
    &self,
    managed_envelope: &mut TypedEnvelope<SpanGroup>,
    config: Arc<Config>,
    #[allow(unused_variables)] project_id: ProjectId,
    project_info: Arc<ProjectInfo>,
    #[allow(unused_variables)] sampling_project_info: Option<Arc<ProjectInfo>>,
    #[allow(unused_variables)] rate_limits: Arc<RateLimits>,
    #[allow(unused_variables)] reservoir_counters: ReservoirCounters,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
    #[allow(unused_mut)]
    let mut extracted_metrics = ProcessingExtractedMetrics::new();

    span::filter(managed_envelope, config.clone(), project_info.clone());
    span::convert_otel_traces_data(managed_envelope);

    if_processing!(self.inner.config, {
        let global_config = self.inner.global_config.current();
        let reservoir = self.new_reservoir_evaluator(
            managed_envelope.scoping().organization_id,
            reservoir_counters,
        );

        span::process(
            managed_envelope,
            // Standalone spans have no enclosing event.
            &mut Annotated::empty(),
            &mut extracted_metrics,
            &global_config,
            config,
            project_id,
            project_info.clone(),
            sampling_project_info,
            self.inner.geoip_lookup.as_ref(),
            &reservoir,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info,
            rate_limits,
        )?;
    });

    Ok(Some(extracted_metrics))
}
/// Dispatches an envelope to the processing pipeline of its group.
///
/// Prepares the envelope (DSC transaction parametrization, retention, project
/// id) and then routes it to the matching `process_*` method. On error, the
/// envelope is rejected with the corresponding outcome before the error is
/// propagated.
fn process_envelope(
    &self,
    mut managed_envelope: ManagedEnvelope,
    project_id: ProjectId,
    project_info: Arc<ProjectInfo>,
    rate_limits: Arc<RateLimits>,
    sampling_project_info: Option<Arc<ProjectInfo>>,
    reservoir_counters: ReservoirCounters,
) -> Result<ProcessingResult, ProcessingError> {
    let group = managed_envelope.group();

    // Apply the root project's transaction name rules to the DSC so sampling
    // decisions see the parametrized transaction name.
    if let Some(sampling_state) = sampling_project_info.as_ref() {
        managed_envelope
            .envelope_mut()
            .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
    }

    if let Some(retention) = project_info.config.event_retention {
        managed_envelope.envelope_mut().set_retention(retention);
    }

    // Ensure the project id is set on the envelope meta for all downstream
    // consumers.
    managed_envelope
        .envelope_mut()
        .meta_mut()
        .set_project_id(project_id);

    // Wraps a `process_*` call: converts the envelope into the typed form,
    // runs the processor, and on error rejects the envelope with the mapped
    // outcome before bubbling the error up.
    macro_rules! run {
        ($fn_name:ident $(, $args:expr)*) => {{
            let mut managed_envelope = managed_envelope.try_into()?;
            match self.$fn_name(&mut managed_envelope, $($args),*) {
                Ok(extracted_metrics) => Ok(ProcessingResult {
                    managed_envelope: managed_envelope.into_processed(),
                    extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                }),
                Err(error) => {
                    if let Some(outcome) = error.to_outcome() {
                        managed_envelope.reject(outcome);
                    }
                    return Err(error);
                }
            }
        }};
    }

    relay_log::trace!("Processing {group} group", group = group.variant());

    match group {
        ProcessingGroup::Error => run!(
            process_errors,
            project_id,
            project_info,
            sampling_project_info,
            rate_limits
        ),
        ProcessingGroup::Transaction => {
            run!(
                process_transactions,
                self.inner.config.clone(),
                project_id,
                project_info,
                sampling_project_info,
                rate_limits,
                reservoir_counters
            )
        }
        ProcessingGroup::Session => run!(process_sessions, project_info, rate_limits),
        ProcessingGroup::Standalone => run!(
            process_standalone,
            self.inner.config.clone(),
            project_id,
            project_info,
            rate_limits
        ),
        ProcessingGroup::ClientReport => run!(
            process_client_reports,
            self.inner.config.clone(),
            project_info,
            rate_limits
        ),
        ProcessingGroup::Replay => {
            run!(
                process_replays,
                self.inner.config.clone(),
                project_info,
                rate_limits
            )
        }
        ProcessingGroup::CheckIn => {
            run!(process_checkins, project_id, project_info, rate_limits)
        }
        ProcessingGroup::Span => run!(
            process_standalone_spans,
            self.inner.config.clone(),
            project_id,
            project_info,
            sampling_project_info,
            rate_limits,
            reservoir_counters
        ),
        ProcessingGroup::ProfileChunk => run!(process_profile_chunks, project_info),
        // Metrics should only arrive here in proxy mode; everywhere else they
        // take a dedicated path, so log this as an internal error.
        ProcessingGroup::Metrics => {
            if self.inner.config.relay_mode() != RelayMode::Proxy {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "received metrics in the process_state"
                );
            }
            Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            ))
        }
        // Unknown-group envelopes are forwarded as-is.
        ProcessingGroup::Ungrouped => {
            relay_log::error!(
                tags.project = %project_id,
                items = ?managed_envelope.envelope().items().next().map(Item::ty),
                "could not identify the processing group based on the envelope's items"
            );
            Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            ))
        }
        ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
            managed_envelope.into_processed(),
        )),
    }
}
/// Runs the full processing pipeline for a [`ProcessEnvelope`] message.
///
/// Resolves the project id, sets up the Sentry logging scope, delegates to
/// [`Self::process_envelope`], then sends any extracted metrics to the
/// aggregator and decides the envelope's fate: empty envelopes are accepted
/// (if they produced metrics) or rejected as rate limited; non-empty ones are
/// returned for submission.
fn process(
    &self,
    message: ProcessEnvelope,
) -> Result<ProcessEnvelopeResponse, ProcessingError> {
    let ProcessEnvelope {
        envelope: mut managed_envelope,
        project_info,
        rate_limits,
        sampling_project_info,
        reservoir_counters,
    } = message;

    // Prefer the project id from the project state; fall back to the one on
    // the envelope's request meta.
    let project_id = match project_info
        .project_id
        .or_else(|| managed_envelope.envelope().meta().project_id())
    {
        Some(project_id) => project_id,
        None => {
            managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        }
    };

    let client = managed_envelope
        .envelope()
        .meta()
        .client()
        .map(str::to_owned);

    let user_agent = managed_envelope
        .envelope()
        .meta()
        .user_agent()
        .map(str::to_owned);

    relay_log::with_scope(
        |scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        },
        || {
            match self.process_envelope(
                managed_envelope,
                project_id,
                project_info,
                rate_limits,
                sampling_project_info,
                reservoir_counters,
            ) {
                Ok(result) => {
                    let (mut managed_envelope, extracted_metrics) = result.into_inner();

                    // The envelope could be modified or even emptied during
                    // processing, so recompute the stored summary.
                    managed_envelope.update();

                    let has_metrics = !extracted_metrics.project_metrics.is_empty();
                    if has_metrics {
                        send_metrics(
                            extracted_metrics,
                            managed_envelope.envelope(),
                            &self.inner.addrs.aggregator,
                        );
                    }

                    let envelope_response = if managed_envelope.envelope().is_empty() {
                        if !has_metrics {
                            // Individual outcomes for the dropped items have
                            // already been emitted during processing.
                            managed_envelope.reject(Outcome::RateLimited(None));
                        } else {
                            managed_envelope.accept();
                        }
                        None
                    } else {
                        Some(managed_envelope)
                    };

                    Ok(ProcessEnvelopeResponse {
                        envelope: envelope_response,
                    })
                }
                Err(err) => Err(err),
            }
        },
    )
}
/// Handles a [`ProcessEnvelope`] message: times the run, submits the
/// resulting envelope, and logs unexpected processing errors.
fn handle_process_envelope(&self, message: ProcessEnvelope) {
    let project_key = message.envelope.envelope().meta().public_key();
    // Time spent queued before this service picked up the envelope.
    let wait_time = message.envelope.age();
    metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

    let group = message.envelope.group().variant();
    let result = metric!(timer(RelayTimers::EnvelopeProcessingTime), group = group, {
        self.process(message)
    });
    match result {
        Ok(response) => {
            if let Some(envelope) = response.envelope {
                self.handle_submit_envelope(SubmitEnvelope { envelope });
            };
        }
        Err(error) => {
            // Errors are only logged if they are unexpected; expected errors
            // (e.g. invalid payloads) already produced outcomes.
            if error.is_unexpected() {
                relay_log::error!(
                    tags.project_key = %project_key,
                    error = &error as &dyn Error,
                    "error processing envelope"
                );
            }
        }
    }
}
/// Handles a [`ProcessMetrics`] message: parses, validates, and normalizes
/// metric buckets, then merges them into the aggregator.
///
/// Invalid buckets and buckets in disallowed namespaces are dropped. Clock
/// drift between `sent_at` and `received_at` is corrected on each bucket's
/// timestamp. For projects with a loaded state, buckets are additionally
/// checked against rate limits before aggregation.
fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
    let ProcessMetrics {
        data,
        project_key,
        received_at,
        sent_at,
        source,
    } = message;

    let received_timestamp =
        UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

    let mut buckets = data.into_buckets(received_timestamp);
    if buckets.is_empty() {
        return;
    };
    cogs.update(relay_metrics::cogs::BySize(&buckets));

    let clock_drift_processor =
        ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

    buckets.retain_mut(|bucket| {
        if let Err(error) = relay_metrics::normalize_bucket(bucket) {
            relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
            return false;
        }

        if !self::metrics::is_valid_namespace(bucket, source) {
            return false;
        }

        clock_drift_processor.process_timestamp(&mut bucket.timestamp);

        // Only trust metadata from internal relays; everything else gets
        // fresh metadata stamped with the receive time.
        if !matches!(source, BucketSource::Internal) {
            bucket.metadata = BucketMetadata::new(received_timestamp);
        }

        true
    });

    let project = self.inner.project_cache.get(project_key);

    // Best effort: rate limits can only be checked when the project state is
    // available; otherwise the buckets pass through unchecked.
    let buckets = match project.state() {
        ProjectState::Enabled(project_info) => {
            let rate_limits = project.rate_limits().current_limits();
            self.check_buckets(project_key, project_info, &rate_limits, buckets)
        }
        _ => buckets,
    };

    relay_log::trace!("merging metric buckets into the aggregator");
    self.inner
        .addrs
        .aggregator
        .send(MergeBuckets::new(project_key, buckets));
}
/// Handles a [`ProcessBatchedMetrics`] message by deserializing the batched
/// payload and dispatching each project's buckets to
/// [`Self::handle_process_metrics`].
///
/// Payloads that fail to parse are dropped with a debug log and counted in
/// `RelayCounters::MetricBucketsParsingFailed`.
fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
    // Wire format of the batched payload: buckets grouped by project key.
    #[derive(serde::Deserialize)]
    struct Wrapper {
        buckets: HashMap<ProjectKey, Vec<Bucket>>,
    }

    let ProcessBatchedMetrics {
        payload,
        source,
        received_at,
        sent_at,
    } = message;

    let wrapper: Wrapper = match serde_json::from_slice(&payload) {
        Ok(wrapper) => wrapper,
        Err(error) => {
            relay_log::debug!(
                error = &error as &dyn Error,
                "failed to parse batched metrics",
            );
            metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
            return;
        }
    };

    for (project_key, project_buckets) in wrapper.buckets {
        let message = ProcessMetrics {
            data: MetricData::Parsed(project_buckets),
            project_key,
            source,
            received_at,
            sent_at,
        };
        self.handle_process_metrics(cogs, message);
    }
}
/// Submits a processed envelope to its destination.
///
/// In order of precedence: the Kafka store forwarder (processing relays),
/// the in-memory test capture, or the upstream HTTP endpoint. For upstream
/// submission the payload is serialized and encoded with the configured HTTP
/// encoding; serialization failures reject the envelope with an internal
/// outcome.
fn handle_submit_envelope(&self, message: SubmitEnvelope) {
    let SubmitEnvelope { mut envelope } = message;

    #[cfg(feature = "processing")]
    if self.inner.config.processing_enabled() {
        if let Some(store_forwarder) = self.inner.addrs.store_forwarder.clone() {
            relay_log::trace!("sending envelope to kafka");
            store_forwarder.send(StoreEnvelope { envelope });
            return;
        }
    }

    // In capture mode (tests), envelopes are kept in memory instead of being
    // sent anywhere.
    if Capture::should_capture(&self.inner.config) {
        relay_log::trace!("capturing envelope in memory");
        self.inner
            .addrs
            .test_store
            .send(Capture::accepted(envelope));
        return;
    }

    // Sent-at is stamped just before the payload is serialized so the
    // upstream can compute clock drift.
    envelope.envelope_mut().set_sent_at(Utc::now());

    relay_log::trace!("sending envelope to sentry endpoint");
    let http_encoding = self.inner.config.http_encoding();
    let result = envelope.envelope().to_vec().and_then(|v| {
        encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
    });

    match result {
        Ok(body) => {
            self.inner
                .addrs
                .upstream_relay
                .send(SendRequest(SendEnvelope {
                    envelope,
                    body,
                    http_encoding,
                    project_cache: self.inner.project_cache.clone(),
                }));
        }
        Err(error) => {
            // Sending the envelope failed before it left this relay; emit an
            // internal outcome so the loss is accounted for.
            relay_log::error!(
                error = &error as &dyn Error,
                tags.project_key = %envelope.scoping().project_key,
                "failed to serialize envelope payload"
            );
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
        }
    }
}
/// Wraps a batch of client reports into an envelope and submits it upstream.
///
/// Builds an outbound envelope addressed via the scoping's DSN, adds one
/// `ClientReport` item per report, and forwards it through
/// [`Self::handle_submit_envelope`].
fn handle_submit_client_reports(&self, message: SubmitClientReports) {
    let SubmitClientReports {
        client_reports,
        scoping,
    } = message;

    let upstream = self.inner.config.upstream_descriptor();
    let dsn = PartialDsn::outbound(&scoping, upstream);

    let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
    for client_report in client_reports {
        let mut item = Item::new(ItemType::ClientReport);
        // Client reports are built internally, so serialization is presumed
        // infallible here — TODO(review): confirm the unwrap cannot trip.
        item.set_payload(ContentType::Json, client_report.serialize().unwrap());
        envelope.add_item(item);
    }

    let envelope = ManagedEnvelope::new(
        envelope,
        self.inner.addrs.outcome_aggregator.clone(),
        self.inner.addrs.test_store.clone(),
        ProcessingGroup::ClientReport,
    );
    self.handle_submit_envelope(SubmitEnvelope {
        envelope: envelope.into_processed(),
    });
}
/// Checks metric buckets against cached rate limits and project quotas.
///
/// Buckets without a valid scoping are dropped entirely. For each namespace
/// present in the batch, actively limited namespaces have their buckets split
/// off and tracked as rate-limited outcomes. Remaining buckets then pass
/// through the [`MetricsLimiter`] for quota-based enforcement.
fn check_buckets(
    &self,
    project_key: ProjectKey,
    project_info: &ProjectInfo,
    rate_limits: &RateLimits,
    buckets: Vec<Bucket>,
) -> Vec<Bucket> {
    let Some(scoping) = project_info.scoping(project_key) else {
        relay_log::error!(
            tags.project_key = project_key.as_str(),
            "there is no scoping: dropping {} buckets",
            buckets.len(),
        );
        return Vec::new();
    };

    let mut buckets = self::metrics::apply_project_info(
        buckets,
        &self.inner.metric_outcomes,
        project_info,
        scoping,
    );

    let namespaces: BTreeSet<MetricNamespace> = buckets
        .iter()
        .filter_map(|bucket| bucket.name.try_namespace())
        .collect();

    for namespace in namespaces {
        // NOTE(review): the check below does not scope the item to
        // `namespace` (compare `rate_limit_buckets`, which uses
        // `scoping.metric_bucket(namespace)`), so the same limits are
        // computed on every iteration — confirm whether namespace-specific
        // quotas should apply here.
        let limits = rate_limits.check_with_quotas(
            project_info.get_quotas(),
            scoping.item(DataCategory::MetricBucket),
        );

        if limits.is_limited() {
            let rejected;
            (buckets, rejected) = utils::split_off(buckets, |bucket| {
                bucket.name.try_namespace() == Some(namespace)
            });

            let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
            self.inner.metric_outcomes.track(
                scoping,
                &rejected,
                Outcome::RateLimited(reason_code),
            );
        }
    }

    let quotas = project_info.config.quotas.clone();
    match MetricsLimiter::create(buckets, quotas, scoping) {
        Ok(mut bucket_limiter) => {
            bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
            bucket_limiter.into_buckets()
        }
        // `create` returns the buckets unchanged when no relevant quotas exist.
        Err(buckets) => buckets,
    }
}
/// Enforces Redis-backed rate limits on metric buckets (processing relays).
///
/// Counts buckets per namespace, queries the rate limiter for each namespace,
/// drops limited buckets with rate-limited outcomes, and caches the resulting
/// limits on the project. Remaining buckets go through the [`MetricsLimiter`]
/// for transaction/span-based enforcement.
#[cfg(feature = "processing")]
fn rate_limit_buckets(
    &self,
    scoping: Scoping,
    project_info: &ProjectInfo,
    mut buckets: Vec<Bucket>,
) -> Vec<Bucket> {
    let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else {
        // No Redis rate limiter configured; nothing to enforce here.
        return buckets;
    };

    let global_config = self.inner.global_config.current();
    let namespaces = buckets
        .iter()
        .filter_map(|bucket| bucket.name.try_namespace())
        .counts();

    let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
    for (namespace, quantity) in namespaces {
        let item_scoping = scoping.metric_bucket(namespace);

        let limits = match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) {
            Ok(limits) => limits,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to check redis rate limits"
                );
                // On Redis failure, stop checking and let the buckets pass.
                break;
            }
        };

        if limits.is_limited() {
            let rejected;
            (buckets, rejected) = utils::split_off(buckets, |bucket| {
                bucket.name.try_namespace() == Some(namespace)
            });

            let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
            self.inner.metric_outcomes.track(
                scoping,
                &rejected,
                Outcome::RateLimited(reason_code),
            );

            // Cache the limits so subsequent envelopes are limited without
            // another Redis roundtrip.
            self.inner
                .project_cache
                .get(item_scoping.scoping.project_key)
                .rate_limits()
                .merge(limits);
        }
    }

    match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
        // `create` returns the buckets unchanged when no relevant quotas exist.
        Err(buckets) => buckets,
        Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter),
    }
}
/// Enforces transaction- and span-based rate limits on metric buckets
/// (processing relays).
///
/// Checks the Redis rate limiter for the transaction and span data
/// categories, records timing statistics bucketed by the checked count,
/// enforces any resulting limits on the buckets, and caches newly enforced
/// limits on the project. Returns the surviving buckets.
#[cfg(feature = "processing")]
fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
    relay_log::trace!("handle_rate_limit_buckets");

    let scoping = *bucket_limiter.scoping();

    if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
        let global_config = self.inner.global_config.current();
        let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

        // We set over_accept_once such that the limit is actually reached,
        // which allows subsequent calls with quantity=0 to be rate limited.
        let over_accept_once = true;
        let mut rate_limits = RateLimits::new();

        for category in [DataCategory::Transaction, DataCategory::Span] {
            let count = bucket_limiter.count(category);

            let timer = Instant::now();
            let mut is_limited = false;

            if let Some(count) = count {
                match rate_limiter.is_rate_limited(
                    quotas,
                    scoping.item(category),
                    count,
                    over_accept_once,
                ) {
                    Ok(limits) => {
                        is_limited = limits.is_limited();
                        rate_limits.merge(limits)
                    }
                    Err(e) => relay_log::error!(error = &e as &dyn Error),
                }
            }

            relay_statsd::metric!(
                timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                category = category.name(),
                limited = if is_limited { "true" } else { "false" },
                // Non-overlapping count buckets; earlier arms used to shadow
                // overlapping ranges (clippy::match_overlapping_arm).
                count = match count {
                    None => "none",
                    Some(0) => "0",
                    Some(1) => "1",
                    Some(2..=10) => "10",
                    Some(11..=25) => "25",
                    Some(26..=50) => "50",
                    Some(51..=100) => "100",
                    Some(101..=500) => "500",
                    _ => "> 500",
                },
            );
        }

        if rate_limits.is_limited() {
            let was_enforced =
                bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

            if was_enforced {
                // Cache the enforced limits on the project so future checks
                // short-circuit without hitting Redis.
                self.inner
                    .project_cache
                    .get(scoping.project_key)
                    .rate_limits()
                    .merge(rate_limits);
            }
        }
    }

    bucket_limiter.into_buckets()
}
/// Applies cardinality limits to metric buckets (processing relays).
///
/// Depending on the global `cardinality_limiter_mode`: `Disabled` passes
/// buckets through untouched, `Passive` evaluates and reports limits without
/// enforcing, and the default mode drops buckets that exceed their limits,
/// tracking a cardinality-limited outcome for each.
#[cfg(feature = "processing")]
fn cardinality_limit_buckets(
    &self,
    scoping: Scoping,
    limits: &[CardinalityLimit],
    buckets: Vec<Bucket>,
) -> Vec<Bucket> {
    let global_config = self.inner.global_config.current();
    let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

    if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
        return buckets;
    }

    let Some(ref limiter) = self.inner.cardinality_limiter else {
        return buckets;
    };

    let scope = relay_cardinality::Scoping {
        organization_id: scoping.organization_id,
        project_id: scoping.project_id,
    };

    let limits = match limiter.check_cardinality_limits(scope, limits, buckets) {
        Ok(limits) => limits,
        // On limiter failure the original buckets are returned and pass
        // through unlimited.
        Err((buckets, error)) => {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "cardinality limiter failed"
            );
            return buckets;
        }
    };

    // Sampled error reporting of exceeded limits, tagged per organization.
    let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
    if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate) {
        for limit in limits.exceeded_limits() {
            relay_log::with_scope(
                |scope| {
                    scope.set_user(Some(relay_log::sentry::User {
                        id: Some(scoping.organization_id.to_string()),
                        ..Default::default()
                    }));
                },
                || {
                    relay_log::error!(
                        tags.organization_id = scoping.organization_id.value(),
                        tags.limit_id = limit.id,
                        tags.passive = limit.passive,
                        "Cardinality Limit"
                    );
                },
            );
        }
    }

    for (limit, reports) in limits.cardinality_reports() {
        for report in reports {
            self.inner
                .metric_outcomes
                .cardinality(scoping, limit, report);
        }
    }

    // Passive mode only observes; no buckets are dropped.
    if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
        return limits.into_source();
    }

    let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

    for (bucket, exceeded) in rejected {
        self.inner.metric_outcomes.track(
            scoping,
            &[bucket],
            Outcome::CardinalityLimited(exceeded.id.clone()),
        );
    }
    accepted
}
/// Forwards flushed metric buckets directly to Kafka (processing relays).
///
/// Applies rate limits and cardinality limits per project, then sends the
/// surviving buckets to the store forwarder with the project's retention.
#[cfg(feature = "processing")]
fn encode_metrics_processing(&self, message: FlushBuckets, store_forwarder: &Addr<Store>) {
    use crate::constants::DEFAULT_EVENT_RETENTION;
    use crate::services::store::StoreMetrics;

    for ProjectBuckets {
        buckets,
        scoping,
        project_info,
        ..
    } in message.buckets.into_values()
    {
        let buckets = self.rate_limit_buckets(scoping, &project_info, buckets);

        let limits = project_info.get_cardinality_limits();
        let buckets = self.cardinality_limit_buckets(scoping, limits, buckets);

        if buckets.is_empty() {
            continue;
        }

        let retention = project_info
            .config
            .event_retention
            .unwrap_or(DEFAULT_EVENT_RETENTION);

        store_forwarder.send(StoreMetrics {
            buckets,
            scoping,
            retention,
        });
    }
}
/// Wraps flushed metric buckets into envelopes and submits them upstream.
///
/// Buckets are split into batches that fit within the configured maximum
/// batch size; each batch becomes one envelope with a single `MetricBuckets`
/// item whose payload and source quantities describe exactly that batch.
fn encode_metrics_envelope(&self, message: FlushBuckets) {
    let FlushBuckets {
        partition_key,
        buckets,
    } = message;

    let batch_size = self.inner.config.metrics_max_batch_size_bytes();
    let upstream = self.inner.config.upstream_descriptor();

    for ProjectBuckets {
        buckets, scoping, ..
    } in buckets.values()
    {
        let dsn = PartialDsn::outbound(scoping, upstream);

        relay_statsd::metric!(
            histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
        );

        let mut num_batches = 0;
        for batch in BucketsView::from(buckets).by_size(batch_size) {
            let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

            let mut item = Item::new(ItemType::MetricBuckets);
            item.set_source_quantities(crate::metrics::extract_quantities(batch));
            // Serialize only this batch — serializing the full `buckets` list
            // here would duplicate the whole payload into every batch and
            // defeat the `by_size` splitting above.
            item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
            envelope.add_item(item);

            let mut envelope = ManagedEnvelope::new(
                envelope,
                self.inner.addrs.outcome_aggregator.clone(),
                self.inner.addrs.test_store.clone(),
                ProcessingGroup::Metrics,
            );
            envelope
                .set_partition_key(Some(partition_key))
                .scope(*scoping);

            relay_statsd::metric!(
                histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
            );

            self.handle_submit_envelope(SubmitEnvelope {
                envelope: envelope.into_processed(),
            });
            num_batches += 1;
        }

        relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
    }
}
/// Encodes the accumulated partition payload and submits it to the upstream.
///
/// Does nothing for an empty partition. Encoding failures are logged and the
/// payload is dropped.
fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
    if partition.is_empty() {
        return;
    }

    let (raw_body, project_info) = partition.take();
    let http_encoding = self.inner.config.http_encoding();

    let compressed = match encode_payload(&raw_body, http_encoding) {
        Ok(body) => body,
        Err(error) => {
            let error = &error as &dyn std::error::Error;
            relay_log::error!(error, "failed to encode metrics payload");
            return;
        }
    };

    let request = SendMetricsRequest {
        partition_key: partition_key.to_string(),
        unencoded: raw_body,
        encoded: compressed,
        project_info,
        http_encoding,
        metric_outcomes: self.inner.metric_outcomes.clone(),
    };
    self.inner.addrs.upstream_relay.send(SendRequest(request));
}
/// Packs all projects' buckets into shared, size-bounded partitions and flushes
/// each partition as soon as it is full, plus a final flush for the remainder.
fn encode_metrics_global(&self, message: FlushBuckets) {
    let FlushBuckets {
        partition_key,
        buckets,
    } = message;

    let max_batch_size = self.inner.config.metrics_max_batch_size_bytes();
    let mut partition = Partition::new(max_batch_size);
    let mut split_count: u64 = 0;

    for ProjectBuckets {
        buckets, scoping, ..
    } in buckets.values()
    {
        for bucket in buckets {
            let mut pending = Some(BucketView::new(bucket));
            while let Some(view) = pending.take() {
                // `insert` returns the portion of the bucket that did not fit.
                if let Some(overflow) = partition.insert(view, *scoping) {
                    // The partition is full: flush it and retry with the rest.
                    self.send_global_partition(partition_key, &mut partition);
                    pending = Some(overflow);
                    split_count += 1;
                }
            }
        }
    }

    if split_count > 0 {
        metric!(histogram(RelayHistograms::PartitionSplits) = split_count);
    }

    // Flush whatever remains in the final, partially filled partition.
    self.send_global_partition(partition_key, &mut partition);
}
/// Validates each project's buckets and routes them to the configured sink:
/// the store (processing mode), global batched HTTP, or per-project envelopes.
fn handle_flush_buckets(&self, mut message: FlushBuckets) {
    for (project_key, project_buckets) in message.buckets.iter_mut() {
        // Swap the buckets out, filter them, and put the result back in place.
        let current = std::mem::take(&mut project_buckets.buckets);
        project_buckets.buckets = self.check_buckets(
            *project_key,
            &project_buckets.project_info,
            &project_buckets.rate_limits,
            current,
        );
    }

    #[cfg(feature = "processing")]
    if self.inner.config.processing_enabled() {
        if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder {
            return self.encode_metrics_processing(message, store_forwarder);
        }
    }

    if self.inner.config.http_global_metrics() {
        self.encode_metrics_global(message);
    } else {
        self.encode_metrics_envelope(message);
    }
}
/// Returns `true` if this processor was constructed with a rate limiter.
///
/// Test-only helper, compiled only with the `processing` feature.
#[cfg(all(test, feature = "processing"))]
fn redis_rate_limiter_enabled(&self) -> bool {
self.inner.rate_limiter.is_some()
}
/// Dispatches an incoming processor message to its handler.
///
/// Wraps the dispatch in a timing metric tagged by message variant and tracks
/// COGS consumption for the duration of the handler.
fn handle_message(&self, message: EnvelopeProcessor) {
    let variant = message.variant();
    let weights = self.feature_weights(&message);

    metric!(timer(RelayTimers::ProcessMessageDuration), message = variant, {
        let mut cogs = self.inner.cogs.timed(ResourceId::Relay, weights);

        match message {
            EnvelopeProcessor::ProcessEnvelope(msg) => self.handle_process_envelope(*msg),
            EnvelopeProcessor::ProcessProjectMetrics(msg) => {
                self.handle_process_metrics(&mut cogs, *msg)
            }
            EnvelopeProcessor::ProcessBatchedMetrics(msg) => {
                self.handle_process_batched_metrics(&mut cogs, *msg)
            }
            EnvelopeProcessor::FlushBuckets(msg) => self.handle_flush_buckets(*msg),
            EnvelopeProcessor::SubmitEnvelope(msg) => self.handle_submit_envelope(*msg),
            EnvelopeProcessor::SubmitClientReports(msg) => {
                self.handle_submit_client_reports(*msg)
            }
        }
    });
}
/// Computes COGS feature weights for a message before it is handled.
fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
    match message {
        EnvelopeProcessor::ProcessEnvelope(v) => AppFeature::from(v.envelope.group()).into(),
        EnvelopeProcessor::ProcessProjectMetrics(_)
        | EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
        EnvelopeProcessor::FlushBuckets(v) => {
            // Attribution differs by mode: bucket count when processing is
            // enabled, estimated serialized size otherwise.
            let processing = self.inner.config.processing_enabled();
            v.buckets
                .values()
                .map(|s| {
                    if processing {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge)
        }
        EnvelopeProcessor::SubmitEnvelope(v) => AppFeature::from(v.envelope.group()).into(),
        EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
    }
}
/// Creates a [`ReservoirEvaluator`] over the given counters.
///
/// With the `processing` feature and a configured quotas Redis pool, the
/// evaluator is additionally wired to Redis for the given organization.
fn new_reservoir_evaluator(
    &self,
    #[allow(unused_variables)] organization_id: OrganizationId,
    reservoir_counters: ReservoirCounters,
) -> ReservoirEvaluator {
    #[allow(unused_mut)]
    let mut evaluator = ReservoirEvaluator::new(reservoir_counters);

    #[cfg(feature = "processing")]
    if let Some(pool) = self.inner.quotas_pool.as_ref() {
        evaluator.set_redis(organization_id, pool);
    }

    evaluator
}
}
// Main service loop: receives processor messages and fans them out to the
// worker pool until the channel closes.
impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;
async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
while let Some(message) = rx.recv().await {
// Move a clone of the service handle into the worker closure.
let service = self.clone();
// NOTE(review): awaiting `spawn` presumably blocks the receive loop until
// a worker slot is available, providing backpressure — confirm against the
// `workers` pool implementation.
self.inner
.workers
.spawn(move || service.handle_message(message))
.await;
}
}
}
/// Result of enforcing rate limits on an envelope and its event.
#[cfg(feature = "processing")]
struct EnforcementResult {
/// The event, or `Annotated::empty()` if the event was rate limited away.
event: Annotated<Event>,
/// Rate limits produced by the enforcement pass.
rate_limits: RateLimits,
}
#[cfg(feature = "processing")]
impl EnforcementResult {
/// Creates a new [`EnforcementResult`] from an event and the applied rate limits.
pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
Self { event, rate_limits }
}
}
/// Strategy used when enforcing rate limits on an envelope.
#[cfg(feature = "processing")]
enum RateLimiter<'a> {
/// Check only against already-known (cached) rate limits.
Cached,
/// Consult the Redis-backed rate limiter for a consistent decision.
Consistent(&'a RedisRateLimiter),
}
#[cfg(feature = "processing")]
impl RateLimiter<'_> {
/// Enforces rate limits on the envelope's items and the (optional) event.
///
/// Combines global and project quotas, removes limited items with outcomes,
/// applies the enforcement to extracted metrics, and returns the possibly
/// emptied event along with the rate limits that were computed.
fn enforce<Group>(
&self,
managed_envelope: &mut TypedEnvelope<Group>,
event: Annotated<Event>,
extracted_metrics: &mut ProcessingExtractedMetrics,
global_config: &GlobalConfig,
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
) -> Result<EnforcementResult, ProcessingError> {
// Nothing to enforce on an empty envelope without an event.
if managed_envelope.envelope().is_empty() && event.value().is_none() {
return Ok(EnforcementResult::new(event, RateLimits::default()));
}
let quotas = CombinedQuotas::new(global_config, project_info.get_quotas());
// Without any quotas there is nothing to limit against.
if quotas.is_empty() {
return Ok(EnforcementResult::new(event, RateLimits::default()));
}
let event_category = event_category(&event);
// `Cached` checks against pre-fetched limits; `Consistent` queries Redis
// (and may consume quota) for every check.
let mut envelope_limiter =
EnvelopeLimiter::new(CheckLimits::All, |item_scope, quantity| match self {
RateLimiter::Cached => Ok(rate_limits.check_with_quotas(quotas, item_scope)),
RateLimiter::Consistent(rl) => Ok::<_, ProcessingError>(
rl.is_rate_limited(quotas, item_scope, quantity, false)?,
),
});
// Make sure the event's category is accounted for during enforcement.
if let Some(category) = event_category {
envelope_limiter.assume_event(category);
}
let scoping = managed_envelope.scoping();
let (enforcement, rate_limits) =
metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter.compute(managed_envelope.envelope_mut(), &scoping)?
});
let event_active = enforcement.is_event_active();
// NOTE(review): the boolean flags consistent (Redis-backed) enforcement —
// confirm its exact meaning against `apply_enforcement`.
extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
enforcement.apply_with_outcomes(managed_envelope);
if event_active {
// A rate-limited event implies all items were removed from the envelope.
debug_assert!(managed_envelope.envelope().is_empty());
return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
}
Ok(EnforcementResult::new(event, rate_limits))
}
}
/// Compresses `body` according to the configured HTTP content encoding.
///
/// Returns the body unchanged for [`HttpEncoding::Identity`]; all other
/// encodings produce a freshly allocated, compressed buffer.
fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let raw = body.as_ref();

    let compressed: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(raw)?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(raw)?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
            // Buffer size 0 defers to the library default; quality 5, lg_window 22.
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(raw)?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
            // Level 1: favor throughput over compression ratio.
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(raw)?;
            encoder.finish()?
        }
    };

    Ok(compressed.into())
}
/// Upstream request that submits a processed envelope over HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
/// The processed envelope to submit; accepted or rejected in `respond`.
envelope: TypedEnvelope<Processed>,
/// Serialized payload sent as the request body.
body: Bytes,
/// Content encoding advertised for the request body.
http_encoding: HttpEncoding,
/// Used to merge rate limits returned by the upstream into the project cache.
project_cache: ProjectCacheHandle,
}
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the upstream HTTP request, forwarding the client's metadata headers.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        // `meta()` already returns a reference; the previous `&self.envelope.meta()`
        // created a needless double reference (clippy: needless_borrow).
        let meta = self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    /// Handles the upstream response: accepts the envelope on success or when
    /// the upstream received it (recording returned rate limits), otherwise
    /// rejects it with an internal outcome.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // The upstream received the envelope: accept locally, and merge
                // any rate limits the upstream returned into the project cache.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                // The envelope never arrived: reject it and log the failure.
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
/// Accumulates metric buckets across projects up to a maximum size budget.
#[derive(Debug)]
struct Partition<'a> {
/// Total byte budget of one partition (based on `estimated_size`).
max_size: usize,
/// Remaining byte budget of the current partition.
remaining: usize,
/// Bucket views collected so far, grouped by project key.
views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
/// Scoping for every project key present in `views`.
project_info: HashMap<ProjectKey, Scoping>,
}
impl<'a> Partition<'a> {
    /// Creates an empty partition with a byte budget of `size`.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts as much of `bucket` as fits into the remaining budget.
    ///
    /// Returns the part of the bucket that did not fit, if any; the caller is
    /// expected to flush the partition and retry with the returned remainder.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());

            let key = scoping.project_key;
            self.views.entry(key).or_default().push(current);
            self.project_info.entry(key).or_insert(scoping);
        }

        next
    }

    /// Returns `true` if no buckets have been inserted yet.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the partition's contents and resets it for reuse.
    ///
    /// Returns the JSON payload together with the scoping of each project it
    /// contains.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        // PERF: `mem::take` moves the map out and leaves an empty one behind,
        // replacing the previous clone-then-clear which copied the whole map.
        let scopings = std::mem::take(&mut self.project_info);

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
/// Upstream request that submits one encoded partition of metric buckets.
#[derive(Debug)]
struct SendMetricsRequest {
/// Partition key, sent in the `X-Sentry-Relay-Shard` header.
partition_key: String,
/// Uncompressed payload; used for signing and to rebuild outcomes on failure.
unencoded: Bytes,
/// Payload after applying `http_encoding`; sent as the request body.
encoded: Bytes,
/// Scoping for every project contained in the payload.
project_info: HashMap<ProjectKey, Scoping>,
/// Content encoding that was used to produce `encoded`.
http_encoding: HttpEncoding,
/// Sink for tracking bucket outcomes when the request fails before receipt.
metric_outcomes: MetricOutcomes,
}
impl SendMetricsRequest {
    /// Emits `Invalid` outcomes for every bucket of a transmission that failed
    /// before reaching the upstream.
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        // Recover the buckets from the original, uncompressed payload.
        let Wrapper { buckets } = match serde_json::from_slice(&self.unencoded) {
            Ok(wrapper) => wrapper,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (project_key, project_buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&project_key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &project_buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the request over the uncompressed payload.
    fn sign(&mut self) -> Option<Bytes> {
        Some(self.unencoded.clone())
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        Cow::Borrowed("/api/0/relays/metrics/")
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    // Drain the response body; errors while draining are ignored.
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the upstream received the request, it is responsible
                    // for the data; do not emit outcomes for it here.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
/// Cheap, copyable view over global and project quotas combined.
#[cfg(feature = "processing")]
#[derive(Copy, Clone)]
struct CombinedQuotas<'a> {
/// Quotas configured in the global config.
global_quotas: &'a [Quota],
/// Quotas configured on the project.
project_quotas: &'a [Quota],
}
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Combines the global config's quotas with the given project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }

    /// Returns `true` if there are no quotas at all.
    pub fn is_empty(&self) -> bool {
        self.global_quotas.is_empty() && self.project_quotas.is_empty()
    }

    /// Returns the total number of global and project quotas.
    pub fn len(&self) -> usize {
        self.global_quotas.len() + self.project_quotas.len()
    }
}
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
type Item = &'a Quota;
type IntoIter = Chain<Iter<'a, Quota>, Iter<'a, Quota>>;
// Yields all global quotas first, followed by all project quotas.
fn into_iter(self) -> Self::IntoIter {
self.global_quotas.iter().chain(self.project_quotas.iter())
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::env;
use insta::assert_debug_snapshot;
use relay_base_schema::metrics::{DurationUnit, MetricUnit};
use relay_common::glob2::LazyGlob;
use relay_dynamic_config::ProjectConfig;
use relay_event_normalization::{RedactionRule, TransactionNameRule};
use relay_event_schema::protocol::TransactionSource;
use relay_pii::DataScrubbingConfig;
use similar_asserts::assert_eq;
use crate::metrics_extraction::transactions::types::{
CommonTags, TransactionMeasurementTags, TransactionMetric,
};
use crate::metrics_extraction::IntoMetric;
use crate::testutils::{self, create_test_processor, create_test_processor_with_addrs};
#[cfg(feature = "processing")]
use {
relay_metrics::BucketValue,
relay_quotas::{QuotaScope, ReasonCode},
relay_test::mock_service,
};
use super::*;
/// Builds an organization-scoped metric-bucket quota with the given id.
///
/// `limit: Some(0)` makes the quota reject everything in its category (see
/// `test_ratelimit_per_batch`).
#[cfg(feature = "processing")]
fn mock_quota(id: &str) -> Quota {
Quota {
id: Some(id.into()),
categories: smallvec::smallvec![DataCategory::MetricBucket],
scope: QuotaScope::Organization,
scope_id: None,
limit: Some(0),
window: None,
reason_code: None,
namespace: None,
}
}
/// `CombinedQuotas` reports the combined length and iterates global quotas
/// before project quotas.
#[cfg(feature = "processing")]
#[test]
fn test_dynamic_quotas() {
let global_config = GlobalConfig {
quotas: vec![mock_quota("foo"), mock_quota("bar")],
..Default::default()
};
let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
assert_eq!(dynamic_quotas.len(), 4);
assert!(!dynamic_quotas.is_empty());
// Iteration order: global quotas first, then project quotas.
let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
}
/// Rate limits are enforced per organization: only buckets of the
/// non-rate-limited org must reach the store forwarder.
///
/// Requires a running Redis (`RELAY_REDIS_URL`, defaults to localhost).
#[cfg(feature = "processing")]
#[tokio::test]
async fn test_ratelimit_per_batch() {
use relay_base_schema::organization::OrganizationId;
let rate_limited_org = Scoping {
organization_id: OrganizationId::new(1),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
key_id: Some(17),
};
let not_rate_limited_org = Scoping {
organization_id: OrganizationId::new(2),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
key_id: Some(17),
};
let message = {
// Project config with a zero-limit quota scoped to org 1 only.
let project_info = {
let quota = Quota {
id: Some("testing".into()),
categories: vec![DataCategory::MetricBucket].into(),
scope: relay_quotas::QuotaScope::Organization,
scope_id: Some(rate_limited_org.organization_id.to_string()),
limit: Some(0),
window: None,
reason_code: Some(ReasonCode::new("test")),
namespace: None,
};
let mut config = ProjectConfig::default();
config.quotas.push(quota);
Arc::new(ProjectInfo {
config,
..Default::default()
})
};
// Identical buckets for both orgs; only scoping differs.
let project_metrics = |scoping| ProjectBuckets {
buckets: vec![Bucket {
name: "d:transactions/bar".into(),
value: BucketValue::Counter(relay_metrics::FiniteF64::new(1.0).unwrap()),
timestamp: UnixTimestamp::now(),
tags: Default::default(),
width: 10,
metadata: BucketMetadata::default(),
}],
rate_limits: Default::default(),
project_info: project_info.clone(),
scoping,
};
let buckets = hashbrown::HashMap::from([
(
rate_limited_org.project_key,
project_metrics(rate_limited_org),
),
(
not_rate_limited_org.project_key,
project_metrics(not_rate_limited_org),
),
]);
FlushBuckets {
partition_key: 0,
buckets,
}
};
assert_eq!(message.buckets.keys().count(), 2);
let config = {
let config_json = serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": [],
"redis": {
"server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
}
}
});
Config::from_json_value(config_json).unwrap()
};
// Mock store service that records the org id of every StoreMetrics message.
let (store, handle) = {
let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
let org_id = match msg {
Store::Metrics(x) => x.scoping.organization_id,
_ => panic!("received envelope when expecting only metrics"),
};
org_ids.push(org_id);
};
mock_service("store_forwarder", vec![], f)
};
let processor = create_test_processor(config).await;
assert!(processor.redis_rate_limiter_enabled());
processor.encode_metrics_processing(message, &store);
// Dropping the mock store closes its channel so `handle` can resolve.
drop(store);
let orgs_not_ratelimited = handle.await.unwrap();
// Only org 2 made it through; org 1 was rejected by the zero-limit quota.
assert_eq!(
orgs_not_ratelimited,
vec![not_rate_limited_org.organization_id]
);
}
/// Browser name/version must still be extracted from the User-Agent header
/// even when PII scrubbing masks IP-like digit groups in the header value.
#[tokio::test]
async fn test_browser_version_extraction_with_pii_like_data() {
let processor = create_test_processor(Default::default()).await;
let (outcome_aggregator, test_store) = testutils::processor_services();
let event_id = EventId::new();
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(Some(event_id), request_meta);
// Event whose User-Agent contains an IP-like version string (103.0.0.0).
envelope.add_item({
let mut item = Item::new(ItemType::Event);
item.set_payload(
ContentType::Json,
r#"
{
"request": {
"headers": [
["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
]
}
}
"#,
);
item
});
// Enable aggressive data scrubbing plus an explicit IP-masking PII rule.
let mut datascrubbing_settings = DataScrubbingConfig::default();
datascrubbing_settings.scrub_data = true;
datascrubbing_settings.scrub_defaults = true;
datascrubbing_settings.scrub_ip_addresses = true;
let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
let config = ProjectConfig {
datascrubbing_settings,
pii_config: Some(pii_config),
..Default::default()
};
let project_info = ProjectInfo {
config,
..Default::default()
};
let mut envelopes = ProcessingGroup::split_envelope(*envelope);
assert_eq!(envelopes.len(), 1);
let (group, envelope) = envelopes.pop().unwrap();
let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store, group);
let message = ProcessEnvelope {
envelope,
project_info: Arc::new(project_info),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
let envelope_response = processor.process(message).unwrap();
let new_envelope = envelope_response.envelope.unwrap();
let new_envelope = new_envelope.envelope();
let event_item = new_envelope.items().last().unwrap();
let annotated_event: Annotated<Event> =
Annotated::from_json_bytes(&event_item.payload()).unwrap();
let event = annotated_event.into_value().unwrap();
let headers = event
.request
.into_value()
.unwrap()
.headers
.into_value()
.unwrap();
// The header itself is scrubbed: the IP-like version is masked.
assert_eq!(Some("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"), headers.get_header("User-Agent"));
// But the browser context was derived before scrubbing and keeps the version.
let contexts = event.contexts.into_value().unwrap();
let browser = contexts.0.get("browser").unwrap();
assert_eq!(
r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
browser.to_json().unwrap()
);
}
/// The envelope's dynamic sampling context (DSC) must be materialized into the
/// event's `_dsc` field during processing.
#[tokio::test]
#[cfg(feature = "processing")]
async fn test_materialize_dsc() {
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(None, request_meta);
let dsc = r#"{
"trace_id": "00000000-0000-0000-0000-000000000000",
"public_key": "e12d836b15bb49d7bbf99e64295d995b",
"sample_rate": "0.2"
}"#;
envelope.set_dsc(serde_json::from_str(dsc).unwrap());
// Minimal empty event item; the DSC comes from the envelope header.
let mut item = Item::new(ItemType::Event);
item.set_payload(ContentType::Json, r#"{}"#);
envelope.add_item(item);
let (outcome_aggregator, test_store) = testutils::processor_services();
let managed_envelope = ManagedEnvelope::new(
envelope,
outcome_aggregator,
test_store,
ProcessingGroup::Error,
);
let process_message = ProcessEnvelope {
envelope: managed_envelope,
project_info: Arc::new(ProjectInfo::default()),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
let config = Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": [],
}
}))
.unwrap();
let processor = create_test_processor(config).await;
let response = processor.process(process_message).unwrap();
let envelope = response.envelope.as_ref().unwrap().envelope();
let event = envelope
.get_item_by(|item| item.ty() == &ItemType::Event)
.unwrap();
let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
// The materialized `_dsc` mirrors the envelope header values set above.
insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
Object(
{
"environment": ~,
"public_key": String(
"e12d836b15bb49d7bbf99e64295d995b",
),
"release": ~,
"replay_id": ~,
"sample_rate": String(
"0.2",
),
"trace_id": String(
"00000000-0000-0000-0000-000000000000",
),
"transaction": ~,
},
)
"###);
}
/// Normalizes a transaction event with the given name and source while
/// capturing the statsd metrics emitted by transaction-name logging.
///
/// Returns the captured statsd lines for snapshot assertions.
fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
let mut event = Annotated::<Event>::from_json(
r#"
{
"type": "transaction",
"transaction": "/foo/",
"timestamp": 946684810.0,
"start_timestamp": 946684800.0,
"contexts": {
"trace": {
"trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
"span_id": "fa90fdead5f74053",
"op": "http.server",
"type": "trace"
}
},
"transaction_info": {
"source": "url"
}
}
"#,
)
.unwrap();
// Override the transaction name and source from the test parameters.
let e = event.value_mut().as_mut().unwrap();
e.transaction.set_value(Some(transaction_name.into()));
e.transaction_info
.value_mut()
.as_mut()
.unwrap()
.source
.set_value(Some(source));
relay_statsd::with_capturing_test_client(|| {
utils::log_transaction_name_metrics(&mut event, |event| {
// Single rule that replaces `/foo/<anything>/...` segments with `*`.
let config = NormalizationConfig {
transaction_name_config: TransactionNameConfig {
rules: &[TransactionNameRule {
pattern: LazyGlob::new("/foo/*/**".to_owned()),
expiry: DateTime::<Utc>::MAX_UTC,
redaction: RedactionRule::Replace {
substitution: "*".to_owned(),
},
}],
},
..Default::default()
};
normalize_event(event, &config)
});
})
}
/// A URL transaction matching no rule and no pattern logs `changes:none`.
#[test]
fn test_log_transaction_metrics_none() {
let captures = capture_test_event("/nothing", TransactionSource::Url);
insta::assert_debug_snapshot!(captures, @r#"
[
"event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
]
"#);
}
/// A transaction renamed only by the configured rule logs `changes:rule`.
#[test]
fn test_log_transaction_metrics_rule() {
let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
insta::assert_debug_snapshot!(captures, @r#"
[
"event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
]
"#);
}
/// A transaction sanitized only by the built-in pattern (numeric segment)
/// logs `changes:pattern`.
#[test]
fn test_log_transaction_metrics_pattern() {
let captures = capture_test_event("/something/12345", TransactionSource::Url);
insta::assert_debug_snapshot!(captures, @r#"
[
"event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
]
"#);
}
/// A transaction changed by both the rule and the pattern logs `changes:both`.
#[test]
fn test_log_transaction_metrics_both() {
let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
insta::assert_debug_snapshot!(captures, @r#"
[
"event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
]
"#);
}
/// Route-sourced transactions are not sanitized: the source stays `route`
/// and no changes are recorded.
#[test]
fn test_log_transaction_metrics_no_match() {
let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
insta::assert_debug_snapshot!(captures, @r#"
[
"event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
]
"#);
}
/// `MEASUREMENT_MRI_OVERHEAD` must equal the number of MRI characters added
/// around a measurement's raw name and unit by the naming scheme.
#[test]
fn test_mri_overhead_constant() {
    let expected = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

    // Build a representative measurement metric and derive the overhead from
    // its generated MRI.
    let name = "foobar".to_string();
    let unit = MetricUnit::Duration(DurationUnit::default());
    let value = 5.into();
    let tags = TransactionMeasurementTags {
        measurement_rating: None,
        universal_tags: CommonTags(BTreeMap::new()),
        score_profile_version: None,
    };

    let measurement = TransactionMetric::Measurement {
        name: name.clone(),
        value,
        unit,
        tags,
    };

    let bucket: Bucket = measurement.into_metric(UnixTimestamp::now());
    let derived = bucket.name.len() - unit.to_string().len() - name.len();

    assert_eq!(
        expected, derived,
        "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
    );
}
/// Bucket metadata depends on the bucket source: externally submitted buckets
/// get `received_at` stamped, internally relayed ones keep it unset.
#[tokio::test]
async fn test_process_metrics_bucket_metadata() {
let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let received_at = Utc::now();
let config = Config::default();
let (aggregator, mut aggregator_rx) = Addr::custom();
let processor = create_test_processor_with_addrs(
config,
Addrs {
aggregator,
..Default::default()
},
)
.await;
// Raw statsd item: a set metric with two values.
let mut item = Item::new(ItemType::Statsd);
item.set_payload(
ContentType::Text,
"transactions/foo:3182887624:4267882815|s",
);
for (source, expected_received_at) in [
(
BucketSource::External,
Some(UnixTimestamp::from_datetime(received_at).unwrap()),
),
(BucketSource::Internal, None),
] {
let message = ProcessMetrics {
data: MetricData::Raw(vec![item.clone()]),
project_key,
source,
received_at,
sent_at: Some(Utc::now()),
};
processor.handle_process_metrics(&mut token, message);
let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
let buckets = merge_buckets.buckets;
assert_eq!(buckets.len(), 1);
assert_eq!(buckets[0].metadata.received_at, expected_received_at);
}
}
/// A batched metrics payload spanning two projects is split into one
/// `MergeBuckets` message per project key.
#[tokio::test]
async fn test_process_batched_metrics() {
let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
let received_at = Utc::now();
let config = Config::default();
let (aggregator, mut aggregator_rx) = Addr::custom();
let processor = create_test_processor_with_addrs(
config,
Addrs {
aggregator,
..Default::default()
},
)
.await;
// Two project keys, one distribution bucket each.
let payload = r#"{
"buckets": {
"11111111111111111111111111111111": [
{
"timestamp": 1615889440,
"width": 0,
"name": "d:custom/endpoint.response_time@millisecond",
"type": "d",
"value": [
68.0
],
"tags": {
"route": "user_index"
}
}
],
"22222222222222222222222222222222": [
{
"timestamp": 1615889440,
"width": 0,
"name": "d:custom/endpoint.cache_rate@none",
"type": "d",
"value": [
36.0
]
}
]
}
}
"#;
let message = ProcessBatchedMetrics {
payload: Bytes::from(payload),
source: BucketSource::Internal,
received_at,
sent_at: Some(Utc::now()),
};
processor.handle_process_batched_metrics(&mut token, message);
// Expect exactly one MergeBuckets message per project in the payload.
let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
// Delivery order is not guaranteed; sort by project key for the snapshot.
let mut messages = vec![mb1, mb2];
messages.sort_by_key(|mb| mb.project_key);
let actual = messages
.into_iter()
.map(|mb| (mb.project_key, mb.buckets))
.collect::<Vec<_>>();
assert_debug_snapshot!(actual, @r###"
[
(
ProjectKey("11111111111111111111111111111111"),
[
Bucket {
timestamp: UnixTimestamp(1615889440),
width: 0,
name: MetricName(
"d:custom/endpoint.response_time@millisecond",
),
value: Distribution(
[
68.0,
],
),
tags: {
"route": "user_index",
},
metadata: BucketMetadata {
merges: 1,
received_at: None,
extracted_from_indexed: false,
},
},
],
),
(
ProjectKey("22222222222222222222222222222222"),
[
Bucket {
timestamp: UnixTimestamp(1615889440),
width: 0,
name: MetricName(
"d:custom/endpoint.cache_rate@none",
),
value: Distribution(
[
36.0,
],
),
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
extracted_from_indexed: false,
},
},
],
),
]
"###);
}
}