relay_server/metrics/outcomes.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
use chrono::Utc;
use relay_metrics::{
Bucket, BucketMetadata, BucketView, BucketViewValue, MetricName, MetricNamespace,
MetricResourceIdentifier, MetricType,
};
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;
use crate::envelope::SourceQuantities;
use crate::metrics::MetricStats;
use crate::services::outcome::{Outcome, TrackOutcome};
#[cfg(feature = "processing")]
use relay_cardinality::{CardinalityLimit, CardinalityReport};
/// [`MetricOutcomes`] takes care of creating the right outcomes for metrics at the end of their
/// lifecycle.
///
/// It is aware of surrogate metrics like transaction- and span-duration as well as pure metrics
/// like custom.
///
/// The type derives `Clone`; a clone holds clones of both fields.
#[derive(Debug, Clone)]
pub struct MetricOutcomes {
/// Receives every tracked bucket via `track_metric` and, with the `processing` feature,
/// cardinality reports via `track_cardinality`.
metric_stats: MetricStats,
/// Address that the generated [`TrackOutcome`] messages are sent to.
outcomes: Addr<TrackOutcome>,
}
impl MetricOutcomes {
/// Creates a new [`MetricOutcomes`].
pub fn new(metric_stats: MetricStats, outcomes: Addr<TrackOutcome>) -> Self {
Self {
metric_stats,
outcomes,
}
}
/// Tracks an outcome for a list of buckets and generates the necessary outcomes.
pub fn track(&self, scoping: Scoping, buckets: &[impl TrackableBucket], outcome: Outcome) {
let timestamp = Utc::now();
// Never emit accepted outcomes for surrogate metrics.
// These are handled from within Sentry.
if !matches!(outcome, Outcome::Accepted) {
let SourceQuantities {
transactions,
spans,
profiles,
buckets,
} = extract_quantities(buckets);
let categories = [
(DataCategory::Transaction, transactions as u32),
(DataCategory::Span, spans as u32),
(DataCategory::Profile, profiles as u32),
(DataCategory::MetricBucket, buckets as u32),
];
for (category, quantity) in categories {
if quantity > 0 {
self.outcomes.send(TrackOutcome {
timestamp,
scoping,
outcome: outcome.clone(),
event_id: None,
remote_addr: None,
category,
quantity,
});
}
}
}
// When rejecting metrics, we need to make sure that the number of merges is correctly handled
// for buckets views, since if we have a bucket which has 5 merges, and it's split into 2
// bucket views, we will emit the volume of the rejection as 5 + 5 merges since we still read
// the underlying metadata for each view, and it points to the same bucket reference.
// Possible solutions to this problem include emitting the merges only if the bucket view is
// the first of view or distributing uniformly the metadata between split views.
for bucket in buckets {
relay_log::trace!("{:<50} -> {outcome}", bucket.name());
self.metric_stats.track_metric(scoping, bucket, &outcome)
}
}
/// Tracks the cardinality of a metric.
#[cfg(feature = "processing")]
pub fn cardinality(
&self,
scoping: Scoping,
limit: &CardinalityLimit,
report: &CardinalityReport,
) {
self.metric_stats.track_cardinality(scoping, limit, report)
}
}
/// The return value of [`TrackableBucket::summary`].
///
/// Contains the count of total transactions or spans that went into this bucket.
///
/// The derived `PartialOrd`/`Ord` follow the declaration order of the variants, so the variants
/// must not be reordered without checking callers that compare summaries.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSummary {
/// The bucket is attributed to transactions; carries the number of transactions counted.
Transactions(usize),
/// The bucket is attributed to spans; carries the number of spans counted.
Spans(usize),
/// The bucket has no transaction or span count. This is the default value.
#[default]
None,
}
/// Minimum information required to track outcomes for a metric bucket.
///
/// Implemented in this file for `Bucket`, `BucketView` and for shared references to any
/// implementor.
pub trait TrackableBucket {
/// Full mri of the bucket.
fn name(&self) -> &MetricName;
/// Type of the metric bucket.
fn ty(&self) -> MetricType;
/// Extracts quota information from the metric bucket.
///
/// If the metric was extracted from one or more transactions or spans, it returns the amount
/// of datapoints contained in the bucket.
///
/// NOTE(review): this documentation used to state that profiling information is tracked as
/// well, but [`BucketSummary`] only carries a transaction or a span count.
fn summary(&self) -> BucketSummary;
/// Metric bucket metadata.
///
/// Returned by value rather than by reference.
fn metadata(&self) -> BucketMetadata;
}
/// Lets a shared reference to any [`TrackableBucket`] be used as a trackable bucket itself.
///
/// Every method simply forwards to the implementation of the referenced value.
impl<T: TrackableBucket> TrackableBucket for &T {
    fn name(&self) -> &MetricName {
        // `self` is `&&T`; peel one reference layer and call `T`'s implementation explicitly.
        T::name(*self)
    }

    fn ty(&self) -> MetricType {
        T::ty(*self)
    }

    fn summary(&self) -> BucketSummary {
        T::summary(*self)
    }

    fn metadata(&self) -> BucketMetadata {
        T::metadata(*self)
    }
}
impl TrackableBucket for Bucket {
fn name(&self) -> &MetricName {
&self.name
}
fn ty(&self) -> MetricType {
self.value.ty()
}
fn summary(&self) -> BucketSummary {
BucketView::new(self).summary()
}
fn metadata(&self) -> BucketMetadata {
self.metadata
}
}
/// Tracks a [`BucketView`], deriving the summary from the parsed metric name and the value.
impl TrackableBucket for BucketView<'_> {
    fn name(&self) -> &MetricName {
        // NOTE(review): presumably resolves to an inherent `BucketView::name` accessor (inherent
        // methods take precedence over trait methods); otherwise this call would recurse.
        self.name()
    }

    fn ty(&self) -> MetricType {
        self.ty()
    }

    /// Summarizes the view as a transaction or span count.
    ///
    /// Names that fail to parse as an MRI, and namespaces other than transactions and spans,
    /// yield the default summary. Within those two namespaces only a counter named `usage`
    /// contributes its value; every other metric results in a count of zero.
    fn summary(&self) -> BucketSummary {
        let Ok(mri) = MetricResourceIdentifier::parse(self.name()) else {
            return BucketSummary::default();
        };

        // Select the summary variant first, so the counter extraction is written only once.
        let into_summary: fn(usize) -> BucketSummary = match mri.namespace {
            MetricNamespace::Transactions => BucketSummary::Transactions,
            MetricNamespace::Spans => BucketSummary::Spans,
            // Nothing to count
            _ => return BucketSummary::default(),
        };

        let count = match self.value() {
            BucketViewValue::Counter(c) if mri.name == "usage" => c.to_f64() as usize,
            _ => 0,
        };

        into_summary(count)
    }

    fn metadata(&self) -> BucketMetadata {
        self.metadata()
    }
}
/// Extracts quota information from a list of metric buckets.
///
/// Every bucket increments the bucket count by one. In addition, the transaction or span count
/// reported by [`TrackableBucket::summary`] is added to the matching total. All other fields of
/// the returned [`SourceQuantities`] keep their default value.
///
/// The transaction and span totals saturate at the maximum value instead of overflowing, since
/// individual counts are derived from metric values and can be arbitrarily large.
pub fn extract_quantities<I, T>(buckets: I) -> SourceQuantities
where
    I: IntoIterator<Item = T>,
    T: TrackableBucket,
{
    let mut quantities = SourceQuantities::default();

    for bucket in buckets {
        quantities.buckets += 1;

        match bucket.summary() {
            BucketSummary::Transactions(count) => {
                quantities.transactions = quantities.transactions.saturating_add(count);
            }
            BucketSummary::Spans(count) => {
                quantities.spans = quantities.spans.saturating_add(count);
            }
            BucketSummary::None => {}
        }
    }

    quantities
}