1use itertools::Either;
4use relay_metrics::Bucket;
5use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
6
7use crate::metrics::outcomes::{BucketSummary, MetricOutcomes, TrackableBucket};
8use crate::services::outcome::Outcome;
9use crate::utils;
10
11#[derive(Debug)]
13pub struct MetricsLimiter<Q: AsRef<Vec<Quota>> = Vec<Quota>> {
14 buckets: Vec<SummarizedBucket>,
16
17 quotas: Q,
19
20 scoping: Scoping,
22
23 #[cfg(feature = "processing")]
25 counts: EntityCounts,
26}
27
28fn to_counts(summary: &BucketSummary) -> EntityCounts {
29 match *summary {
30 BucketSummary::Spans { count, .. } => EntityCounts { spans: Some(count) },
31 BucketSummary::None => EntityCounts::default(),
32 }
33}
34
35#[derive(Debug)]
36struct SummarizedBucket {
37 bucket: Bucket,
38 summary: BucketSummary,
39}
40
41#[derive(Debug, Default, Clone)]
43struct EntityCounts {
44 spans: Option<usize>,
53}
54
55impl std::ops::Add for EntityCounts {
56 type Output = Self;
57
58 fn add(self, rhs: EntityCounts) -> Self::Output {
59 Self {
60 spans: add_some(self.spans, rhs.spans),
61 }
62 }
63}
64
65fn add_some<T>(a: Option<T>, b: Option<T>) -> Option<T>
66where
67 T: std::ops::Add<Output = T>,
68{
69 match (a, b) {
70 (None, None) => None,
71 (None, Some(c)) | (Some(c), None) => Some(c),
72 (Some(a), Some(b)) => Some(a + b),
73 }
74}
75
76impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
77 pub fn create(
81 buckets: impl IntoIterator<Item = Bucket>,
82 quotas: Q,
83 scoping: Scoping,
84 ) -> Result<Self, Vec<Bucket>> {
85 let buckets: Vec<_> = buckets
86 .into_iter()
87 .map(|bucket| {
88 let summary = match bucket.metadata.extracted_from_indexed {
90 false => bucket.summary(),
91 true => Default::default(),
92 };
93 SummarizedBucket { bucket, summary }
94 })
95 .collect();
96
97 let total_counts = buckets
99 .iter()
100 .map(|b| to_counts(&b.summary))
101 .reduce(|a, b| a + b);
102 if let Some(_counts) = total_counts {
103 Ok(Self {
104 buckets,
105 quotas,
106 scoping,
107 #[cfg(feature = "processing")]
108 counts: _counts,
109 })
110 } else {
111 Err(buckets.into_iter().map(|s| s.bucket).collect())
112 }
113 }
114
115 #[cfg(feature = "processing")]
117 pub fn scoping(&self) -> &Scoping {
118 &self.scoping
119 }
120
121 #[cfg(feature = "processing")]
123 pub fn quotas(&self) -> &[Quota] {
124 self.quotas.as_ref()
125 }
126
127 #[cfg(feature = "processing")]
137 pub fn count(&self) -> (DataCategory, Option<usize>) {
138 (DataCategory::Span, self.counts.spans)
139 }
140
141 fn drop_with_outcome(
142 &mut self,
143 category: DataCategory,
144 outcome: Outcome,
145 metric_outcomes: &MetricOutcomes,
146 ) {
147 let buckets = std::mem::take(&mut self.buckets);
148 let (buckets, dropped) = utils::split_off_map(buckets, |b| match b.summary {
149 BucketSummary::Spans { .. } if category == DataCategory::Span => {
150 Either::Right(b.bucket)
151 }
152 _ => Either::Left(b),
153 });
154 self.buckets = buckets;
155
156 metric_outcomes.track(self.scoping, &dropped, outcome);
157 }
158
159 pub fn enforce_limits(
163 &mut self,
164 rate_limits: &RateLimits,
165 metric_outcomes: &MetricOutcomes,
166 ) -> bool {
167 let active_rate_limits = rate_limits
168 .check_with_quotas(self.quotas.as_ref(), self.scoping.item(DataCategory::Span));
169
170 if let Some(limit) = active_rate_limits.longest() {
172 self.drop_with_outcome(
173 DataCategory::Span,
174 Outcome::RateLimited(limit.reason_code.clone()),
175 metric_outcomes,
176 );
177
178 return true;
179 }
180
181 false
182 }
183
184 pub fn into_buckets(self) -> Vec<Bucket> {
186 self.buckets.into_iter().map(|s| s.bucket).collect()
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use relay_base_schema::organization::OrganizationId;
193 use relay_base_schema::project::{ProjectId, ProjectKey};
194 use relay_metrics::{BucketMetadata, BucketValue, UnixTimestamp};
195 use relay_quotas::QuotaScope;
196 use relay_system::Addr;
197
198 use super::*;
199
200 fn deny(category: DataCategory) -> Vec<Quota> {
201 vec![Quota {
202 id: None,
203 categories: [category].into(),
204 scope: QuotaScope::Organization,
205 scope_id: None,
206 limit: Some(0),
207 window: None,
208 reason_code: None,
209 namespace: None,
210 }]
211 }
212
213 fn run_limiter(
215 metrics: Vec<Bucket>,
216 quotas: Vec<Quota>,
217 ) -> (Vec<Bucket>, Vec<(Outcome, DataCategory, u32)>) {
218 let (outcome_sink, mut rx) = Addr::custom();
219 let metric_outcomes = MetricOutcomes::new(outcome_sink.clone());
220
221 let mut limiter = MetricsLimiter::create(
222 metrics,
223 quotas,
224 Scoping {
225 organization_id: OrganizationId::new(1),
226 project_id: ProjectId::new(1),
227 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
228 key_id: None,
229 },
230 )
231 .unwrap();
232
233 limiter.enforce_limits(&RateLimits::new(), &metric_outcomes);
234 let metrics = limiter.into_buckets();
235
236 rx.close();
237
238 let outcomes: Vec<_> = (0..)
239 .map(|_| rx.blocking_recv())
240 .take_while(|o| o.is_some())
241 .flatten()
242 .filter(|o| o.category != DataCategory::MetricBucket)
243 .map(|o| (o.outcome, o.category, o.quantity))
244 .collect();
245
246 (metrics, outcomes)
247 }
248
249 fn mixed_bag() -> Vec<Bucket> {
251 vec![
252 Bucket {
253 timestamp: UnixTimestamp::now(),
254 width: 0,
255 name: "c:spans/usage@none".into(),
256 tags: Default::default(),
257 value: BucketValue::counter(34.into()),
258 metadata: BucketMetadata::default(),
259 },
260 Bucket {
261 timestamp: UnixTimestamp::now(),
262 width: 0,
263 name: "c:spans/usage@none".into(),
264 tags: Default::default(),
265 value: BucketValue::counter(56.into()),
266 metadata: BucketMetadata::default(),
267 },
268 Bucket {
269 timestamp: UnixTimestamp::now(),
270 width: 0,
271 name: "d:spans/exclusive_time@millisecond".into(),
272 tags: Default::default(),
273 value: BucketValue::distribution(78.into()),
274 metadata: BucketMetadata::default(),
275 },
276 Bucket {
277 timestamp: UnixTimestamp::now(),
278 width: 0,
279 name: "d:custom/something@millisecond".into(),
280 tags: Default::default(),
281 value: BucketValue::distribution(78.into()),
282 metadata: BucketMetadata::default(),
283 },
284 ]
285 }
286
287 #[test]
288 fn mixed_with_span_quota() {
289 let (metrics, outcomes) = run_limiter(mixed_bag(), deny(DataCategory::Span));
290
291 assert_eq!(metrics.len(), 1);
292 assert_eq!(&*metrics[0].name, "d:custom/something@millisecond");
293
294 assert_eq!(
295 outcomes,
296 vec![(Outcome::RateLimited(None), DataCategory::Span, 90)]
297 );
298 }
299}