relay_server/services/
store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
43use crate::utils::{self, FormDataIter};
44
45mod sessions;
46
47/// Fallback name used for attachment items without a `filename` header.
48const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
50#[derive(Debug, thiserror::Error)]
51pub enum StoreError {
52    #[error("failed to send the message to kafka: {0}")]
53    SendFailed(#[from] ClientError),
54    #[error("failed to encode data: {0}")]
55    EncodingFailed(std::io::Error),
56    #[error("failed to store event because event id was missing")]
57    NoEventId,
58}
59
60impl OutcomeError for StoreError {
61    type Error = Self;
62
63    fn consume(self) -> (Option<Outcome>, Self::Error) {
64        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
65    }
66}
67
68struct Producer {
69    client: KafkaClient,
70}
71
72impl Producer {
73    pub fn create(config: &Config) -> anyhow::Result<Self> {
74        let mut client_builder = KafkaClient::builder();
75
76        for topic in KafkaTopic::iter().filter(|t| {
77            // Outcomes should not be sent from the store forwarder.
78            // See `KafkaOutcomesProducer`.
79            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
80        }) {
81            let kafka_configs = config.kafka_configs(*topic)?;
82            client_builder = client_builder
83                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
84                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
85        }
86
87        Ok(Self {
88            client: client_builder.build(),
89        })
90    }
91}
92
93/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
94#[derive(Debug)]
95pub struct StoreEnvelope {
96    pub envelope: ManagedEnvelope,
97}
98
99/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
100#[derive(Clone, Debug)]
101pub struct StoreMetrics {
102    pub buckets: Vec<Bucket>,
103    pub scoping: Scoping,
104    pub retention: u16,
105}
106
107/// Publishes a log item to the Sentry core application through Kafka.
108#[derive(Debug)]
109pub struct StoreTraceItem {
110    /// The final trace item which will be produced to Kafka.
111    pub trace_item: TraceItem,
112}
113
114impl Counted for StoreTraceItem {
115    fn quantities(&self) -> Quantities {
116        self.trace_item.quantities()
117    }
118}
119
120/// Publishes a span item to the Sentry core application through Kafka.
121#[derive(Debug)]
122pub struct StoreSpanV2 {
123    /// Routing key to assign a Kafka partition.
124    pub routing_key: Option<Uuid>,
125    /// Default retention of the span.
126    pub retention_days: u16,
127    /// Downsampled retention of the span.
128    pub downsampled_retention_days: u16,
129    /// The final Sentry compatible span item.
130    pub item: SpanV2,
131}
132
133impl Counted for StoreSpanV2 {
134    fn quantities(&self) -> Quantities {
135        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
136    }
137}
138
139/// Publishes a singular profile chunk to Kafka.
140#[derive(Debug)]
141pub struct StoreProfileChunk {
142    /// Default retention of the span.
143    pub retention_days: u16,
144    /// The serialized profile chunk payload.
145    pub payload: Bytes,
146    /// Outcome quantities associated with this profile.
147    ///
148    /// Quantities are different for backend and ui profile chunks.
149    pub quantities: Quantities,
150}
151
152impl Counted for StoreProfileChunk {
153    fn quantities(&self) -> Quantities {
154        self.quantities.clone()
155    }
156}
157
158/// A replay to be stored to Kafka.
159#[derive(Debug)]
160pub struct StoreReplay {
161    /// The event ID.
162    pub event_id: EventId,
163    /// Number of days to retain.
164    pub retention_days: u16,
165    /// The recording payload (rrweb data).
166    pub recording: Bytes,
167    /// Optional replay event payload (JSON).
168    pub event: Option<Bytes>,
169    /// Optional replay video.
170    pub video: Option<Bytes>,
171    /// Outcome quantities associated with this replay.
172    ///
173    /// Quantities are different for web and native replays.
174    pub quantities: Quantities,
175}
176
177impl Counted for StoreReplay {
178    fn quantities(&self) -> Quantities {
179        self.quantities.clone()
180    }
181}
182
183/// An attachment to be stored to Kafka.
184#[derive(Debug)]
185pub struct StoreAttachment {
186    /// The event ID.
187    pub event_id: EventId,
188    /// That attachment item.
189    pub attachment: Item,
190    /// Outcome quantities associated with this attachment.
191    pub quantities: Quantities,
192}
193
194impl Counted for StoreAttachment {
195    fn quantities(&self) -> Quantities {
196        self.quantities.clone()
197    }
198}
199
200/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
201pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
202
203/// Service interface for the [`StoreEnvelope`] message.
204#[derive(Debug)]
205pub enum Store {
206    /// An envelope containing a mixture of items.
207    ///
208    /// Note: Some envelope items are not supported to be submitted at all or through an envelope,
209    /// for example logs must be submitted via [`Self::TraceItem`] instead.
210    ///
211    /// Long term this variant is going to be replaced with fully typed variants of items which can
212    /// be stored instead.
213    Envelope(StoreEnvelope),
214    /// Aggregated generic metrics.
215    Metrics(StoreMetrics),
216    /// A singular [`TraceItem`].
217    TraceItem(Managed<StoreTraceItem>),
218    /// A singular Span.
219    Span(Managed<Box<StoreSpanV2>>),
220    /// A singular profile chunk.
221    ProfileChunk(Managed<StoreProfileChunk>),
222    /// A singular replay.
223    Replay(Managed<StoreReplay>),
224    /// A singular attachment.
225    Attachment(Managed<StoreAttachment>),
226}
227
228impl Store {
229    /// Returns the name of the message variant.
230    fn variant(&self) -> &'static str {
231        match self {
232            Store::Envelope(_) => "envelope",
233            Store::Metrics(_) => "metrics",
234            Store::TraceItem(_) => "trace_item",
235            Store::Span(_) => "span",
236            Store::ProfileChunk(_) => "profile_chunk",
237            Store::Replay(_) => "replay",
238            Store::Attachment(_) => "attachment",
239        }
240    }
241}
242
243impl Interface for Store {}
244
245impl FromMessage<StoreEnvelope> for Store {
246    type Response = NoResponse;
247
248    fn from_message(message: StoreEnvelope, _: ()) -> Self {
249        Self::Envelope(message)
250    }
251}
252
253impl FromMessage<StoreMetrics> for Store {
254    type Response = NoResponse;
255
256    fn from_message(message: StoreMetrics, _: ()) -> Self {
257        Self::Metrics(message)
258    }
259}
260
261impl FromMessage<Managed<StoreTraceItem>> for Store {
262    type Response = NoResponse;
263
264    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
265        Self::TraceItem(message)
266    }
267}
268
269impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
270    type Response = NoResponse;
271
272    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
273        Self::Span(message)
274    }
275}
276
277impl FromMessage<Managed<StoreProfileChunk>> for Store {
278    type Response = NoResponse;
279
280    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
281        Self::ProfileChunk(message)
282    }
283}
284
285impl FromMessage<Managed<StoreReplay>> for Store {
286    type Response = NoResponse;
287
288    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
289        Self::Replay(message)
290    }
291}
292
293impl FromMessage<Managed<StoreAttachment>> for Store {
294    type Response = NoResponse;
295
296    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
297        Self::Attachment(message)
298    }
299}
300
301/// Service implementing the [`Store`] interface.
302pub struct StoreService {
303    pool: StoreServicePool,
304    config: Arc<Config>,
305    global_config: GlobalConfigHandle,
306    outcome_aggregator: Addr<TrackOutcome>,
307    metric_outcomes: MetricOutcomes,
308    producer: Producer,
309}
310
311impl StoreService {
312    pub fn create(
313        pool: StoreServicePool,
314        config: Arc<Config>,
315        global_config: GlobalConfigHandle,
316        outcome_aggregator: Addr<TrackOutcome>,
317        metric_outcomes: MetricOutcomes,
318    ) -> anyhow::Result<Self> {
319        let producer = Producer::create(&config)?;
320        Ok(Self {
321            pool,
322            config,
323            global_config,
324            outcome_aggregator,
325            metric_outcomes,
326            producer,
327        })
328    }
329
330    fn handle_message(&self, message: Store) {
331        let ty = message.variant();
332        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
333            match message {
334                Store::Envelope(message) => self.handle_store_envelope(message),
335                Store::Metrics(message) => self.handle_store_metrics(message),
336                Store::TraceItem(message) => self.handle_store_trace_item(message),
337                Store::Span(message) => self.handle_store_span(message),
338                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
339                Store::Replay(message) => self.handle_store_replay(message),
340                Store::Attachment(message) => self.handle_store_attachment(message),
341            }
342        })
343    }
344
345    fn handle_store_envelope(&self, message: StoreEnvelope) {
346        let StoreEnvelope { mut envelope } = message;
347
348        let scoping = envelope.scoping();
349        match self.store_envelope(&mut envelope) {
350            Ok(()) => envelope.accept(),
351            Err(error) => {
352                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
353                relay_log::error!(
354                    error = &error as &dyn Error,
355                    tags.project_key = %scoping.project_key,
356                    "failed to store envelope"
357                );
358            }
359        }
360    }
361
362    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
363        let mut envelope = managed_envelope.take_envelope();
364        let received_at = managed_envelope.received_at();
365        let scoping = managed_envelope.scoping();
366
367        let retention = envelope.retention();
368        let downsampled_retention = envelope.downsampled_retention();
369
370        let event_id = envelope.event_id();
371        let event_item = envelope.as_mut().take_item_by(|item| {
372            matches!(
373                item.ty(),
374                ItemType::Event | ItemType::Transaction | ItemType::Security
375            )
376        });
377        let event_type = event_item.as_ref().map(|item| item.ty());
378
379        // Some error events like minidumps need all attachment chunks to be processed _before_
380        // the event payload on the consumer side. Transaction attachments do not require this ordering
381        // guarantee, so they do not have to go to the same topic as their event payload.
382        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
383            KafkaTopic::Transactions
384        } else if envelope.get_item_by(is_slow_item).is_some() {
385            KafkaTopic::Attachments
386        } else {
387            KafkaTopic::Events
388        };
389
390        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
391
392        let mut attachments = Vec::new();
393
394        for item in envelope.items() {
395            let content_type = item.content_type();
396            match item.ty() {
397                ItemType::Attachment => {
398                    if let Some(attachment) = self.produce_attachment(
399                        event_id.ok_or(StoreError::NoEventId)?,
400                        scoping.project_id,
401                        item,
402                        send_individual_attachments,
403                    )? {
404                        attachments.push(attachment);
405                    }
406                }
407                ItemType::UserReport => {
408                    debug_assert!(event_topic == KafkaTopic::Attachments);
409                    self.produce_user_report(
410                        event_id.ok_or(StoreError::NoEventId)?,
411                        scoping.project_id,
412                        received_at,
413                        item,
414                    )?;
415                }
416                ItemType::UserReportV2 => {
417                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
418                    self.produce_user_report_v2(
419                        event_id.ok_or(StoreError::NoEventId)?,
420                        scoping.project_id,
421                        received_at,
422                        item,
423                        remote_addr,
424                    )?;
425                }
426                ItemType::Profile => self.produce_profile(
427                    scoping.organization_id,
428                    scoping.project_id,
429                    scoping.key_id,
430                    received_at,
431                    retention,
432                    item,
433                )?,
434                ItemType::CheckIn => {
435                    let client = envelope.meta().client();
436                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
437                }
438                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
439                    scoping,
440                    received_at,
441                    event_id,
442                    retention,
443                    downsampled_retention,
444                    item,
445                )?,
446                ty @ ItemType::Log => {
447                    debug_assert!(
448                        false,
449                        "received {ty} through an envelope, \
450                        this item must be submitted via a specific store message instead"
451                    );
452                    relay_log::error!(
453                        tags.project_key = %scoping.project_key,
454                        "StoreService received unsupported item type '{ty}' in envelope"
455                    );
456                }
457                other => {
458                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
459                    let item_types = envelope
460                        .items()
461                        .map(|item| item.ty().as_str())
462                        .collect::<Vec<_>>();
463                    let attachment_types = envelope
464                        .items()
465                        .map(|item| {
466                            item.attachment_type()
467                                .map(|t| t.to_string())
468                                .unwrap_or_default()
469                        })
470                        .collect::<Vec<_>>();
471
472                    relay_log::with_scope(
473                        |scope| {
474                            scope.set_extra("item_types", item_types.into());
475                            scope.set_extra("attachment_types", attachment_types.into());
476                            if other == &ItemType::FormData {
477                                let payload = item.payload();
478                                let form_data_keys = FormDataIter::new(&payload)
479                                    .map(|entry| entry.key())
480                                    .collect::<Vec<_>>();
481                                scope.set_extra("form_data_keys", form_data_keys.into());
482                            }
483                        },
484                        || {
485                            relay_log::error!(
486                                tags.project_key = %scoping.project_key,
487                                tags.event_type = event_type.unwrap_or("none"),
488                                "StoreService received unexpected item type: {other}"
489                            )
490                        },
491                    )
492                }
493            }
494        }
495
496        if let Some(event_item) = event_item {
497            let event_id = event_id.ok_or(StoreError::NoEventId)?;
498            let project_id = scoping.project_id;
499            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
500
501            self.produce(
502                event_topic,
503                KafkaMessage::Event(EventKafkaMessage {
504                    payload: event_item.payload(),
505                    start_time: safe_timestamp(received_at),
506                    event_id,
507                    project_id,
508                    remote_addr,
509                    attachments,
510                }),
511            )?;
512        } else {
513            debug_assert!(attachments.is_empty());
514        }
515
516        Ok(())
517    }
518
519    fn handle_store_metrics(&self, message: StoreMetrics) {
520        let StoreMetrics {
521            buckets,
522            scoping,
523            retention,
524        } = message;
525
526        let batch_size = self.config.metrics_max_batch_size_bytes();
527        let mut error = None;
528
529        let global_config = self.global_config.current();
530        let mut encoder = BucketEncoder::new(&global_config);
531
532        let emit_sessions_to_eap = utils::is_rolled_out(
533            scoping.organization_id.value(),
534            global_config.options.sessions_eap_rollout_rate,
535        )
536        .is_keep();
537
538        let now = UnixTimestamp::now();
539        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
540
541        for mut bucket in buckets {
542            let namespace = encoder.prepare(&mut bucket);
543
544            if let Some(received_at) = bucket.metadata.received_at {
545                let delay = now.as_secs().saturating_sub(received_at.as_secs());
546                let (total, count, max) = delay_stats.get_mut(namespace);
547                *total += delay;
548                *count += 1;
549                *max = (*max).max(delay);
550            }
551
552            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
553            // each bucket separately, we only need to split buckets that exceed the size, but not
554            // batches.
555            for view in BucketsView::new(std::slice::from_ref(&bucket))
556                .by_size(batch_size)
557                .flatten()
558            {
559                let message = self.create_metric_message(
560                    scoping.organization_id,
561                    scoping.project_id,
562                    &mut encoder,
563                    namespace,
564                    &view,
565                    retention,
566                );
567
568                let result =
569                    message.and_then(|message| self.send_metric_message(namespace, message));
570
571                let outcome = match result {
572                    Ok(()) => Outcome::Accepted,
573                    Err(e) => {
574                        error.get_or_insert(e);
575                        Outcome::Invalid(DiscardReason::Internal)
576                    }
577                };
578
579                self.metric_outcomes.track(scoping, &[view], outcome);
580            }
581
582            if emit_sessions_to_eap
583                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
584            {
585                let message = KafkaMessage::for_item(scoping, trace_item);
586                let _ = self.produce(KafkaTopic::Items, message);
587            }
588        }
589
590        if let Some(error) = error {
591            relay_log::error!(
592                error = &error as &dyn std::error::Error,
593                "failed to produce metric buckets: {error}"
594            );
595        }
596
597        for (namespace, (total, count, max)) in delay_stats {
598            if count == 0 {
599                continue;
600            }
601            metric!(
602                counter(RelayCounters::MetricDelaySum) += total,
603                namespace = namespace.as_str()
604            );
605            metric!(
606                counter(RelayCounters::MetricDelayCount) += count,
607                namespace = namespace.as_str()
608            );
609            metric!(
610                gauge(RelayGauges::MetricDelayMax) = max,
611                namespace = namespace.as_str()
612            );
613        }
614    }
615
616    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
617        let scoping = message.scoping();
618        let received_at = message.received_at();
619
620        let eap_emits_outcomes = utils::is_rolled_out(
621            scoping.organization_id.value(),
622            self.global_config
623                .current()
624                .options
625                .eap_outcomes_rollout_rate,
626        )
627        .is_keep();
628
629        let outcomes = message.try_accept(|mut item| {
630            let outcomes = match eap_emits_outcomes {
631                true => None,
632                false => item.trace_item.outcomes.take(),
633            };
634
635            let message = KafkaMessage::for_item(scoping, item.trace_item);
636            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
637        });
638
639        // Accepted outcomes when items have been successfully produced to rdkafka.
640        //
641        // This is only a temporary measure, long term these outcomes will be part of the trace
642        // item and emitted by Snuba to guarantee a delivery to storage.
643        if let Ok(Some(outcomes)) = outcomes {
644            for (category, quantity) in outcomes.quantities() {
645                self.outcome_aggregator.send(TrackOutcome {
646                    category,
647                    event_id: None,
648                    outcome: Outcome::Accepted,
649                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
650                    remote_addr: None,
651                    scoping,
652                    timestamp: received_at,
653                });
654            }
655        }
656    }
657
658    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
659        let scoping = message.scoping();
660        let received_at = message.received_at();
661
662        let relay_emits_accepted_outcome = !utils::is_rolled_out(
663            scoping.organization_id.value(),
664            self.global_config
665                .current()
666                .options
667                .eap_span_outcomes_rollout_rate,
668        )
669        .is_keep();
670
671        let meta = SpanMeta {
672            organization_id: scoping.organization_id,
673            project_id: scoping.project_id,
674            key_id: scoping.key_id,
675            event_id: None,
676            retention_days: message.retention_days,
677            downsampled_retention_days: message.downsampled_retention_days,
678            received: datetime_to_timestamp(received_at),
679            accepted_outcome_emitted: relay_emits_accepted_outcome,
680        };
681
682        let result = message.try_accept(|span| {
683            let item = Annotated::new(span.item);
684            let message = KafkaMessage::SpanV2 {
685                routing_key: span.routing_key,
686                headers: BTreeMap::from([(
687                    "project_id".to_owned(),
688                    scoping.project_id.to_string(),
689                )]),
690                message: SpanKafkaMessage {
691                    meta,
692                    span: SerializableAnnotated(&item),
693                },
694            };
695
696            self.produce(KafkaTopic::Spans, message)
697        });
698
699        if result.is_ok() {
700            relay_statsd::metric!(
701                counter(RelayCounters::SpanV2Produced) += 1,
702                via = "processing"
703            );
704
705            if relay_emits_accepted_outcome {
706                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
707                // or the segments consumer, depending on which will produce outcomes later.
708                self.outcome_aggregator.send(TrackOutcome {
709                    category: DataCategory::SpanIndexed,
710                    event_id: None,
711                    outcome: Outcome::Accepted,
712                    quantity: 1,
713                    remote_addr: None,
714                    scoping,
715                    timestamp: received_at,
716                });
717            }
718        }
719    }
720
721    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
722        let scoping = message.scoping();
723        let received_at = message.received_at();
724
725        let _ = message.try_accept(|message| {
726            let message = ProfileChunkKafkaMessage {
727                organization_id: scoping.organization_id,
728                project_id: scoping.project_id,
729                received: safe_timestamp(received_at),
730                retention_days: message.retention_days,
731                headers: BTreeMap::from([(
732                    "project_id".to_owned(),
733                    scoping.project_id.to_string(),
734                )]),
735                payload: message.payload,
736            };
737
738            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
739        });
740    }
741
742    fn handle_store_replay(&self, message: Managed<StoreReplay>) {
743        let scoping = message.scoping();
744        let received_at = message.received_at();
745
746        let _ = message.try_accept(|replay| {
747            let kafka_msg =
748                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
749                    replay_id: replay.event_id,
750                    key_id: scoping.key_id,
751                    org_id: scoping.organization_id,
752                    project_id: scoping.project_id,
753                    received: safe_timestamp(received_at),
754                    retention_days: replay.retention_days,
755                    payload: &replay.recording,
756                    replay_event: replay.event.as_deref(),
757                    replay_video: replay.video.as_deref(),
758                    // Hardcoded to `true` to indicate to the consumer that it should always publish the
759                    // replay_event as relay no longer does it.
760                    relay_snuba_publish_disabled: true,
761                });
762            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
763        });
764    }
765
766    fn handle_store_attachment(&self, message: Managed<StoreAttachment>) {
767        let scoping = message.scoping();
768        let _ = message.try_accept(|attachment| {
769            let result = self.produce_attachment(
770                attachment.event_id,
771                scoping.project_id,
772                &attachment.attachment,
773                // Hardcoded to `true` since standalone attachments are 'individual attachments'.
774                true,
775            );
776            // Since we are sending an 'individual attachment' this function should never return a
777            // `ChunkedAttachment`.
778            debug_assert!(!matches!(result, Ok(Some(_))));
779            result
780        });
781    }
782
783    fn create_metric_message<'a>(
784        &self,
785        organization_id: OrganizationId,
786        project_id: ProjectId,
787        encoder: &'a mut BucketEncoder,
788        namespace: MetricNamespace,
789        view: &BucketView<'a>,
790        retention_days: u16,
791    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
792        let value = match view.value() {
793            BucketViewValue::Counter(c) => MetricValue::Counter(c),
794            BucketViewValue::Distribution(data) => MetricValue::Distribution(
795                encoder
796                    .encode_distribution(namespace, data)
797                    .map_err(StoreError::EncodingFailed)?,
798            ),
799            BucketViewValue::Set(data) => MetricValue::Set(
800                encoder
801                    .encode_set(namespace, data)
802                    .map_err(StoreError::EncodingFailed)?,
803            ),
804            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
805        };
806
807        Ok(MetricKafkaMessage {
808            org_id: organization_id,
809            project_id,
810            name: view.name(),
811            value,
812            timestamp: view.timestamp(),
813            tags: view.tags(),
814            retention_days,
815            received_at: view.metadata().received_at,
816        })
817    }
818
    /// Sends a single message to the given Kafka topic and records a produced-message metric.
    ///
    /// On success, a `ProcessingMessageProduced` counter is emitted, tagged with
    /// the message variant and resolved topic name. Metric and replay messages
    /// carry additional tags. Errors are logged and returned as
    /// [`StoreError::SendFailed`] via `?`.
    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self
            .producer
            .client
            .send_message(topic, &message)
            .inspect_err(|err| {
                relay_log::error!(
                    error = err as &dyn Error,
                    tags.topic = ?topic,
                    tags.message = message.variant(),
                    "failed to produce to Kafka"
                )
            })?;

        // Emit the produced-message counter with per-variant extra tags.
        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            // All other variants only carry the common tags.
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }
873
    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// The `send_individual_attachments` controls whether the metadata of an attachment
    /// is produced directly as an individual `attachment` message, or returned from this function
    /// to be later sent as part of an `event` message.
    ///
    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
    /// unless the `send_individual_attachments` flag is set, and the content is small enough
    /// to fit inside a message.
    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
    /// of the `attachment` message instead.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        // Randomly generated attachment id; `(project_id, event_id, id)` is unique.
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            // Empty attachments need no chunks at all; the consumer sees `chunks: 0`.
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            // Already persisted to the objectstore; only forward the reference.
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // When sending individual attachments, and we have a single chunk, we want to send the
            // `data` inline in the `attachment` message.
            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // `size > 0` here (the empty case is handled above), so at least one
            // chunk message is produced.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
            // is one larger than the last chunk, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }
960
961    fn produce_user_report(
962        &self,
963        event_id: EventId,
964        project_id: ProjectId,
965        received_at: DateTime<Utc>,
966        item: &Item,
967    ) -> Result<(), StoreError> {
968        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
969            project_id,
970            event_id,
971            start_time: safe_timestamp(received_at),
972            payload: item.payload(),
973        });
974
975        self.produce(KafkaTopic::Attachments, message)
976    }
977
978    fn produce_user_report_v2(
979        &self,
980        event_id: EventId,
981        project_id: ProjectId,
982        received_at: DateTime<Utc>,
983        item: &Item,
984        remote_addr: Option<String>,
985    ) -> Result<(), StoreError> {
986        let message = KafkaMessage::Event(EventKafkaMessage {
987            project_id,
988            event_id,
989            payload: item.payload(),
990            start_time: safe_timestamp(received_at),
991            remote_addr,
992            attachments: vec![],
993        });
994        self.produce(KafkaTopic::Feedback, message)
995    }
996
997    fn send_metric_message(
998        &self,
999        namespace: MetricNamespace,
1000        message: MetricKafkaMessage,
1001    ) -> Result<(), StoreError> {
1002        let topic = match namespace {
1003            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1004            MetricNamespace::Unsupported => {
1005                relay_log::with_scope(
1006                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1007                    || relay_log::error!("store service dropping unknown metric usecase"),
1008                );
1009                return Ok(());
1010            }
1011            _ => KafkaTopic::MetricsGeneric,
1012        };
1013
1014        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1015        self.produce(topic, KafkaMessage::Metric { headers, message })?;
1016        Ok(())
1017    }
1018
1019    fn produce_profile(
1020        &self,
1021        organization_id: OrganizationId,
1022        project_id: ProjectId,
1023        key_id: Option<u64>,
1024        received_at: DateTime<Utc>,
1025        retention_days: u16,
1026        item: &Item,
1027    ) -> Result<(), StoreError> {
1028        let message = ProfileKafkaMessage {
1029            organization_id,
1030            project_id,
1031            key_id,
1032            received: safe_timestamp(received_at),
1033            retention_days,
1034            headers: BTreeMap::from([
1035                (
1036                    "sampled".to_owned(),
1037                    if item.sampled() { "true" } else { "false" }.to_owned(),
1038                ),
1039                ("project_id".to_owned(), project_id.to_string()),
1040            ]),
1041            payload: item.payload(),
1042        };
1043        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1044        Ok(())
1045    }
1046
1047    fn produce_check_in(
1048        &self,
1049        project_id: ProjectId,
1050        received_at: DateTime<Utc>,
1051        client: Option<&str>,
1052        retention_days: u16,
1053        item: &Item,
1054    ) -> Result<(), StoreError> {
1055        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1056            message_type: CheckInMessageType::CheckIn,
1057            project_id,
1058            retention_days,
1059            start_time: safe_timestamp(received_at),
1060            sdk: client.map(str::to_owned),
1061            payload: item.payload(),
1062            routing_key_hint: item.routing_hint(),
1063        });
1064
1065        self.produce(KafkaTopic::Monitors, message)?;
1066
1067        Ok(())
1068    }
1069
    /// Produces a V2 span from an envelope item to the spans topic.
    ///
    /// The item payload must be JSON (asserted in debug builds) and is forwarded
    /// as raw JSON values merged with [`SpanMeta`]. Depending on the rollout
    /// flag, Relay may additionally emit an `Accepted` outcome for the span.
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        // Decide whether this Relay emits the `Accepted` outcome itself or
        // leaves it to a downstream consumer. The decision is recorded in the
        // message meta so the consumer knows not to double-emit.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Verify that this is a V2 span:
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        if relay_emits_accepted_outcome {
            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
            // or the segments consumer, depending on which will produce outcomes later.
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
1149}
1150
impl Service for StoreService {
    type Interface = Store;

    /// Main service loop: receives [`Store`] messages and dispatches them on the
    /// async pool until the inbound channel closes.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        // Loop ends when all senders are dropped and the channel is drained.
        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously, in the future we might explore how to make
            // the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}
1171
/// This signifies how the attachment payload is being transferred.
///
/// The serde rename on each variant is part of the wire format consumed by Sentry.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined here directly, and thus into the [`ChunkedAttachment`].
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored into the objectstore, with the given Id.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1190
/// Common attributes for both standalone attachments and processing-relevant attachments.
///
/// Embedded either in an [`AttachmentKafkaMessage`] or in the `attachments` list of an
/// [`EventKafkaMessage`].
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
    /// native crash reports still need to be retained. These attachments are marked with the
    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
    /// not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The attachment payload, chunked, inlined, or already stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1225
1226/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1227///
1228/// Cannot serialize bytes.
1229///
1230/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1231fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1232where
1233    S: serde::Serializer,
1234    T: serde::Serialize,
1235{
1236    serde_json::to_value(t)
1237        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1238        .serialize(serializer)
1239}
1240
/// Container payload for event messages.
///
/// Also used for user feedback (user report v2), which is produced as an event
/// with an empty attachment list.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}
1257
/// Container payload for chunks of attachments.
///
/// Chunks are produced in order; the consumer reassembles them by `(project_id,
/// event_id, id, chunk_index)`.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of chunk. Starts at 0 and ends at `AttachmentKafkaMessage.num_chunks - 1`.
    chunk_index: usize,
}
1274
1275/// A "standalone" attachment.
1276///
1277/// Still belongs to an event but can be sent independently (like UserReport) and is not
1278/// considered in processing.
1279#[derive(Debug, Serialize)]
1280struct AttachmentKafkaMessage {
1281    /// The event id.
1282    event_id: EventId,
1283    /// The project id for the current event.
1284    project_id: ProjectId,
1285    /// The attachment.
1286    attachment: ChunkedAttachment,
1287}
1288
/// A replay recording produced as a single, non-chunked message.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    /// The replay id.
    replay_id: EventId,
    /// The DSN key id, if available.
    key_id: Option<u64>,
    /// The organization the replay belongs to.
    org_id: OrganizationId,
    /// The project the replay belongs to.
    project_id: ProjectId,
    /// Time at which the replay was received by Relay, in seconds since epoch.
    received: u64,
    /// Number of days to retain.
    retention_days: u16,
    // Serialized as raw msgpack bytes rather than a sequence of integers.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    /// The raw replay event payload, if present.
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    /// The raw replay video payload, if present.
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// Whether publishing to Snuba is disabled for this message.
    relay_snuba_publish_disabled: bool,
}
1305
/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    /// Time at which the report was received by Relay, in seconds since epoch.
    start_time: u64,
    /// Raw user report payload.
    payload: Bytes,

    // Used for KafkaMessage::key
    #[serde(skip)]
    event_id: EventId,
}
1320
/// A single metric bucket ready for consumption in Kafka.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    /// The organization the metric belongs to.
    org_id: OrganizationId,
    /// The project the metric belongs to.
    project_id: ProjectId,
    /// The metric name.
    name: &'a MetricName,
    /// The metric value, flattened into `type` and `value` fields.
    #[serde(flatten)]
    value: MetricValue<'a>,
    /// Timestamp of the bucket.
    timestamp: UnixTimestamp,
    /// The metric's tags.
    tags: &'a BTreeMap<String, String>,
    /// Number of days to retain.
    retention_days: u16,
    /// Time at which the bucket was received by Relay, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1334
/// The value of a metric bucket, tagged with its type.
///
/// Serializes into a one-letter `type` tag and a `value` field.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// A counter value (`c`).
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// A distribution, wrapped in an [`ArrayEncoding`] (`d`).
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// A set, wrapped in an [`ArrayEncoding`] (`s`).
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// A gauge value (`g`).
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1347
1348impl MetricValue<'_> {
1349    fn variant(&self) -> &'static str {
1350        match self {
1351            Self::Counter(_) => "counter",
1352            Self::Distribution(_) => "distribution",
1353            Self::Set(_) => "set",
1354            Self::Gauge(_) => "gauge",
1355        }
1356    }
1357
1358    fn encoding(&self) -> Option<&'static str> {
1359        match self {
1360            Self::Distribution(ae) => Some(ae.name()),
1361            Self::Set(ae) => Some(ae.name()),
1362            _ => None,
1363        }
1364    }
1365}
1366
/// A profile ready for consumption in Kafka.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    /// The organization the profile belongs to.
    organization_id: OrganizationId,
    /// The project the profile belongs to.
    project_id: ProjectId,
    /// The DSN key id, if available.
    key_id: Option<u64>,
    /// Time at which the profile was received by Relay, in seconds since epoch.
    received: u64,
    /// Number of days to retain.
    retention_days: u16,
    // Not part of the serialized payload; sent as Kafka message headers instead.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile payload.
    payload: Bytes,
}
1378
/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// Externally produced clock tick; never produced by Relay itself.
    ClockPulse,
    /// A monitor check-in produced by Relay.
    CheckIn,
}
1391
/// A monitor check-in ready for consumption in Kafka.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    // Not serialized; only used to compute the Kafka partitioning key.
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}
1410
/// A span produced from raw, unparsed JSON.
///
/// The span's top-level JSON fields are passed through verbatim as [`RawValue`]s
/// and flattened next to the [`SpanMeta`] fields.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1418
/// A span produced from a parsed [`SpanV2`] structure.
///
/// Like [`SpanKafkaMessageRaw`], but the span is serialized from the annotated
/// protocol type instead of raw JSON values.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1426
/// Metadata flattened into every span Kafka message.
#[derive(Debug, Serialize)]
struct SpanMeta {
    /// The organization the span belongs to.
    organization_id: OrganizationId,
    /// The project the span belongs to.
    project_id: ProjectId,
    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    /// The associated event id, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
    /// Indicates whether Relay already emitted an accepted outcome or if EAP still needs to emit it.
    accepted_outcome_emitted: bool,
}
1445
/// A profile chunk ready for consumption in Kafka.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    /// The organization the chunk belongs to.
    organization_id: OrganizationId,
    /// The project the chunk belongs to.
    project_id: ProjectId,
    /// Time at which the chunk was received by Relay, in seconds since epoch.
    received: u64,
    /// Number of days to retain.
    retention_days: u16,
    // Not part of the serialized payload; sent as Kafka message headers instead.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile chunk payload.
    payload: Bytes,
}
1456
/// An enum over all possible ingest messages.
///
/// The serde `type` tag is part of the wire format for msgpack-serialized
/// variants. Fields marked `#[serde(skip)]` never enter the payload; headers
/// are sent as Kafka message headers and routing keys only affect partitioning.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    /// A metric bucket, serialized as JSON.
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    /// A [`TraceItem`], serialized as protobuf (never through serde).
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    /// A span forwarded from raw JSON, serialized as JSON.
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    /// A span forwarded from a parsed [`SpanV2`], serialized as JSON.
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1504
1505impl KafkaMessage<'_> {
1506    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
1507    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1508        let item_type = item.item_type();
1509        KafkaMessage::Item {
1510            headers: BTreeMap::from([
1511                ("project_id".to_owned(), scoping.project_id.to_string()),
1512                ("item_type".to_owned(), (item_type as i32).to_string()),
1513            ]),
1514            message: item,
1515            item_type,
1516        }
1517    }
1518}
1519
impl Message for KafkaMessage<'_> {
    /// Returns a low-cardinality name of the message variant for logging and metrics.
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the key determining the Kafka partition for this message.
    ///
    /// Messages without a key (or with a nil UUID) are partitioned randomly.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),

            // Random partitioning
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // A nil UUID yields no key, falling back to random partitioning.
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    /// Returns the Kafka message headers, for variants that carry any.
    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    /// Serializes the message into its wire format.
    ///
    /// Metrics and spans use JSON, trace items use protobuf, and all remaining
    /// variants use named msgpack.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
1617
1618fn serialize_as_json<T: serde::Serialize>(
1619    value: &T,
1620) -> Result<SerializationOutput<'_>, ClientError> {
1621    match serde_json::to_vec(value) {
1622        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1623        Err(err) => Err(ClientError::InvalidJson(err)),
1624    }
1625}
1626
1627/// Determines if the given item is considered slow.
1628///
1629/// Slow items must be routed to the `Attachments` topic.
1630fn is_slow_item(item: &Item) -> bool {
1631    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1632}
1633
/// Converts a boolean into its lowercase string representation.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1637
1638/// Returns a safe timestamp for Kafka.
1639///
1640/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1641fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1642    let ts = timestamp.timestamp();
1643    if ts >= 0 {
1644        return ts as u64;
1645    }
1646
1647    // We assume this call can't return < 0.
1648    Utc::now().timestamp() as u64
1649}
1650
#[cfg(test)]
mod tests {

    use super::*;

    /// The store producer must reject the outcomes topics; outcomes are produced
    /// elsewhere, never through this service.
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}