// relay_server/services/processor/report.rs

//! Contains code related to the validation and normalization of user and client reports.
2
3use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
20use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
21use crate::services::projects::project::ProjectInfo;
22
/// The list inside a client report that an outcome was read from.
///
/// Each variant, combined with the reason string of an entry, is turned into a concrete
/// [`Outcome`]; none of these outcomes carry additional content. The declaration order matters:
/// it defines the derived ordering and therefore the order in which merged outcomes are emitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// Entry from `filtered_events`: dropped by an inbound data filter.
    Filtered,
    /// Entry from `filtered_sampling_events`: dropped by a sampling rule.
    FilteredSampling,
    /// Entry from `rate_limited_events`: dropped due to rate limiting.
    RateLimited,
    /// Entry from `discarded_events`: already discarded by the client itself.
    ClientDiscard,
}
38
39/// Validates and extracts client reports.
40///
41/// At the moment client reports are primarily used to transfer outcomes from
42/// client SDKs.  The outcomes are removed here and sent directly to the outcomes
43/// system.
44pub fn process_client_reports(
45    managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46    config: &Config,
47    project_info: &ProjectInfo,
48    outcome_aggregator: Addr<TrackOutcome>,
49) {
50    // if client outcomes are disabled we leave the client reports unprocessed
51    // and pass them on.
52    if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53        // if a processing relay has client outcomes disabled we drop them.
54        if config.processing_enabled() {
55            managed_envelope.retain_items(|item| match item.ty() {
56                ItemType::ClientReport => ItemAction::DropSilently,
57                _ => ItemAction::Keep,
58            });
59        }
60        return;
61    }
62
63    let mut timestamp = None;
64    let mut output_events = BTreeMap::new();
65    let received = managed_envelope.received_at();
66
67    let clock_drift_processor =
68        ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69            .at_least(MINIMUM_CLOCK_DRIFT);
70
71    // we're going through all client reports but we're effectively just merging
72    // them into the first one.
73    managed_envelope.retain_items(|item| {
74        if item.ty() != &ItemType::ClientReport {
75            return ItemAction::Keep;
76        };
77        match ClientReport::parse(&item.payload()) {
78            Ok(ClientReport {
79                timestamp: report_timestamp,
80                discarded_events,
81                rate_limited_events,
82                filtered_events,
83                filtered_sampling_events,
84            }) => {
85                // Glue all discarded events together and give them the appropriate outcome type
86                let input_events =
87                    discarded_events
88                        .into_iter()
89                        .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90                        .chain(
91                            filtered_events.into_iter().map(|discarded_event| {
92                                (ClientReportField::Filtered, discarded_event)
93                            }),
94                        )
95                        .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96                            (ClientReportField::FilteredSampling, discarded_event)
97                        }))
98                        .chain(rate_limited_events.into_iter().map(|discarded_event| {
99                            (ClientReportField::RateLimited, discarded_event)
100                        }));
101
102                for (outcome_type, discarded_event) in input_events {
103                    if discarded_event.reason.len() > 200 {
104                        relay_log::trace!("ignored client outcome with an overlong reason");
105                        continue;
106                    }
107                    *output_events
108                        .entry((
109                            outcome_type,
110                            discarded_event.reason,
111                            discarded_event.category,
112                        ))
113                        .or_insert(0) += discarded_event.quantity;
114                }
115                if let Some(ts) = report_timestamp {
116                    timestamp.get_or_insert(ts);
117                }
118            }
119            Err(err) => {
120                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121            }
122        }
123        ItemAction::DropSilently
124    });
125
126    if output_events.is_empty() {
127        return;
128    }
129
130    let timestamp =
131        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133    if clock_drift_processor.is_drifted() {
134        relay_log::trace!("applying clock drift correction to client report");
135        clock_drift_processor.process_timestamp(timestamp);
136    }
137
138    let retention_days = project_info
139        .config()
140        .event_retention
141        .unwrap_or(DEFAULT_EVENT_RETENTION);
142    let max_age = SignedDuration::days(retention_days.into());
143    // also if we unable to parse the timestamp, we assume it's way too old here.
144    let in_past = timestamp
145        .as_datetime()
146        .map(|ts| (received - ts) > max_age)
147        .unwrap_or(true);
148    if in_past {
149        relay_log::trace!(
150            "skipping client outcomes older than {} days",
151            max_age.num_days()
152        );
153        return;
154    }
155
156    let max_future = SignedDuration::seconds(config.max_secs_in_future());
157    // also if we unable to parse the timestamp, we assume it's way far in the future here.
158    let in_future = timestamp
159        .as_datetime()
160        .map(|ts| (ts - received) > max_future)
161        .unwrap_or(true);
162    if in_future {
163        relay_log::trace!(
164            "skipping client outcomes more than {}s in the future",
165            max_future.num_seconds()
166        );
167        return;
168    }
169
170    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171        let outcome = match outcome_from_parts(outcome_type, &reason) {
172            Ok(outcome) => outcome,
173            Err(_) => {
174                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175                continue;
176            }
177        };
178
179        outcome_aggregator.send(TrackOutcome {
180            // If we get to this point, the unwrap should not be used anymore, since we know by
181            // now that the timestamp can be parsed, but just incase we fallback to UTC current
182            // `DateTime`.
183            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184            scoping: managed_envelope.scoping(),
185            outcome,
186            event_id: None,
187            remote_addr: None, // omitting the client address allows for better aggregation
188            category,
189            quantity,
190        });
191    }
192}
193
194/// Validates and normalizes all user report items in the envelope.
195///
196/// User feedback items are removed from the envelope if they contain invalid JSON or if the
197/// JSON violates the schema (basic type validation). Otherwise, their normalized representation
198/// is written back into the item.
199pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200    managed_envelope.retain_items(|item| {
201        if item.ty() != &ItemType::UserReport {
202            return ItemAction::Keep;
203        };
204
205        let payload = item.payload();
206        // There is a customer SDK which sends invalid reports with a trailing `\n`,
207        // strip it here, even if they update/fix their SDK there will still be many old
208        // versions with the broken SDK out there.
209        let payload = trim_whitespaces(&payload);
210        let report = match serde_json::from_slice::<UserReport>(payload) {
211            Ok(report) => report,
212            Err(error) => {
213                relay_log::debug!(
214                    error = &error as &dyn Error,
215                    "failed to deserialize user report"
216                );
217                return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
218            }
219        };
220
221        let json_string = match serde_json::to_string(&report) {
222            Ok(json) => json,
223            Err(err) => {
224                relay_log::error!(
225                    error = &err as &dyn Error,
226                    "failed to serialize user report"
227                );
228                return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
229            }
230        };
231
232        item.set_payload(ContentType::Json, json_string);
233        ItemAction::Keep
234    });
235}
236
/// Returns `data` without its leading and trailing ASCII whitespace.
///
/// Whitespace is whatever [`u8::is_ascii_whitespace`] accepts. Input that is empty or made up
/// solely of whitespace yields an empty slice.
fn trim_whitespaces(data: &[u8]) -> &[u8] {
    let mut rest = data;

    // Peel whitespace off the front.
    while let [first, tail @ ..] = rest {
        if !first.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }

    // Peel whitespace off the back.
    while let [head @ .., last] = rest {
        if !last.is_ascii_whitespace() {
            break;
        }
        rest = head;
    }

    rest
}
246
247/// Parse an outcome from an outcome ID and a reason string.
248///
249/// Currently only used to reconstruct outcomes encoded in client reports.
250fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
251    match field {
252        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
253            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
254                .map(RuleCategories::from)
255                .map(Outcome::FilteredSampling)
256                .map_err(|_| ()),
257            None => Err(()),
258        },
259        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
260        ClientReportField::Filtered => Ok(Outcome::Filtered(
261            FilterStatKey::try_from(reason).map_err(|_| ())?,
262        )),
263        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
264            "" => None,
265            other => Some(ReasonCode::new(other)),
266        })),
267    }
268}
269
#[cfg(test)]
mod tests {
    use relay_config::Config;
    use relay_event_schema::protocol::EventId;

    use crate::envelope::{Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::outcome::RuleCategory;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::testutils::create_test_processor;

    use super::*;

    /// With outcomes and client outcomes enabled, client reports are consumed and nothing is
    /// left in the envelope.
    #[tokio::test]
    async fn test_client_report_removal() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();

        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let envelope = processor.process(message).await.unwrap();
        assert!(envelope.is_none());
    }

    /// With outcomes disabled, client reports are forwarded unprocessed.
    #[tokio::test]
    async fn test_client_report_forwarding() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": false,
                // client reports are only processed if outcomes are emitted at all.
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let item = new_envelope.envelope().items().next().unwrap();
        assert_eq!(item.ty(), &ItemType::ClientReport);

        new_envelope.accept(); // do not try to capture or emit outcomes
    }

    /// A processing relay with client outcomes disabled drops client reports.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_client_report_removal_in_processing() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": false,
            },
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let envelope = processor.process(message).await.unwrap();
        assert!(envelope.is_none());
    }

    /// A valid user report is kept in the envelope.
    #[tokio::test]
    async fn test_user_report_only() {
        relay_log::init_test!();
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(
                ContentType::Json,
                format!(r#"{{"event_id": "{event_id}"}}"#),
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();

        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(
            new_envelope.items().next().unwrap().ty(),
            &ItemType::UserReport
        );
    }

    /// An invalid user report is dropped while the rest of the envelope is kept.
    #[tokio::test]
    async fn test_user_report_invalid() {
        relay_test::setup();
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
            item
        });

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(ContentType::Json, "{}");
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
    }

    #[test]
    fn test_from_outcome_type_sampled() {
        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err());

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"),
            Err(())
        ));

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err());

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"),
            Err(())
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"),
            Err(())
        ));

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(
                ClientReportField::FilteredSampling,
                "Sampled:1001,1456,1567,3333,4444"
            ),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    #[test]
    fn test_from_outcome_type_filtered() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "error-message"),
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    #[test]
    fn test_from_outcome_type_client_discard() {
        assert_eq!(
            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
            Outcome::ClientDiscard("foo_reason".into())
        );
    }

    #[test]
    fn test_from_outcome_type_rate_limited() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::RateLimited, ""),
            Ok(Outcome::RateLimited(None))
        ));
        assert_eq!(
            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }

    #[test]
    fn test_trim_whitespaces() {
        assert_eq!(trim_whitespaces(b""), b"");
        assert_eq!(trim_whitespaces(b" \n\r "), b"");
        assert_eq!(trim_whitespaces(b" \nx\r "), b"x");
        assert_eq!(trim_whitespaces(b" {foo: bar} "), b"{foo: bar}");
        assert_eq!(trim_whitespaces(b"{ foo: bar}"), b"{ foo: bar}");
        assert_eq!(trim_whitespaces(b"\t\x0cx\r\n"), b"x");
        // Vertical tab is not ASCII whitespace according to `u8::is_ascii_whitespace`.
        assert_eq!(trim_whitespaces(b"\x0bx\x0b"), b"\x0bx\x0b");
    }
}