1use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27 Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28 MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
43use crate::utils::{self, FormDataIter};
44
45mod sessions;
46
/// Fallback name used for attachments whose envelope item carries no filename.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
/// Errors that can occur while producing data to Kafka in the store service.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The Kafka client failed to send the message.
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    /// Encoding a payload (e.g. a metric bucket value) failed.
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    /// The payload requires an event id, but the envelope did not carry one.
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    // Every store error is reported as an internal invalid outcome.
    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}
67
/// Wrapper around the Kafka client used by the store service to produce messages.
struct Producer {
    client: KafkaClient,
}

impl Producer {
    /// Builds a Kafka client configured for all topics this service produces to.
    ///
    /// The outcome topics are excluded here; they are presumably handled by a
    /// separate producer elsewhere (not visible in this file).
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}
92
/// Service message: publish a fully processed envelope to Kafka.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The envelope to store, wrapped for outcome tracking.
    pub envelope: ManagedEnvelope,
}

/// Service message: publish a batch of metric buckets to Kafka.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// The metric buckets to produce.
    pub buckets: Vec<Bucket>,
    /// Organization/project/key scoping the buckets belong to.
    pub scoping: Scoping,
    /// Retention in days applied to the produced metrics.
    pub retention: u16,
}
106
/// Service message: publish a single trace item (EAP) to Kafka.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The protobuf trace item to produce to the items topic.
    pub trace_item: TraceItem,
}

impl Counted for StoreTraceItem {
    // Delegates to the trace item's own quantity accounting.
    fn quantities(&self) -> Quantities {
        self.trace_item.quantities()
    }
}
119
/// Service message: publish a processed V2 span to Kafka.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional Kafka routing key for partition affinity.
    pub routing_key: Option<Uuid>,
    /// Standard retention in days.
    pub retention_days: u16,
    /// Retention in days for the downsampled tier.
    pub downsampled_retention_days: u16,
    /// The span payload itself.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    // Each message accounts for exactly one indexed span.
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}
138
/// Service message: publish a profile chunk to Kafka.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Retention in days for the chunk.
    pub retention_days: u16,
    /// Raw profile chunk payload.
    pub payload: Bytes,
    /// Pre-computed data category quantities for outcome accounting.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
157
/// Service message: publish a replay recording (and optional event/video) to Kafka.
#[derive(Debug)]
pub struct StoreReplay {
    /// The replay id.
    pub event_id: EventId,
    /// Retention in days.
    pub retention_days: u16,
    /// Raw replay recording payload.
    pub recording: Bytes,
    /// Optional serialized replay event.
    pub event: Option<Bytes>,
    /// Optional replay video payload.
    pub video: Option<Bytes>,
    /// Pre-computed data category quantities for outcome accounting.
    pub quantities: Quantities,
}

impl Counted for StoreReplay {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
182
/// Service message: publish a standalone attachment to Kafka.
#[derive(Debug)]
pub struct StoreAttachment {
    /// Id of the event this attachment belongs to.
    pub event_id: EventId,
    /// The attachment envelope item.
    pub attachment: Item,
    /// Pre-computed data category quantities for outcome accounting.
    pub quantities: Quantities,
}

impl Counted for StoreAttachment {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
199
/// Async worker pool on which store messages are handled.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// The interface of the store service: everything that can be produced to Kafka.
#[derive(Debug)]
pub enum Store {
    /// A processed envelope.
    Envelope(StoreEnvelope),
    /// A batch of metric buckets.
    Metrics(StoreMetrics),
    /// A single trace item.
    TraceItem(Managed<StoreTraceItem>),
    /// A V2 span (boxed to keep the enum small).
    Span(Managed<Box<StoreSpanV2>>),
    /// A profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// A replay recording.
    Replay(Managed<StoreReplay>),
    /// A standalone attachment.
    Attachment(Managed<StoreAttachment>),
}

impl Store {
    /// Returns a low-cardinality name of the message variant, used as a metric tag.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "trace_item",
            Store::Span(_) => "span",
            Store::ProfileChunk(_) => "profile_chunk",
            Store::Replay(_) => "replay",
            Store::Attachment(_) => "attachment",
        }
    }
}
242
impl Interface for Store {}

// The `FromMessage` impls below are boilerplate conversions that wrap each
// concrete store message into the corresponding `Store` variant. All of them
// are fire-and-forget (`NoResponse`).

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

impl FromMessage<Managed<StoreReplay>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
        Self::Replay(message)
    }
}

impl FromMessage<Managed<StoreAttachment>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
        Self::Attachment(message)
    }
}
300
/// Service implementing the [`Store`] interface: forwards processed data to Kafka.
pub struct StoreService {
    /// Worker pool on which incoming messages are handled.
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    /// Sink for `Accepted` (and other) outcomes emitted by this service.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Outcome tracking specific to metric buckets.
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}
310
impl StoreService {
    /// Creates the service, building the Kafka producer from `config`.
    ///
    /// Fails if the Kafka client cannot be configured.
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    /// Dispatches an incoming message to its handler, timing each variant.
    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
                Store::Replay(message) => self.handle_store_replay(message),
                Store::Attachment(message) => self.handle_store_attachment(message),
            }
        })
    }

    /// Stores an envelope, accepting it on success and rejecting it with an
    /// internal outcome (plus an error log) on failure.
    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope { mut envelope } = message;

        let scoping = envelope.scoping();
        match self.store_envelope(&mut envelope) {
            Ok(()) => envelope.accept(),
            Err(error) => {
                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    /// Produces all items of an envelope to their respective Kafka topics.
    ///
    /// The event item (event/transaction/security) is produced last, carrying
    /// any non-individually-sent attachments inline. Other item types are
    /// produced as they are encountered.
    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        // Pull the event item out of the envelope so the remaining loop only
        // sees the auxiliary items.
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Topic selection: transactions go to their own topic; events that
        // travel with "slow" items go through the attachments topic; plain
        // events go to the events topic.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        // Without an event (or with a transaction), attachments are produced
        // as standalone messages rather than inlined into the event message.
        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                // Logs must arrive via a dedicated store message; reaching this
                // arm indicates an upstream routing bug.
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                // Any other item type is unexpected here; log with enough
                // context (item/attachment types, form data keys) to debug.
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            // Inline attachments are only collected for an event message.
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    /// Produces metric buckets, tracking per-bucket outcomes and delay statistics.
    ///
    /// Buckets are split into batches bounded by the configured batch size.
    /// Session buckets may additionally be mirrored to the EAP items topic
    /// behind an org-based rollout.
    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        // Only the first produce error is kept and logged once at the end.
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let emit_sessions_to_eap = utils::is_rolled_out(
            scoping.organization_id.value(),
            global_config.options.sessions_eap_rollout_rate,
        )
        .is_keep();

        let now = UnixTimestamp::now();
        // Per-namespace (sum, count, max) of ingestion delay in seconds.
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // A single bucket may still exceed the batch size; `by_size`
            // yields appropriately sized views over it.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }

            if emit_sessions_to_eap
                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
            {
                let message = KafkaMessage::for_item(scoping, trace_item);
                // Best effort: `produce` already logs failures internally.
                let _ = self.produce(KafkaTopic::Items, message);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    /// Produces a trace item to the items topic.
    ///
    /// Accepted outcomes are emitted by Relay only while the EAP consumer does
    /// not emit them itself (org-based rollout); otherwise the outcomes stay
    /// attached to the produced item.
    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let eap_emits_outcomes = utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_outcomes_rollout_rate,
        )
        .is_keep();

        let outcomes = message.try_accept(|mut item| {
            // When EAP emits outcomes, leave them on the item; otherwise take
            // them so Relay can emit `Accepted` below.
            let outcomes = match eap_emits_outcomes {
                true => None,
                false => item.trace_item.outcomes.take(),
            };

            let message = KafkaMessage::for_item(scoping, item.trace_item);
            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
        });

        if let Ok(Some(outcomes)) = outcomes {
            for (category, quantity) in outcomes.quantities() {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    // Saturate rather than fail on oversized quantities.
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    /// Produces a V2 span to the spans topic, keyed by project id.
    ///
    /// Emits the `Accepted` outcome from Relay unless the EAP span consumer
    /// is rolled out to do so (mirrored in `accepted_outcome_emitted`).
    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
            accepted_outcome_emitted: relay_emits_accepted_outcome,
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        if result.is_ok() {
            relay_statsd::metric!(
                counter(RelayCounters::SpanV2Produced) += 1,
                via = "processing"
            );

            if relay_emits_accepted_outcome {
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    /// Produces a profile chunk to the profiles topic, keyed by project id.
    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|message| {
            let message = ProfileChunkKafkaMessage {
                organization_id: scoping.organization_id,
                project_id: scoping.project_id,
                received: safe_timestamp(received_at),
                retention_days: message.retention_days,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                payload: message.payload,
            };

            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
        });
    }

    /// Produces a replay recording (with optional event and video payloads)
    /// as a single non-chunked message to the replay recordings topic.
    fn handle_store_replay(&self, message: Managed<StoreReplay>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|replay| {
            let kafka_msg =
                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                    replay_id: replay.event_id,
                    key_id: scoping.key_id,
                    org_id: scoping.organization_id,
                    project_id: scoping.project_id,
                    received: safe_timestamp(received_at),
                    retention_days: replay.retention_days,
                    payload: &replay.recording,
                    replay_event: replay.event.as_deref(),
                    replay_video: replay.video.as_deref(),
                    // NOTE(review): hard-coded flag — presumably signals the
                    // consumer to skip a snuba publish; confirm with consumer.
                    relay_snuba_publish_disabled: true,
                });
            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
        });
    }

    /// Produces a standalone attachment.
    ///
    /// With `send_individual_attachments == true`, `produce_attachment` always
    /// returns `Ok(None)`, which the debug assertion checks.
    fn handle_store_attachment(&self, message: Managed<StoreAttachment>) {
        let scoping = message.scoping();
        let _ = message.try_accept(|attachment| {
            let result = self.produce_attachment(
                attachment.event_id,
                scoping.project_id,
                &attachment.attachment,
                true,
            );
            debug_assert!(!matches!(result, Ok(Some(_))));
            result
        });
    }

    /// Builds a Kafka message for a metric bucket view, encoding distribution
    /// and set values through the configured bucket encoder.
    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    /// Sends a message to the given Kafka topic and records a
    /// `ProcessingMessageProduced` counter with variant-specific tags.
    ///
    /// Failures are logged here before being returned to the caller.
    fn produce(
        &self,
        topic: KafkaTopic,
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self
            .producer
            .client
            .send_message(topic, &message)
            .inspect_err(|err| {
                relay_log::error!(
                    error = err as &dyn Error,
                    tags.topic = ?topic,
                    tags.message = message.variant(),
                    "failed to produce to Kafka"
                )
            })?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces an attachment, choosing one of four payload strategies:
    /// empty (`Chunked(0)`), already stored (`Stored`), inline (individual
    /// send and smaller than one chunk), or chunked across multiple messages.
    ///
    /// Returns `Ok(None)` when the attachment was sent as its own message,
    /// or `Ok(Some(_))` when the caller should inline it into the event.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        // Fresh id correlating the attachment message with its chunks.
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            // The payload already lives in the object store; reference it.
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            // Split the payload into chunk messages produced ahead of the
            // attachment message itself.
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // `chunk_index` ends up as the total number of chunks produced.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    /// Produces a user report to the attachments topic.
    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    /// Produces a V2 user report (user feedback) to the feedback topic,
    /// reusing the event message format.
    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    /// Routes a metric message to the topic for its namespace, attaching the
    /// namespace as a Kafka header. Unsupported namespaces are dropped with an
    /// error log but reported as success.
    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };

        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    /// Produces a profile to the profiles topic, carrying the sampling
    /// decision and project id as Kafka headers.
    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([
                (
                    "sampled".to_owned(),
                    if item.sampled() { "true" } else { "false" }.to_owned(),
                ),
                ("project_id".to_owned(), project_id.to_string()),
            ]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    /// Produces a monitor check-in to the monitors topic, preserving the
    /// item's routing hint for partitioning.
    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    /// Produces a JSON span item from an envelope to the spans topic.
    ///
    /// The span payload is parsed into a raw key/value map and flattened next
    /// to the span metadata. Emits the `Accepted` outcome from Relay unless
    /// the EAP span consumer is rolled out to do so.
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            // Invalid JSON is surfaced as an encoding failure.
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        if relay_emits_accepted_outcome {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
}
1150
impl Service for StoreService {
    type Interface = Store;

    /// Runs the receive loop, handling each message on the async worker pool.
    ///
    /// The service is shared via `Arc` so each spawned task can call into it;
    /// the loop ends when the message channel closes.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}
1171
/// How an attachment's payload is delivered to the consumer.
///
/// The serde renames flatten into the attachment message as `chunks`, `data`
/// or `stored_id` fields respectively.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload was produced as this many separate chunk messages.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is embedded directly in the attachment message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The payload is already in the object store under this key.
    #[serde(rename = "stored_id")]
    Stored(String),
}

/// Attachment metadata produced alongside (or inside) an event message.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// Id correlating this attachment with its chunk messages.
    id: String,

    /// Attachment filename, or [`UNNAMED_ATTACHMENT`] as a fallback.
    name: String,

    /// Whether the attachment was rate limited.
    rate_limited: bool,

    /// Lowercased content type, if the item carried one.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// Total payload size in bytes.
    size: usize,

    /// Payload delivery mode, flattened into this struct on serialization.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1225
1226fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1232where
1233 S: serde::Serializer,
1234 T: serde::Serialize,
1235{
1236 serde_json::to_value(t)
1237 .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1238 .serialize(serializer)
1239}
1240
/// Kafka message for an event, transaction, security report, or feedback item.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Receive timestamp in seconds (see `safe_timestamp`).
    start_time: u64,
    event_id: EventId,
    project_id: ProjectId,
    /// Client IP of the submitter, if known.
    remote_addr: Option<String>,
    /// Attachments inlined into the event message (not sent individually).
    attachments: Vec<ChunkedAttachment>,
}

/// Kafka message carrying one chunk of a larger attachment payload.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// The bytes of this chunk.
    payload: Bytes,
    event_id: EventId,
    project_id: ProjectId,
    /// Id shared with the final attachment message for reassembly.
    id: String,
    /// Zero-based position of this chunk.
    chunk_index: usize,
}

/// Kafka message for a standalone attachment (produced after its chunks).
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    event_id: EventId,
    project_id: ProjectId,
    attachment: ChunkedAttachment,
}
1288
/// Kafka message for a replay recording produced as a single message.
///
/// Byte payloads use `serde_bytes` so they serialize as raw byte strings.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

/// Kafka message for a user report.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    /// Receive timestamp in seconds.
    start_time: u64,
    payload: Bytes,

    // Not serialized; kept for Kafka routing (see `KafkaMessage`, out of view).
    #[serde(skip)]
    event_id: EventId,
}
1320
1321#[derive(Clone, Debug, Serialize)]
1322struct MetricKafkaMessage<'a> {
1323 org_id: OrganizationId,
1324 project_id: ProjectId,
1325 name: &'a MetricName,
1326 #[serde(flatten)]
1327 value: MetricValue<'a>,
1328 timestamp: UnixTimestamp,
1329 tags: &'a BTreeMap<String, String>,
1330 retention_days: u16,
1331 #[serde(skip_serializing_if = "Option::is_none")]
1332 received_at: Option<UnixTimestamp>,
1333}
1334
/// The value portion of a metric bucket, tagged with its short statsd-style
/// type code (`c`/`d`/`s`/`g`) under `type` and the data under `value`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// Counter metric (`c`).
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// Distribution metric (`d`); values carried in a configurable array encoding.
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// Set metric (`s`); members carried in a configurable array encoding.
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// Gauge metric (`g`).
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1347
1348impl MetricValue<'_> {
1349 fn variant(&self) -> &'static str {
1350 match self {
1351 Self::Counter(_) => "counter",
1352 Self::Distribution(_) => "distribution",
1353 Self::Set(_) => "set",
1354 Self::Gauge(_) => "gauge",
1355 }
1356 }
1357
1358 fn encoding(&self) -> Option<&'static str> {
1359 match self {
1360 Self::Distribution(ae) => Some(ae.name()),
1361 Self::Set(ae) => Some(ae.name()),
1362 _ => None,
1363 }
1364 }
1365}
1366
/// Kafka message for a profile, serialized as MessagePack (see
/// `Message::serialize` below).
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    /// The organization the profile belongs to.
    organization_id: OrganizationId,
    /// The project the profile belongs to.
    project_id: ProjectId,
    /// The public key id used for the submission, if any.
    key_id: Option<u64>,
    /// UNIX timestamp in seconds; presumably the receive time — confirm at
    /// the producer call site.
    received: u64,
    /// Data retention in days.
    retention_days: u16,
    /// Not part of the serialized payload; attached to the Kafka message as
    /// headers instead (see `Message::headers`).
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// The raw profile payload.
    payload: Bytes,
}
1378
/// Discriminator for check-in messages, serialized in snake_case.
///
/// `dead_code` is allowed because the `ClockPulse` variant is not constructed
/// anywhere in this file.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// A clock pulse message (`clock_pulse`).
    ClockPulse,
    /// A monitor check-in message (`check_in`).
    CheckIn,
}
1391
/// Kafka message for a monitor check-in, serialized as MessagePack (see
/// `Message::serialize` below).
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Not part of the serialized payload; used as the Kafka partition key
    /// when present (see `Message::key`).
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Whether this message is a check-in or a clock pulse.
    message_type: CheckInMessageType,
    /// The raw check-in payload.
    payload: Bytes,
    /// UNIX timestamp in seconds; presumably the receive time — confirm at
    /// the producer call site.
    start_time: u64,
    /// The submitting SDK's identifier, if known.
    sdk: Option<String>,
    /// The project the check-in belongs to.
    project_id: ProjectId,
    /// Data retention in days.
    retention_days: u16,
}
1410
/// Kafka message for a span whose fields are forwarded verbatim as raw JSON.
///
/// Both the metadata and the span fields are flattened into a single JSON
/// object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    /// Common span metadata, flattened into the top-level object.
    #[serde(flatten)]
    meta: SpanMeta,
    /// The span's fields as borrowed, unparsed JSON values, flattened into
    /// the top-level object.
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1418
/// Kafka message for a structured V2 span.
///
/// Both the metadata and the span are flattened into a single JSON object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    /// Common span metadata, flattened into the top-level object.
    #[serde(flatten)]
    meta: SpanMeta,
    /// The annotated span, flattened into the top-level object.
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1426
/// Metadata common to span Kafka messages, flattened into the span payload.
#[derive(Debug, Serialize)]
struct SpanMeta {
    /// The organization the span belongs to.
    organization_id: OrganizationId,
    /// The project the span belongs to.
    project_id: ProjectId,
    /// The public key id used for the submission; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    /// The associated event id; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// UNIX timestamp with fractional seconds; presumably the receive time —
    /// confirm at the producer call site.
    received: f64,
    /// Data retention in days.
    retention_days: u16,
    /// Retention in days for the downsampled storage tier.
    downsampled_retention_days: u16,
    /// Whether an accepted outcome was already emitted for this span —
    /// presumably prevents double-counting downstream; confirm with consumer.
    accepted_outcome_emitted: bool,
}
1445
/// Kafka message for a chunk of a continuous profile, serialized as
/// MessagePack (see `Message::serialize` below).
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    /// The organization the profile chunk belongs to.
    organization_id: OrganizationId,
    /// The project the profile chunk belongs to.
    project_id: ProjectId,
    /// UNIX timestamp in seconds; presumably the receive time — confirm at
    /// the producer call site.
    received: u64,
    /// Data retention in days.
    retention_days: u16,
    /// Not part of the serialized payload; attached to the Kafka message as
    /// headers instead (see `Message::headers`).
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// The raw profile chunk payload.
    payload: Bytes,
}
1456
/// Union of all messages the store service produces to Kafka.
///
/// Internally tagged with a snake_case `type` field when serialized via
/// serde; metrics and spans are serialized as JSON, trace items as protobuf,
/// and the remaining variants as MessagePack (see `Message::serialize`).
///
/// `large_enum_variant` is allowed deliberately; messages are short-lived.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        /// Kafka headers; not part of the serialized payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    /// A generic trace item. Never serialized through serde (all fields are
    /// skipped); `Message::serialize` encodes `message` as protobuf instead.
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        /// Kafka partition key (see `Message::key`); not serialized.
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Kafka headers; not part of the serialized payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        /// Kafka partition key (see `Message::key`); not serialized.
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Kafka headers; not part of the serialized payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1504
1505impl KafkaMessage<'_> {
1506 fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1508 let item_type = item.item_type();
1509 KafkaMessage::Item {
1510 headers: BTreeMap::from([
1511 ("project_id".to_owned(), scoping.project_id.to_string()),
1512 ("item_type".to_owned(), (item_type as i32).to_string()),
1513 ]),
1514 message: item,
1515 item_type,
1516 }
1517 }
1518}
1519
1520impl Message for KafkaMessage<'_> {
1521 fn variant(&self) -> &'static str {
1522 match self {
1523 KafkaMessage::Event(_) => "event",
1524 KafkaMessage::UserReport(_) => "user_report",
1525 KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1526 MetricNamespace::Sessions => "metric_sessions",
1527 MetricNamespace::Transactions => "metric_transactions",
1528 MetricNamespace::Spans => "metric_spans",
1529 MetricNamespace::Custom => "metric_custom",
1530 MetricNamespace::Unsupported => "metric_unsupported",
1531 },
1532 KafkaMessage::CheckIn(_) => "check_in",
1533 KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
1534 KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1535
1536 KafkaMessage::Attachment(_) => "attachment",
1537 KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1538
1539 KafkaMessage::Profile(_) => "profile",
1540 KafkaMessage::ProfileChunk(_) => "profile_chunk",
1541
1542 KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1543 }
1544 }
1545
1546 fn key(&self) -> Option<relay_kafka::Key> {
1548 match self {
1549 Self::Event(message) => Some(message.event_id.0),
1550 Self::UserReport(message) => Some(message.event_id.0),
1551 Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,
1552
1553 Self::CheckIn(message) => message.routing_key_hint,
1558
1559 Self::Attachment(message) => Some(message.event_id.0),
1560 Self::AttachmentChunk(message) => Some(message.event_id.0),
1561
1562 Self::Metric { .. }
1564 | Self::Item { .. }
1565 | Self::Profile(_)
1566 | Self::ProfileChunk(_)
1567 | Self::ReplayRecordingNotChunked(_) => None,
1568 }
1569 .filter(|uuid| !uuid.is_nil())
1570 .map(|uuid| uuid.as_u128())
1571 }
1572
1573 fn headers(&self) -> Option<&BTreeMap<String, String>> {
1574 match &self {
1575 KafkaMessage::Metric { headers, .. }
1576 | KafkaMessage::SpanRaw { headers, .. }
1577 | KafkaMessage::SpanV2 { headers, .. }
1578 | KafkaMessage::Item { headers, .. }
1579 | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
1580 | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),
1581
1582 KafkaMessage::Event(_)
1583 | KafkaMessage::UserReport(_)
1584 | KafkaMessage::CheckIn(_)
1585 | KafkaMessage::Attachment(_)
1586 | KafkaMessage::AttachmentChunk(_)
1587 | KafkaMessage::ReplayRecordingNotChunked(_) => None,
1588 }
1589 }
1590
1591 fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1592 match self {
1593 KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1594 KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
1595 KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1596 KafkaMessage::Item { message, .. } => {
1597 let mut payload = Vec::new();
1598 match message.encode(&mut payload) {
1599 Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1600 Err(_) => Err(ClientError::ProtobufEncodingFailed),
1601 }
1602 }
1603 KafkaMessage::Event(_)
1604 | KafkaMessage::UserReport(_)
1605 | KafkaMessage::CheckIn(_)
1606 | KafkaMessage::Attachment(_)
1607 | KafkaMessage::AttachmentChunk(_)
1608 | KafkaMessage::Profile(_)
1609 | KafkaMessage::ProfileChunk(_)
1610 | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1611 Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1612 Err(err) => Err(ClientError::InvalidMsgPack(err)),
1613 },
1614 }
1615 }
1616}
1617
1618fn serialize_as_json<T: serde::Serialize>(
1619 value: &T,
1620) -> Result<SerializationOutput<'_>, ClientError> {
1621 match serde_json::to_vec(value) {
1622 Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1623 Err(err) => Err(ClientError::InvalidJson(err)),
1624 }
1625}
1626
1627fn is_slow_item(item: &Item) -> bool {
1631 item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1632}
1633
/// Renders a boolean as its lowercase string representation.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1637
1638fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1642 let ts = timestamp.timestamp();
1643 if ts >= 0 {
1644 return ts as u64;
1645 }
1646
1647 Utc::now().timestamp() as u64
1649}
1650
#[cfg(test)]
mod tests {

    use super::*;

    /// The store producer must refuse to send directly to the outcomes
    /// topics.
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        let forbidden = [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling];
        for topic in forbidden {
            let result = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(result, Err(ClientError::InvalidTopicName)));
        }
    }
}