relay_server/services/store.rs

//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            // Outcomes should not be sent from the store forwarder.
            // See `KafkaOutcomesProducer`.
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

/// Publishes a [`TraceItem`] to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
    /// Outcomes to be emitted when successfully producing the item to Kafka.
    ///
    /// Note: this is only a temporary measure; long term, these outcomes will be part of the trace
    /// item and emitted by Snuba to guarantee delivery to storage.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

/// Publishes a span item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key to assign a Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Default retention of the span.
    pub retention_days: u16,
    /// Downsampled retention of the span.
    pub downsampled_retention_days: u16,
    /// The final Sentry compatible span item.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}

/// Publishes a singular profile chunk to Kafka.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Default retention of the profile chunk.
    pub retention_days: u16,
    /// The serialized profile chunk payload.
    pub payload: Bytes,
    /// Outcome quantities associated with this profile.
    ///
    /// Quantities are different for backend and UI profile chunks.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service interface for the messages handled by the [`StoreService`].
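///
/// A minimal usage sketch (illustrative only: `store` is assumed to be an `Addr<Store>`, and
/// `buckets` and `scoping` are assumed to be in scope); any of the variants below can be sent
/// this way through their [`FromMessage`] conversions:
///
/// ```ignore
/// store.send(StoreMetrics { buckets, scoping, retention: 90 });
/// ```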
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: Some envelope items are either not supported at all or cannot be submitted through
    /// an envelope; for example, logs must be submitted via [`Self::TraceItem`] instead.
    ///
    /// Long term, this variant is going to be replaced with fully typed variants for the items
    /// which can be stored.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular [`TraceItem`].
    TraceItem(Managed<StoreTraceItem>),
    /// A singular Span.
    Span(Managed<Box<StoreSpanV2>>),
    /// A singular profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
}

impl Store {
    /// Returns the name of the message variant.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
            Store::Span(_) => "span",
            Store::ProfileChunk(_) => "profile_chunk",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

/// Service implementing the [`Store`] interface.
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();

        match self.store_envelope(&mut managed) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Some error events like minidumps need all attachment chunks to be processed _before_
        // the event payload on the consumer side. Transaction attachments do not require this ordering
        // guarantee, so they do not have to go to the same topic as their event payload.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        let options = &self.global_config.current().options;

        // Whether Relay will submit the replay-event to snuba or not.
        let replay_relay_snuba_publish_disabled =
            utils::sample(options.replay_relay_snuba_publish_disabled_sample_rate).is_keep();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            // If a recording item type was seen we produce it to Kafka with the replay-event
            // payload (should it have been provided).
            //
            // The replay_video value is always specified as `None`. We do not allow separate
            // item types for `ReplayVideo` events.
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
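        // Per-namespace delay statistics as a `(delay sum, bucket count, max delay)` tuple in
        // seconds; accumulated in the loop below and flushed as statsd metrics at the end of
        // this function.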
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
            // each bucket separately, we only need to split buckets that exceed the size, but not
            // batches.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Emit accepted outcomes once items have been successfully produced to Kafka.
        //
        // This is only a temporary measure; long term, these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee delivery to storage.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        match result {
            Ok(()) => {
                relay_statsd::metric!(
                    counter(RelayCounters::SpanV2Produced) += 1,
                    via = "processing"
                );

                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
                // or the segments consumer, depending on which will produce outcomes later.
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store span"
                );
            }
        }
    }

    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|message| {
            let message = ProfileChunkKafkaMessage {
                organization_id: scoping.organization_id,
                project_id: scoping.project_id,
                received: safe_timestamp(received_at),
                retention_days: message.retention_days,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                payload: message.payload,
            };

            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
        });
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// The `send_individual_attachments` flag controls whether the metadata of an attachment
    /// is produced directly as an individual `attachment` message, or returned from this function
    /// to be sent later as part of an `event` message.
    ///
    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
    /// unless the `send_individual_attachments` flag is set and the content is small enough
    /// to fit inside a message.
    /// In that case, no `attachment_chunk` is produced and the content is sent as part
    /// of the `attachment` message instead.
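    ///
    /// A worked example of the chunking math (illustrative numbers, not taken from any real
    /// configuration): with a chunk size of 1 MiB and a 2.5 MiB attachment that is not sent
    /// individually, three `attachment_chunk` messages are produced:
    ///
    /// ```text
    /// chunk 0: bytes 0..1 MiB
    /// chunk 1: bytes 1 MiB..2 MiB
    /// chunk 2: bytes 2 MiB..2.5 MiB
    /// ```
    ///
    /// and the returned [`ChunkedAttachment`] carries `AttachmentPayload::Chunked(3)`.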
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // When sending individual attachments, and we have a single chunk, we want to send the
            // `data` inline in the `attachment` message.
            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // This skips chunks for empty attachments. The consumer does not require chunks for
            // empty attachments. `chunks` will be `0` in this case.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
            // is one larger than the last chunk, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([
                (
                    "sampled".to_owned(),
                    if item.sampled() { "true" } else { "false" }.to_owned(),
                ),
                ("project_id".to_owned(), project_id.to_string()),
            ]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        // Maximum number of bytes accepted by the consumer.
        let max_payload_size = self.config.max_replay_message_size();

        // Size of the consumer message. We can be reasonably sure this won't overflow because
        // of the request size validation provided by Nginx and Relay.
        let mut payload_size = 2000; // Reserve 2KB for the message metadata.
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        // If the recording payload cannot fit into the message, do not produce it and quit early.
        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Verify that this is a V2 span:
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
        // or the segments consumer, depending on which will produce outcomes later.
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously; in the future, we might explore how to
            // make the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

/// Describes how the attachment payload is transferred.
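///
/// Since the enum is flattened into [`ChunkedAttachment`] and every variant is renamed, exactly
/// one of the following keys ends up on the serialized attachment (an illustrative shape only;
/// the actual wire format depends on the serializer used for the topic):
///
/// ```text
/// { ..., "chunks": 3 }            // Chunked(3)
/// { ..., "data": <bytes> }        // Inline(..)
/// { ..., "stored_id": "<key>" }   // Stored(..)
/// ```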
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined directly into the [`ChunkedAttachment`].
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored in the object store, under the given ID.
    #[serde(rename = "stored_id")]
    Stored(String),
}

/// Common attributes for both standalone attachments and processing-relevant attachments.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
    /// native crash reports still need to be retained. These attachments are marked with the
    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
    /// not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The attachment payload, chunked, inlined, or already stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}

/// A hack to make rmp-serde behave more like serde-json when serializing enums.
///
/// Cannot serialize bytes.
///
/// See <https://github.com/3Hren/msgpack-rust/pull/214>
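///
/// The conversion goes through `serde_json::Value` first, so enum values reach the target
/// serializer in their JSON representation (for example, a unit variant becomes a plain string);
/// this is also why raw byte strings do not survive the round trip.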
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw event payload.
    payload: &'a [u8],
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    replay_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of chunk. Starts at 0 and ends at `AttachmentKafkaMessage.num_chunks - 1`.
    chunk_index: usize,
}

/// A "standalone" attachment.
///
/// Still belongs to an event but can be sent independently (like UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    // Used for KafkaMessage::key
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors Kafka topic: "check_in" (the
/// ones produced here in Relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
1459    message_type: CheckInMessageType,
1460    /// Raw event payload.
1461    payload: Bytes,
1462    /// Time at which the event was received by Relay.
1463    start_time: u64,
1464    /// The SDK client which produced the event.
1465    sdk: Option<String>,
1466    /// The project id for the current event.
1467    project_id: ProjectId,
1468    /// Number of days to retain.
1469    retention_days: u16,
1470}
1471
1472#[derive(Debug, Serialize)]
1473struct SpanKafkaMessageRaw<'a> {
1474    #[serde(flatten)]
1475    meta: SpanMeta,
1476    #[serde(flatten)]
1477    span: BTreeMap<&'a str, &'a RawValue>,
1478}
1479
1480#[derive(Debug, Serialize)]
1481struct SpanKafkaMessage<'a> {
1482    #[serde(flatten)]
1483    meta: SpanMeta,
1484    #[serde(flatten)]
1485    span: SerializableAnnotated<'a, SpanV2>,
1486}
1487
1488#[derive(Debug, Serialize)]
1489struct SpanMeta {
1490    organization_id: OrganizationId,
1491    project_id: ProjectId,
1492    // Required for the buffer to emit outcomes scoped to the DSN.
1493    #[serde(skip_serializing_if = "Option::is_none")]
1494    key_id: Option<u64>,
1495    #[serde(skip_serializing_if = "Option::is_none")]
1496    event_id: Option<EventId>,
1497    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
1498    received: f64,
1499    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key that determines the Kafka partition for this message.
    ///
    /// Nil and missing keys fall back to random partitioning (see the sketch after this impl
    /// block).
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessages would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            // Random partitioning
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayEvent(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
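
// A hedged, test-only sketch of the partitioning behavior documented on `key()`:
// an explicit routing key is forwarded as the partition key, while nil or missing
// keys resolve to `None` (random partitioning by the producer). `OrganizationId::new`
// and `ProjectId::new` are assumed to be the constructors used elsewhere in the
// workspace; this is illustrative, not part of the production flow.
#[cfg(test)]
mod kafka_message_key_sketch {
    use super::*;

    fn span_raw(routing_key: Option<Uuid>) -> KafkaMessage<'static> {
        KafkaMessage::SpanRaw {
            routing_key,
            headers: BTreeMap::new(),
            message: SpanKafkaMessageRaw {
                meta: SpanMeta {
                    organization_id: OrganizationId::new(1),
                    project_id: ProjectId::new(1),
                    key_id: None,
                    event_id: None,
                    received: 0.0,
                    retention_days: 30,
                    downsampled_retention_days: 30,
                },
                span: BTreeMap::new(),
            },
        }
    }

    #[test]
    fn nil_and_missing_routing_keys_use_random_partitioning() {
        // Explicit keys are forwarded as the u128 representation of the UUID.
        assert_eq!(
            span_raw(Some(Uuid::from_u128(42))).key(),
            Some(Uuid::from_u128(42).as_u128())
        );
        // Nil or absent keys are filtered out, leaving partitioning to the producer.
        assert_eq!(span_raw(Some(Uuid::nil())).key(), None);
        assert_eq!(span_raw(None).key(), None);
    }
}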

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

/// Determines if the given item is considered slow.
///
/// Slow items must be routed to the `Attachments` topic.
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}
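
// A small, test-only sketch of the routing rule above: attachments and user
// reports are the "slow" items. `Item::new(ItemType::...)` is assumed to be the
// same constructor used by other tests in this crate; illustrative only.
#[cfg(test)]
mod slow_item_sketch {
    use super::*;

    #[test]
    fn attachments_and_user_reports_are_slow() {
        assert!(is_slow_item(&Item::new(ItemType::Attachment)));
        assert!(is_slow_item(&Item::new(ItemType::UserReport)));
        assert!(!is_slow_item(&Item::new(ItemType::Event)));
    }
}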

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

/// Returns a safe timestamp for Kafka.
///
/// Kafka expects timestamps to be in UTC and in seconds since epoch.
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // We assume this call can't return < 0.
    Utc::now().timestamp() as u64
}
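
// A hedged, test-only sketch of the pre-epoch fallback above: timestamps before
// the UNIX epoch are replaced with the current time instead of underflowing.
// Uses `chrono::TimeZone::timestamp_opt`, assumed available via the workspace's
// chrono dependency; illustrative only.
#[cfg(test)]
mod safe_timestamp_sketch {
    use super::*;

    #[test]
    fn pre_epoch_dates_fall_back_to_now() {
        use chrono::TimeZone;

        // Regular timestamps pass through unchanged.
        let ts = Utc.timestamp_opt(1_700_000_000, 0).unwrap();
        assert_eq!(safe_timestamp(ts), 1_700_000_000);

        // Pre-epoch timestamps are clamped to "now", which is never negative.
        let before_epoch = Utc.timestamp_opt(-1, 0).unwrap();
        assert!(safe_timestamp(before_epoch) > 0);
    }
}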

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}