1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use std::{
    collections::{HashMap, HashSet},
    hash::Hash,
};

use relay_base_schema::project::ProjectKey;

use super::{Item, Location, MetricMeta, StartOfDayUnixTimestamp};
use crate::{statsd::MetricCounters, MetricResourceIdentifier};

/// A metrics meta aggregator.
///
/// Aggregates metric metadata based on their scope (project, mri, timestamp) and
/// only keeps the most relevant entries.
///
/// Currently we track the first N unique metric meta elements we get per scope.
///
/// This should represent the actual adoption rate of different code versions.
///
/// This aggregator is purely in memory and will lose its state on restart,
/// which may cause the same items to be emitted again after a restart.
/// For this we have de-duplication in the storage, and the overall volume
/// of this happening is small enough to just add it to the storage in the worst case.
#[derive(Debug)]
pub struct MetaAggregator {
    /// All tracked code locations, grouped by the scope they were reported for.
    locations: hashbrown::HashMap<Scope, HashSet<Location>>,

    /// Limit for the number of tracked locations per scope.
    max_locations: usize,
}

impl MetaAggregator {
    /// Creates a new metrics meta aggregator.
    ///
    /// `max_locations` is the maximum number of unique code locations which are
    /// tracked for a single scope, i.e. a combination of project, MRI and day.
    pub fn new(max_locations: usize) -> Self {
        Self {
            locations: hashbrown::HashMap::new(),
            max_locations,
        }
    }

    /// Adds a new meta item to the aggregator.
    ///
    /// Returns a new [`MetricMeta`] element when the element should be stored
    /// or sent upstream for storage. The returned element only contains the
    /// items which have been newly tracked by this call.
    ///
    /// Returns `None` when the meta item was already seen or is not considered relevant.
    pub fn add(&mut self, project_key: ProjectKey, meta: MetricMeta) -> Option<MetricMeta> {
        let mut send_upstream = HashMap::new();

        // Counts all received items, regardless of whether they end up being tracked.
        let mut total = 0;
        for (mri, items) in meta.mapping {
            let scope = Scope {
                timestamp: meta.timestamp,
                project_key,
                mri,
            };

            total += items.len();
            if let Some(items) = self.add_scoped(&scope, items) {
                // The scope is no longer needed, move the MRI out instead of cloning it.
                send_upstream.insert(scope.mri, items);
            }
        }

        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorItems) += total as i64);

        if send_upstream.is_empty() {
            return None;
        }

        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorUpdate) += 1);
        Some(MetricMeta {
            timestamp: meta.timestamp,
            mapping: send_upstream,
        })
    }

    /// Retrieves all currently relevant metric meta for a project.
    ///
    /// The tracked locations of the project are grouped by timestamp,
    /// yielding one [`MetricMeta`] per tracked timestamp.
    pub fn get_all_relevant(&self, project_key: ProjectKey) -> impl Iterator<Item = MetricMeta> {
        let locations = self
            .locations
            .iter()
            .filter(|(scope, _)| scope.project_key == project_key);

        let mut result = HashMap::new();

        for (scope, locations) in locations {
            result
                .entry(scope.timestamp)
                .or_insert_with(|| MetricMeta {
                    timestamp: scope.timestamp,
                    mapping: HashMap::new(),
                })
                .mapping
                // The result owns its data, the MRI has to be cloned out of the scope.
                .entry(scope.mri.clone())
                .or_default()
                .extend(locations.iter().cloned().map(Item::Location));
        }

        result.into_values()
    }

    /// Remove all contained state related to a project.
    pub fn clear(&mut self, project_key: ProjectKey) {
        self.locations
            .retain(|scope, _| scope.project_key != project_key);
    }

    /// Tracks all not yet seen locations of `items` in the passed `scope`.
    ///
    /// At most `max_locations` unique locations are tracked per scope, once the
    /// limit is reached all remaining items are dropped.
    ///
    /// Returns the newly tracked items or `None` if no item was added.
    fn add_scoped(&mut self, scope: &Scope, items: Vec<Item>) -> Option<Vec<Item>> {
        // Entry ref needs hashbrown, we would have to clone the scope without or do a separate lookup.
        let locations = self.locations.entry_ref(scope).or_default();
        let mut send_upstream = Vec::new();

        for item in items {
            match item {
                Item::Location(location) => {
                    // The check runs before the insert, it needs to be inclusive or
                    // the scope would end up with `max_locations + 1` entries.
                    if locations.len() >= self.max_locations {
                        break;
                    }

                    if !locations.contains(&location) {
                        locations.insert(location.clone());
                        send_upstream.push(Item::Location(location));
                    }
                }
                Item::Unknown => {}
            }
        }

        (!send_upstream.is_empty()).then_some(send_upstream)
    }
}

/// The metadata scope.
///
/// We scope metadata by project, MRI and day, where the day is
/// represented as a Unix timestamp at the beginning of the day.
///
/// The technical scope (e.g. redis key) also includes the organization id, but this
/// can be inferred from the project.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Scope {
    /// The day the metadata was reported for.
    pub timestamp: StartOfDayUnixTimestamp,
    /// The project the metadata belongs to.
    pub project_key: ProjectKey,
    /// The metric the metadata is attached to.
    pub mri: MetricResourceIdentifier<'static>,
}

impl From<&Scope> for Scope {
    fn from(value: &Scope) -> Self {
        value.clone()
    }
}