relay_server/services/
cogs.rs1use std::sync::atomic::{AtomicBool, Ordering};
2
3use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
4use relay_config::Config;
5use relay_system::{Addr, FromMessage, Interface, Service};
6
7use crate::statsd::RelayCounters;
8
9pub struct CogsReport(CogsMeasurement);
10
11impl Interface for CogsReport {}
12
13impl FromMessage<CogsMeasurement> for CogsReport {
14 type Response = relay_system::NoResponse;
15
16 fn from_message(message: CogsMeasurement, _: ()) -> Self {
17 Self(message)
18 }
19}
20
/// Service that receives [`CogsReport`] messages and emits them as statsd
/// counter metrics.
pub struct CogsService {
    /// Value used for the `resource_id` metric tag when a measurement is
    /// attributed to [`ResourceId::Relay`].
    relay_resource_id: String,
}
25
26impl CogsService {
27 pub fn new(config: &Config) -> Self {
28 Self {
29 relay_resource_id: config.cogs_relay_resource_id().to_owned(),
30 }
31 }
32
33 fn handle_report(&mut self, CogsReport(measurement): CogsReport) {
34 relay_log::trace!("recording measurement: {measurement:?}");
35
36 let resource_id = match measurement.resource {
37 ResourceId::Relay => &self.relay_resource_id,
38 };
39
40 let amount = match measurement.value {
41 relay_cogs::Value::Time(duration) => {
42 duration.as_micros().try_into().unwrap_or(i64::MAX)
43 }
44 };
45
46 relay_statsd::metric!(
47 counter(RelayCounters::CogsUsage) += amount,
48 resource_id = resource_id,
49 app_feature = measurement.feature.as_str(),
50 category = measurement.category.unwrap_or("default"),
51 );
52 }
53}
54
55impl Service for CogsService {
56 type Interface = CogsReport;
57
58 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
59 while let Some(message) = rx.recv().await {
60 self.handle_report(message);
61 }
62 }
63}
64
65pub struct CogsServiceRecorder {
69 addr: Addr<CogsReport>,
70 max_size: u64,
71 has_errored: AtomicBool,
72}
73
74impl CogsServiceRecorder {
75 pub fn new(config: &Config, addr: Addr<CogsReport>) -> Self {
77 Self {
78 addr,
79 max_size: config.cogs_max_queue_size(),
80 has_errored: AtomicBool::new(false),
81 }
82 }
83}
84
85impl CogsRecorder for CogsServiceRecorder {
86 fn record(&self, measurement: CogsMeasurement) {
87 if self.addr.len() >= self.max_size {
90 if !self.has_errored.swap(true, Ordering::Relaxed) {
91 relay_log::error!("COGS measurements backlogged");
92 }
93
94 return;
95 }
96
97 self.addr.send(measurement);
98 }
99}
100
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Recording more measurements than `max_queue_size` must leave only
    /// `max_queue_size` messages on the address and set the error flag.
    #[test]
    fn test_cogs_service_recorder_limit() {
        let config = Config::from_json_value(serde_json::json!({
            "cogs": {
                "max_queue_size": 2
            }
        }))
        .unwrap();

        let addr = Addr::dummy();
        let recorder = CogsServiceRecorder::new(&config, addr.clone());

        // Builds a fresh one-second measurement for each `record` call.
        let one_second_measurement = || CogsMeasurement {
            resource: ResourceId::Relay,
            feature: relay_cogs::AppFeature::Spans,
            value: relay_cogs::Value::Time(Duration::from_secs(1)),
            category: None,
        };

        // Five attempts against a limit of two: the last three are dropped.
        for _ in 0..5 {
            recorder.record(one_second_measurement());
        }

        assert_eq!(addr.len(), 2);
        assert!(recorder.has_errored.load(Ordering::Relaxed));
    }
}