relay_server/services/processor/
report.rs

1//! Contains code related to validation and normalization of the user and client reports.
2
3use std::collections::BTreeMap;
4use std::error::Error;
5
6use chrono::{Duration as SignedDuration, Utc};
7use relay_common::time::UnixTimestamp;
8use relay_config::Config;
9use relay_event_normalization::ClockDriftProcessor;
10use relay_event_schema::protocol::{ClientReport, UserReport};
11use relay_filter::FilterStatKey;
12use relay_quotas::ReasonCode;
13use relay_sampling::evaluation::MatchedRuleIds;
14use relay_system::Addr;
15
16use crate::constants::DEFAULT_EVENT_RETENTION;
17use crate::envelope::{ContentType, ItemType};
18use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
19use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
20use crate::services::projects::project::ProjectInfo;
21use crate::utils::{ItemAction, TypedEnvelope};
22
/// Client report fields whose entries translate into an [`Outcome`] without content.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
    /// An inbound data filter removed the event.
    Filtered,

    /// A sampling rule removed the event.
    FilteredSampling,

    /// A rate limit was applied to the event.
    RateLimited,

    /// The client discarded the event on its own side.
    ClientDiscard,
}
38
39/// Validates and extracts client reports.
40///
41/// At the moment client reports are primarily used to transfer outcomes from
42/// client SDKs.  The outcomes are removed here and sent directly to the outcomes
43/// system.
44pub fn process_client_reports(
45    managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
46    config: &Config,
47    project_info: &ProjectInfo,
48    outcome_aggregator: Addr<TrackOutcome>,
49) {
50    // if client outcomes are disabled we leave the client reports unprocessed
51    // and pass them on.
52    if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
53        // if a processing relay has client outcomes disabled we drop them.
54        if config.processing_enabled() {
55            managed_envelope.retain_items(|item| match item.ty() {
56                ItemType::ClientReport => ItemAction::DropSilently,
57                _ => ItemAction::Keep,
58            });
59        }
60        return;
61    }
62
63    let mut timestamp = None;
64    let mut output_events = BTreeMap::new();
65    let received = managed_envelope.received_at();
66
67    let clock_drift_processor =
68        ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
69            .at_least(MINIMUM_CLOCK_DRIFT);
70
71    // we're going through all client reports but we're effectively just merging
72    // them into the first one.
73    managed_envelope.retain_items(|item| {
74        if item.ty() != &ItemType::ClientReport {
75            return ItemAction::Keep;
76        };
77        match ClientReport::parse(&item.payload()) {
78            Ok(ClientReport {
79                timestamp: report_timestamp,
80                discarded_events,
81                rate_limited_events,
82                filtered_events,
83                filtered_sampling_events,
84            }) => {
85                // Glue all discarded events together and give them the appropriate outcome type
86                let input_events =
87                    discarded_events
88                        .into_iter()
89                        .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
90                        .chain(
91                            filtered_events.into_iter().map(|discarded_event| {
92                                (ClientReportField::Filtered, discarded_event)
93                            }),
94                        )
95                        .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
96                            (ClientReportField::FilteredSampling, discarded_event)
97                        }))
98                        .chain(rate_limited_events.into_iter().map(|discarded_event| {
99                            (ClientReportField::RateLimited, discarded_event)
100                        }));
101
102                for (outcome_type, discarded_event) in input_events {
103                    if discarded_event.reason.len() > 200 {
104                        relay_log::trace!("ignored client outcome with an overlong reason");
105                        continue;
106                    }
107                    *output_events
108                        .entry((
109                            outcome_type,
110                            discarded_event.reason,
111                            discarded_event.category,
112                        ))
113                        .or_insert(0) += discarded_event.quantity;
114                }
115                if let Some(ts) = report_timestamp {
116                    timestamp.get_or_insert(ts);
117                }
118            }
119            Err(err) => {
120                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
121            }
122        }
123        ItemAction::DropSilently
124    });
125
126    if output_events.is_empty() {
127        return;
128    }
129
130    let timestamp =
131        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
132
133    if clock_drift_processor.is_drifted() {
134        relay_log::trace!("applying clock drift correction to client report");
135        clock_drift_processor.process_timestamp(timestamp);
136    }
137
138    let retention_days = project_info
139        .config()
140        .event_retention
141        .unwrap_or(DEFAULT_EVENT_RETENTION);
142    let max_age = SignedDuration::days(retention_days.into());
143    // also if we unable to parse the timestamp, we assume it's way too old here.
144    let in_past = timestamp
145        .as_datetime()
146        .map(|ts| (received - ts) > max_age)
147        .unwrap_or(true);
148    if in_past {
149        relay_log::trace!(
150            "skipping client outcomes older than {} days",
151            max_age.num_days()
152        );
153        return;
154    }
155
156    let max_future = SignedDuration::seconds(config.max_secs_in_future());
157    // also if we unable to parse the timestamp, we assume it's way far in the future here.
158    let in_future = timestamp
159        .as_datetime()
160        .map(|ts| (ts - received) > max_future)
161        .unwrap_or(true);
162    if in_future {
163        relay_log::trace!(
164            "skipping client outcomes more than {}s in the future",
165            max_future.num_seconds()
166        );
167        return;
168    }
169
170    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
171        let outcome = match outcome_from_parts(outcome_type, &reason) {
172            Ok(outcome) => outcome,
173            Err(_) => {
174                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
175                continue;
176            }
177        };
178
179        outcome_aggregator.send(TrackOutcome {
180            // If we get to this point, the unwrap should not be used anymore, since we know by
181            // now that the timestamp can be parsed, but just incase we fallback to UTC current
182            // `DateTime`.
183            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
184            scoping: managed_envelope.scoping(),
185            outcome,
186            event_id: None,
187            remote_addr: None, // omitting the client address allows for better aggregation
188            category,
189            quantity,
190        });
191    }
192}
193
194/// Validates and normalizes all user report items in the envelope.
195///
196/// User feedback items are removed from the envelope if they contain invalid JSON or if the
197/// JSON violates the schema (basic type validation). Otherwise, their normalized representation
198/// is written back into the item.
199pub fn process_user_reports<Group>(managed_envelope: &mut TypedEnvelope<Group>) {
200    managed_envelope.retain_items(|item| {
201        if item.ty() != &ItemType::UserReport {
202            return ItemAction::Keep;
203        };
204
205        let payload = item.payload();
206        // There is a customer SDK which sends invalid reports with a trailing `\n`,
207        // strip it here, even if they update/fix their SDK there will still be many old
208        // versions with the broken SDK out there.
209        let payload = trim_whitespaces(&payload);
210        let report = match serde_json::from_slice::<UserReport>(payload) {
211            Ok(report) => report,
212            Err(error) => {
213                relay_log::error!(error = &error as &dyn Error, "failed to store user report");
214                return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
215            }
216        };
217
218        let json_string = match serde_json::to_string(&report) {
219            Ok(json) => json,
220            Err(err) => {
221                relay_log::error!(
222                    error = &err as &dyn Error,
223                    "failed to serialize user report"
224                );
225                return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
226            }
227        };
228
229        item.set_payload(ContentType::Json, json_string);
230        ItemAction::Keep
231    });
232}
233
/// Returns `data` without leading and trailing ASCII whitespace.
///
/// Whitespace is determined by [`u8::is_ascii_whitespace`]. Interior whitespace is left
/// untouched, and an input consisting only of whitespace yields an empty slice.
fn trim_whitespaces(data: &[u8]) -> &[u8] {
    let mut remaining = data;

    // Peel whitespace bytes off the front.
    while let [first, tail @ ..] = remaining {
        if !first.is_ascii_whitespace() {
            break;
        }
        remaining = tail;
    }

    // Peel whitespace bytes off the back.
    while let [head @ .., last] = remaining {
        if !last.is_ascii_whitespace() {
            break;
        }
        remaining = head;
    }

    remaining
}
243
244/// Parse an outcome from an outcome ID and a reason string.
245///
246/// Currently only used to reconstruct outcomes encoded in client reports.
247fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
248    match field {
249        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
250            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
251                .map(RuleCategories::from)
252                .map(Outcome::FilteredSampling)
253                .map_err(|_| ()),
254            None => Err(()),
255        },
256        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
257        ClientReportField::Filtered => Ok(Outcome::Filtered(
258            FilterStatKey::try_from(reason).map_err(|_| ())?,
259        )),
260        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
261            "" => None,
262            other => Some(ReasonCode::new(other)),
263        })),
264    }
265}
266
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use relay_cogs::Token;
    use relay_config::Config;
    use relay_event_schema::protocol::EventId;
    use relay_sampling::evaluation::ReservoirCounters;

    use crate::envelope::{Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::services::outcome::RuleCategory;
    use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
    use crate::services::projects::project::ProjectInfo;
    use crate::testutils::{self, create_test_processor};
    use crate::utils::ManagedEnvelope;

    use super::*;

    /// DSN shared by all envelope fixtures in this module.
    const TEST_DSN: &str = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42";

    /// Creates an empty envelope for [`TEST_DSN`] with the given event ID.
    fn test_envelope(event_id: Option<EventId>) -> Box<Envelope> {
        let dsn = TEST_DSN.parse().unwrap();
        Envelope::from_request(event_id, RequestMeta::new(dsn))
    }

    /// Creates a client report item containing a single `queue_full` discard of 42 errors.
    fn client_report_item() -> Item {
        let mut item = Item::new(ItemType::ClientReport);
        item.set_payload(
            ContentType::Json,
            r#"
                {
                    "discarded_events": [
                        ["queue_full", "error", 42]
                    ]
                }
            "#,
        );
        item
    }

    /// Wraps the envelope into a processor message with default project settings.
    ///
    /// Asserts that the envelope splits into exactly one processing group.
    fn grouped_message(envelope: Box<Envelope>) -> ProcessEnvelopeGrouped {
        let (outcome_aggregator, test_store) = testutils::processor_services();

        let mut envelopes = ProcessingGroup::split_envelope(*envelope);
        assert_eq!(envelopes.len(), 1);
        let (group, envelope) = envelopes.pop().unwrap();

        ProcessEnvelopeGrouped {
            group,
            envelope: ManagedEnvelope::new(envelope, outcome_aggregator, test_store),
            project_info: Arc::new(ProjectInfo::default()),
            rate_limits: Default::default(),
            sampling_project_info: None,
            reservoir_counters: ReservoirCounters::default(),
        }
    }

    #[tokio::test]
    async fn test_client_report_removal() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(config).await;

        let mut envelope = test_envelope(None);
        envelope.add_item(client_report_item());

        let envelope = processor
            .process(&mut Token::noop(), grouped_message(envelope))
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_client_report_forwarding() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": false,
                // Client reports are only processed if Relay emits outcomes at all, so
                // with outcomes disabled the report is expected to be forwarded.
                "emit_client_outcomes": true
            }
        }))
        .unwrap();

        let processor = create_test_processor(config).await;

        let mut envelope = test_envelope(None);
        envelope.add_item(client_report_item());

        let Ok(Some(Submit::Envelope(new_envelope))) = processor
            .process(&mut Token::noop(), grouped_message(envelope))
            .await
        else {
            panic!();
        };
        let item = new_envelope.envelope().items().next().unwrap();
        assert_eq!(item.ty(), &ItemType::ClientReport);

        new_envelope.accept(); // do not try to capture or emit outcomes
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_client_report_removal_in_processing() {
        relay_test::setup();

        let config = Config::from_json_value(serde_json::json!({
            "outcomes": {
                "emit_outcomes": true,
                "emit_client_outcomes": false,
            },
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(config).await;

        let mut envelope = test_envelope(None);
        envelope.add_item(client_report_item());

        let envelope = processor
            .process(&mut Token::noop(), grouped_message(envelope))
            .await
            .unwrap();
        assert!(envelope.is_none());
    }

    #[tokio::test]
    async fn test_user_report_only() {
        relay_log::init_test!();
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();

        let mut envelope = test_envelope(Some(event_id));
        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(
                ContentType::Json,
                format!(r#"{{"event_id": "{event_id}"}}"#),
            );
            item
        });

        let Ok(Some(Submit::Envelope(new_envelope))) = processor
            .process(&mut Token::noop(), grouped_message(envelope))
            .await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(
            new_envelope.items().next().unwrap().ty(),
            &ItemType::UserReport
        );
    }

    #[tokio::test]
    async fn test_user_report_invalid() {
        let processor = create_test_processor(Default::default()).await;
        let event_id = EventId::new();

        let mut envelope = test_envelope(Some(event_id));

        // The user report lacks the fields required by the schema and must be dropped.
        envelope.add_item({
            let mut item = Item::new(ItemType::UserReport);
            item.set_payload(ContentType::Json, r#"{"foo": "bar"}"#);
            item
        });

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(ContentType::Json, "{}");
            item
        });

        let Ok(Some(Submit::Envelope(new_envelope))) = processor
            .process(&mut Token::noop(), grouped_message(envelope))
            .await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope();

        assert_eq!(new_envelope.len(), 1);
        assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
    }

    #[test]
    fn test_from_outcome_type_sampled() {
        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err());

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err());

        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err());

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"),
            Err(())
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"),
            Err(())
        ));

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::Other].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [RuleCategory::BoostEnvironments].into()
            )))
        );

        assert_eq!(
            outcome_from_parts(
                ClientReportField::FilteredSampling,
                "Sampled:1001,1456,1567,3333,4444"
            ),
            Ok(Outcome::FilteredSampling(RuleCategories(
                [
                    RuleCategory::BoostEnvironments,
                    RuleCategory::BoostLowVolumeTransactions,
                    RuleCategory::BoostLatestReleases,
                    RuleCategory::Custom
                ]
                .into()
            )))
        );
    }

    #[test]
    fn test_from_outcome_type_filtered() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "error-message"),
            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
        ));

        assert!(matches!(
            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
        ));
    }

    #[test]
    fn test_from_outcome_type_client_discard() {
        assert_eq!(
            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
            Outcome::ClientDiscard("foo_reason".into())
        );
    }

    #[test]
    fn test_from_outcome_type_rate_limited() {
        assert!(matches!(
            outcome_from_parts(ClientReportField::RateLimited, ""),
            Ok(Outcome::RateLimited(None))
        ));
        assert_eq!(
            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
        );
    }

    #[test]
    fn test_trim_whitespaces() {
        assert_eq!(trim_whitespaces(b""), b"");
        assert_eq!(trim_whitespaces(b" \n\r "), b"");
        assert_eq!(trim_whitespaces(b" \nx\r "), b"x");
        assert_eq!(trim_whitespaces(b" {foo: bar} "), b"{foo: bar}");
        assert_eq!(trim_whitespaces(b"{ foo: bar}"), b"{ foo: bar}");
    }
}