1use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
19use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
20use crate::services::projects::project::ProjectInfo;
21use crate::utils::{ItemAction, TypedEnvelope};
22
23#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
25pub enum ClientReportField {
26 Filtered,
28
29 FilteredSampling,
31
32 RateLimited,
34
35 ClientDiscard,
37}
38
39pub fn process_client_reports(
45 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46 config: &Config,
47 project_info: &ProjectInfo,
48 outcome_aggregator: Addr<TrackOutcome>,
49) {
50 if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53 if config.processing_enabled() {
55 managed_envelope.retain_items(|item| match item.ty() {
56 ItemType::ClientReport => ItemAction::DropSilently,
57 _ => ItemAction::Keep,
58 });
59 }
60 return;
61 }
62
63 let mut timestamp = None;
64 let mut output_events = BTreeMap::new();
65 let received = managed_envelope.received_at();
66
67 let clock_drift_processor =
68 ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69 .at_least(MINIMUM_CLOCK_DRIFT);
70
71 managed_envelope.retain_items(|item| {
74 if item.ty() != &ItemType::ClientReport {
75 return ItemAction::Keep;
76 };
77 match ClientReport::parse(&item.payload()) {
78 Ok(ClientReport {
79 timestamp: report_timestamp,
80 discarded_events,
81 rate_limited_events,
82 filtered_events,
83 filtered_sampling_events,
84 }) => {
85 let input_events =
87 discarded_events
88 .into_iter()
89 .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90 .chain(
91 filtered_events.into_iter().map(|discarded_event| {
92 (ClientReportField::Filtered, discarded_event)
93 }),
94 )
95 .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96 (ClientReportField::FilteredSampling, discarded_event)
97 }))
98 .chain(rate_limited_events.into_iter().map(|discarded_event| {
99 (ClientReportField::RateLimited, discarded_event)
100 }));
101
102 for (outcome_type, discarded_event) in input_events {
103 if discarded_event.reason.len() > 200 {
104 relay_log::trace!("ignored client outcome with an overlong reason");
105 continue;
106 }
107 *output_events
108 .entry((
109 outcome_type,
110 discarded_event.reason,
111 discarded_event.category,
112 ))
113 .or_insert(0) += discarded_event.quantity;
114 }
115 if let Some(ts) = report_timestamp {
116 timestamp.get_or_insert(ts);
117 }
118 }
119 Err(err) => {
120 relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121 }
122 }
123 ItemAction::DropSilently
124 });
125
126 if output_events.is_empty() {
127 return;
128 }
129
130 let timestamp =
131 timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133 if clock_drift_processor.is_drifted() {
134 relay_log::trace!("applying clock drift correction to client report");
135 clock_drift_processor.process_timestamp(timestamp);
136 }
137
138 let retention_days = project_info
139 .config()
140 .event_retention
141 .unwrap_or(DEFAULT_EVENT_RETENTION);
142 let max_age = SignedDuration::days(retention_days.into());
143 let in_past = timestamp
145 .as_datetime()
146 .map(|ts| (received - ts) > max_age)
147 .unwrap_or(true);
148 if in_past {
149 relay_log::trace!(
150 "skipping client outcomes older than {} days",
151 max_age.num_days()
152 );
153 return;
154 }
155
156 let max_future = SignedDuration::seconds(config.max_secs_in_future());
157 let in_future = timestamp
159 .as_datetime()
160 .map(|ts| (ts - received) > max_future)
161 .unwrap_or(true);
162 if in_future {
163 relay_log::trace!(
164 "skipping client outcomes more than {}s in the future",
165 max_future.num_seconds()
166 );
167 return;
168 }
169
170 for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171 let outcome = match outcome_from_parts(outcome_type, &reason) {
172 Ok(outcome) => outcome,
173 Err(_) => {
174 relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175 continue;
176 }
177 };
178
179 outcome_aggregator.send(TrackOutcome {
180 timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184 scoping: managed_envelope.scoping(),
185 outcome,
186 event_id: None,
187 remote_addr: None, category,
189 quantity,
190 });
191 }
192}
193
194pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200 managed_envelope.retain_items(|item| {
201 if item.ty() != &ItemType::UserReport {
202 return ItemAction::Keep;
203 };
204
205 let payload = item.payload();
206 let payload = trim_whitespaces(&payload);
210 let report = match serde_json::from_slice::<UserReport>(payload) {
211 Ok(report) => report,
212 Err(error) => {
213 relay_log::error!(error = &error as &dyn Error, "failed to store user report");
214 return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
215 }
216 };
217
218 let json_string = match serde_json::to_string(&report) {
219 Ok(json) => json,
220 Err(err) => {
221 relay_log::error!(
222 error = &err as &dyn Error,
223 "failed to serialize user report"
224 );
225 return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
226 }
227 };
228
229 item.set_payload(ContentType::Json, json_string);
230 ItemAction::Keep
231 });
232}
233
234fn trim_whitespaces(data: &[u8]) -> &[u8] {
235 let Some(from) = data.iter().position(|x| !x.is_ascii_whitespace()) else {
236 return &[];
237 };
238 let Some(to) = data.iter().rposition(|x| !x.is_ascii_whitespace()) else {
239 return &[];
240 };
241 &data[from..to + 1]
242}
243
244fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
248 match field {
249 ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
250 Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
251 .map(RuleCategories::from)
252 .map(Outcome::FilteredSampling)
253 .map_err(|_| ()),
254 None => Err(()),
255 },
256 ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
257 ClientReportField::Filtered => Ok(Outcome::Filtered(
258 FilterStatKey::try_from(reason).map_err(|_| ())?,
259 )),
260 ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
261 "" => None,
262 other => Some(ReasonCode::new(other)),
263 })),
264 }
265}
266
267#[cfg(test)]
268mod tests {
269
270 use std::sync::Arc;
271
272 use relay_cogs::Token;
273 use relay_config::Config;
274 use relay_event_schema::protocol::EventId;
275 use relay_sampling::evaluation::ReservoirCounters;
276
277 use crate::envelope::{Envelope, Item};
278 use crate::extractors::RequestMeta;
279 use crate::services::outcome::RuleCategory;
280 use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
281 use crate::services::projects::project::ProjectInfo;
282 use crate::testutils::{self, create_test_processor};
283 use crate::utils::ManagedEnvelope;
284
285 use super::*;
286
287 #[tokio::test]
288 async fn test_client_report_removal() {
289 relay_test::setup();
290 let (outcome_aggregator, test_store) = testutils::processor_services();
291
292 let config = Config::from_json_value(serde_json::json!({
293 "outcomes": {
294 "emit_outcomes": true,
295 "emit_client_outcomes": true
296 }
297 }))
298 .unwrap();
299
300 let processor = create_test_processor(config).await;
301
302 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
303 .parse()
304 .unwrap();
305
306 let request_meta = RequestMeta::new(dsn);
307 let mut envelope = Envelope::from_request(None, request_meta);
308
309 envelope.add_item({
310 let mut item = Item::new(ItemType::ClientReport);
311 item.set_payload(
312 ContentType::Json,
313 r#"
314 {
315 "discarded_events": [
316 ["queue_full", "error", 42]
317 ]
318 }
319 "#,
320 );
321 item
322 });
323
324 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
325 assert_eq!(envelopes.len(), 1);
326 let (group, envelope) = envelopes.pop().unwrap();
327
328 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
329 let message = ProcessEnvelopeGrouped {
330 group,
331 envelope,
332 project_info: Arc::new(ProjectInfo::default()),
333 rate_limits: Default::default(),
334 sampling_project_info: None,
335 reservoir_counters: ReservoirCounters::default(),
336 };
337
338 let envelope = processor
339 .process(&mut Token::noop(), message)
340 .await
341 .unwrap();
342 assert!(envelope.is_none());
343 }
344
345 #[tokio::test]
346 async fn test_client_report_forwarding() {
347 relay_test::setup();
348 let (outcome_aggregator, test_store) = testutils::processor_services();
349
350 let config = Config::from_json_value(serde_json::json!({
351 "outcomes": {
352 "emit_outcomes": false,
353 "emit_client_outcomes": true
355 }
356 }))
357 .unwrap();
358
359 let processor = create_test_processor(config).await;
360
361 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
362 .parse()
363 .unwrap();
364
365 let request_meta = RequestMeta::new(dsn);
366 let mut envelope = Envelope::from_request(None, request_meta);
367
368 envelope.add_item({
369 let mut item = Item::new(ItemType::ClientReport);
370 item.set_payload(
371 ContentType::Json,
372 r#"
373 {
374 "discarded_events": [
375 ["queue_full", "error", 42]
376 ]
377 }
378 "#,
379 );
380 item
381 });
382
383 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
384 assert_eq!(envelopes.len(), 1);
385 let (group, envelope) = envelopes.pop().unwrap();
386 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
387
388 let message = ProcessEnvelopeGrouped {
389 group,
390 envelope,
391 project_info: Arc::new(ProjectInfo::default()),
392 rate_limits: Default::default(),
393 sampling_project_info: None,
394 reservoir_counters: ReservoirCounters::default(),
395 };
396
397 let Ok(Some(Submit::Envelope(new_envelope))) =
398 processor.process(&mut Token::noop(), message).await
399 else {
400 panic!();
401 };
402 let item = new_envelope.envelope().items().next().unwrap();
403 assert_eq!(item.ty(), &ItemType::ClientReport);
404
405 new_envelope.accept(); }
407
408 #[tokio::test]
409 #[cfg(feature = "processing")]
410 async fn test_client_report_removal_in_processing() {
411 relay_test::setup();
412 let (outcome_aggregator, test_store) = testutils::processor_services();
413
414 let config = Config::from_json_value(serde_json::json!({
415 "outcomes": {
416 "emit_outcomes": true,
417 "emit_client_outcomes": false,
418 },
419 "processing": {
420 "enabled": true,
421 "kafka_config": [],
422 }
423 }))
424 .unwrap();
425
426 let processor = create_test_processor(config).await;
427
428 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
429 .parse()
430 .unwrap();
431
432 let request_meta = RequestMeta::new(dsn);
433 let mut envelope = Envelope::from_request(None, request_meta);
434
435 envelope.add_item({
436 let mut item = Item::new(ItemType::ClientReport);
437 item.set_payload(
438 ContentType::Json,
439 r#"
440 {
441 "discarded_events": [
442 ["queue_full", "error", 42]
443 ]
444 }
445 "#,
446 );
447 item
448 });
449
450 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
451 assert_eq!(envelopes.len(), 1);
452
453 let (group, envelope) = envelopes.pop().unwrap();
454 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
455 let message = ProcessEnvelopeGrouped {
456 group,
457 envelope,
458 project_info: Arc::new(ProjectInfo::default()),
459 rate_limits: Default::default(),
460 sampling_project_info: None,
461 reservoir_counters: ReservoirCounters::default(),
462 };
463
464 let envelope = processor
465 .process(&mut Token::noop(), message)
466 .await
467 .unwrap();
468 assert!(envelope.is_none());
469 }
470
471 #[tokio::test]
472 async fn test_user_report_only() {
473 relay_log::init_test!();
474 let processor = create_test_processor(Default::default()).await;
475 let (outcome_aggregator, test_store) = testutils::processor_services();
476 let event_id = EventId::new();
477
478 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
479 .parse()
480 .unwrap();
481
482 let request_meta = RequestMeta::new(dsn);
483 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
484
485 envelope.add_item({
486 let mut item = Item::new(ItemType::UserReport);
487 item.set_payload(
488 ContentType::Json,
489 format!(r#"{{"event_id": "{event_id}"}}"#),
490 );
491 item
492 });
493
494 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
495 assert_eq!(envelopes.len(), 1);
496
497 let (group, envelope) = envelopes.pop().unwrap();
498
499 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
500 let message = ProcessEnvelopeGrouped {
501 group,
502 envelope,
503 project_info: Arc::new(ProjectInfo::default()),
504 rate_limits: Default::default(),
505 sampling_project_info: None,
506 reservoir_counters: ReservoirCounters::default(),
507 };
508
509 let Ok(Some(Submit::Envelope(new_envelope))) =
510 processor.process(&mut Token::noop(), message).await
511 else {
512 panic!();
513 };
514 let new_envelope = new_envelope.envelope();
515
516 assert_eq!(new_envelope.len(), 1);
517 assert_eq!(
518 new_envelope.items().next().unwrap().ty(),
519 &ItemType::UserReport
520 );
521 }
522
523 #[tokio::test]
524 async fn test_user_report_invalid() {
525 let processor = create_test_processor(Default::default()).await;
526 let (outcome_aggregator, test_store) = testutils::processor_services();
527 let event_id = EventId::new();
528
529 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
530 .parse()
531 .unwrap();
532
533 let request_meta = RequestMeta::new(dsn);
534 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
535
536 envelope.add_item({
537 let mut item = Item::new(ItemType::UserReport);
538 item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
539 item
540 });
541
542 envelope.add_item({
543 let mut item = Item::new(ItemType::Event);
544 item.set_payload(ContentType::Json, "{}");
545 item
546 });
547
548 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
549 assert_eq!(envelopes.len(), 1);
550 let (group, envelope) = envelopes.pop().unwrap();
551 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
552
553 let message = ProcessEnvelopeGrouped {
554 group,
555 envelope,
556 project_info: Arc::new(ProjectInfo::default()),
557 rate_limits: Default::default(),
558 sampling_project_info: None,
559 reservoir_counters: ReservoirCounters::default(),
560 };
561
562 let Ok(Some(Submit::Envelope(new_envelope))) =
563 processor.process(&mut Token::noop(), message).await
564 else {
565 panic!();
566 };
567 let new_envelope = new_envelope.envelope();
568
569 assert_eq!(new_envelope.len(), 1);
570 assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
571 }
572
573 #[test]
574 fn test_from_outcome_type_sampled() {
575 assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err());
576
577 assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err());
578
579 assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err());
580
581 assert!(matches!(
582 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"),
583 Err(())
584 ));
585
586 assert!(matches!(
587 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"),
588 Err(())
589 ));
590
591 assert!(matches!(
592 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"),
593 Err(())
594 ));
595
596 assert_eq!(
597 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
598 Ok(Outcome::FilteredSampling(RuleCategories(
599 [RuleCategory::Other].into()
600 )))
601 );
602
603 assert_eq!(
604 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
605 Ok(Outcome::FilteredSampling(RuleCategories(
606 [RuleCategory::Other].into()
607 )))
608 );
609
610 assert_eq!(
611 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
612 Ok(Outcome::FilteredSampling(RuleCategories(
613 [RuleCategory::Other].into()
614 )))
615 );
616
617 assert_eq!(
618 outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
619 Ok(Outcome::FilteredSampling(RuleCategories(
620 [RuleCategory::BoostEnvironments].into()
621 )))
622 );
623
624 assert_eq!(
625 outcome_from_parts(
626 ClientReportField::FilteredSampling,
627 "Sampled:1001,1456,1567,3333,4444"
628 ),
629 Ok(Outcome::FilteredSampling(RuleCategories(
630 [
631 RuleCategory::BoostEnvironments,
632 RuleCategory::BoostLowVolumeTransactions,
633 RuleCategory::BoostLatestReleases,
634 RuleCategory::Custom
635 ]
636 .into()
637 )))
638 );
639 }
640
641 #[test]
642 fn test_from_outcome_type_filtered() {
643 assert!(matches!(
644 outcome_from_parts(ClientReportField::Filtered, "error-message"),
645 Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
646 ));
647
648 assert!(matches!(
649 outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
650 Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
651 ));
652 }
653
654 #[test]
655 fn test_from_outcome_type_client_discard() {
656 assert_eq!(
657 outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
658 Outcome::ClientDiscard("foo_reason".into())
659 );
660 }
661
662 #[test]
663 fn test_from_outcome_type_rate_limited() {
664 assert!(matches!(
665 outcome_from_parts(ClientReportField::RateLimited, ""),
666 Ok(Outcome::RateLimited(None))
667 ));
668 assert_eq!(
669 outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
670 Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
671 );
672 }
673
674 #[test]
675 fn test_trim_whitespaces() {
676 assert_eq!(trim_whitespaces(b""), b"");
677 assert_eq!(trim_whitespaces(b" \n\r "), b"");
678 assert_eq!(trim_whitespaces(b" \nx\r "), b"x");
679 assert_eq!(trim_whitespaces(b" {foo: bar} "), b"{foo: bar}");
680 assert_eq!(trim_whitespaces(b"{ foo: bar}"), b"{ foo: bar}");
681 }
682}