1use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27 Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28 MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::services::upload::SignedLocation;
43use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
44use crate::utils::{self, FormDataIter};
45
// Conversion of session metric buckets into EAP trace items (see
// `handle_store_metrics`).
mod sessions;

// Fallback display name for attachments submitted without a filename.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
50
/// Errors raised while producing payloads to Kafka.
///
/// All variants except [`StoreError::InvalidAttachmentRef`] are treated as
/// internal errors when converted into an outcome (see the `OutcomeError`
/// impl below).
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,
}
62
63impl OutcomeError for StoreError {
64 type Error = Self;
65
66 fn consume(self) -> (Option<Outcome>, Self::Error) {
67 let outcome = match self {
68 StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => {
69 Some(Outcome::Invalid(DiscardReason::Internal))
70 }
71 StoreError::InvalidAttachmentRef => {
72 Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
73 }
74 };
75 (outcome, self)
76 }
77}
78
/// Owns the Kafka client used to produce all store messages.
struct Producer {
    client: KafkaClient,
}

impl Producer {
    /// Builds a Kafka client configured for every topic except `Outcomes`
    /// and `OutcomesBilling`, which are explicitly filtered out here.
    ///
    /// Fails if a topic's Kafka configuration cannot be loaded or validated.
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}
103
/// Message to produce all items of an envelope to their Kafka topics.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The envelope with the items to store.
    pub envelope: ManagedEnvelope,
}
109
/// Message to produce a batch of metric buckets to Kafka.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// The metric buckets to produce.
    pub buckets: Vec<Bucket>,
    /// Scoping (org, project, key) the buckets belong to.
    pub scoping: Scoping,
    /// Retention in days, forwarded as `retention_days` on the Kafka message.
    pub retention: u16,
}
117
/// Message to produce a single [`TraceItem`] to the items (EAP) topic.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item to produce.
    pub trace_item: TraceItem,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        // Delegate to the trace item's own quantity accounting.
        self.trace_item.quantities()
    }
}
130
/// Message to produce a V2 span to the spans topic.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional routing key used to pick the Kafka partition.
    pub routing_key: Option<Uuid>,
    /// Retention in days.
    pub retention_days: u16,
    /// Retention in days for downsampled data.
    pub downsampled_retention_days: u16,
    /// The span payload.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        // Each span message accounts for exactly one indexed span.
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}
149
/// Message to produce a profile chunk to the profiles topic.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Retention in days.
    pub retention_days: u16,
    /// Serialized profile chunk payload.
    pub payload: Bytes,
    /// Pre-computed quantities used for outcome accounting.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
168
/// Message to produce a replay recording (with optional event and video
/// payloads) to the replay recordings topic.
#[derive(Debug)]
pub struct StoreReplay {
    /// Identifier of the replay.
    pub event_id: EventId,
    /// Retention in days.
    pub retention_days: u16,
    /// The raw recording payload.
    pub recording: Bytes,
    /// Optional serialized replay event.
    pub event: Option<Bytes>,
    /// Optional replay video payload.
    pub video: Option<Bytes>,
    /// Pre-computed quantities used for outcome accounting.
    pub quantities: Quantities,
}

impl Counted for StoreReplay {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
193
/// Message to produce a standalone attachment to the attachments topic.
#[derive(Debug)]
pub struct StoreAttachment {
    /// The event this attachment belongs to.
    pub event_id: EventId,
    /// The attachment item to produce.
    pub attachment: Item,
    /// Pre-computed quantities used for outcome accounting.
    pub quantities: Quantities,
    /// Retention in days.
    pub retention: u16,
}

impl Counted for StoreAttachment {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
212
/// Message to produce a user report to the attachments topic.
#[derive(Debug)]
pub struct StoreUserReport {
    /// The event the report refers to.
    pub event_id: EventId,
    /// The user report item.
    pub report: Item,
}

impl Counted for StoreUserReport {
    fn quantities(&self) -> Quantities {
        // A user report counts as a single `UserReportV2` data unit.
        smallvec::smallvec![(DataCategory::UserReportV2, 1)]
    }
}
227
/// Message to produce a profile to the profiles topic.
#[derive(Debug)]
pub struct StoreProfile {
    /// Retention in days.
    pub retention_days: u16,
    /// The profile item to produce.
    pub profile: Item,
    /// Pre-computed quantities used for outcome accounting.
    pub quantities: Quantities,
}

impl Counted for StoreProfile {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
244
/// Async pool type used by the store service.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service interface of the store service: each variant carries one kind of
/// payload to be produced to Kafka.
#[derive(Debug)]
pub enum Store {
    /// Produce all items of an envelope.
    Envelope(StoreEnvelope),
    /// Produce a batch of metric buckets.
    Metrics(StoreMetrics),
    /// Produce a single trace item (EAP).
    TraceItem(Managed<StoreTraceItem>),
    /// Produce a V2 span (boxed: the payload is large).
    Span(Managed<Box<StoreSpanV2>>),
    /// Produce a profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// Produce a replay recording.
    Replay(Managed<StoreReplay>),
    /// Produce a standalone attachment.
    Attachment(Managed<StoreAttachment>),
    /// Produce a user report.
    UserReport(Managed<StoreUserReport>),
    /// Produce a profile.
    Profile(Managed<StoreProfile>),
}
276
impl Store {
    /// Returns the name of the message variant.
    ///
    /// Used as the `message` tag on the handler duration metric and as a log
    /// tag when storing fails.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "trace_item",
            Store::Span(_) => "span",
            Store::ProfileChunk(_) => "profile_chunk",
            Store::Replay(_) => "replay",
            Store::Attachment(_) => "attachment",
            Store::UserReport(_) => "user_report",
            Store::Profile(_) => "profile",
        }
    }
}
293
impl Interface for Store {}

// The conversions below route each store message type into the matching
// `Store` variant. All of them are fire-and-forget (`NoResponse`).

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

impl FromMessage<Managed<StoreReplay>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
        Self::Replay(message)
    }
}

impl FromMessage<Managed<StoreAttachment>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
        Self::Attachment(message)
    }
}

impl FromMessage<Managed<StoreUserReport>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
        Self::UserReport(message)
    }
}

impl FromMessage<Managed<StoreProfile>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
        Self::Profile(message)
    }
}
367
/// Service implementing the [`Store`] interface: produces accepted payloads
/// to their respective Kafka topics and emits outcomes/metrics about them.
pub struct StoreService {
    /// Pool used to run store work as futures.
    pool: StoreServicePool,
    /// Relay configuration (Kafka settings, attachment chunk size, ...).
    config: Arc<Config>,
    /// Handle to the current global configuration (rollout rates, encodings).
    global_config: GlobalConfigHandle,
    /// Address for emitting `Accepted`/`Invalid` outcomes.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Tracks outcomes for metric bucket views.
    metric_outcomes: MetricOutcomes,
    /// Kafka producer shared by all handlers.
    producer: Producer,
}
377
378impl StoreService {
    /// Creates a new store service, building the Kafka producer from `config`.
    ///
    /// Fails if the Kafka client cannot be configured (see [`Producer::create`]).
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }
396
    /// Entry point for all [`Store`] messages.
    ///
    /// NOTE(review): the scope configurator closure is a no-op — presumably
    /// this wraps handling in a fresh log scope so extras attached by callees
    /// (e.g. in `store_envelope`) do not leak across messages; confirm intent.
    fn handle_message(&self, message: Store) {
        relay_log::with_scope(|_| {}, || self.handle_message_inner(message))
    }
401
    /// Dispatches a [`Store`] message to its handler, timing the call and
    /// logging any resulting error tagged with the message variant.
    fn handle_message_inner(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            let result = match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                // Metrics handle their errors internally and always succeed.
                Store::Metrics(message) => {
                    self.handle_store_metrics(message);
                    Ok(())
                }
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
                Store::Replay(message) => self.handle_store_replay(message),
                Store::Attachment(message) => self.handle_store_attachment(message),
                Store::UserReport(message) => self.handle_user_report(message),
                Store::Profile(message) => self.handle_profile(message),
            };
            if let Err(error) = result {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.message = ty,
                    "failed to store message"
                );
            }
        })
    }
428
429 fn handle_store_envelope(&self, message: StoreEnvelope) -> Result<(), Rejected<StoreError>> {
430 let StoreEnvelope { mut envelope } = message;
431
432 match self.store_envelope(&mut envelope) {
433 Ok(()) => {
434 envelope.accept();
435 Ok(())
436 }
437 Err(error) => {
438 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
442 Err(Managed::with_meta_from(&envelope, ()).reject_err(error))
443 }
444 }
445 }
446
    /// Produces every item of the envelope to its Kafka topic, then produces
    /// the event itself (if any) with collected attachment metadata inlined.
    ///
    /// Topic selection: transactions go to the transactions topic; envelopes
    /// containing "slow items" go to the attachments topic; everything else
    /// goes to the events topic.
    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        // Remove the primary event item so the loop below only sees the rest.
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        // Without an event (or for transactions), attachments are produced as
        // standalone messages instead of being inlined into the event message.
        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    // Returns `Some` only when the attachment should be
                    // inlined into the event message below.
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        item,
                        send_individual_attachments,
                        retention,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        client,
                        retention,
                        item,
                    )?
                }
                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                // Logs must not arrive via envelopes; they have a dedicated
                // store message. Assert in debug, log in release.
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                // Any other item type is unexpected here; log with enough
                // context (item/attachment types, form-data keys) to debug.
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                    org_id: scoping.organization_id,
                }),
            )?;
        } else {
            // Inlined attachments only make sense alongside an event item.
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }
615
    /// Produces metric buckets to Kafka, tracking an outcome per bucket view
    /// and recording ingestion-delay statistics per namespace.
    ///
    /// Only the first produce/encode error is kept and logged once at the
    /// end; failed views still get an `Invalid(Internal)` outcome. Session
    /// buckets are additionally mirrored to the EAP items topic for rolled-out
    /// organizations.
    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        // Org-based rollout of mirroring session metrics to EAP.
        let emit_sessions_to_eap = utils::is_rolled_out(
            scoping.organization_id.value(),
            global_config.options.sessions_eap_rollout_rate,
        )
        .is_keep();

        let now = UnixTimestamp::now();
        // Per-namespace (sum, count, max) of seconds between receive and now.
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Split the bucket into size-bounded views; each view becomes one
            // Kafka message and one tracked outcome.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        // Remember only the first error; log once after the loop.
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }

            if emit_sessions_to_eap
                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
            {
                let message = KafkaMessage::for_item(scoping, trace_item);
                let res = self.produce(KafkaTopic::Items, message);
                if let Err(error) = res {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to produce session metrics to EAP"
                    )
                }
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        // Emit delay statistics for every namespace that saw buckets.
        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }
718
    /// Produces a trace item to the items topic and, unless the organization
    /// is rolled out to EAP-emitted outcomes, emits `Accepted` outcomes for
    /// the item's quantities from Relay itself.
    fn handle_store_trace_item(
        &self,
        message: Managed<StoreTraceItem>,
    ) -> Result<(), Rejected<StoreError>> {
        let scoping = message.scoping();
        let received_at = message.received_at();

        // When rolled out, the EAP consumer emits outcomes downstream and
        // Relay must not double-count them.
        let eap_emits_outcomes = utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_outcomes_rollout_rate,
        )
        .is_keep();

        let outcomes = message.try_accept(|mut item| {
            // Take the outcomes out of the item only when Relay is the one
            // responsible for emitting them.
            let outcomes = match eap_emits_outcomes {
                true => None,
                false => item.trace_item.outcomes.take(),
            };

            let message = KafkaMessage::for_item(scoping, item.trace_item);
            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
        })?;

        if let Some(outcomes) = outcomes {
            for (category, quantity) in outcomes.quantities() {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    // Saturate rather than fail on oversized quantities.
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }

        Ok(())
    }
765
    /// Produces a V2 span to the spans topic, keyed by project for routing,
    /// and emits the `Accepted` outcome unless the organization is rolled out
    /// to EAP-emitted span outcomes.
    fn handle_store_span(
        &self,
        message: Managed<Box<StoreSpanV2>>,
    ) -> Result<(), Rejected<StoreError>> {
        let scoping = message.scoping();
        let received_at = message.received_at();

        // Inverted rollout: once rolled out, the EAP pipeline emits the
        // accepted outcome instead of Relay.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
            // Tells consumers whether Relay already emitted the outcome.
            accepted_outcome_emitted: relay_emits_accepted_outcome,
        };

        message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
                org_id: scoping.organization_id,
            };

            self.produce(KafkaTopic::Spans, message)
        })?;

        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "processing"
        );

        if relay_emits_accepted_outcome {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
832
833 fn handle_store_profile_chunk(
834 &self,
835 message: Managed<StoreProfileChunk>,
836 ) -> Result<(), Rejected<StoreError>> {
837 let scoping = message.scoping();
838 let received_at = message.received_at();
839
840 message.try_accept(|message| {
841 let message = ProfileChunkKafkaMessage {
842 organization_id: scoping.organization_id,
843 project_id: scoping.project_id,
844 received: safe_timestamp(received_at),
845 retention_days: message.retention_days,
846 headers: BTreeMap::from([(
847 "project_id".to_owned(),
848 scoping.project_id.to_string(),
849 )]),
850 payload: message.payload,
851 };
852
853 self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
854 })
855 }
856
    /// Produces a replay recording (with optional event and video payloads)
    /// to the replay recordings topic as a single, non-chunked message.
    fn handle_store_replay(
        &self,
        message: Managed<StoreReplay>,
    ) -> Result<(), Rejected<StoreError>> {
        let scoping = message.scoping();
        let received_at = message.received_at();

        message.try_accept(|replay| {
            let kafka_msg =
                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                    replay_id: replay.event_id,
                    key_id: scoping.key_id,
                    org_id: scoping.organization_id,
                    project_id: scoping.project_id,
                    received: safe_timestamp(received_at),
                    retention_days: replay.retention_days,
                    payload: &replay.recording,
                    replay_event: replay.event.as_deref(),
                    replay_video: replay.video.as_deref(),
                    // NOTE(review): hardcoded to `true` — presumably signals
                    // the consumer to skip a Snuba publish; confirm semantics.
                    relay_snuba_publish_disabled: true,
                });
            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
        })
    }
883
    /// Produces a standalone attachment to the attachments topic.
    ///
    /// `send_individual_attachments` is forced to `true`, so the attachment
    /// is always produced directly and never returned for inlining.
    fn handle_store_attachment(
        &self,
        message: Managed<StoreAttachment>,
    ) -> Result<(), Rejected<StoreError>> {
        let scoping = message.scoping();
        message.try_accept(|attachment| {
            let result = self.produce_attachment(
                attachment.event_id,
                scoping.project_id,
                scoping.organization_id,
                &attachment.attachment,
                true,
                attachment.retention,
            );
            // With individual sending, `produce_attachment` must not return
            // an inline attachment.
            debug_assert!(!matches!(result, Ok(Some(_))));
            result.map(|_| ())
        })
    }
905
906 fn handle_user_report(
907 &self,
908 message: Managed<StoreUserReport>,
909 ) -> Result<(), Rejected<StoreError>> {
910 let scoping = message.scoping();
911 let received_at = message.received_at();
912
913 message.try_accept(|report| {
914 let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
915 project_id: scoping.project_id,
916 event_id: report.event_id,
917 start_time: safe_timestamp(received_at),
918 payload: report.report.payload(),
919 org_id: scoping.organization_id,
920 });
921 self.produce(KafkaTopic::Attachments, kafka_msg)
922 })
923 }
924
925 fn handle_profile(&self, message: Managed<StoreProfile>) -> Result<(), Rejected<StoreError>> {
926 let scoping = message.scoping();
927 let received_at = message.received_at();
928
929 message.try_accept(|profile| {
930 self.produce_profile(
931 scoping.organization_id,
932 scoping.project_id,
933 scoping.key_id,
934 received_at,
935 profile.retention_days,
936 &profile.profile,
937 )
938 })
939 }
940
    /// Builds the Kafka payload for one metric bucket view.
    ///
    /// Distribution and set values are run through the namespace-aware
    /// encoder; counters and gauges are passed through unchanged. Returns
    /// [`StoreError::EncodingFailed`] if encoding fails.
    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }
976
    /// Sends a single message to the given Kafka topic and counts it in the
    /// `ProcessingMessageProduced` metric.
    ///
    /// Metric and replay messages carry extra metric tags (value type /
    /// encoding, and whether a video is attached, respectively).
    fn produce(
        &self,
        topic: KafkaTopic,
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        // `send_message` returns the resolved topic name, used as a tag below.
        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }
1020
    /// Builds attachment metadata for an attachment reference whose payload
    /// already lives in the object store.
    ///
    /// The item payload is a JSON [`AttachmentPlaceholder`] whose signed
    /// location is verified against the current time and config; any parse or
    /// verification failure maps to [`StoreError::InvalidAttachmentRef`].
    fn chunked_attachment_from_placeholder(
        &self,
        item: &Item,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        debug_assert!(
            item.stored_key().is_none(),
            "AttachmentRef should not have been uploaded to objectstore"
        );

        let payload = item.payload();
        let placeholder: AttachmentPlaceholder<'_> =
            serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
        let location = SignedLocation::try_from_str(placeholder.location)
            .ok_or(StoreError::InvalidAttachmentRef)?
            .verify(Utc::now(), &self.config)
            .map_err(|_| StoreError::InvalidAttachmentRef)?;

        let store_key = location.key;

        Ok(ChunkedAttachment {
            id: Uuid::new_v4().to_string(),
            name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(),
            rate_limited: item.rate_limited(),
            // Content type comes from the placeholder, not the item headers.
            content_type: placeholder.content_type,
            attachment_type: item.attachment_type().unwrap_or_default(),
            size: item.attachment_body_size(),
            retention_days,
            payload: AttachmentPayload::Stored(store_key),
        })
    }
1052
    /// Builds attachment metadata for an attachment carried inline in the
    /// envelope, producing payload chunks to Kafka when necessary.
    ///
    /// Payload placement, in order of preference:
    /// 1. empty payload → zero chunks;
    /// 2. already uploaded to the object store → stored key;
    /// 3. small enough and sent individually → inlined into the message;
    /// 4. otherwise → split into `attachment_chunk_size` chunks, each
    ///    produced as its own `AttachmentChunk` message before the metadata.
    fn chunked_attachment_from_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        org_id: OrganizationId,
        item: &Item,
        send_individual_attachments: bool,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        // Shared id ties the chunk messages to the attachment metadata.
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            // Emit consecutive chunks; the final chunk may be smaller.
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                    org_id,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // `chunk_index` ends up as the total number of chunks produced.
            AttachmentPayload::Chunked(chunk_index)
        };

        Ok(ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            retention_days,
            payload,
        })
    }
1120
1121 fn produce_attachment(
1133 &self,
1134 event_id: EventId,
1135 project_id: ProjectId,
1136 org_id: OrganizationId,
1137 item: &Item,
1138 send_individual_attachments: bool,
1139 retention_days: u16,
1140 ) -> Result<Option<ChunkedAttachment>, StoreError> {
1141 let attachment = if item.is_attachment_ref() {
1142 self.chunked_attachment_from_placeholder(item, retention_days)
1143 } else {
1144 self.chunked_attachment_from_attachment(
1145 event_id,
1146 project_id,
1147 org_id,
1148 item,
1149 send_individual_attachments,
1150 retention_days,
1151 )
1152 }?;
1153
1154 if send_individual_attachments {
1155 let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
1156 event_id,
1157 project_id,
1158 attachment,
1159 org_id,
1160 });
1161 self.produce(KafkaTopic::Attachments, message)?;
1162 Ok(None)
1163 } else {
1164 Ok(Some(attachment))
1165 }
1166 }
1167
1168 fn produce_user_report(
1169 &self,
1170 event_id: EventId,
1171 project_id: ProjectId,
1172 org_id: OrganizationId,
1173 received_at: DateTime<Utc>,
1174 item: &Item,
1175 ) -> Result<(), StoreError> {
1176 let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1177 project_id,
1178 event_id,
1179 start_time: safe_timestamp(received_at),
1180 payload: item.payload(),
1181 org_id,
1182 });
1183
1184 self.produce(KafkaTopic::Attachments, message)
1185 }
1186
1187 fn produce_user_report_v2(
1188 &self,
1189 event_id: EventId,
1190 project_id: ProjectId,
1191 org_id: OrganizationId,
1192 received_at: DateTime<Utc>,
1193 item: &Item,
1194 remote_addr: Option<String>,
1195 ) -> Result<(), StoreError> {
1196 let message = KafkaMessage::Event(EventKafkaMessage {
1197 project_id,
1198 event_id,
1199 payload: item.payload(),
1200 start_time: safe_timestamp(received_at),
1201 remote_addr,
1202 attachments: vec![],
1203 org_id,
1204 });
1205 self.produce(KafkaTopic::Feedback, message)
1206 }
1207
1208 fn send_metric_message(
1209 &self,
1210 namespace: MetricNamespace,
1211 message: MetricKafkaMessage,
1212 ) -> Result<(), StoreError> {
1213 let topic = match namespace {
1214 MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1215 MetricNamespace::Unsupported => {
1216 relay_log::with_scope(
1217 |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1218 || relay_log::error!("store service dropping unknown metric usecase"),
1219 );
1220 return Ok(());
1221 }
1222 _ => KafkaTopic::MetricsGeneric,
1223 };
1224
1225 let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1226 self.produce(topic, KafkaMessage::Metric { headers, message })?;
1227 Ok(())
1228 }
1229
1230 fn produce_profile(
1231 &self,
1232 organization_id: OrganizationId,
1233 project_id: ProjectId,
1234 key_id: Option<u64>,
1235 received_at: DateTime<Utc>,
1236 retention_days: u16,
1237 item: &Item,
1238 ) -> Result<(), StoreError> {
1239 let message = ProfileKafkaMessage {
1240 organization_id,
1241 project_id,
1242 key_id,
1243 received: safe_timestamp(received_at),
1244 retention_days,
1245 headers: BTreeMap::from([
1246 (
1247 "sampled".to_owned(),
1248 if item.sampled() { "true" } else { "false" }.to_owned(),
1249 ),
1250 ("project_id".to_owned(), project_id.to_string()),
1251 ]),
1252 payload: item.payload(),
1253 };
1254 self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1255 Ok(())
1256 }
1257
1258 fn produce_check_in(
1259 &self,
1260 project_id: ProjectId,
1261 org_id: OrganizationId,
1262 received_at: DateTime<Utc>,
1263 client: Option<&str>,
1264 retention_days: u16,
1265 item: &Item,
1266 ) -> Result<(), StoreError> {
1267 let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1268 message_type: CheckInMessageType::CheckIn,
1269 project_id,
1270 retention_days,
1271 start_time: safe_timestamp(received_at),
1272 sdk: client.map(str::to_owned),
1273 payload: item.payload(),
1274 routing_key_hint: item.routing_hint(),
1275 org_id,
1276 });
1277
1278 self.produce(KafkaTopic::Monitors, message)?;
1279
1280 Ok(())
1281 }
1282
    /// Produces a standalone span to the spans Kafka topic.
    ///
    /// The item payload must be JSON; it is parsed into a map of raw JSON
    /// values and flattened into the outgoing message together with scoping
    /// metadata. Depending on a gradual rollout option, this Relay emits the
    /// `Accepted` outcome for the indexed span itself and signals that to
    /// consumers via `accepted_outcome_emitted`.
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        // Relay emits the accepted outcome unless this organization is part
        // of the EAP span outcomes rollout (in which case, presumably, the
        // consumer emits it — confirm against the consumer side). Note the
        // negation applies to the whole `is_keep()` result.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            // Parse into raw JSON values so fields are forwarded verbatim
            // without re-serializing each one.
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Spans reaching this path are expected to already carry attributes.
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
                org_id: organization_id,
            },
        )?;

        // Only emit the outcome after the span was handed to the producer.
        if relay_emits_accepted_outcome {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
1363}
1364
impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // The service is shared across spawned pool tasks, hence the `Arc`.
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // NOTE(review): awaiting `spawn_async` here presumably blocks the
            // receive loop until the pool accepts the task, acting as
            // backpressure — confirm with `relay_threading::AsyncPool` docs.
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}
1385
/// How an attachment's payload is delivered to the consumer.
///
/// Exactly one of the renamed fields appears in the serialized message,
/// since this enum is flattened into [`ChunkedAttachment`].
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The payload was split into this many chunks, which are sent as
    /// separate messages (presumably `AttachmentChunkKafkaMessage`s).
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The payload is embedded inline in this message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// The payload was stored externally under the given id.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1404
/// Metadata for an attachment as sent to the Kafka consumer.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// Identifier of the attachment, shared with its chunk messages.
    id: String,

    /// Display name of the attachment.
    name: String,

    /// Whether the attachment was rate limited.
    // NOTE(review): exact semantics (payload dropped vs. flagged) are not
    // visible here — confirm with the producer side.
    rate_limited: bool,

    /// MIME content type of the attachment, omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The attachment type, serialized through its JSON representation.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// Size of the attachment payload in bytes.
    size: usize,

    /// Number of days the attachment is retained.
    retention_days: u16,

    /// The payload transport; flattened into this struct's fields.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1442
/// Serializes a value by first converting it into a `serde_json::Value`.
///
/// NOTE(review): the round-trip through a JSON value looks intentional —
/// it makes the output independent of the target format's native enum
/// representation (these messages are also MsgPack-encoded). Confirm before
/// replacing with a direct `t.serialize(serializer)`.
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}
1457
/// Kafka message for an ingested event.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw event payload.
    payload: Bytes,
    /// Time the event was received by Relay, in UNIX seconds.
    start_time: u64,
    /// Event id, also used for the partitioning key.
    event_id: EventId,
    /// Project the event belongs to.
    project_id: ProjectId,
    /// Remote address of the submitting client, if known.
    remote_addr: Option<String>,
    /// Attachments shipped together with the event.
    attachments: Vec<ChunkedAttachment>,

    /// Organization id; only used to compute the partitioning key and not
    /// serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1478
/// Kafka message carrying one chunk of a chunked attachment.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Raw bytes of this chunk.
    payload: Bytes,
    /// Event the attachment belongs to; also used for the partitioning key.
    event_id: EventId,
    /// Project the attachment belongs to.
    project_id: ProjectId,
    /// Attachment identifier, matching `ChunkedAttachment::id`.
    id: String,
    /// Zero-based index of this chunk within the attachment.
    chunk_index: usize,

    /// Organization id; only used to compute the partitioning key and not
    /// serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1499
/// Kafka message for a standalone attachment (not embedded in an event
/// message).
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// Event the attachment belongs to; also used for the partitioning key.
    event_id: EventId,
    /// Project the attachment belongs to.
    project_id: ProjectId,
    /// The attachment metadata and payload transport.
    attachment: ChunkedAttachment,

    /// Organization id; only used to compute the partitioning key and not
    /// serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1517
/// Kafka message for a replay recording delivered in one piece (not chunked).
///
/// Byte payloads use `serde_bytes` so they serialize as binary in MsgPack
/// rather than as integer arrays.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// When `true`, the consumer should skip publishing to Snuba
    /// — NOTE(review): inferred from the name; confirm with the consumer.
    relay_snuba_publish_disabled: bool,
}
1534
/// Kafka message for a user report.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// Project the report belongs to.
    project_id: ProjectId,
    /// Time the report was received by Relay, in UNIX seconds.
    start_time: u64,
    /// Raw user report payload.
    payload: Bytes,

    /// Event id; only used to compute the partitioning key and not
    /// serialized into the payload.
    #[serde(skip)]
    event_id: EventId,
    /// Organization id; only used to compute the partitioning key and not
    /// serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1552
/// Kafka message for a single metric bucket.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    /// Full metric resource identifier.
    name: &'a MetricName,
    /// The metric value; flattened into `type`/`value` fields.
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    /// Time the metric was received by Relay; omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1566
/// The value of a metric bucket, tagged by metric type.
///
/// Serializes as adjacent `"type"` and `"value"` fields, with single-letter
/// type codes matching the metric protocol.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// Counter metric (`"c"`).
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// Distribution metric (`"d"`), possibly array-encoded.
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// Set metric (`"s"`), possibly array-encoded.
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// Gauge metric (`"g"`).
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1579
1580impl MetricValue<'_> {
1581 fn variant(&self) -> &'static str {
1582 match self {
1583 Self::Counter(_) => "counter",
1584 Self::Distribution(_) => "distribution",
1585 Self::Set(_) => "set",
1586 Self::Gauge(_) => "gauge",
1587 }
1588 }
1589
1590 fn encoding(&self) -> Option<&'static str> {
1591 match self {
1592 Self::Distribution(ae) => Some(ae.name()),
1593 Self::Set(ae) => Some(ae.name()),
1594 _ => None,
1595 }
1596 }
1597}
1598
/// Kafka message for a profile.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    /// Time the profile was received by Relay, in UNIX seconds.
    received: u64,
    retention_days: u16,
    /// Kafka headers (e.g. sampling decision); transport metadata, not part
    /// of the serialized payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw profile payload.
    payload: Bytes,
}
1610
/// Discriminator for messages on the monitors topic.
// `ClockPulse` is currently unused by this producer, hence `dead_code`.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}
1623
/// Kafka message for a monitor check-in.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Discriminates check-ins from other monitor messages.
    message_type: CheckInMessageType,
    /// Raw check-in payload.
    payload: Bytes,
    /// Time the check-in was received by Relay, in UNIX seconds.
    start_time: u64,
    /// Client SDK identifier, if known.
    sdk: Option<String>,
    project_id: ProjectId,
    retention_days: u16,

    /// Optional routing hint; only used to compute the partitioning key and
    /// not serialized into the payload.
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    /// Organization id; only used to compute the partitioning key and not
    /// serialized into the payload.
    #[serde(skip)]
    org_id: OrganizationId,
}
1646
/// Kafka message for a span whose payload is forwarded as raw JSON.
///
/// Both parts are flattened, so meta fields and span fields appear side by
/// side in one JSON object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    /// Span fields as raw JSON values, forwarded without re-serialization.
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1654
/// Kafka message for a span with a typed, annotated payload.
///
/// Both parts are flattened, so meta fields and span fields appear side by
/// side in one JSON object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    /// The parsed span, serialized with its annotations.
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1662
/// Scoping and retention metadata attached to every outgoing span message.
#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Time the span was received by Relay, as a (fractional) UNIX timestamp.
    received: f64,
    retention_days: u16,
    downsampled_retention_days: u16,
    /// Whether Relay already emitted the `Accepted` outcome for this span,
    /// so downstream must not emit it again.
    accepted_outcome_emitted: bool,
}
1681
/// Kafka message for a continuous-profiling chunk.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    /// Time the chunk was received by Relay, in UNIX seconds.
    received: u64,
    retention_days: u16,
    /// Kafka headers; transport metadata, not part of the serialized payload.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw chunk payload.
    payload: Bytes,
}
1692
/// All message kinds the store service produces to Kafka.
///
/// Serialized with a snake_case `"type"` tag. Fields marked `#[serde(skip)]`
/// (headers, routing keys, org ids) are transport metadata used by the
/// producer and never appear in the payload.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    /// A generic trace item, encoded as protobuf rather than through serde
    /// (all fields are skipped; see `Message::serialize`).
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    /// A span forwarded as raw JSON.
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,

        #[serde(skip)]
        org_id: OrganizationId,
    },
    /// A span forwarded through the typed `SpanV2` schema.
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        #[serde(skip)]
        org_id: OrganizationId,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1748
1749impl KafkaMessage<'_> {
1750 fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1752 let item_type = item.item_type();
1753 KafkaMessage::Item {
1754 headers: BTreeMap::from([
1755 ("project_id".to_owned(), scoping.project_id.to_string()),
1756 ("item_type".to_owned(), (item_type as i32).to_string()),
1757 ]),
1758 message: item,
1759 item_type,
1760 }
1761 }
1762}
1763
impl Message for KafkaMessage<'_> {
    /// Returns the variant name used by the Kafka client for metrics and
    /// routing. Metric messages are further split by namespace.
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Returns the partitioning key for this message, if any.
    ///
    /// The key is derived from an event id or routing UUID, mixed with the
    /// organization id. Variants without a UUID (metrics, trace items,
    /// profiles, replays) are produced unkeyed.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some((message.event_id.0, message.org_id)),
            Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
            Self::SpanRaw {
                routing_key,
                org_id,
                ..
            }
            | Self::SpanV2 {
                routing_key,
                org_id,
                ..
            } => routing_key.map(|r| (r, *org_id)),

            Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),

            Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
            Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),

            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // A nil UUID carries no routing information; leave the message unkeyed.
        .filter(|(uuid, _)| !uuid.is_nil())
        .map(|(uuid, org_id)| {
            // XOR the big-endian org id into the first 8 bytes of the UUID,
            // so equal UUIDs from different orgs produce different keys.
            let mut res = uuid.into_bytes();
            for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
                res[i] ^= b;
            }
            u128::from_be_bytes(res)
        })
    }

    /// Returns the Kafka headers to attach, for variants that carry any.
    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    /// Serializes the message payload: metrics and spans as JSON, trace
    /// items as protobuf, and everything else as MsgPack.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                // Trace items are protobuf-encoded via prost, bypassing serde.
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
1878
1879fn serialize_as_json<T: serde::Serialize>(
1880 value: &T,
1881) -> Result<SerializationOutput<'_>, ClientError> {
1882 match serde_json::to_vec(value) {
1883 Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1884 Err(err) => Err(ClientError::InvalidJson(err)),
1885 }
1886}
1887
1888fn is_slow_item(item: &Item) -> bool {
1892 item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1893}
1894
/// Converts a boolean into its lowercase string representation.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1898
1899fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1903 let ts = timestamp.timestamp();
1904 if ts >= 0 {
1905 return ts as u64;
1906 }
1907
1908 Utc::now().timestamp() as u64
1910}
1911
#[cfg(test)]
mod tests {

    use super::*;

    /// The store client must refuse to produce to the outcome topics, which
    /// are owned by a different producer.
    #[test]
    fn disallow_outcomes() {
        // Minimal `Message` impl whose payload is shaped like an outcome.
        struct TestMessage;
        impl relay_kafka::Message for TestMessage {
            fn key(&self) -> Option<relay_kafka::Key> {
                None
            }

            fn variant(&self) -> &'static str {
                "test"
            }

            fn headers(&self) -> Option<&BTreeMap<String, String>> {
                None
            }

            fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
                Ok(SerializationOutput::Json(Cow::Borrowed(
                    br#"{"timestamp": "foo", "outcome": 1}"#,
                )))
            }
        }

        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        // Sending to either outcome topic must be rejected by topic name.
        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer.client.send_message(topic, &TestMessage);

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}