use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

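/// Errors raised by the store service while producing messages to Kafka.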
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

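/// Wrapper around the Kafka client used to produce all store messages.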
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        // Outcomes are produced by a dedicated producer and must not be sent
        // through this client (see the `disallow_outcomes` test).
        for topic in KafkaTopic::iter().filter(|t| {
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

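/// Publishes a processed envelope and its items to Kafka.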
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

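/// Publishes a batch of metric buckets to Kafka.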
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

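/// Publishes a single trace item to Kafka.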
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item to produce.
    pub trace_item: TraceItem,
    /// Outcomes to emit once the trace item was successfully produced.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

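/// Publishes a V2 span to Kafka.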
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional key used to route the span to a specific Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Retention in days.
    pub retention_days: u16,
    /// Retention in days for the downsampled storage tier.
    pub downsampled_retention_days: u16,
    /// The span to produce.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        self.item.quantities()
    }
}

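/// The asynchronous task pool on which the [`StoreService`] handles its messages.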
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

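/// Service interface of the [`StoreService`].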
#[derive(Debug)]
pub enum Store {
    /// Produces an envelope and its items to their respective topics.
    Envelope(StoreEnvelope),
    /// Produces metric buckets to the metrics topics.
    Metrics(StoreMetrics),
    /// Produces a single trace item to the items topic.
    TraceItem(Managed<StoreTraceItem>),
    /// Produces a V2 span to the spans topic.
    Span(Managed<Box<StoreSpanV2>>),
}

impl Store {
    /// Returns the name of the message variant, used to tag metrics.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
            Store::Span(_) => "span",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

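/// Service implementing the [`Store`] interface.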
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();
        let envelope = managed.take_envelope();

        match self.store_envelope(envelope, managed.received_at(), scoping) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(
        &self,
        mut envelope: Box<Envelope>,
        received_at: DateTime<Utc>,
        scoping: Scoping,
    ) -> Result<(), StoreError> {
        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Transactions have a dedicated topic. Events accompanied by slow items
        // (attachments or user reports) go through the attachments topic, all
        // remaining events through the events topic.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        // Attachments are sent as individual messages if there is no event in
        // the envelope, or if the event is a transaction.
        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        let replay_relay_snuba_publish_disabled = utils::sample(
            self.global_config
                .current()
                .options
                .replay_relay_snuba_publish_disabled_sample_rate,
        )
        .is_keep();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                         this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        // If the envelope contained a replay recording, produce it together
        // with the replay event payload in a single message.
        if let Some(recording) = replay_recording {
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Split the bucket into views that fit within the maximum batch size.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Emit accepted outcomes when the produce succeeded; failures are
        // handled by the `Managed` wrapper in `try_accept`.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        match result {
            Ok(()) => {
                relay_statsd::metric!(
                    counter(RelayCounters::SpanV2Produced) += 1,
                    via = "processing"
                );

                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store span"
                );
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(
        &self,
        topic: KafkaTopic,
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces Kafka messages for an attachment.
    ///
    /// Depending on the size of the payload, the attachment is either sent
    /// inline or split into chunks that are produced ahead of the attachment
    /// message. If `send_individual_attachments` is set, the attachment is
    /// produced as a standalone message and `None` is returned; otherwise, the
    /// attachment is returned so it can be sent along with the event.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            // Empty attachments are represented with zero chunks.
            AttachmentPayload::Chunked(0)
        } else if send_individual_attachments && size < max_chunk_size {
            // Individual attachments that fit within the chunk size are sent inline.
            AttachmentPayload::Inline(payload)
        } else {
            // Split the payload into chunks and produce each chunk as its own
            // message ahead of the attachment message itself.
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk index is incremented after every iteration, so it ends
            // up equal to the total number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        let max_payload_size = self.config.max_replay_message_size();

        // Start with a fixed allowance for the other fields of the message and
        // add the sizes of the variable-length payloads on top.
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        debug_assert!(message.span.contains_key("attributes"));

        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

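/// Determines how an attachment payload is transported in the Kafka message.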
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload was split into the given number of chunks, which are
    /// produced as separate messages ahead of this one.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is sent inline within this message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// References a payload stored elsewhere by the given identifier.
    #[serde(rename = "stored_id")]
    #[allow(unused)]
    Stored(String),
}

#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether the attachment was rate limited.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// Number of bytes in the attachment payload.
    size: usize,

    /// How the payload is transported, see [`AttachmentPayload`].
    #[serde(flatten)]
    payload: AttachmentPayload,
}

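/// Serializes the attachment type by routing it through `serde_json::Value`.
///
/// This applies the serde rename rules of [`AttachmentType`] even when the
/// surrounding message is encoded in a different format, such as MessagePack.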
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay, in seconds since the Unix epoch.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments sent along with the event.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw replay event payload.
    payload: &'a [u8],
    /// Time at which the event was received by Relay, in seconds since the Unix epoch.
    start_time: u64,
    /// The replay id.
    replay_id: EventId,
    /// The project id for the current replay.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Chunk payload of the attachment.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    id: String,
    /// Sequence number of the chunk, starting at zero.
    chunk_index: usize,
}

#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment to produce.
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    /// Only used to compute the message key, not serialized into the payload.
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

#[allow(dead_code)]
/// Discriminates the type of a message sent to the monitors topic.
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    message_type: CheckInMessageType,
    /// Raw check-in payload.
    payload: Bytes,
    /// Time at which the check-in was received by Relay, in seconds since the Unix epoch.
    start_time: u64,
    /// The SDK client that submitted the check-in.
    sdk: Option<String>,
    /// The project id for the current check-in.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}

#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    received: f64,
    retention_days: u16,
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

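/// An enum over all possible ingest messages.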
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key for this message, if any.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            // The remaining messages are not partitioned by key.
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // Nil UUIDs carry no routing information and are treated as missing keys.
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayEvent(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

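/// Returns `true` if the item is considered slow.
///
/// Slow items cause the event to be routed to the attachments topic.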
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

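/// Returns the timestamp in seconds since the Unix epoch, guaranteed to be
/// non-negative so it fits into a `u64`.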
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // Fall back to the current time for timestamps before the Unix epoch.
    Utc::now().timestamp() as u64
}

1672
1673#[cfg(test)]
1674mod tests {
1675
1676 use super::*;
1677
1678 #[test]
1679 fn disallow_outcomes() {
1680 let config = Config::default();
1681 let producer = Producer::create(&config).unwrap();
1682
1683 for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
1684 let res = producer
1685 .client
1686 .send(topic, Some(0x0123456789abcdef), None, "foo", b"");
1687
1688 assert!(matches!(res, Err(ClientError::InvalidTopicName)));
1689 }
1690 }
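
    /// Illustrative check of `safe_timestamp` added alongside the existing
    /// tests: non-negative timestamps pass through unchanged, while pre-epoch
    /// timestamps fall back to the current time.
    #[test]
    fn safe_timestamp_clamps_negative() {
        let positive = DateTime::from_timestamp(1_700_000_000, 0).unwrap();
        assert_eq!(safe_timestamp(positive), 1_700_000_000);

        // Negative timestamps would underflow `u64`; the fallback substitutes
        // `Utc::now`, which is always positive.
        let negative = DateTime::from_timestamp(-1, 0).unwrap();
        assert!(safe_timestamp(negative) > 0);
    }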
}