// relay_server/services/store.rs
//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.

4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::services::upload::SignedLocation;
43use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
44use crate::utils::{self, FormDataIter};
45
46mod sessions;
47
48/// Fallback name used for attachment items without a `filename` header.
49const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
50
51#[derive(Debug, thiserror::Error)]
52pub enum StoreError {
53    #[error("failed to send the message to kafka: {0}")]
54    SendFailed(#[from] ClientError),
55    #[error("failed to encode data: {0}")]
56    EncodingFailed(std::io::Error),
57    #[error("failed to store event because event id was missing")]
58    NoEventId,
59    #[error("invalid attachment reference")]
60    InvalidAttachmentRef,
61}
62
63impl OutcomeError for StoreError {
64    type Error = Self;
65
66    fn consume(self) -> (Option<Outcome>, Self::Error) {
67        let outcome = match self {
68            StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => {
69                Some(Outcome::Invalid(DiscardReason::Internal))
70            }
71            StoreError::InvalidAttachmentRef => {
72                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
73            }
74        };
75        (outcome, self)
76    }
77}
78
79struct Producer {
80    client: KafkaClient,
81}
82
83impl Producer {
84    pub fn create(config: &Config) -> anyhow::Result<Self> {
85        let mut client_builder = KafkaClient::builder();
86
87        for topic in KafkaTopic::iter().filter(|t| {
88            // Outcomes should not be sent from the store forwarder.
89            // See `KafkaOutcomesProducer`.
90            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
91        }) {
92            let kafka_configs = config.kafka_configs(*topic)?;
93            client_builder = client_builder
94                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
95                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
96        }
97
98        Ok(Self {
99            client: client_builder.build(),
100        })
101    }
102}
103
104/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
105#[derive(Debug)]
106pub struct StoreEnvelope {
107    pub envelope: ManagedEnvelope,
108}
109
110/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
111#[derive(Clone, Debug)]
112pub struct StoreMetrics {
113    pub buckets: Vec<Bucket>,
114    pub scoping: Scoping,
115    pub retention: u16,
116}
117
118/// Publishes a log item to the Sentry core application through Kafka.
119#[derive(Debug)]
120pub struct StoreTraceItem {
121    /// The final trace item which will be produced to Kafka.
122    pub trace_item: TraceItem,
123}
124
125impl Counted for StoreTraceItem {
126    fn quantities(&self) -> Quantities {
127        self.trace_item.quantities()
128    }
129}
130
131/// Publishes a span item to the Sentry core application through Kafka.
132#[derive(Debug)]
133pub struct StoreSpanV2 {
134    /// Routing key to assign a Kafka partition.
135    pub routing_key: Option<Uuid>,
136    /// Default retention of the span.
137    pub retention_days: u16,
138    /// Downsampled retention of the span.
139    pub downsampled_retention_days: u16,
140    /// The final Sentry compatible span item.
141    pub item: SpanV2,
142}
143
144impl Counted for StoreSpanV2 {
145    fn quantities(&self) -> Quantities {
146        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
147    }
148}
149
150/// Publishes a singular profile chunk to Kafka.
151#[derive(Debug)]
152pub struct StoreProfileChunk {
153    /// Default retention of the span.
154    pub retention_days: u16,
155    /// The serialized profile chunk payload.
156    pub payload: Bytes,
157    /// Outcome quantities associated with this profile.
158    ///
159    /// Quantities are different for backend and ui profile chunks.
160    pub quantities: Quantities,
161}
162
163impl Counted for StoreProfileChunk {
164    fn quantities(&self) -> Quantities {
165        self.quantities.clone()
166    }
167}
168
169/// A replay to be stored to Kafka.
170#[derive(Debug)]
171pub struct StoreReplay {
172    /// The event ID.
173    pub event_id: EventId,
174    /// Number of days to retain.
175    pub retention_days: u16,
176    /// The recording payload (rrweb data).
177    pub recording: Bytes,
178    /// Optional replay event payload (JSON).
179    pub event: Option<Bytes>,
180    /// Optional replay video.
181    pub video: Option<Bytes>,
182    /// Outcome quantities associated with this replay.
183    ///
184    /// Quantities are different for web and native replays.
185    pub quantities: Quantities,
186}
187
188impl Counted for StoreReplay {
189    fn quantities(&self) -> Quantities {
190        self.quantities.clone()
191    }
192}
193
194/// An attachment to be stored to Kafka.
195#[derive(Debug)]
196pub struct StoreAttachment {
197    /// The event ID.
198    pub event_id: EventId,
199    /// That attachment item.
200    pub attachment: Item,
201    /// Outcome quantities associated with this attachment.
202    pub quantities: Quantities,
203    /// Data retention in days for this attachment.
204    pub retention: u16,
205}
206
207impl Counted for StoreAttachment {
208    fn quantities(&self) -> Quantities {
209        self.quantities.clone()
210    }
211}
212
213/// A user report to be stored to Kafka.
214#[derive(Debug)]
215pub struct StoreUserReport {
216    /// The event ID.
217    pub event_id: EventId,
218    /// The user report.
219    pub report: Item,
220}
221
222impl Counted for StoreUserReport {
223    fn quantities(&self) -> Quantities {
224        smallvec::smallvec![(DataCategory::UserReportV2, 1)]
225    }
226}
227
228/// A profile to be stored to Kafka.
229#[derive(Debug)]
230pub struct StoreProfile {
231    /// Number of days to retain.
232    pub retention_days: u16,
233    /// The profile.
234    pub profile: Item,
235    /// Outcome quantities associated with this profile.
236    pub quantities: Quantities,
237}
238
239impl Counted for StoreProfile {
240    fn quantities(&self) -> Quantities {
241        self.quantities.clone()
242    }
243}
244
245/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
246pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
247
248/// Service interface for the [`StoreEnvelope`] message.
249#[derive(Debug)]
250pub enum Store {
251    /// An envelope containing a mixture of items.
252    ///
253    /// Note: Some envelope items are not supported to be submitted at all or through an envelope,
254    /// for example logs must be submitted via [`Self::TraceItem`] instead.
255    ///
256    /// Long term this variant is going to be replaced with fully typed variants of items which can
257    /// be stored instead.
258    Envelope(StoreEnvelope),
259    /// Aggregated generic metrics.
260    Metrics(StoreMetrics),
261    /// A singular [`TraceItem`].
262    TraceItem(Managed<StoreTraceItem>),
263    /// A singular Span.
264    Span(Managed<Box<StoreSpanV2>>),
265    /// A singular profile chunk.
266    ProfileChunk(Managed<StoreProfileChunk>),
267    /// A singular replay.
268    Replay(Managed<StoreReplay>),
269    /// A singular attachment.
270    Attachment(Managed<StoreAttachment>),
271    /// A singular user report.
272    UserReport(Managed<StoreUserReport>),
273    /// A single profile.
274    Profile(Managed<StoreProfile>),
275}
276
277impl Store {
278    /// Returns the name of the message variant.
279    fn variant(&self) -> &'static str {
280        match self {
281            Store::Envelope(_) => "envelope",
282            Store::Metrics(_) => "metrics",
283            Store::TraceItem(_) => "trace_item",
284            Store::Span(_) => "span",
285            Store::ProfileChunk(_) => "profile_chunk",
286            Store::Replay(_) => "replay",
287            Store::Attachment(_) => "attachment",
288            Store::UserReport(_) => "user_report",
289            Store::Profile(_) => "profile",
290        }
291    }
292}
293
294impl Interface for Store {}
295
296impl FromMessage<StoreEnvelope> for Store {
297    type Response = NoResponse;
298
299    fn from_message(message: StoreEnvelope, _: ()) -> Self {
300        Self::Envelope(message)
301    }
302}
303
304impl FromMessage<StoreMetrics> for Store {
305    type Response = NoResponse;
306
307    fn from_message(message: StoreMetrics, _: ()) -> Self {
308        Self::Metrics(message)
309    }
310}
311
312impl FromMessage<Managed<StoreTraceItem>> for Store {
313    type Response = NoResponse;
314
315    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
316        Self::TraceItem(message)
317    }
318}
319
320impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
321    type Response = NoResponse;
322
323    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
324        Self::Span(message)
325    }
326}
327
328impl FromMessage<Managed<StoreProfileChunk>> for Store {
329    type Response = NoResponse;
330
331    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
332        Self::ProfileChunk(message)
333    }
334}
335
336impl FromMessage<Managed<StoreReplay>> for Store {
337    type Response = NoResponse;
338
339    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
340        Self::Replay(message)
341    }
342}
343
344impl FromMessage<Managed<StoreAttachment>> for Store {
345    type Response = NoResponse;
346
347    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
348        Self::Attachment(message)
349    }
350}
351
352impl FromMessage<Managed<StoreUserReport>> for Store {
353    type Response = NoResponse;
354
355    fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
356        Self::UserReport(message)
357    }
358}
359
360impl FromMessage<Managed<StoreProfile>> for Store {
361    type Response = NoResponse;
362
363    fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
364        Self::Profile(message)
365    }
366}
367
368/// Service implementing the [`Store`] interface.
369pub struct StoreService {
370    pool: StoreServicePool,
371    config: Arc<Config>,
372    global_config: GlobalConfigHandle,
373    outcome_aggregator: Addr<TrackOutcome>,
374    metric_outcomes: MetricOutcomes,
375    producer: Producer,
376}
377
378impl StoreService {
379    pub fn create(
380        pool: StoreServicePool,
381        config: Arc<Config>,
382        global_config: GlobalConfigHandle,
383        outcome_aggregator: Addr<TrackOutcome>,
384        metric_outcomes: MetricOutcomes,
385    ) -> anyhow::Result<Self> {
386        let producer = Producer::create(&config)?;
387        Ok(Self {
388            pool,
389            config,
390            global_config,
391            outcome_aggregator,
392            metric_outcomes,
393            producer,
394        })
395    }
396
397    fn handle_message(&self, message: Store) {
398        // relay_kafka configures the scope
399        relay_log::with_scope(|_| {}, || self.handle_message_inner(message))
400    }
401
402    fn handle_message_inner(&self, message: Store) {
403        let ty = message.variant();
404        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
405            let result = match message {
406                Store::Envelope(message) => self.handle_store_envelope(message),
407                Store::Metrics(message) => {
408                    self.handle_store_metrics(message);
409                    Ok(())
410                }
411                Store::TraceItem(message) => self.handle_store_trace_item(message),
412                Store::Span(message) => self.handle_store_span(message),
413                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
414                Store::Replay(message) => self.handle_store_replay(message),
415                Store::Attachment(message) => self.handle_store_attachment(message),
416                Store::UserReport(message) => self.handle_user_report(message),
417                Store::Profile(message) => self.handle_profile(message),
418            };
419            if let Err(error) = result {
420                relay_log::error!(
421                    error = &error as &dyn Error,
422                    tags.message = ty,
423                    "failed to store message"
424                );
425            }
426        })
427    }
428
429    fn handle_store_envelope(&self, message: StoreEnvelope) -> Result<(), Rejected<StoreError>> {
430        let StoreEnvelope { mut envelope } = message;
431
432        match self.store_envelope(&mut envelope) {
433            Ok(()) => {
434                envelope.accept();
435                Ok(())
436            }
437            Err(error) => {
438                // The envelope is now empty. `ManagedEnvelope` still counts items correctly
439                // through `EnvelopeSummary`, but `Managed<Box<Envelope>>` does not.
440                // -> Reject the old way and return a dummy item.
441                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
442                Err(Managed::with_meta_from(&envelope, ()).reject_err(error))
443            }
444        }
445    }
446
447    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
448        let mut envelope = managed_envelope.take_envelope();
449        let received_at = managed_envelope.received_at();
450        let scoping = managed_envelope.scoping();
451
452        let retention = envelope.retention();
453        let downsampled_retention = envelope.downsampled_retention();
454
455        let event_id = envelope.event_id();
456        let event_item = envelope.as_mut().take_item_by(|item| {
457            matches!(
458                item.ty(),
459                ItemType::Event | ItemType::Transaction | ItemType::Security
460            )
461        });
462        let event_type = event_item.as_ref().map(|item| item.ty());
463
464        // Some error events like minidumps need all attachment chunks to be processed _before_
465        // the event payload on the consumer side. Transaction attachments do not require this ordering
466        // guarantee, so they do not have to go to the same topic as their event payload.
467        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
468            KafkaTopic::Transactions
469        } else if envelope.get_item_by(is_slow_item).is_some() {
470            KafkaTopic::Attachments
471        } else {
472            KafkaTopic::Events
473        };
474
475        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
476
477        let mut attachments = Vec::new();
478
479        for item in envelope.items() {
480            let content_type = item.content_type();
481            match item.ty() {
482                ItemType::Attachment => {
483                    if let Some(attachment) = self.produce_attachment(
484                        event_id.ok_or(StoreError::NoEventId)?,
485                        scoping.project_id,
486                        scoping.organization_id,
487                        item,
488                        send_individual_attachments,
489                        retention,
490                    )? {
491                        attachments.push(attachment);
492                    }
493                }
494                ItemType::UserReport => {
495                    debug_assert!(event_topic == KafkaTopic::Attachments);
496                    self.produce_user_report(
497                        event_id.ok_or(StoreError::NoEventId)?,
498                        scoping.project_id,
499                        scoping.organization_id,
500                        received_at,
501                        item,
502                    )?;
503                }
504                ItemType::UserReportV2 => {
505                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
506                    self.produce_user_report_v2(
507                        event_id.ok_or(StoreError::NoEventId)?,
508                        scoping.project_id,
509                        scoping.organization_id,
510                        received_at,
511                        item,
512                        remote_addr,
513                    )?;
514                }
515                ItemType::Profile => self.produce_profile(
516                    scoping.organization_id,
517                    scoping.project_id,
518                    scoping.key_id,
519                    received_at,
520                    retention,
521                    item,
522                )?,
523                ItemType::CheckIn => {
524                    let client = envelope.meta().client();
525                    self.produce_check_in(
526                        scoping.project_id,
527                        scoping.organization_id,
528                        received_at,
529                        client,
530                        retention,
531                        item,
532                    )?
533                }
534                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
535                    scoping,
536                    received_at,
537                    event_id,
538                    retention,
539                    downsampled_retention,
540                    item,
541                )?,
542                ty @ ItemType::Log => {
543                    debug_assert!(
544                        false,
545                        "received {ty} through an envelope, \
546                        this item must be submitted via a specific store message instead"
547                    );
548                    relay_log::error!(
549                        tags.project_key = %scoping.project_key,
550                        "StoreService received unsupported item type '{ty}' in envelope"
551                    );
552                }
553                other => {
554                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
555                    let item_types = envelope
556                        .items()
557                        .map(|item| item.ty().as_str())
558                        .collect::<Vec<_>>();
559                    let attachment_types = envelope
560                        .items()
561                        .map(|item| {
562                            item.attachment_type()
563                                .map(|t| t.to_string())
564                                .unwrap_or_default()
565                        })
566                        .collect::<Vec<_>>();
567
568                    relay_log::with_scope(
569                        |scope| {
570                            scope.set_extra("item_types", item_types.into());
571                            scope.set_extra("attachment_types", attachment_types.into());
572                            if other == &ItemType::FormData {
573                                let payload = item.payload();
574                                let form_data_keys = FormDataIter::new(&payload)
575                                    .map(|entry| entry.key())
576                                    .collect::<Vec<_>>();
577                                scope.set_extra("form_data_keys", form_data_keys.into());
578                            }
579                        },
580                        || {
581                            relay_log::error!(
582                                tags.project_key = %scoping.project_key,
583                                tags.event_type = event_type.unwrap_or("none"),
584                                "StoreService received unexpected item type: {other}"
585                            )
586                        },
587                    )
588                }
589            }
590        }
591
592        if let Some(event_item) = event_item {
593            let event_id = event_id.ok_or(StoreError::NoEventId)?;
594            let project_id = scoping.project_id;
595            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
596
597            self.produce(
598                event_topic,
599                KafkaMessage::Event(EventKafkaMessage {
600                    payload: event_item.payload(),
601                    start_time: safe_timestamp(received_at),
602                    event_id,
603                    project_id,
604                    remote_addr,
605                    attachments,
606                    org_id: scoping.organization_id,
607                }),
608            )?;
609        } else {
610            debug_assert!(attachments.is_empty());
611        }
612
613        Ok(())
614    }
615
616    fn handle_store_metrics(&self, message: StoreMetrics) {
617        let StoreMetrics {
618            buckets,
619            scoping,
620            retention,
621        } = message;
622
623        let batch_size = self.config.metrics_max_batch_size_bytes();
624        let mut error = None;
625
626        let global_config = self.global_config.current();
627        let mut encoder = BucketEncoder::new(&global_config);
628
629        let emit_sessions_to_eap = utils::is_rolled_out(
630            scoping.organization_id.value(),
631            global_config.options.sessions_eap_rollout_rate,
632        )
633        .is_keep();
634
635        let now = UnixTimestamp::now();
636        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
637
638        for mut bucket in buckets {
639            let namespace = encoder.prepare(&mut bucket);
640
641            if let Some(received_at) = bucket.metadata.received_at {
642                let delay = now.as_secs().saturating_sub(received_at.as_secs());
643                let (total, count, max) = delay_stats.get_mut(namespace);
644                *total += delay;
645                *count += 1;
646                *max = (*max).max(delay);
647            }
648
649            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
650            // each bucket separately, we only need to split buckets that exceed the size, but not
651            // batches.
652            for view in BucketsView::new(std::slice::from_ref(&bucket))
653                .by_size(batch_size)
654                .flatten()
655            {
656                let message = self.create_metric_message(
657                    scoping.organization_id,
658                    scoping.project_id,
659                    &mut encoder,
660                    namespace,
661                    &view,
662                    retention,
663                );
664
665                let result =
666                    message.and_then(|message| self.send_metric_message(namespace, message));
667
668                let outcome = match result {
669                    Ok(()) => Outcome::Accepted,
670                    Err(e) => {
671                        error.get_or_insert(e);
672                        Outcome::Invalid(DiscardReason::Internal)
673                    }
674                };
675
676                self.metric_outcomes.track(scoping, &[view], outcome);
677            }
678
679            if emit_sessions_to_eap
680                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
681            {
682                let message = KafkaMessage::for_item(scoping, trace_item);
683                let res = self.produce(KafkaTopic::Items, message);
684                if let Err(error) = res {
685                    relay_log::error!(
686                        error = &error as &dyn std::error::Error,
687                        "failed to produce session metrics to EAP"
688                    )
689                }
690            }
691        }
692
693        if let Some(error) = error {
694            relay_log::error!(
695                error = &error as &dyn std::error::Error,
696                "failed to produce metric buckets: {error}"
697            );
698        }
699
700        for (namespace, (total, count, max)) in delay_stats {
701            if count == 0 {
702                continue;
703            }
704            metric!(
705                counter(RelayCounters::MetricDelaySum) += total,
706                namespace = namespace.as_str()
707            );
708            metric!(
709                counter(RelayCounters::MetricDelayCount) += count,
710                namespace = namespace.as_str()
711            );
712            metric!(
713                gauge(RelayGauges::MetricDelayMax) = max,
714                namespace = namespace.as_str()
715            );
716        }
717    }
718
719    fn handle_store_trace_item(
720        &self,
721        message: Managed<StoreTraceItem>,
722    ) -> Result<(), Rejected<StoreError>> {
723        let scoping = message.scoping();
724        let received_at = message.received_at();
725
726        let eap_emits_outcomes = utils::is_rolled_out(
727            scoping.organization_id.value(),
728            self.global_config
729                .current()
730                .options
731                .eap_outcomes_rollout_rate,
732        )
733        .is_keep();
734
735        let outcomes = message.try_accept(|mut item| {
736            let outcomes = match eap_emits_outcomes {
737                true => None,
738                false => item.trace_item.outcomes.take(),
739            };
740
741            let message = KafkaMessage::for_item(scoping, item.trace_item);
742            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
743        })?;
744
745        // Accepted outcomes when items have been successfully produced to rdkafka.
746        //
747        // This is only a temporary measure, long term these outcomes will be part of the trace
748        // item and emitted by Snuba to guarantee a delivery to storage.
749        if let Some(outcomes) = outcomes {
750            for (category, quantity) in outcomes.quantities() {
751                self.outcome_aggregator.send(TrackOutcome {
752                    category,
753                    event_id: None,
754                    outcome: Outcome::Accepted,
755                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
756                    remote_addr: None,
757                    scoping,
758                    timestamp: received_at,
759                });
760            }
761        }
762
763        Ok(())
764    }
765
766    fn handle_store_span(
767        &self,
768        message: Managed<Box<StoreSpanV2>>,
769    ) -> Result<(), Rejected<StoreError>> {
770        let scoping = message.scoping();
771        let received_at = message.received_at();
772
773        let relay_emits_accepted_outcome = !utils::is_rolled_out(
774            scoping.organization_id.value(),
775            self.global_config
776                .current()
777                .options
778                .eap_span_outcomes_rollout_rate,
779        )
780        .is_keep();
781
782        let meta = SpanMeta {
783            organization_id: scoping.organization_id,
784            project_id: scoping.project_id,
785            key_id: scoping.key_id,
786            event_id: None,
787            retention_days: message.retention_days,
788            downsampled_retention_days: message.downsampled_retention_days,
789            received: datetime_to_timestamp(received_at),
790            accepted_outcome_emitted: relay_emits_accepted_outcome,
791        };
792
793        message.try_accept(|span| {
794            let item = Annotated::new(span.item);
795            let message = KafkaMessage::SpanV2 {
796                routing_key: span.routing_key,
797                headers: BTreeMap::from([(
798                    "project_id".to_owned(),
799                    scoping.project_id.to_string(),
800                )]),
801                message: SpanKafkaMessage {
802                    meta,
803                    span: SerializableAnnotated(&item),
804                },
805                org_id: scoping.organization_id,
806            };
807
808            self.produce(KafkaTopic::Spans, message)
809        })?;
810
811        relay_statsd::metric!(
812            counter(RelayCounters::SpanV2Produced) += 1,
813            via = "processing"
814        );
815
816        if relay_emits_accepted_outcome {
817            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
818            // or the segments consumer, depending on which will produce outcomes later.
819            self.outcome_aggregator.send(TrackOutcome {
820                category: DataCategory::SpanIndexed,
821                event_id: None,
822                outcome: Outcome::Accepted,
823                quantity: 1,
824                remote_addr: None,
825                scoping,
826                timestamp: received_at,
827            });
828        }
829
830        Ok(())
831    }
832
833    fn handle_store_profile_chunk(
834        &self,
835        message: Managed<StoreProfileChunk>,
836    ) -> Result<(), Rejected<StoreError>> {
837        let scoping = message.scoping();
838        let received_at = message.received_at();
839
840        message.try_accept(|message| {
841            let message = ProfileChunkKafkaMessage {
842                organization_id: scoping.organization_id,
843                project_id: scoping.project_id,
844                received: safe_timestamp(received_at),
845                retention_days: message.retention_days,
846                headers: BTreeMap::from([(
847                    "project_id".to_owned(),
848                    scoping.project_id.to_string(),
849                )]),
850                payload: message.payload,
851            };
852
853            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
854        })
855    }
856
857    fn handle_store_replay(
858        &self,
859        message: Managed<StoreReplay>,
860    ) -> Result<(), Rejected<StoreError>> {
861        let scoping = message.scoping();
862        let received_at = message.received_at();
863
864        message.try_accept(|replay| {
865            let kafka_msg =
866                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
867                    replay_id: replay.event_id,
868                    key_id: scoping.key_id,
869                    org_id: scoping.organization_id,
870                    project_id: scoping.project_id,
871                    received: safe_timestamp(received_at),
872                    retention_days: replay.retention_days,
873                    payload: &replay.recording,
874                    replay_event: replay.event.as_deref(),
875                    replay_video: replay.video.as_deref(),
876                    // Hardcoded to `true` to indicate to the consumer that it should always publish the
877                    // replay_event as relay no longer does it.
878                    relay_snuba_publish_disabled: true,
879                });
880            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
881        })
882    }
883
884    fn handle_store_attachment(
885        &self,
886        message: Managed<StoreAttachment>,
887    ) -> Result<(), Rejected<StoreError>> {
888        let scoping = message.scoping();
889        message.try_accept(|attachment| {
890            let result = self.produce_attachment(
891                attachment.event_id,
892                scoping.project_id,
893                scoping.organization_id,
894                &attachment.attachment,
895                // Hardcoded to `true` since standalone attachments are 'individual attachments'.
896                true,
897                attachment.retention,
898            );
899            // Since we are sending an 'individual attachment' this function should never return a
900            // `ChunkedAttachment`.
901            debug_assert!(!matches!(result, Ok(Some(_))));
902            result.map(|_| ())
903        })
904    }
905
906    fn handle_user_report(
907        &self,
908        message: Managed<StoreUserReport>,
909    ) -> Result<(), Rejected<StoreError>> {
910        let scoping = message.scoping();
911        let received_at = message.received_at();
912
913        message.try_accept(|report| {
914            let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
915                project_id: scoping.project_id,
916                event_id: report.event_id,
917                start_time: safe_timestamp(received_at),
918                payload: report.report.payload(),
919                org_id: scoping.organization_id,
920            });
921            self.produce(KafkaTopic::Attachments, kafka_msg)
922        })
923    }
924
925    fn handle_profile(&self, message: Managed<StoreProfile>) -> Result<(), Rejected<StoreError>> {
926        let scoping = message.scoping();
927        let received_at = message.received_at();
928
929        message.try_accept(|profile| {
930            self.produce_profile(
931                scoping.organization_id,
932                scoping.project_id,
933                scoping.key_id,
934                received_at,
935                profile.retention_days,
936                &profile.profile,
937            )
938        })
939    }
940
941    fn create_metric_message<'a>(
942        &self,
943        organization_id: OrganizationId,
944        project_id: ProjectId,
945        encoder: &'a mut BucketEncoder,
946        namespace: MetricNamespace,
947        view: &BucketView<'a>,
948        retention_days: u16,
949    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
950        let value = match view.value() {
951            BucketViewValue::Counter(c) => MetricValue::Counter(c),
952            BucketViewValue::Distribution(data) => MetricValue::Distribution(
953                encoder
954                    .encode_distribution(namespace, data)
955                    .map_err(StoreError::EncodingFailed)?,
956            ),
957            BucketViewValue::Set(data) => MetricValue::Set(
958                encoder
959                    .encode_set(namespace, data)
960                    .map_err(StoreError::EncodingFailed)?,
961            ),
962            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
963        };
964
965        Ok(MetricKafkaMessage {
966            org_id: organization_id,
967            project_id,
968            name: view.name(),
969            value,
970            timestamp: view.timestamp(),
971            tags: view.tags(),
972            retention_days,
973            received_at: view.metadata().received_at,
974        })
975    }
976
    /// Serializes and sends a single message to the given Kafka topic.
    ///
    /// On success, emits a `ProcessingMessageProduced` counter tagged with the message
    /// variant and the resolved topic name. Metric and replay messages carry
    /// additional tags (metric type/encoding, video presence).
    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        // `send_message` returns the concrete topic name the message was routed to.
        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            // All other message types are counted without extra tags.
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }
1020
    /// Builds a [`ChunkedAttachment`] from an attachment-reference placeholder item.
    ///
    /// The item payload is expected to be a JSON [`AttachmentPlaceholder`] pointing at
    /// a signed objectstore location. The location is verified (against the current
    /// time and Relay's config) before its key is trusted; any parse or verification
    /// failure maps to [`StoreError::InvalidAttachmentRef`].
    fn chunked_attachment_from_placeholder(
        &self,
        item: &Item,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        debug_assert!(
            item.stored_key().is_none(),
            "AttachmentRef should not have been uploaded to objectstore"
        );

        let payload = item.payload();
        let placeholder: AttachmentPlaceholder<'_> =
            serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
        let location = SignedLocation::try_from_str(placeholder.location)
            .ok_or(StoreError::InvalidAttachmentRef)?
            .verify(Utc::now(), &self.config)
            .map_err(|_| StoreError::InvalidAttachmentRef)?;

        let store_key = location.key;

        Ok(ChunkedAttachment {
            id: Uuid::new_v4().to_string(),
            name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(),
            rate_limited: item.rate_limited(),
            // The placeholder carries the content type; the item headers do not.
            content_type: placeholder.content_type,
            attachment_type: item.attachment_type().unwrap_or_default(),
            size: item.attachment_body_size(),
            retention_days,
            payload: AttachmentPayload::Stored(store_key),
        })
    }
1052
    /// Converts an attachment item into a [`ChunkedAttachment`], producing
    /// `attachment_chunk` Kafka messages as a side effect when chunking is required.
    ///
    /// The payload is transferred in one of four ways:
    ///  - empty payload: zero chunks, no messages produced,
    ///  - already stored in the objectstore: only the store key is referenced,
    ///  - individual attachment smaller than one chunk: inlined into the metadata,
    ///  - otherwise: split into chunk messages of at most `attachment_chunk_size`.
    fn chunked_attachment_from_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        org_id: OrganizationId,
        item: &Item,
        send_individual_attachments: bool,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        // Random id tying the chunk messages to the attachment metadata.
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            // The consumer does not require chunks for empty attachments.
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // When sending individual attachments, and we have a single chunk, we want to send the
            // `data` inline in the `attachment` message.
            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // Empty payloads were handled above (`size == 0` yields zero chunks), so
            // this loop always emits at least one chunk.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                    org_id,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
            // is one larger than the last chunk, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        Ok(ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            retention_days,
            payload,
        })
    }
1120
1121    /// Produces Kafka messages for the content and metadata of an attachment item.
1122    ///
1123    /// The `send_individual_attachments` controls whether the metadata of an attachment
1124    /// is produced directly as an individual `attachment` message, or returned from this function
1125    /// to be later sent as part of an `event` message.
1126    ///
1127    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
1128    /// unless the `send_individual_attachments` flag is set, and the content is small enough
1129    /// to fit inside a message.
1130    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
1131    /// of the `attachment` message instead.
1132    fn produce_attachment(
1133        &self,
1134        event_id: EventId,
1135        project_id: ProjectId,
1136        org_id: OrganizationId,
1137        item: &Item,
1138        send_individual_attachments: bool,
1139        retention_days: u16,
1140    ) -> Result<Option<ChunkedAttachment>, StoreError> {
1141        let attachment = if item.is_attachment_ref() {
1142            self.chunked_attachment_from_placeholder(item, retention_days)
1143        } else {
1144            self.chunked_attachment_from_attachment(
1145                event_id,
1146                project_id,
1147                org_id,
1148                item,
1149                send_individual_attachments,
1150                retention_days,
1151            )
1152        }?;
1153
1154        if send_individual_attachments {
1155            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
1156                event_id,
1157                project_id,
1158                attachment,
1159                org_id,
1160            });
1161            self.produce(KafkaTopic::Attachments, message)?;
1162            Ok(None)
1163        } else {
1164            Ok(Some(attachment))
1165        }
1166    }
1167
1168    fn produce_user_report(
1169        &self,
1170        event_id: EventId,
1171        project_id: ProjectId,
1172        org_id: OrganizationId,
1173        received_at: DateTime<Utc>,
1174        item: &Item,
1175    ) -> Result<(), StoreError> {
1176        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1177            project_id,
1178            event_id,
1179            start_time: safe_timestamp(received_at),
1180            payload: item.payload(),
1181            org_id,
1182        });
1183
1184        self.produce(KafkaTopic::Attachments, message)
1185    }
1186
1187    fn produce_user_report_v2(
1188        &self,
1189        event_id: EventId,
1190        project_id: ProjectId,
1191        org_id: OrganizationId,
1192        received_at: DateTime<Utc>,
1193        item: &Item,
1194        remote_addr: Option<String>,
1195    ) -> Result<(), StoreError> {
1196        let message = KafkaMessage::Event(EventKafkaMessage {
1197            project_id,
1198            event_id,
1199            payload: item.payload(),
1200            start_time: safe_timestamp(received_at),
1201            remote_addr,
1202            attachments: vec![],
1203            org_id,
1204        });
1205        self.produce(KafkaTopic::Feedback, message)
1206    }
1207
1208    fn send_metric_message(
1209        &self,
1210        namespace: MetricNamespace,
1211        message: MetricKafkaMessage,
1212    ) -> Result<(), StoreError> {
1213        let topic = match namespace {
1214            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1215            MetricNamespace::Unsupported => {
1216                relay_log::with_scope(
1217                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1218                    || relay_log::error!("store service dropping unknown metric usecase"),
1219                );
1220                return Ok(());
1221            }
1222            _ => KafkaTopic::MetricsGeneric,
1223        };
1224
1225        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1226        self.produce(topic, KafkaMessage::Metric { headers, message })?;
1227        Ok(())
1228    }
1229
1230    fn produce_profile(
1231        &self,
1232        organization_id: OrganizationId,
1233        project_id: ProjectId,
1234        key_id: Option<u64>,
1235        received_at: DateTime<Utc>,
1236        retention_days: u16,
1237        item: &Item,
1238    ) -> Result<(), StoreError> {
1239        let message = ProfileKafkaMessage {
1240            organization_id,
1241            project_id,
1242            key_id,
1243            received: safe_timestamp(received_at),
1244            retention_days,
1245            headers: BTreeMap::from([
1246                (
1247                    "sampled".to_owned(),
1248                    if item.sampled() { "true" } else { "false" }.to_owned(),
1249                ),
1250                ("project_id".to_owned(), project_id.to_string()),
1251            ]),
1252            payload: item.payload(),
1253        };
1254        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1255        Ok(())
1256    }
1257
1258    fn produce_check_in(
1259        &self,
1260        project_id: ProjectId,
1261        org_id: OrganizationId,
1262        received_at: DateTime<Utc>,
1263        client: Option<&str>,
1264        retention_days: u16,
1265        item: &Item,
1266    ) -> Result<(), StoreError> {
1267        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1268            message_type: CheckInMessageType::CheckIn,
1269            project_id,
1270            retention_days,
1271            start_time: safe_timestamp(received_at),
1272            sdk: client.map(str::to_owned),
1273            payload: item.payload(),
1274            routing_key_hint: item.routing_hint(),
1275            org_id,
1276        });
1277
1278        self.produce(KafkaTopic::Monitors, message)?;
1279
1280        Ok(())
1281    }
1282
    /// Produces a V2 span item to the spans Kafka topic as raw, unparsed JSON.
    ///
    /// The span payload is forwarded without full deserialization (only split into
    /// top-level keys) and merged with [`SpanMeta`]. Depending on the
    /// `eap_span_outcomes_rollout_rate` option, either Relay emits the `Accepted`
    /// outcome here, or a downstream consumer is expected to do so; the decision is
    /// recorded in `accepted_outcome_emitted` for the consumer.
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        // Per-organization rollout: when the org is *not* rolled out to downstream
        // outcome emission, Relay emits the accepted outcome itself (below).
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            // Only the top-level JSON keys are parsed; values stay as raw JSON.
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Verify that this is a V2 span:
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
                org_id: organization_id,
            },
        )?;

        if relay_emits_accepted_outcome {
            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
            // or the segments consumer, depending on which will produce outcomes later.
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
1363}
1364
1365impl Service for StoreService {
1366    type Interface = Store;
1367
1368    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1369        let this = Arc::new(self);
1370
1371        relay_log::info!("store forwarder started");
1372
1373        while let Some(message) = rx.recv().await {
1374            let service = Arc::clone(&this);
1375            // For now, we run each task synchronously, in the future we might explore how to make
1376            // the store async.
1377            this.pool
1378                .spawn_async(async move { service.handle_message(message) }.boxed())
1379                .await;
1380        }
1381
1382        relay_log::info!("store forwarder stopped");
1383    }
1384}
1385
/// This signifies how the attachment payload is being transferred.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined here directly, and thus into the [`ChunkedAttachment`].
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored into the objectstore, with the given Id.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1404
/// Common attributes for both standalone attachments and processing-relevant attachments.
///
/// The payload variant is flattened into this struct on serialization, so the
/// message carries one of the `chunks`, `data`, or `stored_id` keys.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    ///
    /// Falls back to `UNNAMED_ATTACHMENT` when the item has no `filename` header.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
    /// native crash reports still need to be retained. These attachments are marked with the
    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
    /// not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The retention in days for this attachment.
    retention_days: u16,

    /// The attachment payload, chunked, inlined, or already stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1442
1443/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1444///
1445/// Cannot serialize bytes.
1446///
1447/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1448fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1449where
1450    S: serde::Serializer,
1451    T: serde::Serialize,
1452{
1453    serde_json::to_value(t)
1454        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1455        .serialize(serializer)
1456}
1457
/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    ///
    /// Empty for messages that carry no attachments (e.g. user feedback events).
    attachments: Vec<ChunkedAttachment>,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1478
/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of chunk. Starts at 0 and ends at `AttachmentKafkaMessage.num_chunks - 1`.
    chunk_index: usize,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1499
/// A "standalone" attachment.
///
/// Still belongs to an event but can be sent independently (like UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1517
/// A replay recording, sent to Kafka as a single, non-chunked message.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    /// The replay id.
    replay_id: EventId,
    /// The DSN key id, if known.
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    /// Time at which the replay was received by Relay (via `safe_timestamp`).
    received: u64,
    /// Number of days to retain.
    retention_days: u16,
    /// The raw replay recording payload.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    /// The raw replay event payload, if present.
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    /// The raw replay video payload, if present.
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// Always set to `true` by Relay: tells the consumer to publish the replay
    /// event itself, as Relay no longer does (see `handle_store_replay`).
    relay_snuba_publish_disabled: bool,
}
1534
/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    /// Time at which the report was received by Relay.
    start_time: u64,
    /// Raw user report payload.
    payload: Bytes,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    event_id: EventId,
    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1552
/// A single metric bucket in the format produced to the metrics topics.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    /// The metric name.
    name: &'a MetricName,
    /// The bucket value; flattens into `type` and `value` fields.
    #[serde(flatten)]
    value: MetricValue<'a>,
    /// The bucket timestamp.
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    /// Number of days to retain.
    retention_days: u16,
    /// Time at which the bucket was received, if recorded in its metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1566
/// The value of a metric bucket, tagged with its metric type.
///
/// Serializes into `type` (`c`/`d`/`s`/`g`) and `value` fields, which are
/// flattened into [`MetricKafkaMessage`].
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// A counter value.
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// A distribution of values, in an array encoding.
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// A set of values, in an array encoding.
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// A gauge value.
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1579
1580impl MetricValue<'_> {
1581    fn variant(&self) -> &'static str {
1582        match self {
1583            Self::Counter(_) => "counter",
1584            Self::Distribution(_) => "distribution",
1585            Self::Set(_) => "set",
1586            Self::Gauge(_) => "gauge",
1587        }
1588    }
1589
1590    fn encoding(&self) -> Option<&'static str> {
1591        match self {
1592            Self::Distribution(ae) => Some(ae.name()),
1593            Self::Set(ae) => Some(ae.name()),
1594            _ => None,
1595        }
1596    }
1597}
1598
/// A profile wrapped in a message for the profiles Kafka topic.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    /// The DSN key id, if known.
    key_id: Option<u64>,
    /// Time at which the profile was received by Relay.
    received: u64,
    /// Number of days to retain.
    retention_days: u16,
    /// Kafka message headers (`sampled`, `project_id`); not part of the payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// The raw profile payload.
    payload: Bytes,
}
1610
/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// Produced externally, never by Relay (hence `allow(dead_code)`).
    ClockPulse,
    /// A monitor check-in produced by Relay.
    CheckIn,
}
1623
/// A monitor check-in wrapped in a message for the monitors Kafka topic.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1646
/// A span forwarded as raw, unparsed JSON, merged with its meta fields.
///
/// Only the top-level keys are split out; values stay as [`RawValue`]s and are
/// flattened next to [`SpanMeta`] in the serialized message.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1654
/// A fully parsed V2 span, merged with its meta fields on serialization.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1662
/// Meta fields attached to every span message produced to the spans topic.
#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
    /// Indicates whether Relay already emitted an accepted outcome or if EAP still needs to emit it.
    accepted_outcome_emitted: bool,
}
1681
/// A standalone profile chunk wrapped in a message for the profiles Kafka topic.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    /// Time at which the chunk was received by Relay.
    received: u64,
    /// Number of days to retain.
    retention_days: u16,
    /// Kafka message headers (`project_id`); not part of the payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// The raw profile chunk payload.
    payload: Bytes,
}
1692
/// An enum over all possible ingest messages.
///
/// The `type` tag and snake_case variant name are part of the serialized payload,
/// so variants must not be renamed without coordinating with the consumers.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    /// An error/transaction event, including processing-relevant attachments.
    Event(EventKafkaMessage),
    /// A user report tied to an event.
    UserReport(UserReportKafkaMessage),
    /// A metric bucket with its namespace carried as a Kafka header.
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    /// A cron monitor check-in.
    CheckIn(CheckInKafkaMessage),
    /// A protobuf [`TraceItem`]; serialized separately, hence all fields skipped.
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    /// A span forwarded as raw JSON (see [`SpanKafkaMessageRaw`]).
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },
    /// A fully parsed V2 span (see [`SpanKafkaMessage`]).
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },

    /// Attachment metadata for a standalone attachment.
    Attachment(AttachmentKafkaMessage),
    /// A single chunk of attachment content.
    AttachmentChunk(AttachmentChunkKafkaMessage),

    /// A profile.
    Profile(ProfileKafkaMessage),
    /// A standalone profile chunk.
    ProfileChunk(ProfileChunkKafkaMessage),

    /// A replay recording sent as one message, without chunking.
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1748
1749impl KafkaMessage<'_> {
1750    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
1751    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1752        let item_type = item.item_type();
1753        KafkaMessage::Item {
1754            headers: BTreeMap::from([
1755                ("project_id".to_owned(), scoping.project_id.to_string()),
1756                ("item_type".to_owned(), (item_type as i32).to_string()),
1757            ]),
1758            message: item,
1759            item_type,
1760        }
1761    }
1762}
1763
impl Message for KafkaMessage<'_> {
    /// Returns a static name identifying the message variant, used for
    /// instrumentation. Metrics are further split by namespace, and trace
    /// items report their protobuf item type name.
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key for this Kafka message, if any.
    ///
    /// The key is derived from a message UUID (event id or routing key) mixed
    /// with the organization id. Variants without a key, as well as nil UUIDs,
    /// return `None` and fall back to random partitioning.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some((message.event_id.0, message.org_id)),
            Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
            Self::SpanRaw {
                routing_key,
                org_id,
                ..
            }
            | Self::SpanV2 {
                routing_key,
                org_id,
                ..
            } => routing_key.map(|r| (r, *org_id)),

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),

            Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
            Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),

            // Random partitioning
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // A nil UUID carries no routing information; treat it as "no key".
        .filter(|(uuid, _)| !uuid.is_nil())
        .map(|(uuid, org_id)| {
            // mix key with org id for better partitioning in case of incidents where messages
            // across orgs get the same id
            let mut res = uuid.into_bytes();
            for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
                res[i] ^= b;
            }
            u128::from_be_bytes(res)
        })
    }

    /// Returns the Kafka headers for this message, for variants that carry them.
    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    /// Serializes the message payload.
    ///
    /// Metrics and spans are serialized as JSON, trace items as protobuf, and
    /// all remaining variants as msgpack via serde.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
1878
1879fn serialize_as_json<T: serde::Serialize>(
1880    value: &T,
1881) -> Result<SerializationOutput<'_>, ClientError> {
1882    match serde_json::to_vec(value) {
1883        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1884        Err(err) => Err(ClientError::InvalidJson(err)),
1885    }
1886}
1887
1888/// Determines if the given item is considered slow.
1889///
1890/// Slow items must be routed to the `Attachments` topic.
1891fn is_slow_item(item: &Item) -> bool {
1892    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1893}
1894
/// Renders a boolean as the static string `"true"` or `"false"`.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1898
1899/// Returns a safe timestamp for Kafka.
1900///
1901/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1902fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1903    let ts = timestamp.timestamp();
1904    if ts >= 0 {
1905        return ts as u64;
1906    }
1907
1908    // We assume this call can't return < 0.
1909    Utc::now().timestamp() as u64
1910}
1911
#[cfg(test)]
mod tests {

    use super::*;

    // Producing a message to the outcome topics through the store producer
    // must be rejected: these topics are served by a dedicated producer, and
    // the store client is expected to refuse them with `InvalidTopicName`.
    #[test]
    fn disallow_outcomes() {
        // Minimal `Message` impl whose payload merely looks like an outcome.
        struct TestMessage;
        impl relay_kafka::Message for TestMessage {
            fn key(&self) -> Option<relay_kafka::Key> {
                None
            }

            fn variant(&self) -> &'static str {
                "test"
            }

            fn headers(&self) -> Option<&BTreeMap<String, String>> {
                None
            }

            fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
                Ok(SerializationOutput::Json(Cow::Borrowed(
                    br#"{"timestamp": "foo", "outcome": 1}"#,
                )))
            }
        }

        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        // Both outcome topics must be refused regardless of payload contents.
        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer.client.send_message(topic, &TestMessage);

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}