1use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
20use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
21use crate::services::projects::project::ProjectInfo;
22
/// Identifies which list of a client report a discarded-event entry came from.
///
/// `process_client_reports` tags every entry with one of these variants and
/// `outcome_from_parts` uses the tag to choose the matching [`Outcome`] variant.
/// The derived ordering allows the tag to be used as part of a `BTreeMap` key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// Entry taken from `filtered_events`; converted to [`Outcome::Filtered`].
    Filtered,

    /// Entry taken from `filtered_sampling_events`; converted to
    /// [`Outcome::FilteredSampling`].
    FilteredSampling,

    /// Entry taken from `rate_limited_events`; converted to [`Outcome::RateLimited`].
    RateLimited,

    /// Entry taken from `discarded_events`; converted to [`Outcome::ClientDiscard`].
    ClientDiscard,
}
38
39pub fn process_client_reports(
45 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46 config: &Config,
47 project_info: &ProjectInfo,
48 outcome_aggregator: Addr<TrackOutcome>,
49) {
50 if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53 if config.processing_enabled() {
55 managed_envelope.retain_items(|item| match item.ty() {
56 ItemType::ClientReport => ItemAction::DropSilently,
57 _ => ItemAction::Keep,
58 });
59 }
60 return;
61 }
62
63 let mut timestamp = None;
64 let mut output_events = BTreeMap::new();
65 let received = managed_envelope.received_at();
66
67 let clock_drift_processor =
68 ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69 .at_least(MINIMUM_CLOCK_DRIFT);
70
71 managed_envelope.retain_items(|item| {
74 if item.ty() != &ItemType::ClientReport {
75 return ItemAction::Keep;
76 };
77 match ClientReport::parse(&item.payload()) {
78 Ok(ClientReport {
79 timestamp: report_timestamp,
80 discarded_events,
81 rate_limited_events,
82 filtered_events,
83 filtered_sampling_events,
84 }) => {
85 let input_events =
87 discarded_events
88 .into_iter()
89 .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90 .chain(
91 filtered_events.into_iter().map(|discarded_event| {
92 (ClientReportField::Filtered, discarded_event)
93 }),
94 )
95 .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96 (ClientReportField::FilteredSampling, discarded_event)
97 }))
98 .chain(rate_limited_events.into_iter().map(|discarded_event| {
99 (ClientReportField::RateLimited, discarded_event)
100 }));
101
102 for (outcome_type, discarded_event) in input_events {
103 if discarded_event.reason.len() > 200 {
104 relay_log::trace!("ignored client outcome with an overlong reason");
105 continue;
106 }
107 *output_events
108 .entry((
109 outcome_type,
110 discarded_event.reason,
111 discarded_event.category,
112 ))
113 .or_insert(0) += discarded_event.quantity;
114 }
115 if let Some(ts) = report_timestamp {
116 timestamp.get_or_insert(ts);
117 }
118 }
119 Err(err) => {
120 relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121 }
122 }
123 ItemAction::DropSilently
124 });
125
126 if output_events.is_empty() {
127 return;
128 }
129
130 let timestamp =
131 timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133 if clock_drift_processor.is_drifted() {
134 relay_log::trace!("applying clock drift correction to client report");
135 clock_drift_processor.process_timestamp(timestamp);
136 }
137
138 let retention_days = project_info
139 .config()
140 .event_retention
141 .unwrap_or(DEFAULT_EVENT_RETENTION);
142 let max_age = SignedDuration::days(retention_days.into());
143 let in_past = timestamp
145 .as_datetime()
146 .map(|ts| (received - ts) > max_age)
147 .unwrap_or(true);
148 if in_past {
149 relay_log::trace!(
150 "skipping client outcomes older than {} days",
151 max_age.num_days()
152 );
153 return;
154 }
155
156 let max_future = SignedDuration::seconds(config.max_secs_in_future());
157 let in_future = timestamp
159 .as_datetime()
160 .map(|ts| (ts - received) > max_future)
161 .unwrap_or(true);
162 if in_future {
163 relay_log::trace!(
164 "skipping client outcomes more than {}s in the future",
165 max_future.num_seconds()
166 );
167 return;
168 }
169
170 for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171 let outcome = match outcome_from_parts(outcome_type, &reason) {
172 Ok(outcome) => outcome,
173 Err(_) => {
174 relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175 continue;
176 }
177 };
178
179 outcome_aggregator.send(TrackOutcome {
180 timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184 scoping: managed_envelope.scoping(),
185 outcome,
186 event_id: None,
187 remote_addr: None, category,
189 quantity,
190 });
191 }
192}
193
194pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200 managed_envelope.retain_items(|item| {
201 if item.ty() != &ItemType::UserReport {
202 return ItemAction::Keep;
203 };
204
205 let payload = item.payload();
206 let payload = trim_whitespaces(&payload);
210 let report = match serde_json::from_slice::<UserReport>(payload) {
211 Ok(report) => report,
212 Err(error) => {
213 relay_log::debug!(
214 error = &error as &dyn Error,
215 "failed to deserialize user report"
216 );
217 return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
218 }
219 };
220
221 let json_string = match serde_json::to_string(&report) {
222 Ok(json) => json,
223 Err(err) => {
224 relay_log::error!(
225 error = &err as &dyn Error,
226 "failed to serialize user report"
227 );
228 return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
229 }
230 };
231
232 item.set_payload(ContentType::Json, json_string);
233 ItemAction::Keep
234 });
235}
236
/// Returns `data` without leading and trailing ASCII whitespace.
///
/// Whitespace inside the remaining slice is preserved. The result is empty when `data` is
/// empty or consists solely of ASCII whitespace.
fn trim_whitespaces(data: &[u8]) -> &[u8] {
    let mut rest = data;

    // Peel whitespace off the front.
    while let [first, tail @ ..] = rest {
        if !first.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }

    // Peel whitespace off the back.
    while let [head @ .., last] = rest {
        if !last.is_ascii_whitespace() {
            break;
        }
        rest = head;
    }

    rest
}
246
247fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
251 match field {
252 ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
253 Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
254 .map(RuleCategories::from)
255 .map(Outcome::FilteredSampling)
256 .map_err(|_| ()),
257 None => Err(()),
258 },
259 ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
260 ClientReportField::Filtered => Ok(Outcome::Filtered(
261 FilterStatKey::try_from(reason).map_err(|_| ())?,
262 )),
263 ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
264 "" => None,
265 other => Some(ReasonCode::new(other)),
266 })),
267 }
268}
269
// Tests for client report and user report processing, plus unit tests for the private
// helpers `outcome_from_parts` and `trim_whitespaces`.
#[cfg(test)]
mod tests {
    use relay_cogs::Token;
    use relay_config::Config;
    use relay_event_schema::protocol::EventId;

    use crate::envelope::{Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::outcome::RuleCategory;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::testutils::create_test_processor;

    use super::*;

    /// With `emit_outcomes` and `emit_client_outcomes` both enabled, processing an envelope
    /// that contains only a client report yields no envelope.
    #[tokio::test]
    async fn test_client_report_removal() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();

        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        // The client report was the only item, so nothing is left to submit.
        assert!(envelope.is_none());
    }

    /// With `emit_outcomes` disabled, the client report stays in the envelope that the
    /// processor returns.
    #[tokio::test]
    async fn test_client_report_forwarding() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": false,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let item = new_envelope.envelope().items().next().unwrap();
        assert_eq!(item.ty(), &ItemType::ClientReport);

        new_envelope.accept();
    }

    /// With processing enabled and `emit_client_outcomes` disabled, processing an envelope
    /// that contains only a client report yields no envelope.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_client_report_removal_in_processing() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": false,
            },
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    /// An envelope holding a single user report that references the envelope's event id is
    /// returned with that user report as its only item.
    #[tokio::test]
    async fn test_user_report_only() {
        relay_log::init_test!();
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(
                ContentType::Json,
                format!(r#"{{"event_id": "{event_id}"}}"#),
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();

        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(
            new_envelope.items().next().unwrap().ty(),
            &ItemType::UserReport
        );
    }

    /// A user report whose payload is `{"foo": "bar"}` is removed from the envelope while
    /// the accompanying event item is kept.
    #[tokio::test]
    async fn test_user_report_invalid() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
            item
        });

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(ContentType::Json, "{}");
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        // Only the event item remains.
        assert_eq!(new_envelope.len(), 1);
        assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
    }

    /// `FilteredSampling` reasons must be `Sampled:` followed by parseable rule ids; the
    /// assertions below pin the rule categories that specific ids resolve to.
    #[test]
    fn test_from_outcome_type_sampled() {
        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err());

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err());

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err());

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"),
            Err(())
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"),
            Err(())
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"),
            Err(())
        ));

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(
                ClientReportField::FilteredSampling,
                "Sampled:1001,1456,1567,3333,4444"
            ),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    /// `Filtered` reasons convert into filter stat keys: `error-message` gives the
    /// `ErrorMessage` key and `hydration-error` gives a `GenericFilter` key.
    #[test]
    fn test_from_outcome_type_filtered() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "error-message"),
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    /// `ClientDiscard` carries the reason string over unchanged.
    #[test]
    fn test_from_outcome_type_client_discard() {
        assert_eq!(
            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
            Outcome::ClientDiscard("foo_reason".into())
        );
    }

    /// `RateLimited` maps an empty reason to no reason code and any other reason to a code.
    #[test]
    fn test_from_outcome_type_rate_limited() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::RateLimited, ""),
            Ok(Outcome::RateLimited(None))
        ));
        assert_eq!(
            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }

    /// `trim_whitespaces` strips ASCII whitespace from both ends and keeps inner whitespace.
    #[test]
    fn test_trim_whitespaces() {
        assert_eq!(trim_whitespaces(b""), b"");
        assert_eq!(trim_whitespaces(b" \n\r "), b"");
        assert_eq!(trim_whitespaces(b" \nx\r "), b"x");
        assert_eq!(trim_whitespaces(b" {foo: bar} "), b"{foo: bar}");
        assert_eq!(trim_whitespaces(b"{ foo: bar}"), b"{ foo: bar}");
    }
}