// relay_server/services/outcome_aggregator.rs
use std::collections::HashMap;
use std::time::Duration;
use chrono::Utc;
use relay_common::time::UnixTimestamp;
use relay_config::{Config, EmitOutcomes};
use relay_quotas::{DataCategory, Scoping};
use relay_statsd::metric;
use relay_system::{Addr, Controller, Service, Shutdown};
use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome};
use crate::statsd::RelayTimers;
use crate::utils::SleepHandle;
/// Key under which outcomes are aggregated.
///
/// Outcomes whose keys compare equal on all four fields are summed into a single counter in
/// `OutcomeAggregator::buckets`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct BucketKey {
/// Index of the time bucket: the outcome's Unix timestamp in seconds divided by the
/// aggregator's bucket interval.
offset: u64,
/// Scoping taken from the tracked outcome.
pub scoping: Scoping,
/// The outcome itself, taken from the tracked message.
pub outcome: Outcome,
/// Data category taken from the tracked outcome.
pub category: DataCategory,
}
/// Service state that sums incoming `TrackOutcome` messages into time buckets and forwards the
/// aggregated result to the `OutcomeProducer`.
pub struct OutcomeAggregator {
/// When `true`, incoming outcomes are ignored. Set at construction when the config's
/// `emit_outcomes()` is `EmitOutcomes::None`.
disabled: bool,
/// Width of one aggregation bucket, in seconds (outcome timestamps in seconds are divided by it).
bucket_interval: u64,
/// Delay in seconds between the first outcome entering an idle aggregator and the flush.
/// A value of `0` flushes after every tracked outcome.
flush_interval: u64,
/// Accumulated quantity per bucket key; drained on every flush.
buckets: HashMap<BucketKey, u32>,
/// Address that flushed, aggregated outcomes are sent to.
outcome_producer: Addr<OutcomeProducer>,
/// Sleep handle that is armed to trigger the next flush and reset when a flush runs.
flush_handle: SleepHandle,
}
impl OutcomeAggregator {
    /// Creates an aggregator that forwards flushed outcomes to `outcome_producer`.
    ///
    /// Aggregation is disabled when `config.emit_outcomes()` is `EmitOutcomes::None`.
    /// A configured `bucket_interval` of `0` is raised to `1` second so that computing a
    /// bucket offset can never divide by zero.
    pub fn new(config: &Config, outcome_producer: Addr<OutcomeProducer>) -> Self {
        let disabled = matches!(config.emit_outcomes(), EmitOutcomes::None);

        Self {
            disabled,
            // Guard against a zero interval: it is used as a divisor for every outcome.
            bucket_interval: config.outcome_aggregator().bucket_interval.max(1),
            flush_interval: config.outcome_aggregator().flush_interval,
            buckets: HashMap::new(),
            outcome_producer,
            flush_handle: SleepHandle::idle(),
        }
    }

    /// Flushes all pending buckets when the shutdown message carries a timeout.
    ///
    /// Shutdown messages without a timeout are ignored here. This method does not stop the
    /// service loop by itself.
    fn handle_shutdown(&mut self, message: Shutdown) {
        if message.timeout.is_some() {
            self.flush();
            relay_log::info!("outcome aggregator stopped");
        }
    }

    /// Adds the quantity of `msg` to the bucket matching its time offset, scoping, outcome
    /// and category, then flushes immediately or arms the flush timer.
    ///
    /// Does nothing when the aggregator is disabled.
    fn handle_track_outcome(&mut self, msg: TrackOutcome) {
        if self.disabled {
            return;
        }

        // `bucket_interval` is at least 1 (enforced in `new`), so this division cannot panic.
        // NOTE: a negative (pre-epoch) timestamp wraps to a very large value in this cast.
        let offset = msg.timestamp.timestamp() as u64 / self.bucket_interval;

        let bucket_key = BucketKey {
            offset,
            scoping: msg.scoping,
            outcome: msg.outcome,
            category: msg.category,
        };

        let counter = self.buckets.entry(bucket_key).or_insert(0);
        // Saturate rather than overflow: a plain `+=` would panic in debug builds and
        // silently wrap to a small number in release builds.
        *counter = counter.saturating_add(msg.quantity);

        if self.flush_interval == 0 {
            // No delay configured: emit the aggregated outcome right away.
            self.do_flush();
        } else if self.flush_handle.is_idle() {
            // Only arm the timer if no flush is pending, so an existing deadline is kept.
            self.flush_handle
                .set(Duration::from_secs(self.flush_interval));
        }
    }

    /// Resets the flush timer and sends one aggregated `TrackOutcome` per bucket to the
    /// outcome producer, leaving the bucket map empty.
    fn do_flush(&mut self) {
        self.flush_handle.reset();

        let bucket_interval = self.bucket_interval;
        let outcome_producer = self.outcome_producer.clone();

        for (bucket_key, quantity) in self.buckets.drain() {
            let BucketKey {
                offset,
                scoping,
                outcome,
                category,
            } = bucket_key;

            // Reconstruct the start of the bucket; fall back to the current time if the
            // value cannot be represented as a datetime.
            let timestamp = UnixTimestamp::from_secs(offset * bucket_interval)
                .as_datetime()
                .unwrap_or_else(Utc::now);

            let outcome = TrackOutcome {
                timestamp,
                scoping,
                outcome,
                // Aggregated outcomes carry no per-event information.
                event_id: None,
                remote_addr: None,
                category,
                quantity,
            };

            outcome_producer.send(outcome);
        }
    }

    /// Runs [`Self::do_flush`] while recording its duration under
    /// `RelayTimers::OutcomeAggregatorFlushTime`.
    fn flush(&mut self) {
        metric!(timer(RelayTimers::OutcomeAggregatorFlushTime), {
            self.do_flush();
        });
    }
}
/// Runs the aggregator as a service that consumes `TrackOutcome` messages.
impl Service for OutcomeAggregator {
/// The message type this service receives.
type Interface = TrackOutcome;
/// Event loop of the service.
///
/// Logs that the aggregator started, then repeatedly waits for one of: the flush handle
/// completing, a new `TrackOutcome` message, or a shutdown notification. Logs that the
/// aggregator stopped once the loop is left through the `else` arm.
async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
let mut shutdown = Controller::shutdown_handle();
relay_log::info!("outcome aggregator started");
loop {
tokio::select! {
// Poll the arms in the order they are written instead of in random order, so a due
// flush is handled before further messages are consumed.
biased;
// The flush handle completed: emit everything aggregated so far.
() = &mut self.flush_handle => self.flush(),
// A new outcome arrived; this arm is disabled when `recv()` yields `None`.
Some(message) = rx.recv() => self.handle_track_outcome(message),
// Shutdown was signalled; the handler decides whether to flush.
shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
// NOTE(review): per tokio's `select!` docs this arm runs only when every other arm is
// disabled; the flush and shutdown arms use irrefutable patterns, so confirm this is
// actually reachable.
else => break,
}
}
relay_log::info!("outcome aggregator stopped");
}
}