// relay_server/services/outcome_aggregator.rs

//! This module contains the outcomes aggregator, which collects similar outcomes into groups
//! and flushes them periodically.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use chrono::Utc;
8use relay_common::time::UnixTimestamp;
9use relay_config::{Config, EmitOutcomes};
10use relay_quotas::{DataCategory, Scoping};
11use relay_statsd::metric;
12use relay_system::{Addr, Controller, Service, Shutdown};
13
14use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome};
15use crate::statsd::RelayTimers;
16use crate::utils::SleepHandle;
17
/// Contains everything to construct a [`TrackOutcome`], except the quantity.
///
/// Used as the aggregation key: outcomes that agree on all of these fields are
/// merged into a single bucket whose value is the summed quantity.
#[derive(Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    /// The time slot for which outcomes are aggregated (`timestamp = offset * bucket_interval`).
    offset: u64,
    /// Scoping of the request.
    pub scoping: Scoping,
    /// The outcome.
    pub outcome: Outcome,
    /// The event's data category.
    pub category: DataCategory,
}
30
/// Aggregates [`Outcome`]s into buckets and flushes them periodically.
///
/// This service handles a single message [`TrackOutcome`].
pub struct OutcomeAggregator {
    /// Whether to drop all incoming outcomes.
    ///
    /// This is `true` when outcome emission is disabled in the config
    /// ([`EmitOutcomes::None`]); in that case incoming outcomes are discarded.
    disabled: bool,
    /// The width of each aggregated bucket in seconds.
    bucket_interval: u64,
    /// The number of seconds between flushes of all buckets.
    ///
    /// A value of `0` causes every outcome to be flushed immediately on arrival.
    flush_interval: u64,
    /// Mapping from bucket key to the accumulated outcome quantity.
    buckets: HashMap<BucketKey, u32>,
    /// The recipient of the aggregated outcomes.
    outcome_producer: Addr<OutcomeProducer>,
    /// An optional timeout to the next scheduled flush.
    flush_handle: SleepHandle,
}
50
51impl OutcomeAggregator {
52    pub fn new(config: &Config, outcome_producer: Addr<OutcomeProducer>) -> Self {
53        let disabled = matches!(config.emit_outcomes(), EmitOutcomes::None);
54
55        Self {
56            disabled,
57            bucket_interval: config.outcome_aggregator().bucket_interval,
58            flush_interval: config.outcome_aggregator().flush_interval,
59            buckets: HashMap::new(),
60            outcome_producer,
61            flush_handle: SleepHandle::idle(),
62        }
63    }
64
65    fn handle_shutdown(&mut self, message: Shutdown) {
66        if message.timeout.is_some() {
67            self.flush();
68            relay_log::info!("outcome aggregator stopped");
69        }
70    }
71
72    fn handle_track_outcome(&mut self, msg: TrackOutcome) {
73        if self.disabled {
74            return;
75        }
76
77        let offset = msg.timestamp.timestamp() as u64 / self.bucket_interval;
78
79        let bucket_key = BucketKey {
80            offset,
81            scoping: msg.scoping,
82            outcome: msg.outcome,
83            category: msg.category,
84        };
85
86        let counter = self.buckets.entry(bucket_key).or_insert(0);
87        *counter += msg.quantity;
88
89        if self.flush_interval == 0 {
90            // Flush immediately. This is useful for integration tests.
91            self.do_flush();
92        } else if self.flush_handle.is_idle() {
93            self.flush_handle
94                .set(Duration::from_secs(self.flush_interval));
95        }
96    }
97
98    fn do_flush(&mut self) {
99        self.flush_handle.reset();
100
101        let bucket_interval = self.bucket_interval;
102        let outcome_producer = self.outcome_producer.clone();
103
104        for (bucket_key, quantity) in self.buckets.drain() {
105            let BucketKey {
106                offset,
107                scoping,
108                outcome,
109                category,
110            } = bucket_key;
111
112            // In case the timestamp cannot be extracted, we fallback to the current UTC `DateTime`
113            let timestamp = UnixTimestamp::from_secs(offset * bucket_interval)
114                .as_datetime()
115                .unwrap_or_else(Utc::now);
116
117            let outcome = TrackOutcome {
118                timestamp,
119                scoping,
120                outcome,
121                event_id: None,
122                remote_addr: None,
123                category,
124                quantity,
125            };
126
127            outcome_producer.send(outcome);
128        }
129    }
130
131    fn flush(&mut self) {
132        metric!(timer(RelayTimers::OutcomeAggregatorFlushTime), {
133            self.do_flush();
134        });
135    }
136}
137
impl Service for OutcomeAggregator {
    type Interface = TrackOutcome;

    // Main service loop: multiplexes between scheduled flushes, incoming
    // outcomes, and the shutdown signal until the message channel closes.
    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Subscribe to shutdown notifications before entering the loop so that
        // no signal is missed between startup and the first `select!`.
        let mut shutdown = Controller::shutdown_handle();
        relay_log::info!("outcome aggregator started");

        loop {
            tokio::select! {
                // Prioritize flush over receiving messages to prevent starving. Shutdown can be
                // last since it is not vital if there are still messages in the channel.
                biased;

                () = &mut self.flush_handle => self.flush(),
                // `Some(...)` pattern: when the channel closes, this arm is
                // disabled and the `else` branch ends the loop.
                Some(message) = rx.recv() => self.handle_track_outcome(message),
                shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
                else => break,
            }
        }

        relay_log::info!("outcome aggregator stopped");
    }
}