// relay_server/services/metrics/router.rs
use relay_config::aggregator::Condition;
use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
use relay_metrics::MetricNamespace;
use relay_system::{Addr, NoResponse, Recipient, Service, ServiceRunner};
use crate::services::metrics::{
Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::statsd::RelayTimers;
use crate::utils;
/// Not-yet-started router that owns one default aggregator service plus any
/// number of secondary aggregator services.
///
/// Each secondary aggregator is paired with the [`Condition`] taken from its
/// scoped configuration.
pub struct RouterService {
    /// Aggregator built from the default configuration.
    default: AggregatorService,
    /// Named aggregators, each paired with the condition from its configuration.
    secondary: Vec<(AggregatorService, Condition)>,
}
impl RouterService {
    /// Builds the router from a default configuration and a list of scoped
    /// configurations.
    ///
    /// One named aggregator service is created per entry of
    /// `secondary_configs`, in order, each receiving its own clone of
    /// `receiver` and `project_cache`. The default aggregator is created last
    /// and takes ownership of the original `receiver` and `project_cache`.
    pub fn new(
        default_config: AggregatorServiceConfig,
        secondary_configs: Vec<ScopedAggregatorConfig>,
        receiver: Option<Recipient<FlushBuckets, NoResponse>>,
        project_cache: ProjectCacheHandle,
    ) -> Self {
        let secondary = secondary_configs
            .into_iter()
            .map(|scoped| {
                let aggregator = AggregatorService::named(
                    scoped.name,
                    scoped.config,
                    receiver.clone(),
                    project_cache.clone(),
                );
                (aggregator, scoped.condition)
            })
            .collect();

        Self {
            default: AggregatorService::new(default_config, receiver, project_cache),
            secondary,
        }
    }

    /// Returns a [`RouterHandle`] wrapping the handle of the default
    /// aggregator followed by the handles of all secondary aggregators.
    pub fn handle(&self) -> RouterHandle {
        let handles = std::iter::once(&self.default)
            .chain(self.secondary.iter().map(|(aggregator, _)| aggregator))
            .map(|aggregator| aggregator.handle())
            .collect();

        RouterHandle(handles)
    }
}
impl Service for RouterService {
    type Interface = Aggregator;

    /// Starts all aggregators and then forwards every incoming message to the
    /// started router until `rx.recv()` yields `None`.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let mut router = StartedRouter::start_in(self, &mut ServiceRunner::new());
        relay_log::info!("metrics router started");

        // The receiver is the only event source, so the loop ends as soon as
        // it stops producing messages.
        while let Some(message) = rx.recv().await {
            router.handle_message(message);
        }

        relay_log::info!("metrics router stopped");
    }
}
/// Router whose aggregators have been started and are addressed through
/// their [`Addr`]s.
struct StartedRouter {
    /// Address of the default aggregator; receives every bucket that no
    /// secondary aggregator claimed.
    default: Addr<Aggregator>,
    /// Addresses of the secondary aggregators, each with the list of metric
    /// namespaces its condition matched.
    secondary: Vec<(Addr<Aggregator>, Vec<MetricNamespace>)>,
}
impl StartedRouter {
    /// Starts every aggregator of `router` in `runner`.
    ///
    /// Secondary aggregators are started first, in order. For each of them the
    /// condition is resolved up front into the list of namespaces from
    /// `MetricNamespace::all()` that it matches. The default aggregator is
    /// started last.
    fn start_in(router: RouterService, runner: &mut ServiceRunner) -> Self {
        let mut secondary = Vec::with_capacity(router.secondary.len());

        for (aggregator, condition) in router.secondary {
            let mut namespaces = Vec::new();
            for namespace in MetricNamespace::all() {
                if condition.matches(Some(namespace)) {
                    namespaces.push(namespace);
                }
            }
            secondary.push((runner.start(aggregator), namespaces));
        }

        let default = runner.start(router.default);
        Self { default, secondary }
    }

    /// Dispatches a single message, timing the handling under
    /// `RelayTimers::MetricRouterServiceDuration` tagged with the message
    /// variant.
    fn handle_message(&mut self, message: Aggregator) {
        // Read the variant before the message is consumed by the match below.
        let variant = message.variant();
        relay_statsd::metric!(
            timer(RelayTimers::MetricRouterServiceDuration),
            message = variant,
            {
                match message {
                    Aggregator::MergeBuckets(merge) => self.handle_merge_buckets(merge),
                }
            }
        )
    }

    /// Distributes the buckets of `message` across the aggregators.
    ///
    /// Secondary aggregators are visited in order; each one receives the
    /// still-unrouted buckets whose name has a namespace contained in that
    /// aggregator's namespace list. Buckets without a resolvable namespace
    /// never match. Whatever remains afterwards goes to the default
    /// aggregator. No message is sent for an empty bucket list.
    fn handle_merge_buckets(&mut self, message: MergeBuckets) {
        let MergeBuckets {
            project_key,
            buckets,
        } = message;
        let mut remaining = buckets;

        for (aggregator, namespaces) in &self.secondary {
            let (rest, routed) = utils::split_off(remaining, |bucket| {
                bucket
                    .name
                    .try_namespace()
                    .map_or(false, |namespace| namespaces.contains(&namespace))
            });
            remaining = rest;

            if routed.is_empty() {
                continue;
            }
            aggregator.send(MergeBuckets::new(project_key, routed));
        }

        if remaining.is_empty() {
            return;
        }
        self.default.send(MergeBuckets::new(project_key, remaining));
    }
}
/// Cloneable collection of [`AggregatorHandle`]s, one per aggregator of a
/// router.
#[derive(Clone, Debug)]
pub struct RouterHandle(Vec<AggregatorHandle>);
impl RouterHandle {
pub fn can_accept_metrics(&self) -> bool {
self.0.iter().all(|ah| ah.can_accept_metrics())
}
}