relay_server/metrics/
rate_limits.rs

1//! Quota and rate limiting helpers for metrics and metrics buckets.
2
3use itertools::Either;
4use relay_metrics::Bucket;
5use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
6
7use crate::metrics::outcomes::{BucketSummary, MetricOutcomes, TrackableBucket};
8use crate::services::outcome::Outcome;
9use crate::utils;
10
11/// Contains all data necessary to rate limit metrics or metrics buckets.
12#[derive(Debug)]
13pub struct MetricsLimiter<Q: AsRef<Vec<Quota>> = Vec<Quota>> {
14    /// A list of aggregated metric buckets with some counters.
15    buckets: Vec<SummarizedBucket>,
16
17    /// The quotas set on the current project.
18    quotas: Q,
19
20    /// Project information.
21    scoping: Scoping,
22
23    /// The number of performance items (transactions, spans, profiles) contributing to these metrics.
24    #[cfg(feature = "processing")]
25    counts: EntityCounts,
26}
27
28fn to_counts(summary: &BucketSummary) -> EntityCounts {
29    match *summary {
30        BucketSummary::Transactions(count) => EntityCounts {
31            transactions: Some(count),
32            spans: None,
33        },
34        BucketSummary::Spans(count) => EntityCounts {
35            transactions: None,
36            spans: Some(count),
37        },
38        BucketSummary::None => EntityCounts::default(),
39    }
40}
41
42#[derive(Debug)]
43struct SummarizedBucket {
44    bucket: Bucket,
45    summary: BucketSummary,
46}
47
48/// Contains the total counts of limitable entities represented by a batch of metrics.
49#[derive(Debug, Default, Clone)]
50struct EntityCounts {
51    /// The number of transactions represented in the current batch.
52    ///
53    /// - `None` if the batch does not contain any transaction metrics.
54    /// - `Some(0)` if the batch contains transaction metrics, but no `usage` count.
55    /// - `Some(n > 0)` if the batch contains a `usage` count for transactions.
56    ///
57    /// The distinction between `None` and `Some(0)` is needed to decide whether or not a rate limit
58    /// must be checked.
59    transactions: Option<usize>,
60    /// The number of spans represented in the current batch.
61    ///
62    /// - `None` if the batch does not contain any transaction metrics.
63    /// - `Some(0)` if the batch contains transaction metrics, but no `usage` count.
64    /// - `Some(n > 0)` if the batch contains a `usage` count for spans.
65    ///
66    /// The distinction between `None` and `Some(0)` is needed to decide whether or not a rate limit
67    /// must be checked.
68    spans: Option<usize>,
69}
70
71impl std::ops::Add for EntityCounts {
72    type Output = Self;
73
74    fn add(self, rhs: EntityCounts) -> Self::Output {
75        Self {
76            transactions: add_some(self.transactions, rhs.transactions),
77            spans: add_some(self.spans, rhs.spans),
78        }
79    }
80}
81
82fn add_some<T>(a: Option<T>, b: Option<T>) -> Option<T>
83where
84    T: std::ops::Add<Output = T>,
85{
86    match (a, b) {
87        (None, None) => None,
88        (None, Some(c)) | (Some(c), None) => Some(c),
89        (Some(a), Some(b)) => Some(a + b),
90    }
91}
92
93impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
94    /// Create a new limiter instance.
95    ///
96    /// Returns Ok if `metrics` contain relevant metrics, `metrics` otherwise.
97    pub fn create(
98        buckets: impl IntoIterator<Item = Bucket>,
99        quotas: Q,
100        scoping: Scoping,
101    ) -> Result<Self, Vec<Bucket>> {
102        let buckets: Vec<_> = buckets
103            .into_iter()
104            .map(|bucket| {
105                // Sampled buckets are not rate limited, because the sample has already been rate limited.
106                let summary = match bucket.metadata.extracted_from_indexed {
107                    false => bucket.summary(),
108                    true => Default::default(),
109                };
110                SummarizedBucket { bucket, summary }
111            })
112            .collect();
113
114        // Accumulate the total counts
115        let total_counts = buckets
116            .iter()
117            .map(|b| to_counts(&b.summary))
118            .reduce(|a, b| a + b);
119        if let Some(_counts) = total_counts {
120            Ok(Self {
121                buckets,
122                quotas,
123                scoping,
124                #[cfg(feature = "processing")]
125                counts: _counts,
126            })
127        } else {
128            Err(buckets.into_iter().map(|s| s.bucket).collect())
129        }
130    }
131
132    /// Returns a reference to the scoping information.
133    #[cfg(feature = "processing")]
134    pub fn scoping(&self) -> &Scoping {
135        &self.scoping
136    }
137
138    /// Returns a reference to the list of quotas.
139    #[cfg(feature = "processing")]
140    pub fn quotas(&self) -> &[Quota] {
141        self.quotas.as_ref()
142    }
143
144    /// Counts the number of transactions/spans represented in this batch.
145    ///
146    /// Returns
147    /// - `None` if the batch does not contain metrics related to the data category.
148    /// - `Some(0)` if the batch contains metrics related to the data category, but no `usage` count.
149    /// - `Some(n > 0)` if the batch contains a `usage` count for the given data category.
150    ///
151    /// The distinction between `None` and `Some(0)` is needed to decide whether or not a rate limit
152    /// must be checked.
153    #[cfg(feature = "processing")]
154    pub fn count(&self, category: DataCategory) -> Option<usize> {
155        match category {
156            DataCategory::Transaction => self.counts.transactions,
157            DataCategory::Span => self.counts.spans,
158            _ => None,
159        }
160    }
161
162    fn drop_with_outcome(
163        &mut self,
164        category: DataCategory,
165        outcome: Outcome,
166        metric_outcomes: &MetricOutcomes,
167    ) {
168        let buckets = std::mem::take(&mut self.buckets);
169        let (buckets, dropped) = utils::split_off_map(buckets, |b| match b.summary {
170            BucketSummary::Transactions { .. } if category == DataCategory::Transaction => {
171                Either::Right(b.bucket)
172            }
173            BucketSummary::Spans(_) if category == DataCategory::Span => Either::Right(b.bucket),
174            _ => Either::Left(b),
175        });
176        self.buckets = buckets;
177
178        metric_outcomes.track(self.scoping, &dropped, outcome);
179    }
180
181    // Drop transaction-related metrics and create outcomes for any active rate limits.
182    //
183    // Returns true if any metrics were dropped.
184    pub fn enforce_limits(
185        &mut self,
186        rate_limits: &RateLimits,
187        metric_outcomes: &MetricOutcomes,
188    ) -> bool {
189        for category in [DataCategory::Transaction, DataCategory::Span] {
190            let active_rate_limits =
191                rate_limits.check_with_quotas(self.quotas.as_ref(), self.scoping.item(category));
192
193            // If a rate limit is active, discard relevant buckets.
194            if let Some(limit) = active_rate_limits.longest() {
195                self.drop_with_outcome(
196                    category,
197                    Outcome::RateLimited(limit.reason_code.clone()),
198                    metric_outcomes,
199                );
200
201                return true;
202            }
203        }
204
205        false
206    }
207
208    /// Consume this struct and return the contained metrics.
209    pub fn into_buckets(self) -> Vec<Bucket> {
210        self.buckets.into_iter().map(|s| s.bucket).collect()
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use relay_base_schema::organization::OrganizationId;
217    use relay_base_schema::project::{ProjectId, ProjectKey};
218    use relay_metrics::{BucketMetadata, BucketValue, UnixTimestamp};
219    use relay_quotas::QuotaScope;
220    use relay_system::Addr;
221    use smallvec::smallvec;
222
223    use crate::metrics::MetricStats;
224
225    use super::*;
226
227    fn deny(category: DataCategory) -> Vec<Quota> {
228        vec![Quota {
229            id: None,
230            categories: smallvec![category],
231            scope: QuotaScope::Organization,
232            scope_id: None,
233            limit: Some(0),
234            window: None,
235            reason_code: None,
236            namespace: None,
237        }]
238    }
239
240    /// Applies rate limits and returns the remaining buckets and generated outcomes.
241    fn run_limiter(
242        metrics: Vec<Bucket>,
243        quotas: Vec<Quota>,
244    ) -> (Vec<Bucket>, Vec<(Outcome, DataCategory, u32)>) {
245        let (outcome_sink, mut rx) = Addr::custom();
246        let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_sink.clone());
247
248        let mut limiter = MetricsLimiter::create(
249            metrics,
250            quotas,
251            Scoping {
252                organization_id: OrganizationId::new(1),
253                project_id: ProjectId::new(1),
254                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
255                key_id: None,
256            },
257        )
258        .unwrap();
259
260        limiter.enforce_limits(&RateLimits::new(), &metric_outcomes);
261        let metrics = limiter.into_buckets();
262
263        rx.close();
264
265        let outcomes: Vec<_> = (0..)
266            .map(|_| rx.blocking_recv())
267            .take_while(|o| o.is_some())
268            .flatten()
269            .filter(|o| o.category != DataCategory::MetricBucket)
270            .map(|o| (o.outcome, o.category, o.quantity))
271            .collect();
272
273        (metrics, outcomes)
274    }
275
276    /// A few different bucket types
277    fn mixed_bag() -> Vec<Bucket> {
278        vec![
279            Bucket {
280                timestamp: UnixTimestamp::now(),
281                width: 0,
282                name: "c:transactions/usage@none".into(),
283                tags: Default::default(),
284                value: BucketValue::counter(12.into()),
285                metadata: BucketMetadata::default(),
286            },
287            Bucket {
288                timestamp: UnixTimestamp::now(),
289                width: 0,
290                name: "c:spans/usage@none".into(),
291                tags: Default::default(),
292                value: BucketValue::counter(34.into()),
293                metadata: BucketMetadata::default(),
294            },
295            Bucket {
296                timestamp: UnixTimestamp::now(),
297                width: 0,
298                name: "c:spans/usage@none".into(),
299                tags: Default::default(),
300                value: BucketValue::counter(56.into()),
301                metadata: BucketMetadata::default(),
302            },
303            Bucket {
304                timestamp: UnixTimestamp::now(),
305                width: 0,
306                name: "d:spans/exclusive_time@millisecond".into(),
307                tags: Default::default(),
308                value: BucketValue::distribution(78.into()),
309                metadata: BucketMetadata::default(),
310            },
311            Bucket {
312                timestamp: UnixTimestamp::now(),
313                width: 0,
314                name: "d:custom/something@millisecond".into(),
315                tags: Default::default(),
316                value: BucketValue::distribution(78.into()),
317                metadata: BucketMetadata::default(),
318            },
319        ]
320    }
321
322    #[test]
323    fn mixed_with_span_quota() {
324        let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Span));
325
326        assert_eq!(metrics.len(), 2);
327        assert_eq!(&*metrics[0].name, "c:transactions/usage@none");
328        assert_eq!(&*metrics[1].name, "d:custom/something@millisecond");
329
330        assert_eq!(
331            outcomes,
332            vec![(Outcome::RateLimited(None), DataCategory::Span, 90)]
333        );
334    }
335
336    #[test]
337    fn mixed_with_transaction_quota() {
338        let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Transaction));
339
340        assert_eq!(metrics.len(), 4);
341        assert_eq!(&*metrics[0].name, "c:spans/usage@none");
342        assert_eq!(&*metrics[1].name, "c:spans/usage@none");
343        assert_eq!(&*metrics[2].name, "d:spans/exclusive_time@millisecond");
344        assert_eq!(&*metrics[3].name, "d:custom/something@millisecond");
345
346        assert_eq!(
347            outcomes,
348            vec![(Outcome::RateLimited(None), DataCategory::Transaction, 12)]
349        );
350    }
351}