relay_server/services/store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use prost_types::Timestamp;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Deserializer, Value as JsonValue};
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::FiniteF64;
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::FormDataIter;

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            // Outcomes should not be sent from the store forwarder.
            // See `KafkaOutcomesProducer`.
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

/// Publishes an [`Envelope`] to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

/// Publishes a log item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreLog {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
    /// Outcomes to be emitted when successfully producing the item to Kafka.
    ///
    /// Note: this is only a temporary measure; long term, these outcomes will be part of the
    /// trace item and emitted by Snuba to guarantee delivery to storage.
    pub quantities: Quantities,
}

impl Counted for StoreLog {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

/// The asynchronous thread pool used for scheduling store tasks in the envelope store.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service interface for all messages accepted by the store service.
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: Some items are not supported for submission at all, or not through an envelope;
    /// for example, logs must be submitted via [`Self::Log`] instead.
    ///
    /// Long term, this variant is going to be replaced with fully typed variants of items
    /// which can be stored instead.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular log item.
    Log(Managed<StoreLog>),
}

impl Store {
    /// Returns the name of the message variant.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::Log(_) => "log",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreLog>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreLog>, _: ()) -> Self {
        Self::Log(message)
    }
}

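// Example (sketch, not part of this module): callers submit work through an
// `Addr<Store>`; the `FromMessage` impls above convert each payload, so the
// `Store` enum is never constructed directly. The `store_addr`, `buckets`,
// `scoping`, and `managed_log` bindings below are hypothetical:
//
//     store_addr.send(StoreMetrics { buckets, scoping, retention: 90 });
//     store_addr.send(managed_log); // a `Managed<StoreLog>`
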
/// Service implementing the [`Store`] interface.
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::Log(message) => self.handle_store_log(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();
        let envelope = managed.take_envelope();

        match self.store_envelope(envelope, managed.received_at(), scoping) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(
        &self,
        mut envelope: Box<Envelope>,
        received_at: DateTime<Utc>,
        scoping: Scoping,
    ) -> Result<(), StoreError> {
        let retention = envelope.retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

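        // Attachments and user reports are only produced on the attachments topic (the
        // `debug_assert`s below rely on this), so the envelope is routed there whenever
        // it contains such a "slow" item; otherwise the topic depends on the event type.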
        let topic = if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        for item in envelope.items() {
            match item.ty() {
                ItemType::Attachment => {
                    debug_assert!(topic == KafkaTopic::Attachments);
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span => {
                    self.produce_span(scoping, received_at, event_id, retention, item)?
                }
                ItemType::Log => self.produce_log(scoping, received_at, retention, item)?,
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            // If a recording item was seen, we produce it to Kafka together with the
            // replay-event payload (if one was provided).
            //
            // The replay_video value is always specified as `None`. We do not allow separate
            // item types for `ReplayVideo` events.
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
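        // Track per-namespace (sum, count, max) of the delay between a bucket's
        // `received_at` and now; flushed as statsd metrics after the loop below.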
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
            // each bucket separately, we only need to split buckets that exceed the size, but not
            // batches.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_log(&self, message: Managed<StoreLog>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|log| {
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Log as i32).to_string(),
                    ),
                ]),
                message: log.trace_item,
                item_type: TraceItemType::Log,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| log.quantities)
        });

        // Emit accepted outcomes once items have been successfully produced to Kafka.
        //
        // This is only a temporary measure; long term, these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee delivery to storage.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::Span { message: span, .. } => {
                let is_segment = span.is_segment;
                let has_parent = span.parent_span_id.is_some();
                let platform = VALID_PLATFORMS.iter().find(|p| *p == &span.platform);

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    platform = platform.unwrap_or(&""),
                    is_segment = bool_to_str(is_segment),
                    has_parent = bool_to_str(has_parent),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
    /// is produced directly as an individual `attachment` message, or returned from this function
    /// to be later sent as part of an `event` message.
    ///
    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
    /// unless the `send_individual_attachments` flag is set and the content is small enough
    /// to fit inside a single message.
    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
    /// of the `attachment` message instead.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let mut chunk_index = 0;
        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        // When sending individual attachments and the payload fits into a single chunk, we
        // send the `data` inline in the `attachment` message.
        // This avoids a needless roundtrip through the attachments cache on the Sentry side.
        let data = if send_individual_attachments && size < max_chunk_size {
            (size > 0).then_some(payload)
        } else {
            let mut offset = 0;
            // This skips chunks for empty attachments. The consumer does not require chunks for
            // empty attachments. `chunks` will be `0` in this case.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }
            None
        };

        // The chunk_index is incremented after every loop iteration. After we exit the loop, it
        // is one larger than the last chunk, so it is equal to the number of chunks.

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            chunks: chunk_index,
            data,
            size: Some(size),
            rate_limited: Some(item.rate_limited()),
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

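    // A worked example of the chunking above (illustrative numbers): with
    // `attachment_chunk_size = 4`, a 10-byte payload that is not sent inline is
    // produced as chunks of 4, 4, and 2 bytes; `chunk_index` ends at 3, which is
    // recorded as `chunks`. An empty payload produces no chunks and `chunks: 0`.
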
    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
    ) -> Result<(), StoreError> {
        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
    ) -> Result<(), StoreError> {
        // Maximum number of bytes accepted by the consumer.
        let max_payload_size = self.config.max_replay_message_size();

        // Size of the consumer message. We can be reasonably sure this won't overflow because
        // of the request size validation provided by Nginx and Relay.
        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        // If the recording payload cannot fit into the message, do not produce it and quit early.
        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
    ) -> Result<(), StoreError> {
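        // Replay video items carry a single msgpack-encoded payload bundling the
        // replay event, the recording, and the video; unpack it before producing.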
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Producing span");

        let payload = item.payload();
        let d = &mut Deserializer::from_slice(&payload);
        let mut span: SpanKafkaMessage = match serde_path_to_error::deserialize(d) {
            Ok(span) => span,
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "failed to parse span"
                );
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                return Ok(());
            }
        };

        // Discard measurements with empty `value`s.
        if let Some(measurements) = &mut span.measurements {
            measurements.retain(|_, v| v.as_ref().and_then(|v| v.value).is_some());
        }

        span.backfill_data();
        span.duration_ms =
            ((span.end_timestamp_precise - span.start_timestamp_precise) * 1e3) as u32;
        span.event_id = event_id;
        span.organization_id = scoping.organization_id.value();
        span.project_id = scoping.project_id.value();
        span.retention_days = retention_days;
        span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;

        if self.config.produce_protobuf_spans() {
            self.inner_produce_protobuf_span(
                scoping,
                received_at,
                event_id,
                retention_days,
                span.clone(),
            )?;
        }

        if self.config.produce_json_spans() {
            self.inner_produce_json_span(scoping, span)?;
        }

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn inner_produce_json_span(
        &self,
        scoping: Scoping,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::Span {
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: span,
            },
        )?;

        Ok(())
    }

    fn inner_produce_protobuf_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        let mut trace_item = TraceItem {
            item_type: TraceItemType::Span.into(),
            organization_id: scoping.organization_id.value(),
            project_id: scoping.project_id.value(),
            received: Some(Timestamp {
                seconds: safe_timestamp(received_at) as i64,
                nanos: 0,
            }),
            retention_days: retention_days.into(),
            timestamp: Some(Timestamp {
                seconds: span.start_timestamp_precise as i64,
                nanos: 0,
            }),
            trace_id: span.trace_id.to_string(),
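            // Interpret the 16-character hex span ID as a u128 and store its
            // little-endian bytes as the item ID; malformed IDs fall back to zero.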
            item_id: u128::from_str_radix(&span.span_id, 16)
                .unwrap_or_default()
                .to_le_bytes()
                .to_vec(),
            attributes: Default::default(),
            client_sample_rate: span.client_sample_rate.unwrap_or_default(),
            server_sample_rate: span.server_sample_rate.unwrap_or_default(),
        };

        if let Some(data) = span.data {
            for (key, raw_value) in data {
                let Some(raw_value) = raw_value else {
                    continue;
                };

                let json_value = match Deserialize::deserialize(raw_value) {
                    Ok(v) => v,
                    Err(error) => {
                        // This should not be possible: a `RawValue` is definitely valid JSON,
                        // so deserializing it to a `json::Value` must succeed. But better safe
                        // than sorry.
                        relay_log::error!(
                            error = &error as &dyn std::error::Error,
                            raw_value = %raw_value,
                            "failed to parse JSON value"
                        );
                        continue;
                    }
                };

                let any_value = match json_value {
                    JsonValue::String(string) => AnyValue {
                        value: Some(Value::StringValue(string)),
                    },
                    JsonValue::Number(number) => {
                        if number.is_i64() || number.is_u64() {
                            AnyValue {
                                value: Some(Value::IntValue(number.as_i64().unwrap_or_default())),
                            }
                        } else {
                            AnyValue {
                                value: Some(Value::DoubleValue(
                                    number.as_f64().unwrap_or_default(),
                                )),
                            }
                        }
                    }
                    JsonValue::Bool(bool) => AnyValue {
                        value: Some(Value::BoolValue(bool)),
                    },
                    JsonValue::Array(array) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&array).unwrap_or_default(),
                        )),
                    },
                    JsonValue::Object(object) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&object).unwrap_or_default(),
                        )),
                    },
                    _ => continue,
                };

                trace_item.attributes.insert(key.into(), any_value);
            }
        }

        if let Some(description) = span.description {
            trace_item.attributes.insert(
                "sentry.raw_description".into(),
                AnyValue {
                    value: Some(Value::StringValue(description.into())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.duration_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.duration_ms.into())),
            },
        );

        if let Some(event_id) = event_id {
            trace_item.attributes.insert(
                "sentry.event_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(event_id.0.as_simple().to_string())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.is_segment".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_segment)),
            },
        );

        trace_item.attributes.insert(
            "sentry.exclusive_time_ms".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.exclusive_time_ms)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.start_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.end_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.end_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.start_timestamp_ms as i64)),
            },
        );

        trace_item.attributes.insert(
            "sentry.is_remote".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_remote)),
            },
        );

        if let Some(parent_span_id) = span.parent_span_id {
            trace_item.attributes.insert(
                "sentry.parent_span_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(parent_span_id.into_owned())),
                },
            );
        }

        if let Some(profile_id) = span.profile_id {
            trace_item.attributes.insert(
                "sentry.profile_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(profile_id.into_owned())),
                },
            );
        }

        if let Some(segment_id) = span.segment_id {
            trace_item.attributes.insert(
                "sentry.segment_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(segment_id.into_owned())),
                },
            );
        }

        if let Some(origin) = span.origin {
            trace_item.attributes.insert(
                "sentry.origin".into(),
                AnyValue {
                    value: Some(Value::StringValue(origin.into_owned())),
                },
            );
        }

        if let Some(kind) = span.kind {
            trace_item.attributes.insert(
                "sentry.kind".into(),
                AnyValue {
                    value: Some(Value::StringValue(kind.into_owned())),
                },
            );
        }

        self.produce(
            KafkaTopic::Items,
            KafkaMessage::Item {
                headers: BTreeMap::from([
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Span as i32).to_string(),
                    ),
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                ]),
                item_type: TraceItemType::Span,
                message: trace_item,
            },
        )?;

        Ok(())
    }

    fn produce_log(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Producing log");

        let payload = item.payload();
        let d = &mut Deserializer::from_slice(&payload);
        let logs: LogKafkaMessages = match serde_path_to_error::deserialize(d) {
            Ok(logs) => logs,
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "failed to parse log"
                );
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::LogItem,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidLog),
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::LogByte,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidLog),
                    quantity: payload.len() as u32,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                return Ok(());
            }
        };

        let num_logs = logs.items.len() as u32;
        for log in logs.items {
            let timestamp_seconds = log.timestamp as i64;
            let timestamp_nanos = (log.timestamp.fract() * 1e9) as u32;
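            // Derive a time-ordered UUIDv7 from the log timestamp and use its bytes,
            // reinterpreted as a little-endian u128, as the item ID.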
            let item_id = u128::from_be_bytes(
                *Uuid::new_v7(uuid::Timestamp::from_unix(
                    uuid::NoContext,
                    timestamp_seconds as u64,
                    timestamp_nanos,
                ))
                .as_bytes(),
            )
            .to_le_bytes()
            .to_vec();
            let mut trace_item = TraceItem {
                item_type: TraceItemType::Log.into(),
                organization_id: scoping.organization_id.value(),
                project_id: scoping.project_id.value(),
                received: Some(Timestamp {
                    seconds: safe_timestamp(received_at) as i64,
                    nanos: 0,
                }),
                retention_days: retention_days.into(),
                timestamp: Some(Timestamp {
                    seconds: timestamp_seconds,
                    nanos: 0,
                }),
                trace_id: log.trace_id.to_string(),
                item_id,
                attributes: Default::default(),
                client_sample_rate: 1.0,
                server_sample_rate: 1.0,
            };

            for (name, attribute) in log.attributes.unwrap_or_default() {
                if let Some(attribute_value) = attribute {
                    if let Some(v) = attribute_value.value {
                        let any_value = match v {
                            LogAttributeValue::String(value) => AnyValue {
                                value: Some(Value::StringValue(value)),
                            },
                            LogAttributeValue::Int(value) => AnyValue {
                                value: Some(Value::IntValue(value)),
                            },
                            LogAttributeValue::Bool(value) => AnyValue {
                                value: Some(Value::BoolValue(value)),
                            },
                            LogAttributeValue::Double(value) => AnyValue {
                                value: Some(Value::DoubleValue(value)),
                            },
                            LogAttributeValue::Unknown(_) => continue,
                        };

                        trace_item.attributes.insert(name.into(), any_value);
                    }
                }
            }

            let message = KafkaMessage::Item {
                item_type: TraceItemType::Log,
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Log as i32).to_string(),
                    ),
                ]),
                message: trace_item,
            };

            self.produce(KafkaTopic::Items, message)?;
        }

        // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes.
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::LogItem,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: num_logs,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::LogByte,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: payload.len() as u32,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously; in the future we might explore how to
            // make the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

/// Common attributes for both standalone attachments and processing-relevant attachments.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// Number of outlined chunks.
    /// Zero if the attachment has `size: 0`, or there was only a single chunk which has been inlined into `data`.
    chunks: usize,

1483    /// The content of the attachment,
1484    /// if they are smaller than the configured `attachment_chunk_size`.
1485    #[serde(skip_serializing_if = "Option::is_none")]
1486    data: Option<Bytes>,
1487
1488    /// The size of the attachment in bytes.
1489    #[serde(skip_serializing_if = "Option::is_none")]
1490    size: Option<usize>,
1491
1492    /// Whether this attachment was rate limited and should be removed after processing.
1493    ///
1494    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
1495    /// native crash reports still need to be retained. These attachments are marked with the
1496    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
1497    /// not be persisted after processing.
1498    #[serde(skip_serializing_if = "Option::is_none")]
1499    rate_limited: Option<bool>,
1500}

/// A hack to make rmp-serde behave more like serde-json when serializing enums.
///
/// Cannot serialize bytes.
///
/// See <https://github.com/3Hren/msgpack-rust/pull/214>
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}
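
// A minimal sketch of why `serialize_attachment_type` exists: routing the enum
// through `serde_json::to_value` first makes rmp-serde encode it in the same
// shape serde_json would produce (a plain string for unit variants). The
// attachment values below are hypothetical, and `AttachmentType::default()` is
// assumed to be a plain unit variant.
#[cfg(test)]
mod attachment_type_serialization_tests {
    use super::*;

    #[test]
    fn serializes_attachment_type_as_string() {
        let attachment = ChunkedAttachment {
            id: Uuid::new_v4().to_string(),
            name: UNNAMED_ATTACHMENT.to_owned(),
            content_type: None,
            attachment_type: AttachmentType::default(),
            chunks: 0,
            data: None,
            size: Some(0),
            rate_limited: None,
        };

        // The JSON representation shows the enum as a bare string; msgpack
        // (via `to_vec_named`) encodes the same shape without erroring.
        let json = serde_json::to_value(&attachment).unwrap();
        assert!(json.get("attachment_type").unwrap().is_string());
        assert!(rmp_serde::to_vec_named(&attachment).is_ok());
    }
}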

fn serialize_btreemap_skip_nulls<K, S, T>(
    map: &Option<BTreeMap<K, Option<T>>>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    K: Serialize,
    S: serde::Serializer,
    T: serde::Serialize,
{
    let Some(map) = map else {
        return serializer.serialize_none();
    };
    let mut m = serializer.serialize_map(Some(map.len()))?;
    for (key, value) in map.iter() {
        if let Some(value) = value {
            m.serialize_entry(key, value)?;
        }
    }
    m.end()
}
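
// A small sketch of `serialize_btreemap_skip_nulls` (the wrapper struct and
// values are hypothetical): entries whose value is `None` are omitted entirely
// instead of being serialized as `null`.
#[cfg(test)]
mod skip_nulls_tests {
    use super::*;

    #[derive(Serialize)]
    struct Wrapper {
        #[serde(serialize_with = "serialize_btreemap_skip_nulls")]
        map: Option<BTreeMap<String, Option<u32>>>,
    }

    #[test]
    fn null_values_are_dropped() {
        let wrapper = Wrapper {
            map: Some(BTreeMap::from([
                ("kept".to_owned(), Some(1)),
                ("dropped".to_owned(), None),
            ])),
        };

        // The `dropped` entry disappears rather than appearing as `null`.
        let json = serde_json::to_string(&wrapper).unwrap();
        assert_eq!(json, r#"{"map":{"kept":1}}"#);
    }
}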

/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw event payload.
    payload: &'a [u8],
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The replay id.
    replay_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of chunk. Starts at 0 and ends at `AttachmentKafkaMessage.num_chunks - 1`.
    chunk_index: usize,
}

/// A "standalone" attachment.
///
/// Still belongs to an event but can be sent independently (like UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    // Used for KafkaMessage::key
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}
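
// A tiny sketch of the wire format: thanks to `rename_all = "snake_case"`, the
// discriminator serializes to exactly the "check_in" / "clock_pulse" strings
// described in the doc comment above.
#[cfg(test)]
mod check_in_message_type_tests {
    use super::*;

    #[test]
    fn serializes_as_snake_case() {
        let json = serde_json::to_string(&CheckInMessageType::CheckIn).unwrap();
        assert_eq!(json, r#""check_in""#);

        let json = serde_json::to_string(&CheckInMessageType::ClockPulse).unwrap();
        assert_eq!(json, r#""clock_pulse""#);
    }
}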

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanLink<'a> {
    pub trace_id: &'a str,
    pub span_id: &'a str,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sampled: Option<bool>,
    #[serde(borrow)]
    pub attributes: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanMeasurement<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    value: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanKafkaMessage<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    description: Option<Cow<'a, str>>,
    #[serde(default)]
    duration_ms: u32,
    /// The ID of the transaction event associated to this span, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    #[serde(rename(deserialize = "exclusive_time"))]
    exclusive_time_ms: f64,
    #[serde(default)]
    is_segment: bool,
    #[serde(default)]
    is_remote: bool,

    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    data: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    kind: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "none_or_empty_vec")]
    links: Option<Vec<SpanLink<'a>>>,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement<'a>>>>,
    #[serde(default)]
    organization_id: u64,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    origin: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    parent_span_id: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    profile_id: Option<Cow<'a, str>>,
    /// The numeric ID of the project.
    #[serde(default)]
    project_id: u64,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until this data should be deleted.
    #[serde(default)]
    retention_days: u16,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    segment_id: Option<Cow<'a, str>>,
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_btreemap_skip_nulls"
    )]
    #[serde(borrow)]
    sentry_tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    span_id: Cow<'a, str>,
    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    trace_id: EventId,

    #[serde(default)]
    start_timestamp_ms: u64,
    #[serde(rename(deserialize = "start_timestamp"))]
    start_timestamp_precise: f64,
    #[serde(rename(deserialize = "timestamp"))]
    end_timestamp_precise: f64,

    #[serde(borrow, default, skip_serializing)]
    platform: Cow<'a, str>, // We only use this for logging for now

    #[serde(default, skip_serializing_if = "Option::is_none")]
    client_sample_rate: Option<f64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    server_sample_rate: Option<f64>,

    #[serde(
        default,
        rename = "_meta",
        skip_serializing_if = "none_or_empty_object"
    )]
    meta: Option<&'a RawValue>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    _performance_issues_spans: Option<bool>,
}

impl SpanKafkaMessage<'_> {
    /// Backfills `data` based on `sentry_tags`, `tags`, and `measurements`.
    ///
    /// * Every item in `sentry_tags` is copied to `data`, with the key prefixed with `sentry.`.
    ///   The only exception is the `description` tag, which is copied as `sentry.normalized_description`.
    ///
    /// * Every item in `tags` is copied to `data` verbatim, with the exception of `description`, which
    ///   is copied as `sentry.normalized_description`.
    ///
    /// * The value of every item in `measurements` is copied to `data` with the same key, with the exceptions
    ///   of `client_sample_rate` and `server_sample_rate`. Those measurements are instead written to the top-level
    ///   fields of the same names.
    ///
    /// In no case are existing keys overwritten. Thus, from highest to lowest, the order of precedence is
    /// * existing values in `data`
    /// * `measurements`
    /// * `tags`
    /// * `sentry_tags`
    fn backfill_data(&mut self) {
        let data = self.data.get_or_insert_default();

        if let Some(measurements) = &self.measurements {
            for (key, value) in measurements {
                let Some(value) = value.as_ref().and_then(|v| v.value) else {
                    continue;
                };

                match key.as_ref() {
                    "client_sample_rate" => {
                        if let Ok(client_sample_rate) = Deserialize::deserialize(value) {
                            self.client_sample_rate = Some(client_sample_rate);
                        }
                    }
                    "server_sample_rate" => {
                        if let Ok(server_sample_rate) = Deserialize::deserialize(value) {
                            self.server_sample_rate = Some(server_sample_rate);
                        }
                    }
                    _ => {
                        data.entry(key.clone()).or_insert(Some(value));
                    }
                }
            }
        }

        if let Some(tags) = &self.tags {
            for (key, value) in tags {
                let Some(value) = value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    key.clone()
                };

                data.entry(key).or_insert(Some(value));
            }
        }

        if let Some(sentry_tags) = &self.sentry_tags {
            for (key, value) in sentry_tags {
                let Some(value) = value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    Cow::Owned(format!("sentry.{key}"))
                };

                data.entry(key).or_insert(Some(value));
            }
        }
    }
}
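
// A small sanity-check sketch of `backfill_data` (the JSON payload below is
// hypothetical): `sentry_tags` keys gain a `sentry.` prefix, `description` is
// renamed to `sentry.normalized_description`, and plain `tags` are copied
// verbatim, never overwriting existing `data` entries.
#[cfg(test)]
mod backfill_data_tests {
    use super::*;

    #[test]
    fn backfills_tags_and_sentry_tags() {
        let json = r#"{
            "exclusive_time": 1.0,
            "received": 1.0,
            "span_id": "deadbeefdeadbeef",
            "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
            "start_timestamp": 1.0,
            "timestamp": 2.0,
            "tags": {"custom": "value"},
            "sentry_tags": {"op": "db", "description": "SELECT 1"}
        }"#;

        let mut message: SpanKafkaMessage = serde_json::from_str(json).unwrap();
        message.backfill_data();

        let data = message.data.as_ref().unwrap();
        assert!(data.contains_key("custom"));
        assert!(data.contains_key("sentry.op"));
        assert!(data.contains_key("sentry.normalized_description"));
        assert!(!data.contains_key("sentry.description"));
    }
}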

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", content = "value")]
enum LogAttributeValue {
    #[serde(rename = "string")]
    String(String),
    #[serde(rename = "boolean")]
    Bool(bool),
    #[serde(rename = "integer")]
    Int(i64),
    #[serde(rename = "double")]
    Double(f64),
    #[serde(rename = "unknown")]
    Unknown(()),
}

/// This is a temporary struct to convert the old attribute format to the new one.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct LogAttribute {
    #[serde(flatten)]
    value: Option<LogAttributeValue>,
}

#[derive(Debug, Deserialize)]
struct LogKafkaMessages<'a> {
    #[serde(borrow)]
    items: Vec<LogKafkaMessage<'a>>,
}

#[derive(Debug, Deserialize)]
struct LogKafkaMessage<'a> {
    trace_id: EventId,
    #[serde(default)]
    timestamp: f64,
    #[serde(borrow, default)]
    attributes: Option<BTreeMap<&'a str, Option<LogAttribute>>>,
}

fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
    match value {
        None => true,
        Some(raw) => raw.get() == "{}",
    }
}

fn none_or_empty_vec<T>(value: &Option<Vec<T>>) -> bool {
    match &value {
        Some(vec) => vec.is_empty(),
        None => true,
    }
}

fn none_or_empty_map<S, T>(value: &Option<BTreeMap<S, T>>) -> bool {
    value.as_ref().is_none_or(BTreeMap::is_empty)
}
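
// A brief sketch (hypothetical values) of the `skip_serializing_if` helpers
// above: each one treats both `None` and an empty container as "nothing to
// emit", so those fields are dropped from the serialized message.
#[cfg(test)]
mod skip_helper_tests {
    use super::*;

    #[test]
    fn empty_and_none_values_are_skipped() {
        assert!(none_or_empty_object(&None));
        let empty = RawValue::from_string("{}".to_owned()).unwrap();
        assert!(none_or_empty_object(&Some(&*empty)));
        let non_empty = RawValue::from_string(r#"{"a":1}"#.to_owned()).unwrap();
        assert!(!none_or_empty_object(&Some(&*non_empty)));

        assert!(none_or_empty_vec::<u32>(&None));
        assert!(none_or_empty_vec(&Some(Vec::<u32>::new())));
        assert!(!none_or_empty_vec(&Some(vec![1])));

        assert!(none_or_empty_map::<String, u32>(&None));
        assert!(!none_or_empty_map(&Some(BTreeMap::from([("a", 1)]))));
    }
}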

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    Profile(ProfileKafkaMessage),
    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    Span {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },
    ProfileChunk(ProfileChunkKafkaMessage),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Stats => "metric_metric_stats",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::Span { .. } => "span",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
        }
    }

    /// Returns the partitioning key for this Kafka message, which determines the topic
    /// partition the message is produced to.
    fn key(&self) -> Option<[u8; 16]> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),
            Self::Span { message, .. } => Some(message.trace_id.0),

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessages would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint,

            // Generate a new random routing key, instead of defaulting to the `rdkafka`
            // behavior for messages without a routing key.
            //
            // This results in significantly more work for Kafka, but we've seen that the
            // metrics indexer consumer in Sentry cannot deal with this load shape. Until
            // the metrics indexer is updated, we still need to assign random keys here.
            Self::Metric { .. } => Some(Uuid::new_v4()),

            // Random partitioning
            Self::Profile(_)
            | Self::ReplayRecordingNotChunked(_)
            | Self::ProfileChunk(_)
            | Self::Item { .. } => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.into_bytes())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::Span { headers, .. }
            | KafkaMessage::Item { headers, .. } => {
                if !headers.is_empty() {
                    return Some(headers);
                }
                None
            }
            KafkaMessage::Profile(profile) => {
                if !profile.headers.is_empty() {
                    return Some(&profile.headers);
                }
                None
            }
            _ => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::Span { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            _ => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
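
// A short sketch of the partition-key rules above (the IDs are hypothetical,
// and `ProjectId::new` is assumed to be a plain constructor): event-like
// messages key on their event ID, but a nil UUID is filtered out, falling back
// to random partitioning.
#[cfg(test)]
mod partition_key_tests {
    use super::*;

    fn event_message(event_id: EventId) -> KafkaMessage<'static> {
        KafkaMessage::Event(EventKafkaMessage {
            payload: Bytes::new(),
            start_time: 0,
            event_id,
            project_id: ProjectId::new(1),
            remote_addr: None,
            attachments: Vec::new(),
        })
    }

    #[test]
    fn nil_event_id_has_no_key() {
        let keyed = event_message(EventId(Uuid::new_v4()));
        assert!(keyed.key().is_some());

        // A nil UUID would always hit the same partition; `key()` filters it out.
        let nil = event_message(EventId(Uuid::nil()));
        assert_eq!(nil.key(), None);
    }
}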

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

/// Determines if the given item is considered slow.
///
/// Slow items must be routed to the `Attachments` topic.
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

/// Returns a safe timestamp for Kafka.
///
/// Kafka expects timestamps to be in UTC and in seconds since epoch.
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // We assume this call can't return < 0.
    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(*b"0123456789abcdef"), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
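
    // A small sketch of `safe_timestamp` (hypothetical inputs): non-negative
    // timestamps pass through unchanged, while pre-epoch timestamps fall back
    // to the current time rather than underflowing the unsigned conversion.
    #[test]
    fn safe_timestamp_clamps_pre_epoch() {
        let now = Utc::now();
        assert_eq!(safe_timestamp(now), now.timestamp() as u64);

        let pre_epoch = DateTime::<Utc>::from_timestamp(-1, 0).unwrap();
        assert!(safe_timestamp(pre_epoch) > 0);
    }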
}