1use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
20use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
21use crate::services::projects::project::ProjectInfo;
22
/// The list inside a client report that a discarded-event entry was read from.
///
/// `process_client_reports` tags every entry with one of these values and uses it as part of
/// the `BTreeMap` key when merging entries (hence the `Ord` derive). `outcome_from_parts`
/// then turns the tag, together with the reported reason, into an [`Outcome`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// Entry taken from `filtered_events`; converted to `Outcome::Filtered`.
    Filtered,

    /// Entry taken from `filtered_sampling_events`; converted to `Outcome::FilteredSampling`.
    FilteredSampling,

    /// Entry taken from `rate_limited_events`; converted to `Outcome::RateLimited`.
    RateLimited,

    /// Entry taken from `discarded_events`; converted to `Outcome::ClientDiscard`.
    ClientDiscard,
}
38
39pub fn process_client_reports(
45 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46 config: &Config,
47 project_info: &ProjectInfo,
48 outcome_aggregator: Addr<TrackOutcome>,
49) {
50 if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53 if config.processing_enabled() {
55 managed_envelope.retain_items(|item| match item.ty() {
56 ItemType::ClientReport => ItemAction::DropSilently,
57 _ => ItemAction::Keep,
58 });
59 }
60 return;
61 }
62
63 let mut timestamp = None;
64 let mut output_events = BTreeMap::new();
65 let received = managed_envelope.received_at();
66
67 let clock_drift_processor =
68 ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69 .at_least(MINIMUM_CLOCK_DRIFT);
70
71 managed_envelope.retain_items(|item| {
74 if item.ty() != &ItemType::ClientReport {
75 return ItemAction::Keep;
76 };
77 match ClientReport::parse(&item.payload()) {
78 Ok(ClientReport {
79 timestamp: report_timestamp,
80 discarded_events,
81 rate_limited_events,
82 filtered_events,
83 filtered_sampling_events,
84 }) => {
85 let input_events =
87 discarded_events
88 .into_iter()
89 .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90 .chain(
91 filtered_events.into_iter().map(|discarded_event| {
92 (ClientReportField::Filtered, discarded_event)
93 }),
94 )
95 .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96 (ClientReportField::FilteredSampling, discarded_event)
97 }))
98 .chain(rate_limited_events.into_iter().map(|discarded_event| {
99 (ClientReportField::RateLimited, discarded_event)
100 }));
101
102 for (outcome_type, discarded_event) in input_events {
103 if discarded_event.reason.len() > 200 {
104 relay_log::trace!("ignored client outcome with an overlong reason");
105 continue;
106 }
107 *output_events
108 .entry((
109 outcome_type,
110 discarded_event.reason,
111 discarded_event.category,
112 ))
113 .or_insert(0) += discarded_event.quantity;
114 }
115 if let Some(ts) = report_timestamp {
116 timestamp.get_or_insert(ts);
117 }
118 }
119 Err(err) => {
120 relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121 }
122 }
123 ItemAction::DropSilently
124 });
125
126 if output_events.is_empty() {
127 return;
128 }
129
130 let timestamp =
131 timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133 if clock_drift_processor.is_drifted() {
134 relay_log::trace!("applying clock drift correction to client report");
135 clock_drift_processor.process_timestamp(timestamp);
136 }
137
138 let retention_days = project_info
139 .config()
140 .event_retention
141 .unwrap_or(DEFAULT_EVENT_RETENTION);
142 let max_age = SignedDuration::days(retention_days.into());
143 let in_past = timestamp
145 .as_datetime()
146 .map(|ts| (received - ts) > max_age)
147 .unwrap_or(true);
148 if in_past {
149 relay_log::trace!(
150 "skipping client outcomes older than {} days",
151 max_age.num_days()
152 );
153 return;
154 }
155
156 let max_future = SignedDuration::seconds(config.max_secs_in_future());
157 let in_future = timestamp
159 .as_datetime()
160 .map(|ts| (ts - received) > max_future)
161 .unwrap_or(true);
162 if in_future {
163 relay_log::trace!(
164 "skipping client outcomes more than {}s in the future",
165 max_future.num_seconds()
166 );
167 return;
168 }
169
170 for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171 let outcome = match outcome_from_parts(outcome_type, &reason) {
172 Ok(outcome) => outcome,
173 Err(_) => {
174 relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175 continue;
176 }
177 };
178
179 outcome_aggregator.send(TrackOutcome {
180 timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184 scoping: managed_envelope.scoping(),
185 outcome,
186 event_id: None,
187 remote_addr: None, category,
189 quantity,
190 });
191 }
192}
193
194pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200 managed_envelope.retain_items(|item| {
201 if item.ty() != &ItemType::UserReport {
202 return ItemAction::Keep;
203 };
204
205 let payload = item.payload();
206 let payload = trim_whitespaces(&payload);
210 let report = match serde_json::from_slice::<UserReport>(payload) {
211 Ok(report) => report,
212 Err(error) => {
213 relay_log::debug!(
214 error = &error as &dyn Error,
215 "failed to deserialize user report"
216 );
217 return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
218 }
219 };
220
221 let json_string = match serde_json::to_string(&report) {
222 Ok(json) => json,
223 Err(err) => {
224 relay_log::error!(
225 error = &err as &dyn Error,
226 "failed to serialize user report"
227 );
228 return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
229 }
230 };
231
232 item.set_payload(ContentType::Json, json_string);
233 ItemAction::Keep
234 });
235}
236
/// Returns `data` without leading and trailing ASCII whitespace.
///
/// Whitespace is whatever `u8::is_ascii_whitespace` accepts. Input that is empty or consists
/// only of whitespace yields an empty slice.
fn trim_whitespaces(data: &[u8]) -> &[u8] {
    let mut rest = data;

    // Peel whitespace off the front.
    while let [first, tail @ ..] = rest {
        if !first.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }

    // Peel whitespace off the back.
    while let [head @ .., last] = rest {
        if !last.is_ascii_whitespace() {
            break;
        }
        rest = head;
    }

    rest
}
246
247fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
251 match field {
252 ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
253 Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
254 .map(RuleCategories::from)
255 .map(Outcome::FilteredSampling)
256 .map_err(|_| ()),
257 None => Err(()),
258 },
259 ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
260 ClientReportField::Filtered => Ok(Outcome::Filtered(
261 FilterStatKey::try_from(reason).map_err(|_| ())?,
262 )),
263 ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
264 "" => None,
265 other => Some(ReasonCode::new(other)),
266 })),
267 }
268}
269
#[cfg(test)]
mod tests {
    use relay_config::Config;
    use relay_event_schema::protocol::EventId;

    use crate::envelope::{Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::outcome::RuleCategory;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::testutils::create_test_processor;

    use super::*;

    /// With outcomes and client outcomes enabled, the client report is consumed and nothing
    /// remains to be submitted.
    #[tokio::test]
    async fn test_client_report_removal() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let request_meta = RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        );
        let mut envelope = Envelope::from_request(None, request_meta);

        let mut report_item = Item::new(ItemType::ClientReport);
        report_item.set_payload(
            ContentType::Json,
            r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
        );
        envelope.add_item(report_item);

        let mut groups = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(groups.len(), 1);
        let (group, grouped_envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(grouped_envelope, Addr::dummy()),
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let result = processor.process(message).await.unwrap();
        assert!(result.is_none());
    }

    /// With `emit_outcomes` disabled, the client report is kept and submitted with the envelope.
    #[tokio::test]
    async fn test_client_report_forwarding() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": false,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let request_meta = RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        );
        let mut envelope = Envelope::from_request(None, request_meta);

        let mut report_item = Item::new(ItemType::ClientReport);
        report_item.set_payload(
            ContentType::Json,
            r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
        );
        envelope.add_item(report_item);

        let mut groups = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(groups.len(), 1);
        let (group, grouped_envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(grouped_envelope, Addr::dummy()),
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(forwarded))) = processor.process(message).await else {
            panic!();
        };
        assert_eq!(
            forwarded.envelope().items().next().unwrap().ty(),
            &ItemType::ClientReport
        );

        // Explicitly accept the envelope once the assertion is done.
        forwarded.accept();
    }

    /// With processing enabled and client outcomes disabled, the client report is dropped and
    /// nothing remains to be submitted.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_client_report_removal_in_processing() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": false,
            },
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let request_meta = RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        );
        let mut envelope = Envelope::from_request(None, request_meta);

        let mut report_item = Item::new(ItemType::ClientReport);
        report_item.set_payload(
            ContentType::Json,
            r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
        );
        envelope.add_item(report_item);

        let mut groups = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(groups.len(), 1);
        let (group, grouped_envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(grouped_envelope, Addr::dummy()),
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let result = processor.process(message).await.unwrap();
        assert!(result.is_none());
    }

    /// An envelope containing only a valid user report is submitted with that single item.
    #[tokio::test]
    async fn test_user_report_only() {
        relay_log::init_test!();
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();

        let request_meta = RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        );
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        let mut report_item = Item::new(ItemType::UserReport);
        report_item.set_payload(
            ContentType::Json,
            format!(r#"{{"event_id": "{event_id}"}}"#),
        );
        envelope.add_item(report_item);

        let mut groups = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(groups.len(), 1);
        let (group, grouped_envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(grouped_envelope, Addr::dummy()),
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(submitted))) = processor.process(message).await else {
            panic!();
        };
        let forwarded = submitted.envelope();

        assert_eq!(forwarded.len(), 1);
        assert_eq!(
            forwarded.items().next().unwrap().ty(),
            &ItemType::UserReport
        );
    }

    /// An invalid user report is removed while the accompanying event item is kept.
    #[tokio::test]
    async fn test_user_report_invalid() {
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();

        let request_meta = RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        );
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        let mut report_item = Item::new(ItemType::UserReport);
        report_item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
        envelope.add_item(report_item);

        let mut event_item = Item::new(ItemType::Event);
        event_item.set_payload(ContentType::Json, "{}");
        envelope.add_item(event_item);

        let mut groups = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(groups.len(), 1);
        let (group, grouped_envelope) = groups.pop().unwrap();

        let message = ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(grouped_envelope, Addr::dummy()),
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(submitted))) = processor.process(message).await else {
            panic!();
        };
        let forwarded = submitted.envelope();

        assert_eq!(forwarded.len(), 1);
        assert_eq!(forwarded.items().next().unwrap().ty(), &ItemType::Event);
    }

    /// Covers rejected and accepted reasons for the `FilteredSampling` field.
    #[test]
    fn test_from_outcome_type_sampled() {
        // Reasons without the prefix, without ids, or with malformed ids are rejected.
        let rejected = [
            "adsf",
            "Sampled:",
            "Sampled:foo",
            "Sampled:;",
            "Sampled:ab;12",
        ];
        for reason in rejected {
            assert!(matches!(
                outcome_from_parts(ClientReportField::FilteredSampling, reason),
                Err(())
            ));
        }

        // These ids are all expected to fall into the `Other` category.
        for reason in ["Sampled:123,456", "Sampled:123"] {
            assert_eq!(
                outcome_from_parts(ClientReportField::FilteredSampling, reason),
                Ok(Outcome::FilteredSampling(RuleCategories(
                    [RuleCategory::Other].into()
                )))
            );
        }

        let single = outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001");
        assert_eq!(
            single,
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        let several = outcome_from_parts(
            ClientReportField::FilteredSampling,
            "Sampled:1001,1456,1567,3333,4444",
        );
        assert_eq!(
            several,
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    /// Filter reasons convert into the matching `FilterStatKey`.
    #[test]
    fn test_from_outcome_type_filtered() {
        let error_message = outcome_from_parts(ClientReportField::Filtered, "error-message");
        assert!(matches!(
            error_message,
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        let hydration_error = outcome_from_parts(ClientReportField::Filtered, "hydration-error");
        assert!(matches!(
            hydration_error,
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    /// Client discard reasons are passed through as given.
    #[test]
    fn test_from_outcome_type_client_discard() {
        let outcome = outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap();
        assert_eq!(outcome, Outcome::ClientDiscard("foo_reason".into()));
    }

    /// An empty rate limit reason yields no reason code; any other reason is wrapped in one.
    #[test]
    fn test_from_outcome_type_rate_limited() {
        let without_reason = outcome_from_parts(ClientReportField::RateLimited, "");
        assert!(matches!(without_reason, Ok(Outcome::RateLimited(None))));

        let with_reason = outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap();
        assert_eq!(
            with_reason,
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }

    /// Only leading and trailing whitespace is removed; inner whitespace is preserved.
    #[test]
    fn test_trim_whitespaces() {
        let cases: [(&[u8], &[u8]); 5] = [
            (b"", b""),
            (b" \n\r ", b""),
            (b" \nx\r ", b"x"),
            (b" {foo: bar} ", b"{foo: bar}"),
            (b"{ foo: bar}", b"{ foo: bar}"),
        ];

        for (input, expected) in cases {
            assert_eq!(trim_whitespaces(input), expected);
        }
    }
}