use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

mod sessions;

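/// Fallback name used for attachments without a filename.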
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

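/// Errors raised while producing messages to Kafka.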
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

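/// Kafka producer wrapping a client configured for all topics used by this service.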
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            // The store service never produces outcomes; the outcome topics are
            // deliberately excluded (see the `disallow_outcomes` test below).
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

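/// Publishes a fully processed envelope to Kafka.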
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

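/// Publishes a batch of pre-aggregated metric buckets to Kafka.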
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

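/// Publishes a single trace item to Kafka.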
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item to produce.
    pub trace_item: TraceItem,
    /// Data categories and quantities reported as accepted outcomes once the item
    /// has been produced successfully.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

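/// Publishes a V2 span to Kafka.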
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key used for Kafka partitioning.
    pub routing_key: Option<Uuid>,
    /// Retention of the span, in days.
    pub retention_days: u16,
    /// Retention of the downsampled tier, in days.
    pub downsampled_retention_days: u16,
    /// The span to produce.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}

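/// Publishes a profile chunk to Kafka.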
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Retention of the profile chunk, in days.
    pub retention_days: u16,
    /// The serialized profile chunk payload.
    pub payload: Bytes,
    /// Data categories and quantities tracked for this chunk.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

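/// The async pool on which the [`StoreService`] handles incoming messages.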
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

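/// The messages handled by the [`StoreService`].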
#[derive(Debug)]
pub enum Store {
    /// An envelope containing a mixture of items.
    Envelope(StoreEnvelope),
    /// Aggregated metric buckets.
    Metrics(StoreMetrics),
    /// A single trace item.
    TraceItem(Managed<StoreTraceItem>),
    /// A V2 span.
    Span(Managed<Box<StoreSpanV2>>),
    /// A profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
}

impl Store {
    /// Returns the name of the message variant, used as a metric tag.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "trace_item",
            Store::Span(_) => "span",
            Store::ProfileChunk(_) => "profile_chunk",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

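/// Service implementing the [`Store`] interface, forwarding ingested data to Kafka.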
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope { mut envelope } = message;

        let scoping = envelope.scoping();
        match self.store_envelope(&mut envelope) {
            Ok(()) => envelope.accept(),
            Err(error) => {
                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Transactions have a dedicated topic. Envelopes carrying "slow" items
        // (attachments, user reports) are routed through the attachments topic.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                         this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            // If the envelope also contains a replay event, its payload is produced
            // together with the recording in a single message.
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let emit_sessions_to_eap = utils::is_rolled_out(
            scoping.organization_id.value(),
            global_config.options.sessions_eap_rollout_rate,
        )
        .is_keep();

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // A single bucket may exceed the maximum batch size, in which case it is
            // split into multiple messages.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }

            if emit_sessions_to_eap
                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
            {
                let message = KafkaMessage::for_item(scoping, trace_item);
                let _ = self.produce(KafkaTopic::Items, message);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let message = KafkaMessage::for_item(scoping, item.trace_item);
            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        // Accepted outcomes are emitted only after the message has been produced
        // successfully; failures are handled by `try_accept`.
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        if result.is_ok() {
            relay_statsd::metric!(
                counter(RelayCounters::SpanV2Produced) += 1,
                via = "processing"
            );

            // Report the span as accepted once it has been produced.
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }
    }

    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|message| {
            let message = ProfileChunkKafkaMessage {
                organization_id: scoping.organization_id,
                project_id: scoping.project_id,
                received: safe_timestamp(received_at),
                retention_days: message.retention_days,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                payload: message.payload,
            };

            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
        });
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

    fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self
            .producer
            .client
            .send_message(topic, &message)
            .inspect_err(|err| {
                relay_log::error!(
                    error = err as &dyn Error,
                    tags.topic = ?topic,
                    tags.message = message.variant(),
                    "failed to produce to Kafka"
                )
            })?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

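    /// Produces Kafka messages for the payload and metadata of an attachment item.
    ///
    /// Depending on the payload size and `send_individual_attachments`, the payload is
    /// sent inline, as a sequence of `attachment_chunk` messages, or as a reference to
    /// an already stored payload.
    ///
    /// If `send_individual_attachments` is set, the metadata is produced as a
    /// standalone `attachment` message and this function returns `None`. Otherwise,
    /// the metadata is returned as a [`ChunkedAttachment`] to be embedded into the
    /// `event` message.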
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            // Individual attachments that fit into a single chunk are sent with the
            // payload inline instead of separate chunk messages.
            AttachmentPayload::Inline(payload)
        } else {
            // Split the payload into chunks and produce each chunk as its own message.
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // The chunk index is incremented after every iteration, so after the loop
            // it equals the total number of chunks.
            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };

        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([
                (
                    "sampled".to_owned(),
                    if item.sampled() { "true" } else { "false" }.to_owned(),
                ),
                ("project_id".to_owned(), project_id.to_string()),
            ]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
    ) -> Result<(), StoreError> {
        // Maximum number of bytes accepted by the consumer.
        let max_payload_size = self.config.max_replay_message_size();

        // Estimate the size of the full message, reserving 2KB for metadata and framing.
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        // If the message is too large, drop the recording with a `TooLarge` outcome
        // instead of producing a message the consumer would reject.
        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled: true,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        debug_assert!(message.span.contains_key("attributes"));

        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        // Report the span as accepted once it has been produced.
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // Handle each message on the dedicated async pool.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

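/// The payload of an attachment, sent inline, in chunks, or as a stored reference.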
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The number of chunks the payload was split into, each produced as a separate
    /// `attachment_chunk` message.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload sent inline with the attachment message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The key of a payload that was already persisted to the object store.
    #[serde(rename = "stored_id")]
    Stored(String),
}

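/// Attachment metadata together with a reference to its payload, as sent to Kafka.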
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,

    /// File name of the attachment file.
    name: String,

    /// Whether this attachment was rate limited.
    rate_limited: bool,

    /// Content type of the attachment payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The Sentry-internal attachment type.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// The size of the attachment payload in bytes.
    size: usize,

    /// The attachment payload: inline, chunked, or stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}

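/// Serializes a value by passing it through `serde_json::Value`.
///
/// Used for [`AttachmentType`] so it keeps its JSON representation even when the
/// surrounding message is encoded as MessagePack.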
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

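/// Container payload for event messages.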
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time at which the event was received by Relay, in seconds since the Unix epoch.
    start_time: u64,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The client ip address.
    remote_addr: Option<String>,
    /// Attachments that are potentially relevant for processing.
    attachments: Vec<ChunkedAttachment>,
}

/// A chunk of an attachment payload.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// The chunk payload.
    payload: Bytes,
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment ID within the event.
    ///
    /// The triple `(project_id, event_id, id)` identifies an attachment uniquely.
    id: String,
    /// Sequence number of the chunk, starting at 0.
    chunk_index: usize,
}

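/// A standalone attachment that belongs to an event but is produced as its own message.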
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// The event id.
    event_id: EventId,
    /// The project id for the current event.
    project_id: ProjectId,
    /// The attachment.
    attachment: ChunkedAttachment,
}

/// A replay recording, produced as a single message including its payload.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

/// A user report for an event, ready for consumption in Kafka.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    /// Only used to compute the partitioning key, not serialized into the message.
    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// Message types understood by the monitors consumer.
///
/// `ClockPulse` is part of the consumer protocol but never produced here, hence the
/// `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    /// Used by the consumer to discriminate the message.
    message_type: CheckInMessageType,
    /// The check-in payload.
    payload: Bytes,
    /// Time at which the check-in was received by Relay, in seconds since the Unix epoch.
    start_time: u64,
    /// The SDK client that submitted the check-in.
    sdk: Option<String>,
    /// The project id for the current check-in.
    project_id: ProjectId,
    /// Number of days to retain.
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}

#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Timestamp when the span was received by Relay, in fractional seconds.
    received: f64,
    retention_days: u16,
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl KafkaMessage<'_> {
    /// Builds a [`KafkaMessage::Item`] with the appropriate Kafka headers for the
    /// given trace item.
    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
        let item_type = item.item_type();
        KafkaMessage::Item {
            headers: BTreeMap::from([
                ("project_id".to_owned(), scoping.project_id.to_string()),
                ("item_type".to_owned(), (item_type as i32).to_string()),
            ]),
            message: item,
            item_type,
        }
    }
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

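    /// Returns the partitioning key for this message, if any.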
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            // Check-ins use the routing key hint passed through from the envelope.
            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),

            // All other messages are partitioned randomly.
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // A nil UUID is not a usable key; treat it as missing.
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

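/// Returns `true` for "slow" items, which must be routed through the `Attachments` topic.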
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

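/// Returns a non-negative Unix timestamp in seconds for the given datetime.
///
/// Timestamps before the Unix epoch fall back to the current time.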
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    // Fall back to the current time, which is guaranteed to be non-negative.
    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Ensures that the store service cannot produce to the outcome topics.
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}