relay_server/services/store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::{Deserialize, Serialize};
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
37use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
42use crate::services::processor::Processed;
43use crate::services::upload::Upload;
44use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
45use crate::utils::{self, FormDataIter};
46
47/// Fallback name used for attachment items without a `filename` header.
48const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
50#[derive(Debug, thiserror::Error)]
51pub enum StoreError {
52    #[error("failed to send the message to kafka: {0}")]
53    SendFailed(#[from] ClientError),
54    #[error("failed to encode data: {0}")]
55    EncodingFailed(std::io::Error),
56    #[error("failed to store event because event id was missing")]
57    NoEventId,
58}
59
60impl OutcomeError for StoreError {
61    type Error = Self;
62
63    fn consume(self) -> (Option<Outcome>, Self::Error) {
64        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
65    }
66}
67
68struct Producer {
69    client: KafkaClient,
70}
71
72impl Producer {
73    pub fn create(config: &Config) -> anyhow::Result<Self> {
74        let mut client_builder = KafkaClient::builder();
75
76        for topic in KafkaTopic::iter().filter(|t| {
77            // Outcomes should not be sent from the store forwarder.
78            // See `KafkaOutcomesProducer`.
79            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
80        }) {
81            let kafka_configs = config.kafka_configs(*topic)?;
82            client_builder = client_builder
83                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
84                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
85        }
86
87        Ok(Self {
88            client: client_builder.build(),
89        })
90    }
91}
92
93/// Publishes an [`Envelope`] to the Sentry core application through Kafka topics.
94#[derive(Debug)]
95pub struct StoreEnvelope {
96    pub envelope: TypedEnvelope<Processed>,
97}
98
99/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
100#[derive(Clone, Debug)]
101pub struct StoreMetrics {
102    pub buckets: Vec<Bucket>,
103    pub scoping: Scoping,
104    pub retention: u16,
105}
106
/// Publishes a trace item to the Sentry core application through Kafka.
108#[derive(Debug)]
109pub struct StoreTraceItem {
110    /// The final trace item which will be produced to Kafka.
111    pub trace_item: TraceItem,
112    /// Outcomes to be emitted when successfully producing the item to Kafka.
113    ///
    /// Note: this is only a temporary measure; long term, these outcomes will be part of the trace
    /// item and emitted by Snuba to guarantee delivery to storage.
116    pub quantities: Quantities,
117}
118
119impl Counted for StoreTraceItem {
120    fn quantities(&self) -> Quantities {
121        self.quantities.clone()
122    }
123}
124
125/// Publishes a span item to the Sentry core application through Kafka.
126#[derive(Debug)]
127pub struct StoreSpanV2 {
128    /// Routing key to assign a Kafka partition.
129    pub routing_key: Option<Uuid>,
130    /// Default retention of the span.
131    pub retention_days: u16,
132    /// Downsampled retention of the span.
133    pub downsampled_retention_days: u16,
134    /// The final Sentry compatible span item.
135    pub item: SpanV2,
136}
137
138impl Counted for StoreSpanV2 {
139    fn quantities(&self) -> Quantities {
140        self.item.quantities()
141    }
142}
143
/// The asynchronous thread pool used for scheduling store tasks in the envelope store.
145pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
146
/// Service interface for all messages that can be forwarded to the store.
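///
/// A minimal usage sketch, assuming a `store: Addr<Store>` handle to a running store service and
/// pre-built `buckets` and `scoping` values; any message with a [`FromMessage`] implementation
/// below can be enqueued the same way:
///
/// ```ignore
/// store.send(StoreMetrics { buckets, scoping, retention: 90 });
/// ```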
148#[derive(Debug)]
149pub enum Store {
150    /// An envelope containing a mixture of items.
151    ///
152    /// Note: Some envelope items are not supported to be submitted at all or through an envelope,
153    /// for example logs must be submitted via [`Self::TraceItem`] instead.
154    ///
155    /// Long term this variant is going to be replaced with fully typed variants of items which can
156    /// be stored instead.
157    Envelope(StoreEnvelope),
158    /// Aggregated generic metrics.
159    Metrics(StoreMetrics),
160    /// A singular [`TraceItem`].
161    TraceItem(Managed<StoreTraceItem>),
162    /// A singular Span.
163    Span(Managed<Box<StoreSpanV2>>),
164}
165
166impl Store {
167    /// Returns the name of the message variant.
168    fn variant(&self) -> &'static str {
169        match self {
170            Store::Envelope(_) => "envelope",
171            Store::Metrics(_) => "metrics",
172            Store::TraceItem(_) => "log",
173            Store::Span(_) => "span",
174        }
175    }
176}
177
178impl Interface for Store {}
179
180impl FromMessage<StoreEnvelope> for Store {
181    type Response = NoResponse;
182
183    fn from_message(message: StoreEnvelope, _: ()) -> Self {
184        Self::Envelope(message)
185    }
186}
187
188impl FromMessage<StoreMetrics> for Store {
189    type Response = NoResponse;
190
191    fn from_message(message: StoreMetrics, _: ()) -> Self {
192        Self::Metrics(message)
193    }
194}
195
196impl FromMessage<Managed<StoreTraceItem>> for Store {
197    type Response = NoResponse;
198
199    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
200        Self::TraceItem(message)
201    }
202}
203
204impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
205    type Response = NoResponse;
206
207    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
208        Self::Span(message)
209    }
210}
211
212/// Service implementing the [`Store`] interface.
213pub struct StoreService {
214    pool: StoreServicePool,
215    config: Arc<Config>,
216    global_config: GlobalConfigHandle,
217    outcome_aggregator: Addr<TrackOutcome>,
218    metric_outcomes: MetricOutcomes,
219    #[expect(unused)]
220    upload: Addr<Upload>,
221    producer: Producer,
222}
223
224impl StoreService {
225    pub fn create(
226        pool: StoreServicePool,
227        config: Arc<Config>,
228        global_config: GlobalConfigHandle,
229        outcome_aggregator: Addr<TrackOutcome>,
230        metric_outcomes: MetricOutcomes,
231        upload: Addr<Upload>,
232    ) -> anyhow::Result<Self> {
233        let producer = Producer::create(&config)?;
234        Ok(Self {
235            pool,
236            config,
237            global_config,
238            outcome_aggregator,
239            metric_outcomes,
240            upload,
241            producer,
242        })
243    }
244
245    fn handle_message(&self, message: Store) {
246        let ty = message.variant();
247        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
248            match message {
249                Store::Envelope(message) => self.handle_store_envelope(message),
250                Store::Metrics(message) => self.handle_store_metrics(message),
251                Store::TraceItem(message) => self.handle_store_trace_item(message),
252                Store::Span(message) => self.handle_store_span(message),
253            }
254        })
255    }
256
257    fn handle_store_envelope(&self, message: StoreEnvelope) {
258        let StoreEnvelope {
259            envelope: mut managed,
260        } = message;
261
262        let scoping = managed.scoping();
263        let envelope = managed.take_envelope();
264
265        match self.store_envelope(envelope, managed.received_at(), scoping) {
266            Ok(()) => managed.accept(),
267            Err(error) => {
268                managed.reject(Outcome::Invalid(DiscardReason::Internal));
269                relay_log::error!(
270                    error = &error as &dyn Error,
271                    tags.project_key = %scoping.project_key,
272                    "failed to store envelope"
273                );
274            }
275        }
276    }
277
278    fn store_envelope(
279        &self,
280        mut envelope: Box<Envelope>,
281        received_at: DateTime<Utc>,
282        scoping: Scoping,
283    ) -> Result<(), StoreError> {
284        let retention = envelope.retention();
285        let downsampled_retention = envelope.downsampled_retention();
286
287        let event_id = envelope.event_id();
288        let event_item = envelope.as_mut().take_item_by(|item| {
289            matches!(
290                item.ty(),
291                ItemType::Event | ItemType::Transaction | ItemType::Security
292            )
293        });
294        let event_type = event_item.as_ref().map(|item| item.ty());
295
296        // Some error events like minidumps need all attachment chunks to be processed _before_
297        // the event payload on the consumer side. Transaction attachments do not require this ordering
298        // guarantee, so they do not have to go to the same topic as their event payload.
299        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
300            KafkaTopic::Transactions
301        } else if envelope.get_item_by(is_slow_item).is_some() {
302            KafkaTopic::Attachments
303        } else {
304            KafkaTopic::Events
305        };
306
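        // Attachments that belong to a transaction, or to no event at all, are produced as
        // standalone `attachment` messages instead of being embedded into the `event` message
        // (see `produce_attachment`).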
307        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
308
309        let mut attachments = Vec::new();
310        let mut replay_event = None;
311        let mut replay_recording = None;
312
        // Whether Relay should skip publishing the replay-event to Snuba, sampled from a global option.
314        let replay_relay_snuba_publish_disabled = utils::sample(
315            self.global_config
316                .current()
317                .options
318                .replay_relay_snuba_publish_disabled_sample_rate,
319        )
320        .is_keep();
321
322        for item in envelope.items() {
323            let content_type = item.content_type();
324            match item.ty() {
325                ItemType::Attachment => {
326                    if let Some(attachment) = self.produce_attachment(
327                        event_id.ok_or(StoreError::NoEventId)?,
328                        scoping.project_id,
329                        item,
330                        send_individual_attachments,
331                    )? {
332                        attachments.push(attachment);
333                    }
334                }
335                ItemType::UserReport => {
336                    debug_assert!(event_topic == KafkaTopic::Attachments);
337                    self.produce_user_report(
338                        event_id.ok_or(StoreError::NoEventId)?,
339                        scoping.project_id,
340                        received_at,
341                        item,
342                    )?;
343                }
344                ItemType::UserReportV2 => {
345                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
346                    self.produce_user_report_v2(
347                        event_id.ok_or(StoreError::NoEventId)?,
348                        scoping.project_id,
349                        received_at,
350                        item,
351                        remote_addr,
352                    )?;
353                }
354                ItemType::Profile => self.produce_profile(
355                    scoping.organization_id,
356                    scoping.project_id,
357                    scoping.key_id,
358                    received_at,
359                    retention,
360                    item,
361                )?,
362                ItemType::ReplayVideo => {
363                    self.produce_replay_video(
364                        event_id,
365                        scoping,
366                        item.payload(),
367                        received_at,
368                        retention,
369                        replay_relay_snuba_publish_disabled,
370                    )?;
371                }
372                ItemType::ReplayRecording => {
373                    replay_recording = Some(item);
374                }
375                ItemType::ReplayEvent => {
376                    replay_event = Some(item);
377                    self.produce_replay_event(
378                        event_id.ok_or(StoreError::NoEventId)?,
379                        scoping.project_id,
380                        received_at,
381                        retention,
382                        &item.payload(),
383                        replay_relay_snuba_publish_disabled,
384                    )?;
385                }
386                ItemType::CheckIn => {
387                    let client = envelope.meta().client();
388                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
389                }
390                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
391                    scoping,
392                    received_at,
393                    event_id,
394                    retention,
395                    downsampled_retention,
396                    item,
397                )?,
398                ty @ ItemType::Log => {
399                    debug_assert!(
400                        false,
401                        "received {ty} through an envelope, \
402                        this item must be submitted via a specific store message instead"
403                    );
404                    relay_log::error!(
405                        tags.project_key = %scoping.project_key,
406                        "StoreService received unsupported item type '{ty}' in envelope"
407                    );
408                }
409                ItemType::ProfileChunk => self.produce_profile_chunk(
410                    scoping.organization_id,
411                    scoping.project_id,
412                    received_at,
413                    retention,
414                    item,
415                )?,
416                other => {
417                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
418                    let item_types = envelope
419                        .items()
420                        .map(|item| item.ty().as_str())
421                        .collect::<Vec<_>>();
422                    let attachment_types = envelope
423                        .items()
424                        .map(|item| {
425                            item.attachment_type()
426                                .map(|t| t.to_string())
427                                .unwrap_or_default()
428                        })
429                        .collect::<Vec<_>>();
430
431                    relay_log::with_scope(
432                        |scope| {
433                            scope.set_extra("item_types", item_types.into());
434                            scope.set_extra("attachment_types", attachment_types.into());
435                            if other == &ItemType::FormData {
436                                let payload = item.payload();
437                                let form_data_keys = FormDataIter::new(&payload)
438                                    .map(|entry| entry.key())
439                                    .collect::<Vec<_>>();
440                                scope.set_extra("form_data_keys", form_data_keys.into());
441                            }
442                        },
443                        || {
444                            relay_log::error!(
445                                tags.project_key = %scoping.project_key,
446                                tags.event_type = event_type.unwrap_or("none"),
447                                "StoreService received unexpected item type: {other}"
448                            )
449                        },
450                    )
451                }
452            }
453        }
454
455        if let Some(recording) = replay_recording {
            // If a recording item was seen, we produce it to Kafka together with the
            // replay-event payload (if one was provided).
458            //
459            // The replay_video value is always specified as `None`. We do not allow separate
460            // item types for `ReplayVideo` events.
461            let replay_event = replay_event.map(|rv| rv.payload());
462            self.produce_replay_recording(
463                event_id,
464                scoping,
465                &recording.payload(),
466                replay_event.as_deref(),
467                None,
468                received_at,
469                retention,
470                replay_relay_snuba_publish_disabled,
471            )?;
472        }
473
474        if let Some(event_item) = event_item {
475            let event_id = event_id.ok_or(StoreError::NoEventId)?;
476            let project_id = scoping.project_id;
477            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
478
479            self.produce(
480                event_topic,
481                KafkaMessage::Event(EventKafkaMessage {
482                    payload: event_item.payload(),
483                    start_time: safe_timestamp(received_at),
484                    event_id,
485                    project_id,
486                    remote_addr,
487                    attachments,
488                }),
489            )?;
490        } else {
491            debug_assert!(attachments.is_empty());
492        }
493
494        Ok(())
495    }
496
497    fn handle_store_metrics(&self, message: StoreMetrics) {
498        let StoreMetrics {
499            buckets,
500            scoping,
501            retention,
502        } = message;
503
504        let batch_size = self.config.metrics_max_batch_size_bytes();
505        let mut error = None;
506
507        let global_config = self.global_config.current();
508        let mut encoder = BucketEncoder::new(&global_config);
509
510        let now = UnixTimestamp::now();
511        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
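        // Per-namespace ingestion delay in seconds, tracked as `(total, count, max)` and flushed
        // as counter/gauge metrics after all buckets have been produced below.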
512
513        for mut bucket in buckets {
514            let namespace = encoder.prepare(&mut bucket);
515
516            if let Some(received_at) = bucket.metadata.received_at {
517                let delay = now.as_secs().saturating_sub(received_at.as_secs());
518                let (total, count, max) = delay_stats.get_mut(namespace);
519                *total += delay;
520                *count += 1;
521                *max = (*max).max(delay);
522            }
523
524            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
525            // each bucket separately, we only need to split buckets that exceed the size, but not
526            // batches.
527            for view in BucketsView::new(std::slice::from_ref(&bucket))
528                .by_size(batch_size)
529                .flatten()
530            {
531                let message = self.create_metric_message(
532                    scoping.organization_id,
533                    scoping.project_id,
534                    &mut encoder,
535                    namespace,
536                    &view,
537                    retention,
538                );
539
540                let result =
541                    message.and_then(|message| self.send_metric_message(namespace, message));
542
543                let outcome = match result {
544                    Ok(()) => Outcome::Accepted,
545                    Err(e) => {
546                        error.get_or_insert(e);
547                        Outcome::Invalid(DiscardReason::Internal)
548                    }
549                };
550
551                self.metric_outcomes.track(scoping, &[view], outcome);
552            }
553        }
554
555        if let Some(error) = error {
556            relay_log::error!(
557                error = &error as &dyn std::error::Error,
558                "failed to produce metric buckets: {error}"
559            );
560        }
561
562        for (namespace, (total, count, max)) in delay_stats {
563            if count == 0 {
564                continue;
565            }
566            metric!(
567                counter(RelayCounters::MetricDelaySum) += total,
568                namespace = namespace.as_str()
569            );
570            metric!(
571                counter(RelayCounters::MetricDelayCount) += count,
572                namespace = namespace.as_str()
573            );
574            metric!(
575                gauge(RelayGauges::MetricDelayMax) = max,
576                namespace = namespace.as_str()
577            );
578        }
579    }
580
581    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
582        let scoping = message.scoping();
583        let received_at = message.received_at();
584
585        let quantities = message.try_accept(|item| {
586            let item_type = item.trace_item.item_type();
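            // The project id and item type are duplicated into Kafka headers as strings, next to
            // the protobuf-encoded payload.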
587            let message = KafkaMessage::Item {
588                headers: BTreeMap::from([
589                    ("project_id".to_owned(), scoping.project_id.to_string()),
590                    ("item_type".to_owned(), (item_type as i32).to_string()),
591                ]),
592                message: item.trace_item,
593                item_type,
594            };
595
596            self.produce(KafkaTopic::Items, message)
597                .map(|()| item.quantities)
598        });
599
        // Emit accepted outcomes once items have been successfully produced to Kafka.
        //
        // This is only a temporary measure; long term, these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee delivery to storage.
604        if let Ok(quantities) = quantities {
605            for (category, quantity) in quantities {
606                self.outcome_aggregator.send(TrackOutcome {
607                    category,
608                    event_id: None,
609                    outcome: Outcome::Accepted,
610                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
611                    remote_addr: None,
612                    scoping,
613                    timestamp: received_at,
614                });
615            }
616        }
617    }
618
619    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
620        let scoping = message.scoping();
621        let received_at = message.received_at();
622
623        let meta = SpanMeta {
624            organization_id: scoping.organization_id,
625            project_id: scoping.project_id,
626            key_id: scoping.key_id,
627            event_id: None,
628            retention_days: message.retention_days,
629            downsampled_retention_days: message.downsampled_retention_days,
630            received: datetime_to_timestamp(received_at),
631        };
632
633        let result = message.try_accept(|span| {
634            let item = Annotated::new(span.item);
635            let message = KafkaMessage::SpanV2 {
636                routing_key: span.routing_key,
637                headers: BTreeMap::from([(
638                    "project_id".to_owned(),
639                    scoping.project_id.to_string(),
640                )]),
641                message: SpanKafkaMessage {
642                    meta,
643                    span: SerializableAnnotated(&item),
644                },
645            };
646
647            self.produce(KafkaTopic::Spans, message)
648        });
649
650        match result {
651            Ok(()) => {
652                relay_statsd::metric!(
653                    counter(RelayCounters::SpanV2Produced) += 1,
654                    via = "processing"
655                );
656
657                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
658                // or the segments consumer, depending on which will produce outcomes later.
659                self.outcome_aggregator.send(TrackOutcome {
660                    category: DataCategory::SpanIndexed,
661                    event_id: None,
662                    outcome: Outcome::Accepted,
663                    quantity: 1,
664                    remote_addr: None,
665                    scoping,
666                    timestamp: received_at,
667                });
668            }
669            Err(error) => {
670                relay_log::error!(
671                    error = &error as &dyn Error,
672                    tags.project_key = %scoping.project_key,
673                    "failed to store span"
674                );
675            }
676        }
677    }
678
679    fn create_metric_message<'a>(
680        &self,
681        organization_id: OrganizationId,
682        project_id: ProjectId,
683        encoder: &'a mut BucketEncoder,
684        namespace: MetricNamespace,
685        view: &BucketView<'a>,
686        retention_days: u16,
687    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
688        let value = match view.value() {
689            BucketViewValue::Counter(c) => MetricValue::Counter(c),
690            BucketViewValue::Distribution(data) => MetricValue::Distribution(
691                encoder
692                    .encode_distribution(namespace, data)
693                    .map_err(StoreError::EncodingFailed)?,
694            ),
695            BucketViewValue::Set(data) => MetricValue::Set(
696                encoder
697                    .encode_set(namespace, data)
698                    .map_err(StoreError::EncodingFailed)?,
699            ),
700            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
701        };
702
703        Ok(MetricKafkaMessage {
704            org_id: organization_id,
705            project_id,
706            name: view.name(),
707            value,
708            timestamp: view.timestamp(),
709            tags: view.tags(),
710            retention_days,
711            received_at: view.metadata().received_at,
712        })
713    }
714
715    fn produce(
716        &self,
717        topic: KafkaTopic,
718        // Takes message by value to ensure it is not being produced twice.
719        message: KafkaMessage,
720    ) -> Result<(), StoreError> {
721        relay_log::trace!("Sending kafka message of type {}", message.variant());
722
723        let topic_name = self.producer.client.send_message(topic, &message)?;
724
725        match &message {
726            KafkaMessage::Metric {
727                message: metric, ..
728            } => {
729                metric!(
730                    counter(RelayCounters::ProcessingMessageProduced) += 1,
731                    event_type = message.variant(),
732                    topic = topic_name,
733                    metric_type = metric.value.variant(),
734                    metric_encoding = metric.value.encoding().unwrap_or(""),
735                );
736            }
737            KafkaMessage::ReplayRecordingNotChunked(replay) => {
738                let has_video = replay.replay_video.is_some();
739
740                metric!(
741                    counter(RelayCounters::ProcessingMessageProduced) += 1,
742                    event_type = message.variant(),
743                    topic = topic_name,
744                    has_video = bool_to_str(has_video),
745                );
746            }
747            message => {
748                metric!(
749                    counter(RelayCounters::ProcessingMessageProduced) += 1,
750                    event_type = message.variant(),
751                    topic = topic_name,
752                );
753            }
754        }
755
756        Ok(())
757    }
758
759    /// Produces Kafka messages for the content and metadata of an attachment item.
760    ///
    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
762    /// is produced directly as an individual `attachment` message, or returned from this function
763    /// to be later sent as part of an `event` message.
764    ///
765    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
766    /// unless the `send_individual_attachments` flag is set, and the content is small enough
767    /// to fit inside a message.
768    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
769    /// of the `attachment` message instead.
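    ///
    /// For example, with a 1 MiB `attachment_chunk_size`, a 2.5 MiB attachment is split into
    /// three `attachment_chunk` messages (1 MiB, 1 MiB and 0.5 MiB), followed by an `attachment`
    /// message (or event-embedded metadata) carrying `chunks: 3`.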
770    fn produce_attachment(
771        &self,
772        event_id: EventId,
773        project_id: ProjectId,
774        item: &Item,
775        send_individual_attachments: bool,
776    ) -> Result<Option<ChunkedAttachment>, StoreError> {
777        let id = Uuid::new_v4().to_string();
778
779        let payload = item.payload();
780        let size = item.len();
781        let max_chunk_size = self.config.attachment_chunk_size();
782
783        // When sending individual attachments, and we have a single chunk, we want to send the
784        // `data` inline in the `attachment` message.
785        // This avoids a needless roundtrip through the attachments cache on the Sentry side.
786        let payload = if size == 0 {
787            AttachmentPayload::Chunked(0)
788        } else if send_individual_attachments && size < max_chunk_size {
789            AttachmentPayload::Inline(payload)
790        } else {
791            let mut chunk_index = 0;
792            let mut offset = 0;
            // Split the payload into chunks of at most `max_chunk_size` bytes. Empty
            // attachments never reach this branch; they are handled above with `chunks: 0`.
795            while offset < size {
796                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
797                let chunk_message = AttachmentChunkKafkaMessage {
798                    payload: payload.slice(offset..offset + chunk_size),
799                    event_id,
800                    project_id,
801                    id: id.clone(),
802                    chunk_index,
803                };
804
805                self.produce(
806                    KafkaTopic::Attachments,
807                    KafkaMessage::AttachmentChunk(chunk_message),
808                )?;
809                offset += chunk_size;
810                chunk_index += 1;
811            }
812
813            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
814            // is one larger than the last chunk, so it is equal to the number of chunks.
815            AttachmentPayload::Chunked(chunk_index)
816        };
817
818        let attachment = ChunkedAttachment {
819            id,
820            name: match item.filename() {
821                Some(name) => name.to_owned(),
822                None => UNNAMED_ATTACHMENT.to_owned(),
823            },
824            rate_limited: item.rate_limited(),
825            content_type: item
826                .content_type()
827                .map(|content_type| content_type.as_str().to_owned()),
828            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
829            size,
830            payload,
831        };
832
833        if send_individual_attachments {
834            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
835                event_id,
836                project_id,
837                attachment,
838            });
839            self.produce(KafkaTopic::Attachments, message)?;
840            Ok(None)
841        } else {
842            Ok(Some(attachment))
843        }
844    }
845
846    fn produce_user_report(
847        &self,
848        event_id: EventId,
849        project_id: ProjectId,
850        received_at: DateTime<Utc>,
851        item: &Item,
852    ) -> Result<(), StoreError> {
853        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
854            project_id,
855            event_id,
856            start_time: safe_timestamp(received_at),
857            payload: item.payload(),
858        });
859
860        self.produce(KafkaTopic::Attachments, message)
861    }
862
863    fn produce_user_report_v2(
864        &self,
865        event_id: EventId,
866        project_id: ProjectId,
867        received_at: DateTime<Utc>,
868        item: &Item,
869        remote_addr: Option<String>,
870    ) -> Result<(), StoreError> {
871        let message = KafkaMessage::Event(EventKafkaMessage {
872            project_id,
873            event_id,
874            payload: item.payload(),
875            start_time: safe_timestamp(received_at),
876            remote_addr,
877            attachments: vec![],
878        });
879        self.produce(KafkaTopic::Feedback, message)
880    }
881
882    fn send_metric_message(
883        &self,
884        namespace: MetricNamespace,
885        message: MetricKafkaMessage,
886    ) -> Result<(), StoreError> {
887        let topic = match namespace {
888            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
889            MetricNamespace::Unsupported => {
890                relay_log::with_scope(
891                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
892                    || relay_log::error!("store service dropping unknown metric usecase"),
893                );
894                return Ok(());
895            }
896            _ => KafkaTopic::MetricsGeneric,
897        };
898        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
899
900        self.produce(topic, KafkaMessage::Metric { headers, message })?;
901        Ok(())
902    }
903
904    fn produce_profile(
905        &self,
906        organization_id: OrganizationId,
907        project_id: ProjectId,
908        key_id: Option<u64>,
909        received_at: DateTime<Utc>,
910        retention_days: u16,
911        item: &Item,
912    ) -> Result<(), StoreError> {
913        let message = ProfileKafkaMessage {
914            organization_id,
915            project_id,
916            key_id,
917            received: safe_timestamp(received_at),
918            retention_days,
919            headers: BTreeMap::from([(
920                "sampled".to_owned(),
921                if item.sampled() { "true" } else { "false" }.to_owned(),
922            )]),
923            payload: item.payload(),
924        };
925        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
926        Ok(())
927    }
928
929    fn produce_replay_event(
930        &self,
931        replay_id: EventId,
932        project_id: ProjectId,
933        received_at: DateTime<Utc>,
934        retention_days: u16,
935        payload: &[u8],
936        relay_snuba_publish_disabled: bool,
937    ) -> Result<(), StoreError> {
938        if relay_snuba_publish_disabled {
939            return Ok(());
940        }
941
942        let message = ReplayEventKafkaMessage {
943            replay_id,
944            project_id,
945            retention_days,
946            start_time: safe_timestamp(received_at),
947            payload,
948        };
949        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
950        Ok(())
951    }
952
953    #[allow(clippy::too_many_arguments)]
954    fn produce_replay_recording(
955        &self,
956        event_id: Option<EventId>,
957        scoping: Scoping,
958        payload: &[u8],
959        replay_event: Option<&[u8]>,
960        replay_video: Option<&[u8]>,
961        received_at: DateTime<Utc>,
962        retention: u16,
963        relay_snuba_publish_disabled: bool,
964    ) -> Result<(), StoreError> {
965        // Maximum number of bytes accepted by the consumer.
966        let max_payload_size = self.config.max_replay_message_size();
967
968        // Size of the consumer message. We can be reasonably sure this won't overflow because
969        // of the request size validation provided by Nginx and Relay.
970        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
971        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
972        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
973        payload_size += payload.len();
974
        // If the recording payload cannot fit into the message, do not produce it and quit early.
976        if payload_size >= max_payload_size {
977            relay_log::debug!("replay_recording over maximum size.");
978            self.outcome_aggregator.send(TrackOutcome {
979                category: DataCategory::Replay,
980                event_id,
981                outcome: Outcome::Invalid(DiscardReason::TooLarge(
982                    DiscardItemType::ReplayRecording,
983                )),
984                quantity: 1,
985                remote_addr: None,
986                scoping,
987                timestamp: received_at,
988            });
989            return Ok(());
990        }
991
992        let message =
993            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
994                replay_id: event_id.ok_or(StoreError::NoEventId)?,
995                project_id: scoping.project_id,
996                key_id: scoping.key_id,
997                org_id: scoping.organization_id,
998                received: safe_timestamp(received_at),
999                retention_days: retention,
1000                payload,
1001                replay_event,
1002                replay_video,
1003                relay_snuba_publish_disabled,
1004            });
1005
1006        self.produce(KafkaTopic::ReplayRecordings, message)?;
1007
1008        Ok(())
1009    }
1010
1011    fn produce_replay_video(
1012        &self,
1013        event_id: Option<EventId>,
1014        scoping: Scoping,
1015        payload: Bytes,
1016        received_at: DateTime<Utc>,
1017        retention: u16,
1018        relay_snuba_publish_disabled: bool,
1019    ) -> Result<(), StoreError> {
1020        #[derive(Deserialize)]
1021        struct VideoEvent<'a> {
1022            replay_event: &'a [u8],
1023            replay_recording: &'a [u8],
1024            replay_video: &'a [u8],
1025        }
1026
1027        let Ok(VideoEvent {
1028            replay_video,
1029            replay_event,
1030            replay_recording,
1031        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
1032        else {
1033            self.outcome_aggregator.send(TrackOutcome {
1034                category: DataCategory::Replay,
1035                event_id,
1036                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
1037                quantity: 1,
1038                remote_addr: None,
1039                scoping,
1040                timestamp: received_at,
1041            });
1042            return Ok(());
1043        };
1044
1045        self.produce_replay_event(
1046            event_id.ok_or(StoreError::NoEventId)?,
1047            scoping.project_id,
1048            received_at,
1049            retention,
1050            replay_event,
1051            relay_snuba_publish_disabled,
1052        )?;
1053
1054        self.produce_replay_recording(
1055            event_id,
1056            scoping,
1057            replay_recording,
1058            Some(replay_event),
1059            Some(replay_video),
1060            received_at,
1061            retention,
1062            relay_snuba_publish_disabled,
1063        )
1064    }
1065
1066    fn produce_check_in(
1067        &self,
1068        project_id: ProjectId,
1069        received_at: DateTime<Utc>,
1070        client: Option<&str>,
1071        retention_days: u16,
1072        item: &Item,
1073    ) -> Result<(), StoreError> {
1074        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1075            message_type: CheckInMessageType::CheckIn,
1076            project_id,
1077            retention_days,
1078            start_time: safe_timestamp(received_at),
1079            sdk: client.map(str::to_owned),
1080            payload: item.payload(),
1081            routing_key_hint: item.routing_hint(),
1082        });
1083
1084        self.produce(KafkaTopic::Monitors, message)?;
1085
1086        Ok(())
1087    }
1088
1089    fn produce_span(
1090        &self,
1091        scoping: Scoping,
1092        received_at: DateTime<Utc>,
1093        event_id: Option<EventId>,
1094        retention_days: u16,
1095        downsampled_retention_days: u16,
1096        item: &Item,
1097    ) -> Result<(), StoreError> {
1098        debug_assert_eq!(item.ty(), &ItemType::Span);
1099        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));
1100
1101        let Scoping {
1102            organization_id,
1103            project_id,
1104            project_key: _,
1105            key_id,
1106        } = scoping;
1107
1108        let payload = item.payload();
1109        let message = SpanKafkaMessageRaw {
1110            meta: SpanMeta {
1111                organization_id,
1112                project_id,
1113                key_id,
1114                event_id,
1115                retention_days,
1116                downsampled_retention_days,
1117                received: datetime_to_timestamp(received_at),
1118            },
1119            span: serde_json::from_slice(&payload)
1120                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
1121        };
1122
1123        // Verify that this is a V2 span:
1124        debug_assert!(message.span.contains_key("attributes"));
1125        relay_statsd::metric!(
1126            counter(RelayCounters::SpanV2Produced) += 1,
1127            via = "envelope"
1128        );
1129
1130        self.produce(
1131            KafkaTopic::Spans,
1132            KafkaMessage::SpanRaw {
1133                routing_key: item.routing_hint(),
1134                headers: BTreeMap::from([(
1135                    "project_id".to_owned(),
1136                    scoping.project_id.to_string(),
1137                )]),
1138                message,
1139            },
1140        )?;
1141
1142        // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
1143        // or the segments consumer, depending on which will produce outcomes later.
1144        self.outcome_aggregator.send(TrackOutcome {
1145            category: DataCategory::SpanIndexed,
1146            event_id: None,
1147            outcome: Outcome::Accepted,
1148            quantity: 1,
1149            remote_addr: None,
1150            scoping,
1151            timestamp: received_at,
1152        });
1153
1154        Ok(())
1155    }
1156
1157    fn produce_profile_chunk(
1158        &self,
1159        organization_id: OrganizationId,
1160        project_id: ProjectId,
1161        received_at: DateTime<Utc>,
1162        retention_days: u16,
1163        item: &Item,
1164    ) -> Result<(), StoreError> {
1165        let message = ProfileChunkKafkaMessage {
1166            organization_id,
1167            project_id,
1168            received: safe_timestamp(received_at),
1169            retention_days,
1170            payload: item.payload(),
1171        };
1172        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
1173        Ok(())
1174    }
1175}
1176
1177impl Service for StoreService {
1178    type Interface = Store;
1179
1180    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1181        let this = Arc::new(self);
1182
1183        relay_log::info!("store forwarder started");
1184
1185        while let Some(message) = rx.recv().await {
1186            let service = Arc::clone(&this);
            // For now, we run each task synchronously; in the future we might explore how to
            // make the store async.
1189            this.pool
1190                .spawn_async(async move { service.handle_message(message) }.boxed())
1191                .await;
1192        }
1193
1194        relay_log::info!("store forwarder stopped");
1195    }
1196}
1197
/// Describes how the attachment payload is transferred.
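///
/// When flattened into a [`ChunkedAttachment`], the variants serialize as `"chunks": <n>`,
/// `"data": <bytes>`, or `"stored_id": <id>` respectively.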
1199#[derive(Debug, Serialize)]
1200enum AttachmentPayload {
1201    /// The payload has been split into multiple chunks.
1202    ///
1203    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
1204    /// If the payload `size == 0`, the number of chunks will also be `0`.
1205    #[serde(rename = "chunks")]
1206    Chunked(usize),
1207
    /// The payload is inlined directly into this message, and thus into the [`ChunkedAttachment`].
1209    #[serde(rename = "data")]
1210    Inline(Bytes),
1211
1212    /// The attachment has already been stored into the objectstore, with the given Id.
1213    #[serde(rename = "stored_id")]
1214    #[allow(unused)] // TODO: actually storing it in objectstore first is still WIP
1215    Stored(String),
1216}
1217
1218/// Common attributes for both standalone attachments and processing-relevant attachments.
1219#[derive(Debug, Serialize)]
1220struct ChunkedAttachment {
1221    /// The attachment ID within the event.
1222    ///
1223    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
1224    id: String,
1225
1226    /// File name of the attachment file.
1227    name: String,
1228
1229    /// Whether this attachment was rate limited and should be removed after processing.
1230    ///
1231    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
1232    /// native crash reports still need to be retained. These attachments are marked with the
1233    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
1234    /// not be persisted after processing.
1235    rate_limited: bool,
1236
1237    /// Content type of the attachment payload.
1238    #[serde(skip_serializing_if = "Option::is_none")]
1239    content_type: Option<String>,
1240
1241    /// The Sentry-internal attachment type used in the processing pipeline.
1242    #[serde(serialize_with = "serialize_attachment_type")]
1243    attachment_type: AttachmentType,
1244
1245    /// The size of the attachment in bytes.
1246    size: usize,
1247
1248    /// The attachment payload, chunked, inlined, or already stored.
1249    #[serde(flatten)]
1250    payload: AttachmentPayload,
1251}
1252
1253/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1254///
1255/// Cannot serialize bytes.
1256///
1257/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1258fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1259where
1260    S: serde::Serializer,
1261    T: serde::Serialize,
1262{
1263    serde_json::to_value(t)
1264        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1265        .serialize(serializer)
1266}
1267
1268/// Container payload for event messages.
1269#[derive(Debug, Serialize)]
1270struct EventKafkaMessage {
1271    /// Raw event payload.
1272    payload: Bytes,
1273    /// Time at which the event was received by Relay.
1274    start_time: u64,
1275    /// The event id.
1276    event_id: EventId,
1277    /// The project id for the current event.
1278    project_id: ProjectId,
1279    /// The client ip address.
1280    remote_addr: Option<String>,
1281    /// Attachments that are potentially relevant for processing.
1282    attachments: Vec<ChunkedAttachment>,
1283}
1284
1285#[derive(Debug, Serialize)]
1286struct ReplayEventKafkaMessage<'a> {
1287    /// Raw event payload.
1288    payload: &'a [u8],
1289    /// Time at which the event was received by Relay.
1290    start_time: u64,
1291    /// The event id.
1292    replay_id: EventId,
1293    /// The project id for the current event.
1294    project_id: ProjectId,
    /// Number of days to retain.
1296    retention_days: u16,
1297}
1298
1299/// Container payload for chunks of attachments.
1300#[derive(Debug, Serialize)]
1301struct AttachmentChunkKafkaMessage {
1302    /// Chunk payload of the attachment.
1303    payload: Bytes,
1304    /// The event id.
1305    event_id: EventId,
1306    /// The project id for the current event.
1307    project_id: ProjectId,
1308    /// The attachment ID within the event.
1309    ///
1310    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
1311    id: String,
    /// Sequence number of the chunk. Starts at `0` and ends at the attachment's chunk count minus one.
1313    chunk_index: usize,
1314}
1315
1316/// A "standalone" attachment.
1317///
1318/// Still belongs to an event but can be sent independently (like UserReport) and is not
1319/// considered in processing.
1320#[derive(Debug, Serialize)]
1321struct AttachmentKafkaMessage {
1322    /// The event id.
1323    event_id: EventId,
1324    /// The project id for the current event.
1325    project_id: ProjectId,
1326    /// The attachment.
1327    attachment: ChunkedAttachment,
1328}
1329
1330#[derive(Debug, Serialize)]
1331struct ReplayRecordingNotChunkedKafkaMessage<'a> {
1332    replay_id: EventId,
1333    key_id: Option<u64>,
1334    org_id: OrganizationId,
1335    project_id: ProjectId,
1336    received: u64,
1337    retention_days: u16,
1338    #[serde(with = "serde_bytes")]
1339    payload: &'a [u8],
1340    #[serde(with = "serde_bytes")]
1341    replay_event: Option<&'a [u8]>,
1342    #[serde(with = "serde_bytes")]
1343    replay_video: Option<&'a [u8]>,
1344    relay_snuba_publish_disabled: bool,
1345}
1346
1347/// User report for an event wrapped up in a message ready for consumption in Kafka.
1348///
1349/// Is always independent of an event and can be sent as part of any envelope.
1350#[derive(Debug, Serialize)]
1351struct UserReportKafkaMessage {
1352    /// The project id for the current event.
1353    project_id: ProjectId,
1354    start_time: u64,
1355    payload: Bytes,
1356
1357    // Used for KafkaMessage::key
1358    #[serde(skip)]
1359    event_id: EventId,
1360}
1361
1362#[derive(Clone, Debug, Serialize)]
1363struct MetricKafkaMessage<'a> {
1364    org_id: OrganizationId,
1365    project_id: ProjectId,
1366    name: &'a MetricName,
1367    #[serde(flatten)]
1368    value: MetricValue<'a>,
1369    timestamp: UnixTimestamp,
1370    tags: &'a BTreeMap<String, String>,
1371    retention_days: u16,
1372    #[serde(skip_serializing_if = "Option::is_none")]
1373    received_at: Option<UnixTimestamp>,
1374}
1375
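/// The value of a metric bucket, serialized adjacently tagged; for example, a counter of `1.0`
/// serializes as `{"type": "c", "value": 1.0}`.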
1376#[derive(Clone, Debug, Serialize)]
1377#[serde(tag = "type", content = "value")]
1378enum MetricValue<'a> {
1379    #[serde(rename = "c")]
1380    Counter(FiniteF64),
1381    #[serde(rename = "d")]
1382    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
1383    #[serde(rename = "s")]
1384    Set(ArrayEncoding<'a, SetView<'a>>),
1385    #[serde(rename = "g")]
1386    Gauge(GaugeValue),
1387}
1388
1389impl MetricValue<'_> {
1390    fn variant(&self) -> &'static str {
1391        match self {
1392            Self::Counter(_) => "counter",
1393            Self::Distribution(_) => "distribution",
1394            Self::Set(_) => "set",
1395            Self::Gauge(_) => "gauge",
1396        }
1397    }
1398
1399    fn encoding(&self) -> Option<&'static str> {
1400        match self {
1401            Self::Distribution(ae) => Some(ae.name()),
1402            Self::Set(ae) => Some(ae.name()),
1403            _ => None,
1404        }
1405    }
1406}
1407
1408#[derive(Clone, Debug, Serialize)]
1409struct ProfileKafkaMessage {
1410    organization_id: OrganizationId,
1411    project_id: ProjectId,
1412    key_id: Option<u64>,
1413    received: u64,
1414    retention_days: u16,
1415    #[serde(skip)]
1416    headers: BTreeMap<String, String>,
1417    payload: Bytes,
1418}
1419
1420/// Used to discriminate cron monitor ingestion messages.
1421///
1422/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
1423/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
1424/// intended to ensure the clock continues to run even when ingestion volume drops.
1425#[allow(dead_code)]
1426#[derive(Debug, Serialize)]
1427#[serde(rename_all = "snake_case")]
1428enum CheckInMessageType {
1429    ClockPulse,
1430    CheckIn,
1431}
1432
1433#[derive(Debug, Serialize)]
1434struct CheckInKafkaMessage {
1435    #[serde(skip)]
1436    routing_key_hint: Option<Uuid>,
1437
    /// Used by the consumer to discriminate the message.
1439    message_type: CheckInMessageType,
1440    /// Raw event payload.
1441    payload: Bytes,
1442    /// Time at which the event was received by Relay.
1443    start_time: u64,
1444    /// The SDK client which produced the event.
1445    sdk: Option<String>,
1446    /// The project id for the current event.
1447    project_id: ProjectId,
1448    /// Number of days to retain.
1449    retention_days: u16,
1450}
1451
1452#[derive(Debug, Serialize)]
1453struct SpanKafkaMessageRaw<'a> {
1454    #[serde(flatten)]
1455    meta: SpanMeta,
1456    #[serde(flatten)]
1457    span: BTreeMap<&'a str, &'a RawValue>,
1458}
1459
1460#[derive(Debug, Serialize)]
1461struct SpanKafkaMessage<'a> {
1462    #[serde(flatten)]
1463    meta: SpanMeta,
1464    #[serde(flatten)]
1465    span: SerializableAnnotated<'a, SpanV2>,
1466}
1467
1468#[derive(Debug, Serialize)]
1469struct SpanMeta {
1470    organization_id: OrganizationId,
1471    project_id: ProjectId,
1472    // Required for the buffer to emit outcomes scoped to the DSN.
1473    #[serde(skip_serializing_if = "Option::is_none")]
1474    key_id: Option<u64>,
1475    #[serde(skip_serializing_if = "Option::is_none")]
1476    event_id: Option<EventId>,
1477    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
1478    received: f64,
1479    /// Number of days until these data should be deleted.
1480    retention_days: u16,
1481    /// Number of days until the downsampled version of this data should be deleted.
1482    downsampled_retention_days: u16,
1483}
1484
1485#[derive(Clone, Debug, Serialize)]
1486struct ProfileChunkKafkaMessage {
1487    organization_id: OrganizationId,
1488    project_id: ProjectId,
1489    received: u64,
1490    retention_days: u16,
1491    payload: Bytes,
1492}
1493
1494/// An enum over all possible ingest messages.
1495#[derive(Debug, Serialize)]
1496#[serde(tag = "type", rename_all = "snake_case")]
1497#[allow(clippy::large_enum_variant)]
1498enum KafkaMessage<'a> {
1499    Event(EventKafkaMessage),
1500    UserReport(UserReportKafkaMessage),
1501    Metric {
1502        #[serde(skip)]
1503        headers: BTreeMap<String, String>,
1504        #[serde(flatten)]
1505        message: MetricKafkaMessage<'a>,
1506    },
1507    CheckIn(CheckInKafkaMessage),
1508    Item {
1509        #[serde(skip)]
1510        headers: BTreeMap<String, String>,
1511        #[serde(skip)]
1512        item_type: TraceItemType,
1513        #[serde(skip)]
1514        message: TraceItem,
1515    },
1516    SpanRaw {
1517        #[serde(skip)]
1518        routing_key: Option<Uuid>,
1519        #[serde(skip)]
1520        headers: BTreeMap<String, String>,
1521        #[serde(flatten)]
1522        message: SpanKafkaMessageRaw<'a>,
1523    },
1524    SpanV2 {
1525        #[serde(skip)]
1526        routing_key: Option<Uuid>,
1527        #[serde(skip)]
1528        headers: BTreeMap<String, String>,
1529        #[serde(flatten)]
1530        message: SpanKafkaMessage<'a>,
1531    },
1532
1533    Attachment(AttachmentKafkaMessage),
1534    AttachmentChunk(AttachmentChunkKafkaMessage),
1535
1536    Profile(ProfileKafkaMessage),
1537    ProfileChunk(ProfileChunkKafkaMessage),
1538
1539    ReplayEvent(ReplayEventKafkaMessage<'a>),
1540    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
1541}
1542
1543impl Message for KafkaMessage<'_> {
1544    fn variant(&self) -> &'static str {
1545        match self {
1546            KafkaMessage::Event(_) => "event",
1547            KafkaMessage::UserReport(_) => "user_report",
1548            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1549                MetricNamespace::Sessions => "metric_sessions",
1550                MetricNamespace::Transactions => "metric_transactions",
1551                MetricNamespace::Spans => "metric_spans",
1552                MetricNamespace::Custom => "metric_custom",
1553                MetricNamespace::Unsupported => "metric_unsupported",
1554            },
1555            KafkaMessage::CheckIn(_) => "check_in",
1556            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
1557            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1558
1559            KafkaMessage::Attachment(_) => "attachment",
1560            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1561
1562            KafkaMessage::Profile(_) => "profile",
1563            KafkaMessage::ProfileChunk(_) => "profile_chunk",
1564
1565            KafkaMessage::ReplayEvent(_) => "replay_event",
1566            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1567        }
1568    }
1569
    /// Returns the partitioning key for this Kafka message, which determines the Kafka partition.
1571    fn key(&self) -> Option<relay_kafka::Key> {
1572        match self {
1573            Self::Event(message) => Some(message.event_id.0),
1574            Self::UserReport(message) => Some(message.event_id.0),
1575            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,
1576
1577            // Monitor check-ins use the hinted UUID passed through from the Envelope.
1578            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessages would
            // receive the routing_key_hint from their envelopes.
1581            Self::CheckIn(message) => message.routing_key_hint,
1582
1583            Self::Attachment(message) => Some(message.event_id.0),
1584            Self::AttachmentChunk(message) => Some(message.event_id.0),
1585            Self::ReplayEvent(message) => Some(message.replay_id.0),
1586
1587            // Random partitioning
1588            Self::Metric { .. }
1589            | Self::Item { .. }
1590            | Self::Profile(_)
1591            | Self::ProfileChunk(_)
1592            | Self::ReplayRecordingNotChunked(_) => None,
1593        }
1594        .filter(|uuid| !uuid.is_nil())
1595        .map(|uuid| uuid.as_u128())
1596    }
1597
1598    fn headers(&self) -> Option<&BTreeMap<String, String>> {
1599        match &self {
1600            KafkaMessage::Metric { headers, .. }
1601            | KafkaMessage::SpanRaw { headers, .. }
1602            | KafkaMessage::SpanV2 { headers, .. }
1603            | KafkaMessage::Item { headers, .. }
1604            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),
1605
1606            KafkaMessage::Event(_)
1607            | KafkaMessage::UserReport(_)
1608            | KafkaMessage::CheckIn(_)
1609            | KafkaMessage::Attachment(_)
1610            | KafkaMessage::AttachmentChunk(_)
1611            | KafkaMessage::ProfileChunk(_)
1612            | KafkaMessage::ReplayEvent(_)
1613            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
1614        }
1615    }
1616
1617    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1618        match self {
1619            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1620            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
1621            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
1622            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1623            KafkaMessage::Item { message, .. } => {
1624                let mut payload = Vec::new();
1625                match message.encode(&mut payload) {
1626                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1627                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
1628                }
1629            }
1630            KafkaMessage::Event(_)
1631            | KafkaMessage::UserReport(_)
1632            | KafkaMessage::CheckIn(_)
1633            | KafkaMessage::Attachment(_)
1634            | KafkaMessage::AttachmentChunk(_)
1635            | KafkaMessage::Profile(_)
1636            | KafkaMessage::ProfileChunk(_)
1637            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1638                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1639                Err(err) => Err(ClientError::InvalidMsgPack(err)),
1640            },
1641        }
1642    }
1643}
1644
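/// Serializes a value as JSON for use as a Kafka payload.
///
/// Serialization failures are mapped to [`ClientError::InvalidJson`].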
1645fn serialize_as_json<T: serde::Serialize>(
1646    value: &T,
1647) -> Result<SerializationOutput<'_>, ClientError> {
1648    match serde_json::to_vec(value) {
1649        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1650        Err(err) => Err(ClientError::InvalidJson(err)),
1651    }
1652}
1653
1654/// Determines if the given item is considered slow.
1655///
1656/// Slow items must be routed to the `Attachments` topic.
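///
/// A small illustrative sketch using the envelope `Item` constructor:
///
/// ```ignore
/// assert!(is_slow_item(&Item::new(ItemType::Attachment)));
/// assert!(is_slow_item(&Item::new(ItemType::UserReport)));
/// assert!(!is_slow_item(&Item::new(ItemType::Event)));
/// ```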
1657fn is_slow_item(item: &Item) -> bool {
1658    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1659}
1660
1661fn bool_to_str(value: bool) -> &'static str {
1662    if value { "true" } else { "false" }
1663}
1664
1665/// Returns a safe timestamp for Kafka.
1666///
1667/// Kafka expects timestamps to be in UTC and in seconds since epoch.
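///
/// For example (a sketch assuming chrono's `DateTime::from_timestamp` constructor):
///
/// ```ignore
/// let ts = DateTime::from_timestamp(1_700_000_000, 0).unwrap();
/// assert_eq!(safe_timestamp(ts), 1_700_000_000);
///
/// // Timestamps before the Unix epoch fall back to the current time instead of wrapping.
/// let pre_epoch = DateTime::from_timestamp(-1, 0).unwrap();
/// assert!(safe_timestamp(pre_epoch) > 0);
/// ```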
1668fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1669    let ts = timestamp.timestamp();
1670    if ts >= 0 {
1671        return ts as u64;
1672    }
1673
1674    // We assume this call can't return < 0.
1675    Utc::now().timestamp() as u64
1676}
1677
1678#[cfg(test)]
1679mod tests {
1680
1681    use super::*;
1682
1683    #[test]
1684    fn disallow_outcomes() {
1685        let config = Config::default();
1686        let producer = Producer::create(&config).unwrap();
1687
1688        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
1689            let res = producer
1690                .client
1691                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");
1692
1693            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
1694        }
1695    }
1696}