1use itertools::Either;
4use relay_metrics::Bucket;
5use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
6
7use crate::metrics::outcomes::{BucketSummary, MetricOutcomes, TrackableBucket};
8use crate::services::outcome::Outcome;
9use crate::utils;
10
11#[derive(Debug)]
13pub struct MetricsLimiter<Q: AsRef<Vec<Quota>> = Vec<Quota>> {
14 buckets: Vec<SummarizedBucket>,
16
17 quotas: Q,
19
20 scoping: Scoping,
22
23 #[cfg(feature = "processing")]
25 counts: EntityCounts,
26}
27
28fn to_counts(summary: &BucketSummary) -> EntityCounts {
29 match *summary {
30 BucketSummary::Transactions(count) => EntityCounts {
31 transactions: Some(count),
32 spans: None,
33 },
34 BucketSummary::Spans(count) => EntityCounts {
35 transactions: None,
36 spans: Some(count),
37 },
38 BucketSummary::None => EntityCounts::default(),
39 }
40}
41
/// A metric bucket paired with the summary that was computed for it when the limiter was created.
#[derive(Debug)]
struct SummarizedBucket {
    /// The wrapped metric bucket.
    bucket: Bucket,
    /// Summary of `bucket`; it determines which entity counts the bucket contributes to and
    /// whether the bucket is dropped when a category is rate limited.
    summary: BucketSummary,
}
47
/// Per-entity totals accumulated over a batch of bucket summaries.
///
/// The default value has both fields unset.
#[derive(Clone, Debug, Default)]
struct EntityCounts {
    /// Total transaction count, or `None` if no summary contributed a transaction count.
    transactions: Option<usize>,
    /// Total span count, or `None` if no summary contributed a span count.
    spans: Option<usize>,
}
70
71impl std::ops::Add for EntityCounts {
72 type Output = Self;
73
74 fn add(self, rhs: EntityCounts) -> Self::Output {
75 Self {
76 transactions: add_some(self.transactions, rhs.transactions),
77 spans: add_some(self.spans, rhs.spans),
78 }
79 }
80}
81
/// Adds two optional values, treating `None` as "nothing to add".
///
/// Returns `None` if both inputs are `None`, the present value if exactly one is `Some`, and the
/// sum if both are `Some`.
fn add_some<T>(a: Option<T>, b: Option<T>) -> Option<T>
where
    T: std::ops::Add<Output = T>,
{
    match a {
        // Nothing on the left: the result is whatever the right side holds (possibly `None`).
        None => b,
        Some(lhs) => Some(match b {
            Some(rhs) => lhs + rhs,
            None => lhs,
        }),
    }
}
92
93impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
94 pub fn create(
98 buckets: impl IntoIterator<Item = Bucket>,
99 quotas: Q,
100 scoping: Scoping,
101 ) -> Result<Self, Vec<Bucket>> {
102 let buckets: Vec<_> = buckets
103 .into_iter()
104 .map(|bucket| {
105 let summary = match bucket.metadata.extracted_from_indexed {
107 false => bucket.summary(),
108 true => Default::default(),
109 };
110 SummarizedBucket { bucket, summary }
111 })
112 .collect();
113
114 let total_counts = buckets
116 .iter()
117 .map(|b| to_counts(&b.summary))
118 .reduce(|a, b| a + b);
119 if let Some(_counts) = total_counts {
120 Ok(Self {
121 buckets,
122 quotas,
123 scoping,
124 #[cfg(feature = "processing")]
125 counts: _counts,
126 })
127 } else {
128 Err(buckets.into_iter().map(|s| s.bucket).collect())
129 }
130 }
131
132 #[cfg(feature = "processing")]
134 pub fn scoping(&self) -> &Scoping {
135 &self.scoping
136 }
137
138 #[cfg(feature = "processing")]
140 pub fn quotas(&self) -> &[Quota] {
141 self.quotas.as_ref()
142 }
143
144 #[cfg(feature = "processing")]
154 pub fn count(&self, category: DataCategory) -> Option<usize> {
155 match category {
156 DataCategory::Transaction => self.counts.transactions,
157 DataCategory::Span => self.counts.spans,
158 _ => None,
159 }
160 }
161
162 fn drop_with_outcome(
163 &mut self,
164 category: DataCategory,
165 outcome: Outcome,
166 metric_outcomes: &MetricOutcomes,
167 ) {
168 let buckets = std::mem::take(&mut self.buckets);
169 let (buckets, dropped) = utils::split_off_map(buckets, |b| match b.summary {
170 BucketSummary::Transactions { .. } if category == DataCategory::Transaction => {
171 Either::Right(b.bucket)
172 }
173 BucketSummary::Spans(_) if category == DataCategory::Span => Either::Right(b.bucket),
174 _ => Either::Left(b),
175 });
176 self.buckets = buckets;
177
178 metric_outcomes.track(self.scoping, &dropped, outcome);
179 }
180
181 pub fn enforce_limits(
185 &mut self,
186 rate_limits: &RateLimits,
187 metric_outcomes: &MetricOutcomes,
188 ) -> bool {
189 for category in [DataCategory::Transaction, DataCategory::Span] {
190 let active_rate_limits =
191 rate_limits.check_with_quotas(self.quotas.as_ref(), self.scoping.item(category));
192
193 if let Some(limit) = active_rate_limits.longest() {
195 self.drop_with_outcome(
196 category,
197 Outcome::RateLimited(limit.reason_code.clone()),
198 metric_outcomes,
199 );
200
201 return true;
202 }
203 }
204
205 false
206 }
207
208 pub fn into_buckets(self) -> Vec<Bucket> {
210 self.buckets.into_iter().map(|s| s.bucket).collect()
211 }
212}
213
#[cfg(test)]
mod tests {
    use relay_base_schema::organization::OrganizationId;
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_metrics::{BucketMetadata, BucketValue, UnixTimestamp};
    use relay_quotas::QuotaScope;
    use relay_system::Addr;
    use smallvec::smallvec;

    use super::*;

    /// Builds a single organization-scoped quota with a limit of zero for `category`.
    fn deny(category: DataCategory) -> Vec<Quota> {
        let quota = Quota {
            id: None,
            categories: smallvec![category],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        };

        vec![quota]
    }

    /// Runs `metrics` through a limiter configured with `quotas` and an empty set of cached rate
    /// limits.
    ///
    /// Returns the buckets that remain, plus every tracked outcome whose category is not
    /// `DataCategory::MetricBucket`, as `(outcome, category, quantity)` tuples.
    fn run_limiter(
        metrics: Vec<Bucket>,
        quotas: Vec<Quota>,
    ) -> (Vec<Bucket>, Vec<(Outcome, DataCategory, u32)>) {
        let (outcome_sink, mut rx) = Addr::custom();
        let metric_outcomes = MetricOutcomes::new(outcome_sink.clone());

        let scoping = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(1),
            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            key_id: None,
        };

        let mut limiter = MetricsLimiter::create(metrics, quotas, scoping).unwrap();
        limiter.enforce_limits(&RateLimits::new(), &metric_outcomes);
        let metrics = limiter.into_buckets();

        // Close the receiver so that draining below terminates once the queue is empty.
        rx.close();

        let outcomes: Vec<_> = std::iter::from_fn(|| rx.blocking_recv())
            .filter(|o| o.category != DataCategory::MetricBucket)
            .map(|o| (o.outcome, o.category, o.quantity))
            .collect();

        (metrics, outcomes)
    }

    /// Returns one transaction usage counter, two span usage counters, one span distribution and
    /// one custom distribution, in that order.
    fn mixed_bag() -> Vec<Bucket> {
        let bucket = |name: &'static str, value: BucketValue| Bucket {
            timestamp: UnixTimestamp::now(),
            width: 0,
            name: name.into(),
            tags: Default::default(),
            value,
            metadata: BucketMetadata::default(),
        };

        vec![
            bucket("c:transactions/usage@none", BucketValue::counter(12.into())),
            bucket("c:spans/usage@none", BucketValue::counter(34.into())),
            bucket("c:spans/usage@none", BucketValue::counter(56.into())),
            bucket(
                "d:spans/exclusive_time@millisecond",
                BucketValue::distribution(78.into()),
            ),
            bucket(
                "d:custom/something@millisecond",
                BucketValue::distribution(78.into()),
            ),
        ]
    }

    #[test]
    fn mixed_with_span_quota() {
        let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Span));

        // All three span buckets are gone; the transaction and custom buckets survive.
        assert_eq!(metrics.len(), 2);
        assert_eq!(&*metrics[0].name, "c:transactions/usage@none");
        assert_eq!(&*metrics[1].name, "d:custom/something@millisecond");

        // 34 + 56 from the two span usage counters.
        assert_eq!(
            outcomes,
            vec![(Outcome::RateLimited(None), DataCategory::Span, 90)]
        );
    }

    #[test]
    fn mixed_with_transaction_quota() {
        let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Transaction));

        // Only the transaction bucket is gone.
        assert_eq!(metrics.len(), 4);
        assert_eq!(&*metrics[0].name, "c:spans/usage@none");
        assert_eq!(&*metrics[1].name, "c:spans/usage@none");
        assert_eq!(&*metrics[2].name, "d:spans/exclusive_time@millisecond");
        assert_eq!(&*metrics[3].name, "d:custom/something@millisecond");

        assert_eq!(
            outcomes,
            vec![(Outcome::RateLimited(None), DataCategory::Transaction, 12)]
        );
    }
}