relay_server/utils/
memory.rs1use crate::statsd::RelayGauges;
2use arc_swap::ArcSwap;
3use relay_config::Config;
4use relay_statsd::metric;
5use std::fmt;
6use std::fmt::Formatter;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::Instant;
10use sysinfo::{MemoryRefreshKind, System};
11
12#[derive(Clone, Copy, Debug)]
14pub struct Memory {
15 pub used: u64,
21 pub total: u64,
23}
24
25impl Memory {
26 pub fn used_percent(&self) -> f32 {
32 let used_percent = self.used as f32 / self.total as f32;
33 if used_percent.is_nan() {
34 return 1.0;
35 };
36
37 used_percent.clamp(0.0, 1.0)
38 }
39}
40
41struct Inner {
53 memory: ArcSwap<Memory>,
54 last_update: AtomicU64,
55 reference_time: Instant,
56 system: Mutex<System>,
57 refresh_frequency_ms: u64,
58}
59
60#[derive(Clone)]
63pub struct MemoryStat(Arc<Inner>);
64
65impl MemoryStat {
66 pub fn new(refresh_frequency_ms: u64) -> Self {
69 let mut system = System::new();
71 Self(Arc::new(Inner {
72 memory: ArcSwap::from(Arc::new(Self::refresh_memory(&mut system))),
73 last_update: AtomicU64::new(0),
74 reference_time: Instant::now(),
75 system: Mutex::new(system),
76 refresh_frequency_ms,
77 }))
78 }
79
80 pub fn current_memory() -> Memory {
82 let mut system = System::new();
83 Self::refresh_memory(&mut system)
84 }
85
86 pub fn memory(&self) -> Memory {
88 self.try_update();
89 **self.0.memory.load()
90 }
91
92 fn refresh_memory(system: &mut System) -> Memory {
94 system.refresh_memory_specifics(MemoryRefreshKind::nothing().with_ram());
95 let memory = match system.cgroup_limits() {
96 Some(cgroup) => Memory {
97 used: cgroup.rss,
98 total: cgroup.total_memory,
99 },
100 None => Memory {
101 used: system.used_memory(),
102 total: system.total_memory(),
103 },
104 };
105
106 metric!(gauge(RelayGauges::SystemMemoryUsed) = memory.used);
107 metric!(gauge(RelayGauges::SystemMemoryTotal) = memory.total);
108
109 memory
110 }
111
112 fn update(&self) {
114 let mut system = self
115 .0
116 .system
117 .lock()
118 .unwrap_or_else(|system| system.into_inner());
119
120 let updated_memory = Self::refresh_memory(&mut system);
121 self.0.memory.store(Arc::new(updated_memory));
122 }
123
124 fn try_update(&self) {
126 let last_update = self.0.last_update.load(Ordering::Relaxed);
127 let elapsed_time = self.0.reference_time.elapsed().as_millis() as u64;
128
129 if elapsed_time - last_update < self.0.refresh_frequency_ms {
130 return;
131 }
132
133 if self
134 .0
135 .last_update
136 .compare_exchange_weak(
137 last_update,
138 elapsed_time,
139 Ordering::Relaxed,
140 Ordering::Relaxed,
141 )
142 .is_err()
143 {
144 return;
145 }
146
147 self.update();
148 }
149}
150
151impl fmt::Debug for MemoryStat {
152 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
153 write!(f, "MemoryStat")
154 }
155}
156
157impl Default for MemoryStat {
158 fn default() -> Self {
159 Self::new(100)
160 }
161}
162
163pub enum MemoryCheck {
165 Ok(Memory),
167 Exceeded(Memory),
169}
170
171impl MemoryCheck {
172 pub fn has_capacity(&self) -> bool {
174 matches!(self, Self::Ok(_))
175 }
176
177 pub fn is_exceeded(&self) -> bool {
179 !self.has_capacity()
180 }
181}
182
183#[derive(Clone, Debug)]
190pub struct MemoryChecker {
191 memory_stat: MemoryStat,
192 config: Arc<Config>,
193}
194
195impl MemoryChecker {
196 pub fn new(memory_stat: MemoryStat, config: Arc<Config>) -> Self {
198 Self {
199 memory_stat,
200 config: config.clone(),
201 }
202 }
203
204 pub fn check_memory_percent(&self) -> MemoryCheck {
206 let memory = self.memory_stat.memory();
207 if memory.used_percent() < self.config.health_max_memory_watermark_percent() {
208 return MemoryCheck::Ok(memory);
209 }
210
211 MemoryCheck::Exceeded(memory)
212 }
213
214 pub fn check_memory_bytes(&self) -> MemoryCheck {
216 let memory = self.memory_stat.memory();
217 if memory.used < self.config.health_max_memory_watermark_bytes() {
218 return MemoryCheck::Ok(memory);
219 }
220
221 MemoryCheck::Exceeded(memory)
222 }
223
224 pub fn check_memory(&self) -> MemoryCheck {
229 let memory = self.memory_stat.memory();
230 if memory.used_percent() < self.config.health_max_memory_watermark_percent()
231 && memory.used < self.config.health_max_memory_watermark_bytes()
232 {
233 return MemoryCheck::Ok(memory);
234 }
235
236 MemoryCheck::Exceeded(memory)
237 }
238}
239
240#[cfg(test)]
241mod tests {
242 use relay_config::Config;
243 use std::sync::atomic::Ordering;
244 use std::sync::Arc;
245 use std::thread::sleep;
246 use std::time::Duration;
247
248 use crate::utils::{Memory, MemoryChecker, MemoryStat};
249
250 #[test]
251 fn test_memory_used_percent_both_0() {
252 let memory = Memory { used: 0, total: 0 };
253 assert_eq!(memory.used_percent(), 1.0);
254 }
255
256 #[test]
257 fn test_memory_used_percent_total_0() {
258 let memory = Memory {
259 used: 100,
260 total: 0,
261 };
262 assert_eq!(memory.used_percent(), 1.0);
263 }
264
265 #[test]
266 fn test_memory_used_percent_zero() {
267 let memory = Memory {
268 used: 0,
269 total: 100,
270 };
271 assert_eq!(memory.used_percent(), 0.0);
272 }
273
274 #[test]
275 fn test_memory_used_percent_half() {
276 let memory = Memory {
277 used: 50,
278 total: 100,
279 };
280 assert_eq!(memory.used_percent(), 0.5);
281 }
282
283 #[test]
284 fn test_memory_checker() {
285 let config = Config::from_json_value(serde_json::json!({
286 "health": {
287 "max_memory_percent": 1.0
288 }
289 }))
290 .unwrap();
291 let memory_checker = MemoryChecker::new(MemoryStat::default(), Arc::new(config));
292 assert!(memory_checker.check_memory().has_capacity());
293
294 let config = Config::from_json_value(serde_json::json!({
295 "health": {
296 "max_memory_percent": 0.0
297 }
298 }))
299 .unwrap();
300 let memory_checker = MemoryChecker::new(MemoryStat::default(), Arc::new(config));
301 assert!(memory_checker.check_memory().is_exceeded());
302 }
303
304 #[test]
305 fn test_last_update_is_updated() {
306 let memory = MemoryStat::new(0);
307 let first_update = memory.0.last_update.load(Ordering::Relaxed);
308
309 sleep(Duration::from_millis(1));
310
311 memory.memory();
312 let second_update = memory.0.last_update.load(Ordering::Relaxed);
313
314 assert!(first_update <= second_update);
315 }
316}