use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::services::upload::Upload;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

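/// Fallback name used for attachments without a filename.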
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

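/// Errors that can occur while forwarding payloads to Kafka.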
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

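/// Kafka producer for all store topics, except outcomes.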
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        // Outcomes are produced by the dedicated outcome aggregator, never by this service.
        for topic in KafkaTopic::iter().filter(|t| {
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

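/// Publishes a processed envelope to the appropriate Kafka topics.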
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

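/// Publishes a batch of metric buckets to Kafka.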
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

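/// Publishes a single trace item to the items topic.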
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item to produce to Kafka.
    pub trace_item: TraceItem,
    /// Outcome quantities to emit as accepted once the item has been produced.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

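/// Publishes a span (V2 schema) to the spans topic.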
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional routing key used to pin the span to a Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Number of days to retain in the primary dataset.
    pub retention_days: u16,
    /// Number of days to retain in the downsampled dataset.
    pub downsampled_retention_days: u16,
    /// The span to produce.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        self.item.quantities()
    }
}

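/// The asynchronous thread pool used for scheduling store tasks.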
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

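/// The messages accepted by the [`StoreService`].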
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    Envelope(StoreEnvelope),
    /// Aggregated metric buckets.
    Metrics(StoreMetrics),
    /// A single trace item.
    TraceItem(Managed<StoreTraceItem>),
    /// A single span.
    Span(Managed<Box<StoreSpanV2>>),
}

impl Store {
    /// Returns the name of the message variant, used for metrics.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
            Store::Span(_) => "span",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

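/// Service implementing the [`Store`] interface.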
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    #[expect(unused)]
    upload: Addr<Upload>,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
        upload: Addr<Upload>,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            upload,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();
        let envelope = managed.take_envelope();

        match self.store_envelope(envelope, managed.received_at(), scoping) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(
        &self,
        mut envelope: Box<Envelope>,
        received_at: DateTime<Utc>,
        scoping: Scoping,
    ) -> Result<(), StoreError> {
        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Transactions go to their own topic. Envelopes with slow items (attachments or
        // user reports) are routed through the attachments topic, everything else goes
        // to the events topic.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        // Flag sampled from global config options that disables publishing replays to
        // Snuba through Relay.
        let replay_relay_snuba_publish_disabled = utils::sample(
            self.global_config
                .current()
                .options
                .replay_relay_snuba_publish_disabled_sample_rate,
        )
        .is_keep();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                         this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            // The replay event was already produced above; its payload is forwarded
            // alongside the recording so consumers receive both in one message.
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Buckets exceeding the maximum batch size are split into multiple views.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Successfully produced items are reported as accepted outcomes.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        match result {
            Ok(()) => {
                relay_statsd::metric!(
                    counter(RelayCounters::SpanV2Produced) += 1,
                    via = "processing"
                );

                // Report the stored span as an accepted indexed span.
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store span"
                );
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(
        &self,
        topic: KafkaTopic,
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

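    /// Produces Kafka messages for the content and metadata of an attachment item.
    ///
    /// Large attachments are split into chunks and produced individually. Depending on
    /// `send_individual_attachments`, the attachment metadata is either produced as a
    /// standalone message (returning `None`), or returned to be sent with the event.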
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        // Empty attachments are produced without chunks. Small individual attachments
        // can be inlined into the message; everything else is split into chunks of at
        // most `max_chunk_size` bytes.
        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk index is incremented after every iteration, so at this point it
            // equals the total number of chunks produced.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

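    /// Produces a user report to the attachments topic.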
    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

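    /// Produces a user report v2 (user feedback) as an event to the feedback topic.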
    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

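    /// Sends a metric message to the Kafka topic matching its namespace.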
    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

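    /// Produces a profile to the profiles topic.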
    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

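    /// Produces a replay event to the replay events topic, unless Snuba publishing
    /// through Relay is disabled.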
    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

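    /// Produces a replay recording, rejecting payloads that exceed the configured
    /// maximum message size with a `TooLarge` outcome.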
    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        let max_payload_size = self.config.max_replay_message_size();

        // Estimate the size of the serialized message, reserving 2KB for metadata.
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

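    /// Produces a replay video, a msgpack-encoded combination of replay event,
    /// recording, and video payloads.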
    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

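    /// Produces a monitor check-in to the monitors topic.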
    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

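    /// Produces a JSON-encoded span from an envelope item to the spans topic.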
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        debug_assert!(message.span.contains_key("attributes"));

        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        // The span has been produced; report it as an accepted indexed span.
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

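    /// Produces a profile chunk to the profiles topic.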
    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // Handle each message on the dedicated store pool to keep the service loop
            // responsive.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

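/// The payload of an attachment, in one of three mutually exclusive representations.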
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The number of chunks the attachment payload was split into.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The attachment payload, inlined into the message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// An identifier referencing an attachment payload stored elsewhere.
    #[serde(rename = "stored_id")]
    #[allow(unused)]
    Stored(String),
}

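/// Attachment metadata, produced standalone or embedded in an event message.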
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment in bytes.
    size: usize,

    /// The attachment payload: chunked, inlined, or stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}

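/// Serializes the attachment type through an intermediate `serde_json::Value`, so its
/// serde representation is reused regardless of the target serialization format.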
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw replay event payload.
    payload: &'a [u8],
    /// Time at which the event was received by Relay.
    start_time: u64,
    /// The replay id.
    replay_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    id: String,
    /// Sequence number of the chunk.
    chunk_index: usize,
}

/// An attachment metadata message produced individually, not embedded in an event.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

/// A user report message, produced alongside its event to the attachments topic.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    // Only used for routing.
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Discriminant used by the monitors consumer to distinguish message types.
///
/// Relay only produces `CheckIn` messages, hence the `allow(dead_code)`.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// Raw check-in payload.
    payload: Bytes,
    /// Time at which the check-in was received by Relay.
    start_time: u64,
    /// The SDK client that submitted the check-in.
    sdk: Option<String>,
    /// The project id for the current check-in.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}

#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Unix timestamp (seconds, with fractional part) at which the span was received.
    received: f64,
    /// Number of days to retain in the primary dataset.
    retention_days: u16,
    /// Number of days to retain in the downsampled dataset.
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

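/// All messages that the store service produces to Kafka.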
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key for this Kafka message, if any.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            // Check-ins carry a routing key hint from the envelope item.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            // Everything else is partitioned randomly.
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayEvent(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

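/// Returns whether the item is considered slow and must be routed through the
/// attachments topic.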
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

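/// Converts a timestamp to a non-negative Unix timestamp in seconds, substituting the
/// current time for timestamps before the epoch.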
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // Negative timestamps cannot be represented; fall back to the current time.
    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}