// relay_server/services/metrics/aggregator.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::{Duration, SystemTime};
5
6use hashbrown::HashMap;
7use hashbrown::hash_map::Entry;
8use relay_base_schema::project::ProjectKey;
9use relay_config::AggregatorServiceConfig;
10use relay_metrics::Bucket;
11use relay_metrics::aggregator::{self, AggregateMetricsError, AggregatorConfig, Partition};
12use relay_quotas::{RateLimits, Scoping};
13use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service};
14use tokio::time::{Instant, Sleep};
15
16use crate::services::projects::cache::ProjectCacheHandle;
17use crate::services::projects::project::{ProjectInfo, ProjectState};
18use crate::statsd::{RelayCounters, RelayTimers};
19
20/// Aggregator for metric buckets.
21///
22/// Buckets are flushed to a receiver after their time window and a grace period have passed.
23/// Metrics with a recent timestamp are given a longer grace period than backdated metrics, which
24/// are flushed after a shorter debounce delay. See [`AggregatorServiceConfig`] for configuration options.
25///
26/// Internally, the aggregator maintains a continuous flush cycle every 100ms. It guarantees that
27/// all elapsed buckets belonging to the same [`ProjectKey`] are flushed together.
28///
29/// Receivers must implement a handler for the [`FlushBuckets`] message.
30#[derive(Debug)]
31pub enum Aggregator {
32    /// Merge the buckets.
33    MergeBuckets(MergeBuckets),
34}
35
36impl Aggregator {
37    /// Returns the name of the message variant.
38    pub fn variant(&self) -> &'static str {
39        match self {
40            Aggregator::MergeBuckets(_) => "MergeBuckets",
41        }
42    }
43}
44
45impl Interface for Aggregator {}
46
47impl FromMessage<MergeBuckets> for Aggregator {
48    type Response = NoResponse;
49    fn from_message(message: MergeBuckets, _: ()) -> Self {
50        Self::MergeBuckets(message)
51    }
52}
53
54/// A message containing a vector of buckets to be flushed.
55///
56/// Handlers must respond to this message with a `Result`:
57/// - If flushing has succeeded or the buckets should be dropped for any reason, respond with `Ok`.
58/// - If flushing fails and should be retried at a later time, respond with `Err` containing the
59///   failed buckets. They will be merged back into the aggregator and flushed at a later time.
60#[derive(Clone, Debug)]
61pub struct FlushBuckets {
62    /// The partition to which the buckets belong.
63    pub partition_key: u32,
64    /// The buckets to be flushed.
65    pub buckets: HashMap<ProjectKey, ProjectBuckets>,
66}
67
68/// Metric buckets with additional project.
69#[derive(Debug, Clone)]
70pub struct ProjectBuckets {
71    /// The metric buckets to encode.
72    pub buckets: Vec<Bucket>,
73    /// Scoping of the project.
74    pub scoping: Scoping,
75    /// Project info for extracting quotas.
76    pub project_info: Arc<ProjectInfo>,
77    /// Currently cached rate limits.
78    pub rate_limits: Arc<RateLimits>,
79}
80
81impl Extend<Bucket> for ProjectBuckets {
82    fn extend<T: IntoIterator<Item = Bucket>>(&mut self, iter: T) {
83        self.buckets.extend(iter)
84    }
85}
86
87/// Service implementing the [`Aggregator`] interface.
88pub struct AggregatorService {
89    aggregator: aggregator::Aggregator,
90    receiver: Option<Recipient<FlushBuckets, NoResponse>>,
91    project_cache: ProjectCacheHandle,
92    config: AggregatorServiceConfig,
93    can_accept_metrics: Arc<AtomicBool>,
94    next_flush: Pin<Box<Sleep>>,
95}
96
97impl AggregatorService {
98    /// Create a new aggregator service and connect it to `receiver`.
99    ///
100    /// The aggregator will flush a list of buckets to the receiver in regular intervals based on
101    /// the given `config`.
102    pub fn new(
103        config: AggregatorServiceConfig,
104        receiver: Option<Recipient<FlushBuckets, NoResponse>>,
105        project_cache: ProjectCacheHandle,
106    ) -> Self {
107        Self::named("default".to_owned(), config, receiver, project_cache)
108    }
109
110    /// Like [`Self::new`], but with a provided name.
111    pub(crate) fn named(
112        name: String,
113        config: AggregatorServiceConfig,
114        receiver: Option<Recipient<FlushBuckets, NoResponse>>,
115        project_cache: ProjectCacheHandle,
116    ) -> Self {
117        let aggregator = aggregator::Aggregator::named(name, &config.aggregator);
118        Self {
119            receiver,
120            config,
121            can_accept_metrics: Arc::new(AtomicBool::new(true)),
122            aggregator,
123            project_cache,
124            next_flush: Box::pin(tokio::time::sleep(Duration::from_secs(0))),
125        }
126    }
127
128    pub fn handle(&self) -> AggregatorHandle {
129        AggregatorHandle {
130            can_accept_metrics: Arc::clone(&self.can_accept_metrics),
131        }
132    }
133
134    /// Sends the [`FlushBuckets`] message to the receiver in the fire and forget fashion. It is up
135    /// to the receiver to send the [`MergeBuckets`] message back if buckets could not be flushed
136    /// and we require another re-try.
137    ///
138    /// Returns when the next flush should be attempted.
139    fn try_flush(&mut self) -> Duration {
140        let partition = match self.aggregator.try_flush_next(SystemTime::now()) {
141            Ok(partition) => partition,
142            Err(duration) => return duration,
143        };
144        self.can_accept_metrics.store(true, Ordering::Relaxed);
145
146        self.flush_partition(partition);
147
148        self.aggregator.next_flush_at(SystemTime::now())
149    }
150
151    fn flush_partition(&mut self, partition: Partition) {
152        let Some(receiver) = &self.receiver else {
153            return;
154        };
155
156        let mut buckets_by_project = hashbrown::HashMap::new();
157
158        let partition_key = partition.partition_key;
159        for (project_key, bucket) in partition {
160            let s = match buckets_by_project.entry(project_key) {
161                Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
162                Entry::Vacant(vacant_entry) => {
163                    let project = self.project_cache.get(project_key);
164
165                    let project_info = match project.state() {
166                        ProjectState::Enabled(info) => Arc::clone(info),
167                        ProjectState::Disabled => continue, // Drop the bucket.
168                        ProjectState::Pending => {
169                            // Return to the aggregator, which will assign a new flush time.
170                            if let Err(error) = self.aggregator.merge(project_key, bucket) {
171                                relay_log::error!(
172                                    tags.aggregator = self.aggregator.name(),
173                                    tags.project_key = project_key.as_str(),
174                                    bucket.error = &error as &dyn std::error::Error,
175                                    "failed to return metric bucket back to the aggregator"
176                                );
177                            }
178                            relay_statsd::metric!(
179                                counter(RelayCounters::ProjectStateFlushMetricsNoProject) += 1
180                            );
181                            continue;
182                        }
183                    };
184
185                    let rate_limits = project.rate_limits().current_limits();
186                    let Some(scoping) = project_info.scoping(project_key) else {
187                        // This should never happen, at this point we should always have a valid
188                        // project with the necessary information to construct a scoping.
189                        //
190                        // Ideally we enforce this through the type system in the future.
191                        relay_log::error!(
192                            tags.project_key = project_key.as_str(),
193                            "dropping buckets because of missing scope",
194                        );
195                        continue;
196                    };
197
198                    vacant_entry.insert(ProjectBuckets {
199                        buckets: Vec::new(),
200                        scoping,
201                        project_info,
202                        rate_limits,
203                    })
204                }
205            };
206
207            s.buckets.push(bucket);
208        }
209
210        if !buckets_by_project.is_empty() {
211            relay_log::debug!(
212                "flushing buckets for {} projects in partition {partition_key}",
213                buckets_by_project.len()
214            );
215
216            receiver.send(FlushBuckets {
217                partition_key,
218                buckets: buckets_by_project,
219            });
220        }
221    }
222
223    fn handle_merge_buckets(&mut self, msg: MergeBuckets) {
224        let MergeBuckets {
225            project_key,
226            buckets,
227        } = msg;
228
229        for mut bucket in buckets.into_iter() {
230            if !validate_bucket(&mut bucket, &self.config) {
231                continue;
232            };
233
234            match self.aggregator.merge(project_key, bucket) {
235                // Ignore invalid timestamp errors and drop the bucket.
236                Err(AggregateMetricsError::InvalidTimestamp(_)) => {}
237                Err(AggregateMetricsError::TotalLimitExceeded) => {
238                    relay_log::error!(
239                        tags.aggregator = self.aggregator.name(),
240                        "aggregator limit exceeded"
241                    );
242                    self.can_accept_metrics.store(false, Ordering::Relaxed);
243                    break;
244                }
245                Err(AggregateMetricsError::ProjectLimitExceeded) => {
246                    relay_log::error!(
247                        tags.aggregator = self.aggregator.name(),
248                        tags.project_key = project_key.as_str(),
249                        "project metrics limit exceeded for project {project_key}"
250                    );
251                    break;
252                }
253                Err(error) => {
254                    relay_log::error!(
255                        tags.aggregator = self.aggregator.name(),
256                        tags.project_key = project_key.as_str(),
257                        bucket.error = &error as &dyn std::error::Error,
258                        "failed to aggregate metric bucket"
259                    );
260                }
261                Ok(()) => {}
262            };
263        }
264    }
265
266    fn handle_message(&mut self, message: Aggregator) {
267        match message {
268            Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg),
269        }
270    }
271
272    fn handle_shutdown(&mut self) {
273        relay_log::info!(
274            "Shutting down metrics aggregator {}",
275            self.aggregator.name()
276        );
277
278        // Create a new aggregator with very aggressive flush parameters.
279        let aggregator = aggregator::Aggregator::named(
280            self.aggregator.name().to_owned(),
281            &AggregatorConfig {
282                bucket_interval: 1,
283                aggregator_size: 1,
284                initial_delay: 0,
285                ..self.config.aggregator
286            },
287        );
288
289        let previous = std::mem::replace(&mut self.aggregator, aggregator);
290
291        let mut partitions = 0;
292        for partition in previous.into_partitions() {
293            self.flush_partition(partition);
294            partitions += 1;
295        }
296        relay_log::debug!("Force flushed {partitions} partitions");
297
298        // Reset the next flush time, to the time of the new aggregator.
299        self.next_flush
300            .as_mut()
301            .reset(Instant::now() + self.aggregator.next_flush_at(SystemTime::now()));
302    }
303}
304
305impl Service for AggregatorService {
306    type Interface = Aggregator;
307
308    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
309        let mut shutdown = Controller::shutdown_handle();
310
311        macro_rules! timed {
312            ($task:expr, $body:expr) => {{
313                let task_name = $task;
314                relay_statsd::metric!(
315                    timer(RelayTimers::AggregatorServiceDuration),
316                    task = task_name,
317                    aggregator = self.aggregator.name(),
318                    { $body }
319                )
320            }};
321        }
322
323        loop {
324            tokio::select! {
325                biased;
326
327                _ = &mut self.next_flush => timed!(
328                    "try_flush", {
329                        let next = self.try_flush();
330                        self.next_flush.as_mut().reset(Instant::now() + next);
331                    }
332                ),
333                Some(message) = rx.recv() => timed!(message.variant(), self.handle_message(message)),
334                _ = shutdown.notified() => timed!("shutdown", self.handle_shutdown()),
335
336                else => break,
337            }
338        }
339    }
340}
341
342impl Drop for AggregatorService {
343    fn drop(&mut self) {
344        if !self.aggregator.is_empty() {
345            relay_log::error!(
346                tags.aggregator = self.aggregator.name(),
347                "metrics aggregator dropping buckets"
348            );
349            relay_statsd::metric!(
350                counter(RelayCounters::BucketsDropped) += 1,
351                aggregator = self.aggregator.name(),
352            );
353        }
354    }
355}
356
357/// A message containing a list of [`Bucket`]s to be inserted into the aggregator.
358#[derive(Debug)]
359pub struct MergeBuckets {
360    pub project_key: ProjectKey,
361    pub buckets: Vec<Bucket>,
362}
363
364impl MergeBuckets {
365    /// Creates a new message containing a list of [`Bucket`]s.
366    pub fn new(project_key: ProjectKey, buckets: Vec<Bucket>) -> Self {
367        Self {
368            project_key,
369            buckets,
370        }
371    }
372}
373
/// Cloneable, synchronously queryable view into the state of an [`AggregatorService`].
#[derive(Debug, Clone)]
pub struct AggregatorHandle {
    /// Flag shared with the owning service; `true` while metrics are accepted.
    can_accept_metrics: Arc<AtomicBool>,
}

impl AggregatorHandle {
    /// Reports whether the aggregator currently accepts metrics.
    ///
    /// The service clears the flag when its total size limit is exceeded and sets it again
    /// after flushing a partition.
    pub fn can_accept_metrics(&self) -> bool {
        let flag: &AtomicBool = &self.can_accept_metrics;
        flag.load(Ordering::Relaxed)
    }
}
386
387/// Validates the metric name and its tags are correct.
388///
389/// Returns `false` if the metric should be dropped.
390fn validate_bucket(bucket: &mut Bucket, config: &AggregatorServiceConfig) -> bool {
391    if bucket.name.len() > config.max_name_length {
392        relay_log::debug!(
393            "Invalid metric name, too long (> {}): {:?}",
394            config.max_name_length,
395            bucket.name
396        );
397        return false;
398    }
399
400    bucket.tags.retain(|tag_key, tag_value| {
401        if tag_key.len() > config.max_tag_key_length {
402            relay_log::debug!("Invalid metric tag key {tag_key:?}");
403            return false;
404        }
405        if bytecount::num_chars(tag_value.as_bytes()) > config.max_tag_value_length {
406            relay_log::debug!("Invalid metric tag value");
407            return false;
408        }
409
410        true
411    });
412
413    true
414}
415
416#[cfg(test)]
417mod tests {
418    use std::collections::BTreeMap;
419    use std::sync::{Arc, RwLock};
420
421    use relay_base_schema::organization::OrganizationId;
422    use relay_base_schema::project::ProjectId;
423    use relay_common::time::UnixTimestamp;
424    use relay_metrics::{BucketMetadata, BucketValue, aggregator::AggregatorConfig};
425
426    use super::*;
427
428    #[derive(Default)]
429    struct ReceivedData {
430        buckets: Vec<Bucket>,
431    }
432
433    struct TestInterface(FlushBuckets);
434
435    impl Interface for TestInterface {}
436
437    impl FromMessage<FlushBuckets> for TestInterface {
438        type Response = NoResponse;
439
440        fn from_message(message: FlushBuckets, _: ()) -> Self {
441            Self(message)
442        }
443    }
444
445    #[derive(Clone, Default)]
446    struct TestReceiver {
447        data: Arc<RwLock<ReceivedData>>,
448        reject_all: bool,
449    }
450
451    impl TestReceiver {
452        fn add_buckets(&self, buckets: HashMap<ProjectKey, ProjectBuckets>) {
453            let buckets = buckets.into_values().flat_map(|s| s.buckets);
454            self.data.write().unwrap().buckets.extend(buckets);
455        }
456
457        fn bucket_count(&self) -> usize {
458            self.data.read().unwrap().buckets.len()
459        }
460    }
461
462    impl Service for TestReceiver {
463        type Interface = TestInterface;
464
465        async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
466            while let Some(message) = rx.recv().await {
467                let buckets = message.0.buckets;
468                relay_log::debug!(?buckets, "received buckets");
469                if !self.reject_all {
470                    self.add_buckets(buckets);
471                }
472            }
473        }
474    }
475
476    fn some_bucket() -> Bucket {
477        let timestamp = UnixTimestamp::from_secs(999994711);
478        Bucket {
479            timestamp,
480            width: 0,
481            name: "c:transactions/foo".into(),
482            value: BucketValue::counter(42.into()),
483            tags: BTreeMap::new(),
484            metadata: BucketMetadata::new(timestamp),
485        }
486    }
487
488    #[tokio::test(start_paused = true)]
489    async fn test_flush_bucket() {
490        relay_test::setup();
491
492        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
493
494        let receiver = TestReceiver::default();
495        let recipient = receiver.clone().start_detached().recipient();
496        let project_cache = ProjectCacheHandle::for_test();
497        project_cache.test_set_project_state(
498            project_key,
499            ProjectState::Enabled({
500                Arc::new(ProjectInfo {
501                    // Minimum necessary to get a valid scoping.
502                    project_id: Some(ProjectId::new(3)),
503                    organization_id: Some(OrganizationId::new(1)),
504                    ..Default::default()
505                })
506            }),
507        );
508
509        let config = AggregatorServiceConfig {
510            aggregator: AggregatorConfig {
511                bucket_interval: 1,
512                initial_delay: 0,
513                ..Default::default()
514            },
515            ..Default::default()
516        };
517        let aggregator =
518            AggregatorService::new(config, Some(recipient), project_cache).start_detached();
519
520        let mut bucket = some_bucket();
521        bucket.timestamp = UnixTimestamp::now();
522
523        aggregator.send(MergeBuckets::new(project_key, vec![bucket]));
524
525        // Nothing flushed.
526        assert_eq!(receiver.bucket_count(), 0);
527
528        // Wait until flush delay has passed. It is up to 2s: 1s for the current bucket
529        // and 1s for the flush shift. Adding 100ms buffer.
530        tokio::time::sleep(Duration::from_millis(2100)).await;
531        // receiver must have 1 bucket flushed
532        assert_eq!(receiver.bucket_count(), 1);
533    }
534
535    fn test_config() -> AggregatorServiceConfig {
536        AggregatorServiceConfig {
537            max_name_length: 200,
538            max_tag_key_length: 200,
539            max_tag_value_length: 200,
540            ..Default::default()
541        }
542    }
543
544    #[test]
545    fn test_validate_bucket_key_str_length() {
546        relay_test::setup();
547        let mut short_metric = Bucket {
548            timestamp: UnixTimestamp::now(),
549            name: "c:transactions/a_short_metric".into(),
550            tags: BTreeMap::new(),
551            metadata: Default::default(),
552            width: 0,
553            value: BucketValue::Counter(0.into()),
554        };
555        assert!(validate_bucket(&mut short_metric, &test_config()));
556
557        let mut long_metric = Bucket {
558            timestamp: UnixTimestamp::now(),
559            name: "c:transactions/long_name_a_very_long_name_its_super_long_really_but_like_super_long_probably_the_longest_name_youve_seen_and_even_the_longest_name_ever_its_extremly_long_i_cant_tell_how_long_it_is_because_i_dont_have_that_many_fingers_thus_i_cant_count_the_many_characters_this_long_name_is".into(),
560            tags: BTreeMap::new(),
561            metadata: Default::default(),
562            width: 0,
563            value: BucketValue::Counter(0.into()),
564        };
565        assert!(!validate_bucket(&mut long_metric, &test_config()));
566
567        let mut short_metric_long_tag_key  = Bucket {
568            timestamp: UnixTimestamp::now(),
569            name: "c:transactions/a_short_metric_with_long_tag_key".into(),
570            tags: BTreeMap::from([("i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into(), "tag_value".into())]),
571            metadata: Default::default(),
572            width: 0,
573            value: BucketValue::Counter(0.into()),
574        };
575        assert!(validate_bucket(
576            &mut short_metric_long_tag_key,
577            &test_config()
578        ));
579        assert_eq!(short_metric_long_tag_key.tags.len(), 0);
580
581        let mut short_metric_long_tag_value  = Bucket {
582            timestamp: UnixTimestamp::now(),
583            name: "c:transactions/a_short_metric_with_long_tag_value".into(),
584            tags: BTreeMap::from([("tag_key".into(), "i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into())]),
585            metadata: Default::default(),
586            width: 0,
587            value: BucketValue::Counter(0.into()),
588        };
589        assert!(validate_bucket(
590            &mut short_metric_long_tag_value,
591            &test_config()
592        ));
593        assert_eq!(short_metric_long_tag_value.tags.len(), 0);
594    }
595
596    #[test]
597    fn test_validate_tag_values_special_chars() {
598        relay_test::setup();
599
600        let tag_value = "x".repeat(199) + "ΓΈ";
601        assert_eq!(tag_value.chars().count(), 200); // Should be allowed
602
603        let mut short_metric = Bucket {
604            timestamp: UnixTimestamp::now(),
605            name: "c:transactions/a_short_metric".into(),
606            tags: BTreeMap::from([("foo".into(), tag_value.clone())]),
607            metadata: Default::default(),
608            width: 0,
609            value: BucketValue::Counter(0.into()),
610        };
611        assert!(validate_bucket(&mut short_metric, &test_config()));
612        assert_eq!(short_metric.tags["foo"], tag_value);
613    }
614}