relay_server/services/
store.rs

1//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry.
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28    MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
43use crate::utils::{self, FormDataIter};
44
/// Conversion of session metric buckets into EAP trace items (see `sessions::to_trace_item`).
mod sessions;

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
/// Errors that can occur while forwarding data to the Sentry store via Kafka.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Producing a message to the Kafka broker failed.
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    /// Encoding a payload before producing it failed.
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    /// An envelope item required an event ID, but the envelope had none.
    #[error("failed to store event because event id was missing")]
    NoEventId,
}
59
impl OutcomeError for StoreError {
    type Error = Self;

    /// Every store error is reported as an internal invalid outcome; the error
    /// itself is passed through unchanged so the caller can log it.
    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}
67
/// Wrapper around the Kafka client used to produce all store messages.
struct Producer {
    /// The configured Kafka client covering every store topic.
    client: KafkaClient,
}
71
72impl Producer {
73    pub fn create(config: &Config) -> anyhow::Result<Self> {
74        let mut client_builder = KafkaClient::builder();
75
76        for topic in KafkaTopic::iter().filter(|t| {
77            // Outcomes should not be sent from the store forwarder.
78            // See `KafkaOutcomesProducer`.
79            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
80        }) {
81            let kafka_configs = config.kafka_configs(*topic)?;
82            client_builder = client_builder
83                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
84                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
85        }
86
87        Ok(Self {
88            client: client_builder.build(),
89        })
90    }
91}
92
/// Publishes an [`Envelope`](crate::envelope::Envelope) to the Sentry core application through Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The envelope whose remaining items are forwarded to Kafka.
    pub envelope: ManagedEnvelope,
}
98
/// Publishes a list of [`Bucket`]s to the Sentry core application through Kafka topics.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// The metric buckets to produce.
    pub buckets: Vec<Bucket>,
    /// Scoping (organization, project, key) the buckets belong to.
    pub scoping: Scoping,
    /// Retention in days applied to the produced buckets.
    pub retention: u16,
}
106
/// Publishes a trace item (for example a log) to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The final trace item which will be produced to Kafka.
    pub trace_item: TraceItem,
}
113
impl Counted for StoreTraceItem {
    /// Delegates to the quantities recorded on the trace item itself.
    fn quantities(&self) -> Quantities {
        self.trace_item.quantities()
    }
}
119
/// Publishes a span item to the Sentry core application through Kafka.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key to assign a Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Default retention of the span, in days.
    pub retention_days: u16,
    /// Downsampled retention of the span, in days.
    pub downsampled_retention_days: u16,
    /// The final Sentry compatible span item.
    pub item: SpanV2,
}
132
impl Counted for StoreSpanV2 {
    /// Each message accounts for exactly one indexed span.
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}
138
/// Publishes a singular profile chunk to Kafka.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Default retention of the profile chunk, in days.
    pub retention_days: u16,
    /// The serialized profile chunk payload.
    pub payload: Bytes,
    /// Outcome quantities associated with this profile.
    ///
    /// Quantities are different for backend and ui profile chunks.
    pub quantities: Quantities,
}
151
impl Counted for StoreProfileChunk {
    /// Returns the precomputed quantities attached to this message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
157
/// A replay to be stored to Kafka.
///
/// Contains the recording plus optional replay-event and video payloads.
#[derive(Debug)]
pub struct StoreReplay {
    /// The event ID.
    pub event_id: EventId,
    /// Number of days to retain.
    pub retention_days: u16,
    /// The recording payload (rrweb data).
    pub recording: Bytes,
    /// Optional replay event payload (JSON).
    pub event: Option<Bytes>,
    /// Optional replay video.
    pub video: Option<Bytes>,
    /// Outcome quantities associated with this replay.
    ///
    /// Quantities are different for web and native replays.
    pub quantities: Quantities,
}
176
impl Counted for StoreReplay {
    /// Returns the precomputed quantities attached to this message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
182
/// An attachment to be stored to Kafka.
#[derive(Debug)]
pub struct StoreAttachment {
    /// The event ID.
    pub event_id: EventId,
    /// The attachment item.
    pub attachment: Item,
    /// Outcome quantities associated with this attachment.
    pub quantities: Quantities,
}
193
impl Counted for StoreAttachment {
    /// Returns the precomputed quantities attached to this message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
199
/// A user report to be stored to Kafka.
#[derive(Debug)]
pub struct StoreUserReport {
    /// The event ID the report refers to.
    pub event_id: EventId,
    /// The user report item.
    pub report: Item,
}
208
impl Counted for StoreUserReport {
    /// Each message accounts for a single user report.
    // NOTE(review): counted under `UserReportV2` — confirm this is the intended
    // data category for this message type.
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::UserReportV2, 1)]
    }
}
214
/// A profile to be stored to Kafka.
#[derive(Debug)]
pub struct StoreProfile {
    /// Number of days to retain.
    pub retention_days: u16,
    /// The profile item.
    pub profile: Item,
    /// Outcome quantities associated with this profile.
    pub quantities: Quantities,
}
225
impl Counted for StoreProfile {
    /// Returns the precomputed quantities attached to this message.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
231
/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
///
/// Tasks are type-erased boxed futures with no output.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
234
/// Service interface of the store service, accepting every storable message type.
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    ///
    /// Note: Some envelope items are not supported to be submitted at all or through an envelope,
    /// for example logs must be submitted via [`Self::TraceItem`] instead.
    ///
    /// Long term this variant is going to be replaced with fully typed variants of items which can
    /// be stored instead.
    Envelope(StoreEnvelope),
    /// Aggregated generic metrics.
    Metrics(StoreMetrics),
    /// A singular [`TraceItem`].
    TraceItem(Managed<StoreTraceItem>),
    /// A singular Span.
    Span(Managed<Box<StoreSpanV2>>),
    /// A singular profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// A singular replay.
    Replay(Managed<StoreReplay>),
    /// A singular attachment.
    Attachment(Managed<StoreAttachment>),
    /// A singular user report.
    UserReport(Managed<StoreUserReport>),
    /// A single profile.
    Profile(Managed<StoreProfile>),
}
263
264impl Store {
265    /// Returns the name of the message variant.
266    fn variant(&self) -> &'static str {
267        match self {
268            Store::Envelope(_) => "envelope",
269            Store::Metrics(_) => "metrics",
270            Store::TraceItem(_) => "trace_item",
271            Store::Span(_) => "span",
272            Store::ProfileChunk(_) => "profile_chunk",
273            Store::Replay(_) => "replay",
274            Store::Attachment(_) => "attachment",
275            Store::UserReport(_) => "user_report",
276            Store::Profile(_) => "profile",
277        }
278    }
279}
280
// Marks `Store` as a service interface message type.
impl Interface for Store {}
282
// The following `FromMessage` impls are boilerplate: each typed store message is
// wrapped into the corresponding [`Store`] variant, with no response expected.

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

impl FromMessage<Managed<StoreReplay>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
        Self::Replay(message)
    }
}

impl FromMessage<Managed<StoreAttachment>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
        Self::Attachment(message)
    }
}

impl FromMessage<Managed<StoreUserReport>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
        Self::UserReport(message)
    }
}

impl FromMessage<Managed<StoreProfile>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
        Self::Profile(message)
    }
}
354
/// Service implementing the [`Store`] interface.
pub struct StoreService {
    /// Async pool on which store tasks are scheduled.
    pool: StoreServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle to the current dynamic global configuration.
    global_config: GlobalConfigHandle,
    /// Address of the outcome aggregator used to emit tracked outcomes.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Tracks outcomes for metric buckets.
    metric_outcomes: MetricOutcomes,
    /// Kafka producer covering all store topics.
    producer: Producer,
}
364
365impl StoreService {
366    pub fn create(
367        pool: StoreServicePool,
368        config: Arc<Config>,
369        global_config: GlobalConfigHandle,
370        outcome_aggregator: Addr<TrackOutcome>,
371        metric_outcomes: MetricOutcomes,
372    ) -> anyhow::Result<Self> {
373        let producer = Producer::create(&config)?;
374        Ok(Self {
375            pool,
376            config,
377            global_config,
378            outcome_aggregator,
379            metric_outcomes,
380            producer,
381        })
382    }
383
    /// Dispatches an incoming [`Store`] message to its typed handler.
    ///
    /// The full handling duration is recorded in the `StoreServiceDuration`
    /// timer, tagged with the message variant name.
    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
                Store::Replay(message) => self.handle_store_replay(message),
                Store::Attachment(message) => self.handle_store_attachment(message),
                Store::UserReport(message) => self.handle_user_report(message),
                Store::Profile(message) => self.handle_profile(message),
            }
        })
    }
400
401    fn handle_store_envelope(&self, message: StoreEnvelope) {
402        let StoreEnvelope { mut envelope } = message;
403
404        let scoping = envelope.scoping();
405        match self.store_envelope(&mut envelope) {
406            Ok(()) => envelope.accept(),
407            Err(error) => {
408                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
409                relay_log::error!(
410                    error = &error as &dyn Error,
411                    tags.project_key = %scoping.project_key,
412                    "failed to store envelope"
413                );
414            }
415        }
416    }
417
    /// Forwards all items of an envelope to their respective Kafka topics.
    ///
    /// The event payload (error, transaction, or security report), if present,
    /// is extracted first and produced last so that inlined attachments are
    /// collected before the event message is built. Returns on the first
    /// produce error; the caller handles outcomes and logging.
    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        // Pull the event item out of the envelope so the remaining items can be
        // iterated and produced independently of it.
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Some error events like minidumps need all attachment chunks to be processed _before_
        // the event payload on the consumer side. Transaction attachments do not require this ordering
        // guarantee, so they do not have to go to the same topic as their event payload.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        // Attachments become standalone messages when there is no event item or
        // the event is a transaction; otherwise they are inlined into the event
        // message below.
        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        // Chunked attachments collected here are embedded into the event message.
        let mut attachments = Vec::new();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        client,
                        retention,
                        item,
                    )?
                }
                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                // Logs must arrive as `Store::TraceItem` messages; receiving one
                // here indicates a bug upstream in the pipeline.
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                // Unknown/unexpected item types are logged with as much context
                // as possible (all item and attachment types in the envelope).
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            // Produce the event last, after all attachments have been handled.
            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                    org_id: scoping.organization_id,
                }),
            )?;
        } else {
            // Without an event, all attachments were sent individually above.
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }
585
    /// Produces aggregated metric buckets to Kafka and tracks their outcomes.
    ///
    /// Each bucket is produced separately; buckets exceeding the configured max
    /// batch size are split into multiple views. All buckets are attempted even
    /// if some fail — only the first error is kept and logged at the end. Also
    /// records per-namespace ingestion-delay statistics and, for rolled-out
    /// organizations, mirrors session buckets to the EAP items topic.
    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        // Holds the first produce/encode error, if any, for logging below.
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let emit_sessions_to_eap = utils::is_rolled_out(
            scoping.organization_id.value(),
            global_config.options.sessions_eap_rollout_rate,
        )
        .is_keep();

        let now = UnixTimestamp::now();
        // Per-namespace (sum, count, max) of seconds between receive and produce.
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
            // each bucket separately, we only need to split buckets that exceed the size, but not
            // batches.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }

            if emit_sessions_to_eap
                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
            {
                let message = KafkaMessage::for_item(scoping, trace_item);
                let _ = self.produce(KafkaTopic::Items, message);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        // Emit aggregate delay metrics per namespace; skip empty namespaces.
        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }
682
    /// Produces a [`TraceItem`] to the items Kafka topic.
    ///
    /// For organizations not rolled out to EAP-emitted outcomes, Relay takes
    /// the outcomes off the item and emits accepted outcomes itself after a
    /// successful produce.
    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let eap_emits_outcomes = utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_outcomes_rollout_rate,
        )
        .is_keep();

        let outcomes = message.try_accept(|mut item| {
            // When EAP emits outcomes, leave them on the item; otherwise take
            // them out so Relay can emit them below.
            let outcomes = match eap_emits_outcomes {
                true => None,
                false => item.trace_item.outcomes.take(),
            };

            let message = KafkaMessage::for_item(scoping, item.trace_item);
            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
        });

        // Accepted outcomes when items have been successfully produced to rdkafka.
        //
        // This is only a temporary measure, long term these outcomes will be part of the trace
        // item and emitted by Snuba to guarantee a delivery to storage.
        if let Ok(Some(outcomes)) = outcomes {
            for (category, quantity) in outcomes.quantities() {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    // Quantities are saturated into `u32` for outcome tracking.
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }
724
    /// Produces a V2 span to the spans Kafka topic.
    ///
    /// Depending on the EAP span outcomes rollout, either Relay emits the
    /// accepted `SpanIndexed` outcome itself after a successful produce, or it
    /// marks the message so a downstream consumer emits it instead.
    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
            // Tells the consumer whether Relay already emitted the accepted outcome.
            accepted_outcome_emitted: relay_emits_accepted_outcome,
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
                org_id: scoping.organization_id,
            };

            self.produce(KafkaTopic::Spans, message)
        });

        if result.is_ok() {
            relay_statsd::metric!(
                counter(RelayCounters::SpanV2Produced) += 1,
                via = "processing"
            );

            if relay_emits_accepted_outcome {
                // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
                // or the segments consumer, depending on which will produce outcomes later.
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }
788
789    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
790        let scoping = message.scoping();
791        let received_at = message.received_at();
792
793        let _ = message.try_accept(|message| {
794            let message = ProfileChunkKafkaMessage {
795                organization_id: scoping.organization_id,
796                project_id: scoping.project_id,
797                received: safe_timestamp(received_at),
798                retention_days: message.retention_days,
799                headers: BTreeMap::from([(
800                    "project_id".to_owned(),
801                    scoping.project_id.to_string(),
802                )]),
803                payload: message.payload,
804            };
805
806            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
807        });
808    }
809
    /// Produces a replay recording (with optional event and video payloads) to
    /// the replay recordings Kafka topic.
    fn handle_store_replay(&self, message: Managed<StoreReplay>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|replay| {
            let kafka_msg =
                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                    replay_id: replay.event_id,
                    key_id: scoping.key_id,
                    org_id: scoping.organization_id,
                    project_id: scoping.project_id,
                    received: safe_timestamp(received_at),
                    retention_days: replay.retention_days,
                    // Payloads are borrowed; the message is serialized before
                    // `replay` is dropped at the end of this closure.
                    payload: &replay.recording,
                    replay_event: replay.event.as_deref(),
                    replay_video: replay.video.as_deref(),
                    // Hardcoded to `true` to indicate to the consumer that it should always publish the
                    // replay_event as relay no longer does it.
                    relay_snuba_publish_disabled: true,
                });
            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
        });
    }
833
    /// Produces a standalone attachment to Kafka.
    ///
    /// Standalone attachments are always sent as individual messages, never as
    /// chunks inlined into an event payload.
    fn handle_store_attachment(&self, message: Managed<StoreAttachment>) {
        let scoping = message.scoping();
        let _ = message.try_accept(|attachment| {
            let result = self.produce_attachment(
                attachment.event_id,
                scoping.project_id,
                scoping.organization_id,
                &attachment.attachment,
                // Hardcoded to `true` since standalone attachments are 'individual attachments'.
                true,
            );
            // Since we are sending an 'individual attachment' this function should never return a
            // `ChunkedAttachment`.
            debug_assert!(!matches!(result, Ok(Some(_))));
            result
        });
    }
851
852    fn handle_user_report(&self, message: Managed<StoreUserReport>) {
853        let scoping = message.scoping();
854        let received_at = message.received_at();
855
856        let _ = message.try_accept(|report| {
857            let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
858                project_id: scoping.project_id,
859                event_id: report.event_id,
860                start_time: safe_timestamp(received_at),
861                payload: report.report.payload(),
862                org_id: scoping.organization_id,
863            });
864            self.produce(KafkaTopic::Attachments, kafka_msg)
865        });
866    }
867
868    fn handle_profile(&self, message: Managed<StoreProfile>) {
869        let scoping = message.scoping();
870        let received_at = message.received_at();
871
872        let _ = message.try_accept(|profile| {
873            self.produce_profile(
874                scoping.organization_id,
875                scoping.project_id,
876                scoping.key_id,
877                received_at,
878                profile.retention_days,
879                &profile.profile,
880            )
881        });
882    }
883
884    fn create_metric_message<'a>(
885        &self,
886        organization_id: OrganizationId,
887        project_id: ProjectId,
888        encoder: &'a mut BucketEncoder,
889        namespace: MetricNamespace,
890        view: &BucketView<'a>,
891        retention_days: u16,
892    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
893        let value = match view.value() {
894            BucketViewValue::Counter(c) => MetricValue::Counter(c),
895            BucketViewValue::Distribution(data) => MetricValue::Distribution(
896                encoder
897                    .encode_distribution(namespace, data)
898                    .map_err(StoreError::EncodingFailed)?,
899            ),
900            BucketViewValue::Set(data) => MetricValue::Set(
901                encoder
902                    .encode_set(namespace, data)
903                    .map_err(StoreError::EncodingFailed)?,
904            ),
905            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
906        };
907
908        Ok(MetricKafkaMessage {
909            org_id: organization_id,
910            project_id,
911            name: view.name(),
912            value,
913            timestamp: view.timestamp(),
914            tags: view.tags(),
915            retention_days,
916            received_at: view.metadata().received_at,
917        })
918    }
919
    /// Sends a single message to the given Kafka topic.
    ///
    /// Send errors are logged (with topic and message variant tags) and
    /// returned to the caller. On success, a `ProcessingMessageProduced`
    /// counter is emitted, with extra tags for metric and replay messages.
    fn produce(
        &self,
        topic: KafkaTopic,
        // Takes message by value to ensure it is not being produced twice.
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self
            .producer
            .client
            .send_message(topic, &message)
            .inspect_err(|err| {
                relay_log::error!(
                    error = err as &dyn Error,
                    tags.topic = ?topic,
                    tags.message = message.variant(),
                    "failed to produce to Kafka"
                )
            })?;

        // Emit the produced-counter with tags specific to the message variant.
        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            // All other variants only get the generic tags.
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }
974
    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// The `send_individual_attachments` controls whether the metadata of an attachment
    /// is produced directly as an individual `attachment` message, or returned from this function
    /// to be later sent as part of an `event` message.
    ///
    /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages,
    /// unless the `send_individual_attachments` flag is set, and the content is small enough
    /// to fit inside a message.
    /// In that case, no `attachment_chunk` is produced, but the content is sent as part
    /// of the `attachment` message instead.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        org_id: OrganizationId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        // Random ID; the triple `(project_id, event_id, id)` identifies the attachment uniquely.
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            // Empty attachments need no content messages at all; the consumer
            // accepts `chunks: 0` without any `attachment_chunk` message.
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            // The content already lives in the objectstore; only reference it.
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // When sending individual attachments, and we have a single chunk, we want to send the
            // `data` inline in the `attachment` message.
            // This avoids a needless roundtrip through the attachments cache on the Sentry side.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // The `size == 0` case is handled above, so this loop runs at least
            // once and produces at least one chunk.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                    org_id,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk_index is incremented after every loop iteration. After we exit the loop, it
            // is one larger than the last chunk, so it is equal to the number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            // Content types are normalized to lowercase.
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
                org_id,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            // Caller embeds the metadata into an `event` message.
            Ok(Some(attachment))
        }
    }
1064
1065    fn produce_user_report(
1066        &self,
1067        event_id: EventId,
1068        project_id: ProjectId,
1069        org_id: OrganizationId,
1070        received_at: DateTime<Utc>,
1071        item: &Item,
1072    ) -> Result<(), StoreError> {
1073        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1074            project_id,
1075            event_id,
1076            start_time: safe_timestamp(received_at),
1077            payload: item.payload(),
1078            org_id,
1079        });
1080
1081        self.produce(KafkaTopic::Attachments, message)
1082    }
1083
1084    fn produce_user_report_v2(
1085        &self,
1086        event_id: EventId,
1087        project_id: ProjectId,
1088        org_id: OrganizationId,
1089        received_at: DateTime<Utc>,
1090        item: &Item,
1091        remote_addr: Option<String>,
1092    ) -> Result<(), StoreError> {
1093        let message = KafkaMessage::Event(EventKafkaMessage {
1094            project_id,
1095            event_id,
1096            payload: item.payload(),
1097            start_time: safe_timestamp(received_at),
1098            remote_addr,
1099            attachments: vec![],
1100            org_id,
1101        });
1102        self.produce(KafkaTopic::Feedback, message)
1103    }
1104
1105    fn send_metric_message(
1106        &self,
1107        namespace: MetricNamespace,
1108        message: MetricKafkaMessage,
1109    ) -> Result<(), StoreError> {
1110        let topic = match namespace {
1111            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1112            MetricNamespace::Unsupported => {
1113                relay_log::with_scope(
1114                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1115                    || relay_log::error!("store service dropping unknown metric usecase"),
1116                );
1117                return Ok(());
1118            }
1119            _ => KafkaTopic::MetricsGeneric,
1120        };
1121
1122        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1123        self.produce(topic, KafkaMessage::Metric { headers, message })?;
1124        Ok(())
1125    }
1126
1127    fn produce_profile(
1128        &self,
1129        organization_id: OrganizationId,
1130        project_id: ProjectId,
1131        key_id: Option<u64>,
1132        received_at: DateTime<Utc>,
1133        retention_days: u16,
1134        item: &Item,
1135    ) -> Result<(), StoreError> {
1136        let message = ProfileKafkaMessage {
1137            organization_id,
1138            project_id,
1139            key_id,
1140            received: safe_timestamp(received_at),
1141            retention_days,
1142            headers: BTreeMap::from([
1143                (
1144                    "sampled".to_owned(),
1145                    if item.sampled() { "true" } else { "false" }.to_owned(),
1146                ),
1147                ("project_id".to_owned(), project_id.to_string()),
1148            ]),
1149            payload: item.payload(),
1150        };
1151        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1152        Ok(())
1153    }
1154
1155    fn produce_check_in(
1156        &self,
1157        project_id: ProjectId,
1158        org_id: OrganizationId,
1159        received_at: DateTime<Utc>,
1160        client: Option<&str>,
1161        retention_days: u16,
1162        item: &Item,
1163    ) -> Result<(), StoreError> {
1164        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1165            message_type: CheckInMessageType::CheckIn,
1166            project_id,
1167            retention_days,
1168            start_time: safe_timestamp(received_at),
1169            sdk: client.map(str::to_owned),
1170            payload: item.payload(),
1171            routing_key_hint: item.routing_hint(),
1172            org_id,
1173        });
1174
1175        self.produce(KafkaTopic::Monitors, message)?;
1176
1177        Ok(())
1178    }
1179
    /// Produces a V2 span from an envelope item onto the spans topic.
    ///
    /// Depending on the `eap_span_outcomes_rollout_rate` global option, Relay
    /// itself emits the `Accepted` outcome for the indexed span; the
    /// `accepted_outcome_emitted` flag tells the consumer whether it still
    /// needs to do so.
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        // Orgs that are rolled out ("keep") delegate outcome emission downstream;
        // everyone else gets the outcome emitted by Relay below.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            // The span body is forwarded as raw JSON without re-serialization.
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Verify that this is a V2 span:
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
                org_id: organization_id,
            },
        )?;

        if relay_emits_accepted_outcome {
            // XXX: Temporarily produce span outcomes. Keep in sync with either EAP
            // or the segments consumer, depending on which will produce outcomes later.
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
1260}
1261
impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Shared across pool tasks; each spawned task gets its own `Arc` clone.
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        // Process incoming messages until the channel closes on shutdown.
        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // For now, we run each task synchronously, in the future we might explore how to make
            // the store async.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}
1282
/// This signifies how the attachment payload is being transferred.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload has been split into multiple chunks.
    ///
    /// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
    /// If the payload `size == 0`, the number of chunks will also be `0`.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is inlined here directly, and thus into the [`ChunkedAttachment`].
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The attachment has already been stored into the objectstore, with the given Id.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1301
/// Common attributes for both standalone attachments and processing-relevant attachments.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    /// Generated as a random UUID string.
    id: String,

    /// File name of the attachment file.
    ///
    /// Falls back to [`UNNAMED_ATTACHMENT`] if the item has no filename header.
    name: String,

    /// Whether this attachment was rate limited and should be removed after processing.
    ///
    /// By default, rate limited attachments are immediately removed from Envelopes. For processing,
    /// native crash reports still need to be retained. These attachments are marked with the
    /// `rate_limited` header, which signals to the processing pipeline that the attachment should
    /// not be persisted after processing.
    rate_limited: bool,

    /// Content type of the attachment payload, normalized to lowercase.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type used in the processing pipeline.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The attachment payload, chunked, inlined, or already stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1336
1337/// A hack to make rmp-serde behave more like serde-json when serializing enums.
1338///
1339/// Cannot serialize bytes.
1340///
1341/// See <https://github.com/3Hren/msgpack-rust/pull/214>
1342fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1343where
1344    S: serde::Serializer,
1345    T: serde::Serialize,
1346{
1347    serde_json::to_value(t)
1348        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1349        .serialize(serializer)
1350}
1351
/// Container payload for event messages.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    ///
    /// Empty for feedback events, which carry no attachments.
    attachments: Vec<ChunkedAttachment>,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1372
/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of chunk. Starts at 0 and ends at `AttachmentKafkaMessage.num_chunks - 1`.
    chunk_index: usize,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1393
1394/// A "standalone" attachment.
1395///
1396/// Still belongs to an event but can be sent independently (like UserReport) and is not
1397/// considered in processing.
1398#[derive(Debug, Serialize)]
1399struct AttachmentKafkaMessage {
1400    /// The event id.
1401    event_id: EventId,
1402    /// The project id for the current event.
1403    project_id: ProjectId,
1404    /// The attachment.
1405    attachment: ChunkedAttachment,
1406
1407    /// Used for [`KafkaMessage::key`]
1408    #[serde(skip)]
1409    org_id: OrganizationId,
1410}
1411
/// Container payload for a replay recording produced as a single,
/// non-chunked Kafka message.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    /// Time at which the replay was received by Relay.
    received: u64,
    retention_days: u16,
    /// The raw replay recording payload.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    /// The serialized replay event, if present in the envelope.
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    /// The serialized replay video, if present in the envelope.
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// Always `true`: tells the consumer to publish the replay event itself,
    /// as Relay no longer does it.
    relay_snuba_publish_disabled: bool,
}
1428
/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// The project id for the current event.
    project_id: ProjectId,
    /// Time at which the report was received by Relay.
    start_time: u64,
    /// Raw user report payload.
    payload: Bytes,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    event_id: EventId,
    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1446
/// A single metric bucket, ready for consumption by the metrics consumer.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    /// The bucket value, flattened with a `type` tag (see [`MetricValue`]).
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1460
/// The value of a metric bucket, tagged with the metric type's short name
/// (`c`, `d`, `s`, `g`) on the wire.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1473
1474impl MetricValue<'_> {
1475    fn variant(&self) -> &'static str {
1476        match self {
1477            Self::Counter(_) => "counter",
1478            Self::Distribution(_) => "distribution",
1479            Self::Set(_) => "set",
1480            Self::Gauge(_) => "gauge",
1481        }
1482    }
1483
1484    fn encoding(&self) -> Option<&'static str> {
1485        match self {
1486            Self::Distribution(ae) => Some(ae.name()),
1487            Self::Set(ae) => Some(ae.name()),
1488            _ => None,
1489        }
1490    }
1491}
1492
/// Container payload for profile messages.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    /// Time at which the profile was received by Relay.
    received: u64,
    retention_days: u16,
    /// Sent as Kafka message headers, not as part of the payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile payload.
    payload: Bytes,
}
1504
/// Used to discriminate cron monitor ingestion messages.
///
/// There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the
/// ones produced here in relay) and "clock_pulse" messages, which are produced externally and are
/// intended to ensure the clock continues to run even when ingestion volume drops.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// Produced externally, never by Relay; declared here for serialization parity.
    ClockPulse,
    /// A monitor check-in produced by Relay.
    CheckIn,
}
1517
/// Container payload for cron monitor check-in messages.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The SDK client which produced the event.
    sdk: Option<String>,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,

    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    /// Used for [`KafkaMessage::key`]
    #[serde(skip)]
    org_id: OrganizationId,
}
1540
/// A span message whose body is forwarded as raw, unparsed JSON fields.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    /// The span's top-level JSON fields, passed through without re-serialization.
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1548
/// A span message whose body is a fully parsed V2 span.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1556
/// Ingestion metadata flattened into every span Kafka message.
#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    // Required for the buffer to emit outcomes scoped to the DSN.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
    received: f64,
    /// Number of days until these data should be deleted.
    retention_days: u16,
    /// Number of days until the downsampled version of this data should be deleted.
    downsampled_retention_days: u16,
    /// Indicates whether Relay already emitted an accepted outcome or if EAP still needs to emit it.
    accepted_outcome_emitted: bool,
}
1575
/// Container payload for profile chunk messages.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    /// Time at which the chunk was received by Relay.
    received: u64,
    retention_days: u16,
    /// Sent as Kafka message headers, not as part of the payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile chunk payload.
    payload: Bytes,
}
1586
/// An enum over all possible ingest messages.
///
/// Fields marked `#[serde(skip)]` are not part of the serialized payload; they
/// carry data for Kafka headers, partitioning keys, or separate encoders.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    /// A trace item, encoded as protobuf instead of serde (see `Message::serialize`).
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        /// Used for [`KafkaMessage::key`]
        #[serde(skip)]
        org_id: OrganizationId,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1642
1643impl KafkaMessage<'_> {
1644    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
1645    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1646        let item_type = item.item_type();
1647        KafkaMessage::Item {
1648            headers: BTreeMap::from([
1649                ("project_id".to_owned(), scoping.project_id.to_string()),
1650                ("item_type".to_owned(), (item_type as i32).to_string()),
1651            ]),
1652            message: item,
1653            item_type,
1654        }
1655    }
1656}
1657
1658impl Message for KafkaMessage<'_> {
    /// Returns a static name for this message variant, used for statsd tags
    /// and log messages.
    ///
    /// Metric messages are further distinguished by their namespace.
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }
1683
    /// Returns the partitioning key for this Kafka message, if any.
    ///
    /// The key is derived from the message's event/routing UUID, mixed with the
    /// organization id. Messages without a key (or with a nil UUID) are
    /// partitioned randomly.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some((message.event_id.0, message.org_id)),
            Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
            Self::SpanRaw {
                routing_key,
                org_id,
                ..
            }
            | Self::SpanV2 {
                routing_key,
                org_id,
                ..
            } => routing_key.map(|r| (r, *org_id)),

            // Monitor check-ins use the hinted UUID passed through from the Envelope.
            //
            // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
            // receive the routing_key_hint from their envelopes.
            Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),

            Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
            Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),

            // Random partitioning
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        .filter(|(uuid, _)| !uuid.is_nil())
        .map(|(uuid, org_id)| {
            // mix key with org id for better partitioning in case of incidents where messages
            // across orgs get the same id
            let mut res = uuid.into_bytes();
            for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
                res[i] ^= b;
            }
            u128::from_be_bytes(res)
        })
    }
1727
1728    fn headers(&self) -> Option<&BTreeMap<String, String>> {
1729        match &self {
1730            KafkaMessage::Metric { headers, .. }
1731            | KafkaMessage::SpanRaw { headers, .. }
1732            | KafkaMessage::SpanV2 { headers, .. }
1733            | KafkaMessage::Item { headers, .. }
1734            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
1735            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),
1736
1737            KafkaMessage::Event(_)
1738            | KafkaMessage::UserReport(_)
1739            | KafkaMessage::CheckIn(_)
1740            | KafkaMessage::Attachment(_)
1741            | KafkaMessage::AttachmentChunk(_)
1742            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
1743        }
1744    }
1745
1746    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1747        match self {
1748            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1749            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
1750            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1751            KafkaMessage::Item { message, .. } => {
1752                let mut payload = Vec::new();
1753                match message.encode(&mut payload) {
1754                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1755                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
1756                }
1757            }
1758            KafkaMessage::Event(_)
1759            | KafkaMessage::UserReport(_)
1760            | KafkaMessage::CheckIn(_)
1761            | KafkaMessage::Attachment(_)
1762            | KafkaMessage::AttachmentChunk(_)
1763            | KafkaMessage::Profile(_)
1764            | KafkaMessage::ProfileChunk(_)
1765            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1766                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1767                Err(err) => Err(ClientError::InvalidMsgPack(err)),
1768            },
1769        }
1770    }
1771}
1772
1773fn serialize_as_json<T: serde::Serialize>(
1774    value: &T,
1775) -> Result<SerializationOutput<'_>, ClientError> {
1776    match serde_json::to_vec(value) {
1777        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1778        Err(err) => Err(ClientError::InvalidJson(err)),
1779    }
1780}
1781
1782/// Determines if the given item is considered slow.
1783///
1784/// Slow items must be routed to the `Attachments` topic.
1785fn is_slow_item(item: &Item) -> bool {
1786    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1787}
1788
/// Maps a boolean onto its lowercase string representation.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1792
1793/// Returns a safe timestamp for Kafka.
1794///
1795/// Kafka expects timestamps to be in UTC and in seconds since epoch.
1796fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1797    let ts = timestamp.timestamp();
1798    if ts >= 0 {
1799        return ts as u64;
1800    }
1801
1802    // We assume this call can't return < 0.
1803    Utc::now().timestamp() as u64
1804}
1805
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to the outcomes topics through the store producer must be
    /// rejected with `ClientError::InvalidTopicName`.
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        let outcome_topics = [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling];
        for topic in outcome_topics {
            let result = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(result, Err(ClientError::InvalidTopicName)));
        }
    }
}