relay_server/endpoints/
autoscaling.rs

1use crate::http::StatusCode;
2use crate::service::ServiceState;
3use crate::services::autoscaling::{AutoscalingData, AutoscalingMessageKind};
4use std::fmt::Display;
5use std::fmt::Write;
6
7/// Returns internal metrics data for relay.
8pub async fn handle(state: ServiceState) -> (StatusCode, String) {
9    let data = match state
10        .autoscaling()
11        .send(AutoscalingMessageKind::Check)
12        .await
13    {
14        Ok(data) => data,
15        Err(_) => {
16            return (
17                StatusCode::INTERNAL_SERVER_ERROR,
18                "Failed to collect internal metrics".to_owned(),
19            );
20        }
21    };
22
23    (StatusCode::OK, to_prometheus_string(&data))
24}
25
26/// Serializes the autoscaling data into a prometheus string.
27fn to_prometheus_string(data: &AutoscalingData) -> String {
28    let mut result = String::with_capacity(2048);
29
30    append_data_row(&mut result, "memory_usage", data.memory_usage, &[]);
31    append_data_row(&mut result, "up", data.up, &[]);
32    append_data_row(&mut result, "spool_item_count", data.item_count, &[]);
33    append_data_row(&mut result, "spool_total_size", data.total_size, &[]);
34    for utilization in &data.services_metrics {
35        let service_name = extract_service_name(utilization.name);
36        append_data_row(
37            &mut result,
38            "service_utilization",
39            utilization.utilization,
40            &[
41                ("relay_service", service_name),
42                ("instance_id", &format!("{}", utilization.instance_id)),
43            ],
44        );
45    }
46
47    append_data_row(
48        &mut result,
49        "worker_pool_utilization",
50        data.worker_pool_utilization,
51        &[],
52    );
53    append_data_row(
54        &mut result,
55        "runtime_utilization",
56        data.runtime_utilization,
57        &[],
58    );
59    result
60}
61
62fn append_data_row(result: &mut String, label: &str, data: impl Display, tags: &[(&str, &str)]) {
63    // Metrics are automatically prefixed with "relay_"
64    write!(result, "relay_{label}").unwrap();
65    if !tags.is_empty() {
66        result.push('{');
67        for (idx, (key, value)) in tags.iter().enumerate() {
68            if idx > 0 {
69                result.push_str(", ");
70            }
71            write!(result, "{key}=\"{value}\"").unwrap();
72        }
73        result.push('}');
74    }
75    writeln!(result, " {data}").unwrap();
76}
77
78/// Extracts the concrete Service name from a string with a namespace,
79/// In case there are no ':' because a custom name is used, then the full name is returned.
80/// For example:
81/// * `relay::services::MyService` -> `MyService`.
82/// * `aggregator_service` -> `aggregator_service`.
83fn extract_service_name(full_name: &str) -> &str {
84    full_name
85        .rsplit_once(':')
86        .map(|(_, s)| s)
87        .unwrap_or(full_name)
88}
89
90#[cfg(test)]
91mod test {
92    use crate::endpoints::autoscaling::{append_data_row, extract_service_name};
93    use crate::services::autoscaling::{AutoscalingData, ServiceUtilization};
94
95    #[test]
96    fn test_extract_service_with_namespace() {
97        let service_name = extract_service_name("relay::services::MyService");
98        assert_eq!(service_name, "MyService");
99    }
100
101    #[test]
102    fn test_extract_service_without_namespace() {
103        let service_name = extract_service_name("custom_service");
104        assert_eq!(service_name, "custom_service");
105    }
106
107    #[test]
108    fn test_append_no_labels() {
109        let mut result = String::new();
110        append_data_row(&mut result, "example", 200, &[]);
111        assert_eq!(result, "relay_example 200\n");
112    }
113
114    #[test]
115    fn test_append_single_label() {
116        let mut result = String::new();
117        append_data_row(&mut result, "example", 200, &[("key", "value")]);
118        assert_eq!(result, "relay_example{key=\"value\"} 200\n");
119    }
120
121    #[test]
122    fn test_append_multiple_labels() {
123        let mut result = String::new();
124        append_data_row(
125            &mut result,
126            "example",
127            200,
128            &[("first_key", "first_value"), ("second_key", "second_value")],
129        );
130        assert_eq!(
131            result,
132            "relay_example{first_key=\"first_value\", second_key=\"second_value\"} 200\n"
133        );
134    }
135
136    #[test]
137    fn test_prometheus_serialize() {
138        let data = AutoscalingData {
139            memory_usage: 0.75,
140            up: 1,
141            item_count: 10,
142            total_size: 30,
143            services_metrics: vec![
144                ServiceUtilization {
145                    name: "test",
146                    instance_id: 0,
147                    utilization: 10,
148                },
149                ServiceUtilization {
150                    name: "test",
151                    instance_id: 1,
152                    utilization: 30,
153                },
154                ServiceUtilization {
155                    name: "envelope",
156                    instance_id: 1,
157                    utilization: 50,
158                },
159            ],
160            worker_pool_utilization: 61,
161            runtime_utilization: 41,
162        };
163        let result = super::to_prometheus_string(&data);
164        assert_eq!(
165            result,
166            r#"relay_memory_usage 0.75
167relay_up 1
168relay_spool_item_count 10
169relay_spool_total_size 30
170relay_service_utilization{relay_service="test", instance_id="0"} 10
171relay_service_utilization{relay_service="test", instance_id="1"} 30
172relay_service_utilization{relay_service="envelope", instance_id="1"} 50
173relay_worker_pool_utilization 61
174relay_runtime_utilization 41
175"#
176        );
177    }
178}