relay_server/endpoints/autoscaling.rs

1use crate::http::StatusCode;
2use crate::service::ServiceState;
3use crate::services::autoscaling::{AutoscalingData, AutoscalingMessageKind};
4use std::fmt::Display;
5use std::fmt::Write;
6
7/// Returns internal metrics data for relay.
8pub async fn handle(state: ServiceState) -> (StatusCode, String) {
9    let Some(autoscaling) = state.autoscaling() else {
10        return (
11            StatusCode::NOT_FOUND,
12            "Autoscaling metrics not enabled".to_owned(),
13        );
14    };
15
16    let data = match autoscaling.send(AutoscalingMessageKind::Check).await {
17        Ok(data) => data,
18        Err(_) => {
19            return (
20                StatusCode::INTERNAL_SERVER_ERROR,
21                "Failed to collect internal metrics".to_owned(),
22            );
23        }
24    };
25
26    (StatusCode::OK, to_prometheus_string(&data))
27}
28
29/// Serializes the autoscaling data into a prometheus string.
30fn to_prometheus_string(data: &AutoscalingData) -> String {
31    let mut result = String::with_capacity(2048);
32
33    append_data_row(&mut result, "memory_usage", data.memory_usage, &[]);
34    append_data_row(&mut result, "up", data.up, &[]);
35    append_data_row(&mut result, "spool_item_count", data.item_count, &[]);
36    append_data_row(&mut result, "spool_total_size", data.total_size, &[]);
37    for utilization in &data.services_metrics {
38        let service_name = extract_service_name(utilization.name);
39        append_data_row(
40            &mut result,
41            "service_utilization",
42            utilization.utilization,
43            &[
44                ("relay_service", service_name),
45                ("instance_id", &format!("{}", utilization.instance_id)),
46            ],
47        );
48    }
49
50    append_data_row(
51        &mut result,
52        "worker_pool_utilization",
53        data.worker_pool_utilization,
54        &[],
55    );
56    append_data_row(
57        &mut result,
58        "runtime_utilization",
59        data.runtime_utilization,
60        &[],
61    );
62    result
63}
64
/// Appends one metric row in the Prometheus text exposition format to `result`.
///
/// The metric name is `relay_{label}`. When `tags` is non-empty they are rendered as
/// `{key="value", ...}`. Label values are escaped (backslash, double quote, line feed), as
/// the exposition format requires, so an arbitrary value cannot corrupt the output.
/// `data` is written with its `Display` implementation, followed by a newline.
///
/// # Panics
///
/// Panics only if the `Display` implementation of `data` reports an error.
fn append_data_row(result: &mut String, label: &str, data: impl Display, tags: &[(&str, &str)]) {
    // Metrics are automatically prefixed with "relay_"
    result.push_str("relay_");
    result.push_str(label);
    if !tags.is_empty() {
        result.push('{');
        for (idx, (key, value)) in tags.iter().enumerate() {
            if idx > 0 {
                result.push_str(", ");
            }
            result.push_str(key);
            result.push_str("=\"");
            push_escaped_label_value(result, value);
            result.push('"');
        }
        result.push('}');
    }
    // `fmt::Write` for `String` never fails; an error can only come from `data`'s `Display`.
    writeln!(result, " {data}").expect("formatting into a String is infallible");
}

/// Appends `value` to `result`, escaping backslash, double quote and line feed as required
/// for label values by the Prometheus text exposition format.
fn push_escaped_label_value(result: &mut String, value: &str) {
    for ch in value.chars() {
        match ch {
            '\\' => result.push_str("\\\\"),
            '"' => result.push_str("\\\""),
            '\n' => result.push_str("\\n"),
            other => result.push(other),
        }
    }
}
80
/// Extracts the concrete service name from a (possibly namespaced) type name.
///
/// Only the path before any generic arguments is searched for the last `:`, so the
/// namespace of a type parameter is not mistaken for the service's own path. Generic
/// arguments are kept, so different instantiations remain distinguishable.
/// If no `:` is found because a custom name is used, the full name is returned.
/// For example:
/// * `relay::services::MyService` -> `MyService`.
/// * `relay::services::Wrapper<relay::Inner>` -> `Wrapper<relay::Inner>`.
/// * `aggregator_service` -> `aggregator_service`.
fn extract_service_name(full_name: &str) -> &str {
    // Without this bound, `a::Foo<b::Bar>` would be split inside the generics, yielding `Bar>`.
    let path_end = full_name.find('<').unwrap_or(full_name.len());
    // `:` is ASCII, so `idx + 1` is always a valid char boundary.
    let name_start = full_name[..path_end]
        .rfind(':')
        .map_or(0, |idx| idx + 1);
    &full_name[name_start..]
}
92
#[cfg(test)]
mod test {
    use super::{append_data_row, extract_service_name, to_prometheus_string};
    use crate::services::autoscaling::{AutoscalingData, ServiceUtilization};

    /// Renders a single data row into a fresh buffer so tests can compare the whole output.
    fn render_row(label: &str, value: i32, tags: &[(&str, &str)]) -> String {
        let mut buffer = String::new();
        append_data_row(&mut buffer, label, value, tags);
        buffer
    }

    #[test]
    fn test_extract_service_with_namespace() {
        assert_eq!(
            extract_service_name("relay::services::MyService"),
            "MyService"
        );
    }

    #[test]
    fn test_extract_service_without_namespace() {
        assert_eq!(extract_service_name("custom_service"), "custom_service");
    }

    #[test]
    fn test_append_no_labels() {
        assert_eq!(render_row("example", 200, &[]), "relay_example 200\n");
    }

    #[test]
    fn test_append_single_label() {
        assert_eq!(
            render_row("example", 200, &[("key", "value")]),
            "relay_example{key=\"value\"} 200\n"
        );
    }

    #[test]
    fn test_append_multiple_labels() {
        let tags = [("first_key", "first_value"), ("second_key", "second_value")];
        assert_eq!(
            render_row("example", 200, &tags),
            "relay_example{first_key=\"first_value\", second_key=\"second_value\"} 200\n"
        );
    }

    #[test]
    fn test_prometheus_serialize() {
        // (service name, instance id, utilization)
        let services_metrics = [("test", 0, 10), ("test", 1, 30), ("envelope", 1, 50)]
            .iter()
            .map(|&(name, instance_id, utilization)| ServiceUtilization {
                name,
                instance_id,
                utilization,
            })
            .collect();

        let data = AutoscalingData {
            memory_usage: 0.75,
            up: 1,
            item_count: 10,
            total_size: 30,
            services_metrics,
            worker_pool_utilization: 61,
            runtime_utilization: 41,
        };

        let expected = concat!(
            "relay_memory_usage 0.75\n",
            "relay_up 1\n",
            "relay_spool_item_count 10\n",
            "relay_spool_total_size 30\n",
            "relay_service_utilization{relay_service=\"test\", instance_id=\"0\"} 10\n",
            "relay_service_utilization{relay_service=\"test\", instance_id=\"1\"} 30\n",
            "relay_service_utilization{relay_service=\"envelope\", instance_id=\"1\"} 50\n",
            "relay_worker_pool_utilization 61\n",
            "relay_runtime_utilization 41\n",
        );
        assert_eq!(to_prometheus_string(&data), expected);
    }
}