1use itertools::Either;
4use relay_metrics::Bucket;
5use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
6
7use crate::metrics::outcomes::{BucketSummary, MetricOutcomes, TrackableBucket};
8use crate::services::outcome::Outcome;
9use crate::utils;
10
11#[derive(Debug)]
13pub struct MetricsLimiter<Q: AsRef<Vec<Quota>> = Vec<Quota>> {
14 buckets: Vec<SummarizedBucket>,
16
17 quotas: Q,
19
20 scoping: Scoping,
22
23 #[cfg(feature = "processing")]
25 counts: EntityCounts,
26}
27
28fn to_counts(summary: &BucketSummary) -> EntityCounts {
29 match *summary {
30 BucketSummary::Transactions(count) => EntityCounts {
31 transactions: Some(count),
32 spans: None,
33 },
34 BucketSummary::Spans(count) => EntityCounts {
35 transactions: None,
36 spans: Some(count),
37 },
38 BucketSummary::None => EntityCounts::default(),
39 }
40}
41
42#[derive(Debug)]
43struct SummarizedBucket {
44 bucket: Bucket,
45 summary: BucketSummary,
46}
47
48#[derive(Debug, Default, Clone)]
50struct EntityCounts {
51 transactions: Option<usize>,
60 spans: Option<usize>,
69}
70
71impl std::ops::Add for EntityCounts {
72 type Output = Self;
73
74 fn add(self, rhs: EntityCounts) -> Self::Output {
75 Self {
76 transactions: add_some(self.transactions, rhs.transactions),
77 spans: add_some(self.spans, rhs.spans),
78 }
79 }
80}
81
82fn add_some<T>(a: Option<T>, b: Option<T>) -> Option<T>
83where
84 T: std::ops::Add<Output = T>,
85{
86 match (a, b) {
87 (None, None) => None,
88 (None, Some(c)) | (Some(c), None) => Some(c),
89 (Some(a), Some(b)) => Some(a + b),
90 }
91}
92
93impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
94 pub fn create(
98 buckets: impl IntoIterator<Item = Bucket>,
99 quotas: Q,
100 scoping: Scoping,
101 ) -> Result<Self, Vec<Bucket>> {
102 let buckets: Vec<_> = buckets
103 .into_iter()
104 .map(|bucket| {
105 let summary = match bucket.metadata.extracted_from_indexed {
107 false => bucket.summary(),
108 true => Default::default(),
109 };
110 SummarizedBucket { bucket, summary }
111 })
112 .collect();
113
114 let total_counts = buckets
116 .iter()
117 .map(|b| to_counts(&b.summary))
118 .reduce(|a, b| a + b);
119 if let Some(_counts) = total_counts {
120 Ok(Self {
121 buckets,
122 quotas,
123 scoping,
124 #[cfg(feature = "processing")]
125 counts: _counts,
126 })
127 } else {
128 Err(buckets.into_iter().map(|s| s.bucket).collect())
129 }
130 }
131
132 #[cfg(feature = "processing")]
134 pub fn scoping(&self) -> &Scoping {
135 &self.scoping
136 }
137
138 #[cfg(feature = "processing")]
140 pub fn quotas(&self) -> &[Quota] {
141 self.quotas.as_ref()
142 }
143
144 #[cfg(feature = "processing")]
154 pub fn count(&self, category: DataCategory) -> Option<usize> {
155 match category {
156 DataCategory::Transaction => self.counts.transactions,
157 DataCategory::Span => self.counts.spans,
158 _ => None,
159 }
160 }
161
162 fn drop_with_outcome(
163 &mut self,
164 category: DataCategory,
165 outcome: Outcome,
166 metric_outcomes: &MetricOutcomes,
167 ) {
168 let buckets = std::mem::take(&mut self.buckets);
169 let (buckets, dropped) = utils::split_off_map(buckets, |b| match b.summary {
170 BucketSummary::Transactions { .. } if category == DataCategory::Transaction => {
171 Either::Right(b.bucket)
172 }
173 BucketSummary::Spans(_) if category == DataCategory::Span => Either::Right(b.bucket),
174 _ => Either::Left(b),
175 });
176 self.buckets = buckets;
177
178 metric_outcomes.track(self.scoping, &dropped, outcome);
179 }
180
181 pub fn enforce_limits(
185 &mut self,
186 rate_limits: &RateLimits,
187 metric_outcomes: &MetricOutcomes,
188 ) -> bool {
189 for category in [DataCategory::Transaction, DataCategory::Span] {
190 let active_rate_limits =
191 rate_limits.check_with_quotas(self.quotas.as_ref(), self.scoping.item(category));
192
193 if let Some(limit) = active_rate_limits.longest() {
195 self.drop_with_outcome(
196 category,
197 Outcome::RateLimited(limit.reason_code.clone()),
198 metric_outcomes,
199 );
200
201 return true;
202 }
203 }
204
205 false
206 }
207
208 pub fn into_buckets(self) -> Vec<Bucket> {
210 self.buckets.into_iter().map(|s| s.bucket).collect()
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use relay_base_schema::organization::OrganizationId;
217 use relay_base_schema::project::{ProjectId, ProjectKey};
218 use relay_metrics::{BucketMetadata, BucketValue, UnixTimestamp};
219 use relay_quotas::QuotaScope;
220 use relay_system::Addr;
221 use smallvec::smallvec;
222
223 use crate::metrics::MetricStats;
224
225 use super::*;
226
227 fn deny(category: DataCategory) -> Vec<Quota> {
228 vec![Quota {
229 id: None,
230 categories: smallvec![category],
231 scope: QuotaScope::Organization,
232 scope_id: None,
233 limit: Some(0),
234 window: None,
235 reason_code: None,
236 namespace: None,
237 }]
238 }
239
240 fn run_limiter(
242 metrics: Vec<Bucket>,
243 quotas: Vec<Quota>,
244 ) -> (Vec<Bucket>, Vec<(Outcome, DataCategory, u32)>) {
245 let (outcome_sink, mut rx) = Addr::custom();
246 let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_sink.clone());
247
248 let mut limiter = MetricsLimiter::create(
249 metrics,
250 quotas,
251 Scoping {
252 organization_id: OrganizationId::new(1),
253 project_id: ProjectId::new(1),
254 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
255 key_id: None,
256 },
257 )
258 .unwrap();
259
260 limiter.enforce_limits(&RateLimits::new(), &metric_outcomes);
261 let metrics = limiter.into_buckets();
262
263 rx.close();
264
265 let outcomes: Vec<_> = (0..)
266 .map(|_| rx.blocking_recv())
267 .take_while(|o| o.is_some())
268 .flatten()
269 .filter(|o| o.category != DataCategory::MetricBucket)
270 .map(|o| (o.outcome, o.category, o.quantity))
271 .collect();
272
273 (metrics, outcomes)
274 }
275
276 fn mixed_bag() -> Vec<Bucket> {
278 vec![
279 Bucket {
280 timestamp: UnixTimestamp::now(),
281 width: 0,
282 name: "c:transactions/usage@none".into(),
283 tags: Default::default(),
284 value: BucketValue::counter(12.into()),
285 metadata: BucketMetadata::default(),
286 },
287 Bucket {
288 timestamp: UnixTimestamp::now(),
289 width: 0,
290 name: "c:spans/usage@none".into(),
291 tags: Default::default(),
292 value: BucketValue::counter(34.into()),
293 metadata: BucketMetadata::default(),
294 },
295 Bucket {
296 timestamp: UnixTimestamp::now(),
297 width: 0,
298 name: "c:spans/usage@none".into(),
299 tags: Default::default(),
300 value: BucketValue::counter(56.into()),
301 metadata: BucketMetadata::default(),
302 },
303 Bucket {
304 timestamp: UnixTimestamp::now(),
305 width: 0,
306 name: "d:spans/exclusive_time@millisecond".into(),
307 tags: Default::default(),
308 value: BucketValue::distribution(78.into()),
309 metadata: BucketMetadata::default(),
310 },
311 Bucket {
312 timestamp: UnixTimestamp::now(),
313 width: 0,
314 name: "d:custom/something@millisecond".into(),
315 tags: Default::default(),
316 value: BucketValue::distribution(78.into()),
317 metadata: BucketMetadata::default(),
318 },
319 ]
320 }
321
322 #[test]
323 fn mixed_with_span_quota() {
324 let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Span));
325
326 assert_eq!(metrics.len(), 2);
327 assert_eq!(&*metrics[0].name, "c:transactions/usage@none");
328 assert_eq!(&*metrics[1].name, "d:custom/something@millisecond");
329
330 assert_eq!(
331 outcomes,
332 vec![(Outcome::RateLimited(None), DataCategory::Span, 90)]
333 );
334 }
335
336 #[test]
337 fn mixed_with_transaction_quota() {
338 let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Transaction));
339
340 assert_eq!(metrics.len(), 4);
341 assert_eq!(&*metrics[0].name, "c:spans/usage@none");
342 assert_eq!(&*metrics[1].name, "c:spans/usage@none");
343 assert_eq!(&*metrics[2].name, "d:spans/exclusive_time@millisecond");
344 assert_eq!(&*metrics[3].name, "d:custom/something@millisecond");
345
346 assert_eq!(
347 outcomes,
348 vec![(Outcome::RateLimited(None), DataCategory::Transaction, 12)]
349 );
350 }
351}