relay_server/services/processor/
report.rs

1//! Contains code related to validation and normalization of the user and client reports.
2
3use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
20use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
21use crate::services::projects::project::ProjectInfo;
22
/// Kinds of client report entries that translate into an [`Outcome`] without extra payload.
///
/// Each variant corresponds to one list inside a client report (discarded, filtered, sampled or
/// rate-limited events). The variant order matters: the enum is part of the key of an ordered
/// map when merging client reports, so the derived `Ord` decides iteration order.
#[derive(Clone, Copy, Debug)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// Entry from `filtered_events`: dropped by an inbound data filter.
    Filtered,

    /// Entry from `filtered_sampling_events`: dropped by a dynamic sampling rule.
    FilteredSampling,

    /// Entry from `rate_limited_events`: dropped because of a rate limit.
    RateLimited,

    /// Entry from `discarded_events`: already discarded by the client itself.
    ClientDiscard,
}
38
39/// Validates and extracts client reports.
40///
41/// At the moment client reports are primarily used to transfer outcomes from
42/// client SDKs.  The outcomes are removed here and sent directly to the outcomes
43/// system.
44pub fn process_client_reports(
45    managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46    config: &Config,
47    project_info: &ProjectInfo,
48    outcome_aggregator: Addr<TrackOutcome>,
49) {
50    // if client outcomes are disabled we leave the client reports unprocessed
51    // and pass them on.
52    if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53        // if a processing relay has client outcomes disabled we drop them.
54        if config.processing_enabled() {
55            managed_envelope.retain_items(|item| match item.ty() {
56                ItemType::ClientReport => ItemAction::DropSilently,
57                _ => ItemAction::Keep,
58            });
59        }
60        return;
61    }
62
63    let mut timestamp = None;
64    let mut output_events = BTreeMap::new();
65    let received = managed_envelope.received_at();
66
67    let clock_drift_processor =
68        ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69            .at_least(MINIMUM_CLOCK_DRIFT);
70
71    // we're going through all client reports but we're effectively just merging
72    // them into the first one.
73    managed_envelope.retain_items(|item| {
74        if item.ty() != &ItemType::ClientReport {
75            return ItemAction::Keep;
76        };
77        match ClientReport::parse(&item.payload()) {
78            Ok(ClientReport {
79                timestamp: report_timestamp,
80                discarded_events,
81                rate_limited_events,
82                filtered_events,
83                filtered_sampling_events,
84            }) => {
85                // Glue all discarded events together and give them the appropriate outcome type
86                let input_events =
87                    discarded_events
88                        .into_iter()
89                        .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90                        .chain(
91                            filtered_events.into_iter().map(|discarded_event| {
92                                (ClientReportField::Filtered, discarded_event)
93                            }),
94                        )
95                        .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96                            (ClientReportField::FilteredSampling, discarded_event)
97                        }))
98                        .chain(rate_limited_events.into_iter().map(|discarded_event| {
99                            (ClientReportField::RateLimited, discarded_event)
100                        }));
101
102                for (outcome_type, discarded_event) in input_events {
103                    if discarded_event.reason.len() > 200 {
104                        relay_log::trace!("ignored client outcome with an overlong reason");
105                        continue;
106                    }
107                    *output_events
108                        .entry((
109                            outcome_type,
110                            discarded_event.reason,
111                            discarded_event.category,
112                        ))
113                        .or_insert(0) += discarded_event.quantity;
114                }
115                if let Some(ts) = report_timestamp {
116                    timestamp.get_or_insert(ts);
117                }
118            }
119            Err(err) => {
120                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121            }
122        }
123        ItemAction::DropSilently
124    });
125
126    if output_events.is_empty() {
127        return;
128    }
129
130    let timestamp =
131        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133    if clock_drift_processor.is_drifted() {
134        relay_log::trace!("applying clock drift correction to client report");
135        clock_drift_processor.process_timestamp(timestamp);
136    }
137
138    let retention_days = project_info
139        .config()
140        .event_retention
141        .unwrap_or(DEFAULT_EVENT_RETENTION);
142    let max_age = SignedDuration::days(retention_days.into());
143    // also if we unable to parse the timestamp, we assume it's way too old here.
144    let in_past = timestamp
145        .as_datetime()
146        .map(|ts| (received - ts) > max_age)
147        .unwrap_or(true);
148    if in_past {
149        relay_log::trace!(
150            "skipping client outcomes older than {} days",
151            max_age.num_days()
152        );
153        return;
154    }
155
156    let max_future = SignedDuration::seconds(config.max_secs_in_future());
157    // also if we unable to parse the timestamp, we assume it's way far in the future here.
158    let in_future = timestamp
159        .as_datetime()
160        .map(|ts| (ts - received) > max_future)
161        .unwrap_or(true);
162    if in_future {
163        relay_log::trace!(
164            "skipping client outcomes more than {}s in the future",
165            max_future.num_seconds()
166        );
167        return;
168    }
169
170    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171        let outcome = match outcome_from_parts(outcome_type, &reason) {
172            Ok(outcome) => outcome,
173            Err(_) => {
174                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175                continue;
176            }
177        };
178
179        outcome_aggregator.send(TrackOutcome {
180            // If we get to this point, the unwrap should not be used anymore, since we know by
181            // now that the timestamp can be parsed, but just incase we fallback to UTC current
182            // `DateTime`.
183            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184            scoping: managed_envelope.scoping(),
185            outcome,
186            event_id: None,
187            remote_addr: None, // omitting the client address allows for better aggregation
188            category,
189            quantity,
190        });
191    }
192}
193
194/// Validates and normalizes all user report items in the envelope.
195///
196/// User feedback items are removed from the envelope if they contain invalid JSON or if the
197/// JSON violates the schema (basic type validation). Otherwise, their normalized representation
198/// is written back into the item.
199pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200    managed_envelope.retain_items(|item| {
201        if item.ty() != &ItemType::UserReport {
202            return ItemAction::Keep;
203        };
204
205        let payload = item.payload();
206        // There is a customer SDK which sends invalid reports with a trailing `\n`,
207        // strip it here, even if they update/fix their SDK there will still be many old
208        // versions with the broken SDK out there.
209        let payload = trim_whitespaces(&payload);
210        let report = match serde_json::from_slice::<UserReport>(payload) {
211            Ok(report) => report,
212            Err(error) => {
213                relay_log::error!(error = &error as &dyn Error, "failed to store user report");
214                return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
215            }
216        };
217
218        let json_string = match serde_json::to_string(&report) {
219            Ok(json) => json,
220            Err(err) => {
221                relay_log::error!(
222                    error = &err as &dyn Error,
223                    "failed to serialize user report"
224                );
225                return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
226            }
227        };
228
229        item.set_payload(ContentType::Json, json_string);
230        ItemAction::Keep
231    });
232}
233
/// Returns `data` without leading and trailing ASCII whitespace.
///
/// An input consisting only of whitespace (or an empty input) yields an empty slice.
fn trim_whitespaces(data: &[u8]) -> &[u8] {
    let mut trimmed = data;

    // Peel whitespace off the front.
    while let [first, rest @ ..] = trimmed {
        if !first.is_ascii_whitespace() {
            break;
        }
        trimmed = rest;
    }

    // Peel whitespace off the back.
    while let [rest @ .., last] = trimmed {
        if !last.is_ascii_whitespace() {
            break;
        }
        trimmed = rest;
    }

    trimmed
}
243
244/// Parse an outcome from an outcome ID and a reason string.
245///
246/// Currently only used to reconstruct outcomes encoded in client reports.
247fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
248    match field {
249        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
250            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
251                .map(RuleCategories::from)
252                .map(Outcome::FilteredSampling)
253                .map_err(|_| ()),
254            None => Err(()),
255        },
256        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
257        ClientReportField::Filtered => Ok(Outcome::Filtered(
258            FilterStatKey::try_from(reason).map_err(|_| ())?,
259        )),
260        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
261            "" => None,
262            other => Some(ReasonCode::new(other)),
263        })),
264    }
265}
266
#[cfg(test)]
mod tests {
    use relay_cogs::Token;
    use relay_config::Config;
    use relay_event_schema::protocol::EventId;
    use relay_sampling::evaluation::ReservoirCounters;

    use crate::envelope::{Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::outcome::RuleCategory;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::testutils::create_test_processor;

    use super::*;

    #[tokio::test]
    async fn test_client_report_removal() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();

        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_client_report_forwarding() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": false,
                // A relay must emit outcomes at all to process client reports; otherwise it
                // forwards them unchanged.
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let item = new_envelope.envelope().items().next().unwrap();
        assert_eq!(item.ty(), &ItemType::ClientReport);

        new_envelope.accept(); // do not try to capture or emit outcomes
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_client_report_removal_in_processing() {
        relay_test::setup();
        let outcome_aggregator = Addr::dummy();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": false,
            },
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_user_report_only() {
        relay_log::init_test!();
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(
                ContentType::Json,
                format!(r#"{{"event_id": "{event_id}"}}"#),
            );
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();

        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(
            new_envelope.items().next().unwrap().ty(),
            &ItemType::UserReport
        );
    }

    #[tokio::test]
    async fn test_user_report_invalid() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
            item
        });

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(ContentType::Json, "{}");
            item
        });

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
    }

    #[test]
    fn test_from_outcome_type_sampled() {
        // Reasons without the `Sampled:` prefix, or with unparsable rule IDs, are rejected.
        for reason in [
            "adsf",
            "Sampled:",
            "Sampled:foo",
            "Sampled:;",
            "Sampled:ab;12",
        ] {
            assert!(
                outcome_from_parts(ClientReportField::FilteredSampling, reason).is_err(),
                "expected {reason:?} to be rejected"
            );
        }

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(
                ClientReportField::FilteredSampling,
                "Sampled:1001,1456,1567,3333,4444"
            ),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    #[test]
    fn test_from_outcome_type_filtered() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "error-message"),
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    #[test]
    fn test_from_outcome_type_client_discard() {
        assert_eq!(
            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
            Outcome::ClientDiscard("foo_reason".into())
        );
    }

    #[test]
    fn test_from_outcome_type_rate_limited() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::RateLimited, ""),
            Ok(Outcome::RateLimited(None))
        ));
        assert_eq!(
            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }

    #[test]
    fn test_trim_whitespaces() {
        assert_eq!(trim_whitespaces(b""), b"");
        assert_eq!(trim_whitespaces(b" \n\r "), b"");
        assert_eq!(trim_whitespaces(b" \nx\r "), b"x");
        assert_eq!(trim_whitespaces(b" {foo: bar} "), b"{foo: bar}");
        assert_eq!(trim_whitespaces(b"{ foo: bar}"), b"{ foo: bar}");
        assert_eq!(trim_whitespaces(b"\t\x0c{}\n"), b"{}");
    }
}