relay_server/services/processor/session.rs

1//! Contains the sessions related processor code.
2
3use std::error::Error;
4use std::net;
5
6use chrono::{DateTime, Duration as SignedDuration, Utc};
7use relay_config::Config;
8use relay_dynamic_config::{GlobalConfig, SessionMetricsConfig};
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{
11    IpAddr, SessionAggregates, SessionAttributes, SessionStatus, SessionUpdate,
12};
13use relay_filter::ProjectFiltersConfig;
14use relay_metrics::Bucket;
15use relay_statsd::metric;
16
17use crate::envelope::{ContentType, Item, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::processor::{MINIMUM_CLOCK_DRIFT, ProcessingExtractedMetrics, SessionGroup};
20use crate::services::projects::project::ProjectInfo;
21use crate::statsd::RelayTimers;
22
23#[derive(Debug, Clone, Copy)]
24struct SessionProcessingConfig<'a> {
25    pub global_config: &'a GlobalConfig,
26    pub config: &'a Config,
27    pub filters_config: &'a ProjectFiltersConfig,
28    pub metrics_config: &'a SessionMetricsConfig,
29    pub client: Option<&'a str>,
30    pub client_addr: Option<std::net::IpAddr>,
31    pub received: DateTime<Utc>,
32    pub clock_drift_processor: &'a ClockDriftProcessor,
33}
34
35/// Validates all sessions and session aggregates in the envelope, if any.
36///
37/// Both are removed from the envelope if they contain invalid JSON or if their timestamps
38/// are out of range after clock drift correction.
39pub fn process(
40    managed_envelope: &mut TypedEnvelope<SessionGroup>,
41    global_config: &GlobalConfig,
42    config: &Config,
43    extracted_metrics: &mut ProcessingExtractedMetrics,
44    project_info: &ProjectInfo,
45) {
46    let received = managed_envelope.received_at();
47    let envelope = managed_envelope.envelope_mut();
48    let client = envelope.meta().client().map(|x| x.to_owned());
49    let client_addr = envelope.meta().client_addr();
50
51    let clock_drift_processor =
52        ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT);
53
54    let spc = SessionProcessingConfig {
55        global_config,
56        config,
57        filters_config: &project_info.config().filter_settings,
58        metrics_config: &project_info.config().session_metrics,
59        client: client.as_deref(),
60        client_addr,
61        received,
62        clock_drift_processor: &clock_drift_processor,
63    };
64
65    let mut session_extracted_metrics = Vec::new();
66    managed_envelope.retain_items(|item| {
67        let should_keep = match item.ty() {
68            ItemType::Session => process_session(item, spc, &mut session_extracted_metrics),
69            ItemType::Sessions => {
70                process_session_aggregates(item, spc, &mut session_extracted_metrics)
71            }
72            _ => true, // Keep all other item types
73        };
74        if should_keep {
75            ItemAction::Keep
76        } else {
77            ItemAction::DropSilently // sessions never log outcomes.
78        }
79    });
80
81    extracted_metrics.extend_project_metrics(session_extracted_metrics, None);
82}
83
84/// Returns Ok(true) if attributes were modified.
85/// Returns Err if the session should be dropped.
86fn validate_attributes(
87    client_addr: &Option<net::IpAddr>,
88    attributes: &mut SessionAttributes,
89) -> Result<bool, ()> {
90    let mut changed = false;
91
92    let release = &attributes.release;
93    if let Err(e) = relay_event_normalization::validate_release(release) {
94        relay_log::trace!(
95            error = &e as &dyn Error,
96            release,
97            "skipping session with invalid release"
98        );
99        return Err(());
100    }
101
102    if let Some(ref env) = attributes.environment {
103        if let Err(e) = relay_event_normalization::validate_environment(env) {
104            relay_log::trace!(
105                error = &e as &dyn Error,
106                env,
107                "removing invalid environment"
108            );
109            attributes.environment = None;
110            changed = true;
111        }
112    }
113
114    if let Some(ref ip_address) = attributes.ip_address {
115        if ip_address.is_auto() {
116            attributes.ip_address = client_addr.map(IpAddr::from);
117            changed = true;
118        }
119    }
120
121    Ok(changed)
122}
123
124fn is_valid_session_timestamp(
125    received: DateTime<Utc>,
126    timestamp: DateTime<Utc>,
127    max_secs_in_future: i64,
128    max_session_secs_in_past: i64,
129) -> bool {
130    let max_age = SignedDuration::seconds(max_session_secs_in_past);
131    if (received - timestamp) > max_age {
132        relay_log::trace!("skipping session older than {} days", max_age.num_days());
133        return false;
134    }
135
136    let max_future = SignedDuration::seconds(max_secs_in_future);
137    if (timestamp - received) > max_future {
138        relay_log::trace!(
139            "skipping session more than {}s in the future",
140            max_future.num_seconds()
141        );
142        return false;
143    }
144
145    true
146}
147
148/// Returns true if the item should be kept.
149#[allow(clippy::too_many_arguments)]
150fn process_session(
151    item: &mut Item,
152    session_processing_config: SessionProcessingConfig,
153    extracted_metrics: &mut Vec<Bucket>,
154) -> bool {
155    let SessionProcessingConfig {
156        global_config,
157        config,
158        filters_config,
159        metrics_config,
160        client,
161        client_addr,
162        received,
163        clock_drift_processor,
164    } = session_processing_config;
165
166    let mut changed = false;
167    let payload = item.payload();
168    let max_secs_in_future = config.max_secs_in_future();
169    let max_session_secs_in_past = config.max_session_secs_in_past();
170
171    // sessionupdate::parse is already tested
172    let mut session = match SessionUpdate::parse(&payload) {
173        Ok(session) => session,
174        Err(error) => {
175            relay_log::trace!(
176                error = &error as &dyn Error,
177                "skipping invalid session payload"
178            );
179            return false;
180        }
181    };
182
183    if session.sequence == u64::MAX {
184        relay_log::trace!("skipping session due to sequence overflow");
185        return false;
186    };
187
188    if clock_drift_processor.is_drifted() {
189        relay_log::trace!("applying clock drift correction to session");
190        clock_drift_processor.process_datetime(&mut session.started);
191        clock_drift_processor.process_datetime(&mut session.timestamp);
192        changed = true;
193    }
194
195    if session.timestamp < session.started {
196        relay_log::trace!("fixing session timestamp to {}", session.timestamp);
197        session.timestamp = session.started;
198        changed = true;
199    }
200
201    // Log the timestamp delay for all sessions after clock drift correction.
202    let session_delay = received - session.timestamp;
203    if session_delay > SignedDuration::minutes(1) {
204        metric!(
205            timer(RelayTimers::TimestampDelay) = session_delay.to_std().unwrap(),
206            category = "session",
207        );
208    }
209
210    // Validate timestamps
211    for t in [session.timestamp, session.started] {
212        if !is_valid_session_timestamp(received, t, max_secs_in_future, max_session_secs_in_past) {
213            return false;
214        }
215    }
216
217    // Validate attributes
218    match validate_attributes(&client_addr, &mut session.attributes) {
219        Err(_) => return false,
220        Ok(changed_attributes) => {
221            changed |= changed_attributes;
222        }
223    }
224
225    if config.processing_enabled() && matches!(session.status, SessionStatus::Unknown(_)) {
226        return false;
227    }
228
229    if relay_filter::should_filter(
230        &session,
231        client_addr,
232        filters_config,
233        global_config.filters(),
234    )
235    .is_err()
236    {
237        return false;
238    };
239
240    // Extract metrics if they haven't been extracted by a prior Relay
241    if metrics_config.is_enabled()
242        && !item.metrics_extracted()
243        && !matches!(session.status, SessionStatus::Unknown(_))
244    {
245        crate::metrics_extraction::sessions::extract_session_metrics(
246            &session.attributes,
247            &session,
248            client,
249            extracted_metrics,
250            metrics_config.should_extract_abnormal_mechanism(),
251        );
252        item.set_metrics_extracted(true);
253    }
254
255    // Drop the session if metrics have been extracted in this or a prior Relay
256    if item.metrics_extracted() {
257        return false;
258    } else if config.processing_enabled() {
259        relay_log::error!(
260            "Session metrics extraction disabled on a processing Relay, \
261            make sure you're running an up to date Relay matching the Sentry \
262            version."
263        );
264        return false;
265    }
266
267    if changed {
268        let json_string = match serde_json::to_string(&session) {
269            Ok(json) => json,
270            Err(err) => {
271                relay_log::error!(error = &err as &dyn Error, "failed to serialize session");
272                return false;
273            }
274        };
275
276        item.set_payload(ContentType::Json, json_string);
277    }
278
279    true
280}
281
282#[allow(clippy::too_many_arguments)]
283fn process_session_aggregates(
284    item: &mut Item,
285    session_processing_config: SessionProcessingConfig,
286    extracted_metrics: &mut Vec<Bucket>,
287) -> bool {
288    let SessionProcessingConfig {
289        global_config,
290        config,
291        filters_config,
292        metrics_config,
293        client,
294        client_addr,
295        received,
296        clock_drift_processor,
297    } = session_processing_config;
298
299    let mut changed = false;
300    let payload = item.payload();
301    let max_secs_in_future = config.max_secs_in_future();
302    let max_session_secs_in_past = config.max_session_secs_in_past();
303
304    let mut session = match SessionAggregates::parse(&payload) {
305        Ok(session) => session,
306        Err(error) => {
307            relay_log::trace!(
308                error = &error as &dyn Error,
309                "skipping invalid sessions payload"
310            );
311            return false;
312        }
313    };
314
315    if clock_drift_processor.is_drifted() {
316        relay_log::trace!("applying clock drift correction to session");
317        for aggregate in &mut session.aggregates {
318            clock_drift_processor.process_datetime(&mut aggregate.started);
319        }
320        changed = true;
321    }
322
323    // Validate timestamps
324    session.aggregates.retain(|aggregate| {
325        is_valid_session_timestamp(
326            received,
327            aggregate.started,
328            max_secs_in_future,
329            max_session_secs_in_past,
330        )
331    });
332
333    // After timestamp validation, aggregates could now be empty
334    if session.aggregates.is_empty() {
335        return false;
336    }
337
338    // Validate attributes
339    match validate_attributes(&client_addr, &mut session.attributes) {
340        Err(_) => return false,
341        Ok(changed_attributes) => {
342            changed |= changed_attributes;
343        }
344    }
345
346    if relay_filter::should_filter(
347        &session,
348        client_addr,
349        filters_config,
350        global_config.filters(),
351    )
352    .is_err()
353    {
354        return false;
355    };
356
357    // Extract metrics if they haven't been extracted by a prior Relay
358    if metrics_config.is_enabled() && !item.metrics_extracted() {
359        for aggregate in &session.aggregates {
360            crate::metrics_extraction::sessions::extract_session_metrics(
361                &session.attributes,
362                aggregate,
363                client,
364                extracted_metrics,
365                metrics_config.should_extract_abnormal_mechanism(),
366            );
367            item.set_metrics_extracted(true);
368        }
369    }
370
371    // Drop the aggregate if metrics have been extracted in this or a prior Relay
372    if item.metrics_extracted() {
373        return false;
374    }
375
376    if changed {
377        let json_string = match serde_json::to_string(&session) {
378            Ok(json) => json,
379            Err(err) => {
380                relay_log::error!(error = &err as &dyn Error, "failed to serialize session");
381                return false;
382            }
383        };
384
385        item.set_payload(ContentType::Json, json_string);
386    }
387
388    true
389}
390
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Inputs for a single `process_session` invocation.
    ///
    /// The [`Default`] value describes a session that `process_session` keeps (see
    /// `test_process_session_keep_item`). Tests tweak individual fields to exercise one
    /// rejection path at a time.
    struct TestProcessSessionArguments<'a> {
        item: Item,
        received: DateTime<Utc>,
        client: Option<&'a str>,
        client_addr: Option<net::IpAddr>,
        metrics_config: SessionMetricsConfig,
        clock_drift_processor: ClockDriftProcessor,
        extracted_metrics: Vec<Bucket>,
    }

    impl TestProcessSessionArguments<'_> {
        /// Runs `process_session` with default global, Relay and filter configuration.
        fn run_session_producer(&mut self) -> bool {
            let spc = SessionProcessingConfig {
                global_config: &Default::default(),
                config: &Default::default(),
                filters_config: &Default::default(),
                metrics_config: &self.metrics_config,
                client: self.client,
                client_addr: self.client_addr,
                received: self.received,
                clock_drift_processor: &self.clock_drift_processor,
            };
            process_session(&mut self.item, spc, &mut self.extracted_metrics)
        }
    }

    impl Default for TestProcessSessionArguments<'_> {
        fn default() -> Self {
            let mut item = Item::new(ItemType::Session);

            let session = r#"{
            "init": false,
            "started": "2021-04-26T08:00:00+0100",
            "timestamp": "2021-04-26T08:00:00+0100",
            "attrs": {
                "release": "1.0.0"
            },
            "did": "user123",
            "status": "this is not a valid status!",
            "duration": 123.4
        }"#;

            item.set_payload(ContentType::Json, session);
            let received = DateTime::from_str("2021-04-26T08:00:00+0100").unwrap();

            Self {
                item,
                received,
                client: None,
                client_addr: None,
                metrics_config: serde_json::from_str(r#"{"version": 0, "drop": true}"#).unwrap(),
                clock_drift_processor: ClockDriftProcessor::new(None, received),
                extracted_metrics: vec![],
            }
        }
    }

    /// Checks that the default test-arguments leads to the item being kept, which helps ensure the
    /// other tests are valid.
    #[test]
    fn test_process_session_keep_item() {
        let mut args = TestProcessSessionArguments::default();
        assert!(args.run_session_producer());
    }

    #[test]
    fn test_process_session_invalid_json() {
        let mut args = TestProcessSessionArguments::default();
        args.item
            .set_payload(ContentType::Json, "this isnt valid json");
        assert!(!args.run_session_producer());
    }

    #[test]
    fn test_process_session_sequence_overflow() {
        let mut args = TestProcessSessionArguments::default();
        args.item.set_payload(
            ContentType::Json,
            r#"{
            "init": false,
            "started": "2021-04-26T08:00:00+0100",
            "timestamp": "2021-04-26T08:00:00+0100",
            "seq": 18446744073709551615,
            "attrs": {
                "release": "1.0.0"
            },
            "did": "user123",
            "status": "this is not a valid status!",
            "duration": 123.4
        }"#,
        );
        assert!(!args.run_session_producer());
    }

    #[test]
    fn test_process_session_invalid_timestamp() {
        let mut args = TestProcessSessionArguments::default();
        args.received = DateTime::from_str("2021-05-26T08:00:00+0100").unwrap();
        assert!(!args.run_session_producer());
    }

    #[test]
    fn test_process_session_metrics_extracted() {
        let mut args = TestProcessSessionArguments::default();
        args.item.set_metrics_extracted(true);
        assert!(!args.run_session_producer());
    }

    #[test]
    fn test_process_session_invalid_release() {
        let mut args = TestProcessSessionArguments::default();
        args.item.set_payload(
            ContentType::Json,
            r#"{
            "init": false,
            "started": "2021-04-26T08:00:00+0100",
            "timestamp": "2021-04-26T08:00:00+0100",
            "attrs": {
                "release": "latest"
            },
            "did": "user123",
            "status": "ok",
            "duration": 123.4
        }"#,
        );
        assert!(!args.run_session_producer());
    }

    /// A session ending before it started is kept, with its end clamped to its start.
    #[test]
    fn test_process_session_timestamp_before_started() {
        let mut args = TestProcessSessionArguments::default();
        args.item.set_payload(
            ContentType::Json,
            r#"{
            "init": false,
            "started": "2021-04-26T08:00:00+0100",
            "timestamp": "2021-04-26T07:59:00+0100",
            "attrs": {
                "release": "1.0.0"
            },
            "did": "user123",
            "status": "ok",
            "duration": 123.4
        }"#,
        );
        assert!(args.run_session_producer());

        let session = SessionUpdate::parse(&args.item.payload()).unwrap();
        assert_eq!(session.timestamp, session.started);
    }
}