relay_server/services/processor/
report.rs

1//! Contains code related to validation and normalization of the user and client reports.
2
3use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::managed::{ItemAction, TypedEnvelope};
19use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
20use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
21use crate::services::projects::project::ProjectInfo;
22
/// The section of a client report that an entry was listed under.
///
/// Each variant selects the kind of [`Outcome`] an entry is converted into; the
/// entry's reason string supplies the remaining detail.
///
/// The declaration order matters: the derived `Ord` is used when this enum is
/// part of a sorted map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// Entry reported as dropped by an inbound data filter.
    Filtered,

    /// Entry reported as dropped by a sampling rule.
    FilteredSampling,

    /// Entry reported as rate limited.
    RateLimited,

    /// Entry reported as discarded by the client itself.
    ClientDiscard,
}
38
39/// Validates and extracts client reports.
40///
41/// At the moment client reports are primarily used to transfer outcomes from
42/// client SDKs.  The outcomes are removed here and sent directly to the outcomes
43/// system.
44pub fn process_client_reports(
45    managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46    config: &Config,
47    project_info: &ProjectInfo,
48    outcome_aggregator: Addr<TrackOutcome>,
49) {
50    // if client outcomes are disabled we leave the client reports unprocessed
51    // and pass them on.
52    if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53        // if a processing relay has client outcomes disabled we drop them.
54        if config.processing_enabled() {
55            managed_envelope.retain_items(|item| match item.ty() {
56                ItemType::ClientReport => ItemAction::DropSilently,
57                _ => ItemAction::Keep,
58            });
59        }
60        return;
61    }
62
63    let mut timestamp = None;
64    let mut output_events = BTreeMap::new();
65    let received = managed_envelope.received_at();
66
67    let clock_drift_processor =
68        ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69            .at_least(MINIMUM_CLOCK_DRIFT);
70
71    // we're going through all client reports but we're effectively just merging
72    // them into the first one.
73    managed_envelope.retain_items(|item| {
74        if item.ty() != &ItemType::ClientReport {
75            return ItemAction::Keep;
76        };
77        match ClientReport::parse(&item.payload()) {
78            Ok(ClientReport {
79                timestamp: report_timestamp,
80                discarded_events,
81                rate_limited_events,
82                filtered_events,
83                filtered_sampling_events,
84            }) => {
85                // Glue all discarded events together and give them the appropriate outcome type
86                let input_events =
87                    discarded_events
88                        .into_iter()
89                        .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90                        .chain(
91                            filtered_events.into_iter().map(|discarded_event| {
92                                (ClientReportField::Filtered, discarded_event)
93                            }),
94                        )
95                        .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96                            (ClientReportField::FilteredSampling, discarded_event)
97                        }))
98                        .chain(rate_limited_events.into_iter().map(|discarded_event| {
99                            (ClientReportField::RateLimited, discarded_event)
100                        }));
101
102                for (outcome_type, discarded_event) in input_events {
103                    if discarded_event.reason.len() > 200 {
104                        relay_log::trace!("ignored client outcome with an overlong reason");
105                        continue;
106                    }
107                    *output_events
108                        .entry((
109                            outcome_type,
110                            discarded_event.reason,
111                            discarded_event.category,
112                        ))
113                        .or_insert(0) += discarded_event.quantity;
114                }
115                if let Some(ts) = report_timestamp {
116                    timestamp.get_or_insert(ts);
117                }
118            }
119            Err(err) => {
120                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121            }
122        }
123        ItemAction::DropSilently
124    });
125
126    if output_events.is_empty() {
127        return;
128    }
129
130    let timestamp =
131        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133    if clock_drift_processor.is_drifted() {
134        relay_log::trace!("applying clock drift correction to client report");
135        clock_drift_processor.process_timestamp(timestamp);
136    }
137
138    let retention_days = project_info
139        .config()
140        .event_retention
141        .unwrap_or(DEFAULT_EVENT_RETENTION);
142    let max_age = SignedDuration::days(retention_days.into());
143    // also if we unable to parse the timestamp, we assume it's way too old here.
144    let in_past = timestamp
145        .as_datetime()
146        .map(|ts| (received - ts) > max_age)
147        .unwrap_or(true);
148    if in_past {
149        relay_log::trace!(
150            "skipping client outcomes older than {} days",
151            max_age.num_days()
152        );
153        return;
154    }
155
156    let max_future = SignedDuration::seconds(config.max_secs_in_future());
157    // also if we unable to parse the timestamp, we assume it's way far in the future here.
158    let in_future = timestamp
159        .as_datetime()
160        .map(|ts| (ts - received) > max_future)
161        .unwrap_or(true);
162    if in_future {
163        relay_log::trace!(
164            "skipping client outcomes more than {}s in the future",
165            max_future.num_seconds()
166        );
167        return;
168    }
169
170    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171        let outcome = match outcome_from_parts(outcome_type, &reason) {
172            Ok(outcome) => outcome,
173            Err(_) => {
174                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175                continue;
176            }
177        };
178
179        outcome_aggregator.send(TrackOutcome {
180            // If we get to this point, the unwrap should not be used anymore, since we know by
181            // now that the timestamp can be parsed, but just incase we fallback to UTC current
182            // `DateTime`.
183            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184            scoping: managed_envelope.scoping(),
185            outcome,
186            event_id: None,
187            remote_addr: None, // omitting the client address allows for better aggregation
188            category,
189            quantity,
190        });
191    }
192}
193
194/// Validates and normalizes all user report items in the envelope.
195///
196/// User feedback items are removed from the envelope if they contain invalid JSON or if the
197/// JSON violates the schema (basic type validation). Otherwise, their normalized representation
198/// is written back into the item.
199pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200    managed_envelope.retain_items(|item| {
201        if item.ty() != &ItemType::UserReport {
202            return ItemAction::Keep;
203        };
204
205        let payload = item.payload();
206        // There is a customer SDK which sends invalid reports with a trailing `\n`,
207        // strip it here, even if they update/fix their SDK there will still be many old
208        // versions with the broken SDK out there.
209        let payload = trim_whitespaces(&payload);
210        let report = match serde_json::from_slice::<UserReport>(payload) {
211            Ok(report) => report,
212            Err(error) => {
213                relay_log::debug!(
214                    error = &error as &dyn Error,
215                    "failed to deserialize user report"
216                );
217                return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
218            }
219        };
220
221        let json_string = match serde_json::to_string(&report) {
222            Ok(json) => json,
223            Err(err) => {
224                relay_log::error!(
225                    error = &err as &dyn Error,
226                    "failed to serialize user report"
227                );
228                return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
229            }
230        };
231
232        item.set_payload(ContentType::Json, json_string);
233        ItemAction::Keep
234    });
235}
236
/// Returns `data` without leading and trailing ASCII whitespace.
///
/// Whitespace in the interior of the slice is preserved. An input consisting only of
/// whitespace (or nothing at all) yields an empty slice.
fn trim_whitespaces(data: &[u8]) -> &[u8] {
    let mut trimmed = data;

    // Peel whitespace off the front.
    while let Some((byte, rest)) = trimmed.split_first() {
        if !byte.is_ascii_whitespace() {
            break;
        }
        trimmed = rest;
    }

    // Peel whitespace off the back.
    while let Some((byte, rest)) = trimmed.split_last() {
        if !byte.is_ascii_whitespace() {
            break;
        }
        trimmed = rest;
    }

    trimmed
}
246
247/// Parse an outcome from an outcome ID and a reason string.
248///
249/// Currently only used to reconstruct outcomes encoded in client reports.
250fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
251    match field {
252        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
253            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
254                .map(RuleCategories::from)
255                .map(Outcome::FilteredSampling)
256                .map_err(|_| ()),
257            None => Err(()),
258        },
259        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
260        ClientReportField::Filtered => Ok(Outcome::Filtered(
261            FilterStatKey::try_from(reason).map_err(|_| ())?,
262        )),
263        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
264            "" => None,
265            other => Some(ReasonCode::new(other)),
266        })),
267    }
268}
269
#[cfg(test)]
mod tests {
    use relay_cogs::Token;
    use relay_config::Config;
    use relay_event_schema::protocol::EventId;

    use crate::envelope::{Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::managed::ManagedEnvelope;
    use crate::processing;
    use crate::services::outcome::RuleCategory;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::testutils::create_test_processor;

    use super::*;

    /// DSN used for every envelope created in these tests.
    const TEST_DSN: &str = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42";

    /// Creates an empty envelope for [`TEST_DSN`] with the given envelope-level event ID.
    fn new_envelope(event_id: Option<EventId>) -> Box<Envelope> {
        let dsn = TEST_DSN.parse().unwrap();
        Envelope::from_request(event_id, RequestMeta::new(dsn))
    }

    /// Creates an envelope holding a single client report with one discarded-events entry.
    fn client_report_envelope() -> Box<Envelope> {
        let mut envelope = new_envelope(None);

        envelope.add_item({
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(
                ContentType::Json,
                r#"
                    {
                        "discarded_events": [
                            ["queue_full", "error", 42]
                        ]
                    }
                "#,
            );
            item
        });

        envelope
    }

    /// Splits the envelope into processing groups, asserts there is exactly one, and wraps its
    /// envelope into a [`ManagedEnvelope`] backed by a dummy outcome aggregator.
    fn into_single_group(envelope: Box<Envelope>) -> (ProcessingGroup, ManagedEnvelope) {
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        (group, ManagedEnvelope::new(envelope, Addr::dummy()))
    }

    #[tokio::test]
    async fn test_client_report_removal() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let (group, envelope) = into_single_group(client_report_envelope());
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        // The client report is consumed, so nothing is left to submit.
        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_client_report_forwarding() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": false,
                // Client outcomes alone are not sufficient: with `emit_outcomes` turned off the
                // report must be passed on unprocessed.
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let (group, envelope) = into_single_group(client_report_envelope());
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the client report to be forwarded in an envelope");
        };
        let item = new_envelope.envelope().items().next().unwrap();
        assert_eq!(item.ty(), &ItemType::ClientReport);

        new_envelope.accept(); // do not try to capture or emit outcomes
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_client_report_removal_in_processing() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": false,
            },
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(Default::default()).await;

        let (group, envelope) = into_single_group(client_report_envelope());
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                config: &config,
                ..processing::Context::for_test()
            },
        };

        // A processing Relay drops client reports even though client outcomes are disabled.
        let envelope = processor
            .process(&mut Token::noop(), message)
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_user_report_only() {
        relay_log::init_test!();
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();

        let mut envelope = new_envelope(Some(event_id));
        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(
                ContentType::Json,
                format!(r#"{{"event_id": "{event_id}"}}"#),
            );
            item
        });

        let (group, envelope) = into_single_group(envelope);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the user report to be kept in an envelope");
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(
            new_envelope.items().next().unwrap().ty(),
            &ItemType::UserReport
        );
    }

    #[tokio::test]
    async fn test_user_report_invalid() {
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();

        let mut envelope = new_envelope(Some(event_id));

        // The user report lacks the fields required by the schema and must be dropped.
        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
            item
        });

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(ContentType::Json, "{}");
            item
        });

        let (group, envelope) = into_single_group(envelope);
        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context::for_test(),
        };

        let Ok(Some(Submit::Envelope(new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!("expected the event to be kept in an envelope");
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
    }

    #[test]
    fn test_from_outcome_type_sampled() {
        // Missing prefix, missing rule IDs, or malformed rule IDs are all rejected.
        for reason in [
            "adsf",
            "Sampled:",
            "Sampled:foo",
            "Sampled:;",
            "Sampled:ab;12",
        ] {
            assert_eq!(
                outcome_from_parts(ClientReportField::FilteredSampling, reason),
                Err(()),
                "reason {reason:?} must be rejected"
            );
        }

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(
                ClientReportField::FilteredSampling,
                "Sampled:1001,1456,1567,3333,4444"
            ),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    #[test]
    fn test_from_outcome_type_filtered() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "error-message"),
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    #[test]
    fn test_from_outcome_type_client_discard() {
        assert_eq!(
            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
            Outcome::ClientDiscard("foo_reason".into())
        );
    }

    #[test]
    fn test_from_outcome_type_rate_limited() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::RateLimited, ""),
            Ok(Outcome::RateLimited(None))
        ));
        assert_eq!(
            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }

    #[test]
    fn test_trim_whitespaces() {
        assert_eq!(trim_whitespaces(b""), b"");
        assert_eq!(trim_whitespaces(b" \n\r "), b"");
        assert_eq!(trim_whitespaces(b" \nx\r "), b"x");
        assert_eq!(trim_whitespaces(b" {foo: bar} "), b"{foo: bar}");
        assert_eq!(trim_whitespaces(b"{ foo: bar}"), b"{ foo: bar}");
    }
}