relay_server/services/store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use prost_types::Timestamp;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Deserializer, Value as JsonValue};
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::FiniteF64;
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter, PickResult};

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            // Outcomes should not be sent from the store forwarder.
            // See `KafkaOutcomesProducer`.
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

/// Publishes an [`Envelope`] to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

/// Publishes a log item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
    /// Outcomes to be emitted when the item is successfully produced to Kafka.
    ///
    /// Note: this is only a temporary measure, long term these outcomes will be part of the trace
    /// item and emitted by Snuba to guarantee delivery to storage.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service interface for the [`StoreEnvelope`] message.
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: some envelope items are either not supported for submission at all, or may not be
    /// submitted through an envelope; for example, logs must be submitted via
    /// [`Self::TraceItem`] instead.
    ///
    /// Long term, this variant is going to be replaced with fully typed variants of items which
    /// can be stored instead.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular [`TraceItem`].
    TraceItem(Managed<StoreTraceItem>),
}

impl Store {
    /// Returns the name of the message variant.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}
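
// A minimal usage sketch (hypothetical call site; `store_service` stands in for
// an `Addr<Store>` handle obtained when the service is started): each
// `FromMessage` impl above lets the payload be sent directly, and it is wrapped
// into the matching `Store` variant on the way in:
//
//     store_service.send(StoreMetrics {
//         buckets,
//         scoping,
//         retention: 90,
//     });
//
// All three messages use `NoResponse`, so `send` is fire-and-forget.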

/// Service implementing the [`Store`] interface.
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();
        let envelope = managed.take_envelope();

        match self.store_envelope(envelope, managed.received_at(), scoping) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(
        &self,
        mut envelope: Box<Envelope>,
        received_at: DateTime<Utc>,
        scoping: Scoping,
    ) -> Result<(), StoreError> {
        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Some error events like minidumps need all attachment chunks to be processed _before_
        // the event payload on the consumer side. Transaction attachments do not require this
        // ordering guarantee, so they do not have to go to the same topic as their event payload.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };
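
        // Sketch of the resulting routing (hypothetical envelope shapes):
        //
        //     transaction + attachments -> event to `Transactions`, attachments produced individually
        //     error event + minidump    -> everything to `Attachments`, chunks ahead of the event
        //     plain error event         -> `Events`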

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        // Whether submitting the replay-event to Snuba from Relay is disabled for this envelope,
        // sampled from a global config option.
        let replay_relay_snuba_publish_disabled = utils::sample(
            self.global_config
                .current()
                .options
                .replay_relay_snuba_publish_disabled_sample_rate,
        )
        .is_keep();

        for item in envelope.items() {
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            // If a recording item type was seen, we produce it to Kafka together with the
            // replay-event payload (should it have been provided).
            //
            // The replay_video value is always specified as `None`. We do not allow separate
            // item types for `ReplayVideo` events.
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we
            // produce each bucket separately, we only need to split buckets that exceed the
            // size, but not batches.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }
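
    // A worked sketch of the delay accounting above (hypothetical values): two
    // buckets of the same namespace received 10s and 30s ago yield
    // `(total, count, max) == (40, 2, 30)`. Only sum, count, and max are emitted
    // as statsd metrics; the average delay (here 20s) can be derived downstream
    // as `MetricDelaySum / MetricDelayCount`.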

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Emit accepted outcomes once the items have been successfully produced to Kafka.
        //
        // This is only a temporary measure; long term, these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee delivery to storage.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes the message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::Span { message: span, .. } => {
                let is_segment = span.is_segment;
                let has_parent = span.parent_span_id.is_some();
                let platform = VALID_PLATFORMS.iter().find(|p| *p == &span.platform);

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    platform = platform.unwrap_or(&""),
                    is_segment = bool_to_str(is_segment),
                    has_parent = bool_to_str(has_parent),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
    /// is produced directly as an individual `attachment` message, or returned from this function
    /// to be sent later as part of an `event` message.
    ///
    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
    /// unless the `send_individual_attachments` flag is set and the content is small enough
    /// to fit inside a message.
    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
    /// of the `attachment` message instead.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        // When sending individual attachments and we have a single chunk, we want to send the
        // `data` inline in the `attachment` message.
        // This avoids a needless roundtrip through the attachments cache on the Sentry side.
        let payload = if size == 0 {
            // Empty attachments need no chunks at all; the consumer accepts a chunk count of `0`.
            AttachmentPayload::Chunked(0)
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop,
            // it is one larger than the last chunk index, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }
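
    // A worked sketch of the chunking math above (hypothetical numbers): with
    // `max_chunk_size = 1_000_000` and `size = 2_500_000`, the loop produces
    // chunks of 1_000_000, 1_000_000, and 500_000 bytes, and `chunk_index` ends
    // up at `3`, which matches:
    //
    //     let num_chunks = size.div_ceil(max_chunk_size); // == 3
    //
    // and is what gets serialized as `AttachmentPayload::Chunked(3)`.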

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        // Maximum number of bytes accepted by the consumer.
        let max_payload_size = self.config.max_replay_message_size();

        // Size of the consumer message. We can be reasonably sure this won't overflow because
        // of the request size validation provided by Nginx and Relay.
        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        // If the recording payload cannot fit into the message, do not produce it and quit early.
        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }
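
    // A worked sketch of the size budget above (hypothetical values): with a
    // 1 MiB `max_replay_message_size`, a 900 KiB recording plus a 150 KiB video
    // yields `2000 + 921_600 + 153_600 = 1_077_200 >= 1_048_576`, so the
    // recording is dropped with a `TooLarge` outcome instead of being produced.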

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Producing span");

        let payload = item.payload();
        let d = &mut Deserializer::from_slice(&payload);
        let mut span: SpanKafkaMessage = match serde_path_to_error::deserialize(d) {
            Ok(span) => span,
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "failed to parse span"
                );
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                return Ok(());
            }
        };

        // Discard measurements with empty `value`s.
        if let Some(measurements) = &mut span.measurements {
            measurements.retain(|_, v| v.as_ref().and_then(|v| v.value).is_some());
        }

        span.backfill_data();
        span.duration_ms =
            ((span.end_timestamp_precise - span.start_timestamp_precise) * 1e3) as u32;
        span.event_id = event_id;
        span.organization_id = scoping.organization_id.value();
        span.project_id = scoping.project_id.value();
        span.retention_days = retention_days;
        span.downsampled_retention_days = downsampled_retention_days;
        span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;
        span.key_id = scoping.key_id;

        let spans_target = self.spans_target(span.organization_id);
        if spans_target.is_protobuf() {
            self.inner_produce_protobuf_span(
                scoping,
                received_at,
                event_id,
                retention_days,
                span.clone(),
            )?;
        }

        if spans_target.is_json() {
            self.inner_produce_json_span(scoping, span)?;
        }

        // XXX: Temporarily produce span outcomes also for JSON spans. Keep in sync with either
        // EAP or the segments consumer, depending on which will produce outcomes later.
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn spans_target(&self, org_id: u64) -> SpansTarget {
        let config = self.config.span_producers();
        if config.produce_json_orgs.contains(&org_id) {
            return SpansTarget::Json;
        } else if let Some(rate) = config.produce_json_sample_rate {
            return match utils::is_rolled_out(org_id, rate) {
                PickResult::Keep => SpansTarget::Json,
                PickResult::Discard => SpansTarget::Protobuf,
            };
        }

        match (config.produce_json, config.produce_protobuf) {
            (true, true) => SpansTarget::Both,
            (true, false) => SpansTarget::Json,
            (false, true) => SpansTarget::Protobuf,
            (false, false) => SpansTarget::default(),
        }
    }
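
    // The routing precedence above, as a decision list (hypothetical config):
    //
    //     org listed in `produce_json_orgs`            -> Json
    //     `produce_json_sample_rate = Some(0.1)`       -> Json for the ~10% of orgs picked
    //                                                     deterministically by `is_rolled_out`,
    //                                                     Protobuf for the rest
    //     otherwise `(produce_json, produce_protobuf)` -> Both / Json / Protobuf / default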

    fn inner_produce_json_span(
        &self,
        scoping: Scoping,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::Span {
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: span,
            },
        )?;

        Ok(())
    }

    fn inner_produce_protobuf_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        _retention_days: u16,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        let mut trace_item = TraceItem {
            item_type: TraceItemType::Span.into(),
            organization_id: scoping.organization_id.value(),
            project_id: scoping.project_id.value(),
            received: Some(Timestamp {
                seconds: safe_timestamp(received_at) as i64,
                nanos: 0,
            }),
            retention_days: span.retention_days.into(),
            downsampled_retention_days: span.downsampled_retention_days.into(),
            timestamp: Some(Timestamp {
                seconds: span.start_timestamp_precise as i64,
                nanos: 0,
            }),
            trace_id: span.trace_id.to_string(),
            item_id: u128::from_str_radix(&span.span_id, 16)
                .unwrap_or_default()
                .to_le_bytes()
                .to_vec(),
            attributes: Default::default(),
            client_sample_rate: span.client_sample_rate.unwrap_or_default(),
            server_sample_rate: span.server_sample_rate.unwrap_or_default(),
        };
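
        // A quick sketch of the `item_id` encoding above (hypothetical span id):
        //
        //     let id = u128::from_str_radix("fa90fdead5f74052", 16).unwrap_or_default();
        //     let bytes = id.to_le_bytes(); // [0x52, 0x40, 0xf7, 0xd5, 0xea, 0xfd, 0x90, 0xfa, 0x00, ...]
        //
        // The 16-hex-digit span id becomes a 16-byte little-endian `u128`;
        // unparseable ids fall back to all zeroes via `unwrap_or_default`.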

        if let Some(data) = span.data {
            for (key, raw_value) in data {
                let Some(raw_value) = raw_value else {
                    continue;
                };

                let json_value = match Deserialize::deserialize(raw_value) {
                    Ok(v) => v,
                    Err(error) => {
                        // This should not be possible: a `RawValue` is definitely valid JSON,
                        // so deserializing it to a `json::Value` must succeed. But better safe
                        // than sorry.
                        relay_log::error!(
                            error = &error as &dyn std::error::Error,
                            raw_value = %raw_value,
                            "failed to parse JSON value"
                        );
                        continue;
                    }
                };

                let any_value = match json_value {
                    JsonValue::String(string) => AnyValue {
                        value: Some(Value::StringValue(string)),
                    },
                    JsonValue::Number(number) => {
                        if number.is_i64() || number.is_u64() {
                            AnyValue {
                                value: Some(Value::IntValue(number.as_i64().unwrap_or_default())),
                            }
                        } else {
                            AnyValue {
                                value: Some(Value::DoubleValue(
                                    number.as_f64().unwrap_or_default(),
                                )),
                            }
                        }
                    }
                    JsonValue::Bool(bool) => AnyValue {
                        value: Some(Value::BoolValue(bool)),
                    },
                    JsonValue::Array(array) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&array).unwrap_or_default(),
                        )),
                    },
                    JsonValue::Object(object) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&object).unwrap_or_default(),
                        )),
                    },
                    _ => continue,
                };

                trace_item.attributes.insert(key.into(), any_value);
            }
        }

        if let Some(description) = span.description {
            trace_item.attributes.insert(
                "sentry.raw_description".into(),
                AnyValue {
                    value: Some(Value::StringValue(description.into())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.duration_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.duration_ms.into())),
            },
        );

        if let Some(event_id) = event_id {
            trace_item.attributes.insert(
                "sentry.event_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(event_id.0.as_simple().to_string())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.is_segment".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_segment)),
            },
        );

        trace_item.attributes.insert(
            "sentry.exclusive_time_ms".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.exclusive_time_ms)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.start_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.end_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.end_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.start_timestamp_ms as i64)),
            },
        );

        trace_item.attributes.insert(
            "sentry.is_remote".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_remote)),
            },
        );

        if let Some(parent_span_id) = span.parent_span_id {
            trace_item.attributes.insert(
                "sentry.parent_span_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(parent_span_id.into_owned())),
                },
            );
        }

        if let Some(profile_id) = span.profile_id {
            trace_item.attributes.insert(
                "sentry.profile_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(profile_id.into_owned())),
                },
            );
        }

        if let Some(segment_id) = span.segment_id {
            trace_item.attributes.insert(
                "sentry.segment_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(segment_id.into_owned())),
                },
            );
        }

        if let Some(origin) = span.origin {
            trace_item.attributes.insert(
                "sentry.origin".into(),
                AnyValue {
                    value: Some(Value::StringValue(origin.into_owned())),
                },
            );
        }

        if let Some(kind) = span.kind {
            trace_item.attributes.insert(
                "sentry.kind".into(),
                AnyValue {
                    value: Some(Value::StringValue(kind.into_owned())),
                },
            );
        }

        self.produce(
            KafkaTopic::Items,
            KafkaMessage::Item {
                headers: BTreeMap::from([
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Span as i32).to_string(),
                    ),
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                ]),
                item_type: TraceItemType::Span,
                message: trace_item,
            },
        )?;

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously; in the future we might explore how to
            // make the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

/// This signifies how the attachment payload is being transferred.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined directly into the [`ChunkedAttachment`] message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored in the objectstore, under the given ID.
    #[serde(rename = "stored_id")]
    #[allow(unused)] // TODO: actually storing it in objectstore first is still WIP
    Stored(String),
}

/// Common attributes for both standalone attachments and processing-relevant attachments.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For
    /// processing, native crash reports still need to be retained. These attachments are marked
    /// with the `rate_limited` header, which signals to the processing pipeline that the
    /// attachment should not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The attachment payload: chunked, inlined, or already stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
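
// A sketch of the serialized attachment shapes (rendered as JSON for
// readability; the actual messages are serialized with rmp-serde, see the hack
// below): since `payload` is `#[serde(flatten)]`ed, the variant key lands
// directly on the attachment object:
//
//     {"id": "...", "name": "minidump.dmp", ..., "chunks": 3}
//     {"id": "...", "name": "log.txt",      ..., "data": <bytes>}
//     {"id": "...", "name": "big.bin",      ..., "stored_id": "..."}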
1443
1444/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1445///
1446/// Cannot serialize bytes.
1447///
1448/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1449fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1450where
1451    S: serde::Serializer,
1452    T: serde::Serialize,
1453{
1454    serde_json::to_value(t)
1455        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1456        .serialize(serializer)
1457}
1458
1459fn serialize_btreemap_skip_nulls<K, S, T>(
    map: &Option<BTreeMap<K, Option<T>>>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    K: Serialize,
    S: serde::Serializer,
    T: serde::Serialize,
{
    let Some(map) = map else {
        return serializer.serialize_none();
    };
    let mut m = serializer.serialize_map(Some(map.len()))?;
    for (key, value) in map.iter() {
        if let Some(value) = value {
            m.serialize_entry(key, value)?;
        }
    }
    m.end()
}
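
// Sketch of the effect, with hypothetical keys in JSON-style notation:
//   Some({"a": Some(1), "b": None})  serializes as  {"a": 1}
//   None                             serializes as  null (via `serialize_none`)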

/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw event payload.
    payload: &'a [u8],
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    replay_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of the chunk. Starts at 0 and ends at the attachment's `chunks` count minus 1.
    chunk_index: usize,
}

/// A "standalone" attachment.
///
/// Still belongs to an event but can be sent independently (like UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

/// User report for an event, wrapped up in a message ready for consumption in Kafka.
///
/// It is always sent independently of an event and can be part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    // Used for KafkaMessage::key
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors Kafka topic: "check_in"
/// messages (the ones produced here in Relay) and "clock_pulse" messages, which are produced
/// externally and are intended to ensure the clock continues to run even when ingestion
/// volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanLink<'a> {
    pub trace_id: &'a str,
    pub span_id: &'a str,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sampled: Option<bool>,
    #[serde(borrow)]
    pub attributes: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanMeasurement<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    value: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanKafkaMessage<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    description: Option<Cow<'a, str>>,
    #[serde(default)]
    duration_ms: u32,
    /// The ID of the transaction event associated to this span, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    #[serde(rename(deserialize = "exclusive_time"))]
    exclusive_time_ms: f64,
    #[serde(default)]
    is_segment: bool,
    #[serde(default)]
    is_remote: bool,

    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    data: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    kind: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "none_or_empty_vec")]
    links: Option<Vec<SpanLink<'a>>>,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement<'a>>>>,
    #[serde(default)]
    organization_id: u64,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    origin: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    parent_span_id: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    profile_id: Option<Cow<'a, str>>,
    /// The numeric ID of the project.
    #[serde(default)]
    project_id: u64,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until this data should be deleted.
    #[serde(default)]
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    #[serde(default)]
    downsampled_retention_days: u16,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    segment_id: Option<Cow<'a, str>>,
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_btreemap_skip_nulls"
    )]
    #[serde(borrow)]
    sentry_tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    span_id: Cow<'a, str>,
    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    trace_id: EventId,

    #[serde(default)]
    start_timestamp_ms: u64,
    #[serde(rename(deserialize = "start_timestamp"))]
    start_timestamp_precise: f64,
    #[serde(rename(deserialize = "timestamp"))]
    end_timestamp_precise: f64,

    #[serde(borrow, default, skip_serializing)]
    platform: Cow<'a, str>, // We only use this for logging for now

    #[serde(default, skip_serializing_if = "Option::is_none")]
    client_sample_rate: Option<f64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    server_sample_rate: Option<f64>,

    #[serde(
        default,
        rename = "_meta",
        skip_serializing_if = "none_or_empty_object"
    )]
    meta: Option<&'a RawValue>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    _performance_issues_spans: Option<bool>,

    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
}

impl SpanKafkaMessage<'_> {
    /// Backfills `data` based on `sentry_tags`, `tags`, and `measurements`.
    ///
    /// * Every item in `sentry_tags` is copied to `data`, with the key prefixed with `sentry.`.
    ///   The only exception is the `description` tag, which is copied as `sentry.normalized_description`.
    ///
    /// * Every item in `tags` is copied to `data` verbatim, with the exception of `description`, which
    ///   is copied as `sentry.normalized_description`.
    ///
    /// * The value of every item in `measurements` is copied to `data` with the same key, with the exceptions
    ///   of `client_sample_rate` and `server_sample_rate`. Those measurements are instead written to the top-level
    ///   fields of the same names.
    ///
    /// In no case are existing keys overwritten. Thus, from highest to lowest, the order of precedence is:
    /// * existing values in `data`
    /// * `measurements`
    /// * `tags`
    /// * `sentry_tags`
    fn backfill_data(&mut self) {
        let data = self.data.get_or_insert_default();

        if let Some(measurements) = &self.measurements {
            for (key, value) in measurements {
                let Some(value) = value.as_ref().and_then(|v| v.value) else {
                    continue;
                };

                match key.as_ref() {
                    "client_sample_rate" => {
                        data.entry(Cow::Borrowed("sentry.client_sample_rate"))
                            .or_insert(Some(value));

                        if let Ok(client_sample_rate) = Deserialize::deserialize(value) {
                            self.client_sample_rate = Some(client_sample_rate);
                        }
                    }
                    "server_sample_rate" => {
                        data.entry(Cow::Borrowed("sentry.server_sample_rate"))
                            .or_insert(Some(value));

                        if let Ok(server_sample_rate) = Deserialize::deserialize(value) {
                            self.server_sample_rate = Some(server_sample_rate);
                        }
                    }
                    _ => {
                        data.entry(key.clone()).or_insert(Some(value));
                    }
                }
            }
        }

        if let Some(tags) = &self.tags {
            for (key, value) in tags {
                let Some(value) = value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    key.clone()
                };

                data.entry(key).or_insert(Some(value));
            }
        }

        if let Some(sentry_tags) = &self.sentry_tags {
            for (key, value) in sentry_tags {
                let Some(value) = value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    Cow::Owned(format!("sentry.{key}"))
                };

                data.entry(key).or_insert(Some(value));
            }
        }
    }
}
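
// Sketch of the precedence rules above, using hypothetical keys in JSON-style notation:
//   data:        { "k": "from_data" }
//   tags:        { "k": "from_tags", "description": "desc" }
//   sentry_tags: { "k": "from_sentry_tags" }
// After `backfill_data`, `data` contains:
//   "k": "from_data"                          (already present, never overwritten)
//   "sentry.normalized_description": "desc"   (from `tags`, renamed)
//   "sentry.k": "from_sentry_tags"            (prefixed with `sentry.`, so no collision)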

fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
    match value {
        None => true,
        Some(raw) => raw.get() == "{}",
    }
}

fn none_or_empty_vec<T>(value: &Option<Vec<T>>) -> bool {
    match &value {
        Some(vec) => vec.is_empty(),
        None => true,
    }
}

fn none_or_empty_map<S, T>(value: &Option<BTreeMap<S, T>>) -> bool {
    value.as_ref().is_none_or(BTreeMap::is_empty)
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    Span {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
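
// With `tag = "type"` and `rename_all = "snake_case"`, the serialized message carries its
// variant as a `"type"` field. E.g. an `Event` message looks roughly like this (values
// elided, shown JSON-style for readability; this variant is actually encoded as MsgPack):
//   { "type": "event", "payload": ..., "start_time": ..., "event_id": ..., ... }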

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::Span { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key for this Kafka message, which determines the partition it is routed to.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::Span { message, .. } => Some(message.trace_id.0),

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessages would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            // Random partitioning
            _ => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }
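
    // Note on the `filter` above: nil UUIDs (all zeroes) are dropped, so messages
    // without a real ID fall back to random partitioning, just like the `None` arms.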

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::Span { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),
            _ => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::Span { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            _ => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

/// Determines if the given item is considered slow.
///
/// Slow items must be routed to the `Attachments` topic.
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

/// Returns a safe timestamp for Kafka.
///
/// Kafka expects timestamps to be in UTC and in seconds since epoch.
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // We assume this call can't return < 0.
    Utc::now().timestamp() as u64
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum SpansTarget {
    #[default]
    Protobuf,
    Json,
    Both,
}

impl SpansTarget {
    fn is_protobuf(&self) -> bool {
        matches!(self, SpansTarget::Protobuf | SpansTarget::Both)
    }

    fn is_json(&self) -> bool {
        matches!(self, SpansTarget::Json | SpansTarget::Both)
    }
}
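
// Quick reference for the accessors above:
//   variant    is_protobuf()  is_json()
//   Protobuf   true           false
//   Json       false          true
//   Both       true           true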

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }

    #[test]
    fn backfill() {
        let json = r#"{
            "description": "/api/0/relays/projectconfigs/",
            "duration_ms": 152,
            "exclusive_time": 0.228,
            "is_segment": true,
            "data": {
                "sentry.environment": "development",
                "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
                "thread.name": "uWSGIWorker1Core0",
                "thread.id": "8522009600",
                "sentry.segment.name": "/api/0/relays/projectconfigs/",
                "sentry.sdk.name": "sentry.python.django",
                "sentry.sdk.version": "2.7.0",
                "my.float.field": 101.2,
                "my.int.field": 2000,
                "my.neg.field": -100,
                "my.neg.float.field": -101.2,
                "my.true.bool.field": true,
                "my.false.bool.field": false,
                "my.dict.field": {
                    "id": 42,
                    "name": "test"
                },
                "my.u64.field": 9447000002305251000,
                "my.array.field": [1, 2, ["nested", "array"]]
            },
            "measurements": {
                "num_of_spans": {"value": 50.0},
                "client_sample_rate": {"value": 0.1},
                "server_sample_rate": {"value": 0.2}
            },
            "profile_id": "56c7d1401ea14ad7b4ac86de46baebae",
            "organization_id": 1,
            "origin": "auto.http.django",
            "project_id": 1,
            "received": 1721319572.877828,
            "retention_days": 90,
            "segment_id": "8873a98879faf06d",
            "sentry_tags": {
                "description": "normalized_description",
                "category": "http",
                "environment": "development",
                "op": "http.server",
                "platform": "python",
                "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
                "sdk.name": "sentry.python.django",
                "sdk.version": "2.7.0",
                "status": "ok",
                "status_code": "200",
                "thread.id": "8522009600",
                "thread.name": "uWSGIWorker1Core0",
                "trace.status": "ok",
                "transaction": "/api/0/relays/projectconfigs/",
                "transaction.method": "POST",
                "transaction.op": "http.server",
                "user": "ip:127.0.0.1"
            },
            "span_id": "8873a98879faf06d",
            "tags": {
                "http.status_code": "200",
                "relay_endpoint_version": "3",
                "relay_id": "88888888-4444-4444-8444-cccccccccccc",
                "relay_no_cache": "False",
                "relay_protocol_version": "3",
                "relay_use_post_or_schedule": "True",
                "relay_use_post_or_schedule_rejected": "version",
                "server_name": "D23CXQ4GK2.local",
                "spans_over_limit": "False"
            },
            "trace_id": "d099bf9ad5a143cf8f83a98081d0ed3b",
            "start_timestamp_ms": 1721319572616,
            "start_timestamp": 1721319572.616648,
            "timestamp": 1721319572.768806
        }"#;
        let mut span: SpanKafkaMessage = serde_json::from_str(json).unwrap();
        span.backfill_data();

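        // Note: `RawValue` preserves its input text verbatim, which is why the nested
        // `my.dict.field` object in the expected output keeps the indentation of the
        // input JSON even under `to_string_pretty`.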
        assert_eq!(
            serde_json::to_string_pretty(&span.data).unwrap(),
            r#"{
  "http.status_code": "200",
  "my.array.field": [1, 2, ["nested", "array"]],
  "my.dict.field": {
                    "id": 42,
                    "name": "test"
                },
  "my.false.bool.field": false,
  "my.float.field": 101.2,
  "my.int.field": 2000,
  "my.neg.field": -100,
  "my.neg.float.field": -101.2,
  "my.true.bool.field": true,
  "my.u64.field": 9447000002305251000,
  "num_of_spans": 50.0,
  "relay_endpoint_version": "3",
  "relay_id": "88888888-4444-4444-8444-cccccccccccc",
  "relay_no_cache": "False",
  "relay_protocol_version": "3",
  "relay_use_post_or_schedule": "True",
  "relay_use_post_or_schedule_rejected": "version",
  "sentry.category": "http",
  "sentry.client_sample_rate": 0.1,
  "sentry.environment": "development",
  "sentry.normalized_description": "normalized_description",
  "sentry.op": "http.server",
  "sentry.platform": "python",
  "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
  "sentry.sdk.name": "sentry.python.django",
  "sentry.sdk.version": "2.7.0",
  "sentry.segment.name": "/api/0/relays/projectconfigs/",
  "sentry.server_sample_rate": 0.2,
  "sentry.status": "ok",
  "sentry.status_code": "200",
  "sentry.thread.id": "8522009600",
  "sentry.thread.name": "uWSGIWorker1Core0",
  "sentry.trace.status": "ok",
  "sentry.transaction": "/api/0/relays/projectconfigs/",
  "sentry.transaction.method": "POST",
  "sentry.transaction.op": "http.server",
  "sentry.user": "ip:127.0.0.1",
  "server_name": "D23CXQ4GK2.local",
  "spans_over_limit": "False",
  "thread.id": "8522009600",
  "thread.name": "uWSGIWorker1Core0"
}"#
        );
    }
}