relay_metrics/aggregator/
stats.rsuse hashbrown::HashMap;
use relay_base_schema::metrics::MetricNamespace;
use relay_base_schema::project::ProjectKey;
use crate::aggregator::AggregateMetricsError;
use crate::utils::ByNamespace;
#[derive(Default, Debug)]
pub struct Total {
pub count: u64,
pub count_by_namespace: ByNamespace<u64>,
pub cost: u64,
pub cost_by_namespace: ByNamespace<u64>,
}
impl Total {
pub fn remove_slot(&mut self, slot: &Slot) {
let Self {
count,
count_by_namespace,
cost,
cost_by_namespace,
} = self;
*count -= slot.count;
*count_by_namespace -= slot.count_by_namespace;
*cost -= slot.cost;
*cost_by_namespace -= slot.cost_by_namespace;
}
}
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Slot {
pub count: u64,
pub count_by_namespace: ByNamespace<u64>,
pub merges: u64,
pub merges_by_namespace: ByNamespace<u64>,
pub cost: u64,
pub cost_by_namespace: ByNamespace<u64>,
pub cost_by_project: HashMap<ProjectKey, u64>,
}
impl Slot {
pub fn reset(&mut self) {
let Self {
count,
count_by_namespace,
merges,
merges_by_namespace,
cost,
cost_by_namespace,
cost_by_project,
} = self;
*count = 0;
*count_by_namespace = Default::default();
*merges = 0;
*merges_by_namespace = Default::default();
*cost = 0;
*cost_by_namespace = Default::default();
cost_by_project.shrink_to_fit();
cost_by_project.clear();
}
pub fn incr_count(&mut self, total: &mut Total, namespace: MetricNamespace) {
self.count += 1;
*self.count_by_namespace.get_mut(namespace) += 1;
total.count += 1;
*total.count_by_namespace.get_mut(namespace) += 1;
}
pub fn incr_merges(&mut self, namespace: MetricNamespace) {
self.merges += 1;
*self.merges_by_namespace.get_mut(namespace) += 1;
}
pub fn reserve<'a>(
&'a mut self,
total: &'a mut Total,
project_key: ProjectKey,
namespace: MetricNamespace,
cost: u64,
limits: &Limits,
) -> Result<Reservation<'a>, AggregateMetricsError> {
if total.cost + cost > limits.max_total {
return Err(AggregateMetricsError::TotalLimitExceeded);
}
let project = self.cost_by_project.entry(project_key).or_insert(0);
if *project + cost > limits.max_partition_project {
return Err(AggregateMetricsError::ProjectLimitExceeded);
}
Ok(Reservation {
total,
cost_partition: &mut self.cost,
cost_partition_by_namespace: &mut self.cost_by_namespace,
cost_partition_project: project,
namespace,
reserved: cost,
})
}
}
#[derive(Debug, Clone, Copy)]
pub struct Limits {
pub max_total: u64,
pub max_partition_project: u64,
}
#[must_use = "a reservation does nothing if it is not used"]
pub struct Reservation<'a> {
total: &'a mut Total,
cost_partition: &'a mut u64,
cost_partition_by_namespace: &'a mut ByNamespace<u64>,
cost_partition_project: &'a mut u64,
namespace: MetricNamespace,
reserved: u64,
}
impl Reservation<'_> {
pub fn consume(self) {
let reserved = self.reserved;
self.consume_with(reserved);
}
pub fn consume_with(self, cost: u64) {
debug_assert!(cost <= self.reserved, "less reserved than used");
self.total.cost += cost;
*self.total.cost_by_namespace.get_mut(self.namespace) += cost;
*self.cost_partition += cost;
*self.cost_partition_by_namespace.get_mut(self.namespace) += cost;
*self.cost_partition_project += cost;
}
}