1use std::error::Error;
4use std::net;
5
6use chrono::{DateTime, Duration as SignedDuration, Utc};
7use relay_config::Config;
8use relay_dynamic_config::{GlobalConfig, SessionMetricsConfig};
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{
11 IpAddr, SessionAggregates, SessionAttributes, SessionStatus, SessionUpdate,
12};
13use relay_filter::ProjectFiltersConfig;
14use relay_metrics::Bucket;
15use relay_statsd::metric;
16
17use crate::envelope::{ContentType, Item, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::processor::{MINIMUM_CLOCK_DRIFT, ProcessingExtractedMetrics, SessionGroup};
20use crate::services::projects::project::ProjectInfo;
21use crate::statsd::RelayTimers;
22
23#[derive(Debug, Clone, Copy)]
24struct SessionProcessingConfig<'a> {
25 pub global_config: &'a GlobalConfig,
26 pub config: &'a Config,
27 pub filters_config: &'a ProjectFiltersConfig,
28 pub metrics_config: &'a SessionMetricsConfig,
29 pub client: Option<&'a str>,
30 pub client_addr: Option<std::net::IpAddr>,
31 pub received: DateTime<Utc>,
32 pub clock_drift_processor: &'a ClockDriftProcessor,
33}
34
35pub fn process(
40 managed_envelope: &mut TypedEnvelope<SessionGroup>,
41 global_config: &GlobalConfig,
42 config: &Config,
43 extracted_metrics: &mut ProcessingExtractedMetrics,
44 project_info: &ProjectInfo,
45) {
46 let received = managed_envelope.received_at();
47 let envelope = managed_envelope.envelope_mut();
48 let client = envelope.meta().client().map(|x| x.to_owned());
49 let client_addr = envelope.meta().client_addr();
50
51 let clock_drift_processor =
52 ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT);
53
54 let spc = SessionProcessingConfig {
55 global_config,
56 config,
57 filters_config: &project_info.config().filter_settings,
58 metrics_config: &project_info.config().session_metrics,
59 client: client.as_deref(),
60 client_addr,
61 received,
62 clock_drift_processor: &clock_drift_processor,
63 };
64
65 let mut session_extracted_metrics = Vec::new();
66 managed_envelope.retain_items(|item| {
67 let should_keep = match item.ty() {
68 ItemType::Session => process_session(item, spc, &mut session_extracted_metrics),
69 ItemType::Sessions => {
70 process_session_aggregates(item, spc, &mut session_extracted_metrics)
71 }
72 _ => true, };
74 if should_keep {
75 ItemAction::Keep
76 } else {
77 ItemAction::DropSilently }
79 });
80
81 extracted_metrics.extend_project_metrics(session_extracted_metrics, None);
82}
83
84fn validate_attributes(
87 client_addr: &Option<net::IpAddr>,
88 attributes: &mut SessionAttributes,
89) -> Result<bool, ()> {
90 let mut changed = false;
91
92 let release = &attributes.release;
93 if let Err(e) = relay_event_normalization::validate_release(release) {
94 relay_log::trace!(
95 error = &e as &dyn Error,
96 release,
97 "skipping session with invalid release"
98 );
99 return Err(());
100 }
101
102 if let Some(ref env) = attributes.environment {
103 if let Err(e) = relay_event_normalization::validate_environment(env) {
104 relay_log::trace!(
105 error = &e as &dyn Error,
106 env,
107 "removing invalid environment"
108 );
109 attributes.environment = None;
110 changed = true;
111 }
112 }
113
114 if let Some(ref ip_address) = attributes.ip_address {
115 if ip_address.is_auto() {
116 attributes.ip_address = client_addr.map(IpAddr::from);
117 changed = true;
118 }
119 }
120
121 Ok(changed)
122}
123
124fn is_valid_session_timestamp(
125 received: DateTime<Utc>,
126 timestamp: DateTime<Utc>,
127 max_secs_in_future: i64,
128 max_session_secs_in_past: i64,
129) -> bool {
130 let max_age = SignedDuration::seconds(max_session_secs_in_past);
131 if (received - timestamp) > max_age {
132 relay_log::trace!("skipping session older than {} days", max_age.num_days());
133 return false;
134 }
135
136 let max_future = SignedDuration::seconds(max_secs_in_future);
137 if (timestamp - received) > max_future {
138 relay_log::trace!(
139 "skipping session more than {}s in the future",
140 max_future.num_seconds()
141 );
142 return false;
143 }
144
145 true
146}
147
148#[allow(clippy::too_many_arguments)]
150fn process_session(
151 item: &mut Item,
152 session_processing_config: SessionProcessingConfig,
153 extracted_metrics: &mut Vec<Bucket>,
154) -> bool {
155 let SessionProcessingConfig {
156 global_config,
157 config,
158 filters_config,
159 metrics_config,
160 client,
161 client_addr,
162 received,
163 clock_drift_processor,
164 } = session_processing_config;
165
166 let mut changed = false;
167 let payload = item.payload();
168 let max_secs_in_future = config.max_secs_in_future();
169 let max_session_secs_in_past = config.max_session_secs_in_past();
170
171 let mut session = match SessionUpdate::parse(&payload) {
173 Ok(session) => session,
174 Err(error) => {
175 relay_log::trace!(
176 error = &error as &dyn Error,
177 "skipping invalid session payload"
178 );
179 return false;
180 }
181 };
182
183 if session.sequence == u64::MAX {
184 relay_log::trace!("skipping session due to sequence overflow");
185 return false;
186 };
187
188 if clock_drift_processor.is_drifted() {
189 relay_log::trace!("applying clock drift correction to session");
190 clock_drift_processor.process_datetime(&mut session.started);
191 clock_drift_processor.process_datetime(&mut session.timestamp);
192 changed = true;
193 }
194
195 if session.timestamp < session.started {
196 relay_log::trace!("fixing session timestamp to {}", session.timestamp);
197 session.timestamp = session.started;
198 changed = true;
199 }
200
201 let session_delay = received - session.timestamp;
203 if session_delay > SignedDuration::minutes(1) {
204 metric!(
205 timer(RelayTimers::TimestampDelay) = session_delay.to_std().unwrap(),
206 category = "session",
207 );
208 }
209
210 for t in [session.timestamp, session.started] {
212 if !is_valid_session_timestamp(received, t, max_secs_in_future, max_session_secs_in_past) {
213 return false;
214 }
215 }
216
217 match validate_attributes(&client_addr, &mut session.attributes) {
219 Err(_) => return false,
220 Ok(changed_attributes) => {
221 changed |= changed_attributes;
222 }
223 }
224
225 if config.processing_enabled() && matches!(session.status, SessionStatus::Unknown(_)) {
226 return false;
227 }
228
229 if relay_filter::should_filter(
230 &session,
231 client_addr,
232 filters_config,
233 global_config.filters(),
234 )
235 .is_err()
236 {
237 return false;
238 };
239
240 if metrics_config.is_enabled()
242 && !item.metrics_extracted()
243 && !matches!(session.status, SessionStatus::Unknown(_))
244 {
245 crate::metrics_extraction::sessions::extract_session_metrics(
246 &session.attributes,
247 &session,
248 client,
249 extracted_metrics,
250 metrics_config.should_extract_abnormal_mechanism(),
251 );
252 item.set_metrics_extracted(true);
253 }
254
255 if item.metrics_extracted() {
257 return false;
258 } else if config.processing_enabled() {
259 relay_log::error!(
260 "Session metrics extraction disabled on a processing Relay, \
261 make sure you're running an up to date Relay matching the Sentry \
262 version."
263 );
264 return false;
265 }
266
267 if changed {
268 let json_string = match serde_json::to_string(&session) {
269 Ok(json) => json,
270 Err(err) => {
271 relay_log::error!(error = &err as &dyn Error, "failed to serialize session");
272 return false;
273 }
274 };
275
276 item.set_payload(ContentType::Json, json_string);
277 }
278
279 true
280}
281
282#[allow(clippy::too_many_arguments)]
283fn process_session_aggregates(
284 item: &mut Item,
285 session_processing_config: SessionProcessingConfig,
286 extracted_metrics: &mut Vec<Bucket>,
287) -> bool {
288 let SessionProcessingConfig {
289 global_config,
290 config,
291 filters_config,
292 metrics_config,
293 client,
294 client_addr,
295 received,
296 clock_drift_processor,
297 } = session_processing_config;
298
299 let mut changed = false;
300 let payload = item.payload();
301 let max_secs_in_future = config.max_secs_in_future();
302 let max_session_secs_in_past = config.max_session_secs_in_past();
303
304 let mut session = match SessionAggregates::parse(&payload) {
305 Ok(session) => session,
306 Err(error) => {
307 relay_log::trace!(
308 error = &error as &dyn Error,
309 "skipping invalid sessions payload"
310 );
311 return false;
312 }
313 };
314
315 if clock_drift_processor.is_drifted() {
316 relay_log::trace!("applying clock drift correction to session");
317 for aggregate in &mut session.aggregates {
318 clock_drift_processor.process_datetime(&mut aggregate.started);
319 }
320 changed = true;
321 }
322
323 session.aggregates.retain(|aggregate| {
325 is_valid_session_timestamp(
326 received,
327 aggregate.started,
328 max_secs_in_future,
329 max_session_secs_in_past,
330 )
331 });
332
333 if session.aggregates.is_empty() {
335 return false;
336 }
337
338 match validate_attributes(&client_addr, &mut session.attributes) {
340 Err(_) => return false,
341 Ok(changed_attributes) => {
342 changed |= changed_attributes;
343 }
344 }
345
346 if relay_filter::should_filter(
347 &session,
348 client_addr,
349 filters_config,
350 global_config.filters(),
351 )
352 .is_err()
353 {
354 return false;
355 };
356
357 if metrics_config.is_enabled() && !item.metrics_extracted() {
359 for aggregate in &session.aggregates {
360 crate::metrics_extraction::sessions::extract_session_metrics(
361 &session.attributes,
362 aggregate,
363 client,
364 extracted_metrics,
365 metrics_config.should_extract_abnormal_mechanism(),
366 );
367 item.set_metrics_extracted(true);
368 }
369 }
370
371 if item.metrics_extracted() {
373 return false;
374 }
375
376 if changed {
377 let json_string = match serde_json::to_string(&session) {
378 Ok(json) => json,
379 Err(err) => {
380 relay_log::error!(error = &err as &dyn Error, "failed to serialize session");
381 return false;
382 }
383 };
384
385 item.set_payload(ContentType::Json, json_string);
386 }
387
388 true
389}
390
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Inputs for one `process_session` call.
    ///
    /// The `Default` value is set up so that the item is kept; each test changes a single
    /// input to trigger the drop it checks for.
    struct TestProcessSessionArguments<'a> {
        item: Item,
        received: DateTime<Utc>,
        client: Option<&'a str>,
        client_addr: Option<net::IpAddr>,
        metrics_config: SessionMetricsConfig,
        clock_drift_processor: ClockDriftProcessor,
        extracted_metrics: Vec<Bucket>,
    }

    impl TestProcessSessionArguments<'_> {
        /// Runs `process_session` with these arguments and default configurations.
        fn run_session_producer(&mut self) -> bool {
            let spc = SessionProcessingConfig {
                global_config: &Default::default(),
                config: &Default::default(),
                filters_config: &Default::default(),
                metrics_config: &self.metrics_config,
                client: self.client,
                client_addr: self.client_addr,
                received: self.received,
                clock_drift_processor: &self.clock_drift_processor,
            };
            process_session(&mut self.item, spc, &mut self.extracted_metrics)
        }
    }

    impl Default for TestProcessSessionArguments<'_> {
        fn default() -> Self {
            let mut item = Item::new(ItemType::Event);

            let session = r#"{
            "init": false,
            "started": "2021-04-26T08:00:00+0100",
            "timestamp": "2021-04-26T08:00:00+0100",
            "attrs": {
                "release": "1.0.0"
            },
            "did": "user123",
            "status": "this is not a valid status!",
            "duration": 123.4
        }"#;

            item.set_payload(ContentType::Json, session);
            // Same instant as the session timestamps, so the timestamp checks pass.
            let received = DateTime::from_str("2021-04-26T08:00:00+0100").unwrap();

            Self {
                item,
                received,
                client: None,
                client_addr: None,
                metrics_config: serde_json::from_str(
                    "
        {
            \"version\": 0,
            \"drop\": true
        }",
                )
                .unwrap(),
                clock_drift_processor: ClockDriftProcessor::new(None, received),
                extracted_metrics: vec![],
            }
        }
    }

    /// The unmodified default arguments must keep the item; the other tests rely on this
    /// so that their single modification is what causes the drop.
    #[test]
    fn test_process_session_keep_item() {
        let mut args = TestProcessSessionArguments::default();
        assert!(args.run_session_producer());
    }

    /// A payload that is not valid JSON drops the item.
    #[test]
    fn test_process_session_invalid_json() {
        let mut args = TestProcessSessionArguments::default();
        args.item
            .set_payload(ContentType::Json, "this isnt valid json");
        assert!(!args.run_session_producer());
    }

    /// A sequence number of `u64::MAX` drops the item.
    #[test]
    fn test_process_session_sequence_overflow() {
        let mut args = TestProcessSessionArguments::default();
        args.item.set_payload(
            ContentType::Json,
            r#"{
            "init": false,
            "started": "2021-04-26T08:00:00+0100",
            "timestamp": "2021-04-26T08:00:00+0100",
            "seq": 18446744073709551615,
            "attrs": {
                "release": "1.0.0"
            },
            "did": "user123",
            "status": "this is not a valid status!",
            "duration": 123.4
        }"#,
        );
        assert!(!args.run_session_producer());
    }

    /// Receiving the session a month after its timestamps drops the item.
    #[test]
    fn test_process_session_invalid_timestamp() {
        let mut args = TestProcessSessionArguments::default();
        args.received = DateTime::from_str("2021-05-26T08:00:00+0100").unwrap();
        assert!(!args.run_session_producer());
    }

    /// An item already marked as having its metrics extracted is dropped.
    #[test]
    fn test_process_session_metrics_extracted() {
        let mut args = TestProcessSessionArguments::default();
        args.item.set_metrics_extracted(true);
        assert!(!args.run_session_producer());
    }
}