use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::hash::Hasher;
use std::time::Duration;
use std::{fmt, mem};
use fnv::FnvHasher;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::time::Instant;
use crate::bucket::{Bucket, BucketValue};
use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
use crate::{BucketMetadata, MetricName};
/// Error returned when a metric bucket cannot be merged into the [`Aggregator`].
///
/// The concrete cause is stored in a private error kind, so code outside this module can
/// display the error but cannot match on the cause.
#[derive(Debug, Error, PartialEq)]
#[error("failed to aggregate metrics: {kind}")]
pub struct AggregateMetricsError {
    /// The specific reason aggregation failed.
    kind: AggregateMetricsErrorKind,
}
impl From<AggregateMetricsErrorKind> for AggregateMetricsError {
fn from(kind: AggregateMetricsErrorKind) -> Self {
AggregateMetricsError { kind }
}
}
/// Internal classification of why a bucket could not be aggregated.
#[derive(Debug, Error, PartialEq)]
// NOTE(review): silences clippy's variant-naming lint; the original reason is not recorded here.
#[allow(clippy::enum_variant_names)]
enum AggregateMetricsErrorKind {
    /// The metric name could not be parsed as a metric resource identifier.
    #[error("found invalid characters: {0}")]
    InvalidCharacters(MetricName),
    /// The metric name parsed, but its namespace is `MetricNamespace::Unsupported`.
    #[error("found unsupported namespace: {0}")]
    UnsupportedNamespace(MetricNamespace),
    /// The aligned bucket timestamp is outside the accepted time range; carries the original
    /// timestamp.
    #[error("found invalid timestamp: {0}")]
    InvalidTimestamp(UnixTimestamp),
    /// Merging a value into an existing bucket failed.
    #[error("found incompatible metric types")]
    InvalidTypes,
    /// The metric name exceeds the configured maximum length.
    #[error("found invalid string: {0}")]
    InvalidStringLength(MetricName),
    /// The total cost limit across all projects has been reached.
    #[error("total metrics limit exceeded")]
    TotalLimitExceeded,
    /// The cost limit for a single project has been reached.
    #[error("project metrics limit exceeded")]
    ProjectLimitExceeded,
}
/// Identity of an aggregated bucket: incoming buckets with equal keys are merged together.
///
/// All fields participate in the derived `Hash`, so changing the fields (or their order)
/// changes `hash64` and, with it, the flush-time shift for `ShiftKey::Bucket`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    /// Project the bucket belongs to.
    project_key: ProjectKey,
    /// Start of the bucket, aligned to the configured bucket interval.
    timestamp: UnixTimestamp,
    /// Metric name; replaced by its canonical MRI string during validation.
    metric_name: MetricName,
    /// Bucket tags, kept in key order by the `BTreeMap`.
    tags: BTreeMap<String, String>,
}
impl BucketKey {
    /// Returns a 64-bit FNV hash over every field of the key.
    ///
    /// Used to derive the flush-time shift for `ShiftKey::Bucket` and as the value of the
    /// unique-buckets set metric.
    fn hash64(&self) -> u64 {
        let mut fnv = FnvHasher::default();
        std::hash::Hash::hash(self, &mut fnv);
        fnv.finish()
    }

    /// Estimated memory footprint of this key in bytes.
    ///
    /// This is the inline size of the struct plus the length of the metric name and of all tag
    /// keys and values.
    fn cost(&self) -> usize {
        let inline_size = mem::size_of::<Self>();
        let name_bytes = self.metric_name.len();
        inline_size + name_bytes + tags_cost(&self.tags)
    }

    /// Returns the namespace encoded in the metric name.
    fn namespace(&self) -> MetricNamespace {
        self.metric_name.namespace()
    }
}
/// Returns the combined byte length of all tag keys and values.
///
/// This is the variable-size part of a bucket key's estimated memory cost.
pub fn tags_cost(tags: &BTreeMap<String, String>) -> usize {
    tags.iter()
        .fold(0, |total, (key, value)| total + key.len() + value.len())
}
/// Selects what a bucket's flush-time shift is derived from.
///
/// The shift is a hash-based offset within one bucket interval, which spreads flushes over the
/// interval. Serialized as lowercase strings: `"project"`, `"bucket"`, `"none"`.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ShiftKey {
    /// Hash only the project key, so all buckets of a project get the same shift.
    #[default]
    Project,
    /// Hash the complete bucket key.
    Bucket,
    /// Apply no shift.
    None,
}
/// Configuration for an [`Aggregator`].
///
/// Fields missing during deserialization are taken from the [`Default`] implementation.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct AggregatorConfig {
    /// Width of each aggregation bucket in seconds; bucket timestamps are aligned to multiples
    /// of this value.
    pub bucket_interval: u64,
    /// Seconds to wait after a bucket's interval has ended before flushing it.
    pub initial_delay: u64,
    /// Seconds added to the end of the current interval when a bucket arrives after its regular
    /// flush time has already passed.
    pub debounce_delay: u64,
    /// Maximum age in seconds of an accepted bucket timestamp, relative to now.
    pub max_secs_in_past: u64,
    /// Maximum distance in seconds of an accepted bucket timestamp into the future.
    pub max_secs_in_future: u64,
    /// Maximum metric name length; longer names are rejected.
    pub max_name_length: usize,
    /// Maximum tag key length in bytes; tags with longer keys are dropped.
    pub max_tag_key_length: usize,
    /// Maximum tag value length in characters; tags with longer values are dropped.
    pub max_tag_value_length: usize,
    /// Optional per-project limit on the estimated bytes of held buckets; merges for a project
    /// are rejected once its tracked cost reaches this value.
    pub max_project_key_bucket_bytes: Option<usize>,
    /// What the flush-time shift is derived from.
    pub shift_key: ShiftKey,
}
impl AggregatorConfig {
    /// Bucket interval in seconds, clamped to at least one, for use as a divisor.
    ///
    /// A configured `bucket_interval` of `0` would otherwise panic with a division by zero when
    /// aligning timestamps. Timestamps have whole-second resolution, so aligning to a one-second
    /// grid leaves them unchanged, which is the natural meaning of "no bucketing".
    fn interval_divisor(&self) -> u64 {
        self.bucket_interval.max(1)
    }

    /// Computes when the bucket identified by `bucket_key` should be flushed.
    ///
    /// A bucket is normally flushed `bucket_interval + initial_delay` after its timestamp. If
    /// that moment has already passed ("backdated"), the flush is scheduled at the end of the
    /// current interval plus `debounce_delay` instead. The result is then offset by
    /// `flush_time_shift`.
    ///
    /// Also records how far the bucket lags behind now as a histogram metric.
    fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
        let initial_flush = bucket_key.timestamp + self.bucket_interval() + self.initial_delay();
        let now = UnixTimestamp::now();
        let backdated = initial_flush <= now;

        let delay = now.as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
        relay_statsd::metric!(
            histogram(MetricHistograms::BucketsDelay) = delay as f64,
            backdated = if backdated { "true" } else { "false" },
        );

        let flush_timestamp = if backdated {
            let interval = self.interval_divisor();
            let floor = (now.as_secs() / interval) * interval;
            UnixTimestamp::from_secs(floor) + self.bucket_interval() + self.debounce_delay()
        } else {
            initial_flush
        };

        // Translate the wall-clock flush timestamp into a monotonic `Instant` relative to now.
        let instant = if flush_timestamp > now {
            Instant::now().checked_add(flush_timestamp - now)
        } else {
            Instant::now().checked_sub(now - flush_timestamp)
        };

        instant.unwrap_or_else(Instant::now) + self.flush_time_shift(bucket_key)
    }

    /// `debounce_delay` as a [`Duration`].
    fn debounce_delay(&self) -> Duration {
        Duration::from_secs(self.debounce_delay)
    }

    /// `bucket_interval` as a [`Duration`].
    fn bucket_interval(&self) -> Duration {
        Duration::from_secs(self.bucket_interval)
    }

    /// `initial_delay` as a [`Duration`].
    fn initial_delay(&self) -> Duration {
        Duration::from_secs(self.initial_delay)
    }

    /// Returns a hash-derived offset in `[0, bucket_interval)` at millisecond granularity.
    ///
    /// The hash input is selected by `shift_key`. Returns zero for `ShiftKey::None` and when the
    /// bucket interval is zero.
    fn flush_time_shift(&self, bucket: &BucketKey) -> Duration {
        let hash_value = match self.shift_key {
            ShiftKey::Project => {
                let mut hasher = FnvHasher::default();
                hasher.write(bucket.project_key.as_str().as_bytes());
                hasher.finish()
            }
            ShiftKey::Bucket => bucket.hash64(),
            ShiftKey::None => return Duration::ZERO,
        };

        // `saturating_mul` avoids an overflow panic for absurdly large intervals, and a zero
        // window would otherwise panic in the remainder below.
        let window_millis = self.bucket_interval.saturating_mul(1000);
        if window_millis == 0 {
            return Duration::ZERO;
        }
        Duration::from_millis(hash_value % window_millis)
    }

    /// Aligns `timestamp` to the bucket grid.
    ///
    /// Half of `bucket_width` is added before rounding down. An incoming bucket wider than our
    /// interval is therefore assigned to the interval that contains its midpoint.
    fn get_bucket_timestamp(&self, timestamp: UnixTimestamp, bucket_width: u64) -> UnixTimestamp {
        let interval = self.interval_divisor();
        let ts = timestamp.as_secs().saturating_add(bucket_width / 2);
        let ts = (ts / interval) * interval;
        UnixTimestamp::from_secs(ts)
    }

    /// Returns the half-open range of accepted bucket timestamps:
    /// `[now - max_secs_in_past, now + max_secs_in_future)`, saturating at the `u64` bounds.
    pub fn timestamp_range(&self) -> std::ops::Range<UnixTimestamp> {
        let now = UnixTimestamp::now().as_secs();
        let min_timestamp = UnixTimestamp::from_secs(now.saturating_sub(self.max_secs_in_past));
        let max_timestamp = UnixTimestamp::from_secs(now.saturating_add(self.max_secs_in_future));
        min_timestamp..max_timestamp
    }
}
impl Default for AggregatorConfig {
    /// Default settings:
    ///
    /// - 10-second buckets, a 30-second initial delay and a 10-second debounce delay.
    /// - Timestamps accepted from up to five days in the past and one minute in the future.
    /// - Length limits of 200 for names, tag keys and tag values.
    /// - No per-project cost limit, and the default shift key.
    fn default() -> Self {
        const SECONDS_PER_DAY: u64 = 24 * 60 * 60;

        Self {
            bucket_interval: 10,
            initial_delay: 30,
            debounce_delay: 10,
            max_secs_in_past: 5 * SECONDS_PER_DAY,
            max_secs_in_future: 60,
            max_name_length: 200,
            max_tag_key_length: 200,
            max_tag_value_length: 200,
            max_project_key_bucket_bytes: None,
            shift_key: ShiftKey::default(),
        }
    }
}
/// An aggregated bucket value held by the aggregator until it is flushed.
#[derive(Debug)]
struct QueuedBucket {
    /// When this bucket becomes due for flushing.
    flush_at: Instant,
    /// The aggregated value.
    value: BucketValue,
    /// Metadata merged from every bucket that contributed to this one.
    metadata: BucketMetadata,
}
impl QueuedBucket {
    /// Creates a queued bucket that is due at `flush_at`.
    fn new(flush_at: Instant, value: BucketValue, metadata: BucketMetadata) -> Self {
        Self { flush_at, value, metadata }
    }

    /// Returns `true` once the scheduled flush instant lies strictly in the past.
    fn elapsed(&self) -> bool {
        self.flush_at < Instant::now()
    }

    /// Merges `value` and `metadata` into this bucket.
    ///
    /// Returns by how much the value's estimated cost grew (clamped at zero).
    ///
    /// # Errors
    ///
    /// Returns `InvalidTypes` if the values cannot be merged.
    fn merge(
        &mut self,
        value: BucketValue,
        metadata: BucketMetadata,
    ) -> Result<usize, AggregateMetricsErrorKind> {
        let previous_cost = self.value.cost();
        if self.value.merge(value).is_err() {
            return Err(AggregateMetricsErrorKind::InvalidTypes);
        }
        self.metadata.merge(metadata);

        let current_cost = self.value.cost();
        Ok(current_cost.saturating_sub(previous_cost))
    }
}
/// Two queued buckets are equal when they share a flush instant. This agrees with the ordering
/// implementation, which also compares `flush_at` only.
impl PartialEq for QueuedBucket {
    fn eq(&self, other: &Self) -> bool {
        self.flush_at == other.flush_at
    }
}

impl Eq for QueuedBucket {}
/// Delegates to [`Ord::cmp`] so the partial and total orderings can never disagree.
impl PartialOrd for QueuedBucket {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Orders queued buckets by *reverse* flush time: a bucket that flushes earlier compares as
/// greater. In a max-heap such as `BinaryHeap`, the earliest-due bucket would be popped first.
impl Ord for QueuedBucket {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose to invert the natural `Instant` ordering.
        other.flush_at.cmp(&self.flush_at)
    }
}
/// Tracks the estimated memory cost of held buckets, in total and per project.
#[derive(Default)]
struct CostTracker {
    /// Sum of all tracked costs.
    total_cost: usize,
    /// Tracked cost per project; a project is removed once its cost drops to zero.
    cost_per_project_key: HashMap<ProjectKey, usize>,
}
impl CostTracker {
fn totals_cost_exceeded(&self, max_total_cost: Option<usize>) -> bool {
if let Some(max_total_cost) = max_total_cost {
if self.total_cost >= max_total_cost {
return true;
}
}
false
}
fn check_limits_exceeded(
&self,
project_key: ProjectKey,
max_total_cost: Option<usize>,
max_project_cost: Option<usize>,
) -> Result<(), AggregateMetricsError> {
if self.totals_cost_exceeded(max_total_cost) {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into());
}
if let Some(max_project_cost) = max_project_cost {
let project_cost = self
.cost_per_project_key
.get(&project_key)
.cloned()
.unwrap_or(0);
if project_cost >= max_project_cost {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into());
}
}
Ok(())
}
fn add_cost(&mut self, project_key: ProjectKey, cost: usize) {
self.total_cost += cost;
let project_cost = self.cost_per_project_key.entry(project_key).or_insert(0);
*project_cost += cost;
}
fn subtract_cost(&mut self, project_key: ProjectKey, cost: usize) {
match self.cost_per_project_key.entry(project_key) {
Entry::Vacant(_) => {
relay_log::error!("cost subtracted for an untracked project key");
}
Entry::Occupied(mut entry) => {
let project_cost = entry.get_mut();
if cost > *project_cost {
relay_log::error!("underflow while subtracing project cost");
self.total_cost = self.total_cost.saturating_sub(*project_cost);
*project_cost = 0;
} else {
*project_cost -= cost;
self.total_cost = self.total_cost.saturating_sub(cost);
}
if *project_cost == 0 {
entry.remove();
}
}
};
}
}
impl fmt::Debug for CostTracker {
    /// Prints per-project costs sorted by project key, so the output does not depend on
    /// `HashMap` iteration order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let sorted_costs: BTreeMap<_, _> = self.cost_per_project_key.iter().collect();

        let mut out = f.debug_struct("CostTracker");
        out.field("total_cost", &self.total_cost);
        out.field("cost_per_project_key", &sorted_costs);
        out.finish()
    }
}
/// Collects metric buckets in memory, merging buckets with the same key until they are due for
/// flushing.
pub struct Aggregator {
    /// Name reported as the `aggregator` tag on emitted metrics.
    name: String,
    /// Bucketing, validation and limit settings.
    config: AggregatorConfig,
    /// Aggregated buckets keyed by project, aligned timestamp, metric name and tags.
    buckets: HashMap<BucketKey, QueuedBucket>,
    /// Estimated memory cost of all held buckets, used to enforce limits.
    cost_tracker: CostTracker,
}
impl Aggregator {
    /// Creates an empty aggregator named `"default"`.
    pub fn new(config: AggregatorConfig) -> Self {
        Self::named("default".to_owned(), config)
    }

    /// Creates an empty aggregator with the given name and configuration.
    ///
    /// The name is reported as the `aggregator` tag on the metrics this aggregator emits.
    pub fn named(name: String, config: AggregatorConfig) -> Self {
        Self {
            name,
            config,
            buckets: HashMap::new(),
            cost_tracker: CostTracker::default(),
        }
    }

    /// Returns the name of this aggregator.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the number of buckets currently held.
    pub fn bucket_count(&self) -> usize {
        self.buckets.len()
    }

    /// Returns `true` if `max_total_cost` is set and the total tracked cost has reached it.
    pub fn totals_cost_exceeded(&self, max_total_cost: Option<usize>) -> bool {
        self.cost_tracker.totals_cost_exceeded(max_total_cost)
    }

    /// Consumes the aggregator and returns every bucket it holds, regardless of flush time.
    ///
    /// Emits the current bucket count as a gauge first. The order of the returned buckets is
    /// unspecified.
    pub fn into_buckets(self) -> Vec<Bucket> {
        relay_statsd::metric!(
            gauge(MetricGauges::Buckets) = self.bucket_count() as u64,
            aggregator = &self.name,
        );

        let bucket_interval = self.config.bucket_interval;
        self.buckets
            .into_iter()
            .map(|(key, entry)| Bucket {
                timestamp: key.timestamp,
                width: bucket_interval,
                name: key.metric_name,
                value: entry.value,
                tags: key.tags,
                metadata: entry.metadata,
            })
            .collect()
    }

    /// Removes and returns the buckets that are due for flushing, grouped by project key.
    ///
    /// With `force` set, every bucket is returned regardless of its flush time. The cost of
    /// each removed bucket is released from the cost tracker. Emits gauges for bucket count,
    /// total cost and average items per bucket (per metric type and namespace), and times the
    /// scan.
    pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<Bucket>> {
        relay_statsd::metric!(
            gauge(MetricGauges::Buckets) = self.bucket_count() as u64,
            aggregator = &self.name,
        );
        relay_statsd::metric!(
            gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64,
            aggregator = &self.name,
        );

        let mut buckets: HashMap<ProjectKey, Vec<Bucket>> = HashMap::new();
        // (flushed bucket count, flushed item count) per (metric type, namespace).
        let mut stats = HashMap::new();

        relay_statsd::metric!(
            timer(MetricTimers::BucketsScanDuration),
            aggregator = &self.name,
            {
                let bucket_interval = self.config.bucket_interval;
                let cost_tracker = &mut self.cost_tracker;
                self.buckets.retain(|key, entry| {
                    if force || entry.elapsed() {
                        // `retain` drops this entry, so move its contents out instead of
                        // cloning them.
                        let value = mem::replace(&mut entry.value, BucketValue::counter(0.into()));
                        let metadata = mem::take(&mut entry.metadata);
                        // Release key and value cost in one call: a single map lookup, and an
                        // inconsistent project total is reported at most once.
                        cost_tracker.subtract_cost(key.project_key, key.cost() + value.cost());

                        let (bucket_count, item_count) = stats
                            .entry((value.ty(), key.namespace()))
                            .or_insert((0usize, 0usize));
                        *bucket_count += 1;
                        *item_count += value.len();

                        let bucket = Bucket {
                            timestamp: key.timestamp,
                            width: bucket_interval,
                            name: key.metric_name.clone(),
                            value,
                            tags: key.tags.clone(),
                            metadata,
                        };
                        buckets.entry(key.project_key).or_default().push(bucket);

                        false
                    } else {
                        true
                    }
                });
            }
        );

        for ((ty, namespace), (bucket_count, item_count)) in stats {
            relay_statsd::metric!(
                gauge(MetricGauges::AvgBucketSize) = item_count as f64 / bucket_count as f64,
                metric_type = ty.as_str(),
                namespace = namespace.as_str(),
                aggregator = self.name(),
            );
        }

        buckets
    }

    /// Aligns `timestamp` to the bucket grid and checks it against the accepted time range.
    ///
    /// # Errors
    ///
    /// Returns `InvalidTimestamp` (carrying the original, unaligned timestamp) if the aligned
    /// timestamp lies outside [`AggregatorConfig::timestamp_range`]. The offset from now is
    /// recorded as a histogram in that case.
    fn get_bucket_timestamp(
        &self,
        timestamp: UnixTimestamp,
        bucket_width: u64,
    ) -> Result<UnixTimestamp, AggregateMetricsError> {
        let bucket_ts = self.config.get_bucket_timestamp(timestamp, bucket_width);

        if !self.config.timestamp_range().contains(&bucket_ts) {
            let delta = (bucket_ts.as_secs() as i64) - (UnixTimestamp::now().as_secs() as i64);
            relay_statsd::metric!(
                histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
                aggregator = &self.name,
            );

            return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
        }

        Ok(bucket_ts)
    }

    /// Merges `bucket` for `project_key` into the aggregator.
    ///
    /// Steps:
    ///
    /// 1. The timestamp is aligned to the configured interval.
    /// 2. The key is validated; invalid tags are dropped.
    /// 3. If a bucket with the same key exists, the values are merged. Otherwise a new bucket
    ///    is queued with a computed flush time.
    /// 4. The added cost is recorded in the cost tracker.
    ///
    /// # Errors
    ///
    /// - `InvalidTimestamp` if the aligned timestamp is out of range.
    /// - Name validation errors from `validate_bucket_key`.
    /// - `TotalLimitExceeded` / `ProjectLimitExceeded` if a cost limit is already reached.
    /// - `InvalidTypes` if the value cannot be merged into the existing bucket.
    pub fn merge(
        &mut self,
        project_key: ProjectKey,
        bucket: Bucket,
        max_total_bucket_bytes: Option<usize>,
    ) -> Result<(), AggregateMetricsError> {
        let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
        let key = BucketKey {
            project_key,
            timestamp,
            metric_name: bucket.name,
            tags: bucket.tags,
        };
        let key = validate_bucket_key(key, &self.config)?;

        self.cost_tracker.check_limits_exceeded(
            project_key,
            max_total_bucket_bytes,
            self.config.max_project_key_bucket_bytes,
        )?;

        let added_cost = match self.buckets.entry(key) {
            Entry::Occupied(mut entry) => {
                relay_statsd::metric!(
                    counter(MetricCounters::MergeHit) += 1,
                    aggregator = &self.name,
                    namespace = entry.key().namespace().as_str(),
                );

                entry.get_mut().merge(bucket.value, bucket.metadata)?
            }
            Entry::Vacant(entry) => {
                relay_statsd::metric!(
                    counter(MetricCounters::MergeMiss) += 1,
                    aggregator = &self.name,
                    namespace = entry.key().namespace().as_str(),
                );
                relay_statsd::metric!(
                    set(MetricSets::UniqueBucketsCreated) = entry.key().hash64() as i64,
                    aggregator = &self.name,
                    namespace = entry.key().namespace().as_str(),
                );

                let flush_at = self.config.get_flush_time(entry.key());
                let value = bucket.value;
                let cost = entry.key().cost() + value.cost();
                entry.insert(QueuedBucket::new(flush_at, value, bucket.metadata));
                cost
            }
        };

        self.cost_tracker.add_cost(project_key, added_cost);

        Ok(())
    }

    /// Merges every bucket in `buckets` for `project_key`, continuing past failures.
    ///
    /// Failed merges are logged as errors. Out-of-range timestamps are the exception: they are
    /// already recorded as a histogram by `get_bucket_timestamp` and are not logged.
    pub fn merge_all(
        &mut self,
        project_key: ProjectKey,
        buckets: impl IntoIterator<Item = Bucket>,
        max_total_bucket_bytes: Option<usize>,
    ) {
        for bucket in buckets {
            if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) {
                if !matches!(error.kind, AggregateMetricsErrorKind::InvalidTimestamp(_)) {
                    relay_log::error!(
                        tags.aggregator = self.name,
                        bucket.error = &error as &dyn Error
                    );
                }
            }
        }
    }
}
impl fmt::Debug for Aggregator {
    /// Formats every field of the aggregator.
    ///
    /// An earlier version printed a placeholder `receiver` field that no longer exists on the
    /// struct. It also omitted the name and the cost tracker.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct(std::any::type_name::<Self>())
            .field("name", &self.name)
            .field("config", &self.config)
            .field("buckets", &self.buckets)
            .field("cost_tracker", &self.cost_tracker)
            .finish()
    }
}
/// Validates and normalizes a bucket key against the aggregator configuration.
///
/// The metric name is checked first and can reject the whole key. Invalid tags are dropped one
/// by one and never fail the key.
///
/// # Errors
///
/// Propagates any error from `validate_metric_name`.
fn validate_bucket_key(
    key: BucketKey,
    aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
    let named = validate_metric_name(key, aggregator_config)?;
    Ok(validate_metric_tags(named, aggregator_config))
}
/// Checks the metric name's length and normalizes it to its canonical MRI string.
///
/// # Errors
///
/// - `InvalidStringLength` if the name is longer than `max_name_length`.
/// - Any error from `normalize_metric_name` if the name cannot be parsed or has an unsupported
///   namespace.
///
/// In both cases, details about the offending key are attached to the logging scope.
fn validate_metric_name(
    mut key: BucketKey,
    aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
    let max_len = aggregator_config.max_name_length;
    let name_len = key.metric_name.len();

    if name_len > max_len {
        relay_log::configure_scope(|scope| {
            scope.set_extra(
                "bucket.project_key",
                key.project_key.as_str().to_owned().into(),
            );
            scope.set_extra("bucket.metric_name.length", name_len.to_string().into());
            scope.set_extra(
                "aggregator_config.max_name_length",
                max_len.to_string().into(),
            );
        });
        return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into());
    }

    match normalize_metric_name(&mut key) {
        Ok(()) => Ok(key),
        Err(err) => {
            relay_log::configure_scope(|scope| {
                scope.set_extra(
                    "bucket.project_key",
                    key.project_key.as_str().to_owned().into(),
                );
                scope.set_extra("bucket.metric_name", key.metric_name.to_string().into());
            });
            Err(err)
        }
    }
}
/// Parses the metric name as an MRI and replaces it with the MRI's canonical string.
///
/// For example, the tests expect `c:transactions/foo` to become `c:transactions/foo@none`.
///
/// # Errors
///
/// - `InvalidCharacters` if the name cannot be parsed.
/// - `UnsupportedNamespace` if it parses into `MetricNamespace::Unsupported`.
fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsError> {
    let canonical = match MetricResourceIdentifier::parse(&key.metric_name) {
        Ok(mri) if matches!(mri.namespace, MetricNamespace::Unsupported) => {
            relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
            return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into());
        }
        Ok(mri) => mri.to_string(),
        Err(_) => {
            relay_log::debug!("invalid metric name {:?}", &key.metric_name);
            return Err(
                AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(),
            );
        }
    };

    key.metric_name = canonical.into();
    Ok(())
}
/// Drops tags that violate the configured limits or naming rules. The values of the remaining
/// tags are then normalized in place with `protocol::validate_tag_value`.
///
/// A tag is removed if any of these hold:
///
/// - Its key is longer than `max_tag_key_length` bytes.
/// - Its value has more than `max_tag_value_length` characters.
/// - Its key is rejected by `protocol::is_valid_tag_key`.
///
/// Invalid tags never fail the key as a whole.
fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig) -> BucketKey {
    let proj_key = key.project_key.as_str();
    let max_key_len = aggregator_config.max_tag_key_length;
    let max_value_chars = aggregator_config.max_tag_value_length;

    key.tags.retain(|tag_key, tag_value| {
        if tag_key.len() > max_key_len {
            relay_log::configure_scope(|scope| {
                scope.set_extra("bucket.project_key", proj_key.to_owned().into());
                scope.set_extra("bucket.metric.tag_key", tag_key.to_owned().into());
                scope.set_extra(
                    "aggregator_config.max_tag_key_length",
                    max_key_len.to_string().into(),
                );
            });
            relay_log::debug!("Invalid metric tag key");
            false
        } else if bytecount::num_chars(tag_value.as_bytes()) > max_value_chars {
            relay_log::configure_scope(|scope| {
                scope.set_extra("bucket.project_key", proj_key.to_owned().into());
                scope.set_extra("bucket.metric.tag_value", tag_value.to_owned().into());
                scope.set_extra(
                    "aggregator_config.max_tag_value_length",
                    max_value_chars.to_string().into(),
                );
            });
            relay_log::debug!("Invalid metric tag value");
            false
        } else if protocol::is_valid_tag_key(tag_key) {
            true
        } else {
            relay_log::debug!("invalid metric tag key {tag_key:?}");
            false
        }
    });

    for tag_value in key.tags.values_mut() {
        protocol::validate_tag_value(tag_value);
    }

    key
}
// Unit tests for bucket keys, cost tracking, timestamp alignment, key validation and merging.
#[cfg(test)]
mod tests {
use similar_asserts::assert_eq;
use super::*;
use crate::{dist, GaugeValue};
// Test configuration: 1-second buckets, no delays, and a ~50-year timestamp window in both
// directions so the fixed historical timestamps used below are accepted.
fn test_config() -> AggregatorConfig {
AggregatorConfig {
bucket_interval: 1,
initial_delay: 0,
debounce_delay: 0,
max_secs_in_past: 50 * 365 * 24 * 60 * 60,
max_secs_in_future: 50 * 365 * 24 * 60 * 60,
max_name_length: 200,
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
shift_key: ShiftKey::default(),
}
}
// Builds an untagged counter bucket `c:transactions/foo` with value 42. Uses timestamp
// 999994711 when `timestamp` is `None`.
fn some_bucket(timestamp: Option<UnixTimestamp>) -> Bucket {
let timestamp = timestamp.map_or(UnixTimestamp::from_secs(999994711), |t| t);
Bucket {
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(timestamp),
}
}
// Two counters with the same key merge into one bucket (42 + 43 = 85), and the metric name is
// normalized to its canonical MRI form (`@none` unit appended).
#[test]
fn test_aggregator_merge_counters() {
relay_test::setup();
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator: Aggregator = Aggregator::new(test_config());
let bucket1 = some_bucket(None);
let mut bucket2 = bucket1.clone();
bucket2.value = BucketValue::counter(43.into());
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
let buckets: Vec<_> = aggregator
.buckets
.iter()
.map(|(k, e)| (k, &e.value)) .collect();
insta::assert_debug_snapshot!(buckets, @r###"
[
(
BucketKey {
project_key: ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"),
timestamp: UnixTimestamp(999994711),
metric_name: MetricName(
"c:transactions/foo@none",
),
tags: {},
},
Counter(
85.0,
),
),
]
"###);
}
// Pins the estimated cost of each value type. The fixed base of 48 presumably equals
// `mem::size_of::<BucketValue>()` on the test target (platform-dependent — TODO confirm).
// Set entries add 4 bytes each and distribution values 8 bytes each.
#[test]
fn test_bucket_value_cost() {
let expected_bucket_value_size = 48;
let expected_set_entry_size = 4;
let counter = BucketValue::Counter(123.into());
assert_eq!(counter.cost(), expected_bucket_value_size);
let set = BucketValue::Set([1, 2, 3, 4, 5].into());
assert_eq!(
set.cost(),
expected_bucket_value_size + 5 * expected_set_entry_size
);
let distribution = BucketValue::Distribution(dist![1, 2, 3]);
assert_eq!(distribution.cost(), expected_bucket_value_size + 3 * 8);
let gauge = BucketValue::Gauge(GaugeValue {
last: 43.into(),
min: 42.into(),
max: 43.into(),
sum: 85.into(),
count: 2,
});
assert_eq!(gauge.cost(), expected_bucket_value_size);
}
// Key cost = struct size (80 here, presumably `mem::size_of::<BucketKey>()` on the test
// target) + metric name bytes (5) + tag key/value bytes.
#[test]
fn test_bucket_key_cost() {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let metric_name = "12345".into();
let bucket_key = BucketKey {
timestamp: UnixTimestamp::now(),
project_key,
metric_name,
tags: BTreeMap::from([
("hello".to_owned(), "world".to_owned()),
("answer".to_owned(), "42".to_owned()),
]),
};
assert_eq!(
bucket_key.cost(),
80 + 5 + (5 + 5 + 6 + 2) );
}
// With a 10-second interval, timestamps 999994711 and 999994712 share the bucket 999994710,
// while 999994721 lands in 999994720.
#[test]
fn test_aggregator_merge_timestamps() {
relay_test::setup();
let mut config = test_config();
config.bucket_interval = 10;
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator: Aggregator = Aggregator::new(config);
let bucket1 = some_bucket(None);
let mut bucket2 = bucket1.clone();
bucket2.timestamp = UnixTimestamp::from_secs(999994712);
let mut bucket3 = bucket1.clone();
bucket3.timestamp = UnixTimestamp::from_secs(999994721);
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket3, None).unwrap();
let mut buckets: Vec<_> = aggregator
.buckets
.iter()
.map(|(k, e)| (k, &e.value)) .collect();
buckets.sort_by(|a, b| a.0.timestamp.cmp(&b.0.timestamp));
insta::assert_debug_snapshot!(buckets, @r###"
[
(
BucketKey {
project_key: ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"),
timestamp: UnixTimestamp(999994710),
metric_name: MetricName(
"c:transactions/foo@none",
),
tags: {},
},
Counter(
84.0,
),
),
(
BucketKey {
project_key: ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"),
timestamp: UnixTimestamp(999994720),
metric_name: MetricName(
"c:transactions/foo@none",
),
tags: {},
},
Counter(
42.0,
),
),
]
"###);
}
// Identical buckets from different projects are kept apart.
#[test]
fn test_aggregator_mixed_projects() {
relay_test::setup();
let mut config = test_config();
config.bucket_interval = 10;
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator: Aggregator = Aggregator::new(config);
aggregator
.merge(project_key1, some_bucket(None), None)
.unwrap();
aggregator
.merge(project_key2, some_bucket(None), None)
.unwrap();
assert_eq!(aggregator.buckets.len(), 2);
}
// Exercises `CostTracker` directly. Subtracting for an untracked project is a no-op.
// Over-subtracting releases only the project's remaining cost. Projects at zero are removed.
#[test]
fn test_cost_tracker() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
let mut cost_tracker = CostTracker::default();
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 0,
cost_per_project_key: {},
}
"#);
cost_tracker.add_cost(project_key1, 100);
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 100,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
},
}
"#);
cost_tracker.add_cost(project_key2, 200);
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 300,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"#);
cost_tracker.subtract_cost(project_key3, 666);
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 300,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"#);
cost_tracker.subtract_cost(project_key1, 666);
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 200,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"#);
cost_tracker.subtract_cost(project_key2, 20);
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 180,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 180,
},
}
"#);
cost_tracker.subtract_cost(project_key2, 180);
insta::assert_debug_snapshot!(cost_tracker, @r#"
CostTracker {
total_cost: 0,
cost_per_project_key: {},
}
"#);
}
// Checks the cost added by each merge. A new bucket adds key + value cost; merging into an
// existing bucket adds only the growth of the value (e.g. a new set member or distribution
// value). A forced flush must release everything.
#[test]
fn test_aggregator_cost_tracking() {
let mut aggregator: Aggregator = Aggregator::new(test_config());
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp,
width: 0,
name: "c:transactions/foo@none".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(timestamp),
};
// Only used to compute the fixed key cost; the timestamp does not affect `cost()`.
let bucket_key = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/foo@none".into(),
tags: BTreeMap::new(),
};
let fixed_cost = bucket_key.cost() + mem::size_of::<BucketValue>();
for (metric_name, metric_value, expected_added_cost) in [
(
"c:transactions/foo@none",
BucketValue::counter(42.into()),
fixed_cost,
),
(
"c:transactions/foo@none",
BucketValue::counter(42.into()),
0,
), (
"s:transactions/foo@none",
BucketValue::set(123),
fixed_cost + 4,
), ("s:transactions/foo@none", BucketValue::set(123), 0), ("s:transactions/foo@none", BucketValue::set(456), 4), (
"d:transactions/foo@none",
BucketValue::distribution(1.into()),
fixed_cost + 8,
), (
"d:transactions/foo@none",
BucketValue::distribution(1.into()),
8,
), (
"d:transactions/foo@none",
BucketValue::distribution(2.into()),
8,
), (
"g:transactions/foo@none",
BucketValue::gauge(3.into()),
fixed_cost,
), ("g:transactions/foo@none", BucketValue::gauge(2.into()), 0), ] {
let mut bucket = bucket.clone();
bucket.value = metric_value;
bucket.name = metric_name.into();
let current_cost = aggregator.cost_tracker.total_cost;
aggregator.merge(project_key, bucket, None).unwrap();
let total_cost = aggregator.cost_tracker.total_cost;
assert_eq!(total_cost, current_cost + expected_added_cost);
}
aggregator.pop_flush_buckets(true);
assert_eq!(aggregator.cost_tracker.total_cost, 0);
}
// A timestamp of `u64::MAX` must not overflow while aligning (saturating add) and is rejected
// as out of range.
#[test]
fn test_get_bucket_timestamp_overflow() {
let config = AggregatorConfig {
bucket_interval: 10,
initial_delay: 0,
debounce_delay: 0,
..Default::default()
};
let aggregator: Aggregator = Aggregator::new(config);
assert!(matches!(
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::InvalidTimestamp(_)
));
}
// With zero width, a timestamp is simply rounded down to the interval.
#[test]
fn test_get_bucket_timestamp_zero() {
let config = AggregatorConfig {
bucket_interval: 10,
initial_delay: 0,
debounce_delay: 0,
..Default::default()
};
let now = UnixTimestamp::now().as_secs();
let rounded_now = UnixTimestamp::from_secs(now / 10 * 10);
assert_eq!(
config.get_bucket_timestamp(UnixTimestamp::from_secs(now), 0),
rounded_now
);
}
// Width 20 moves the reference point by 10 seconds (half the width): +3 + 10 rounds to +10.
#[test]
fn test_get_bucket_timestamp_multiple() {
let config = AggregatorConfig {
bucket_interval: 10,
initial_delay: 0,
debounce_delay: 0,
..Default::default()
};
let rounded_now = UnixTimestamp::now().as_secs() / 10 * 10;
let now = rounded_now + 3;
assert_eq!(
config
.get_bucket_timestamp(UnixTimestamp::from_secs(now), 20)
.as_secs(),
rounded_now + 10
);
}
// Width 23 moves the reference point by 11 seconds (integer half): +3 + 11 rounds to +10.
#[test]
fn test_get_bucket_timestamp_non_multiple() {
let config = AggregatorConfig {
bucket_interval: 10,
initial_delay: 0,
debounce_delay: 0,
..Default::default()
};
let rounded_now = UnixTimestamp::now().as_secs() / 10 * 10;
let now = rounded_now + 3;
assert_eq!(
config
.get_bucket_timestamp(UnixTimestamp::from_secs(now), 23)
.as_secs(),
rounded_now + 10
);
}
// Several character checks:
// - A tag whose key contains NUL is dropped.
// - NUL characters are stripped from tag values.
// - A metric name containing NUL is rejected.
#[test]
fn test_validate_bucket_key_chars() {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let bucket_key = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/hergus.bergus".into(),
tags: {
let mut tags = BTreeMap::new();
tags.insert(
"is_it_garbage".to_owned(),
"a\0b\0s\0o\0l\0u\0t\0e\0l\0y".to_owned(),
);
tags.insert("another\0garbage".to_owned(), "bye".to_owned());
tags
},
};
let mut bucket_key = validate_bucket_key(bucket_key, &test_config()).unwrap();
assert_eq!(bucket_key.tags.len(), 1);
assert_eq!(
bucket_key.tags.get("is_it_garbage"),
Some(&"absolutely".to_owned())
);
assert_eq!(bucket_key.tags.get("another\0garbage"), None);
bucket_key.metric_name = "hergus\0bergus".into();
validate_bucket_key(bucket_key, &test_config()).unwrap_err();
}
// A name over the length limit rejects the key. Tags with over-long keys or values are only
// dropped.
#[test]
fn test_validate_bucket_key_str_lens() {
relay_test::setup();
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let short_metric = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/a_short_metric".into(),
tags: BTreeMap::new(),
};
assert!(validate_bucket_key(short_metric, &test_config()).is_ok());
let long_metric = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/long_name_a_very_long_name_its_super_long_really_but_like_super_long_probably_the_longest_name_youve_seen_and_even_the_longest_name_ever_its_extremly_long_i_cant_tell_how_long_it_is_because_i_dont_have_that_many_fingers_thus_i_cant_count_the_many_characters_this_long_name_is".into(),
tags: BTreeMap::new(),
};
let validation = validate_bucket_key(long_metric, &test_config());
assert!(matches!(
validation.unwrap_err(),
AggregateMetricsError {
kind: AggregateMetricsErrorKind::InvalidStringLength(_)
}
));
let short_metric_long_tag_key = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/a_short_metric_with_long_tag_key".into(),
tags: BTreeMap::from([("i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into(), "tag_value".into())]),
};
let validation = validate_bucket_key(short_metric_long_tag_key, &test_config()).unwrap();
assert_eq!(validation.tags.len(), 0);
let short_metric_long_tag_value = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/a_short_metric_with_long_tag_value".into(),
tags: BTreeMap::from([("tag_key".into(), "i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into())]),
};
let validation = validate_bucket_key(short_metric_long_tag_value, &test_config()).unwrap();
assert_eq!(validation.tags.len(), 0);
}
// The tag value limit counts characters, not bytes: 200 characters (201 bytes, since 'ø' is two
// bytes in UTF-8) are still accepted.
#[test]
fn test_validate_tag_values_special_chars() {
relay_test::setup();
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let tag_value = "x".repeat(199) + "ø";
assert_eq!(tag_value.chars().count(), 200); let short_metric = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/a_short_metric".into(),
tags: BTreeMap::from([("foo".into(), tag_value.clone())]),
};
let validated_bucket = validate_metric_tags(short_metric, &test_config());
assert_eq!(validated_bucket.tags["foo"], tag_value);
}
// Limits are checked before adding: with a total limit of 1 byte, the first merge succeeds
// (tracked cost 0) and the second is rejected.
#[test]
fn test_aggregator_cost_enforcement_total() {
let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(timestamp),
};
let mut aggregator: Aggregator = Aggregator::new(test_config());
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
aggregator
.merge(project_key, bucket.clone(), Some(1))
.unwrap();
assert_eq!(
aggregator
.merge(project_key, bucket, Some(1))
.unwrap_err()
.kind,
AggregateMetricsErrorKind::TotalLimitExceeded
);
}
// Same as above, but with the per-project limit from the configuration.
#[test]
fn test_aggregator_cost_enforcement_project() {
relay_test::setup();
let mut config = test_config();
config.max_project_key_bucket_bytes = Some(1);
let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(timestamp),
};
let mut aggregator: Aggregator = Aggregator::new(config);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
aggregator.merge(project_key, bucket.clone(), None).unwrap();
assert_eq!(
aggregator
.merge(project_key, bucket, None)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::ProjectLimitExceeded
);
}
// `shift_key` deserializes from its lowercase name; other fields fall back to defaults.
#[test]
fn test_parse_shift_key() {
let json = r#"{"shift_key": "bucket"}"#;
let parsed: AggregatorConfig = serde_json::from_str(json).unwrap();
assert!(matches!(parsed.shift_key, ShiftKey::Bucket));
}
// Metadata merges add up: bucket1 and bucket2 count 1 each, bucket3 already carries 3, giving 5.
// The earliest `received_at` is kept.
#[test]
fn test_aggregator_merge_metadata() {
let mut config = test_config();
config.bucket_interval = 10;
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator: Aggregator = Aggregator::new(config);
let bucket1 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
let bucket2 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
let mut bucket3 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
bucket3
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999997811)));
bucket3
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999999811)));
aggregator
.merge(project_key, bucket1.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket2.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket3.clone(), None)
.unwrap();
let buckets_metadata: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
insta::assert_debug_snapshot!(buckets_metadata, @r###"
[
BucketMetadata {
merges: 5,
received_at: Some(
UnixTimestamp(999994711),
),
},
]
"###);
}
}