use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};
45
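/// Fallback name for attachments without a filename.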
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

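/// Errors raised by the [`StoreService`] while producing messages.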
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

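/// Kafka producer for all topics used by the store service, excluding outcomes.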
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            // The store service never produces outcomes; exclude those topics.
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

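/// Publishes a fully processed envelope to Kafka.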
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

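/// Publishes a batch of metric buckets to Kafka.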
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

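/// Publishes a trace item to Kafka.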
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item to produce.
    pub trace_item: TraceItem,
    /// Outcomes to emit as accepted once the item has been produced.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

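/// Publishes a V2 span to Kafka.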
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional routing key controlling the Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Retention in days.
    pub retention_days: u16,
    /// Retention of the downsampled tier, in days.
    pub downsampled_retention_days: u16,
    /// The span to produce.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}

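/// The asynchronous thread pool used by the [`StoreService`] to handle its messages.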
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

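/// Service interface of the [`StoreService`], covering all data it can produce to Kafka.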
#[derive(Debug)]
pub enum Store {
    /// Produces a processed envelope.
    Envelope(StoreEnvelope),
    /// Produces metric buckets.
    Metrics(StoreMetrics),
    /// Produces a trace item.
    TraceItem(Managed<StoreTraceItem>),
    /// Produces a V2 span.
    Span(Managed<Box<StoreSpanV2>>),
}

impl Store {
    /// Returns the name of the message variant, used for logging and metrics.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
            Store::Span(_) => "span",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

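/// Service implementing the [`Store`] interface.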
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();

        match self.store_envelope(&mut managed) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Transactions are produced to their own topic. Events carrying "slow" items
        // (attachments or user reports) are routed to the attachments topic; all other
        // events go to the events topic.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        // Attachments are sent as standalone messages if there is no event in the
        // envelope, or if the event is a transaction.
        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        let options = &self.global_config.current().options;

        // Sampled kill-switch that suppresses publishing replay data to Snuba.
        let replay_relay_snuba_publish_disabled =
            utils::sample(options.replay_relay_snuba_publish_disabled_sample_rate).is_keep();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                         this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Split the bucket into views of at most `batch_size` bytes each before
            // producing.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Emit `Accepted` outcomes once the item has been produced; rejections are
        // already handled by `try_accept`.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        match result {
            Ok(()) => {
                relay_statsd::metric!(
                    counter(RelayCounters::SpanV2Produced) += 1,
                    via = "processing"
                );

                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store span"
                );
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

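    /// Produces Kafka messages for an attachment.
    ///
    /// Depending on size and flags, the payload is inlined into the message,
    /// referenced by its stored key, or split into separate chunk messages that are
    /// produced ahead of the attachment itself. With `send_individual_attachments`,
    /// the attachment is produced as a standalone message and `None` is returned;
    /// otherwise it is returned for embedding into the event message.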
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // The payload fits into a single message, so inline it directly.
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // Split the payload into chunks of at most `max_chunk_size` bytes and
            // produce each chunk as a separate message.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // `chunk_index` was incremented after each iteration, so it now equals
            // the total number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        let max_payload_size = self.config.max_replay_message_size();

        // Estimate the size of the final message, reserving 2000 bytes of overhead
        // for the message metadata.
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        // Drop the recording and emit an outcome if the message would exceed the
        // maximum size.
        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Normalized V2 span payloads are expected to carry an `attributes` object.
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // Handle messages on the worker pool, so slow producers do not block the
            // service loop.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

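/// Transport for an attachment payload, flattened into the attachment message.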
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The number of chunks the payload was split into, each produced separately
    /// as an `AttachmentChunk` message.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload inlined directly into the message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The key under which the payload was already stored.
    #[serde(rename = "stored_id")]
    Stored(String),
}

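/// An attachment as sent to the processing pipeline, with its payload either
/// inlined, stored, or split into chunk messages.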
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment payload in bytes.
    size: usize,

    /// How the payload is transported: chunked, inline, or stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}

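/// Serializes a value through its JSON representation.
///
/// This keeps serde renames, such as those on `AttachmentType`, consistent between
/// the JSON protocol and MsgPack Kafka messages.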
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

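/// Container payload for event messages.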
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay, in UNIX seconds.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    payload: &'a [u8],
    start_time: u64,
    replay_id: EventId,
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    payload: Bytes,
    event_id: EventId,
    project_id: ProjectId,
    id: String,
    chunk_index: usize,
}

#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    event_id: EventId,
    project_id: ProjectId,
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    /// Used only for Kafka partitioning; not part of the serialized payload.
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Used to discriminate between check-in message types.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Message type of the check-in.
    message_type: CheckInMessageType,
    /// Raw check-in payload.
    payload: Bytes,
    /// Time at which the check-in was received by Relay, in UNIX seconds.
    start_time: u64,
    /// The SDK client that submitted the check-in.
    sdk: Option<String>,
    /// The project id for the current check-in.
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}

#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    received: f64,
    retention_days: u16,
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

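/// An enum over all possible ingest messages produced by the store service.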
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            // Check-ins are routed by the optional routing key hint.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            // The remaining messages are produced without a partitioning key.
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // Nil UUIDs carry no routing information and are treated as absent keys.
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayEvent(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

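/// Determines whether the given item makes its event "slow".
///
/// Slow events are routed to the attachments topic.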
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

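/// Converts a timestamp to UNIX seconds, substituting the current time for
/// pre-epoch values so the result never underflows.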
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // Fall back to the current time for timestamps before the UNIX epoch.
    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
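
    // Illustrative test, not part of the original source: a minimal check of
    // `safe_timestamp`, assuming a chrono version providing
    // `DateTime::from_timestamp` (0.4.31+). Positive timestamps pass through
    // unchanged; pre-epoch timestamps fall back to the current time instead of
    // underflowing the unsigned conversion.
    #[test]
    fn safe_timestamp_clamps_pre_epoch() {
        let dt = DateTime::<Utc>::from_timestamp(100, 0).unwrap();
        assert_eq!(safe_timestamp(dt), 100);

        let dt = DateTime::<Utc>::from_timestamp(-100, 0).unwrap();
        assert!(safe_timestamp(dt) > 0);
    }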
}