relay_server/services/
cogs.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2
3use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
4use relay_config::Config;
5use relay_system::{Addr, FromMessage, Interface, Service};
6
7use crate::statsd::RelayCounters;
8
9pub struct CogsReport(CogsMeasurement);
10
11impl Interface for CogsReport {}
12
13impl FromMessage<CogsMeasurement> for CogsReport {
14    type Response = relay_system::NoResponse;
15
16    fn from_message(message: CogsMeasurement, _: ()) -> Self {
17        Self(message)
18    }
19}
20
21/// Service implementing the [`CogsReport`] interface.
22pub struct CogsService {
23    relay_resource_id: String,
24}
25
26impl CogsService {
27    pub fn new(config: &Config) -> Self {
28        Self {
29            relay_resource_id: config.cogs_relay_resource_id().to_owned(),
30        }
31    }
32
33    fn handle_report(&mut self, CogsReport(measurement): CogsReport) {
34        relay_log::trace!("recording measurement: {measurement:?}");
35
36        let resource_id = match measurement.resource {
37            ResourceId::Relay => &self.relay_resource_id,
38        };
39
40        let amount = match measurement.value {
41            relay_cogs::Value::Time(duration) => {
42                duration.as_micros().try_into().unwrap_or(i64::MAX)
43            }
44        };
45
46        relay_statsd::metric!(
47            counter(RelayCounters::CogsUsage) += amount,
48            resource_id = resource_id,
49            app_feature = measurement.feature.as_str(),
50            category = measurement.category.unwrap_or("default"),
51        );
52    }
53}
54
55impl Service for CogsService {
56    type Interface = CogsReport;
57
58    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
59        while let Some(message) = rx.recv().await {
60            self.handle_report(message);
61        }
62    }
63}
64
65/// COGS measurement recorder.
66///
67/// The recorder forwards the measurements to the [cogs service](`CogsService`).
68pub struct CogsServiceRecorder {
69    addr: Addr<CogsReport>,
70    max_size: u64,
71    has_errored: AtomicBool,
72}
73
74impl CogsServiceRecorder {
75    /// Creates a new recorder forwarding messages to [`CogsService`].
76    pub fn new(config: &Config, addr: Addr<CogsReport>) -> Self {
77        Self {
78            addr,
79            max_size: config.cogs_max_queue_size(),
80            has_errored: AtomicBool::new(false),
81        }
82    }
83}
84
85impl CogsRecorder for CogsServiceRecorder {
86    fn record(&self, measurement: CogsMeasurement) {
87        // Make sure we don't have an ever growing backlog of COGS measurements,
88        // an error in the service should not have a visible impact in production.
89        if self.addr.len() >= self.max_size {
90            if !self.has_errored.swap(true, Ordering::Relaxed) {
91                relay_log::error!("COGS measurements backlogged");
92            }
93
94            return;
95        }
96
97        self.addr.send(measurement);
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use std::time::Duration;
104
105    use super::*;
106
107    #[test]
108    fn test_cogs_service_recorder_limit() {
109        let addr = Addr::dummy();
110        let config = Config::from_json_value(serde_json::json!({
111            "cogs": {
112                "max_queue_size": 2
113            }
114        }))
115        .unwrap();
116        let recorder = CogsServiceRecorder::new(&config, addr.clone());
117
118        for _ in 0..5 {
119            recorder.record(CogsMeasurement {
120                resource: ResourceId::Relay,
121                feature: relay_cogs::AppFeature::Spans,
122                value: relay_cogs::Value::Time(Duration::from_secs(1)),
123                category: None,
124            });
125        }
126
127        assert_eq!(addr.len(), 2);
128        assert!(recorder.has_errored.load(Ordering::Relaxed));
129    }
130}