relay_server/services/store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            // Outcomes should not be sent from the store forwarder.
            // See `KafkaOutcomesProducer`.
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

/// Publishes a log item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
    /// Outcomes to be emitted when the item is successfully produced to Kafka.
    ///
    /// Note: this is only a temporary measure; long term these outcomes will be part of the trace
    /// item and emitted by Snuba to guarantee delivery to storage.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

/// Publishes a span item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key to assign a Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Default retention of the span.
    pub retention_days: u16,
    /// Downsampled retention of the span.
    pub downsampled_retention_days: u16,
    /// The final Sentry compatible span item.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}

/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service interface for the [`StoreEnvelope`] message.
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: Some envelope items are not supported by the store at all, or not through an
    /// envelope; for example, logs must be submitted via [`Self::TraceItem`] instead.
    ///
    /// Long term, this variant is going to be replaced with fully typed variants of items which
    /// can be stored.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular [`TraceItem`].
    TraceItem(Managed<StoreTraceItem>),
    /// A singular Span.
    Span(Managed<Box<StoreSpanV2>>),
}

impl Store {
    /// Returns the name of the message variant.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
            Store::Span(_) => "span",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

/// Service implementing the [`Store`] interface.
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();

        match self.store_envelope(&mut managed) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Some error events like minidumps need all attachment chunks to be processed _before_
        // the event payload on the consumer side. Transaction attachments do not require this ordering
        // guarantee, so they do not have to go to the same topic as their event payload.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        let options = &self.global_config.current().options;

        // Whether Relay will submit the replay-event to snuba or not.
        let replay_relay_snuba_publish_disabled =
            utils::sample(options.replay_relay_snuba_publish_disabled_sample_rate).is_keep();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            // If a recording item type was seen we produce it to Kafka with the replay-event
            // payload (should it have been provided).
            //
            // The replay_video value is always specified as `None`. We do not allow separate
            // item types for `ReplayVideo` events.
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
            // each bucket separately, we only need to split buckets that exceed the size, but not
            // batches.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Emit `Accepted` outcomes once items have been successfully produced to Kafka.
        //
        // This is only a temporary measure; long term these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee delivery to storage.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        match result {
            Ok(()) => {
                relay_statsd::metric!(
                    counter(RelayCounters::SpanV2Produced) += 1,
                    via = "processing"
                );

                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
                // or the segments consumer, depending on which will produce outcomes later.
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store span"
                );
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

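    /// Produces a single message to the given Kafka topic.
    ///
    /// On success, this increments a produced-message counter tagged with the message
    /// variant, the resolved topic name, and variant-specific fields such as the metric
    /// type or whether a replay carries a video.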
    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
    /// is produced directly as an individual `attachment` message, or returned from this function
    /// to be later sent as part of an `event` message.
    ///
    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
    /// unless the `send_individual_attachments` flag is set and the content is small enough
    /// to fit inside a message.
    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
    /// of the `attachment` message instead.
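    ///
    /// As a sketch of the chunking math (hypothetical sizes, not taken from a real
    /// configuration), a 5 KiB payload with a 2 KiB chunk size is split into chunks
    /// covering `0..2048`, `2048..4096`, and `4096..5120`:
    ///
    /// ```
    /// let (size, max_chunk_size) = (5 * 1024_usize, 2 * 1024_usize);
    /// let num_chunks = size.div_ceil(max_chunk_size);
    /// assert_eq!(num_chunks, 3);
    /// ```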
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // When sending individual attachments, and we have a single chunk, we want to send the
            // `data` inline in the `attachment` message.
            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // This skips chunks for empty attachments. The consumer does not require chunks for
            // empty attachments. `chunks` will be `0` in this case.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
            // is one larger than the last chunk index, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

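    /// Routes a metric message to its namespace-specific topic and produces it.
    ///
    /// Sessions go to [`KafkaTopic::MetricsSessions`], metrics of an unsupported
    /// namespace are dropped with an error log, and all remaining namespaces go to
    /// [`KafkaTopic::MetricsGeneric`].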
    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        // Maximum number of bytes accepted by the consumer.
        let max_payload_size = self.config.max_replay_message_size();

        // Size of the consumer message. We can be reasonably sure this won't overflow because
        // of the request size validation provided by Nginx and Relay.
        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();
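        // For example (hypothetical sizes): a 1 MiB recording with a 4 KiB replay event
        // and no video totals 2000 + 4096 + 1_048_576 bytes, which is compared against
        // `max_payload_size` below.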

        // If the recording payload cannot fit into the message, do not produce it and return early.
        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Verify that this is a V2 span:
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
        // or the segments consumer, depending on which will produce outcomes later.
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously; in the future we might explore how to make
            // the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

/// Describes how the attachment payload is transferred.
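///
/// Since this enum is flattened into [`ChunkedAttachment`], each variant shows up as a
/// single field on the serialized attachment. A sketch of the resulting shapes, with
/// illustrative values:
///
/// - `Chunked(3)` serializes as `"chunks": 3`
/// - `Inline(bytes)` serializes as `"data": <bytes>`
/// - `Stored("key")` serializes as `"stored_id": "key"`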
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined directly into the [`ChunkedAttachment`] message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored in the object store under the given ID.
    #[serde(rename = "stored_id")]
    Stored(String),
}

/// Common attributes for both standalone attachments and processing-relevant attachments.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
    /// native crash reports still need to be retained. These attachments are marked with the
    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
    /// not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The attachment payload, chunked, inlined, or already stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}

/// A hack to make rmp-serde behave more like serde-json when serializing enums.
///
/// Cannot serialize bytes.
///
/// See <https://github.com/3Hren/msgpack-rust/pull/214>
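///
/// Going through `serde_json::Value` first serializes the attachment type as its
/// `#[serde(rename = "...")]` string name, which rmp-serde does not do natively for
/// enums (a sketch of the rationale; see the linked PR for details).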
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw event payload.
    payload: &'a [u8],
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    replay_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of the chunk, starting at `0` and ending at the total number of chunks
    /// minus one (see [`AttachmentPayload::Chunked`]).
    chunk_index: usize,
}

/// A "standalone" attachment.
///
/// Still belongs to an event but can be sent independently (like UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    // Used for KafkaMessage::key
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}

#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key that determines the Kafka partition for this message.
    ///
    /// Nil UUIDs and message variants without a key yield `None`, which results in random
    /// partitioning.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all `KafkaMessage`s would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            // Random partitioning
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayEvent(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

/// Determines if the given item is considered slow.
///
/// Slow items must be routed to the `Attachments` topic.
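///
/// For example, an envelope carrying a minidump and its attachments is routed to the
/// `Attachments` topic, so that all attachment chunks are consumed before the event
/// payload itself (see the topic selection in `store_envelope`).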
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

/// Returns a safe timestamp for Kafka.
///
/// Kafka expects timestamps to be in UTC and in seconds since epoch.
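///
/// A minimal sketch of the clamping behavior (marked `ignore` since this function is
/// private and not compiled as a doc test):
///
/// ```ignore
/// use chrono::{TimeZone, Utc};
///
/// // Non-negative timestamps pass through unchanged.
/// assert_eq!(safe_timestamp(Utc.timestamp_opt(42, 0).unwrap()), 42);
/// // Pre-epoch timestamps fall back to the current time instead of underflowing.
/// assert!(safe_timestamp(Utc.timestamp_opt(-1, 0).unwrap()) > 0);
/// ```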
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // Fall back to the current time, which we assume is never before the epoch.
    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}