use std::{
    collections::{HashMap, HashSet},
    hash::Hash,
};
use relay_base_schema::project::ProjectKey;

use super::{Item, Location, MetricMeta, StartOfDayUnixTimestamp};
use crate::{statsd::MetricCounters, MetricResourceIdentifier};

/// A metrics meta aggregator.
///
/// Aggregates metric metadata based on their scope (project, MRI, timestamp) and
/// only keeps the most relevant entries.
///
/// Currently we track the first N unique metric meta elements we receive.
///
/// This should approximate the actual adoption rate of different code versions.
///
/// The aggregator is purely in-memory and loses its state on restart, which may
/// cause multiple different items to be emitted after a restart. Storage
/// de-duplicates these items, and the overall volume of duplicates is small
/// enough that handling the worst case there is acceptable.
#[derive(Debug)]
pub struct MetaAggregator {
    /// All tracked code locations.
    locations: hashbrown::HashMap<Scope, HashSet<Location>>,
    /// The maximum number of tracked locations per scope.
    max_locations: usize,
}

impl MetaAggregator {
    /// Creates a new metrics meta aggregator.
    pub fn new(max_locations: usize) -> Self {
        Self {
            locations: hashbrown::HashMap::new(),
            max_locations,
        }
    }

    /// Adds a new meta item to the aggregator.
    ///
    /// Returns a new [`MetricMeta`] element when the element should be stored
    /// or sent upstream for storage.
    ///
    /// Returns `None` when the meta item was already seen or is not considered relevant.
    pub fn add(&mut self, project_key: ProjectKey, meta: MetricMeta) -> Option<MetricMeta> {
        let mut send_upstream = HashMap::new();

        let mut total = 0;
        for (mri, items) in meta.mapping {
            let scope = Scope {
                timestamp: meta.timestamp,
                project_key,
                mri,
            };

            total += items.len();
            if let Some(items) = self.add_scoped(&scope, items) {
                send_upstream.insert(scope.mri, items);
            }
        }

        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorItems) += total as i64);

        if send_upstream.is_empty() {
            return None;
        }

        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorUpdate) += 1);

        Some(MetricMeta {
            timestamp: meta.timestamp,
            mapping: send_upstream,
        })
    }

    /// Retrieves all currently relevant metric meta for a project.
    pub fn get_all_relevant(&self, project_key: ProjectKey) -> impl Iterator<Item = MetricMeta> {
        let locations = self
            .locations
            .iter()
            .filter(|(scope, _)| scope.project_key == project_key);

        let mut result = HashMap::new();

        for (scope, locations) in locations {
            result
                .entry(scope.timestamp)
                .or_insert_with(|| MetricMeta {
                    timestamp: scope.timestamp,
                    mapping: HashMap::new(),
                })
                .mapping
                // Cloning the MRI is unfortunate, but it is borrowed from the map key.
                .entry(scope.mri.clone())
                .or_insert_with(Vec::new)
                .extend(locations.iter().cloned().map(Item::Location));
        }

        result.into_values()
    }

    /// Removes all contained state related to a project.
    pub fn clear(&mut self, project_key: ProjectKey) {
        self.locations
            .retain(|scope, _| scope.project_key != project_key);
    }

    fn add_scoped(&mut self, scope: &Scope, items: Vec<Item>) -> Option<Vec<Item>> {
        // `entry_ref` requires hashbrown; with std's `HashMap` we would have to
        // clone the scope up front or do a separate lookup.
        let locations = self.locations.entry_ref(scope).or_default();

        let mut send_upstream = Vec::new();

        for item in items {
            match item {
                Item::Location(location) => {
                    // Enforce the cap: stop once this scope holds `max_locations` entries.
                    if locations.len() >= self.max_locations {
                        break;
                    }

                    if !locations.contains(&location) {
                        locations.insert(location.clone());
                        send_upstream.push(Item::Location(location));
                    }
                }
                Item::Unknown => {}
            }
        }

        (!send_upstream.is_empty()).then_some(send_upstream)
    }
}
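The per-scope dedup performed by `add_scoped` can be illustrated with a self-contained sketch using plain std types. `DemoAggregator`, the `(u64, String)` scope tuple, and the `String` payloads are illustrative stand-ins for `MetaAggregator`, `Scope`, and `Location`; they are not this crate's API.

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative stand-in for `MetaAggregator`, keyed by a simplified scope.
struct DemoAggregator {
    locations: HashMap<(u64, String), HashSet<String>>,
    max_locations: usize,
}

impl DemoAggregator {
    /// Mirrors `add_scoped`: inserts unseen items up to the cap and returns
    /// only the newly seen ones, or `None` if nothing new was added.
    fn add_scoped(&mut self, scope: &(u64, String), items: Vec<String>) -> Option<Vec<String>> {
        let locations = self.locations.entry(scope.clone()).or_default();
        let mut send_upstream = Vec::new();

        for item in items {
            // Stop once this scope holds the maximum number of entries.
            if locations.len() >= self.max_locations {
                break;
            }
            if !locations.contains(&item) {
                locations.insert(item.clone());
                send_upstream.push(item);
            }
        }

        (!send_upstream.is_empty()).then_some(send_upstream)
    }
}
```

Only the first sighting of each item within a scope is forwarded; once the cap is reached, further items for that scope are dropped entirely.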

/// The metadata scope.
///
/// Metadata is scoped by project, MRI, and day, represented as a Unix timestamp
/// at the beginning of the day.
///
/// The technical scope (e.g. the Redis key) also includes the organization id,
/// but this can be inferred from the project.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Scope {
    pub timestamp: StartOfDayUnixTimestamp,
    pub project_key: ProjectKey,
    pub mri: MetricResourceIdentifier<'static>,
}

/// Allows hashbrown's `entry_ref` to materialize an owned key from a borrowed
/// scope only when an insertion actually happens.
impl From<&Scope> for Scope {
    fn from(value: &Scope) -> Self {
        value.clone()
    }
}
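The nested grouping that `get_all_relevant` performs (timestamp, then MRI, then all locations) can be sketched standalone with std's entry API. Here `u64` days and `String` MRIs are illustrative stand-ins for `StartOfDayUnixTimestamp` and `MetricResourceIdentifier`, and `group_by_day` is a hypothetical helper, not part of this crate.

```rust
use std::collections::HashMap;

/// Groups `(day, mri, locations)` tuples into a per-day mapping of
/// MRI -> all locations, mirroring how `get_all_relevant` builds one
/// `MetricMeta` per timestamp.
fn group_by_day(
    entries: Vec<(u64, String, Vec<String>)>,
) -> HashMap<u64, HashMap<String, Vec<String>>> {
    let mut result: HashMap<u64, HashMap<String, Vec<String>>> = HashMap::new();

    for (day, mri, locations) in entries {
        result
            .entry(day) // one bucket per day
            .or_default()
            .entry(mri) // one list per MRI within the day
            .or_default()
            .extend(locations);
    }

    result
}
```

Entries sharing a day and MRI are merged into a single list, just as the aggregator merges location sets under one `MetricMeta` per timestamp.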