// relay_server/utils/memory.rs

1use crate::statsd::RelayGauges;
2use arc_swap::ArcSwap;
3use relay_config::Config;
4use relay_statsd::metric;
5use std::fmt;
6use std::fmt::Formatter;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::Instant;
10use sysinfo::{MemoryRefreshKind, System};
11
/// A snapshot of the memory state at the time of reading.
#[derive(Clone, Copy, Debug)]
pub struct Memory {
    /// Used memory, in bytes.
    ///
    /// When `sysinfo` reports cgroup limits this is the cgroup's `rss`; otherwise it is the
    /// system-wide used memory reported by `sysinfo`.
    pub used: u64,
    /// Total memory, in the same unit as [`Memory::used`].
    ///
    /// When `sysinfo` reports cgroup limits this is the cgroup's total memory; otherwise it is
    /// the total memory of the system.
    pub total: u64,
}

impl Memory {
    /// Returns the fraction of used memory, always within the closed interval `[0.0, 1.0]`.
    ///
    /// Degenerate readings are treated pessimistically and yield `1.0`:
    /// - `total` is `0` (the ratio is either infinite and gets clamped, or NaN);
    /// - `used / total` evaluates to NaN.
    pub fn used_percent(&self) -> f32 {
        let ratio = self.used as f32 / self.total as f32;
        if ratio.is_nan() {
            1.0
        } else {
            ratio.clamp(0.0, 1.0)
        }
    }
}

41/// Inner struct that holds the latest [`Memory`] state which is polled at least every 100ms.
42///
43/// The goal of this implementation is to offer lock-free reading to any arbitrary number of threads
44/// while at the same time, reducing to the minimum the need for locking when memory stats need to
45/// be updated.
46///
47/// Because of how the implementation is designed, there is a very small chance that multiple
48/// threads are waiting on the lock that guards [`System`]. The only case in which there might be
49/// multiple threads waiting on the lock, is if a thread holds the lock for more than
50/// `refresh_frequency_ms` and a new thread comes and updates the `last_update` and tries
51/// to acquire the lock to perform another memory reading.
52struct Inner {
53    memory: ArcSwap<Memory>,
54    last_update: AtomicU64,
55    reference_time: Instant,
56    system: Mutex<System>,
57    refresh_frequency_ms: u64,
58}
59
60/// Wrapper which hides the [`Arc`] and exposes utils method to make working with
61/// [`MemoryStat`] as opaque as possible.
62#[derive(Clone)]
63pub struct MemoryStat(Arc<Inner>);
64
65impl MemoryStat {
66    /// Creates an instance of [`MemoryStat`] and obtains the current memory readings from
67    /// [`System`].
68    pub fn new(refresh_frequency_ms: u64) -> Self {
69        // sysinfo docs suggest to use a single instance of `System` across the program.
70        let mut system = System::new();
71        Self(Arc::new(Inner {
72            memory: ArcSwap::from(Arc::new(Self::refresh_memory(&mut system))),
73            last_update: AtomicU64::new(0),
74            reference_time: Instant::now(),
75            system: Mutex::new(system),
76            refresh_frequency_ms,
77        }))
78    }
79
80    /// Returns the current memory data without instantiating [`MemoryStat`].
81    pub fn current_memory() -> Memory {
82        let mut system = System::new();
83        Self::refresh_memory(&mut system)
84    }
85
86    /// Returns a copy of the most up-to-date memory data.
87    pub fn memory(&self) -> Memory {
88        self.try_update();
89        **self.0.memory.load()
90    }
91
92    /// Refreshes the memory readings.
93    fn refresh_memory(system: &mut System) -> Memory {
94        system.refresh_memory_specifics(MemoryRefreshKind::nothing().with_ram());
95        let memory = match system.cgroup_limits() {
96            Some(cgroup) => Memory {
97                used: cgroup.rss,
98                total: cgroup.total_memory,
99            },
100            None => Memory {
101                used: system.used_memory(),
102                total: system.total_memory(),
103            },
104        };
105
106        metric!(gauge(RelayGauges::SystemMemoryUsed) = memory.used);
107        metric!(gauge(RelayGauges::SystemMemoryTotal) = memory.total);
108
109        memory
110    }
111
112    /// Updates the memory readings unconditionally.
113    fn update(&self) {
114        let mut system = self
115            .0
116            .system
117            .lock()
118            .unwrap_or_else(|system| system.into_inner());
119
120        let updated_memory = Self::refresh_memory(&mut system);
121        self.0.memory.store(Arc::new(updated_memory));
122    }
123
124    /// Updates the memory readings if at least `refresh_frequency_ms` has passed.
125    fn try_update(&self) {
126        let last_update = self.0.last_update.load(Ordering::Relaxed);
127        let elapsed_time = self.0.reference_time.elapsed().as_millis() as u64;
128
129        if elapsed_time - last_update < self.0.refresh_frequency_ms {
130            return;
131        }
132
133        if self
134            .0
135            .last_update
136            .compare_exchange_weak(
137                last_update,
138                elapsed_time,
139                Ordering::Relaxed,
140                Ordering::Relaxed,
141            )
142            .is_err()
143        {
144            return;
145        }
146
147        self.update();
148    }
149}
150
151impl fmt::Debug for MemoryStat {
152    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
153        write!(f, "MemoryStat")
154    }
155}
156
157impl Default for MemoryStat {
158    fn default() -> Self {
159        Self::new(100)
160    }
161}
162
163/// Enum representing the two different states of a memory check.
164pub enum MemoryCheck {
165    /// The memory usage is below the specified thresholds.
166    Ok(Memory),
167    /// The memory usage exceeds the specified thresholds.
168    Exceeded(Memory),
169}
170
171impl MemoryCheck {
172    /// Returns `true` if [`MemoryCheck`] is of variant [`MemoryCheck::Ok`].
173    pub fn has_capacity(&self) -> bool {
174        matches!(self, Self::Ok(_))
175    }
176
177    /// Returns `true` if [`MemoryCheck`] is of variant [`MemoryCheck::Exceeded`].
178    pub fn is_exceeded(&self) -> bool {
179        !self.has_capacity()
180    }
181}
182
183/// Struct that composes a [`Config`] and [`MemoryStat`] and provides utility methods to validate
184/// whether memory is within limits.
185///
186/// The rationale behind such struct, is to be able to share across Relay the same logic for dealing
187/// with memory readings. It's decoupled from [`MemoryStat`] because it's just a layer on top that
188/// decides how memory readings are interpreted.
189#[derive(Clone, Debug)]
190pub struct MemoryChecker {
191    memory_stat: MemoryStat,
192    config: Arc<Config>,
193}
194
195impl MemoryChecker {
196    /// Create an instance of [`MemoryChecker`].
197    pub fn new(memory_stat: MemoryStat, config: Arc<Config>) -> Self {
198        Self {
199            memory_stat,
200            config: config.clone(),
201        }
202    }
203
204    /// Checks if the used percentage of memory is below the specified threshold.
205    pub fn check_memory_percent(&self) -> MemoryCheck {
206        let memory = self.memory_stat.memory();
207        if memory.used_percent() < self.config.health_max_memory_watermark_percent() {
208            return MemoryCheck::Ok(memory);
209        }
210
211        MemoryCheck::Exceeded(memory)
212    }
213
214    /// Checks if the used memory (in bytes) is below the specified threshold.
215    pub fn check_memory_bytes(&self) -> MemoryCheck {
216        let memory = self.memory_stat.memory();
217        if memory.used < self.config.health_max_memory_watermark_bytes() {
218            return MemoryCheck::Ok(memory);
219        }
220
221        MemoryCheck::Exceeded(memory)
222    }
223
224    /// Checks if the used memory is below both percentage and bytes thresholds.
225    ///
226    /// This is the function that should be mainly used for checking whether of not Relay has
227    /// enough memory.
228    pub fn check_memory(&self) -> MemoryCheck {
229        let memory = self.memory_stat.memory();
230        if memory.used_percent() < self.config.health_max_memory_watermark_percent()
231            && memory.used < self.config.health_max_memory_watermark_bytes()
232        {
233            return MemoryCheck::Ok(memory);
234        }
235
236        MemoryCheck::Exceeded(memory)
237    }
238}
239
#[cfg(test)]
mod tests {
    use relay_config::Config;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::thread::sleep;
    use std::time::Duration;

    use crate::utils::{Memory, MemoryChecker, MemoryStat};

    #[test]
    fn test_memory_used_percent_both_0() {
        let memory = Memory { used: 0, total: 0 };
        assert_eq!(memory.used_percent(), 1.0);
    }

    #[test]
    fn test_memory_used_percent_total_0() {
        let memory = Memory {
            used: 100,
            total: 0,
        };
        assert_eq!(memory.used_percent(), 1.0);
    }

    #[test]
    fn test_memory_used_percent_zero() {
        let memory = Memory {
            used: 0,
            total: 100,
        };
        assert_eq!(memory.used_percent(), 0.0);
    }

    #[test]
    fn test_memory_used_percent_half() {
        let memory = Memory {
            used: 50,
            total: 100,
        };
        assert_eq!(memory.used_percent(), 0.5);
    }

    #[test]
    fn test_memory_checker() {
        let config = Config::from_json_value(serde_json::json!({
            "health": {
                "max_memory_percent": 1.0
            }
        }))
        .unwrap();
        let memory_checker = MemoryChecker::new(MemoryStat::default(), Arc::new(config));
        assert!(memory_checker.check_memory().has_capacity());

        let config = Config::from_json_value(serde_json::json!({
            "health": {
                "max_memory_percent": 0.0
            }
        }))
        .unwrap();
        let memory_checker = MemoryChecker::new(MemoryStat::default(), Arc::new(config));
        assert!(memory_checker.check_memory().is_exceeded());
    }

    #[test]
    fn test_last_update_is_updated() {
        let memory = MemoryStat::new(0);
        let first_update = memory.0.last_update.load(Ordering::Relaxed);

        // Let at least one millisecond elapse so the refreshed timestamp is strictly larger.
        sleep(Duration::from_millis(1));

        memory.memory();
        let second_update = memory.0.last_update.load(Ordering::Relaxed);

        // `first_update` is `0` right after construction, so a `<=` check could never fail.
        assert!(second_update > first_update);
    }

    #[test]
    fn test_last_update_not_updated_before_refresh_frequency() {
        let memory = MemoryStat::new(u64::MAX);

        memory.memory();

        // The refresh frequency can never be reached, so no refresh must have been claimed.
        assert_eq!(memory.0.last_update.load(Ordering::Relaxed), 0);
    }
}