relay_server/services/
store.rs

1//! This module contains the service that forwards events and attachments to the Sentry store.
2//! The service uses Kafka topics to forward data to Sentry
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::services::upload::SignedLocation;
43use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
44use crate::utils::{self, FormDataIter};
45
46mod sessions;
47
/// Fallback name used for attachment items without a `filename` header.
///
/// NOTE(review): the consuming code is outside this chunk (presumably attachment
/// production) — confirm where this default is applied.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
50
/// Errors that can occur while forwarding data to the Sentry store.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Producing a message to Kafka failed.
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    /// Serializing or encoding payload data failed.
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    /// An item required an event id (e.g. attachments or user reports), but the
    /// envelope did not carry one.
    #[error("failed to store event because event id was missing")]
    NoEventId,
    /// An attachment carried an invalid reference.
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,
}
62
63impl OutcomeError for StoreError {
64    type Error = Self;
65
66    fn consume(self) -> (Option<Outcome>, Self::Error) {
67        let outcome = match self {
68            StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => {
69                Some(Outcome::Invalid(DiscardReason::Internal))
70            }
71            StoreError::InvalidAttachmentRef => {
72                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
73            }
74        };
75        (outcome, self)
76    }
77}
78
/// Wrapper around the Kafka client used by the store service.
///
/// The client is configured for all store topics except outcomes (see
/// [`Producer::create`]).
struct Producer {
    client: KafkaClient,
}
82
83impl Producer {
84    pub fn create(config: &Config) -> anyhow::Result<Self> {
85        let mut client_builder = KafkaClient::builder();
86
87        for topic in KafkaTopic::iter().filter(|t| {
88            // Outcomes should not be sent from the store forwarder.
89            // See `KafkaOutcomesProducer`.
90            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
91        }) {
92            let kafka_configs = config.kafka_configs(*topic)?;
93            client_builder = client_builder
94                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
95                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
96        }
97
98        Ok(Self {
99            client: client_builder.build(),
100        })
101    }
102}
103
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The managed envelope whose items are forwarded to Kafka.
    pub envelope: ManagedEnvelope,
}
109
/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// The metric buckets to produce.
    pub buckets: Vec<Bucket>,
    /// Scoping (organization, project, key) the buckets belong to.
    pub scoping: Scoping,
    /// Data retention in days.
    pub retention: u16,
}
117
/// Publishes a log item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
}

impl Counted for StoreTraceItem {
    /// Delegates to the outcome quantities tracked on the trace item itself.
    fn quantities(&self) -> Quantities {
        self.trace_item.quantities()
    }
}
130
/// Publishes a span item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key to assign a Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Default retention of the span.
    pub retention_days: u16,
    /// Downsampled retention of the span.
    pub downsampled_retention_days: u16,
    /// The final Sentry compatible span item.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    /// A span message always counts as exactly one indexed span.
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}
149
/// Publishes a singular profile chunk to Kafka.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Default retention of the profile chunk, in days.
    pub retention_days: u16,
    /// The serialized profile chunk payload.
    pub payload: Bytes,
    /// Outcome quantities associated with this profile.
    ///
    /// Quantities are different for backend and ui profile chunks.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    /// Returns the precomputed quantities carried by the message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
168
/// A replay to be stored to Kafka.
#[derive(Debug)]
pub struct StoreReplay {
    /// The event ID.
    pub event_id: EventId,
    /// Number of days to retain.
    pub retention_days: u16,
    /// The recording payload (rrweb data).
    pub recording: Bytes,
    /// Optional replay event payload (JSON).
    pub event: Option<Bytes>,
    /// Optional replay video.
    pub video: Option<Bytes>,
    /// Outcome quantities associated with this replay.
    ///
    /// Quantities are different for web and native replays.
    pub quantities: Quantities,
}

impl Counted for StoreReplay {
    /// Returns the precomputed quantities carried by the message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
193
/// An attachment to be stored to Kafka.
#[derive(Debug)]
pub struct StoreAttachment {
    /// The event ID.
    pub event_id: EventId,
    /// The attachment item.
    pub attachment: Item,
    /// Outcome quantities associated with this attachment.
    pub quantities: Quantities,
    /// Data retention in days for this attachment.
    pub retention: u16,
}

impl Counted for StoreAttachment {
    /// Returns the precomputed quantities carried by the message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
212
/// A user report to be stored to Kafka.
#[derive(Debug)]
pub struct StoreUserReport {
    /// The event ID.
    pub event_id: EventId,
    /// The user report.
    pub report: Item,
}

impl Counted for StoreUserReport {
    /// A user report always counts as exactly one `UserReportV2`.
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::UserReportV2, 1)]
    }
}
227
/// A profile to be stored to Kafka.
#[derive(Debug)]
pub struct StoreProfile {
    /// Number of days to retain.
    pub retention_days: u16,
    /// The profile.
    pub profile: Item,
    /// Outcome quantities associated with this profile.
    pub quantities: Quantities,
}

impl Counted for StoreProfile {
    /// Returns the precomputed quantities carried by the message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
244
/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
///
/// Tasks are boxed futures executed by [`relay_threading::AsyncPool`].
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
247
/// Service interface of the store service, accepting all storable message types.
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: Some envelope items are not supported to be submitted at all or through an envelope,
    /// for example logs must be submitted via [`Self::TraceItem`] instead.
    ///
    /// Long term this variant is going to be replaced with fully typed variants of items which can
    /// be stored instead.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular [`TraceItem`], e.g. a log or a session converted to EAP.
    TraceItem(Managed<StoreTraceItem>),
    /// A singular Span.
    Span(Managed<Box<StoreSpanV2>>),
    /// A singular profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// A singular replay.
    Replay(Managed<StoreReplay>),
    /// A singular attachment.
    Attachment(Managed<StoreAttachment>),
    /// A singular user report.
    UserReport(Managed<StoreUserReport>),
    /// A single profile.
    Profile(Managed<StoreProfile>),
}
276
277impl Store {
278    /// Returns the name of the message variant.
279    fn variant(&self) -> &'static str {
280        match self {
281            Store::Envelope(_) => "envelope",
282            Store::Metrics(_) => "metrics",
283            Store::TraceItem(_) => "trace_item",
284            Store::Span(_) => "span",
285            Store::ProfileChunk(_) => "profile_chunk",
286            Store::Replay(_) => "replay",
287            Store::Attachment(_) => "attachment",
288            Store::UserReport(_) => "user_report",
289            Store::Profile(_) => "profile",
290        }
291    }
292}
293
// `Store` is the message interface of the store service; the `FromMessage`
// implementations below enumerate the accepted message types.
impl Interface for Store {}
295
// Boilerplate conversions from each concrete message type into the `Store`
// interface. All messages are fire-and-forget (`NoResponse`).

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

impl FromMessage<Managed<StoreReplay>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
        Self::Replay(message)
    }
}

impl FromMessage<Managed<StoreAttachment>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
        Self::Attachment(message)
    }
}

impl FromMessage<Managed<StoreUserReport>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
        Self::UserReport(message)
    }
}

impl FromMessage<Managed<StoreProfile>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
        Self::Profile(message)
    }
}
367
/// Service implementing the [`Store`] interface.
pub struct StoreService {
    /// Thread pool used to schedule asynchronous store tasks.
    pool: StoreServicePool,
    /// Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Address used to emit accepted/invalid outcomes.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Tracks outcomes for produced metric buckets.
    metric_outcomes: MetricOutcomes,
    /// Kafka producer for all store topics.
    producer: Producer,
}
377
378impl StoreService {
379    pub fn create(
380        pool: StoreServicePool,
381        config: Arc<Config>,
382        global_config: GlobalConfigHandle,
383        outcome_aggregator: Addr<TrackOutcome>,
384        metric_outcomes: MetricOutcomes,
385    ) -> anyhow::Result<Self> {
386        let producer = Producer::create(&config)?;
387        Ok(Self {
388            pool,
389            config,
390            global_config,
391            outcome_aggregator,
392            metric_outcomes,
393            producer,
394        })
395    }
396
    /// Dispatches an incoming [`Store`] message to its specific handler.
    ///
    /// The handling duration is recorded as a timer metric, tagged by the message
    /// variant name.
    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
                Store::Replay(message) => self.handle_store_replay(message),
                Store::Attachment(message) => self.handle_store_attachment(message),
                Store::UserReport(message) => self.handle_user_report(message),
                Store::Profile(message) => self.handle_profile(message),
            }
        })
    }
413
414    fn handle_store_envelope(&self, message: StoreEnvelope) {
415        let StoreEnvelope { mut envelope } = message;
416
417        let scoping = envelope.scoping();
418        match self.store_envelope(&mut envelope) {
419            Ok(()) => envelope.accept(),
420            Err(error) => {
421                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
422                relay_log::error!(
423                    error = &error as &dyn Error,
424                    tags.project_key = %scoping.project_key,
425                    "failed to store envelope"
426                );
427            }
428        }
429    }
430
431    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
432        let mut envelope = managed_envelope.take_envelope();
433        let received_at = managed_envelope.received_at();
434        let scoping = managed_envelope.scoping();
435
436        let retention = envelope.retention();
437        let downsampled_retention = envelope.downsampled_retention();
438
439        let event_id = envelope.event_id();
440        let event_item = envelope.as_mut().take_item_by(|item| {
441            matches!(
442                item.ty(),
443                ItemType::Event | ItemType::Transaction | ItemType::Security
444            )
445        });
446        let event_type = event_item.as_ref().map(|item| item.ty());
447
448        // Some error events like minidumps need all attachment chunks to be processed _before_
449        // the event payload on the consumer side. Transaction attachments do not require this ordering
450        // guarantee, so they do not have to go to the same topic as their event payload.
451        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
452            KafkaTopic::Transactions
453        } else if envelope.get_item_by(is_slow_item).is_some() {
454            KafkaTopic::Attachments
455        } else {
456            KafkaTopic::Events
457        };
458
459        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
460
461        let mut attachments = Vec::new();
462
463        for item in envelope.items() {
464            let content_type = item.content_type();
465            match item.ty() {
466                ItemType::Attachment => {
467                    if let Some(attachment) = self.produce_attachment(
468                        event_id.ok_or(StoreError::NoEventId)?,
469                        scoping.project_id,
470                        scoping.organization_id,
471                        item,
472                        send_individual_attachments,
473                        retention,
474                    )? {
475                        attachments.push(attachment);
476                    }
477                }
478                ItemType::UserReport => {
479                    debug_assert!(event_topic == KafkaTopic::Attachments);
480                    self.produce_user_report(
481                        event_id.ok_or(StoreError::NoEventId)?,
482                        scoping.project_id,
483                        scoping.organization_id,
484                        received_at,
485                        item,
486                    )?;
487                }
488                ItemType::UserReportV2 => {
489                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
490                    self.produce_user_report_v2(
491                        event_id.ok_or(StoreError::NoEventId)?,
492                        scoping.project_id,
493                        scoping.organization_id,
494                        received_at,
495                        item,
496                        remote_addr,
497                    )?;
498                }
499                ItemType::Profile => self.produce_profile(
500                    scoping.organization_id,
501                    scoping.project_id,
502                    scoping.key_id,
503                    received_at,
504                    retention,
505                    item,
506                )?,
507                ItemType::CheckIn => {
508                    let client = envelope.meta().client();
509                    self.produce_check_in(
510                        scoping.project_id,
511                        scoping.organization_id,
512                        received_at,
513                        client,
514                        retention,
515                        item,
516                    )?
517                }
518                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
519                    scoping,
520                    received_at,
521                    event_id,
522                    retention,
523                    downsampled_retention,
524                    item,
525                )?,
526                ty @ ItemType::Log => {
527                    debug_assert!(
528                        false,
529                        "received {ty} through an envelope, \
530                        this item must be submitted via a specific store message instead"
531                    );
532                    relay_log::error!(
533                        tags.project_key = %scoping.project_key,
534                        "StoreService received unsupported item type '{ty}' in envelope"
535                    );
536                }
537                other => {
538                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
539                    let item_types = envelope
540                        .items()
541                        .map(|item| item.ty().as_str())
542                        .collect::<Vec<_>>();
543                    let attachment_types = envelope
544                        .items()
545                        .map(|item| {
546                            item.attachment_type()
547                                .map(|t| t.to_string())
548                                .unwrap_or_default()
549                        })
550                        .collect::<Vec<_>>();
551
552                    relay_log::with_scope(
553                        |scope| {
554                            scope.set_extra("item_types", item_types.into());
555                            scope.set_extra("attachment_types", attachment_types.into());
556                            if other == &ItemType::FormData {
557                                let payload = item.payload();
558                                let form_data_keys = FormDataIter::new(&payload)
559                                    .map(|entry| entry.key())
560                                    .collect::<Vec<_>>();
561                                scope.set_extra("form_data_keys", form_data_keys.into());
562                            }
563                        },
564                        || {
565                            relay_log::error!(
566                                tags.project_key = %scoping.project_key,
567                                tags.event_type = event_type.unwrap_or("none"),
568                                "StoreService received unexpected item type: {other}"
569                            )
570                        },
571                    )
572                }
573            }
574        }
575
576        if let Some(event_item) = event_item {
577            let event_id = event_id.ok_or(StoreError::NoEventId)?;
578            let project_id = scoping.project_id;
579            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
580
581            self.produce(
582                event_topic,
583                KafkaMessage::Event(EventKafkaMessage {
584                    payload: event_item.payload(),
585                    start_time: safe_timestamp(received_at),
586                    event_id,
587                    project_id,
588                    remote_addr,
589                    attachments,
590                    org_id: scoping.organization_id,
591                }),
592            )?;
593        } else {
594            debug_assert!(attachments.is_empty());
595        }
596
597        Ok(())
598    }
599
    /// Produces aggregated metric buckets to Kafka and tracks their outcomes.
    ///
    /// Buckets exceeding the configured maximum batch size are split into multiple
    /// views, each produced as a separate message. Session metrics may additionally
    /// be emitted as EAP trace items, gated by a per-organization rollout. Delay
    /// statistics (seconds from receive to store) are reported per metric namespace.
    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        // Only the first produce/encode error is kept and logged below; all
        // remaining buckets are still attempted.
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let emit_sessions_to_eap = utils::is_rolled_out(
            scoping.organization_id.value(),
            global_config.options.sessions_eap_rollout_rate,
        )
        .is_keep();

        let now = UnixTimestamp::now();
        // Per-namespace (total, count, max) of receive-to-store delays in seconds.
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
            // each bucket separately, we only need to split buckets that exceed the size, but not
            // batches.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }

            if emit_sessions_to_eap
                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
            {
                // Best-effort: failures here do not affect bucket outcomes.
                let message = KafkaMessage::for_item(scoping, trace_item);
                let _ = self.produce(KafkaTopic::Items, message);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }
696
    /// Produces a single [`TraceItem`] to the items topic.
    ///
    /// While the EAP outcomes rollout is not enabled for the organization, Relay
    /// takes the outcomes off the item and emits them itself after a successful
    /// produce; otherwise they remain on the item for the downstream consumer.
    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let eap_emits_outcomes = utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_outcomes_rollout_rate,
        )
        .is_keep();

        let outcomes = message.try_accept(|mut item| {
            // Take the outcomes only when Relay is responsible for emitting them.
            let outcomes = match eap_emits_outcomes {
                true => None,
                false => item.trace_item.outcomes.take(),
            };

            let message = KafkaMessage::for_item(scoping, item.trace_item);
            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
        });

        // Accepted outcomes when items have been successfully produced to rdkafka.
        //
        // This is only a temporary measure, long term these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee a delivery to storage.
        if let Ok(Some(outcomes)) = outcomes {
            for (category, quantity) in outcomes.quantities() {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    // Saturate rather than fail on implausibly large quantities.
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }
738
    /// Produces a single V2 span to the spans topic.
    ///
    /// Whether Relay emits the accepted outcome itself depends on the EAP span
    /// outcomes rollout; the decision is forwarded to the consumer via
    /// `accepted_outcome_emitted` on the span meta so outcomes are not duplicated.
    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
            accepted_outcome_emitted: relay_emits_accepted_outcome,
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
                org_id: scoping.organization_id,
            };

            self.produce(KafkaTopic::Spans, message)
        });

        if result.is_ok() {
            relay_statsd::metric!(
                counter(RelayCounters::SpanV2Produced) += 1,
                via = "processing"
            );

            if relay_emits_accepted_outcome {
                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
                // or the segments consumer, depending on which will produce outcomes later.
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }
802
803    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
804        let scoping = message.scoping();
805        let received_at = message.received_at();
806
807        let _ = message.try_accept(|message| {
808            let message = ProfileChunkKafkaMessage {
809                organization_id: scoping.organization_id,
810                project_id: scoping.project_id,
811                received: safe_timestamp(received_at),
812                retention_days: message.retention_days,
813                headers: BTreeMap::from([(
814                    "project_id".to_owned(),
815                    scoping.project_id.to_string(),
816                )]),
817                payload: message.payload,
818            };
819
820            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
821        });
822    }
823
    /// Produces a replay recording (with optional replay event and video) to Kafka.
    ///
    /// All payloads are sent in a single non-chunked message to the replay
    /// recordings topic.
    fn handle_store_replay(&self, message: Managed<StoreReplay>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|replay| {
            let kafka_msg =
                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                    replay_id: replay.event_id,
                    key_id: scoping.key_id,
                    org_id: scoping.organization_id,
                    project_id: scoping.project_id,
                    received: safe_timestamp(received_at),
                    retention_days: replay.retention_days,
                    payload: &replay.recording,
                    replay_event: replay.event.as_deref(),
                    replay_video: replay.video.as_deref(),
                    // Hardcoded to `true` to indicate to the consumer that it should always publish the
                    // replay_event as relay no longer does it.
                    relay_snuba_publish_disabled: true,
                });
            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
        });
    }
847
848    fn handle_store_attachment(&self, message: Managed<StoreAttachment>) {
849        let scoping = message.scoping();
850        let _ = message.try_accept(|attachment| {
851            let result = self.produce_attachment(
852                attachment.event_id,
853                scoping.project_id,
854                scoping.organization_id,
855                &attachment.attachment,
856                // Hardcoded to `true` since standalone attachments are 'individual attachments'.
857                true,
858                attachment.retention,
859            );
860            // Since we are sending an 'individual attachment' this function should never return a
861            // `ChunkedAttachment`.
862            debug_assert!(!matches!(result, Ok(Some(_))));
863            result
864        });
865    }
866
867    fn handle_user_report(&self, message: Managed<StoreUserReport>) {
868        let scoping = message.scoping();
869        let received_at = message.received_at();
870
871        let _ = message.try_accept(|report| {
872            let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
873                project_id: scoping.project_id,
874                event_id: report.event_id,
875                start_time: safe_timestamp(received_at),
876                payload: report.report.payload(),
877                org_id: scoping.organization_id,
878            });
879            self.produce(KafkaTopic::Attachments, kafka_msg)
880        });
881    }
882
883    fn handle_profile(&self, message: Managed<StoreProfile>) {
884        let scoping = message.scoping();
885        let received_at = message.received_at();
886
887        let _ = message.try_accept(|profile| {
888            self.produce_profile(
889                scoping.organization_id,
890                scoping.project_id,
891                scoping.key_id,
892                received_at,
893                profile.retention_days,
894                &profile.profile,
895            )
896        });
897    }
898
    /// Builds a [`MetricKafkaMessage`] for a single bucket view.
    ///
    /// Distribution and set values are run through the given [`BucketEncoder`];
    /// counter and gauge values are passed through unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::EncodingFailed`] if encoding the distribution or set
    /// data fails.
    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }
934
    /// Serializes and sends a single [`KafkaMessage`] to the given topic.
    ///
    /// On success, emits the `ProcessingMessageProduced` counter, with extra tags
    /// for message types where additional dimensions are useful. On failure, logs
    /// the producer error and propagates it as a [`StoreError`].
    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self
            .producer
            .client
            .send_message(topic, &message)
            .inspect_err(|err| {
                relay_log::error!(
                    error = err as &dyn Error,
                    tags.topic = ?topic,
                    tags.message = message.variant(),
                    "failed to produce to Kafka"
                )
            })?;

        match &message {
            // Metrics additionally report the metric type and array encoding.
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            // Replay recordings additionally report whether a video was attached.
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            // All other message types only report type and topic.
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }
989
    /// Builds a [`ChunkedAttachment`] from an attachment-ref placeholder item.
    ///
    /// The item payload contains a serialized [`AttachmentPlaceholder`] referring to
    /// data already uploaded to the objectstore; no attachment content is produced
    /// here, only the stored key is forwarded.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::InvalidAttachmentRef`] if the placeholder cannot be
    /// parsed or if the signed location fails verification.
    fn chunked_attachment_from_placeholder(
        &self,
        item: &Item,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        debug_assert!(
            item.stored_key().is_none(),
            "AttachmentRef should not have been uploaded to objectstore"
        );

        let payload = item.payload();
        let placeholder: AttachmentPlaceholder<'_> =
            serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
        // Verify the signed location (signature/validity against the current time)
        // before trusting the embedded objectstore key.
        let location = SignedLocation::try_from_str(placeholder.location)
            .ok_or(StoreError::InvalidAttachmentRef)?
            .verify(Utc::now(), &self.config)
            .map_err(|_| StoreError::InvalidAttachmentRef)?;

        let store_key = location.key;

        Ok(ChunkedAttachment {
            id: Uuid::new_v4().to_string(),
            name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(),
            rate_limited: item.rate_limited(),
            content_type: placeholder.content_type.map(|c| c.as_str().to_owned()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size: item.attachment_body_size(),
            retention_days,
            payload: AttachmentPayload::Stored(store_key),
        })
    }
1021
    /// Builds a [`ChunkedAttachment`] from a regular attachment item, producing
    /// `attachment_chunk` messages for the content where necessary.
    ///
    /// The payload is transferred in one of three ways:
    /// - already stored in the objectstore: only the stored key is referenced;
    /// - small enough and `send_individual_attachments` is set: inlined into the
    ///   returned attachment;
    /// - otherwise: split into chunks of at most `attachment_chunk_size` and
    ///   produced as [`AttachmentChunkKafkaMessage`]s (zero chunks for an empty
    ///   payload).
    fn chunked_attachment_from_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        org_id: OrganizationId,
        item: &Item,
        send_individual_attachments: bool,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        // Random id; `(project_id, event_id, id)` identifies the attachment uniquely.
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            // Empty attachments need no chunks at all.
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // When sending individual attachments, and we have a single chunk, we want to send the
            // `data` inline in the `attachment` message.
            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // This skips chunks for empty attachments. The consumer does not require chunks for
            // empty attachments. `chunks` will be `0` in this case.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                    org_id,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
            // is one larger than the last chunk, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        Ok(ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            // Content types are normalized to lowercase for the consumer.
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            retention_days,
            payload,
        })
    }
1089
1090    /// Produces Kafka messages for the content and metadata of an attachment item.
1091    ///
1092    /// The `send_individual_attachments` controls whether the metadata of an attachment
1093    /// is produced directly as an individual `attachment` message, or returned from this function
1094    /// to be later sent as part of an `event` message.
1095    ///
1096    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
1097    /// unless the `send_individual_attachments` flag is set, and the content is small enough
1098    /// to fit inside a message.
1099    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
1100    /// of the `attachment` message instead.
1101    fn produce_attachment(
1102        &self,
1103        event_id: EventId,
1104        project_id: ProjectId,
1105        org_id: OrganizationId,
1106        item: &Item,
1107        send_individual_attachments: bool,
1108        retention_days: u16,
1109    ) -> Result<Option<ChunkedAttachment>, StoreError> {
1110        let attachment = if item.is_attachment_ref() {
1111            self.chunked_attachment_from_placeholder(item, retention_days)
1112        } else {
1113            self.chunked_attachment_from_attachment(
1114                event_id,
1115                project_id,
1116                org_id,
1117                item,
1118                send_individual_attachments,
1119                retention_days,
1120            )
1121        }?;
1122
1123        if send_individual_attachments {
1124            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
1125                event_id,
1126                project_id,
1127                attachment,
1128                org_id,
1129            });
1130            self.produce(KafkaTopic::Attachments, message)?;
1131            Ok(None)
1132        } else {
1133            Ok(Some(attachment))
1134        }
1135    }
1136
1137    fn produce_user_report(
1138        &self,
1139        event_id: EventId,
1140        project_id: ProjectId,
1141        org_id: OrganizationId,
1142        received_at: DateTime<Utc>,
1143        item: &Item,
1144    ) -> Result<(), StoreError> {
1145        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1146            project_id,
1147            event_id,
1148            start_time: safe_timestamp(received_at),
1149            payload: item.payload(),
1150            org_id,
1151        });
1152
1153        self.produce(KafkaTopic::Attachments, message)
1154    }
1155
1156    fn produce_user_report_v2(
1157        &self,
1158        event_id: EventId,
1159        project_id: ProjectId,
1160        org_id: OrganizationId,
1161        received_at: DateTime<Utc>,
1162        item: &Item,
1163        remote_addr: Option<String>,
1164    ) -> Result<(), StoreError> {
1165        let message = KafkaMessage::Event(EventKafkaMessage {
1166            project_id,
1167            event_id,
1168            payload: item.payload(),
1169            start_time: safe_timestamp(received_at),
1170            remote_addr,
1171            attachments: vec![],
1172            org_id,
1173        });
1174        self.produce(KafkaTopic::Feedback, message)
1175    }
1176
1177    fn send_metric_message(
1178        &self,
1179        namespace: MetricNamespace,
1180        message: MetricKafkaMessage,
1181    ) -> Result<(), StoreError> {
1182        let topic = match namespace {
1183            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1184            MetricNamespace::Unsupported => {
1185                relay_log::with_scope(
1186                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1187                    || relay_log::error!("store service dropping unknown metric usecase"),
1188                );
1189                return Ok(());
1190            }
1191            _ => KafkaTopic::MetricsGeneric,
1192        };
1193
1194        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1195        self.produce(topic, KafkaMessage::Metric { headers, message })?;
1196        Ok(())
1197    }
1198
1199    fn produce_profile(
1200        &self,
1201        organization_id: OrganizationId,
1202        project_id: ProjectId,
1203        key_id: Option<u64>,
1204        received_at: DateTime<Utc>,
1205        retention_days: u16,
1206        item: &Item,
1207    ) -> Result<(), StoreError> {
1208        let message = ProfileKafkaMessage {
1209            organization_id,
1210            project_id,
1211            key_id,
1212            received: safe_timestamp(received_at),
1213            retention_days,
1214            headers: BTreeMap::from([
1215                (
1216                    "sampled".to_owned(),
1217                    if item.sampled() { "true" } else { "false" }.to_owned(),
1218                ),
1219                ("project_id".to_owned(), project_id.to_string()),
1220            ]),
1221            payload: item.payload(),
1222        };
1223        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1224        Ok(())
1225    }
1226
1227    fn produce_check_in(
1228        &self,
1229        project_id: ProjectId,
1230        org_id: OrganizationId,
1231        received_at: DateTime<Utc>,
1232        client: Option<&str>,
1233        retention_days: u16,
1234        item: &Item,
1235    ) -> Result<(), StoreError> {
1236        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1237            message_type: CheckInMessageType::CheckIn,
1238            project_id,
1239            retention_days,
1240            start_time: safe_timestamp(received_at),
1241            sdk: client.map(str::to_owned),
1242            payload: item.payload(),
1243            routing_key_hint: item.routing_hint(),
1244            org_id,
1245        });
1246
1247        self.produce(KafkaTopic::Monitors, message)?;
1248
1249        Ok(())
1250    }
1251
    /// Produces a V2 span item to the spans topic.
    ///
    /// The span JSON is parsed into a raw key/value map and flattened together with
    /// [`SpanMeta`] into a single Kafka message. Depending on the
    /// `eap_span_outcomes_rollout_rate` option, Relay also emits an `Accepted`
    /// outcome for the indexed span here; otherwise the downstream consumer is
    /// expected to emit it (signaled via `accepted_outcome_emitted`).
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        // Orgs not rolled out to EAP outcomes keep Relay-emitted accepted outcomes.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Verify that this is a V2 span:
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
                org_id: organization_id,
            },
        )?;

        if relay_emits_accepted_outcome {
            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
            // or the segments consumer, depending on which will produce outcomes later.
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
1332}
1333
impl Service for StoreService {
    type Interface = Store;

    /// Receives store messages and dispatches each onto the service's async pool.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Shared so each spawned task can hold its own handle to the service.
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously, in the future we might explore how to make
            // the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}
1354
/// This signifies how the attachment payload is being transferred.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined here directly, and thus into the [`ChunkedAttachment`].
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored into the objectstore, with the given Id.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1373
/// Common attributes for both standalone attachments and processing-relevant attachments.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    ///
    /// Falls back to [`UNNAMED_ATTACHMENT`] if the item carries no filename.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
    /// native crash reports still need to be retained. These attachments are marked with the
    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
    /// not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload.
    ///
    /// Omitted from the serialized message when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The retention in days for this attachment.
    retention_days: u16,

    /// The attachment payload, chunked, inlined, or already stored.
    ///
    /// Flattened, so the variant's field (`chunks`, `data`, or `stored_id`) appears
    /// directly on this object.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1411
1412/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1413///
1414/// Cannot serialize bytes.
1415///
1416/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1417fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1418where
1419    S: serde::Serializer,
1420    T: serde::Serialize,
1421{
1422    serde_json::to_value(t)
1423        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1424        .serialize(serializer)
1425}
1426
/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay, produced via `safe_timestamp`.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1447
/// Container payload for chunks of attachments.
///
/// Chunks are produced to the attachments topic before the message that
/// references them by `id`.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of chunk. Starts at 0 and ends at `AttachmentKafkaMessage.num_chunks - 1`.
    chunk_index: usize,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1468
/// A "standalone" attachment.
///
/// Still belongs to an event but can be sent independently (like UserReport) and is not
/// considered in processing.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment metadata, including how its payload is transferred.
    attachment: ChunkedAttachment,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1486
/// Replay recording sent to Kafka as a single, non-chunked message.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    /// The replay id (an event id).
    replay_id: EventId,
    /// The DSN key id, if known.
    key_id: Option<u64>,
    /// The organization id for the current replay.
    org_id: OrganizationId,
    /// The project id for the current replay.
    project_id: ProjectId,
    /// Time at which the replay was received by Relay, produced via `safe_timestamp`.
    received: u64,
    /// The retention in days for this replay.
    retention_days: u16,
    /// Raw replay recording payload.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    /// Optional raw replay event payload.
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    /// Optional raw replay video payload.
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// Always `true`: tells the consumer to publish the replay event itself,
    /// as Relay no longer does it.
    relay_snuba_publish_disabled: bool,
}
1503
/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    /// Time at which the report was received by Relay, produced via `safe_timestamp`.
    start_time: u64,
    /// Raw user report payload.
    payload: Bytes,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    event_id: EventId,
    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1521
/// A single metric bucket ready for consumption in Kafka.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    /// The organization id the metric belongs to.
    org_id: OrganizationId,
    /// The project id the metric belongs to.
    project_id: ProjectId,
    /// The full metric name.
    name: &'a MetricName,
    /// The metric value; flattened into `type`/`value` fields (see [`MetricValue`]).
    #[serde(flatten)]
    value: MetricValue<'a>,
    /// The bucket timestamp taken from the originating `BucketView`.
    timestamp: UnixTimestamp,
    /// The metric's tags.
    tags: &'a BTreeMap<String, String>,
    /// The retention in days for this metric.
    retention_days: u16,
    /// Time the bucket was received, from the bucket metadata; omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1535
/// The value of a metric bucket, serialized as `{"type": "<c|d|s|g>", "value": ...}`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// A counter value.
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// A distribution, carried in an encoded array representation.
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// A set, carried in an encoded array representation.
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// A gauge value.
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1548
1549impl MetricValue<'_> {
1550    fn variant(&self) -> &'static str {
1551        match self {
1552            Self::Counter(_) => "counter",
1553            Self::Distribution(_) => "distribution",
1554            Self::Set(_) => "set",
1555            Self::Gauge(_) => "gauge",
1556        }
1557    }
1558
1559    fn encoding(&self) -> Option<&'static str> {
1560        match self {
1561            Self::Distribution(ae) => Some(ae.name()),
1562            Self::Set(ae) => Some(ae.name()),
1563            _ => None,
1564        }
1565    }
1566}
1567
/// A profile payload ready for consumption in Kafka.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    /// The organization id the profile belongs to.
    organization_id: OrganizationId,
    /// The project id the profile belongs to.
    project_id: ProjectId,
    /// The DSN key id, if known.
    key_id: Option<u64>,
    /// Time at which the profile was received by Relay, produced via `safe_timestamp`.
    received: u64,
    /// The retention in days for this profile.
    retention_days: u16,
    /// Not part of the serialized payload; carries `sampled` and `project_id`
    /// entries — presumably attached as Kafka message headers (confirm in `relay_kafka`).
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile payload.
    payload: Bytes,
}
1579
/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
///
/// Serialized in snake_case (`"clock_pulse"` / `"check_in"`).
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// Externally-produced clock tick; never produced by Relay.
    ClockPulse,
    /// A monitor check-in produced by Relay.
    CheckIn,
}
1592
/// A monitor check-in ready for consumption in Kafka.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1615
/// A span message built from a raw JSON span payload.
///
/// Both the metadata and the span's own key/value pairs are flattened into a
/// single top-level object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    /// Relay-side metadata about the span (scoping, retention, timestamps).
    #[serde(flatten)]
    meta: SpanMeta,
    /// The span's fields, kept as raw JSON values to avoid re-serialization.
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1623
/// A span message built from a typed [`SpanV2`] protocol value.
///
/// Both the metadata and the span's fields are flattened into a single
/// top-level object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    /// Relay-side metadata about the span (scoping, retention, timestamps).
    #[serde(flatten)]
    meta: SpanMeta,
    /// The annotated V2 span.
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1631
/// Relay-side metadata flattened into every span Kafka message.
#[derive(Debug, Serialize)]
struct SpanMeta {
    /// The organization id the span belongs to.
    organization_id: OrganizationId,
    /// The project id the span belongs to.
    project_id: ProjectId,
    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    /// The id of the event this span belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
    /// Indicates whether Relay already emitted an accepted outcome or if EAP still needs to emit it.
    accepted_outcome_emitted: bool,
}
1650
/// A standalone profile chunk ready for consumption in Kafka.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    /// The organization id the chunk belongs to.
    organization_id: OrganizationId,
    /// The project id the chunk belongs to.
    project_id: ProjectId,
    /// Time at which the chunk was received by Relay, produced via `safe_timestamp`.
    received: u64,
    /// The retention in days for this chunk.
    retention_days: u16,
    /// Not part of the serialized payload; carries a `project_id` entry —
    /// presumably attached as Kafka message headers (confirm in `relay_kafka`).
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile chunk payload.
    payload: Bytes,
}
1661
/// An enum over all possible ingest messages.
///
/// Serialized with a `type` tag in snake_case; fields marked `#[serde(skip)]`
/// are producer-side metadata (Kafka headers and partitioning keys), not part
/// of the payload.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        /// Kafka headers; carries the metric `namespace`.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    /// A protobuf-encoded `TraceItem`; see [`KafkaMessage::for_item`].
    Item {
        /// Kafka headers; carries `project_id` and `item_type`.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        /// Kept alongside the message for [`Message::variant`].
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Kafka headers; carries `project_id`.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Kafka headers; carries `project_id`.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1717
1718impl KafkaMessage<'_> {
1719    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
1720    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1721        let item_type = item.item_type();
1722        KafkaMessage::Item {
1723            headers: BTreeMap::from([
1724                ("project_id".to_owned(), scoping.project_id.to_string()),
1725                ("item_type".to_owned(), (item_type as i32).to_string()),
1726            ]),
1727            message: item,
1728            item_type,
1729        }
1730    }
1731}
1732
impl Message for KafkaMessage<'_> {
    /// Returns a static name identifying this message variant.
    ///
    /// Metrics are further split by namespace, and trace items report their
    /// concrete item type name.
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key for this Kafka message, if any.
    ///
    /// Variants that return `None` are partitioned randomly by the producer.
    /// For keyed variants, the key is derived from an event/routing UUID
    /// mixed with the organization id (see below); nil UUIDs are treated as
    /// absent so they do not all collapse onto a single partition.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some((message.event_id.0, message.org_id)),
            Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
            Self::SpanRaw {
                routing_key,
                org_id,
                ..
            }
            | Self::SpanV2 {
                routing_key,
                org_id,
                ..
            } => routing_key.map(|r| (r, *org_id)),

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),

            Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
            Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),

            // Random partitioning
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // Drop nil (all-zero) UUIDs: fall back to random partitioning instead.
        .filter(|(uuid, _)| !uuid.is_nil())
        .map(|(uuid, org_id)| {
            // Mix the key with the org id for better partitioning in case of
            // incidents where messages across orgs get the same id. The
            // big-endian org id is XORed into the first 8 bytes of the UUID.
            let mut res = uuid.into_bytes();
            for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
                res[i] ^= b;
            }
            u128::from_be_bytes(res)
        })
    }

    /// Returns the Kafka headers attached to this message, if the variant
    /// carries any.
    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    /// Serializes the message into its Kafka wire format.
    ///
    /// Metrics and spans are encoded as JSON, trace items as protobuf
    /// (via prost), and all remaining variants as MessagePack.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
1847
1848fn serialize_as_json<T: serde::Serialize>(
1849    value: &T,
1850) -> Result<SerializationOutput<'_>, ClientError> {
1851    match serde_json::to_vec(value) {
1852        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1853        Err(err) => Err(ClientError::InvalidJson(err)),
1854    }
1855}
1856
1857/// Determines if the given item is considered slow.
1858///
1859/// Slow items must be routed to the `Attachments` topic.
1860fn is_slow_item(item: &Item) -> bool {
1861    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1862}
1863
/// Converts a boolean into its lowercase string representation.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1867
1868/// Returns a safe timestamp for Kafka.
1869///
1870/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1871fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1872    let ts = timestamp.timestamp();
1873    if ts >= 0 {
1874        return ts as u64;
1875    }
1876
1877    // We assume this call can't return < 0.
1878    Utc::now().timestamp() as u64
1879}
1880
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to the outcome topics through the store producer must be
    /// rejected with [`ClientError::InvalidTopicName`].
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let result = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(result, Err(ClientError::InvalidTopicName)));
        }
    }
}