relay_server/services/store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.

4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::{Deserialize, Serialize};
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
42use crate::services::processor::Processed;
43use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
44use crate::utils::{self, FormDataIter};
45
46mod sessions;
47
48/// Fallback name used for attachment items without a `filename` header.
49const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
50
51#[derive(Debug, thiserror::Error)]
52pub enum StoreError {
53    #[error("failed to send the message to kafka: {0}")]
54    SendFailed(#[from] ClientError),
55    #[error("failed to encode data: {0}")]
56    EncodingFailed(std::io::Error),
57    #[error("failed to store event because event id was missing")]
58    NoEventId,
59}
60
61impl OutcomeError for StoreError {
62    type Error = Self;
63
64    fn consume(self) -> (Option<Outcome>, Self::Error) {
65        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
66    }
67}
68
69struct Producer {
70    client: KafkaClient,
71}
72
73impl Producer {
74    pub fn create(config: &Config) -> anyhow::Result<Self> {
75        let mut client_builder = KafkaClient::builder();
76
77        for topic in KafkaTopic::iter().filter(|t| {
78            // Outcomes should not be sent from the store forwarder.
79            // See `KafkaOutcomesProducer`.
80            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
81        }) {
82            let kafka_configs = config.kafka_configs(*topic)?;
83            client_builder = client_builder
84                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
85                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
86        }
87
88        Ok(Self {
89            client: client_builder.build(),
90        })
91    }
92}
93
94/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
95#[derive(Debug)]
96pub struct StoreEnvelope {
97    pub envelope: TypedEnvelope<Processed>,
98}
99
100/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
101#[derive(Clone, Debug)]
102pub struct StoreMetrics {
103    pub buckets: Vec<Bucket>,
104    pub scoping: Scoping,
105    pub retention: u16,
106}
107
/// Publishes a trace item to the Sentry core application through Kafka.
109#[derive(Debug)]
110pub struct StoreTraceItem {
111    /// The final trace item which will be produced to Kafka.
112    pub trace_item: TraceItem,
113    /// Outcomes to be emitted when successfully producing the item to Kafka.
114    ///
    /// Note: this is only a temporary measure; long term, these outcomes will be part of the
    /// trace item and emitted by Snuba to guarantee delivery to storage.
117    pub quantities: Quantities,
118}
119
120impl Counted for StoreTraceItem {
121    fn quantities(&self) -> Quantities {
122        self.quantities.clone()
123    }
124}
125
126/// Publishes a span item to the Sentry core application through Kafka.
127#[derive(Debug)]
128pub struct StoreSpanV2 {
129    /// Routing key to assign a Kafka partition.
130    pub routing_key: Option<Uuid>,
131    /// Default retention of the span.
132    pub retention_days: u16,
133    /// Downsampled retention of the span.
134    pub downsampled_retention_days: u16,
135    /// The final Sentry compatible span item.
136    pub item: SpanV2,
137}
138
139impl Counted for StoreSpanV2 {
140    fn quantities(&self) -> Quantities {
141        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
142    }
143}
144
145/// Publishes a singular profile chunk to Kafka.
146#[derive(Debug)]
147pub struct StoreProfileChunk {
    /// Default retention of the profile chunk.
149    pub retention_days: u16,
150    /// The serialized profile chunk payload.
151    pub payload: Bytes,
152    /// Outcome quantities associated with this profile.
153    ///
    /// Quantities differ between backend and UI profile chunks.
155    pub quantities: Quantities,
156}
157
158impl Counted for StoreProfileChunk {
159    fn quantities(&self) -> Quantities {
160        self.quantities.clone()
161    }
162}
163
164/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
165pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
166
167/// Service interface for the [`StoreEnvelope`] message.
168#[derive(Debug)]
169pub enum Store {
170    /// An envelope containing a mixture of items.
171    ///
    /// Note: Some envelope items are either not supported at all or cannot be submitted through
    /// an envelope; for example, logs must be submitted via [`Self::TraceItem`] instead.
174    ///
175    /// Long term this variant is going to be replaced with fully typed variants of items which can
176    /// be stored instead.
177    Envelope(StoreEnvelope),
178    /// Aggregated generic metrics.
179    Metrics(StoreMetrics),
180    /// A singular [`TraceItem`].
181    TraceItem(Managed<StoreTraceItem>),
182    /// A singular Span.
183    Span(Managed<Box<StoreSpanV2>>),
184    /// A singular profile chunk.
185    ProfileChunk(Managed<StoreProfileChunk>),
186}
187
188impl Store {
189    /// Returns the name of the message variant.
190    fn variant(&self) -> &'static str {
191        match self {
192            Store::Envelope(_) => "envelope",
193            Store::Metrics(_) => "metrics",
194            Store::TraceItem(_) => "trace_item",
195            Store::Span(_) => "span",
196            Store::ProfileChunk(_) => "profile_chunk",
197        }
198    }
199}
200
201impl Interface for Store {}
202
203impl FromMessage<StoreEnvelope> for Store {
204    type Response = NoResponse;
205
206    fn from_message(message: StoreEnvelope, _: ()) -> Self {
207        Self::Envelope(message)
208    }
209}
210
211impl FromMessage<StoreMetrics> for Store {
212    type Response = NoResponse;
213
214    fn from_message(message: StoreMetrics, _: ()) -> Self {
215        Self::Metrics(message)
216    }
217}
218
219impl FromMessage<Managed<StoreTraceItem>> for Store {
220    type Response = NoResponse;
221
222    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
223        Self::TraceItem(message)
224    }
225}
226
227impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
228    type Response = NoResponse;
229
230    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
231        Self::Span(message)
232    }
233}
234
235impl FromMessage<Managed<StoreProfileChunk>> for Store {
236    type Response = NoResponse;
237
238    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
239        Self::ProfileChunk(message)
240    }
241}
242
243/// Service implementing the [`Store`] interface.
244pub struct StoreService {
245    pool: StoreServicePool,
246    config: Arc<Config>,
247    global_config: GlobalConfigHandle,
248    outcome_aggregator: Addr<TrackOutcome>,
249    metric_outcomes: MetricOutcomes,
250    producer: Producer,
251}
252
253impl StoreService {
254    pub fn create(
255        pool: StoreServicePool,
256        config: Arc<Config>,
257        global_config: GlobalConfigHandle,
258        outcome_aggregator: Addr<TrackOutcome>,
259        metric_outcomes: MetricOutcomes,
260    ) -> anyhow::Result<Self> {
261        let producer = Producer::create(&config)?;
262        Ok(Self {
263            pool,
264            config,
265            global_config,
266            outcome_aggregator,
267            metric_outcomes,
268            producer,
269        })
270    }
271
272    fn handle_message(&self, message: Store) {
273        let ty = message.variant();
274        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
275            match message {
276                Store::Envelope(message) => self.handle_store_envelope(message),
277                Store::Metrics(message) => self.handle_store_metrics(message),
278                Store::TraceItem(message) => self.handle_store_trace_item(message),
279                Store::Span(message) => self.handle_store_span(message),
280                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
281            }
282        })
283    }
284
285    fn handle_store_envelope(&self, message: StoreEnvelope) {
286        let StoreEnvelope { mut envelope } = message;
287
288        let scoping = envelope.scoping();
289        match self.store_envelope(&mut envelope) {
290            Ok(()) => envelope.accept(),
291            Err(error) => {
292                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
293                relay_log::error!(
294                    error = &error as &dyn Error,
295                    tags.project_key = %scoping.project_key,
296                    "failed to store envelope"
297                );
298            }
299        }
300    }
301
302    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
303        let mut envelope = managed_envelope.take_envelope();
304        let received_at = managed_envelope.received_at();
305        let scoping = managed_envelope.scoping();
306
307        let retention = envelope.retention();
308        let downsampled_retention = envelope.downsampled_retention();
309
310        let event_id = envelope.event_id();
311        let event_item = envelope.as_mut().take_item_by(|item| {
312            matches!(
313                item.ty(),
314                ItemType::Event | ItemType::Transaction | ItemType::Security
315            )
316        });
317        let event_type = event_item.as_ref().map(|item| item.ty());
318
319        // Some error events like minidumps need all attachment chunks to be processed _before_
320        // the event payload on the consumer side. Transaction attachments do not require this ordering
321        // guarantee, so they do not have to go to the same topic as their event payload.
322        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
323            KafkaTopic::Transactions
324        } else if envelope.get_item_by(is_slow_item).is_some() {
325            KafkaTopic::Attachments
326        } else {
327            KafkaTopic::Events
328        };
329
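        // Attachments that do not accompany an error event (i.e. standalone attachments or
        // attachments of a transaction) are produced as individual `attachment` messages
        // instead of being embedded into the `event` message.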
330        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
331
332        let mut attachments = Vec::new();
333        let mut replay_event = None;
334        let mut replay_recording = None;
335
336        for item in envelope.items() {
337            let content_type = item.content_type();
338            match item.ty() {
339                ItemType::Attachment => {
340                    if let Some(attachment) = self.produce_attachment(
341                        event_id.ok_or(StoreError::NoEventId)?,
342                        scoping.project_id,
343                        item,
344                        send_individual_attachments,
345                    )? {
346                        attachments.push(attachment);
347                    }
348                }
349                ItemType::UserReport => {
350                    debug_assert!(event_topic == KafkaTopic::Attachments);
351                    self.produce_user_report(
352                        event_id.ok_or(StoreError::NoEventId)?,
353                        scoping.project_id,
354                        received_at,
355                        item,
356                    )?;
357                }
358                ItemType::UserReportV2 => {
359                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
360                    self.produce_user_report_v2(
361                        event_id.ok_or(StoreError::NoEventId)?,
362                        scoping.project_id,
363                        received_at,
364                        item,
365                        remote_addr,
366                    )?;
367                }
368                ItemType::Profile => self.produce_profile(
369                    scoping.organization_id,
370                    scoping.project_id,
371                    scoping.key_id,
372                    received_at,
373                    retention,
374                    item,
375                )?,
376                ItemType::ReplayVideo => {
377                    self.produce_replay_video(
378                        event_id,
379                        scoping,
380                        item.payload(),
381                        received_at,
382                        retention,
383                    )?;
384                }
385                ItemType::ReplayRecording => {
386                    replay_recording = Some(item);
387                }
388                ItemType::ReplayEvent => {
389                    replay_event = Some(item);
390                }
391                ItemType::CheckIn => {
392                    let client = envelope.meta().client();
393                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
394                }
395                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
396                    scoping,
397                    received_at,
398                    event_id,
399                    retention,
400                    downsampled_retention,
401                    item,
402                )?,
403                ty @ ItemType::Log => {
404                    debug_assert!(
405                        false,
406                        "received {ty} through an envelope, \
407                        this item must be submitted via a specific store message instead"
408                    );
409                    relay_log::error!(
410                        tags.project_key = %scoping.project_key,
411                        "StoreService received unsupported item type '{ty}' in envelope"
412                    );
413                }
414                other => {
415                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
416                    let item_types = envelope
417                        .items()
418                        .map(|item| item.ty().as_str())
419                        .collect::<Vec<_>>();
420                    let attachment_types = envelope
421                        .items()
422                        .map(|item| {
423                            item.attachment_type()
424                                .map(|t| t.to_string())
425                                .unwrap_or_default()
426                        })
427                        .collect::<Vec<_>>();
428
429                    relay_log::with_scope(
430                        |scope| {
431                            scope.set_extra("item_types", item_types.into());
432                            scope.set_extra("attachment_types", attachment_types.into());
433                            if other == &ItemType::FormData {
434                                let payload = item.payload();
435                                let form_data_keys = FormDataIter::new(&payload)
436                                    .map(|entry| entry.key())
437                                    .collect::<Vec<_>>();
438                                scope.set_extra("form_data_keys", form_data_keys.into());
439                            }
440                        },
441                        || {
442                            relay_log::error!(
443                                tags.project_key = %scoping.project_key,
444                                tags.event_type = event_type.unwrap_or("none"),
445                                "StoreService received unexpected item type: {other}"
446                            )
447                        },
448                    )
449                }
450            }
451        }
452
453        if let Some(recording) = replay_recording {
            // If a recording item was seen, we produce it to Kafka together with the
            // replay-event payload, if one was provided.
456            //
457            // The replay_video value is always specified as `None`. We do not allow separate
458            // item types for `ReplayVideo` events.
459            let replay_event = replay_event.map(|rv| rv.payload());
460            self.produce_replay_recording(
461                event_id,
462                scoping,
463                &recording.payload(),
464                replay_event.as_deref(),
465                None,
466                received_at,
467                retention,
468            )?;
469        }
470
471        if let Some(event_item) = event_item {
472            let event_id = event_id.ok_or(StoreError::NoEventId)?;
473            let project_id = scoping.project_id;
474            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
475
476            self.produce(
477                event_topic,
478                KafkaMessage::Event(EventKafkaMessage {
479                    payload: event_item.payload(),
480                    start_time: safe_timestamp(received_at),
481                    event_id,
482                    project_id,
483                    remote_addr,
484                    attachments,
485                }),
486            )?;
487        } else {
488            debug_assert!(attachments.is_empty());
489        }
490
491        Ok(())
492    }
493
494    fn handle_store_metrics(&self, message: StoreMetrics) {
495        let StoreMetrics {
496            buckets,
497            scoping,
498            retention,
499        } = message;
500
501        let batch_size = self.config.metrics_max_batch_size_bytes();
502        let mut error = None;
503
504        let global_config = self.global_config.current();
505        let mut encoder = BucketEncoder::new(&global_config);
506
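        // Session metrics are additionally written as EAP trace items for a fraction of
        // organizations, determined by the `sessions_eap_rollout_rate` option in the global
        // config.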
507        let emit_sessions_to_eap = utils::is_rolled_out(
508            scoping.organization_id.value(),
509            global_config.options.sessions_eap_rollout_rate,
510        )
511        .is_keep();
512
513        let now = UnixTimestamp::now();
514        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
515
516        for mut bucket in buckets {
517            let namespace = encoder.prepare(&mut bucket);
518
519            if let Some(received_at) = bucket.metadata.received_at {
520                let delay = now.as_secs().saturating_sub(received_at.as_secs());
521                let (total, count, max) = delay_stats.get_mut(namespace);
522                *total += delay;
523                *count += 1;
524                *max = (*max).max(delay);
525            }
526
527            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
528            // each bucket separately, we only need to split buckets that exceed the size, but not
529            // batches.
530            for view in BucketsView::new(std::slice::from_ref(&bucket))
531                .by_size(batch_size)
532                .flatten()
533            {
534                let message = self.create_metric_message(
535                    scoping.organization_id,
536                    scoping.project_id,
537                    &mut encoder,
538                    namespace,
539                    &view,
540                    retention,
541                );
542
543                let result =
544                    message.and_then(|message| self.send_metric_message(namespace, message));
545
546                let outcome = match result {
547                    Ok(()) => Outcome::Accepted,
548                    Err(e) => {
549                        error.get_or_insert(e);
550                        Outcome::Invalid(DiscardReason::Internal)
551                    }
552                };
553
554                self.metric_outcomes.track(scoping, &[view], outcome);
555            }
556
557            if emit_sessions_to_eap
558                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
559            {
560                let message = KafkaMessage::for_item(scoping, trace_item);
561                let _ = self.produce(KafkaTopic::Items, message);
562            }
563        }
564
565        if let Some(error) = error {
566            relay_log::error!(
567                error = &error as &dyn std::error::Error,
568                "failed to produce metric buckets: {error}"
569            );
570        }
571
572        for (namespace, (total, count, max)) in delay_stats {
573            if count == 0 {
574                continue;
575            }
576            metric!(
577                counter(RelayCounters::MetricDelaySum) += total,
578                namespace = namespace.as_str()
579            );
580            metric!(
581                counter(RelayCounters::MetricDelayCount) += count,
582                namespace = namespace.as_str()
583            );
584            metric!(
585                gauge(RelayGauges::MetricDelayMax) = max,
586                namespace = namespace.as_str()
587            );
588        }
589    }
590
591    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
592        let scoping = message.scoping();
593        let received_at = message.received_at();
594
595        let quantities = message.try_accept(|item| {
596            let message = KafkaMessage::for_item(scoping, item.trace_item);
597            self.produce(KafkaTopic::Items, message)
598                .map(|()| item.quantities)
599        });
600
        // Emit accepted outcomes once items have been successfully produced to Kafka.
        //
        // This is only a temporary measure; long term, these outcomes will be part of the
        // trace item and emitted by Snuba to guarantee delivery to storage.
605        if let Ok(quantities) = quantities {
606            for (category, quantity) in quantities {
607                self.outcome_aggregator.send(TrackOutcome {
608                    category,
609                    event_id: None,
610                    outcome: Outcome::Accepted,
611                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
612                    remote_addr: None,
613                    scoping,
614                    timestamp: received_at,
615                });
616            }
617        }
618    }
619
620    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
621        let scoping = message.scoping();
622        let received_at = message.received_at();
623
624        let meta = SpanMeta {
625            organization_id: scoping.organization_id,
626            project_id: scoping.project_id,
627            key_id: scoping.key_id,
628            event_id: None,
629            retention_days: message.retention_days,
630            downsampled_retention_days: message.downsampled_retention_days,
631            received: datetime_to_timestamp(received_at),
632        };
633
634        let result = message.try_accept(|span| {
635            let item = Annotated::new(span.item);
636            let message = KafkaMessage::SpanV2 {
637                routing_key: span.routing_key,
638                headers: BTreeMap::from([(
639                    "project_id".to_owned(),
640                    scoping.project_id.to_string(),
641                )]),
642                message: SpanKafkaMessage {
643                    meta,
644                    span: SerializableAnnotated(&item),
645                },
646            };
647
648            self.produce(KafkaTopic::Spans, message)
649        });
650
651        if result.is_ok() {
652            relay_statsd::metric!(
653                counter(RelayCounters::SpanV2Produced) += 1,
654                via = "processing"
655            );
656
657            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
658            // or the segments consumer, depending on which will produce outcomes later.
659            self.outcome_aggregator.send(TrackOutcome {
660                category: DataCategory::SpanIndexed,
661                event_id: None,
662                outcome: Outcome::Accepted,
663                quantity: 1,
664                remote_addr: None,
665                scoping,
666                timestamp: received_at,
667            });
668        }
669    }
670
671    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
672        let scoping = message.scoping();
673        let received_at = message.received_at();
674
675        let _ = message.try_accept(|message| {
676            let message = ProfileChunkKafkaMessage {
677                organization_id: scoping.organization_id,
678                project_id: scoping.project_id,
679                received: safe_timestamp(received_at),
680                retention_days: message.retention_days,
681                headers: BTreeMap::from([(
682                    "project_id".to_owned(),
683                    scoping.project_id.to_string(),
684                )]),
685                payload: message.payload,
686            };
687
688            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
689        });
690    }
691
692    fn create_metric_message<'a>(
693        &self,
694        organization_id: OrganizationId,
695        project_id: ProjectId,
696        encoder: &'a mut BucketEncoder,
697        namespace: MetricNamespace,
698        view: &BucketView<'a>,
699        retention_days: u16,
700    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
701        let value = match view.value() {
702            BucketViewValue::Counter(c) => MetricValue::Counter(c),
703            BucketViewValue::Distribution(data) => MetricValue::Distribution(
704                encoder
705                    .encode_distribution(namespace, data)
706                    .map_err(StoreError::EncodingFailed)?,
707            ),
708            BucketViewValue::Set(data) => MetricValue::Set(
709                encoder
710                    .encode_set(namespace, data)
711                    .map_err(StoreError::EncodingFailed)?,
712            ),
713            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
714        };
715
716        Ok(MetricKafkaMessage {
717            org_id: organization_id,
718            project_id,
719            name: view.name(),
720            value,
721            timestamp: view.timestamp(),
722            tags: view.tags(),
723            retention_days,
724            received_at: view.metadata().received_at,
725        })
726    }
727
728    fn produce(
729        &self,
730        topic: KafkaTopic,
731        // Takes message by value to ensure it is not being produced twice.
732        message: KafkaMessage,
733    ) -> Result<(), StoreError> {
734        relay_log::trace!("Sending kafka message of type {}", message.variant());
735
736        let topic_name = self
737            .producer
738            .client
739            .send_message(topic, &message)
740            .inspect_err(|err| {
741                relay_log::error!(
742                    error = err as &dyn Error,
743                    tags.topic = ?topic,
744                    tags.message = message.variant(),
745                    "failed to produce to Kafka"
746                )
747            })?;
748
749        match &message {
750            KafkaMessage::Metric {
751                message: metric, ..
752            } => {
753                metric!(
754                    counter(RelayCounters::ProcessingMessageProduced) += 1,
755                    event_type = message.variant(),
756                    topic = topic_name,
757                    metric_type = metric.value.variant(),
758                    metric_encoding = metric.value.encoding().unwrap_or(""),
759                );
760            }
761            KafkaMessage::ReplayRecordingNotChunked(replay) => {
762                let has_video = replay.replay_video.is_some();
763
764                metric!(
765                    counter(RelayCounters::ProcessingMessageProduced) += 1,
766                    event_type = message.variant(),
767                    topic = topic_name,
768                    has_video = bool_to_str(has_video),
769                );
770            }
771            message => {
772                metric!(
773                    counter(RelayCounters::ProcessingMessageProduced) += 1,
774                    event_type = message.variant(),
775                    topic = topic_name,
776                );
777            }
778        }
779
780        Ok(())
781    }
782
783    /// Produces Kafka messages for the content and metadata of an attachment item.
784    ///
    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
786    /// is produced directly as an individual `attachment` message, or returned from this function
787    /// to be later sent as part of an `event` message.
788    ///
789    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
790    /// unless the `send_individual_attachments` flag is set, and the content is small enough
791    /// to fit inside a message.
792    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
793    /// of the `attachment` message instead.
794    fn produce_attachment(
795        &self,
796        event_id: EventId,
797        project_id: ProjectId,
798        item: &Item,
799        send_individual_attachments: bool,
800    ) -> Result<Option<ChunkedAttachment>, StoreError> {
801        let id = Uuid::new_v4().to_string();
802
803        let payload = item.payload();
804        let size = item.len();
805        let max_chunk_size = self.config.attachment_chunk_size();
806
807        let payload = if size == 0 {
808            AttachmentPayload::Chunked(0)
809        } else if let Some(stored_key) = item.stored_key() {
810            AttachmentPayload::Stored(stored_key.into())
811        } else if send_individual_attachments && size < max_chunk_size {
812            // When sending individual attachments, and we have a single chunk, we want to send the
813            // `data` inline in the `attachment` message.
814            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
815            AttachmentPayload::Inline(payload)
816        } else {
817            let mut chunk_index = 0;
818            let mut offset = 0;
            // Split the payload into chunks of at most `max_chunk_size` bytes and produce
            // each chunk as a separate `attachment_chunk` message.
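            // For example (hypothetical sizes): a 5000-byte payload with a 2000-byte
            // `max_chunk_size` produces three `attachment_chunk` messages of 2000, 2000, and
            // 1000 bytes, with `chunk_index` 0, 1, and 2.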
821            while offset < size {
822                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
823                let chunk_message = AttachmentChunkKafkaMessage {
824                    payload: payload.slice(offset..offset + chunk_size),
825                    event_id,
826                    project_id,
827                    id: id.clone(),
828                    chunk_index,
829                };
830
831                self.produce(
832                    KafkaTopic::Attachments,
833                    KafkaMessage::AttachmentChunk(chunk_message),
834                )?;
835                offset += chunk_size;
836                chunk_index += 1;
837            }
838
839            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
840            // is one larger than the last chunk, so it is equal to the number of chunks.
841            AttachmentPayload::Chunked(chunk_index)
842        };
843
844        let attachment = ChunkedAttachment {
845            id,
846            name: match item.filename() {
847                Some(name) => name.to_owned(),
848                None => UNNAMED_ATTACHMENT.to_owned(),
849            },
850            rate_limited: item.rate_limited(),
851            content_type: item
852                .content_type()
853                .map(|content_type| content_type.as_str().to_owned()),
854            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
855            size,
856            payload,
857        };
858
859        if send_individual_attachments {
860            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
861                event_id,
862                project_id,
863                attachment,
864            });
865            self.produce(KafkaTopic::Attachments, message)?;
866            Ok(None)
867        } else {
868            Ok(Some(attachment))
869        }
870    }
871
872    fn produce_user_report(
873        &self,
874        event_id: EventId,
875        project_id: ProjectId,
876        received_at: DateTime<Utc>,
877        item: &Item,
878    ) -> Result<(), StoreError> {
879        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
880            project_id,
881            event_id,
882            start_time: safe_timestamp(received_at),
883            payload: item.payload(),
884        });
885
886        self.produce(KafkaTopic::Attachments, message)
887    }
888
889    fn produce_user_report_v2(
890        &self,
891        event_id: EventId,
892        project_id: ProjectId,
893        received_at: DateTime<Utc>,
894        item: &Item,
895        remote_addr: Option<String>,
896    ) -> Result<(), StoreError> {
897        let message = KafkaMessage::Event(EventKafkaMessage {
898            project_id,
899            event_id,
900            payload: item.payload(),
901            start_time: safe_timestamp(received_at),
902            remote_addr,
903            attachments: vec![],
904        });
905        self.produce(KafkaTopic::Feedback, message)
906    }
907
908    fn send_metric_message(
909        &self,
910        namespace: MetricNamespace,
911        message: MetricKafkaMessage,
912    ) -> Result<(), StoreError> {
913        let topic = match namespace {
914            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
915            MetricNamespace::Unsupported => {
916                relay_log::with_scope(
917                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
918                    || relay_log::error!("store service dropping unknown metric usecase"),
919                );
920                return Ok(());
921            }
922            _ => KafkaTopic::MetricsGeneric,
923        };
924
925        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
926        self.produce(topic, KafkaMessage::Metric { headers, message })?;
927        Ok(())
928    }
929
930    fn produce_profile(
931        &self,
932        organization_id: OrganizationId,
933        project_id: ProjectId,
934        key_id: Option<u64>,
935        received_at: DateTime<Utc>,
936        retention_days: u16,
937        item: &Item,
938    ) -> Result<(), StoreError> {
939        let message = ProfileKafkaMessage {
940            organization_id,
941            project_id,
942            key_id,
943            received: safe_timestamp(received_at),
944            retention_days,
945            headers: BTreeMap::from([
946                (
947                    "sampled".to_owned(),
948                    if item.sampled() { "true" } else { "false" }.to_owned(),
949                ),
950                ("project_id".to_owned(), project_id.to_string()),
951            ]),
952            payload: item.payload(),
953        };
954        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
955        Ok(())
956    }
957
958    #[allow(clippy::too_many_arguments)]
959    fn produce_replay_recording(
960        &self,
961        event_id: Option<EventId>,
962        scoping: Scoping,
963        payload: &[u8],
964        replay_event: Option<&[u8]>,
965        replay_video: Option<&[u8]>,
966        received_at: DateTime<Utc>,
967        retention: u16,
968    ) -> Result<(), StoreError> {
969        // Maximum number of bytes accepted by the consumer.
970        let max_payload_size = self.config.max_replay_message_size();
971
972        // Size of the consumer message. We can be reasonably sure this won't overflow because
973        // of the request size validation provided by Nginx and Relay.
974        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
975        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
976        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
977        payload_size += payload.len();
978
        // If the recording payload cannot fit into the message, do not produce it and quit early.
980        if payload_size >= max_payload_size {
981            relay_log::debug!("replay_recording over maximum size.");
982            self.outcome_aggregator.send(TrackOutcome {
983                category: DataCategory::Replay,
984                event_id,
985                outcome: Outcome::Invalid(DiscardReason::TooLarge(
986                    DiscardItemType::ReplayRecording,
987                )),
988                quantity: 1,
989                remote_addr: None,
990                scoping,
991                timestamp: received_at,
992            });
993            return Ok(());
994        }
995
996        let message =
997            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
998                replay_id: event_id.ok_or(StoreError::NoEventId)?,
999                project_id: scoping.project_id,
1000                key_id: scoping.key_id,
1001                org_id: scoping.organization_id,
1002                received: safe_timestamp(received_at),
1003                retention_days: retention,
1004                payload,
1005                replay_event,
1006                replay_video,
                // Hardcoded to `true` to indicate to the consumer that it should always publish
                // the replay_event, as Relay no longer does it.
1009                relay_snuba_publish_disabled: true,
1010            });
1011
1012        self.produce(KafkaTopic::ReplayRecordings, message)?;
1013
1014        Ok(())
1015    }
1016
1017    fn produce_replay_video(
1018        &self,
1019        event_id: Option<EventId>,
1020        scoping: Scoping,
1021        payload: Bytes,
1022        received_at: DateTime<Utc>,
1023        retention: u16,
1024    ) -> Result<(), StoreError> {
1025        #[derive(Deserialize)]
1026        struct VideoEvent<'a> {
1027            replay_event: &'a [u8],
1028            replay_recording: &'a [u8],
1029            replay_video: &'a [u8],
1030        }
1031
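        // ReplayVideo items bundle the replay event, recording, and video payloads in a single
        // msgpack-encoded map; if decoding fails, the replay is dropped with an
        // `InvalidReplayEvent` outcome below.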
1032        let Ok(VideoEvent {
1033            replay_video,
1034            replay_event,
1035            replay_recording,
1036        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
1037        else {
1038            self.outcome_aggregator.send(TrackOutcome {
1039                category: DataCategory::Replay,
1040                event_id,
1041                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
1042                quantity: 1,
1043                remote_addr: None,
1044                scoping,
1045                timestamp: received_at,
1046            });
1047            return Ok(());
1048        };
1049
1050        self.produce_replay_recording(
1051            event_id,
1052            scoping,
1053            replay_recording,
1054            Some(replay_event),
1055            Some(replay_video),
1056            received_at,
1057            retention,
1058        )
1059    }
1060
1061    fn produce_check_in(
1062        &self,
1063        project_id: ProjectId,
1064        received_at: DateTime<Utc>,
1065        client: Option<&str>,
1066        retention_days: u16,
1067        item: &Item,
1068    ) -> Result<(), StoreError> {
1069        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1070            message_type: CheckInMessageType::CheckIn,
1071            project_id,
1072            retention_days,
1073            start_time: safe_timestamp(received_at),
1074            sdk: client.map(str::to_owned),
1075            payload: item.payload(),
1076            routing_key_hint: item.routing_hint(),
1077        });
1078
1079        self.produce(KafkaTopic::Monitors, message)?;
1080
1081        Ok(())
1082    }
1083
1084    fn produce_span(
1085        &self,
1086        scoping: Scoping,
1087        received_at: DateTime<Utc>,
1088        event_id: Option<EventId>,
1089        retention_days: u16,
1090        downsampled_retention_days: u16,
1091        item: &Item,
1092    ) -> Result<(), StoreError> {
1093        debug_assert_eq!(item.ty(), &ItemType::Span);
1094        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));
1095
1096        let Scoping {
1097            organization_id,
1098            project_id,
1099            project_key: _,
1100            key_id,
1101        } = scoping;
1102
1103        let payload = item.payload();
1104        let message = SpanKafkaMessageRaw {
1105            meta: SpanMeta {
1106                organization_id,
1107                project_id,
1108                key_id,
1109                event_id,
1110                retention_days,
1111                downsampled_retention_days,
1112                received: datetime_to_timestamp(received_at),
1113            },
1114            span: serde_json::from_slice(&payload)
1115                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
1116        };
1117
1118        // Verify that this is a V2 span:
1119        debug_assert!(message.span.contains_key("attributes"));
1120        relay_statsd::metric!(
1121            counter(RelayCounters::SpanV2Produced) += 1,
1122            via = "envelope"
1123        );
1124
1125        self.produce(
1126            KafkaTopic::Spans,
1127            KafkaMessage::SpanRaw {
1128                routing_key: item.routing_hint(),
1129                headers: BTreeMap::from([(
1130                    "project_id".to_owned(),
1131                    scoping.project_id.to_string(),
1132                )]),
1133                message,
1134            },
1135        )?;
1136
1137        // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
1138        // or the segments consumer, depending on which will produce outcomes later.
1139        self.outcome_aggregator.send(TrackOutcome {
1140            category: DataCategory::SpanIndexed,
1141            event_id: None,
1142            outcome: Outcome::Accepted,
1143            quantity: 1,
1144            remote_addr: None,
1145            scoping,
1146            timestamp: received_at,
1147        });
1148
1149        Ok(())
1150    }
1151}
1152
1153impl Service for StoreService {
1154    type Interface = Store;
1155
1156    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1157        let this = Arc::new(self);
1158
1159        relay_log::info!("store forwarder started");
1160
1161        while let Some(message) = rx.recv().await {
1162            let service = Arc::clone(&this);
1163            // For now, we run each task synchronously, in the future we might explore how to make
1164            // the store async.
1165            this.pool
1166                .spawn_async(async move { service.handle_message(message) }.boxed())
1167                .await;
1168        }
1169
1170        relay_log::info!("store forwarder stopped");
1171    }
1172}
1173
/// Specifies how the attachment payload is transferred.
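///
/// When flattened into [`ChunkedAttachment`], each variant serializes as a single
/// `chunks`, `data`, or `stored_id` key.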
1175#[derive(Debug, Serialize)]
1176enum AttachmentPayload {
1177    /// The payload has been split into multiple chunks.
1178    ///
1179    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
1180    /// If the payload `size == 0`, the number of chunks will also be `0`.
1181    #[serde(rename = "chunks")]
1182    Chunked(usize),
1183
    /// The payload is inlined directly into the [`ChunkedAttachment`] message.
1185    #[serde(rename = "data")]
1186    Inline(Bytes),
1187
1188    /// The attachment has already been stored into the objectstore, with the given Id.
1189    #[serde(rename = "stored_id")]
1190    Stored(String),
1191}
1192
1193/// Common attributes for both standalone attachments and processing-relevant attachments.
1194#[derive(Debug, Serialize)]
1195struct ChunkedAttachment {
1196    /// The attachment ID within the event.
1197    ///
1198    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
1199    id: String,
1200
1201    /// File name of the attachment file.
1202    name: String,
1203
1204    /// Whether this attachment was rate limited and should be removed after processing.
1205    ///
1206    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
1207    /// native crash reports still need to be retained. These attachments are marked with the
1208    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
1209    /// not be persisted after processing.
1210    rate_limited: bool,
1211
1212    /// Content type of the attachment payload.
1213    #[serde(skip_serializing_if = "Option::is_none")]
1214    content_type: Option<String>,
1215
1216    /// The Sentry-internal attachment type used in the processing pipeline.
1217    #[serde(serialize_with = "serialize_attachment_type")]
1218    attachment_type: AttachmentType,
1219
1220    /// The size of the attachment in bytes.
1221    size: usize,
1222
1223    /// The attachment payload, chunked, inlined, or already stored.
1224    #[serde(flatten)]
1225    payload: AttachmentPayload,
1226}
1227
1228/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1229///
1230/// Cannot serialize bytes.
1231///
1232/// See <https://github.com/3Hren/msgpack-rust/pull/214>
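///
/// For example, an [`AttachmentType`] is expected to serialize as its plain string form
/// (such as `"event.attachment"`) rather than rmp-serde's default enum representation.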
1233fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1234where
1235    S: serde::Serializer,
1236    T: serde::Serialize,
1237{
1238    serde_json::to_value(t)
1239        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1240        .serialize(serializer)
1241}
1242
1243/// Container payload for event messages.
1244#[derive(Debug, Serialize)]
1245struct EventKafkaMessage {
1246    /// Raw event payload.
1247    payload: Bytes,
1248    /// Time at which the event was received by Relay.
1249    start_time: u64,
1250    /// The event id.
1251    event_id: EventId,
1252    /// The project id for the current event.
1253    project_id: ProjectId,
1254    /// The client ip address.
1255    remote_addr: Option<String>,
1256    /// Attachments that are potentially relevant for processing.
1257    attachments: Vec<ChunkedAttachment>,
1258}
1259
1260/// Container payload for chunks of attachments.
1261#[derive(Debug, Serialize)]
1262struct AttachmentChunkKafkaMessage {
1263    /// Chunk payload of the attachment.
1264    payload: Bytes,
1265    /// The event id.
1266    event_id: EventId,
1267    /// The project id for the current event.
1268    project_id: ProjectId,
1269    /// The attachment ID within the event.
1270    ///
1271    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
1272    id: String,
    /// Sequence number of the chunk. Starts at 0 and ends at the attachment's chunk count minus 1.
1274    chunk_index: usize,
1275}
1276
1277/// A "standalone" attachment.
1278///
1279/// Still belongs to an event but can be sent independently (like UserReport) and is not
1280/// considered in processing.
1281#[derive(Debug, Serialize)]
1282struct AttachmentKafkaMessage {
1283    /// The event id.
1284    event_id: EventId,
1285    /// The project id for the current event.
1286    project_id: ProjectId,
1287    /// The attachment.
1288    attachment: ChunkedAttachment,
1289}
1290
1291#[derive(Debug, Serialize)]
1292struct ReplayRecordingNotChunkedKafkaMessage<'a> {
1293    replay_id: EventId,
1294    key_id: Option<u64>,
1295    org_id: OrganizationId,
1296    project_id: ProjectId,
1297    received: u64,
1298    retention_days: u16,
1299    #[serde(with = "serde_bytes")]
1300    payload: &'a [u8],
1301    #[serde(with = "serde_bytes")]
1302    replay_event: Option<&'a [u8]>,
1303    #[serde(with = "serde_bytes")]
1304    replay_video: Option<&'a [u8]>,
1305    relay_snuba_publish_disabled: bool,
1306}
1307
1308/// User report for an event wrapped up in a message ready for consumption in Kafka.
1309///
1310/// Is always independent of an event and can be sent as part of any envelope.
1311#[derive(Debug, Serialize)]
1312struct UserReportKafkaMessage {
1313    /// The project id for the current event.
1314    project_id: ProjectId,
1315    start_time: u64,
1316    payload: Bytes,
1317
1318    // Used for KafkaMessage::key
1319    #[serde(skip)]
1320    event_id: EventId,
1321}
1322
1323#[derive(Clone, Debug, Serialize)]
1324struct MetricKafkaMessage<'a> {
1325    org_id: OrganizationId,
1326    project_id: ProjectId,
1327    name: &'a MetricName,
1328    #[serde(flatten)]
1329    value: MetricValue<'a>,
1330    timestamp: UnixTimestamp,
1331    tags: &'a BTreeMap<String, String>,
1332    retention_days: u16,
1333    #[serde(skip_serializing_if = "Option::is_none")]
1334    received_at: Option<UnixTimestamp>,
1335}
1336
1337#[derive(Clone, Debug, Serialize)]
1338#[serde(tag = "type", content = "value")]
1339enum MetricValue<'a> {
1340    #[serde(rename = "c")]
1341    Counter(FiniteF64),
1342    #[serde(rename = "d")]
1343    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
1344    #[serde(rename = "s")]
1345    Set(ArrayEncoding<'a, SetView<'a>>),
1346    #[serde(rename = "g")]
1347    Gauge(GaugeValue),
1348}
1349
1350impl MetricValue<'_> {
1351    fn variant(&self) -> &'static str {
1352        match self {
1353            Self::Counter(_) => "counter",
1354            Self::Distribution(_) => "distribution",
1355            Self::Set(_) => "set",
1356            Self::Gauge(_) => "gauge",
1357        }
1358    }
1359
1360    fn encoding(&self) -> Option<&'static str> {
1361        match self {
1362            Self::Distribution(ae) => Some(ae.name()),
1363            Self::Set(ae) => Some(ae.name()),
1364            _ => None,
1365        }
1366    }
1367}
1368
1369#[derive(Clone, Debug, Serialize)]
1370struct ProfileKafkaMessage {
1371    organization_id: OrganizationId,
1372    project_id: ProjectId,
1373    key_id: Option<u64>,
1374    received: u64,
1375    retention_days: u16,
1376    #[serde(skip)]
1377    headers: BTreeMap<String, String>,
1378    payload: Bytes,
1379}
1380
1381/// Used to discriminate cron monitor ingestion messages.
1382///
1383/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
1384/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
1385/// intended to ensure the clock continues to run even when ingestion volume drops.
1386#[allow(dead_code)]
1387#[derive(Debug, Serialize)]
1388#[serde(rename_all = "snake_case")]
1389enum CheckInMessageType {
1390    ClockPulse,
1391    CheckIn,
1392}
1393
1394#[derive(Debug, Serialize)]
1395struct CheckInKafkaMessage {
1396    #[serde(skip)]
1397    routing_key_hint: Option<Uuid>,
1398
    /// Used by the consumer to discriminate the message type.
1400    message_type: CheckInMessageType,
1401    /// Raw event payload.
1402    payload: Bytes,
1403    /// Time at which the event was received by Relay.
1404    start_time: u64,
1405    /// The SDK client which produced the event.
1406    sdk: Option<String>,
1407    /// The project id for the current event.
1408    project_id: ProjectId,
1409    /// Number of days to retain.
1410    retention_days: u16,
1411}
1412
1413#[derive(Debug, Serialize)]
1414struct SpanKafkaMessageRaw<'a> {
1415    #[serde(flatten)]
1416    meta: SpanMeta,
1417    #[serde(flatten)]
1418    span: BTreeMap<&'a str, &'a RawValue>,
1419}
1420
1421#[derive(Debug, Serialize)]
1422struct SpanKafkaMessage<'a> {
1423    #[serde(flatten)]
1424    meta: SpanMeta,
1425    #[serde(flatten)]
1426    span: SerializableAnnotated<'a, SpanV2>,
1427}
1428
1429#[derive(Debug, Serialize)]
1430struct SpanMeta {
1431    organization_id: OrganizationId,
1432    project_id: ProjectId,
1433    // Required for the buffer to emit outcomes scoped to the DSN.
1434    #[serde(skip_serializing_if = "Option::is_none")]
1435    key_id: Option<u64>,
1436    #[serde(skip_serializing_if = "Option::is_none")]
1437    event_id: Option<EventId>,
1438    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
1439    received: f64,
1440    /// Number of days until these data should be deleted.
1441    retention_days: u16,
1442    /// Number of days until the downsampled version of this data should be deleted.
1443    downsampled_retention_days: u16,
1444}
1445
1446#[derive(Clone, Debug, Serialize)]
1447struct ProfileChunkKafkaMessage {
1448    organization_id: OrganizationId,
1449    project_id: ProjectId,
1450    received: u64,
1451    retention_days: u16,
1452    #[serde(skip)]
1453    headers: BTreeMap<String, String>,
1454    payload: Bytes,
1455}
1456
1457/// An enum over all possible ingest messages.
1458#[derive(Debug, Serialize)]
1459#[serde(tag = "type", rename_all = "snake_case")]
1460#[allow(clippy::large_enum_variant)]
1461enum KafkaMessage<'a> {
1462    Event(EventKafkaMessage),
1463    UserReport(UserReportKafkaMessage),
1464    Metric {
1465        #[serde(skip)]
1466        headers: BTreeMap<String, String>,
1467        #[serde(flatten)]
1468        message: MetricKafkaMessage<'a>,
1469    },
1470    CheckIn(CheckInKafkaMessage),
1471    Item {
1472        #[serde(skip)]
1473        headers: BTreeMap<String, String>,
1474        #[serde(skip)]
1475        item_type: TraceItemType,
1476        #[serde(skip)]
1477        message: TraceItem,
1478    },
1479    SpanRaw {
1480        #[serde(skip)]
1481        routing_key: Option<Uuid>,
1482        #[serde(skip)]
1483        headers: BTreeMap<String, String>,
1484        #[serde(flatten)]
1485        message: SpanKafkaMessageRaw<'a>,
1486    },
1487    SpanV2 {
1488        #[serde(skip)]
1489        routing_key: Option<Uuid>,
1490        #[serde(skip)]
1491        headers: BTreeMap<String, String>,
1492        #[serde(flatten)]
1493        message: SpanKafkaMessage<'a>,
1494    },
1495
1496    Attachment(AttachmentKafkaMessage),
1497    AttachmentChunk(AttachmentChunkKafkaMessage),
1498
1499    Profile(ProfileKafkaMessage),
1500    ProfileChunk(ProfileChunkKafkaMessage),
1501
1502    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
1503}
1504
1505impl KafkaMessage<'_> {
1506    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
1507    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1508        let item_type = item.item_type();
1509        KafkaMessage::Item {
1510            headers: BTreeMap::from([
1511                ("project_id".to_owned(), scoping.project_id.to_string()),
1512                ("item_type".to_owned(), (item_type as i32).to_string()),
1513            ]),
1514            message: item,
1515            item_type,
1516        }
1517    }
1518}
1519
1520impl Message for KafkaMessage<'_> {
1521    fn variant(&self) -> &'static str {
1522        match self {
1523            KafkaMessage::Event(_) => "event",
1524            KafkaMessage::UserReport(_) => "user_report",
1525            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1526                MetricNamespace::Sessions => "metric_sessions",
1527                MetricNamespace::Transactions => "metric_transactions",
1528                MetricNamespace::Spans => "metric_spans",
1529                MetricNamespace::Custom => "metric_custom",
1530                MetricNamespace::Unsupported => "metric_unsupported",
1531            },
1532            KafkaMessage::CheckIn(_) => "check_in",
1533            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
1534            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1535
1536            KafkaMessage::Attachment(_) => "attachment",
1537            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1538
1539            KafkaMessage::Profile(_) => "profile",
1540            KafkaMessage::ProfileChunk(_) => "profile_chunk",
1541
1542            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1543        }
1544    }
1545
    /// Returns the partitioning key for this Kafka message, determining the Kafka partition.
1547    fn key(&self) -> Option<relay_kafka::Key> {
1548        match self {
1549            Self::Event(message) => Some(message.event_id.0),
1550            Self::UserReport(message) => Some(message.event_id.0),
1551            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,
1552
1553            // Monitor check-ins use the hinted UUID passed through from the Envelope.
1554            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessages would
            // receive the routing_key_hint from their envelopes.
1557            Self::CheckIn(message) => message.routing_key_hint,
1558
1559            Self::Attachment(message) => Some(message.event_id.0),
1560            Self::AttachmentChunk(message) => Some(message.event_id.0),
1561
1562            // Random partitioning
1563            Self::Metric { .. }
1564            | Self::Item { .. }
1565            | Self::Profile(_)
1566            | Self::ProfileChunk(_)
1567            | Self::ReplayRecordingNotChunked(_) => None,
1568        }
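        // A nil UUID carries no routing information, so treat it as absent and fall back to
        // random partitioning.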
1569        .filter(|uuid| !uuid.is_nil())
1570        .map(|uuid| uuid.as_u128())
1571    }
1572
1573    fn headers(&self) -> Option<&BTreeMap<String, String>> {
1574        match &self {
1575            KafkaMessage::Metric { headers, .. }
1576            | KafkaMessage::SpanRaw { headers, .. }
1577            | KafkaMessage::SpanV2 { headers, .. }
1578            | KafkaMessage::Item { headers, .. }
1579            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
1580            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),
1581
1582            KafkaMessage::Event(_)
1583            | KafkaMessage::UserReport(_)
1584            | KafkaMessage::CheckIn(_)
1585            | KafkaMessage::Attachment(_)
1586            | KafkaMessage::AttachmentChunk(_)
1587            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
1588        }
1589    }
1590
1591    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
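        // Metrics and spans are produced as JSON, trace items as protobuf, and all remaining
        // message types as msgpack.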
1592        match self {
1593            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1594            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
1595            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1596            KafkaMessage::Item { message, .. } => {
1597                let mut payload = Vec::new();
1598                match message.encode(&mut payload) {
1599                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1600                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
1601                }
1602            }
1603            KafkaMessage::Event(_)
1604            | KafkaMessage::UserReport(_)
1605            | KafkaMessage::CheckIn(_)
1606            | KafkaMessage::Attachment(_)
1607            | KafkaMessage::AttachmentChunk(_)
1608            | KafkaMessage::Profile(_)
1609            | KafkaMessage::ProfileChunk(_)
1610            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1611                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1612                Err(err) => Err(ClientError::InvalidMsgPack(err)),
1613            },
1614        }
1615    }
1616}
1617
1618fn serialize_as_json<T: serde::Serialize>(
1619    value: &T,
1620) -> Result<SerializationOutput<'_>, ClientError> {
1621    match serde_json::to_vec(value) {
1622        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1623        Err(err) => Err(ClientError::InvalidJson(err)),
1624    }
1625}
1626
1627/// Determines if the given item is considered slow.
1628///
1629/// Slow items must be routed to the `Attachments` topic.
1630fn is_slow_item(item: &Item) -> bool {
1631    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1632}
1633
1634fn bool_to_str(value: bool) -> &'static str {
1635    if value { "true" } else { "false" }
1636}
1637
1638/// Returns a safe timestamp for Kafka.
1639///
1640/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1641fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1642    let ts = timestamp.timestamp();
1643    if ts >= 0 {
1644        return ts as u64;
1645    }
1646
1647    // We assume this call can't return < 0.
1648    Utc::now().timestamp() as u64
1649}
1650
1651#[cfg(test)]
1652mod tests {
1653
1654    use super::*;
1655
1656    #[test]
1657    fn disallow_outcomes() {
1658        let config = Config::default();
1659        let producer = Producer::create(&config).unwrap();
1660
1661        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
1662            let res = producer
1663                .client
1664                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");
1665
1666            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
1667        }
1668    }
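
    // A minimal sketch of the `safe_timestamp` clamping behavior; assumes chrono's
    // `DateTime::from_timestamp` constructor is available in the pinned chrono version.
    #[test]
    fn safe_timestamp_clamps_pre_epoch() {
        let positive = DateTime::<Utc>::from_timestamp(10, 0).unwrap();
        assert_eq!(safe_timestamp(positive), 10);

        // Pre-epoch timestamps fall back to the current time instead of going negative.
        let pre_epoch = DateTime::<Utc>::from_timestamp(-1, 0).unwrap();
        let now = Utc::now().timestamp() as u64;
        assert!(safe_timestamp(pre_epoch) >= now);
    }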
1669}