1use std::pin::Pin;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::{Duration, SystemTime};
5
6use hashbrown::HashMap;
7use hashbrown::hash_map::Entry;
8use relay_base_schema::project::ProjectKey;
9use relay_config::AggregatorServiceConfig;
10use relay_metrics::Bucket;
11use relay_metrics::aggregator::{self, AggregateMetricsError, AggregatorConfig, Partition};
12use relay_quotas::{RateLimits, Scoping};
13use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service};
14use tokio::time::{Instant, Sleep};
15
16use crate::services::projects::cache::ProjectCacheHandle;
17use crate::services::projects::project::{ProjectInfo, ProjectState};
18use crate::statsd::{RelayCounters, RelayTimers};
19
20#[derive(Debug)]
31pub enum Aggregator {
32 MergeBuckets(MergeBuckets),
34}
35
36impl Aggregator {
37 pub fn variant(&self) -> &'static str {
39 match self {
40 Aggregator::MergeBuckets(_) => "MergeBuckets",
41 }
42 }
43}
44
45impl Interface for Aggregator {}
46
47impl FromMessage<MergeBuckets> for Aggregator {
48 type Response = NoResponse;
49 fn from_message(message: MergeBuckets, _: ()) -> Self {
50 Self::MergeBuckets(message)
51 }
52}
53
54#[derive(Clone, Debug)]
61pub struct FlushBuckets {
62 pub partition_key: u32,
64 pub buckets: HashMap<ProjectKey, ProjectBuckets>,
66}
67
68#[derive(Debug, Clone)]
70pub struct ProjectBuckets {
71 pub buckets: Vec<Bucket>,
73 pub scoping: Scoping,
75 pub project_info: Arc<ProjectInfo>,
77 pub rate_limits: Arc<RateLimits>,
79}
80
81impl Extend<Bucket> for ProjectBuckets {
82 fn extend<T: IntoIterator<Item = Bucket>>(&mut self, iter: T) {
83 self.buckets.extend(iter)
84 }
85}
86
87pub struct AggregatorService {
89 aggregator: aggregator::Aggregator,
90 receiver: Option<Recipient<FlushBuckets, NoResponse>>,
91 project_cache: ProjectCacheHandle,
92 config: AggregatorServiceConfig,
93 can_accept_metrics: Arc<AtomicBool>,
94 next_flush: Pin<Box<Sleep>>,
95}
96
97impl AggregatorService {
98 pub fn new(
103 config: AggregatorServiceConfig,
104 receiver: Option<Recipient<FlushBuckets, NoResponse>>,
105 project_cache: ProjectCacheHandle,
106 ) -> Self {
107 Self::named("default".to_owned(), config, receiver, project_cache)
108 }
109
110 pub(crate) fn named(
112 name: String,
113 config: AggregatorServiceConfig,
114 receiver: Option<Recipient<FlushBuckets, NoResponse>>,
115 project_cache: ProjectCacheHandle,
116 ) -> Self {
117 let aggregator = aggregator::Aggregator::named(name, &config.aggregator);
118 Self {
119 receiver,
120 config,
121 can_accept_metrics: Arc::new(AtomicBool::new(true)),
122 aggregator,
123 project_cache,
124 next_flush: Box::pin(tokio::time::sleep(Duration::from_secs(0))),
125 }
126 }
127
128 pub fn handle(&self) -> AggregatorHandle {
129 AggregatorHandle {
130 can_accept_metrics: Arc::clone(&self.can_accept_metrics),
131 }
132 }
133
134 fn try_flush(&mut self) -> Duration {
140 let partition = match self.aggregator.try_flush_next(SystemTime::now()) {
141 Ok(partition) => partition,
142 Err(duration) => return duration,
143 };
144 self.can_accept_metrics.store(true, Ordering::Relaxed);
145
146 self.flush_partition(partition);
147
148 self.aggregator.next_flush_at(SystemTime::now())
149 }
150
151 fn flush_partition(&mut self, partition: Partition) {
152 let Some(receiver) = &self.receiver else {
153 return;
154 };
155
156 let mut buckets_by_project = hashbrown::HashMap::new();
157
158 let partition_key = partition.partition_key;
159 for (project_key, bucket) in partition {
160 let s = match buckets_by_project.entry(project_key) {
161 Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
162 Entry::Vacant(vacant_entry) => {
163 let project = self.project_cache.get(project_key);
164
165 let project_info = match project.state() {
166 ProjectState::Enabled(info) => Arc::clone(info),
167 ProjectState::Disabled => continue, ProjectState::Pending => {
169 if let Err(error) = self.aggregator.merge(project_key, bucket) {
171 relay_log::error!(
172 tags.aggregator = self.aggregator.name(),
173 tags.project_key = project_key.as_str(),
174 bucket.error = &error as &dyn std::error::Error,
175 "failed to return metric bucket back to the aggregator"
176 );
177 }
178 relay_statsd::metric!(
179 counter(RelayCounters::ProjectStateFlushMetricsNoProject) += 1
180 );
181 continue;
182 }
183 };
184
185 let rate_limits = project.rate_limits().current_limits();
186 let Some(scoping) = project_info.scoping(project_key) else {
187 relay_log::error!(
192 tags.project_key = project_key.as_str(),
193 "dropping buckets because of missing scope",
194 );
195 continue;
196 };
197
198 vacant_entry.insert(ProjectBuckets {
199 buckets: Vec::new(),
200 scoping,
201 project_info,
202 rate_limits,
203 })
204 }
205 };
206
207 s.buckets.push(bucket);
208 }
209
210 if !buckets_by_project.is_empty() {
211 relay_log::debug!(
212 "flushing buckets for {} projects in partition {partition_key}",
213 buckets_by_project.len()
214 );
215
216 receiver.send(FlushBuckets {
217 partition_key,
218 buckets: buckets_by_project,
219 });
220 }
221 }
222
223 fn handle_merge_buckets(&mut self, msg: MergeBuckets) {
224 let MergeBuckets {
225 project_key,
226 buckets,
227 } = msg;
228
229 for mut bucket in buckets.into_iter() {
230 if !validate_bucket(&mut bucket, &self.config) {
231 continue;
232 };
233
234 match self.aggregator.merge(project_key, bucket) {
235 Err(AggregateMetricsError::InvalidTimestamp(_)) => {}
237 Err(AggregateMetricsError::TotalLimitExceeded) => {
238 relay_log::error!(
239 tags.aggregator = self.aggregator.name(),
240 "aggregator limit exceeded"
241 );
242 self.can_accept_metrics.store(false, Ordering::Relaxed);
243 break;
244 }
245 Err(AggregateMetricsError::ProjectLimitExceeded) => {
246 relay_log::error!(
247 tags.aggregator = self.aggregator.name(),
248 tags.project_key = project_key.as_str(),
249 "project metrics limit exceeded for project {project_key}"
250 );
251 break;
252 }
253 Err(error) => {
254 relay_log::error!(
255 tags.aggregator = self.aggregator.name(),
256 tags.project_key = project_key.as_str(),
257 bucket.error = &error as &dyn std::error::Error,
258 "failed to aggregate metric bucket"
259 );
260 }
261 Ok(()) => {}
262 };
263 }
264 }
265
266 fn handle_message(&mut self, message: Aggregator) {
267 match message {
268 Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg),
269 }
270 }
271
272 fn handle_shutdown(&mut self) {
273 relay_log::info!(
274 "Shutting down metrics aggregator {}",
275 self.aggregator.name()
276 );
277
278 let aggregator = aggregator::Aggregator::named(
280 self.aggregator.name().to_owned(),
281 &AggregatorConfig {
282 bucket_interval: 1,
283 aggregator_size: 1,
284 initial_delay: 0,
285 ..self.config.aggregator
286 },
287 );
288
289 let previous = std::mem::replace(&mut self.aggregator, aggregator);
290
291 let mut partitions = 0;
292 for partition in previous.into_partitions() {
293 self.flush_partition(partition);
294 partitions += 1;
295 }
296 relay_log::debug!("Force flushed {partitions} partitions");
297
298 self.next_flush
300 .as_mut()
301 .reset(Instant::now() + self.aggregator.next_flush_at(SystemTime::now()));
302 }
303}
304
305impl Service for AggregatorService {
306 type Interface = Aggregator;
307
308 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
309 let mut shutdown = Controller::shutdown_handle();
310
311 macro_rules! timed {
312 ($task:expr, $body:expr) => {{
313 let task_name = $task;
314 relay_statsd::metric!(
315 timer(RelayTimers::AggregatorServiceDuration),
316 task = task_name,
317 aggregator = self.aggregator.name(),
318 { $body }
319 )
320 }};
321 }
322
323 loop {
324 tokio::select! {
325 biased;
326
327 _ = &mut self.next_flush => timed!(
328 "try_flush", {
329 let next = self.try_flush();
330 self.next_flush.as_mut().reset(Instant::now() + next);
331 }
332 ),
333 Some(message) = rx.recv() => timed!(message.variant(), self.handle_message(message)),
334 _ = shutdown.notified() => timed!("shutdown", self.handle_shutdown()),
335
336 else => break,
337 }
338 }
339 }
340}
341
342impl Drop for AggregatorService {
343 fn drop(&mut self) {
344 if !self.aggregator.is_empty() {
345 relay_log::error!(
346 tags.aggregator = self.aggregator.name(),
347 "metrics aggregator dropping buckets"
348 );
349 relay_statsd::metric!(
350 counter(RelayCounters::BucketsDropped) += 1,
351 aggregator = self.aggregator.name(),
352 );
353 }
354 }
355}
356
357#[derive(Debug)]
359pub struct MergeBuckets {
360 pub project_key: ProjectKey,
361 pub buckets: Vec<Bucket>,
362}
363
364impl MergeBuckets {
365 pub fn new(project_key: ProjectKey, buckets: Vec<Bucket>) -> Self {
367 Self {
368 project_key,
369 buckets,
370 }
371 }
372}
373
/// A cheaply cloneable handle for querying the backpressure state of an [`AggregatorService`].
#[derive(Clone, Debug)]
pub struct AggregatorHandle {
    can_accept_metrics: Arc<AtomicBool>,
}

impl AggregatorHandle {
    /// Returns whether the aggregator currently accepts metrics.
    ///
    /// Becomes `false` when merging hits the aggregator's total limit. It becomes `true` again
    /// after the next partition has been flushed.
    pub fn can_accept_metrics(&self) -> bool {
        let flag = &self.can_accept_metrics;
        flag.load(Ordering::Relaxed)
    }
}
386
387fn validate_bucket(bucket: &mut Bucket, config: &AggregatorServiceConfig) -> bool {
391 if bucket.name.len() > config.max_name_length {
392 relay_log::debug!(
393 "Invalid metric name, too long (> {}): {:?}",
394 config.max_name_length,
395 bucket.name
396 );
397 return false;
398 }
399
400 bucket.tags.retain(|tag_key, tag_value| {
401 if tag_key.len() > config.max_tag_key_length {
402 relay_log::debug!("Invalid metric tag key {tag_key:?}");
403 return false;
404 }
405 if bytecount::num_chars(tag_value.as_bytes()) > config.max_tag_value_length {
406 relay_log::debug!("Invalid metric tag value");
407 return false;
408 }
409
410 true
411 });
412
413 true
414}
415
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::{Arc, RwLock};

    use relay_base_schema::organization::OrganizationId;
    use relay_base_schema::project::ProjectId;
    use relay_common::time::UnixTimestamp;
    use relay_metrics::{BucketMetadata, BucketValue, aggregator::AggregatorConfig};

    use super::*;

    /// Buckets recorded by a [`TestReceiver`].
    #[derive(Default)]
    struct ReceivedData {
        buckets: Vec<Bucket>,
    }

    /// Interface of [`TestReceiver`]: accepts the [`FlushBuckets`] sent by the aggregator.
    struct TestInterface(FlushBuckets);

    impl Interface for TestInterface {}

    impl FromMessage<FlushBuckets> for TestInterface {
        type Response = NoResponse;

        fn from_message(message: FlushBuckets, _: ()) -> Self {
            Self(message)
        }
    }

    /// Test service that records every bucket flushed to it.
    #[derive(Clone, Default)]
    struct TestReceiver {
        data: Arc<RwLock<ReceivedData>>,
        /// When set, received buckets are logged but not recorded.
        reject_all: bool,
    }

    impl TestReceiver {
        fn add_buckets(&self, buckets: HashMap<ProjectKey, ProjectBuckets>) {
            let buckets = buckets.into_values().flat_map(|s| s.buckets);
            self.data.write().unwrap().buckets.extend(buckets);
        }

        fn bucket_count(&self) -> usize {
            self.data.read().unwrap().buckets.len()
        }
    }

    impl Service for TestReceiver {
        type Interface = TestInterface;

        async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
            while let Some(message) = rx.recv().await {
                let buckets = message.0.buckets;
                relay_log::debug!(?buckets, "received buckets");
                if !self.reject_all {
                    self.add_buckets(buckets);
                }
            }
        }
    }

    fn some_bucket() -> Bucket {
        let timestamp = UnixTimestamp::from_secs(999994711);
        Bucket {
            timestamp,
            width: 0,
            name: "c:transactions/foo".into(),
            value: BucketValue::counter(42.into()),
            tags: BTreeMap::new(),
            metadata: BucketMetadata::new(timestamp),
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_flush_bucket() {
        relay_test::setup();

        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();

        let receiver = TestReceiver::default();
        let recipient = receiver.clone().start_detached().recipient();
        let project_cache = ProjectCacheHandle::for_test();
        project_cache.test_set_project_state(
            project_key,
            ProjectState::Enabled(Arc::new(ProjectInfo {
                // Project and organization ids, presumably the minimum needed for
                // `ProjectInfo::scoping` to succeed so the buckets are not dropped on flush.
                project_id: Some(ProjectId::new(3)),
                organization_id: Some(OrganizationId::new(1)),
                ..Default::default()
            })),
        );

        let config = AggregatorServiceConfig {
            aggregator: AggregatorConfig {
                bucket_interval: 1,
                initial_delay: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let aggregator =
            AggregatorService::new(config, Some(recipient), project_cache).start_detached();

        let mut bucket = some_bucket();
        bucket.timestamp = UnixTimestamp::now();

        aggregator.send(MergeBuckets::new(project_key, vec![bucket]));

        // Nothing has been flushed yet.
        assert_eq!(receiver.bucket_count(), 0);

        // Give the aggregator enough (paused, auto-advancing) time for the bucket to become due
        // and be flushed to the receiver.
        tokio::time::sleep(Duration::from_millis(2100)).await;

        assert_eq!(receiver.bucket_count(), 1);
    }

    fn test_config() -> AggregatorServiceConfig {
        AggregatorServiceConfig {
            max_name_length: 200,
            max_tag_key_length: 200,
            max_tag_value_length: 200,
            ..Default::default()
        }
    }

    #[test]
    fn test_validate_bucket_key_str_length() {
        relay_test::setup();
        let mut short_metric = Bucket {
            timestamp: UnixTimestamp::now(),
            name: "c:transactions/a_short_metric".into(),
            tags: BTreeMap::new(),
            metadata: Default::default(),
            width: 0,
            value: BucketValue::Counter(0.into()),
        };
        assert!(validate_bucket(&mut short_metric, &test_config()));

        let mut long_metric = Bucket {
            timestamp: UnixTimestamp::now(),
            name: "c:transactions/long_name_a_very_long_name_its_super_long_really_but_like_super_long_probably_the_longest_name_youve_seen_and_even_the_longest_name_ever_its_extremly_long_i_cant_tell_how_long_it_is_because_i_dont_have_that_many_fingers_thus_i_cant_count_the_many_characters_this_long_name_is".into(),
            tags: BTreeMap::new(),
            metadata: Default::default(),
            width: 0,
            value: BucketValue::Counter(0.into()),
        };
        assert!(!validate_bucket(&mut long_metric, &test_config()));

        // An over-long tag key removes the tag but keeps the bucket.
        let mut short_metric_long_tag_key = Bucket {
            timestamp: UnixTimestamp::now(),
            name: "c:transactions/a_short_metric_with_long_tag_key".into(),
            tags: BTreeMap::from([("i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into(), "tag_value".into())]),
            metadata: Default::default(),
            width: 0,
            value: BucketValue::Counter(0.into()),
        };
        assert!(validate_bucket(
            &mut short_metric_long_tag_key,
            &test_config()
        ));
        assert_eq!(short_metric_long_tag_key.tags.len(), 0);

        // An over-long tag value removes the tag but keeps the bucket.
        let mut short_metric_long_tag_value = Bucket {
            timestamp: UnixTimestamp::now(),
            name: "c:transactions/a_short_metric_with_long_tag_value".into(),
            tags: BTreeMap::from([("tag_key".into(), "i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into())]),
            metadata: Default::default(),
            width: 0,
            value: BucketValue::Counter(0.into()),
        };
        assert!(validate_bucket(
            &mut short_metric_long_tag_value,
            &test_config()
        ));
        assert_eq!(short_metric_long_tag_value.tags.len(), 0);
    }

    #[test]
    fn test_validate_tag_values_special_chars() {
        relay_test::setup();

        // 200 characters but 201 bytes ('ø' is two bytes in UTF-8). Tag values are limited by
        // characters, so this value sits exactly at the limit and must be kept.
        let tag_value = "x".repeat(199) + "ø";
        assert_eq!(tag_value.chars().count(), 200);
        assert_eq!(tag_value.len(), 201);

        let mut short_metric = Bucket {
            timestamp: UnixTimestamp::now(),
            name: "c:transactions/a_short_metric".into(),
            tags: BTreeMap::from([("foo".into(), tag_value.clone())]),
            metadata: Default::default(),
            width: 0,
            value: BucketValue::Counter(0.into()),
        };
        assert!(validate_bucket(&mut short_metric, &test_config()));
        assert_eq!(short_metric.tags["foo"], tag_value);
    }
}