// relay_server/services/outcome_aggregator.rs
use std::collections::HashMap;
5use std::time::Duration;
6
7use chrono::Utc;
8use relay_common::time::UnixTimestamp;
9use relay_config::{Config, EmitOutcomes};
10use relay_quotas::{DataCategory, Scoping};
11use relay_statsd::metric;
12use relay_system::{Addr, Controller, Service, Shutdown};
13
14use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome};
15use crate::statsd::RelayTimers;
16use crate::utils::SleepHandle;
17
/// Aggregation key for tracked outcomes.
///
/// Outcomes are bucketed by time window, scoping, outcome variant, and data
/// category; quantities for identical keys are summed together.
#[derive(Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    /// The bucket's time window: the outcome timestamp in seconds divided by
    /// the bucket interval (see `handle_track_outcome`).
    offset: u64,
    /// The scoping of the outcome.
    pub scoping: Scoping,
    /// The outcome variant being tracked.
    pub outcome: Outcome,
    /// The data category of the tracked items.
    pub category: DataCategory,
}
30
/// Service that aggregates [`TrackOutcome`] messages into time buckets and
/// periodically flushes the aggregated counts to the outcome producer.
pub struct OutcomeAggregator {
    /// Whether outcome emission is disabled (`EmitOutcomes::None`); when set,
    /// incoming outcomes are dropped without being aggregated.
    disabled: bool,
    /// The width of each aggregation bucket, in seconds.
    bucket_interval: u64,
    /// Time between flushes, in seconds. A value of `0` causes an immediate
    /// flush on every tracked outcome instead of a scheduled one.
    flush_interval: u64,
    /// Mapping from bucket key to the summed outcome quantity.
    buckets: HashMap<BucketKey, u32>,
    /// Recipient of the aggregated outcomes on flush.
    outcome_producer: Addr<OutcomeProducer>,
    /// Timer that triggers the next scheduled flush.
    flush_handle: SleepHandle,
}
50
51impl OutcomeAggregator {
52 pub fn new(config: &Config, outcome_producer: Addr<OutcomeProducer>) -> Self {
53 let disabled = matches!(config.emit_outcomes(), EmitOutcomes::None);
54
55 Self {
56 disabled,
57 bucket_interval: config.outcome_aggregator().bucket_interval,
58 flush_interval: config.outcome_aggregator().flush_interval,
59 buckets: HashMap::new(),
60 outcome_producer,
61 flush_handle: SleepHandle::idle(),
62 }
63 }
64
65 fn handle_shutdown(&mut self, message: Shutdown) {
66 if message.timeout.is_some() {
67 self.flush();
68 relay_log::info!("outcome aggregator stopped");
69 }
70 }
71
72 fn handle_track_outcome(&mut self, msg: TrackOutcome) {
73 if self.disabled {
74 return;
75 }
76
77 let offset = msg.timestamp.timestamp() as u64 / self.bucket_interval;
78
79 let bucket_key = BucketKey {
80 offset,
81 scoping: msg.scoping,
82 outcome: msg.outcome,
83 category: msg.category,
84 };
85
86 let counter = self.buckets.entry(bucket_key).or_insert(0);
87 *counter += msg.quantity;
88
89 if self.flush_interval == 0 {
90 self.do_flush();
92 } else if self.flush_handle.is_idle() {
93 self.flush_handle
94 .set(Duration::from_secs(self.flush_interval));
95 }
96 }
97
98 fn do_flush(&mut self) {
99 self.flush_handle.reset();
100
101 let bucket_interval = self.bucket_interval;
102 let outcome_producer = self.outcome_producer.clone();
103
104 for (bucket_key, quantity) in self.buckets.drain() {
105 let BucketKey {
106 offset,
107 scoping,
108 outcome,
109 category,
110 } = bucket_key;
111
112 let timestamp = UnixTimestamp::from_secs(offset * bucket_interval)
114 .as_datetime()
115 .unwrap_or_else(Utc::now);
116
117 let outcome = TrackOutcome {
118 timestamp,
119 scoping,
120 outcome,
121 event_id: None,
122 remote_addr: None,
123 category,
124 quantity,
125 };
126
127 outcome_producer.send(outcome);
128 }
129 }
130
131 fn flush(&mut self) {
132 metric!(timer(RelayTimers::OutcomeAggregatorFlushTime), {
133 self.do_flush();
134 });
135 }
136}
137
impl Service for OutcomeAggregator {
    type Interface = TrackOutcome;

    /// Main service loop: receives outcomes, flushes on timer expiry, and
    /// drains buckets on graceful shutdown.
    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        let mut shutdown = Controller::shutdown_handle();
        relay_log::info!("outcome aggregator started");

        loop {
            tokio::select! {
                // `biased` polls branches in declaration order, so an elapsed
                // flush timer is serviced before new incoming messages.
                biased;

                () = &mut self.flush_handle => self.flush(),
                Some(message) = rx.recv() => self.handle_track_outcome(message),
                shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
                // All branches disabled (e.g. the message channel closed):
                // exit the loop and shut the service down.
                else => break,
            }
        }

        relay_log::info!("outcome aggregator stopped");
    }
}
160}