Skip to main content

relay_server/metrics/
rate_limits.rs

1//! Quota and rate limiting helpers for metrics and metrics buckets.
2
3use itertools::Either;
4use relay_metrics::Bucket;
5use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
6
7use crate::metrics::outcomes::{BucketSummary, MetricOutcomes, TrackableBucket};
8use crate::services::outcome::Outcome;
9use crate::utils;
10
11/// Contains all data necessary to rate limit metrics or metrics buckets.
12#[derive(Debug)]
13pub struct MetricsLimiter<Q: AsRef<Vec<Quota>> = Vec<Quota>> {
14    /// A list of aggregated metric buckets with some counters.
15    buckets: Vec<SummarizedBucket>,
16
17    /// The quotas set on the current project.
18    quotas: Q,
19
20    /// Project information.
21    scoping: Scoping,
22
23    /// The number of performance items (spans) contributing to these metrics.
24    #[cfg(feature = "processing")]
25    counts: EntityCounts,
26}
27
28fn to_counts(summary: &BucketSummary) -> EntityCounts {
29    match *summary {
30        BucketSummary::Spans { count, .. } => EntityCounts { spans: Some(count) },
31        BucketSummary::None => EntityCounts::default(),
32    }
33}
34
35#[derive(Debug)]
36struct SummarizedBucket {
37    bucket: Bucket,
38    summary: BucketSummary,
39}
40
/// Total counts of limitable entities represented by a batch of metrics.
#[derive(Debug, Default, Clone)]
struct EntityCounts {
    /// How many spans the current batch represents.
    ///
    /// - `None`: the batch holds no span metrics at all.
    /// - `Some(0)`: the batch holds span metrics, but none of them carries a `usage` count.
    /// - `Some(n > 0)`: the batch carries a `usage` count of `n` spans.
    ///
    /// `None` and `Some(0)` are kept apart because the difference decides whether or not a rate
    /// limit has to be checked.
    spans: Option<usize>,
}
54
55impl std::ops::Add for EntityCounts {
56    type Output = Self;
57
58    fn add(self, rhs: EntityCounts) -> Self::Output {
59        Self {
60            spans: add_some(self.spans, rhs.spans),
61        }
62    }
63}
64
/// Adds two optional values.
///
/// If both values are present their sum is returned. If only one is present it is returned
/// unchanged, and `None` is returned only when both inputs are `None`.
fn add_some<T>(a: Option<T>, b: Option<T>) -> Option<T>
where
    T: std::ops::Add<Output = T>,
{
    match (a, b) {
        (Some(lhs), Some(rhs)) => Some(lhs + rhs),
        // At most one side is present here, so `or` picks it (or yields `None`).
        (lhs, rhs) => lhs.or(rhs),
    }
}
75
76impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
77    /// Create a new limiter instance.
78    ///
79    /// Returns Ok if `metrics` contain relevant metrics, `metrics` otherwise.
80    pub fn create(
81        buckets: impl IntoIterator<Item = Bucket>,
82        quotas: Q,
83        scoping: Scoping,
84    ) -> Result<Self, Vec<Bucket>> {
85        let buckets: Vec<_> = buckets
86            .into_iter()
87            .map(|bucket| {
88                // Sampled buckets are not rate limited, because the sample has already been rate limited.
89                let summary = match bucket.metadata.extracted_from_indexed {
90                    false => bucket.summary(),
91                    true => Default::default(),
92                };
93                SummarizedBucket { bucket, summary }
94            })
95            .collect();
96
97        // Accumulate the total counts
98        let total_counts = buckets
99            .iter()
100            .map(|b| to_counts(&b.summary))
101            .reduce(|a, b| a + b);
102        if let Some(_counts) = total_counts {
103            Ok(Self {
104                buckets,
105                quotas,
106                scoping,
107                #[cfg(feature = "processing")]
108                counts: _counts,
109            })
110        } else {
111            Err(buckets.into_iter().map(|s| s.bucket).collect())
112        }
113    }
114
115    /// Returns a reference to the scoping information.
116    #[cfg(feature = "processing")]
117    pub fn scoping(&self) -> &Scoping {
118        &self.scoping
119    }
120
121    /// Returns a reference to the list of quotas.
122    #[cfg(feature = "processing")]
123    pub fn quotas(&self) -> &[Quota] {
124        self.quotas.as_ref()
125    }
126
127    /// Counts the number of spans represented in this batch.
128    ///
129    /// Returns
130    /// - `None` if the batch does not contain metrics related to the data category.
131    /// - `Some(0)` if the batch contains metrics related to the data category, but no `usage` count.
132    /// - `Some(n > 0)` if the batch contains a `usage` count for the given data category.
133    ///
134    /// The distinction between `None` and `Some(0)` is needed to decide whether or not a rate limit
135    /// must be checked.
136    #[cfg(feature = "processing")]
137    pub fn count(&self) -> (DataCategory, Option<usize>) {
138        (DataCategory::Span, self.counts.spans)
139    }
140
141    fn drop_with_outcome(
142        &mut self,
143        category: DataCategory,
144        outcome: Outcome,
145        metric_outcomes: &MetricOutcomes,
146    ) {
147        let buckets = std::mem::take(&mut self.buckets);
148        let (buckets, dropped) = utils::split_off_map(buckets, |b| match b.summary {
149            BucketSummary::Spans { .. } if category == DataCategory::Span => {
150                Either::Right(b.bucket)
151            }
152            _ => Either::Left(b),
153        });
154        self.buckets = buckets;
155
156        metric_outcomes.track(self.scoping, &dropped, outcome);
157    }
158
159    // Drop span-related metrics and create outcomes for any active rate limits.
160    //
161    // Returns true if any metrics were dropped.
162    pub fn enforce_limits(
163        &mut self,
164        rate_limits: &RateLimits,
165        metric_outcomes: &MetricOutcomes,
166    ) -> bool {
167        let active_rate_limits = rate_limits
168            .check_with_quotas(self.quotas.as_ref(), self.scoping.item(DataCategory::Span));
169
170        // If a rate limit is active, discard relevant buckets.
171        if let Some(limit) = active_rate_limits.longest() {
172            self.drop_with_outcome(
173                DataCategory::Span,
174                Outcome::RateLimited(limit.reason_code.clone()),
175                metric_outcomes,
176            );
177
178            return true;
179        }
180
181        false
182    }
183
184    /// Consume this struct and return the contained metrics.
185    pub fn into_buckets(self) -> Vec<Bucket> {
186        self.buckets.into_iter().map(|s| s.bucket).collect()
187    }
188}
189
#[cfg(test)]
mod tests {
    use relay_base_schema::organization::OrganizationId;
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_metrics::{BucketMetadata, BucketValue, UnixTimestamp};
    use relay_quotas::QuotaScope;
    use relay_system::Addr;

    use super::*;

    /// Builds a single organization-scoped quota with a limit of zero for `category`.
    fn deny(category: DataCategory) -> Vec<Quota> {
        let quota = Quota {
            id: None,
            categories: [category].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        };

        vec![quota]
    }

    /// Runs the limiter over `metrics` with the given `quotas`.
    ///
    /// Returns the buckets that remain after enforcement together with all emitted outcomes,
    /// except those of the `MetricBucket` category.
    fn run_limiter(
        metrics: Vec<Bucket>,
        quotas: Vec<Quota>,
    ) -> (Vec<Bucket>, Vec<(Outcome, DataCategory, u32)>) {
        let (outcome_sink, mut rx) = Addr::custom();
        let metric_outcomes = MetricOutcomes::new(outcome_sink.clone());

        let scoping = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(1),
            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            key_id: None,
        };

        let mut limiter = MetricsLimiter::create(metrics, quotas, scoping).unwrap();
        limiter.enforce_limits(&RateLimits::new(), &metric_outcomes);
        let remaining = limiter.into_buckets();

        // Close the channel so that draining it below terminates.
        rx.close();

        let outcomes: Vec<_> = std::iter::from_fn(|| rx.blocking_recv())
            .filter(|o| o.category != DataCategory::MetricBucket)
            .map(|o| (o.outcome, o.category, o.quantity))
            .collect();

        (remaining, outcomes)
    }

    /// Creates a bucket with the given name and value and defaults for everything else.
    fn bucket(name: &'static str, value: BucketValue) -> Bucket {
        Bucket {
            timestamp: UnixTimestamp::now(),
            width: 0,
            name: name.into(),
            tags: Default::default(),
            value,
            metadata: BucketMetadata::default(),
        }
    }

    /// A few different bucket types
    fn mixed_bag() -> Vec<Bucket> {
        vec![
            bucket("c:spans/usage@none", BucketValue::counter(34.into())),
            bucket("c:spans/usage@none", BucketValue::counter(56.into())),
            bucket(
                "d:spans/exclusive_time@millisecond",
                BucketValue::distribution(78.into()),
            ),
            bucket(
                "d:custom/something@millisecond",
                BucketValue::distribution(78.into()),
            ),
        ]
    }

    #[test]
    fn mixed_with_span_quota() {
        let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Span));

        // Only the custom metric survives the span quota.
        assert_eq!(metrics.len(), 1);
        assert_eq!(&*metrics[0].name, "d:custom/something@millisecond");

        assert_eq!(
            outcomes,
            vec![(Outcome::RateLimited(None), DataCategory::Span, 90)]
        );
    }
}