Skip to main content

relay_server/services/
store.rs

//! This module contains the service that forwards events, attachments, metrics, spans, and other
//! items to the Sentry store.
//!
//! The service uses Kafka topics to forward data to Sentry.
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task;
10
11use bytes::Bytes;
12use chrono::{DateTime, SecondsFormat, Utc};
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use uuid::Uuid;
17
18use relay_base_schema::data_category::DataCategory;
19use relay_base_schema::organization::OrganizationId;
20use relay_base_schema::project::ProjectId;
21use relay_common::time::UnixTimestamp;
22use relay_config::Config;
23use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
24use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
25use relay_metrics::{
26    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
27    MetricNamespace, SetView,
28};
29use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
30use relay_quotas::Scoping;
31use relay_statsd::metric;
32use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
33use relay_threading::AsyncPool;
34
35use crate::envelope::{AttachmentPlaceholder, AttachmentType, Item, ItemType};
36use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
37use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
38use crate::service::ServiceError;
39use crate::services::global_config::GlobalConfigHandle;
40use crate::services::outcome::{self, DiscardReason, Outcome, OutcomeId, TrackOutcome};
41use crate::services::upload::{Final, SignedLocation};
42use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
43use crate::utils::{self, FormDataIter};
44
45mod sessions;
46
/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
/// Errors that can occur while producing data to Kafka in the [`StoreService`].
///
/// Every error is translated into an [`Outcome`] via its [`OutcomeError`] implementation.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Producing a message through the Kafka client failed.
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    /// Encoding data for a Kafka message failed.
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    /// An item that needs an event ID was stored, but no event ID was available.
    #[error("failed to store event because event id was missing")]
    NoEventId,
    /// An attachment reference was invalid.
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,
}
61
62impl OutcomeError for StoreError {
63    type Error = Self;
64
65    fn consume(self) -> (Option<Outcome>, Self::Error) {
66        let outcome = match self {
67            StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => {
68                Some(Outcome::Invalid(DiscardReason::Internal))
69            }
70            StoreError::InvalidAttachmentRef => {
71                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
72            }
73        };
74        (outcome, self)
75    }
76}
77
78struct Producer {
79    client: KafkaClient,
80}
81
82impl Producer {
83    pub fn create(config: &Config) -> anyhow::Result<Self> {
84        let mut client_builder = KafkaClient::builder();
85
86        for topic in KafkaTopic::iter() {
87            let kafka_configs = config.kafka_configs(*topic)?;
88            client_builder = client_builder
89                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
90                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
91        }
92
93        Ok(Self {
94            client: client_builder.build(),
95        })
96    }
97}
98
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The envelope whose items are produced.
    ///
    /// It is accepted when all items were produced, and rejected otherwise.
    pub envelope: ManagedEnvelope,
}
104
/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// The metric buckets to encode and produce.
    pub buckets: Vec<Bucket>,
    /// Scoping shared by all buckets, used for the produced messages and their outcomes.
    pub scoping: Scoping,
    /// Retention applied to every produced bucket.
    pub retention: u16,
}
112
/// Publishes a single trace item (for example a log) to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        // Delegates to the quantities reported by the trace item itself.
        self.trace_item.quantities()
    }
}
125
/// Publishes a span item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key to assign a Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Default retention of the span.
    pub retention_days: u16,
    /// Downsampled retention of the span.
    pub downsampled_retention_days: u16,
    /// Optional event id, if the span was extracted from a transaction.
    pub event_id: Option<EventId>,
    /// The final Sentry compatible span item.
    pub item: SpanV2,
    /// Whether to run issue detection on the transaction or on the segment span.
    ///
    /// Only `true` for segment spans created from a transaction.
    pub performance_issues_spans: bool,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        // Every stored span counts as exactly one indexed span.
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}
149
/// Publishes a singular profile chunk to Kafka.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Data retention in days of the profile chunk.
    pub retention_days: u16,
    /// The serialized profile chunk payload.
    pub payload: Bytes,
    /// Outcome quantities associated with this profile.
    ///
    /// Quantities are different for backend and ui profile chunks.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
168
/// A replay to be stored to Kafka.
#[derive(Debug)]
pub struct StoreReplay {
    /// The event ID, produced as the replay ID.
    pub event_id: EventId,
    /// Number of days to retain.
    pub retention_days: u16,
    /// The recording payload (rrweb data).
    pub recording: Bytes,
    /// Optional replay event payload (JSON).
    pub event: Option<Bytes>,
    /// Optional replay video.
    pub video: Option<Bytes>,
    /// Outcome quantities associated with this replay.
    ///
    /// Quantities are different for web and native replays.
    pub quantities: Quantities,
}

impl Counted for StoreReplay {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
193
/// An attachment to be stored to Kafka.
#[derive(Debug)]
pub struct StoreAttachment {
    /// The ID of the event this attachment belongs to.
    pub event_id: EventId,
    /// The attachment item.
    pub attachment: Item,
    /// Outcome quantities associated with this attachment.
    pub quantities: Quantities,
    /// Data retention in days for this attachment.
    pub retention: u16,
}

impl Counted for StoreAttachment {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
212
/// A user report to be stored to Kafka.
#[derive(Debug)]
pub struct StoreUserReport {
    /// The ID of the event the report refers to.
    pub event_id: EventId,
    /// The user report.
    pub report: Item,
}

impl Counted for StoreUserReport {
    fn quantities(&self) -> Quantities {
        // A stored user report always counts as a single `UserReportV2`.
        smallvec::smallvec![(DataCategory::UserReportV2, 1)]
    }
}
227
/// A profile to be stored to Kafka.
#[derive(Debug)]
pub struct StoreProfile {
    /// Number of days to retain.
    pub retention_days: u16,
    /// The profile item.
    pub profile: Item,
    /// Outcome quantities associated with this profile.
    pub quantities: Quantities,
}

impl Counted for StoreProfile {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
244
/// The asynchronous thread pool used by the [`StoreService`] for scheduling store tasks.
pub type StoreServicePool = AsyncPool<StoreTask>;
247
/// Service interface of the [`StoreService`], with one variant per accepted store message.
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: Some envelope items are not supported to be submitted at all or through an envelope,
    /// for example logs must be submitted via [`Self::TraceItem`] instead.
    ///
    /// Long term this variant is going to be replaced with fully typed variants of items which can
    /// be stored instead.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular [`TraceItem`].
    TraceItem(Managed<StoreTraceItem>),
    /// A singular Span.
    Span(Managed<Box<StoreSpanV2>>),
    /// A singular profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// A singular replay.
    Replay(Managed<StoreReplay>),
    /// A singular attachment.
    Attachment(Managed<StoreAttachment>),
    /// A singular user report.
    UserReport(Managed<StoreUserReport>),
    /// A singular profile.
    Profile(Managed<StoreProfile>),
}
276
277impl Store {
278    /// Returns the name of the message variant.
279    fn variant(&self) -> &'static str {
280        match self {
281            Store::Envelope(_) => "envelope",
282            Store::Metrics(_) => "metrics",
283            Store::TraceItem(_) => "trace_item",
284            Store::Span(_) => "span",
285            Store::ProfileChunk(_) => "profile_chunk",
286            Store::Replay(_) => "replay",
287            Store::Attachment(_) => "attachment",
288            Store::UserReport(_) => "user_report",
289            Store::Profile(_) => "profile",
290        }
291    }
292}
293
294impl Interface for Store {}
295
296impl FromMessage<StoreEnvelope> for Store {
297    type Response = NoResponse;
298
299    fn from_message(message: StoreEnvelope, _: ()) -> Self {
300        Self::Envelope(message)
301    }
302}
303
304impl FromMessage<StoreMetrics> for Store {
305    type Response = NoResponse;
306
307    fn from_message(message: StoreMetrics, _: ()) -> Self {
308        Self::Metrics(message)
309    }
310}
311
312impl FromMessage<Managed<StoreTraceItem>> for Store {
313    type Response = NoResponse;
314
315    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
316        Self::TraceItem(message)
317    }
318}
319
320impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
321    type Response = NoResponse;
322
323    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
324        Self::Span(message)
325    }
326}
327
328impl FromMessage<Managed<StoreProfileChunk>> for Store {
329    type Response = NoResponse;
330
331    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
332        Self::ProfileChunk(message)
333    }
334}
335
336impl FromMessage<Managed<StoreReplay>> for Store {
337    type Response = NoResponse;
338
339    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
340        Self::Replay(message)
341    }
342}
343
344impl FromMessage<Managed<StoreAttachment>> for Store {
345    type Response = NoResponse;
346
347    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
348        Self::Attachment(message)
349    }
350}
351
352impl FromMessage<Managed<StoreUserReport>> for Store {
353    type Response = NoResponse;
354
355    fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
356        Self::UserReport(message)
357    }
358}
359
360impl FromMessage<Managed<StoreProfile>> for Store {
361    type Response = NoResponse;
362
363    fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
364        Self::Profile(message)
365    }
366}
367
/// Service implementing the [`Store`] interface.
pub struct StoreService {
    /// Pool on which store tasks are scheduled.
    pool: StoreServicePool,
    /// Relay configuration, e.g. the Kafka setup and the metrics batch size.
    config: Arc<Config>,
    /// Handle to the global config, which supplies rollout rates.
    global_config: GlobalConfigHandle,
    /// Receives the outcomes this service emits directly.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Tracks outcomes for produced metric buckets.
    metric_outcomes: MetricOutcomes,
    /// Kafka producer used for all messages.
    producer: Producer,
}
377
378impl StoreService {
379    pub fn create(
380        pool: StoreServicePool,
381        config: Arc<Config>,
382        global_config: GlobalConfigHandle,
383        outcome_aggregator: Addr<TrackOutcome>,
384        metric_outcomes: MetricOutcomes,
385    ) -> anyhow::Result<Self> {
386        let producer = Producer::create(&config)?;
387        Ok(Self {
388            pool,
389            config,
390            global_config,
391            outcome_aggregator,
392            metric_outcomes,
393            producer,
394        })
395    }
396
397    fn handle_message(&self, message: Store) {
398        let ty = message.variant();
399        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
400            let result = match message {
401                Store::Envelope(message) => self.handle_store_envelope(message),
402                Store::Metrics(message) => {
403                    self.handle_store_metrics(message);
404                    Ok(())
405                }
406                Store::TraceItem(message) => self.handle_store_trace_item(message),
407                Store::Span(message) => self.handle_store_span(message),
408                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
409                Store::Replay(message) => self.handle_store_replay(message),
410                Store::Attachment(message) => self.handle_store_attachment(message),
411                Store::UserReport(message) => self.handle_user_report(message),
412                Store::Profile(message) => self.handle_profile(message),
413            };
414            if let Err(error) = result {
415                relay_log::error!(
416                    error = &error as &dyn Error,
417                    tags.message = ty,
418                    "failed to store message"
419                );
420            }
421        })
422    }
423
424    fn handle_store_envelope(&self, message: StoreEnvelope) -> Result<(), Rejected<StoreError>> {
425        let StoreEnvelope { mut envelope } = message;
426
427        match self.store_envelope(&mut envelope) {
428            Ok(()) => {
429                envelope.accept();
430                Ok(())
431            }
432            Err(error) => {
433                // The envelope is now empty. `ManagedEnvelope` still counts items correctly
434                // through `EnvelopeSummary`, but `Managed<Box<Envelope>>` does not.
435                // -> Reject the old way and return a dummy item.
436                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
437                Err(Managed::with_meta_from_managed_envelope(&envelope, ()).reject_err(error))
438            }
439        }
440    }
441
442    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
443        let mut envelope = managed_envelope.take_envelope();
444        let received_at = managed_envelope.received_at();
445        let scoping = managed_envelope.scoping();
446
447        let retention = envelope.retention();
448
449        let event_id = envelope.event_id();
450        let event_item = envelope.as_mut().take_item_by(|item| {
451            matches!(
452                item.ty(),
453                ItemType::Event | ItemType::Transaction | ItemType::Security
454            )
455        });
456        let event_type = event_item.as_ref().map(|item| item.ty());
457
458        // Some error events like minidumps need all attachment chunks to be processed _before_
459        // the event payload on the consumer side. Transaction attachments do not require this ordering
460        // guarantee, so they do not have to go to the same topic as their event payload.
461        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
462            KafkaTopic::Transactions
463        } else if envelope.get_item_by(is_slow_item).is_some() {
464            KafkaTopic::Attachments
465        } else {
466            KafkaTopic::Events
467        };
468
469        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
470
471        let mut attachments = Vec::new();
472
473        for item in envelope.items() {
474            match item.ty() {
475                ItemType::Attachment => {
476                    if let Some(attachment) = self.produce_attachment(
477                        event_id.ok_or(StoreError::NoEventId)?,
478                        scoping.project_id,
479                        scoping.organization_id,
480                        item,
481                        send_individual_attachments,
482                        retention,
483                    )? {
484                        attachments.push(attachment);
485                    }
486                }
487                ItemType::UserReport => {
488                    debug_assert!(event_topic == KafkaTopic::Attachments);
489                    self.produce_user_report(
490                        event_id.ok_or(StoreError::NoEventId)?,
491                        scoping.project_id,
492                        scoping.organization_id,
493                        received_at,
494                        item,
495                    )?;
496                }
497                ItemType::UserReportV2 => {
498                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
499                    self.produce_user_report_v2(
500                        event_id.ok_or(StoreError::NoEventId)?,
501                        scoping.project_id,
502                        scoping.organization_id,
503                        received_at,
504                        item,
505                        remote_addr,
506                    )?;
507                }
508                ItemType::Profile => self.produce_profile(
509                    scoping.organization_id,
510                    scoping.project_id,
511                    scoping.key_id,
512                    received_at,
513                    retention,
514                    item,
515                )?,
516                ItemType::CheckIn => {
517                    let client = envelope.meta().client();
518                    self.produce_check_in(
519                        scoping.project_id,
520                        scoping.organization_id,
521                        received_at,
522                        client,
523                        retention,
524                        item,
525                    )?
526                }
527                ty @ (ItemType::Log | ItemType::Span) => {
528                    debug_assert!(
529                        false,
530                        "received {ty} through an envelope, \
531                        this item must be submitted via a specific store message instead"
532                    );
533                    relay_log::error!(
534                        tags.project_key = %scoping.project_key,
535                        "StoreService received unsupported item type '{ty}' in envelope"
536                    );
537                }
538                other => {
539                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
540                    let item_types = envelope
541                        .items()
542                        .map(|item| item.ty().as_str())
543                        .collect::<Vec<_>>();
544                    let attachment_types = envelope
545                        .items()
546                        .map(|item| {
547                            item.attachment_type()
548                                .map(|t| t.to_string())
549                                .unwrap_or_default()
550                        })
551                        .collect::<Vec<_>>();
552
553                    relay_log::with_scope(
554                        |scope| {
555                            scope.set_extra("item_types", item_types.into());
556                            scope.set_extra("attachment_types", attachment_types.into());
557                            if other == &ItemType::FormData {
558                                let payload = item.payload();
559                                let form_data_keys = FormDataIter::new(&payload)
560                                    .map(|entry| entry.key())
561                                    .collect::<Vec<_>>();
562                                scope.set_extra("form_data_keys", form_data_keys.into());
563                            }
564                        },
565                        || {
566                            relay_log::error!(
567                                tags.project_key = %scoping.project_key,
568                                tags.event_type = event_type.unwrap_or("none"),
569                                "StoreService received unexpected item type: {other}"
570                            )
571                        },
572                    )
573                }
574            }
575        }
576
577        if let Some(event_item) = event_item {
578            let event_id = event_id.ok_or(StoreError::NoEventId)?;
579            let project_id = scoping.project_id;
580            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
581
582            self.produce(
583                event_topic,
584                KafkaMessage::Event(EventKafkaMessage {
585                    payload: event_item.payload(),
586                    start_time: safe_timestamp(received_at),
587                    event_id,
588                    project_id,
589                    remote_addr,
590                    attachments,
591                    org_id: scoping.organization_id,
592                }),
593            )?;
594        } else {
595            debug_assert!(attachments.is_empty());
596        }
597
598        Ok(())
599    }
600
601    fn handle_store_metrics(&self, message: StoreMetrics) {
602        let StoreMetrics {
603            buckets,
604            scoping,
605            retention,
606        } = message;
607
608        let batch_size = self.config.metrics_max_batch_size_bytes();
609        let mut error = None;
610
611        let global_config = self.global_config.current().unwrap_or_default();
612        let mut encoder = BucketEncoder::new(&global_config);
613
614        let emit_sessions_to_eap = utils::is_rolled_out(
615            scoping.organization_id.value(),
616            global_config.options.sessions_eap_rollout_rate,
617        )
618        .is_keep();
619
620        let now = UnixTimestamp::now();
621        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
622
623        for mut bucket in buckets {
624            let namespace = encoder.prepare(&mut bucket);
625
626            if let Some(received_at) = bucket.metadata.received_at {
627                let delay = now.as_secs().saturating_sub(received_at.as_secs());
628                let (total, count, max) = delay_stats.get_mut(namespace);
629                *total += delay;
630                *count += 1;
631                *max = (*max).max(delay);
632            }
633
634            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
635            // each bucket separately, we only need to split buckets that exceed the size, but not
636            // batches.
637            for view in BucketsView::new(std::slice::from_ref(&bucket))
638                .by_size(batch_size)
639                .flatten()
640            {
641                let message =
642                    self.create_metric_message(&scoping, &mut encoder, namespace, &view, retention);
643
644                let result =
645                    message.and_then(|message| self.send_metric_message(namespace, message));
646
647                let outcome = match result {
648                    Ok(()) => Outcome::Accepted,
649                    Err(e) => {
650                        error.get_or_insert(e);
651                        Outcome::Invalid(DiscardReason::Internal)
652                    }
653                };
654
655                self.metric_outcomes.track(scoping, &[view], outcome);
656            }
657
658            if emit_sessions_to_eap
659                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
660            {
661                let message = KafkaMessage::for_item(scoping, trace_item);
662                let res = self.produce(KafkaTopic::Items, message);
663                if let Err(error) = res {
664                    relay_log::error!(
665                        error = &error as &dyn std::error::Error,
666                        "failed to produce session metrics to EAP"
667                    )
668                }
669            }
670        }
671
672        if let Some(error) = error {
673            relay_log::error!(
674                error = &error as &dyn std::error::Error,
675                "failed to produce metric buckets: {error}"
676            );
677        }
678
679        for (namespace, (total, count, max)) in delay_stats {
680            if count == 0 {
681                continue;
682            }
683            metric!(
684                counter(RelayCounters::MetricDelaySum) += total,
685                namespace = namespace.as_str()
686            );
687            metric!(
688                counter(RelayCounters::MetricDelayCount) += count,
689                namespace = namespace.as_str()
690            );
691            metric!(
692                gauge(RelayGauges::MetricDelayMax) = max,
693                namespace = namespace.as_str()
694            );
695        }
696    }
697
698    fn handle_store_trace_item(
699        &self,
700        message: Managed<StoreTraceItem>,
701    ) -> Result<(), Rejected<StoreError>> {
702        let scoping = message.scoping();
703        let received_at = message.received_at();
704
705        let eap_emits_outcomes = utils::is_rolled_out(
706            scoping.organization_id.value(),
707            self.global_config
708                .current()
709                .unwrap_or_default()
710                .options
711                .eap_outcomes_rollout_rate,
712        )
713        .is_keep();
714
715        let outcomes = message.try_accept(|mut item| {
716            let outcomes = match eap_emits_outcomes {
717                true => None,
718                false => item.trace_item.outcomes.take(),
719            };
720
721            let message = KafkaMessage::for_item(scoping, item.trace_item);
722            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
723        })?;
724
725        // Accepted outcomes when items have been successfully produced to rdkafka.
726        //
727        // This is only a temporary measure, long term these outcomes will be part of the trace
728        // item and emitted by Snuba to guarantee a delivery to storage.
729        if let Some(outcomes) = outcomes {
730            for (category, quantity) in outcomes.quantities() {
731                self.outcome_aggregator.send(TrackOutcome {
732                    category,
733                    event_id: None,
734                    outcome: Outcome::Accepted,
735                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
736                    remote_addr: None,
737                    scoping,
738                    timestamp: received_at,
739                });
740            }
741        }
742
743        Ok(())
744    }
745
746    fn handle_store_span(
747        &self,
748        message: Managed<Box<StoreSpanV2>>,
749    ) -> Result<(), Rejected<StoreError>> {
750        let scoping = message.scoping();
751        let received_at = message.received_at();
752
753        let relay_emits_accepted_outcome = !utils::is_rolled_out(
754            scoping.organization_id.value(),
755            self.global_config
756                .current()
757                .unwrap_or_default()
758                .options
759                .eap_span_outcomes_rollout_rate,
760        )
761        .is_keep();
762
763        let meta = SpanMeta {
764            organization_id: scoping.organization_id,
765            project_id: scoping.project_id,
766            key_id: scoping.key_id,
767            event_id: message.event_id,
768            retention_days: message.retention_days,
769            downsampled_retention_days: message.downsampled_retention_days,
770            received: datetime_to_timestamp(received_at),
771            accepted_outcome_emitted: relay_emits_accepted_outcome,
772            performance_issues_spans: message.performance_issues_spans,
773        };
774
775        message.try_accept(|span| {
776            let item = Annotated::new(span.item);
777            let message = KafkaMessage::SpanV2 {
778                routing_key: span.routing_key,
779                headers: BTreeMap::from([(
780                    "project_id".to_owned(),
781                    scoping.project_id.to_string(),
782                )]),
783                message: SpanKafkaMessage {
784                    meta,
785                    span: SerializableAnnotated(&item),
786                },
787                org_id: scoping.organization_id,
788            };
789
790            self.produce(KafkaTopic::Spans, message)
791        })?;
792
793        relay_statsd::metric!(
794            counter(RelayCounters::SpanV2Produced) += 1,
795            via = "processing"
796        );
797
798        if relay_emits_accepted_outcome {
799            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
800            // or the segments consumer, depending on which will produce outcomes later.
801            self.outcome_aggregator.send(TrackOutcome {
802                category: DataCategory::SpanIndexed,
803                event_id: None,
804                outcome: Outcome::Accepted,
805                quantity: 1,
806                remote_addr: None,
807                scoping,
808                timestamp: received_at,
809            });
810        }
811
812        Ok(())
813    }
814
815    fn handle_store_profile_chunk(
816        &self,
817        message: Managed<StoreProfileChunk>,
818    ) -> Result<(), Rejected<StoreError>> {
819        let scoping = message.scoping();
820        let received_at = message.received_at();
821
822        message.try_accept(|message| {
823            let message = ProfileChunkKafkaMessage {
824                organization_id: scoping.organization_id,
825                project_id: scoping.project_id,
826                received: safe_timestamp(received_at),
827                retention_days: message.retention_days,
828                headers: BTreeMap::from([(
829                    "project_id".to_owned(),
830                    scoping.project_id.to_string(),
831                )]),
832                payload: message.payload,
833            };
834
835            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
836        })
837    }
838
839    fn handle_store_replay(
840        &self,
841        message: Managed<StoreReplay>,
842    ) -> Result<(), Rejected<StoreError>> {
843        let scoping = message.scoping();
844        let received_at = message.received_at();
845
846        message.try_accept(|replay| {
847            let kafka_msg =
848                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
849                    replay_id: replay.event_id,
850                    key_id: scoping.key_id,
851                    org_id: scoping.organization_id,
852                    project_id: scoping.project_id,
853                    received: safe_timestamp(received_at),
854                    retention_days: replay.retention_days,
855                    payload: &replay.recording,
856                    replay_event: replay.event.as_deref(),
857                    replay_video: replay.video.as_deref(),
858                    // Hardcoded to `true` to indicate to the consumer that it should always publish the
859                    // replay_event as relay no longer does it.
860                    relay_snuba_publish_disabled: true,
861                });
862            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
863        })
864    }
865
866    fn handle_store_attachment(
867        &self,
868        message: Managed<StoreAttachment>,
869    ) -> Result<(), Rejected<StoreError>> {
870        let scoping = message.scoping();
871        message.try_accept(|attachment| {
872            let result = self.produce_attachment(
873                attachment.event_id,
874                scoping.project_id,
875                scoping.organization_id,
876                &attachment.attachment,
877                // Hardcoded to `true` since standalone attachments are 'individual attachments'.
878                true,
879                attachment.retention,
880            );
881            // Since we are sending an 'individual attachment' this function should never return a
882            // `ChunkedAttachment`.
883            debug_assert!(!matches!(result, Ok(Some(_))));
884            result.map(|_| ())
885        })
886    }
887
888    fn handle_user_report(
889        &self,
890        message: Managed<StoreUserReport>,
891    ) -> Result<(), Rejected<StoreError>> {
892        let scoping = message.scoping();
893        let received_at = message.received_at();
894
895        message.try_accept(|report| {
896            let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
897                project_id: scoping.project_id,
898                event_id: report.event_id,
899                start_time: safe_timestamp(received_at),
900                payload: report.report.payload(),
901                org_id: scoping.organization_id,
902            });
903            self.produce(KafkaTopic::Attachments, kafka_msg)
904        })
905    }
906
907    fn handle_profile(&self, message: Managed<StoreProfile>) -> Result<(), Rejected<StoreError>> {
908        let scoping = message.scoping();
909        let received_at = message.received_at();
910
911        message.try_accept(|profile| {
912            self.produce_profile(
913                scoping.organization_id,
914                scoping.project_id,
915                scoping.key_id,
916                received_at,
917                profile.retention_days,
918                &profile.profile,
919            )
920        })
921    }
922
923    fn create_metric_message<'a>(
924        &self,
925        scoping: &Scoping,
926        encoder: &'a mut BucketEncoder,
927        namespace: MetricNamespace,
928        view: &BucketView<'a>,
929        retention_days: u16,
930    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
931        let value = match view.value() {
932            BucketViewValue::Counter(c) => MetricValue::Counter(c),
933            BucketViewValue::Distribution(data) => MetricValue::Distribution(
934                encoder
935                    .encode_distribution(namespace, data)
936                    .map_err(StoreError::EncodingFailed)?,
937            ),
938            BucketViewValue::Set(data) => MetricValue::Set(
939                encoder
940                    .encode_set(namespace, data)
941                    .map_err(StoreError::EncodingFailed)?,
942            ),
943            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
944        };
945
946        Ok(MetricKafkaMessage {
947            org_id: scoping.organization_id,
948            project_id: scoping.project_id,
949            key_id: scoping.key_id,
950            name: view.name(),
951            value,
952            timestamp: view.timestamp(),
953            tags: view.tags(),
954            retention_days,
955            received_at: view.metadata().received_at,
956        })
957    }
958
959    fn produce(
960        &self,
961        topic: KafkaTopic,
962        // Takes message by value to ensure it is not being produced twice.
963        message: KafkaMessage,
964    ) -> Result<(), StoreError> {
965        relay_log::trace!("Sending kafka message of type {}", message.variant());
966
967        let topic_name = self.producer.client.send_message(topic, &message)?;
968
969        match &message {
970            KafkaMessage::Metric {
971                message: metric, ..
972            } => {
973                metric!(
974                    counter(RelayCounters::ProcessingMessageProduced) += 1,
975                    event_type = message.variant(),
976                    topic = topic_name,
977                    metric_type = metric.value.variant(),
978                    metric_encoding = metric.value.encoding().unwrap_or(""),
979                );
980            }
981            KafkaMessage::ReplayRecordingNotChunked(replay) => {
982                let has_video = replay.replay_video.is_some();
983
984                metric!(
985                    counter(RelayCounters::ProcessingMessageProduced) += 1,
986                    event_type = message.variant(),
987                    topic = topic_name,
988                    has_video = bool_to_str(has_video),
989                );
990            }
991            message => {
992                metric!(
993                    counter(RelayCounters::ProcessingMessageProduced) += 1,
994                    event_type = message.variant(),
995                    topic = topic_name,
996                );
997            }
998        }
999
1000        Ok(())
1001    }
1002
1003    fn chunked_attachment_from_placeholder(
1004        &self,
1005        item: &Item,
1006        retention_days: u16,
1007    ) -> Result<ChunkedAttachment, StoreError> {
1008        debug_assert!(
1009            item.stored_key().is_none(),
1010            "AttachmentRef should not have been uploaded to objectstore"
1011        );
1012
1013        let payload = item.payload();
1014        let placeholder: AttachmentPlaceholder<'_> =
1015            serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
1016        let location = SignedLocation::<Final>::try_from_str(placeholder.location)
1017            .ok_or(StoreError::InvalidAttachmentRef)?
1018            .verify(Utc::now(), &self.config)
1019            .map_err(|_| StoreError::InvalidAttachmentRef)?;
1020
1021        let store_key = location.key;
1022
1023        Ok(ChunkedAttachment {
1024            id: Uuid::new_v4().to_string(),
1025            name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(),
1026            rate_limited: item.rate_limited(),
1027            content_type: placeholder.content_type,
1028            attachment_type: item.attachment_type().unwrap_or_default(),
1029            size: item.attachment_body_size(),
1030            retention_days,
1031            payload: AttachmentPayload::Stored(store_key),
1032        })
1033    }
1034
1035    fn chunked_attachment_from_attachment(
1036        &self,
1037        event_id: EventId,
1038        project_id: ProjectId,
1039        org_id: OrganizationId,
1040        item: &Item,
1041        send_individual_attachments: bool,
1042        retention_days: u16,
1043    ) -> Result<ChunkedAttachment, StoreError> {
1044        let id = Uuid::new_v4().to_string();
1045
1046        let payload = item.payload();
1047        let size = item.len();
1048        let max_chunk_size = self.config.attachment_chunk_size();
1049
1050        let payload = if size == 0 {
1051            AttachmentPayload::Chunked(0)
1052        } else if let Some(stored_key) = item.stored_key() {
1053            AttachmentPayload::Stored(stored_key.into())
1054        } else if send_individual_attachments && size < max_chunk_size {
1055            // When sending individual attachments, and we have a single chunk, we want to send the
1056            // `data` inline in the `attachment` message.
1057            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
1058            AttachmentPayload::Inline(payload)
1059        } else {
1060            let mut chunk_index = 0;
1061            let mut offset = 0;
1062            // This skips chunks for empty attachments. The consumer does not require chunks for
1063            // empty attachments. `chunks` will be `0` in this case.
1064            while offset < size {
1065                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
1066                let chunk_message = AttachmentChunkKafkaMessage {
1067                    payload: payload.slice(offset..offset + chunk_size),
1068                    event_id,
1069                    project_id,
1070                    id: id.clone(),
1071                    chunk_index,
1072                    org_id,
1073                };
1074
1075                self.produce(
1076                    KafkaTopic::Attachments,
1077                    KafkaMessage::AttachmentChunk(chunk_message),
1078                )?;
1079                offset += chunk_size;
1080                chunk_index += 1;
1081            }
1082
1083            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
1084            // is one larger than the last chunk, so it is equal to the number of chunks.
1085            AttachmentPayload::Chunked(chunk_index)
1086        };
1087
1088        Ok(ChunkedAttachment {
1089            id,
1090            name: match item.filename() {
1091                Some(name) => name.to_owned(),
1092                None => UNNAMED_ATTACHMENT.to_owned(),
1093            },
1094            rate_limited: item.rate_limited(),
1095            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
1096            attachment_type: item.attachment_type().unwrap_or_default(),
1097            size,
1098            retention_days,
1099            payload,
1100        })
1101    }
1102
1103    /// Produces Kafka messages for the content and metadata of an attachment item.
1104    ///
1105    /// The `send_individual_attachments` controls whether the metadata of an attachment
1106    /// is produced directly as an individual `attachment` message, or returned from this function
1107    /// to be later sent as part of an `event` message.
1108    ///
1109    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
1110    /// unless the `send_individual_attachments` flag is set, and the content is small enough
1111    /// to fit inside a message.
1112    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
1113    /// of the `attachment` message instead.
1114    fn produce_attachment(
1115        &self,
1116        event_id: EventId,
1117        project_id: ProjectId,
1118        org_id: OrganizationId,
1119        item: &Item,
1120        send_individual_attachments: bool,
1121        retention_days: u16,
1122    ) -> Result<Option<ChunkedAttachment>, StoreError> {
1123        let attachment = if item.is_attachment_ref() {
1124            self.chunked_attachment_from_placeholder(item, retention_days)
1125        } else {
1126            self.chunked_attachment_from_attachment(
1127                event_id,
1128                project_id,
1129                org_id,
1130                item,
1131                send_individual_attachments,
1132                retention_days,
1133            )
1134        }?;
1135
1136        if send_individual_attachments {
1137            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
1138                event_id,
1139                project_id,
1140                attachment,
1141                org_id,
1142            });
1143            self.produce(KafkaTopic::Attachments, message)?;
1144            Ok(None)
1145        } else {
1146            Ok(Some(attachment))
1147        }
1148    }
1149
1150    fn produce_user_report(
1151        &self,
1152        event_id: EventId,
1153        project_id: ProjectId,
1154        org_id: OrganizationId,
1155        received_at: DateTime<Utc>,
1156        item: &Item,
1157    ) -> Result<(), StoreError> {
1158        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1159            project_id,
1160            event_id,
1161            start_time: safe_timestamp(received_at),
1162            payload: item.payload(),
1163            org_id,
1164        });
1165
1166        self.produce(KafkaTopic::Attachments, message)
1167    }
1168
1169    fn produce_user_report_v2(
1170        &self,
1171        event_id: EventId,
1172        project_id: ProjectId,
1173        org_id: OrganizationId,
1174        received_at: DateTime<Utc>,
1175        item: &Item,
1176        remote_addr: Option<String>,
1177    ) -> Result<(), StoreError> {
1178        let message = KafkaMessage::Event(EventKafkaMessage {
1179            project_id,
1180            event_id,
1181            payload: item.payload(),
1182            start_time: safe_timestamp(received_at),
1183            remote_addr,
1184            attachments: vec![],
1185            org_id,
1186        });
1187        self.produce(KafkaTopic::Feedback, message)
1188    }
1189
1190    fn send_metric_message(
1191        &self,
1192        namespace: MetricNamespace,
1193        message: MetricKafkaMessage,
1194    ) -> Result<(), StoreError> {
1195        let topic = match namespace {
1196            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1197            MetricNamespace::Outcomes => {
1198                return self.send_metric_based_outcome(message);
1199            }
1200            MetricNamespace::Unsupported => {
1201                relay_log::error!(
1202                    metric_message.name = message.name.as_ref(),
1203                    "store service dropping unknown metric usecase"
1204                );
1205                return Ok(());
1206            }
1207            _ => KafkaTopic::MetricsGeneric,
1208        };
1209
1210        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1211        self.produce(topic, KafkaMessage::Metric { headers, message })?;
1212        Ok(())
1213    }
1214
1215    fn send_metric_based_outcome(&self, message: MetricKafkaMessage) -> Result<(), StoreError> {
1216        let Some(outcome) = outcome::metric::to_outcome_id(message.name) else {
1217            relay_log::error!(
1218                mri = message.name.as_ref(),
1219                "invalid outcome metric, cannot infer outcome id from metric name"
1220            );
1221            return Ok(());
1222        };
1223        let quantity = match message.value {
1224            MetricValue::Counter(c) => c.to_f64() as u32,
1225            v => {
1226                relay_log::error!(
1227                    mri = message.name.as_ref(),
1228                    "invalid outcome metric, expected a counter got '{}'",
1229                    v.variant()
1230                );
1231                return Ok(());
1232            }
1233        };
1234
1235        let outcome = OutcomeMessage {
1236            timestamp: message
1237                .timestamp
1238                .as_datetime()
1239                .unwrap_or_else(Utc::now)
1240                .to_rfc3339_opts(SecondsFormat::Micros, true),
1241            org_id: Some(message.org_id).filter(|id| id.value() != 0),
1242            project_id: message.project_id,
1243            key_id: message.key_id,
1244            outcome,
1245            reason: message.tags.get("reason").map(|s| s.as_str()),
1246            event_id: message.tags.get("event_id").map(|s| s.as_str()),
1247            remote_addr: message.tags.get("remote_addr").map(|s| s.as_str()),
1248            source: message.tags.get("source").map(|s| s.as_str()),
1249            category: message.tags.get("category").and_then(|s| s.parse().ok()),
1250            quantity: Some(quantity),
1251        };
1252
1253        let topic = match outcome.outcome.is_billing() {
1254            true => KafkaTopic::OutcomesBilling,
1255            false => KafkaTopic::Outcomes,
1256        };
1257
1258        self.produce(topic, KafkaMessage::Outcome(outcome))
1259    }
1260
1261    fn produce_profile(
1262        &self,
1263        organization_id: OrganizationId,
1264        project_id: ProjectId,
1265        key_id: Option<u64>,
1266        received_at: DateTime<Utc>,
1267        retention_days: u16,
1268        item: &Item,
1269    ) -> Result<(), StoreError> {
1270        let message = ProfileKafkaMessage {
1271            organization_id,
1272            project_id,
1273            key_id,
1274            received: safe_timestamp(received_at),
1275            retention_days,
1276            headers: BTreeMap::from([
1277                (
1278                    "sampled".to_owned(),
1279                    if item.sampled() { "true" } else { "false" }.to_owned(),
1280                ),
1281                ("project_id".to_owned(), project_id.to_string()),
1282            ]),
1283            payload: item.payload(),
1284        };
1285        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1286        Ok(())
1287    }
1288
1289    fn produce_check_in(
1290        &self,
1291        project_id: ProjectId,
1292        org_id: OrganizationId,
1293        received_at: DateTime<Utc>,
1294        client: Option<&str>,
1295        retention_days: u16,
1296        item: &Item,
1297    ) -> Result<(), StoreError> {
1298        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1299            message_type: CheckInMessageType::CheckIn,
1300            project_id,
1301            retention_days,
1302            start_time: safe_timestamp(received_at),
1303            sdk: client.map(str::to_owned),
1304            payload: item.payload(),
1305            routing_key_hint: item.routing_hint(),
1306            org_id,
1307        });
1308
1309        self.produce(KafkaTopic::Monitors, message)?;
1310
1311        Ok(())
1312    }
1313}
1314
1315impl Service for StoreService {
1316    type Interface = Store;
1317
1318    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1319        let this = Arc::new(self);
1320
1321        relay_log::info!("store forwarder started");
1322
1323        while let Some(message) = rx.recv().await {
1324            let task = StoreTask {
1325                service: Arc::clone(&this),
1326                message: Some(message),
1327            };
1328            this.pool.spawn_async(task).await;
1329        }
1330
1331        relay_log::info!("store forwarder stopped");
1332    }
1333}
1334
/// Task executed by [`StoreService`].
///
/// Holds a shared handle to the service and the single message to handle. The message is taken
/// out on the first poll, so the task must not be polled again after it has completed.
pub struct StoreTask {
    /// Shared handle to the service that handles the message.
    service: Arc<StoreService>,
    /// The message to handle; `None` once the task has been polled.
    message: Option<Store>,
}
1340
1341impl Future for StoreTask {
1342    type Output = ();
1343
1344    fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> task::Poll<Self::Output> {
1345        let message = self
1346            .message
1347            .take()
1348            .expect("StoreTask polled after completion");
1349        let () = relay_log::with_scope(|_| {}, || self.service.handle_message(message));
1350        task::Poll::Ready(())
1351    }
1352}
1353
/// How the content of an attachment is transferred to the consumer.
///
/// This is flattened into [`ChunkedAttachment`]. Exactly one of the keys `chunks`, `data`, or
/// `stored_id` appears next to the other attachment metadata.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into this many chunks.
    ///
    /// Each chunk is sent as a separate [`AttachmentChunkKafkaMessage`].
    /// If the payload `size == 0`, the number of chunks is also `0` and no chunk is sent.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined directly into the [`ChunkedAttachment`].
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored in the objectstore under the given key.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1372
/// Attachment metadata shared by standalone attachments and processing-relevant attachments.
///
/// It is either sent on its own inside an [`AttachmentKafkaMessage`], or embedded in the
/// `attachments` list of an [`EventKafkaMessage`].
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are removed from Envelopes immediately. Native crash
    /// reports are the exception, because processing still needs them. These attachments are
    /// marked with the `rate_limited` header, which tells the processing pipeline not to persist
    /// the attachment after processing.
    rate_limited: bool,

    /// Content type of the attachment payload; omitted from the message when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment content in bytes.
    size: usize,

    /// The retention in days for this attachment.
    retention_days: u16,

    /// The attachment payload: chunked, inlined, or already stored.
    ///
    /// Flattened, so its key (`chunks`, `data`, or `stored_id`) appears at the top level.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1410
1411/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1412///
1413/// Cannot serialize bytes.
1414///
1415/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1416fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1417where
1418    S: serde::Serializer,
1419    T: serde::Serialize,
1420{
1421    serde_json::to_value(t)
1422        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1423        .serialize(serializer)
1424}
1425
/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client IP address, if known.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,

    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1446
/// Container payload for one chunk of an attachment's content.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of the chunk.
    ///
    /// Starts at 0 and ends at the `chunks` count in the attachment's metadata, minus one.
    chunk_index: usize,

    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1467
/// A "standalone" attachment.
///
/// It still belongs to an event, but it can be sent independently (like a UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment metadata, including its inlined, chunked, or stored payload reference.
    attachment: ChunkedAttachment,

    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1485
/// A replay recording sent as a single message, together with its optional event and video.
///
/// The byte fields borrow from the replay being stored and are serialized as raw bytes.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    /// The replay id.
    replay_id: EventId,
    /// The DSN project key id, if known.
    key_id: Option<u64>,
    /// The organization id.
    org_id: OrganizationId,
    /// The project id.
    project_id: ProjectId,
    /// Time at which the recording was received by Relay.
    received: u64,
    /// Number of days to retain the recording.
    retention_days: u16,
    /// The recording payload.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    /// The serialized replay event, if any.
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    /// The replay video, if any.
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// Tells the consumer to publish the replay event itself, because Relay no longer does.
    ///
    /// Hardcoded to `true` where this message is built in `handle_store_replay`.
    relay_snuba_publish_disabled: bool,
}
1502
/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    /// Time at which the report was received by Relay.
    start_time: u64,
    /// Raw user report payload.
    payload: Bytes,

    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    event_id: EventId,
    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1520
/// A single metric bucket ready to be produced, borrowing from the bucket it was built from.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    /// The organization id.
    org_id: OrganizationId,
    /// The project id.
    project_id: ProjectId,
    /// The DSN project key id.
    ///
    /// Not serialized. It is read when outcome metrics are converted into [`OutcomeMessage`]s.
    #[serde(skip)]
    key_id: Option<u64>,
    /// The metric name (MRI).
    name: &'a MetricName,
    /// The metric value, flattened into `type` and `value` keys.
    #[serde(flatten)]
    value: MetricValue<'a>,
    /// Timestamp of the bucket.
    timestamp: UnixTimestamp,
    /// Tags of the bucket.
    tags: &'a BTreeMap<String, String>,
    /// Number of days to retain the metric.
    retention_days: u16,
    /// Time at which the bucket was received, taken from its metadata; omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1536
/// The value of a metric bucket.
///
/// Serialized as a `type` key (`c`, `d`, `s`, or `g`) and a `value` key holding the content.
/// Distribution and set values are stored in their array-encoded form.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1549
1550impl MetricValue<'_> {
1551    fn variant(&self) -> &'static str {
1552        match self {
1553            Self::Counter(_) => "counter",
1554            Self::Distribution(_) => "distribution",
1555            Self::Set(_) => "set",
1556            Self::Gauge(_) => "gauge",
1557        }
1558    }
1559
1560    fn encoding(&self) -> Option<&'static str> {
1561        match self {
1562            Self::Distribution(ae) => Some(ae.name()),
1563            Self::Set(ae) => Some(ae.name()),
1564            _ => None,
1565        }
1566    }
1567}
1568
/// Raw representation of an outcome for Kafka.
#[derive(Debug, Serialize, Clone)]
pub struct OutcomeMessage<'a> {
    /// The timestamp of the outcome, as a formatted string.
    timestamp: String,
    /// Organization id.
    #[serde(skip_serializing_if = "Option::is_none")]
    org_id: Option<OrganizationId>,
    /// Project id.
    project_id: ProjectId,
    /// The DSN project key id.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    /// The outcome.
    outcome: OutcomeId,
    /// Reason for the outcome.
    #[serde(skip_serializing_if = "Option::is_none")]
    reason: Option<&'a str>,
    /// The event id.
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<&'a str>,
    /// The client IP address.
    #[serde(skip_serializing_if = "Option::is_none")]
    remote_addr: Option<&'a str>,
    /// The source of the outcome (which Relay sent it).
    #[serde(skip_serializing_if = "Option::is_none")]
    source: Option<&'a str>,
    /// The event's data category, as its numeric code.
    #[serde(skip_serializing_if = "Option::is_none")]
    category: Option<u8>,
    /// The number of events or total attachment size in bytes.
    #[serde(skip_serializing_if = "Option::is_none")]
    quantity: Option<u32>,
}
1603
/// A profile ready to be produced to the profiles topic.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    /// The organization id.
    organization_id: OrganizationId,
    /// The project id.
    project_id: ProjectId,
    /// The DSN project key id, if known.
    key_id: Option<u64>,
    /// Time at which the profile was received by Relay.
    received: u64,
    /// Number of days to retain the profile.
    retention_days: u16,
    /// Message headers; not serialized into the payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile payload.
    payload: Bytes,
}
1615
/// Used to discriminate cron monitor ingestion messages.
///
/// Two types of messages end up in the ingest-monitors Kafka topic:
/// - "check_in" messages, which are produced here in Relay;
/// - "clock_pulse" messages, which are produced externally. They keep the clock running even when
///   ingestion volume drops.
// `ClockPulse` is never constructed here, because clock pulses are produced externally. Hence the
// `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}
1628
/// A cron monitor check-in ready to be produced to the monitors topic.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,

    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    /// Used for [`KafkaMessage::key`]; not serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1651
/// A span as produced to Kafka.
///
/// Its [`SpanMeta`] and the span itself are both flattened into one object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1659
/// Relay-side metadata that is flattened into a [`SpanKafkaMessage`] next to the span.
#[derive(Debug, Serialize)]
struct SpanMeta {
    /// The organization id.
    organization_id: OrganizationId,
    /// The project id.
    project_id: ProjectId,
    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    /// The event id, if any; omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
    /// Indicates whether Relay already emitted an accepted outcome or if EAP still needs to emit it.
    accepted_outcome_emitted: bool,
    /// Whether the segment span should be used for issue detection instead of the transaction.
    ///
    /// Only serialized (as `_performance_issues_spans`) when `true`.
    #[serde(rename = "_performance_issues_spans", skip_serializing_if = "is_false")]
    performance_issues_spans: bool,
}
1681
/// Serde `skip_serializing_if` predicate that skips a boolean field when it is `false`.
fn is_false(val: &bool) -> bool {
    matches!(*val, false)
}
1685
/// A profile chunk ready to be produced to the profiles topic.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    /// The organization id.
    organization_id: OrganizationId,
    /// The project id.
    project_id: ProjectId,
    /// Time at which the chunk was received by Relay.
    received: u64,
    /// Number of days to retain the chunk.
    retention_days: u16,
    /// Message headers; not serialized into the payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile chunk payload.
    payload: Bytes,
}
1696
/// An enum over all possible ingest messages.
///
/// Serialized as an internally tagged object whose `type` key holds the snake_case variant name.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    /// An event, optionally with processing-relevant attachment metadata.
    Event(EventKafkaMessage),
    /// A user report.
    UserReport(UserReportKafkaMessage),
    /// A metric bucket.
    Metric {
        /// Message headers; not serialized into the payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    /// A cron monitor check-in.
    CheckIn(CheckInKafkaMessage),
    /// A trace item.
    ///
    /// Every field is skipped by serde. NOTE(review): the item is presumably encoded as protobuf
    /// (via `prost`) elsewhere; confirm.
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    /// A span, with its metadata flattened into the payload.
    SpanV2 {
        /// Not serialized into the payload.
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Message headers; not serialized into the payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },

    /// Metadata of a standalone attachment.
    Attachment(AttachmentKafkaMessage),
    /// One chunk of an attachment's content.
    AttachmentChunk(AttachmentChunkKafkaMessage),

    /// A profile.
    Profile(ProfileKafkaMessage),
    /// A profile chunk.
    ProfileChunk(ProfileChunkKafkaMessage),

    /// A replay recording with its optional event and video, in a single message.
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),

    /// An outcome derived from an outcome metric.
    Outcome(OutcomeMessage<'a>),
}
1742
1743impl KafkaMessage<'_> {
1744    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
1745    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1746        let item_type = item.item_type();
1747        KafkaMessage::Item {
1748            headers: BTreeMap::from([
1749                ("project_id".to_owned(), scoping.project_id.to_string()),
1750                ("item_type".to_owned(), (item_type as i32).to_string()),
1751            ]),
1752            message: item,
1753            item_type,
1754        }
1755    }
1756}
1757
1758impl Message for KafkaMessage<'_> {
1759    fn variant(&self) -> &'static str {
1760        match self {
1761            KafkaMessage::Event(_) => "event",
1762            KafkaMessage::UserReport(_) => "user_report",
1763            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1764                MetricNamespace::Sessions => "metric_sessions",
1765                MetricNamespace::Spans => "metric_spans",
1766                MetricNamespace::Transactions => "metric_transactions",
1767                MetricNamespace::Custom => "metric_custom",
1768                MetricNamespace::Outcomes => "metric_outcomes",
1769                MetricNamespace::Unsupported => "metric_unsupported",
1770            },
1771            KafkaMessage::CheckIn(_) => "check_in",
1772            KafkaMessage::SpanV2 { .. } => "span",
1773            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1774
1775            KafkaMessage::Attachment(_) => "attachment",
1776            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1777
1778            KafkaMessage::Profile(_) => "profile",
1779            KafkaMessage::ProfileChunk(_) => "profile_chunk",
1780
1781            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1782
1783            KafkaMessage::Outcome(_) => "outcome",
1784        }
1785    }
1786
1787    /// Returns the partitioning key for this Kafka message determining.
1788    fn key(&self) -> Option<relay_kafka::Key> {
1789        match self {
1790            Self::Event(message) => Some((message.event_id.0, message.org_id)),
1791            Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
1792            Self::SpanV2 {
1793                routing_key,
1794                org_id,
1795                ..
1796            } => routing_key.map(|r| (r, *org_id)),
1797
1798            // Monitor check-ins use the hinted UUID passed through from the Envelope.
1799            //
1800            // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
1801            // recieve the routing_key_hint form their envelopes.
1802            Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),
1803
1804            Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
1805            Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),
1806
1807            // Random partitioning
1808            Self::Metric { .. }
1809            | Self::Item { .. }
1810            | Self::Profile(_)
1811            | Self::ProfileChunk(_)
1812            | Self::ReplayRecordingNotChunked(_)
1813            | Self::Outcome(_) => None,
1814        }
1815        .filter(|(uuid, _)| !uuid.is_nil())
1816        .map(|(uuid, org_id)| {
1817            // mix key with org id for better paritioning in case of incidents where messages
1818            // across orgs get the same id
1819            let mut res = uuid.into_bytes();
1820            for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
1821                res[i] ^= b;
1822            }
1823            u128::from_be_bytes(res)
1824        })
1825    }
1826
1827    fn headers(&self) -> Option<&BTreeMap<String, String>> {
1828        match &self {
1829            KafkaMessage::Metric { headers, .. }
1830            | KafkaMessage::SpanV2 { headers, .. }
1831            | KafkaMessage::Item { headers, .. }
1832            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
1833            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),
1834
1835            KafkaMessage::Event(_)
1836            | KafkaMessage::UserReport(_)
1837            | KafkaMessage::CheckIn(_)
1838            | KafkaMessage::Attachment(_)
1839            | KafkaMessage::AttachmentChunk(_)
1840            | KafkaMessage::ReplayRecordingNotChunked(_)
1841            | KafkaMessage::Outcome(_) => None,
1842        }
1843    }
1844
1845    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1846        match self {
1847            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1848            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1849            KafkaMessage::Item { message, .. } => {
1850                let mut payload = Vec::new();
1851                match message.encode(&mut payload) {
1852                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1853                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
1854                }
1855            }
1856            KafkaMessage::Outcome(outcome) => serialize_as_json(outcome),
1857            KafkaMessage::Event(_)
1858            | KafkaMessage::UserReport(_)
1859            | KafkaMessage::CheckIn(_)
1860            | KafkaMessage::Attachment(_)
1861            | KafkaMessage::AttachmentChunk(_)
1862            | KafkaMessage::Profile(_)
1863            | KafkaMessage::ProfileChunk(_)
1864            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1865                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1866                Err(err) => Err(ClientError::InvalidMsgPack(err)),
1867            },
1868        }
1869    }
1870}
1871
1872fn serialize_as_json<T: serde::Serialize>(
1873    value: &T,
1874) -> Result<SerializationOutput<'_>, ClientError> {
1875    match serde_json::to_vec(value) {
1876        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1877        Err(err) => Err(ClientError::InvalidJson(err)),
1878    }
1879}
1880
1881/// Determines if the given item is considered slow.
1882///
1883/// Slow items must be routed to the `Attachments` topic.
1884fn is_slow_item(item: &Item) -> bool {
1885    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1886}
1887
1888fn bool_to_str(value: bool) -> &'static str {
1889    if value { "true" } else { "false" }
1890}
1891
1892/// Returns a safe timestamp for Kafka.
1893///
1894/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1895fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1896    let ts = timestamp.timestamp();
1897    if ts >= 0 {
1898        return ts as u64;
1899    }
1900
1901    // We assume this call can't return < 0.
1902    Utc::now().timestamp() as u64
1903}