relay_server/services/store.rs

1//! This module contains the service that forwards events and attachments to the Sentry store.
2//! The service uses Kafka topics to forward data to Sentry.
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::{Deserialize, Serialize};
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
37use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
42use crate::services::processor::Processed;
43use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
44use crate::utils::{self, FormDataIter};
45
46/// Fallback name used for attachment items without a `filename` header.
47const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
48
49#[derive(Debug, thiserror::Error)]
50pub enum StoreError {
51    #[error("failed to send the message to kafka: {0}")]
52    SendFailed(#[from] ClientError),
53    #[error("failed to encode data: {0}")]
54    EncodingFailed(std::io::Error),
55    #[error("failed to store event because event id was missing")]
56    NoEventId,
57}
58
59impl OutcomeError for StoreError {
60    type Error = Self;
61
62    fn consume(self) -> (Option<Outcome>, Self::Error) {
63        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
64    }
65}
66
67struct Producer {
68    client: KafkaClient,
69}
70
71impl Producer {
72    pub fn create(config: &Config) -> anyhow::Result<Self> {
73        let mut client_builder = KafkaClient::builder();
74
75        for topic in KafkaTopic::iter().filter(|t| {
76            // Outcomes should not be sent from the store forwarder.
77            // See `KafkaOutcomesProducer`.
78            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
79        }) {
80            let kafka_configs = config.kafka_configs(*topic)?;
81            client_builder = client_builder
82                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
83                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
84        }
85
86        Ok(Self {
87            client: client_builder.build(),
88        })
89    }
90}
91
92/// Publishes an [`Envelope`] to the Sentry core application through Kafka topics.
93#[derive(Debug)]
94pub struct StoreEnvelope {
95    pub envelope: TypedEnvelope<Processed>,
96}
97
98/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
99#[derive(Clone, Debug)]
100pub struct StoreMetrics {
101    pub buckets: Vec<Bucket>,
102    pub scoping: Scoping,
103    pub retention: u16,
104}
105
106/// Publishes a trace item to the Sentry core application through Kafka.
107#[derive(Debug)]
108pub struct StoreTraceItem {
109    /// The final trace item which will be produced to Kafka.
110    pub trace_item: TraceItem,
111    /// Outcomes to be emitted when successfully producing the item to Kafka.
112    ///
113    /// Note: this is only a temporary measure; long term, these outcomes will be part of the trace
114    /// item and emitted by Snuba to guarantee delivery to storage.
115    pub quantities: Quantities,
116}
117
118impl Counted for StoreTraceItem {
119    fn quantities(&self) -> Quantities {
120        self.quantities.clone()
121    }
122}
123
124/// Publishes a span item to the Sentry core application through Kafka.
125#[derive(Debug)]
126pub struct StoreSpanV2 {
127    /// Routing key to assign a Kafka partition.
128    pub routing_key: Option<Uuid>,
129    /// Default retention of the span.
130    pub retention_days: u16,
131    /// Downsampled retention of the span.
132    pub downsampled_retention_days: u16,
133    /// The final Sentry compatible span item.
134    pub item: SpanV2,
135}
136
137impl Counted for StoreSpanV2 {
138    fn quantities(&self) -> Quantities {
139        self.item.quantities()
140    }
141}
142
143/// The asynchronous thread pool used for scheduling storage tasks in the envelope store.
144pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
145
146/// Service interface for the [`StoreEnvelope`] message.
147#[derive(Debug)]
148pub enum Store {
149    /// An envelope containing a mixture of items.
150    ///
151    /// Note: Some item types are either not supported at all or cannot be submitted through an
152    /// envelope; for example, logs must be submitted via [`Self::TraceItem`] instead.
153    ///
154    /// Long term this variant is going to be replaced with fully typed variants of items which can
155    /// be stored instead.
156    Envelope(StoreEnvelope),
157    /// Aggregated generic metrics.
158    Metrics(StoreMetrics),
159    /// A singular [`TraceItem`].
160    TraceItem(Managed<StoreTraceItem>),
161    /// A singular Span.
162    Span(Managed<Box<StoreSpanV2>>),
163}
164
165impl Store {
166    /// Returns the name of the message variant.
167    fn variant(&self) -> &'static str {
168        match self {
169            Store::Envelope(_) => "envelope",
170            Store::Metrics(_) => "metrics",
171            Store::TraceItem(_) => "log",
172            Store::Span(_) => "span",
173        }
174    }
175}
176
177impl Interface for Store {}
178
179impl FromMessage<StoreEnvelope> for Store {
180    type Response = NoResponse;
181
182    fn from_message(message: StoreEnvelope, _: ()) -> Self {
183        Self::Envelope(message)
184    }
185}
186
187impl FromMessage<StoreMetrics> for Store {
188    type Response = NoResponse;
189
190    fn from_message(message: StoreMetrics, _: ()) -> Self {
191        Self::Metrics(message)
192    }
193}
194
195impl FromMessage<Managed<StoreTraceItem>> for Store {
196    type Response = NoResponse;
197
198    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
199        Self::TraceItem(message)
200    }
201}
202
203impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
204    type Response = NoResponse;
205
206    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
207        Self::Span(message)
208    }
209}
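
// A usage sketch (hypothetical call site): the `FromMessage` impls above allow all of these
// message types to be sent to the same service address, e.g.
// `store.send(StoreMetrics { buckets, scoping, retention })` where `store: Addr<Store>`.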
210
211/// Service implementing the [`Store`] interface.
212pub struct StoreService {
213    pool: StoreServicePool,
214    config: Arc<Config>,
215    global_config: GlobalConfigHandle,
216    outcome_aggregator: Addr<TrackOutcome>,
217    metric_outcomes: MetricOutcomes,
218    producer: Producer,
219}
220
221impl StoreService {
222    pub fn create(
223        pool: StoreServicePool,
224        config: Arc<Config>,
225        global_config: GlobalConfigHandle,
226        outcome_aggregator: Addr<TrackOutcome>,
227        metric_outcomes: MetricOutcomes,
228    ) -> anyhow::Result<Self> {
229        let producer = Producer::create(&config)?;
230        Ok(Self {
231            pool,
232            config,
233            global_config,
234            outcome_aggregator,
235            metric_outcomes,
236            producer,
237        })
238    }
239
240    fn handle_message(&self, message: Store) {
241        let ty = message.variant();
242        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
243            match message {
244                Store::Envelope(message) => self.handle_store_envelope(message),
245                Store::Metrics(message) => self.handle_store_metrics(message),
246                Store::TraceItem(message) => self.handle_store_trace_item(message),
247                Store::Span(message) => self.handle_store_span(message),
248            }
249        })
250    }
251
252    fn handle_store_envelope(&self, message: StoreEnvelope) {
253        let StoreEnvelope {
254            envelope: mut managed,
255        } = message;
256
257        let scoping = managed.scoping();
258        let envelope = managed.take_envelope();
259
260        match self.store_envelope(envelope, managed.received_at(), scoping) {
261            Ok(()) => managed.accept(),
262            Err(error) => {
263                managed.reject(Outcome::Invalid(DiscardReason::Internal));
264                relay_log::error!(
265                    error = &error as &dyn Error,
266                    tags.project_key = %scoping.project_key,
267                    "failed to store envelope"
268                );
269            }
270        }
271    }
272
273    fn store_envelope(
274        &self,
275        mut envelope: Box<Envelope>,
276        received_at: DateTime<Utc>,
277        scoping: Scoping,
278    ) -> Result<(), StoreError> {
279        let retention = envelope.retention();
280        let downsampled_retention = envelope.downsampled_retention();
281
282        let event_id = envelope.event_id();
283        let event_item = envelope.as_mut().take_item_by(|item| {
284            matches!(
285                item.ty(),
286                ItemType::Event | ItemType::Transaction | ItemType::Security
287            )
288        });
289        let event_type = event_item.as_ref().map(|item| item.ty());
290
291        // Some error events like minidumps need all attachment chunks to be processed _before_
292        // the event payload on the consumer side. Transaction attachments do not require this ordering
293        // guarantee, so they do not have to go to the same topic as their event payload.
294        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
295            KafkaTopic::Transactions
296        } else if envelope.get_item_by(is_slow_item).is_some() {
297            KafkaTopic::Attachments
298        } else {
299            KafkaTopic::Events
300        };
301
302        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
303
304        let mut attachments = Vec::new();
305        let mut replay_event = None;
306        let mut replay_recording = None;
307
308        // Whether Relay should skip publishing the replay-event to Snuba.
309        let replay_relay_snuba_publish_disabled = utils::sample(
310            self.global_config
311                .current()
312                .options
313                .replay_relay_snuba_publish_disabled_sample_rate,
314        )
315        .is_keep();
316
317        for item in envelope.items() {
318            let content_type = item.content_type();
319            match item.ty() {
320                ItemType::Attachment => {
321                    if let Some(attachment) = self.produce_attachment(
322                        event_id.ok_or(StoreError::NoEventId)?,
323                        scoping.project_id,
324                        item,
325                        send_individual_attachments,
326                    )? {
327                        attachments.push(attachment);
328                    }
329                }
330                ItemType::UserReport => {
331                    debug_assert!(event_topic == KafkaTopic::Attachments);
332                    self.produce_user_report(
333                        event_id.ok_or(StoreError::NoEventId)?,
334                        scoping.project_id,
335                        received_at,
336                        item,
337                    )?;
338                }
339                ItemType::UserReportV2 => {
340                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
341                    self.produce_user_report_v2(
342                        event_id.ok_or(StoreError::NoEventId)?,
343                        scoping.project_id,
344                        received_at,
345                        item,
346                        remote_addr,
347                    )?;
348                }
349                ItemType::Profile => self.produce_profile(
350                    scoping.organization_id,
351                    scoping.project_id,
352                    scoping.key_id,
353                    received_at,
354                    retention,
355                    item,
356                )?,
357                ItemType::ReplayVideo => {
358                    self.produce_replay_video(
359                        event_id,
360                        scoping,
361                        item.payload(),
362                        received_at,
363                        retention,
364                        replay_relay_snuba_publish_disabled,
365                    )?;
366                }
367                ItemType::ReplayRecording => {
368                    replay_recording = Some(item);
369                }
370                ItemType::ReplayEvent => {
371                    replay_event = Some(item);
372                    self.produce_replay_event(
373                        event_id.ok_or(StoreError::NoEventId)?,
374                        scoping.project_id,
375                        received_at,
376                        retention,
377                        &item.payload(),
378                        replay_relay_snuba_publish_disabled,
379                    )?;
380                }
381                ItemType::CheckIn => {
382                    let client = envelope.meta().client();
383                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
384                }
385                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
386                    scoping,
387                    received_at,
388                    event_id,
389                    retention,
390                    downsampled_retention,
391                    item,
392                )?,
393                ty @ ItemType::Log => {
394                    debug_assert!(
395                        false,
396                        "received {ty} through an envelope, \
397                        this item must be submitted via a specific store message instead"
398                    );
399                    relay_log::error!(
400                        tags.project_key = %scoping.project_key,
401                        "StoreService received unsupported item type '{ty}' in envelope"
402                    );
403                }
404                ItemType::ProfileChunk => self.produce_profile_chunk(
405                    scoping.organization_id,
406                    scoping.project_id,
407                    received_at,
408                    retention,
409                    item,
410                )?,
411                other => {
412                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
413                    let item_types = envelope
414                        .items()
415                        .map(|item| item.ty().as_str())
416                        .collect::<Vec<_>>();
417                    let attachment_types = envelope
418                        .items()
419                        .map(|item| {
420                            item.attachment_type()
421                                .map(|t| t.to_string())
422                                .unwrap_or_default()
423                        })
424                        .collect::<Vec<_>>();
425
426                    relay_log::with_scope(
427                        |scope| {
428                            scope.set_extra("item_types", item_types.into());
429                            scope.set_extra("attachment_types", attachment_types.into());
430                            if other == &ItemType::FormData {
431                                let payload = item.payload();
432                                let form_data_keys = FormDataIter::new(&payload)
433                                    .map(|entry| entry.key())
434                                    .collect::<Vec<_>>();
435                                scope.set_extra("form_data_keys", form_data_keys.into());
436                            }
437                        },
438                        || {
439                            relay_log::error!(
440                                tags.project_key = %scoping.project_key,
441                                tags.event_type = event_type.unwrap_or("none"),
442                                "StoreService received unexpected item type: {other}"
443                            )
444                        },
445                    )
446                }
447            }
448        }
449
450        if let Some(recording) = replay_recording {
451            // If a recording item was seen, we produce it to Kafka together with the replay-event
452            // payload (if one was provided).
453            //
454            // The replay_video value is always specified as `None`. We do not allow separate
455            // item types for `ReplayVideo` events.
456            let replay_event = replay_event.map(|rv| rv.payload());
457            self.produce_replay_recording(
458                event_id,
459                scoping,
460                &recording.payload(),
461                replay_event.as_deref(),
462                None,
463                received_at,
464                retention,
465                replay_relay_snuba_publish_disabled,
466            )?;
467        }
468
469        if let Some(event_item) = event_item {
470            let event_id = event_id.ok_or(StoreError::NoEventId)?;
471            let project_id = scoping.project_id;
472            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
473
474            self.produce(
475                event_topic,
476                KafkaMessage::Event(EventKafkaMessage {
477                    payload: event_item.payload(),
478                    start_time: safe_timestamp(received_at),
479                    event_id,
480                    project_id,
481                    remote_addr,
482                    attachments,
483                }),
484            )?;
485        } else {
486            debug_assert!(attachments.is_empty());
487        }
488
489        Ok(())
490    }
491
492    fn handle_store_metrics(&self, message: StoreMetrics) {
493        let StoreMetrics {
494            buckets,
495            scoping,
496            retention,
497        } = message;
498
499        let batch_size = self.config.metrics_max_batch_size_bytes();
500        let mut error = None;
501
502        let global_config = self.global_config.current();
503        let mut encoder = BucketEncoder::new(&global_config);
504
505        let now = UnixTimestamp::now();
506        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
507
508        for mut bucket in buckets {
509            let namespace = encoder.prepare(&mut bucket);
510
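            // For illustration: two buckets in the same namespace received 5s and 15s ago
            // contribute `total = 20`, `count = 2`, `max = 15` to the delay statistics emitted
            // after the loop.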
511            if let Some(received_at) = bucket.metadata.received_at {
512                let delay = now.as_secs().saturating_sub(received_at.as_secs());
513                let (total, count, max) = delay_stats.get_mut(namespace);
514                *total += delay;
515                *count += 1;
516                *max = (*max).max(delay);
517            }
518
519            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
520            // each bucket separately, we only need to split buckets that exceed the size, but not
521            // batches.
522            for view in BucketsView::new(std::slice::from_ref(&bucket))
523                .by_size(batch_size)
524                .flatten()
525            {
526                let message = self.create_metric_message(
527                    scoping.organization_id,
528                    scoping.project_id,
529                    &mut encoder,
530                    namespace,
531                    &view,
532                    retention,
533                );
534
535                let result =
536                    message.and_then(|message| self.send_metric_message(namespace, message));
537
538                let outcome = match result {
539                    Ok(()) => Outcome::Accepted,
540                    Err(e) => {
541                        error.get_or_insert(e);
542                        Outcome::Invalid(DiscardReason::Internal)
543                    }
544                };
545
546                self.metric_outcomes.track(scoping, &[view], outcome);
547            }
548        }
549
550        if let Some(error) = error {
551            relay_log::error!(
552                error = &error as &dyn std::error::Error,
553                "failed to produce metric buckets: {error}"
554            );
555        }
556
557        for (namespace, (total, count, max)) in delay_stats {
558            if count == 0 {
559                continue;
560            }
561            metric!(
562                counter(RelayCounters::MetricDelaySum) += total,
563                namespace = namespace.as_str()
564            );
565            metric!(
566                counter(RelayCounters::MetricDelayCount) += count,
567                namespace = namespace.as_str()
568            );
569            metric!(
570                gauge(RelayGauges::MetricDelayMax) = max,
571                namespace = namespace.as_str()
572            );
573        }
574    }
575
576    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
577        let scoping = message.scoping();
578        let received_at = message.received_at();
579
580        let quantities = message.try_accept(|item| {
581            let item_type = item.trace_item.item_type();
582            let message = KafkaMessage::Item {
583                headers: BTreeMap::from([
584                    ("project_id".to_owned(), scoping.project_id.to_string()),
585                    ("item_type".to_owned(), (item_type as i32).to_string()),
586                ]),
587                message: item.trace_item,
588                item_type,
589            };
590
591            self.produce(KafkaTopic::Items, message)
592                .map(|()| item.quantities)
593        });
594
595        // Emit accepted outcomes once the items have been successfully produced to Kafka.
596        //
597        // This is only a temporary measure; long term, these outcomes will be part of the trace
598        // item and emitted by Snuba to guarantee delivery to storage.
599        if let Ok(quantities) = quantities {
600            for (category, quantity) in quantities {
601                self.outcome_aggregator.send(TrackOutcome {
602                    category,
603                    event_id: None,
604                    outcome: Outcome::Accepted,
605                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
606                    remote_addr: None,
607                    scoping,
608                    timestamp: received_at,
609                });
610            }
611        }
612    }
613
614    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
615        let scoping = message.scoping();
616        let received_at = message.received_at();
617
618        let meta = SpanMeta {
619            organization_id: scoping.organization_id,
620            project_id: scoping.project_id,
621            key_id: scoping.key_id,
622            event_id: None,
623            retention_days: message.retention_days,
624            downsampled_retention_days: message.downsampled_retention_days,
625            received: datetime_to_timestamp(received_at),
626        };
627
628        let result = message.try_accept(|span| {
629            let item = Annotated::new(span.item);
630            let message = KafkaMessage::SpanV2 {
631                routing_key: span.routing_key,
632                headers: BTreeMap::from([(
633                    "project_id".to_owned(),
634                    scoping.project_id.to_string(),
635                )]),
636                message: SpanKafkaMessage {
637                    meta,
638                    span: SerializableAnnotated(&item),
639                },
640            };
641
642            self.produce(KafkaTopic::Spans, message)
643        });
644
645        match result {
646            Ok(()) => {
647                relay_statsd::metric!(
648                    counter(RelayCounters::SpanV2Produced) += 1,
649                    via = "processing"
650                );
651
652                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
653                // or the segments consumer, depending on which will produce outcomes later.
654                self.outcome_aggregator.send(TrackOutcome {
655                    category: DataCategory::SpanIndexed,
656                    event_id: None,
657                    outcome: Outcome::Accepted,
658                    quantity: 1,
659                    remote_addr: None,
660                    scoping,
661                    timestamp: received_at,
662                });
663            }
664            Err(error) => {
665                relay_log::error!(
666                    error = &error as &dyn Error,
667                    tags.project_key = %scoping.project_key,
668                    "failed to store span"
669                );
670            }
671        }
672    }
673
674    fn create_metric_message<'a>(
675        &self,
676        organization_id: OrganizationId,
677        project_id: ProjectId,
678        encoder: &'a mut BucketEncoder,
679        namespace: MetricNamespace,
680        view: &BucketView<'a>,
681        retention_days: u16,
682    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
683        let value = match view.value() {
684            BucketViewValue::Counter(c) => MetricValue::Counter(c),
685            BucketViewValue::Distribution(data) => MetricValue::Distribution(
686                encoder
687                    .encode_distribution(namespace, data)
688                    .map_err(StoreError::EncodingFailed)?,
689            ),
690            BucketViewValue::Set(data) => MetricValue::Set(
691                encoder
692                    .encode_set(namespace, data)
693                    .map_err(StoreError::EncodingFailed)?,
694            ),
695            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
696        };
697
698        Ok(MetricKafkaMessage {
699            org_id: organization_id,
700            project_id,
701            name: view.name(),
702            value,
703            timestamp: view.timestamp(),
704            tags: view.tags(),
705            retention_days,
706            received_at: view.metadata().received_at,
707        })
708    }
709
710    fn produce(
711        &self,
712        topic: KafkaTopic,
713        // Takes message by value to ensure it is not being produced twice.
714        message: KafkaMessage,
715    ) -> Result<(), StoreError> {
716        relay_log::trace!("Sending kafka message of type {}", message.variant());
717
718        let topic_name = self.producer.client.send_message(topic, &message)?;
719
720        match &message {
721            KafkaMessage::Metric {
722                message: metric, ..
723            } => {
724                metric!(
725                    counter(RelayCounters::ProcessingMessageProduced) += 1,
726                    event_type = message.variant(),
727                    topic = topic_name,
728                    metric_type = metric.value.variant(),
729                    metric_encoding = metric.value.encoding().unwrap_or(""),
730                );
731            }
732            KafkaMessage::ReplayRecordingNotChunked(replay) => {
733                let has_video = replay.replay_video.is_some();
734
735                metric!(
736                    counter(RelayCounters::ProcessingMessageProduced) += 1,
737                    event_type = message.variant(),
738                    topic = topic_name,
739                    has_video = bool_to_str(has_video),
740                );
741            }
742            message => {
743                metric!(
744                    counter(RelayCounters::ProcessingMessageProduced) += 1,
745                    event_type = message.variant(),
746                    topic = topic_name,
747                );
748            }
749        }
750
751        Ok(())
752    }
753
754    /// Produces Kafka messages for the content and metadata of an attachment item.
755    ///
756    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
757    /// is produced directly as an individual `attachment` message, or returned from this function
758    /// to be later sent as part of an `event` message.
759    ///
760    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
761    /// unless the `send_individual_attachments` flag is set and the content is small enough
762    /// to fit inside a message.
763    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
764    /// of the `attachment` message instead.
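    ///
    /// As an illustration (the numbers are hypothetical; the real chunk size comes from
    /// `config.attachment_chunk_size()`): with a 1 MiB chunk size, a 2.5 MiB attachment is sent
    /// as three `attachment_chunk` messages (1 MiB, 1 MiB, 0.5 MiB) followed by metadata with
    /// `chunks: 3`, while an empty attachment produces no chunks and `chunks: 0`.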
765    fn produce_attachment(
766        &self,
767        event_id: EventId,
768        project_id: ProjectId,
769        item: &Item,
770        send_individual_attachments: bool,
771    ) -> Result<Option<ChunkedAttachment>, StoreError> {
772        let id = Uuid::new_v4().to_string();
773
774        let payload = item.payload();
775        let size = item.len();
776        let max_chunk_size = self.config.attachment_chunk_size();
777
778        // When sending individual attachments and the payload fits into a single chunk, we send
779        // the `data` inline in the `attachment` message.
780        // This avoids a needless roundtrip through the attachments cache on the Sentry side.
781        let payload = if size == 0 {
782            AttachmentPayload::Chunked(0)
783        } else if send_individual_attachments && size < max_chunk_size {
784            AttachmentPayload::Inline(payload)
785        } else {
786            let mut chunk_index = 0;
787            let mut offset = 0;
788            // Split the payload into chunks of at most `max_chunk_size` bytes. Empty attachments
789            // never reach this loop; they are handled above with `chunks` set to `0`.
790            while offset < size {
791                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
792                let chunk_message = AttachmentChunkKafkaMessage {
793                    payload: payload.slice(offset..offset + chunk_size),
794                    event_id,
795                    project_id,
796                    id: id.clone(),
797                    chunk_index,
798                };
799
800                self.produce(
801                    KafkaTopic::Attachments,
802                    KafkaMessage::AttachmentChunk(chunk_message),
803                )?;
804                offset += chunk_size;
805                chunk_index += 1;
806            }
807
808            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
809            // is one larger than the last chunk index, so it is equal to the number of chunks.
810            AttachmentPayload::Chunked(chunk_index)
811        };
812
813        let attachment = ChunkedAttachment {
814            id,
815            name: match item.filename() {
816                Some(name) => name.to_owned(),
817                None => UNNAMED_ATTACHMENT.to_owned(),
818            },
819            rate_limited: item.rate_limited(),
820            content_type: item
821                .content_type()
822                .map(|content_type| content_type.as_str().to_owned()),
823            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
824            size,
825            payload,
826        };
827
828        if send_individual_attachments {
829            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
830                event_id,
831                project_id,
832                attachment,
833            });
834            self.produce(KafkaTopic::Attachments, message)?;
835            Ok(None)
836        } else {
837            Ok(Some(attachment))
838        }
839    }
840
841    fn produce_user_report(
842        &self,
843        event_id: EventId,
844        project_id: ProjectId,
845        received_at: DateTime<Utc>,
846        item: &Item,
847    ) -> Result<(), StoreError> {
848        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
849            project_id,
850            event_id,
851            start_time: safe_timestamp(received_at),
852            payload: item.payload(),
853        });
854
855        self.produce(KafkaTopic::Attachments, message)
856    }
857
858    fn produce_user_report_v2(
859        &self,
860        event_id: EventId,
861        project_id: ProjectId,
862        received_at: DateTime<Utc>,
863        item: &Item,
864        remote_addr: Option<String>,
865    ) -> Result<(), StoreError> {
866        let message = KafkaMessage::Event(EventKafkaMessage {
867            project_id,
868            event_id,
869            payload: item.payload(),
870            start_time: safe_timestamp(received_at),
871            remote_addr,
872            attachments: vec![],
873        });
874        self.produce(KafkaTopic::Feedback, message)
875    }
876
877    fn send_metric_message(
878        &self,
879        namespace: MetricNamespace,
880        message: MetricKafkaMessage,
881    ) -> Result<(), StoreError> {
882        let topic = match namespace {
883            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
884            MetricNamespace::Unsupported => {
885                relay_log::with_scope(
886                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
887                    || relay_log::error!("store service dropping unknown metric usecase"),
888                );
889                return Ok(());
890            }
891            _ => KafkaTopic::MetricsGeneric,
892        };
893        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
894
895        self.produce(topic, KafkaMessage::Metric { headers, message })?;
896        Ok(())
897    }
898
899    fn produce_profile(
900        &self,
901        organization_id: OrganizationId,
902        project_id: ProjectId,
903        key_id: Option<u64>,
904        received_at: DateTime<Utc>,
905        retention_days: u16,
906        item: &Item,
907    ) -> Result<(), StoreError> {
908        let message = ProfileKafkaMessage {
909            organization_id,
910            project_id,
911            key_id,
912            received: safe_timestamp(received_at),
913            retention_days,
914            headers: BTreeMap::from([(
915                "sampled".to_owned(),
916                if item.sampled() { "true" } else { "false" }.to_owned(),
917            )]),
918            payload: item.payload(),
919        };
920        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
921        Ok(())
922    }
923
924    fn produce_replay_event(
925        &self,
926        replay_id: EventId,
927        project_id: ProjectId,
928        received_at: DateTime<Utc>,
929        retention_days: u16,
930        payload: &[u8],
931        relay_snuba_publish_disabled: bool,
932    ) -> Result<(), StoreError> {
933        if relay_snuba_publish_disabled {
934            return Ok(());
935        }
936
937        let message = ReplayEventKafkaMessage {
938            replay_id,
939            project_id,
940            retention_days,
941            start_time: safe_timestamp(received_at),
942            payload,
943        };
944        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
945        Ok(())
946    }
947
948    #[allow(clippy::too_many_arguments)]
949    fn produce_replay_recording(
950        &self,
951        event_id: Option<EventId>,
952        scoping: Scoping,
953        payload: &[u8],
954        replay_event: Option<&[u8]>,
955        replay_video: Option<&[u8]>,
956        received_at: DateTime<Utc>,
957        retention: u16,
958        relay_snuba_publish_disabled: bool,
959    ) -> Result<(), StoreError> {
960        // Maximum number of bytes accepted by the consumer.
961        let max_payload_size = self.config.max_replay_message_size();
962
963        // Size of the consumer message. We can be reasonably sure this won't overflow because
964        // of the request size validation provided by Nginx and Relay.
965        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
966        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
967        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
968        payload_size += payload.len();
969
970        // If the recording payload cannot fit into the message, do not produce it and return early.
971        if payload_size >= max_payload_size {
972            relay_log::debug!("replay_recording over maximum size.");
973            self.outcome_aggregator.send(TrackOutcome {
974                category: DataCategory::Replay,
975                event_id,
976                outcome: Outcome::Invalid(DiscardReason::TooLarge(
977                    DiscardItemType::ReplayRecording,
978                )),
979                quantity: 1,
980                remote_addr: None,
981                scoping,
982                timestamp: received_at,
983            });
984            return Ok(());
985        }
986
987        let message =
988            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
989                replay_id: event_id.ok_or(StoreError::NoEventId)?,
990                project_id: scoping.project_id,
991                key_id: scoping.key_id,
992                org_id: scoping.organization_id,
993                received: safe_timestamp(received_at),
994                retention_days: retention,
995                payload,
996                replay_event,
997                replay_video,
998                relay_snuba_publish_disabled,
999            });
1000
1001        self.produce(KafkaTopic::ReplayRecordings, message)?;
1002
1003        Ok(())
1004    }
1005
1006    fn produce_replay_video(
1007        &self,
1008        event_id: Option<EventId>,
1009        scoping: Scoping,
1010        payload: Bytes,
1011        received_at: DateTime<Utc>,
1012        retention: u16,
1013        relay_snuba_publish_disabled: bool,
1014    ) -> Result<(), StoreError> {
1015        #[derive(Deserialize)]
1016        struct VideoEvent<'a> {
1017            replay_event: &'a [u8],
1018            replay_recording: &'a [u8],
1019            replay_video: &'a [u8],
1020        }
1021
1022        let Ok(VideoEvent {
1023            replay_video,
1024            replay_event,
1025            replay_recording,
1026        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
1027        else {
1028            self.outcome_aggregator.send(TrackOutcome {
1029                category: DataCategory::Replay,
1030                event_id,
1031                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
1032                quantity: 1,
1033                remote_addr: None,
1034                scoping,
1035                timestamp: received_at,
1036            });
1037            return Ok(());
1038        };
1039
1040        self.produce_replay_event(
1041            event_id.ok_or(StoreError::NoEventId)?,
1042            scoping.project_id,
1043            received_at,
1044            retention,
1045            replay_event,
1046            relay_snuba_publish_disabled,
1047        )?;
1048
1049        self.produce_replay_recording(
1050            event_id,
1051            scoping,
1052            replay_recording,
1053            Some(replay_event),
1054            Some(replay_video),
1055            received_at,
1056            retention,
1057            relay_snuba_publish_disabled,
1058        )
1059    }
1060
1061    fn produce_check_in(
1062        &self,
1063        project_id: ProjectId,
1064        received_at: DateTime<Utc>,
1065        client: Option<&str>,
1066        retention_days: u16,
1067        item: &Item,
1068    ) -> Result<(), StoreError> {
1069        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1070            message_type: CheckInMessageType::CheckIn,
1071            project_id,
1072            retention_days,
1073            start_time: safe_timestamp(received_at),
1074            sdk: client.map(str::to_owned),
1075            payload: item.payload(),
1076            routing_key_hint: item.routing_hint(),
1077        });
1078
1079        self.produce(KafkaTopic::Monitors, message)?;
1080
1081        Ok(())
1082    }
1083
1084    fn produce_span(
1085        &self,
1086        scoping: Scoping,
1087        received_at: DateTime<Utc>,
1088        event_id: Option<EventId>,
1089        retention_days: u16,
1090        downsampled_retention_days: u16,
1091        item: &Item,
1092    ) -> Result<(), StoreError> {
1093        debug_assert_eq!(item.ty(), &ItemType::Span);
1094        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));
1095
1096        let Scoping {
1097            organization_id,
1098            project_id,
1099            project_key: _,
1100            key_id,
1101        } = scoping;
1102
1103        let payload = item.payload();
1104        let message = SpanKafkaMessageRaw {
1105            meta: SpanMeta {
1106                organization_id,
1107                project_id,
1108                key_id,
1109                event_id,
1110                retention_days,
1111                downsampled_retention_days,
1112                received: datetime_to_timestamp(received_at),
1113            },
1114            span: serde_json::from_slice(&payload)
1115                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
1116        };
1117
1118        // Verify that this is a V2 span:
1119        debug_assert!(message.span.contains_key("attributes"));
1120        relay_statsd::metric!(
1121            counter(RelayCounters::SpanV2Produced) += 1,
1122            via = "envelope"
1123        );
1124
1125        self.produce(
1126            KafkaTopic::Spans,
1127            KafkaMessage::SpanRaw {
1128                routing_key: item.routing_hint(),
1129                headers: BTreeMap::from([(
1130                    "project_id".to_owned(),
1131                    scoping.project_id.to_string(),
1132                )]),
1133                message,
1134            },
1135        )?;
1136
1137        // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
1138        // or the segments consumer, depending on which will produce outcomes later.
1139        self.outcome_aggregator.send(TrackOutcome {
1140            category: DataCategory::SpanIndexed,
1141            event_id: None,
1142            outcome: Outcome::Accepted,
1143            quantity: 1,
1144            remote_addr: None,
1145            scoping,
1146            timestamp: received_at,
1147        });
1148
1149        Ok(())
1150    }
1151
1152    fn produce_profile_chunk(
1153        &self,
1154        organization_id: OrganizationId,
1155        project_id: ProjectId,
1156        received_at: DateTime<Utc>,
1157        retention_days: u16,
1158        item: &Item,
1159    ) -> Result<(), StoreError> {
1160        let message = ProfileChunkKafkaMessage {
1161            organization_id,
1162            project_id,
1163            received: safe_timestamp(received_at),
1164            retention_days,
1165            payload: item.payload(),
1166        };
1167        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
1168        Ok(())
1169    }
1170}
1171
1172impl Service for StoreService {
1173    type Interface = Store;
1174
1175    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1176        let this = Arc::new(self);
1177
1178        relay_log::info!("store forwarder started");
1179
1180        while let Some(message) = rx.recv().await {
1181            let service = Arc::clone(&this);
1182            // For now, we run each task synchronously; in the future we might explore how to make
1183            // the store async.
1184            this.pool
1185                .spawn_async(async move { service.handle_message(message) }.boxed())
1186                .await;
1187        }
1188
1189        relay_log::info!("store forwarder stopped");
1190    }
1191}
1192
1193/// This signifies how the attachment payload is being transferred.
1194#[derive(Debug, Serialize)]
1195enum AttachmentPayload {
1196    /// The payload has been split into multiple chunks.
1197    ///
1198    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
1199    /// If the payload `size == 0`, the number of chunks will also be `0`.
1200    #[serde(rename = "chunks")]
1201    Chunked(usize),
1202
1203    /// The payload is inlined directly into this variant, and thus into the [`ChunkedAttachment`].
1204    #[serde(rename = "data")]
1205    Inline(Bytes),
1206
1207    /// The attachment has already been stored in the objectstore under the given ID.
1208    #[serde(rename = "stored_id")]
1209    #[allow(unused)] // TODO: actually storing it in objectstore first is still WIP
1210    Stored(String),
1211}
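
// A rough sketch of the flattened wire shape (illustrative; attachment messages are encoded as
// MessagePack in `KafkaMessage::serialize`): `Chunked(3)` appears as `"chunks": 3`,
// `Inline(..)` as `"data": <bytes>`, and `Stored(..)` as `"stored_id": "<id>"`, merged into the
// surrounding [`ChunkedAttachment`] fields via `#[serde(flatten)]`.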
1212
1213/// Common attributes for both standalone attachments and processing-relevant attachments.
1214#[derive(Debug, Serialize)]
1215struct ChunkedAttachment {
1216    /// The attachment ID within the event.
1217    ///
1218    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
1219    id: String,
1220
1221    /// File name of the attachment file.
1222    name: String,
1223
1224    /// Whether this attachment was rate limited and should be removed after processing.
1225    ///
1226    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
1227    /// native crash reports still need to be retained. These attachments are marked with the
1228    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
1229    /// not be persisted after processing.
1230    rate_limited: bool,
1231
1232    /// Content type of the attachment payload.
1233    #[serde(skip_serializing_if = "Option::is_none")]
1234    content_type: Option<String>,
1235
1236    /// The Sentry-internal attachment type used in the processing pipeline.
1237    #[serde(serialize_with = "serialize_attachment_type")]
1238    attachment_type: AttachmentType,
1239
1240    /// The size of the attachment in bytes.
1241    size: usize,
1242
1243    /// The attachment payload, chunked, inlined, or already stored.
1244    #[serde(flatten)]
1245    payload: AttachmentPayload,
1246}
1247
1248/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1249///
1250/// Cannot serialize bytes.
1251///
1252/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1253fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1254where
1255    S: serde::Serializer,
1256    T: serde::Serialize,
1257{
1258    serde_json::to_value(t)
1259        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1260        .serialize(serializer)
1261}
1262
1263/// Container payload for event messages.
1264#[derive(Debug, Serialize)]
1265struct EventKafkaMessage {
1266    /// Raw event payload.
1267    payload: Bytes,
1268    /// Time at which the event was received by Relay.
1269    start_time: u64,
1270    /// The event id.
1271    event_id: EventId,
1272    /// The project id for the current event.
1273    project_id: ProjectId,
1274    /// The client ip address.
1275    remote_addr: Option<String>,
1276    /// Attachments that are potentially relevant for processing.
1277    attachments: Vec<ChunkedAttachment>,
1278}
1279
1280#[derive(Debug, Serialize)]
1281struct ReplayEventKafkaMessage<'a> {
1282    /// Raw event payload.
1283    payload: &'a [u8],
1284    /// Time at which the event was received by Relay.
1285    start_time: u64,
1286    /// The event id.
1287    replay_id: EventId,
1288    /// The project id for the current event.
1289    project_id: ProjectId,
1290    // Number of days to retain.
1291    retention_days: u16,
1292}
1293
1294/// Container payload for chunks of attachments.
1295#[derive(Debug, Serialize)]
1296struct AttachmentChunkKafkaMessage {
1297    /// Chunk payload of the attachment.
1298    payload: Bytes,
1299    /// The event id.
1300    event_id: EventId,
1301    /// The project id for the current event.
1302    project_id: ProjectId,
1303    /// The attachment ID within the event.
1304    ///
1305    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
1306    id: String,
1307    /// Sequence number of the chunk. Starts at 0 and ends at the attachment's chunk count (`chunks`) minus 1.
1308    chunk_index: usize,
1309}
1310
1311/// A "standalone" attachment.
1312///
1313/// Still belongs to an event but can be sent independently (like UserReport) and is not
1314/// considered in processing.
1315#[derive(Debug, Serialize)]
1316struct AttachmentKafkaMessage {
1317    /// The event id.
1318    event_id: EventId,
1319    /// The project id for the current event.
1320    project_id: ProjectId,
1321    /// The attachment.
1322    attachment: ChunkedAttachment,
1323}
1324
1325#[derive(Debug, Serialize)]
1326struct ReplayRecordingNotChunkedKafkaMessage<'a> {
1327    replay_id: EventId,
1328    key_id: Option<u64>,
1329    org_id: OrganizationId,
1330    project_id: ProjectId,
1331    received: u64,
1332    retention_days: u16,
1333    #[serde(with = "serde_bytes")]
1334    payload: &'a [u8],
1335    #[serde(with = "serde_bytes")]
1336    replay_event: Option<&'a [u8]>,
1337    #[serde(with = "serde_bytes")]
1338    replay_video: Option<&'a [u8]>,
1339    relay_snuba_publish_disabled: bool,
1340}
1341
1342/// User report for an event wrapped up in a message ready for consumption in Kafka.
1343///
1344/// Is always independent of an event and can be sent as part of any envelope.
1345#[derive(Debug, Serialize)]
1346struct UserReportKafkaMessage {
1347    /// The project id for the current event.
1348    project_id: ProjectId,
1349    start_time: u64,
1350    payload: Bytes,
1351
1352    // Used for KafkaMessage::key
1353    #[serde(skip)]
1354    event_id: EventId,
1355}
1356
1357#[derive(Clone, Debug, Serialize)]
1358struct MetricKafkaMessage<'a> {
1359    org_id: OrganizationId,
1360    project_id: ProjectId,
1361    name: &'a MetricName,
1362    #[serde(flatten)]
1363    value: MetricValue<'a>,
1364    timestamp: UnixTimestamp,
1365    tags: &'a BTreeMap<String, String>,
1366    retention_days: u16,
1367    #[serde(skip_serializing_if = "Option::is_none")]
1368    received_at: Option<UnixTimestamp>,
1369}
1370
1371#[derive(Clone, Debug, Serialize)]
1372#[serde(tag = "type", content = "value")]
1373enum MetricValue<'a> {
1374    #[serde(rename = "c")]
1375    Counter(FiniteF64),
1376    #[serde(rename = "d")]
1377    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
1378    #[serde(rename = "s")]
1379    Set(ArrayEncoding<'a, SetView<'a>>),
1380    #[serde(rename = "g")]
1381    Gauge(GaugeValue),
1382}
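
// Illustrative shape produced by the adjacently tagged representation above: a counter value
// serializes as `"type": "c", "value": 1.0` and a set as `"type": "s", "value": [...]`, which is
// then flattened into [`MetricKafkaMessage`].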
1383
1384impl MetricValue<'_> {
1385    fn variant(&self) -> &'static str {
1386        match self {
1387            Self::Counter(_) => "counter",
1388            Self::Distribution(_) => "distribution",
1389            Self::Set(_) => "set",
1390            Self::Gauge(_) => "gauge",
1391        }
1392    }
1393
1394    fn encoding(&self) -> Option<&'static str> {
1395        match self {
1396            Self::Distribution(ae) => Some(ae.name()),
1397            Self::Set(ae) => Some(ae.name()),
1398            _ => None,
1399        }
1400    }
1401}
1402
1403#[derive(Clone, Debug, Serialize)]
1404struct ProfileKafkaMessage {
1405    organization_id: OrganizationId,
1406    project_id: ProjectId,
1407    key_id: Option<u64>,
1408    received: u64,
1409    retention_days: u16,
1410    #[serde(skip)]
1411    headers: BTreeMap<String, String>,
1412    payload: Bytes,
1413}
1414
1415/// Used to discriminate cron monitor ingestion messages.
1416///
1417/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
1418/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
1419/// intended to ensure the clock continues to run even when ingestion volume drops.
1420#[allow(dead_code)]
1421#[derive(Debug, Serialize)]
1422#[serde(rename_all = "snake_case")]
1423enum CheckInMessageType {
1424    ClockPulse,
1425    CheckIn,
1426}
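
// With `rename_all = "snake_case"`, these variants serialize as `"clock_pulse"` and `"check_in"`
// respectively, matching the two message discriminators described above.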
1427
1428#[derive(Debug, Serialize)]
1429struct CheckInKafkaMessage {
1430    #[serde(skip)]
1431    routing_key_hint: Option<Uuid>,
1432
1433    /// Used by the consumer to discriminate the message.
1434    message_type: CheckInMessageType,
1435    /// Raw event payload.
1436    payload: Bytes,
1437    /// Time at which the event was received by Relay.
1438    start_time: u64,
1439    /// The SDK client which produced the event.
1440    sdk: Option<String>,
1441    /// The project id for the current event.
1442    project_id: ProjectId,
1443    /// Number of days to retain.
1444    retention_days: u16,
1445}
1446
1447#[derive(Debug, Serialize)]
1448struct SpanKafkaMessageRaw<'a> {
1449    #[serde(flatten)]
1450    meta: SpanMeta,
1451    #[serde(flatten)]
1452    span: BTreeMap<&'a str, &'a RawValue>,
1453}
1454
1455#[derive(Debug, Serialize)]
1456struct SpanKafkaMessage<'a> {
1457    #[serde(flatten)]
1458    meta: SpanMeta,
1459    #[serde(flatten)]
1460    span: SerializableAnnotated<'a, SpanV2>,
1461}
1462
1463#[derive(Debug, Serialize)]
1464struct SpanMeta {
1465    organization_id: OrganizationId,
1466    project_id: ProjectId,
1467    // Required for the buffer to emit outcomes scoped to the DSN.
1468    #[serde(skip_serializing_if = "Option::is_none")]
1469    key_id: Option<u64>,
1470    #[serde(skip_serializing_if = "Option::is_none")]
1471    event_id: Option<EventId>,
1472    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
1473    received: f64,
1474    /// Number of days until these data should be deleted.
1475    retention_days: u16,
1476    /// Number of days until the downsampled version of this data should be deleted.
1477    downsampled_retention_days: u16,
1478}
1479
1480#[derive(Clone, Debug, Serialize)]
1481struct ProfileChunkKafkaMessage {
1482    organization_id: OrganizationId,
1483    project_id: ProjectId,
1484    received: u64,
1485    retention_days: u16,
1486    payload: Bytes,
1487}
1488
1489/// An enum over all possible ingest messages.
1490#[derive(Debug, Serialize)]
1491#[serde(tag = "type", rename_all = "snake_case")]
1492#[allow(clippy::large_enum_variant)]
1493enum KafkaMessage<'a> {
1494    Event(EventKafkaMessage),
1495    UserReport(UserReportKafkaMessage),
1496    Metric {
1497        #[serde(skip)]
1498        headers: BTreeMap<String, String>,
1499        #[serde(flatten)]
1500        message: MetricKafkaMessage<'a>,
1501    },
1502    CheckIn(CheckInKafkaMessage),
1503    Item {
1504        #[serde(skip)]
1505        headers: BTreeMap<String, String>,
1506        #[serde(skip)]
1507        item_type: TraceItemType,
1508        #[serde(skip)]
1509        message: TraceItem,
1510    },
1511    SpanRaw {
1512        #[serde(skip)]
1513        routing_key: Option<Uuid>,
1514        #[serde(skip)]
1515        headers: BTreeMap<String, String>,
1516        #[serde(flatten)]
1517        message: SpanKafkaMessageRaw<'a>,
1518    },
1519    SpanV2 {
1520        #[serde(skip)]
1521        routing_key: Option<Uuid>,
1522        #[serde(skip)]
1523        headers: BTreeMap<String, String>,
1524        #[serde(flatten)]
1525        message: SpanKafkaMessage<'a>,
1526    },
1527
1528    Attachment(AttachmentKafkaMessage),
1529    AttachmentChunk(AttachmentChunkKafkaMessage),
1530
1531    Profile(ProfileKafkaMessage),
1532    ProfileChunk(ProfileChunkKafkaMessage),
1533
1534    ReplayEvent(ReplayEventKafkaMessage<'a>),
1535    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
1536}
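
// A hedged note on the wire format: the `tag = "type"` attribute above only takes effect for the
// variants that `serialize` below encodes as MessagePack from `&self` (e.g. events gain a
// `"type": "event"` field). The JSON variants (metrics, spans, replay events) and the protobuf
// `Item` variant serialize only their inner `message` and therefore carry no `"type"` field.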
1537
1538impl Message for KafkaMessage<'_> {
1539    fn variant(&self) -> &'static str {
1540        match self {
1541            KafkaMessage::Event(_) => "event",
1542            KafkaMessage::UserReport(_) => "user_report",
1543            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1544                MetricNamespace::Sessions => "metric_sessions",
1545                MetricNamespace::Transactions => "metric_transactions",
1546                MetricNamespace::Spans => "metric_spans",
1547                MetricNamespace::Custom => "metric_custom",
1548                MetricNamespace::Unsupported => "metric_unsupported",
1549            },
1550            KafkaMessage::CheckIn(_) => "check_in",
1551            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
1552            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1553
1554            KafkaMessage::Attachment(_) => "attachment",
1555            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1556
1557            KafkaMessage::Profile(_) => "profile",
1558            KafkaMessage::ProfileChunk(_) => "profile_chunk",
1559
1560            KafkaMessage::ReplayEvent(_) => "replay_event",
1561            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1562        }
1563    }
1564
1565    /// Returns the partitioning key for this Kafka message, which determines the target partition.
1566    fn key(&self) -> Option<relay_kafka::Key> {
1567        match self {
1568            Self::Event(message) => Some(message.event_id.0),
1569            Self::UserReport(message) => Some(message.event_id.0),
1570            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,
1571
1572            // Monitor check-ins use the hinted UUID passed through from the Envelope.
1573            //
1574            // XXX(epurkhiser): In the future it would be better if all `KafkaMessage`s would
1575            // receive the routing_key_hint from their envelopes.
1576            Self::CheckIn(message) => message.routing_key_hint,
1577
1578            Self::Attachment(message) => Some(message.event_id.0),
1579            Self::AttachmentChunk(message) => Some(message.event_id.0),
1580            Self::ReplayEvent(message) => Some(message.replay_id.0),
1581
1582            // Random partitioning
1583            Self::Metric { .. }
1584            | Self::Item { .. }
1585            | Self::Profile(_)
1586            | Self::ProfileChunk(_)
1587            | Self::ReplayRecordingNotChunked(_) => None,
1588        }
1589        .filter(|uuid| !uuid.is_nil())
1590        .map(|uuid| uuid.as_u128())
1591    }
1592
1593    fn headers(&self) -> Option<&BTreeMap<String, String>> {
1594        match &self {
1595            KafkaMessage::Metric { headers, .. }
1596            | KafkaMessage::SpanRaw { headers, .. }
1597            | KafkaMessage::SpanV2 { headers, .. }
1598            | KafkaMessage::Item { headers, .. }
1599            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),
1600
1601            KafkaMessage::Event(_)
1602            | KafkaMessage::UserReport(_)
1603            | KafkaMessage::CheckIn(_)
1604            | KafkaMessage::Attachment(_)
1605            | KafkaMessage::AttachmentChunk(_)
1606            | KafkaMessage::ProfileChunk(_)
1607            | KafkaMessage::ReplayEvent(_)
1608            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
1609        }
1610    }
1611
1612    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1613        match self {
1614            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1615            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
1616            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
1617            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1618            KafkaMessage::Item { message, .. } => {
1619                let mut payload = Vec::new();
1620                match message.encode(&mut payload) {
1621                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1622                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
1623                }
1624            }
1625            KafkaMessage::Event(_)
1626            | KafkaMessage::UserReport(_)
1627            | KafkaMessage::CheckIn(_)
1628            | KafkaMessage::Attachment(_)
1629            | KafkaMessage::AttachmentChunk(_)
1630            | KafkaMessage::Profile(_)
1631            | KafkaMessage::ProfileChunk(_)
1632            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1633                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1634                Err(err) => Err(ClientError::InvalidMsgPack(err)),
1635            },
1636        }
1637    }
1638}
1639
1640fn serialize_as_json<T: serde::Serialize>(
1641    value: &T,
1642) -> Result<SerializationOutput<'_>, ClientError> {
1643    match serde_json::to_vec(value) {
1644        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1645        Err(err) => Err(ClientError::InvalidJson(err)),
1646    }
1647}
1648
1649/// Determines if the given item is considered slow.
1650///
1651/// Slow items must be routed to the `Attachments` topic.
1652fn is_slow_item(item: &Item) -> bool {
1653    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1654}
1655
1656fn bool_to_str(value: bool) -> &'static str {
1657    if value { "true" } else { "false" }
1658}
1659
1660/// Returns a safe timestamp for Kafka.
1661///
1662/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1663fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1664    let ts = timestamp.timestamp();
1665    if ts >= 0 {
1666        return ts as u64;
1667    }
1668
1669    // We assume this call can't return < 0.
1670    Utc::now().timestamp() as u64
1671}
1672
1673#[cfg(test)]
1674mod tests {
1675
1676    use super::*;
1677
1678    #[test]
1679    fn disallow_outcomes() {
1680        let config = Config::default();
1681        let producer = Producer::create(&config).unwrap();
1682
1683        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
1684            let res = producer
1685                .client
1686                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");
1687
1688            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
1689        }
1690    }
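
    // A minimal sanity check of the free helper functions in this module, based only on behavior
    // visible in this file (assumes a chrono version providing `DateTime::from_timestamp`).
    #[test]
    fn test_helper_functions() {
        // Post-epoch timestamps are passed through unchanged.
        let ts = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap();
        assert_eq!(safe_timestamp(ts), 1_700_000_000);

        // Pre-epoch timestamps fall back to the current time, which is always positive.
        let pre_epoch = DateTime::<Utc>::from_timestamp(-1, 0).unwrap();
        assert!(safe_timestamp(pre_epoch) > 0);

        assert_eq!(bool_to_str(true), "true");
        assert_eq!(bool_to_str(false), "false");
    }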
1691}