1use itertools::Either;
4use relay_metrics::Bucket;
5use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
6
7use crate::metrics::outcomes::{BucketSummary, MetricOutcomes, TrackableBucket};
8use crate::services::outcome::Outcome;
9use crate::utils;
10
11#[derive(Debug)]
13pub struct MetricsLimiter<Q: AsRef<Vec<Quota>> = Vec<Quota>> {
14 buckets: Vec<SummarizedBucket>,
16
17 quotas: Q,
19
20 scoping: Scoping,
22
23 #[cfg(feature = "processing")]
25 counts: EntityCounts,
26}
27
28fn to_counts(summary: &BucketSummary) -> EntityCounts {
29 match *summary {
30 BucketSummary::Transactions(count) => EntityCounts {
31 transactions: Some(count),
32 spans: None,
33 },
34 BucketSummary::Spans(count) => EntityCounts {
35 transactions: None,
36 spans: Some(count),
37 },
38 BucketSummary::None => EntityCounts::default(),
39 }
40}
41
42#[derive(Debug)]
43struct SummarizedBucket {
44 bucket: Bucket,
45 summary: BucketSummary,
46}
47
48#[derive(Debug, Default, Clone)]
50struct EntityCounts {
51 transactions: Option<usize>,
60 spans: Option<usize>,
69}
70
71impl std::ops::Add for EntityCounts {
72 type Output = Self;
73
74 fn add(self, rhs: EntityCounts) -> Self::Output {
75 Self {
76 transactions: add_some(self.transactions, rhs.transactions),
77 spans: add_some(self.spans, rhs.spans),
78 }
79 }
80}
81
82fn add_some<T>(a: Option<T>, b: Option<T>) -> Option<T>
83where
84 T: std::ops::Add<Output = T>,
85{
86 match (a, b) {
87 (None, None) => None,
88 (None, Some(c)) | (Some(c), None) => Some(c),
89 (Some(a), Some(b)) => Some(a + b),
90 }
91}
92
93impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
94 pub fn create(
98 buckets: impl IntoIterator<Item = Bucket>,
99 quotas: Q,
100 scoping: Scoping,
101 ) -> Result<Self, Vec<Bucket>> {
102 let buckets: Vec<_> = buckets
103 .into_iter()
104 .map(|bucket| {
105 let summary = match bucket.metadata.extracted_from_indexed {
107 false => bucket.summary(),
108 true => Default::default(),
109 };
110 SummarizedBucket { bucket, summary }
111 })
112 .collect();
113
114 let total_counts = buckets
116 .iter()
117 .map(|b| to_counts(&b.summary))
118 .reduce(|a, b| a + b);
119 if let Some(_counts) = total_counts {
120 Ok(Self {
121 buckets,
122 quotas,
123 scoping,
124 #[cfg(feature = "processing")]
125 counts: _counts,
126 })
127 } else {
128 Err(buckets.into_iter().map(|s| s.bucket).collect())
129 }
130 }
131
132 #[cfg(feature = "processing")]
134 pub fn scoping(&self) -> &Scoping {
135 &self.scoping
136 }
137
138 #[cfg(feature = "processing")]
140 pub fn quotas(&self) -> &[Quota] {
141 self.quotas.as_ref()
142 }
143
144 #[cfg(feature = "processing")]
154 pub fn count(&self, category: DataCategory) -> Option<usize> {
155 match category {
156 DataCategory::Transaction => self.counts.transactions,
157 DataCategory::Span => self.counts.spans,
158 _ => None,
159 }
160 }
161
162 fn drop_with_outcome(
163 &mut self,
164 category: DataCategory,
165 outcome: Outcome,
166 metric_outcomes: &MetricOutcomes,
167 ) {
168 let buckets = std::mem::take(&mut self.buckets);
169 let (buckets, dropped) = utils::split_off_map(buckets, |b| match b.summary {
170 BucketSummary::Transactions { .. } if category == DataCategory::Transaction => {
171 Either::Right(b.bucket)
172 }
173 BucketSummary::Spans(_) if category == DataCategory::Span => Either::Right(b.bucket),
174 _ => Either::Left(b),
175 });
176 self.buckets = buckets;
177
178 metric_outcomes.track(self.scoping, &dropped, outcome);
179 }
180
181 pub fn enforce_limits(
185 &mut self,
186 rate_limits: &RateLimits,
187 metric_outcomes: &MetricOutcomes,
188 ) -> bool {
189 for category in [DataCategory::Transaction, DataCategory::Span] {
190 let active_rate_limits =
191 rate_limits.check_with_quotas(self.quotas.as_ref(), self.scoping.item(category));
192
193 if let Some(limit) = active_rate_limits.longest() {
195 self.drop_with_outcome(
196 category,
197 Outcome::RateLimited(limit.reason_code.clone()),
198 metric_outcomes,
199 );
200
201 return true;
202 }
203 }
204
205 false
206 }
207
208 pub fn into_buckets(self) -> Vec<Bucket> {
210 self.buckets.into_iter().map(|s| s.bucket).collect()
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use relay_base_schema::organization::OrganizationId;
217 use relay_base_schema::project::{ProjectId, ProjectKey};
218 use relay_metrics::{BucketMetadata, BucketValue, UnixTimestamp};
219 use relay_quotas::QuotaScope;
220 use relay_system::Addr;
221
222 use super::*;
223
224 fn deny(category: DataCategory) -> Vec<Quota> {
225 vec![Quota {
226 id: None,
227 categories: [category].into(),
228 scope: QuotaScope::Organization,
229 scope_id: None,
230 limit: Some(0),
231 window: None,
232 reason_code: None,
233 namespace: None,
234 }]
235 }
236
237 fn run_limiter(
239 metrics: Vec<Bucket>,
240 quotas: Vec<Quota>,
241 ) -> (Vec<Bucket>, Vec<(Outcome, DataCategory, u32)>) {
242 let (outcome_sink, mut rx) = Addr::custom();
243 let metric_outcomes = MetricOutcomes::new(outcome_sink.clone());
244
245 let mut limiter = MetricsLimiter::create(
246 metrics,
247 quotas,
248 Scoping {
249 organization_id: OrganizationId::new(1),
250 project_id: ProjectId::new(1),
251 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
252 key_id: None,
253 },
254 )
255 .unwrap();
256
257 limiter.enforce_limits(&RateLimits::new(), &metric_outcomes);
258 let metrics = limiter.into_buckets();
259
260 rx.close();
261
262 let outcomes: Vec<_> = (0..)
263 .map(|_| rx.blocking_recv())
264 .take_while(|o| o.is_some())
265 .flatten()
266 .filter(|o| o.category != DataCategory::MetricBucket)
267 .map(|o| (o.outcome, o.category, o.quantity))
268 .collect();
269
270 (metrics, outcomes)
271 }
272
273 fn mixed_bag() -> Vec<Bucket> {
275 vec![
276 Bucket {
277 timestamp: UnixTimestamp::now(),
278 width: 0,
279 name: "c:transactions/usage@none".into(),
280 tags: Default::default(),
281 value: BucketValue::counter(12.into()),
282 metadata: BucketMetadata::default(),
283 },
284 Bucket {
285 timestamp: UnixTimestamp::now(),
286 width: 0,
287 name: "c:spans/usage@none".into(),
288 tags: Default::default(),
289 value: BucketValue::counter(34.into()),
290 metadata: BucketMetadata::default(),
291 },
292 Bucket {
293 timestamp: UnixTimestamp::now(),
294 width: 0,
295 name: "c:spans/usage@none".into(),
296 tags: Default::default(),
297 value: BucketValue::counter(56.into()),
298 metadata: BucketMetadata::default(),
299 },
300 Bucket {
301 timestamp: UnixTimestamp::now(),
302 width: 0,
303 name: "d:spans/exclusive_time@millisecond".into(),
304 tags: Default::default(),
305 value: BucketValue::distribution(78.into()),
306 metadata: BucketMetadata::default(),
307 },
308 Bucket {
309 timestamp: UnixTimestamp::now(),
310 width: 0,
311 name: "d:custom/something@millisecond".into(),
312 tags: Default::default(),
313 value: BucketValue::distribution(78.into()),
314 metadata: BucketMetadata::default(),
315 },
316 ]
317 }
318
319 #[test]
320 fn mixed_with_span_quota() {
321 let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Span));
322
323 assert_eq!(metrics.len(), 2);
324 assert_eq!(&*metrics[0].name, "c:transactions/usage@none");
325 assert_eq!(&*metrics[1].name, "d:custom/something@millisecond");
326
327 assert_eq!(
328 outcomes,
329 vec![(Outcome::RateLimited(None), DataCategory::Span, 90)]
330 );
331 }
332
333 #[test]
334 fn mixed_with_transaction_quota() {
335 let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Transaction));
336
337 assert_eq!(metrics.len(), 4);
338 assert_eq!(&*metrics[0].name, "c:spans/usage@none");
339 assert_eq!(&*metrics[1].name, "c:spans/usage@none");
340 assert_eq!(&*metrics[2].name, "d:spans/exclusive_time@millisecond");
341 assert_eq!(&*metrics[3].name, "d:custom/something@millisecond");
342
343 assert_eq!(
344 outcomes,
345 vec![(Outcome::RateLimited(None), DataCategory::Transaction, 12)]
346 );
347 }
348}