1use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task;
10
11use bytes::Bytes;
12use chrono::{DateTime, SecondsFormat, Utc};
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use uuid::Uuid;
17
18use relay_base_schema::data_category::DataCategory;
19use relay_base_schema::organization::OrganizationId;
20use relay_base_schema::project::ProjectId;
21use relay_common::time::UnixTimestamp;
22use relay_config::Config;
23use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
24use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
25use relay_metrics::{
26 Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
27 MetricNamespace, SetView,
28};
29use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
30use relay_quotas::Scoping;
31use relay_statsd::metric;
32use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
33use relay_threading::AsyncPool;
34
35use crate::envelope::{AttachmentPlaceholder, AttachmentType, Item, ItemType};
36use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
37use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
38use crate::service::ServiceError;
39use crate::services::global_config::GlobalConfigHandle;
40use crate::services::outcome::{self, DiscardReason, Outcome, OutcomeId, TrackOutcome};
41use crate::services::upload::{Final, SignedLocation};
42use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
43use crate::utils::{self, FormDataIter};
44
45mod sessions;
46
/// Fallback name used for attachments whose item does not carry a filename.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
50#[derive(Debug, thiserror::Error)]
51pub enum StoreError {
52 #[error("failed to send the message to kafka: {0}")]
53 SendFailed(#[from] ClientError),
54 #[error("failed to encode data: {0}")]
55 EncodingFailed(std::io::Error),
56 #[error("failed to store event because event id was missing")]
57 NoEventId,
58 #[error("invalid attachment reference")]
59 InvalidAttachmentRef,
60}
61
62impl OutcomeError for StoreError {
63 type Error = Self;
64
65 fn consume(self) -> (Option<Outcome>, Self::Error) {
66 let outcome = match self {
67 StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => {
68 Some(Outcome::Invalid(DiscardReason::Internal))
69 }
70 StoreError::InvalidAttachmentRef => {
71 Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
72 }
73 };
74 (outcome, self)
75 }
76}
77
78struct Producer {
79 client: KafkaClient,
80}
81
82impl Producer {
83 pub fn create(config: &Config) -> anyhow::Result<Self> {
84 let mut client_builder = KafkaClient::builder();
85
86 for topic in KafkaTopic::iter() {
87 let kafka_configs = config.kafka_configs(*topic)?;
88 client_builder = client_builder
89 .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
90 .map_err(|e| ServiceError::Kafka(e.to_string()))?;
91 }
92
93 Ok(Self {
94 client: client_builder.build(),
95 })
96 }
97}
98
99#[derive(Debug)]
101pub struct StoreEnvelope {
102 pub envelope: ManagedEnvelope,
103}
104
105#[derive(Clone, Debug)]
107pub struct StoreMetrics {
108 pub buckets: Vec<Bucket>,
109 pub scoping: Scoping,
110 pub retention: u16,
111}
112
113#[derive(Debug)]
115pub struct StoreTraceItem {
116 pub trace_item: TraceItem,
118}
119
120impl Counted for StoreTraceItem {
121 fn quantities(&self) -> Quantities {
122 self.trace_item.quantities()
123 }
124}
125
126#[derive(Debug)]
128pub struct StoreSpanV2 {
129 pub routing_key: Option<Uuid>,
131 pub retention_days: u16,
133 pub downsampled_retention_days: u16,
135 pub event_id: Option<EventId>,
137 pub item: SpanV2,
139 pub performance_issues_spans: bool,
142}
143
144impl Counted for StoreSpanV2 {
145 fn quantities(&self) -> Quantities {
146 smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
147 }
148}
149
150#[derive(Debug)]
152pub struct StoreProfileChunk {
153 pub retention_days: u16,
155 pub payload: Bytes,
157 pub quantities: Quantities,
161}
162
163impl Counted for StoreProfileChunk {
164 fn quantities(&self) -> Quantities {
165 self.quantities.clone()
166 }
167}
168
169#[derive(Debug)]
171pub struct StoreReplay {
172 pub event_id: EventId,
174 pub retention_days: u16,
176 pub recording: Bytes,
178 pub event: Option<Bytes>,
180 pub video: Option<Bytes>,
182 pub quantities: Quantities,
186}
187
188impl Counted for StoreReplay {
189 fn quantities(&self) -> Quantities {
190 self.quantities.clone()
191 }
192}
193
194#[derive(Debug)]
196pub struct StoreAttachment {
197 pub event_id: EventId,
199 pub attachment: Item,
201 pub quantities: Quantities,
203 pub retention: u16,
205}
206
207impl Counted for StoreAttachment {
208 fn quantities(&self) -> Quantities {
209 self.quantities.clone()
210 }
211}
212
213#[derive(Debug)]
215pub struct StoreUserReport {
216 pub event_id: EventId,
218 pub report: Item,
220}
221
222impl Counted for StoreUserReport {
223 fn quantities(&self) -> Quantities {
224 smallvec::smallvec![(DataCategory::UserReportV2, 1)]
225 }
226}
227
228#[derive(Debug)]
230pub struct StoreProfile {
231 pub retention_days: u16,
233 pub profile: Item,
235 pub quantities: Quantities,
237}
238
239impl Counted for StoreProfile {
240 fn quantities(&self) -> Quantities {
241 self.quantities.clone()
242 }
243}
244
245pub type StoreServicePool = AsyncPool<StoreTask>;
247
248#[derive(Debug)]
250pub enum Store {
251 Envelope(StoreEnvelope),
259 Metrics(StoreMetrics),
261 TraceItem(Managed<StoreTraceItem>),
263 Span(Managed<Box<StoreSpanV2>>),
265 ProfileChunk(Managed<StoreProfileChunk>),
267 Replay(Managed<StoreReplay>),
269 Attachment(Managed<StoreAttachment>),
271 UserReport(Managed<StoreUserReport>),
273 Profile(Managed<StoreProfile>),
275}
276
277impl Store {
278 fn variant(&self) -> &'static str {
280 match self {
281 Store::Envelope(_) => "envelope",
282 Store::Metrics(_) => "metrics",
283 Store::TraceItem(_) => "trace_item",
284 Store::Span(_) => "span",
285 Store::ProfileChunk(_) => "profile_chunk",
286 Store::Replay(_) => "replay",
287 Store::Attachment(_) => "attachment",
288 Store::UserReport(_) => "user_report",
289 Store::Profile(_) => "profile",
290 }
291 }
292}
293
294impl Interface for Store {}
295
296impl FromMessage<StoreEnvelope> for Store {
297 type Response = NoResponse;
298
299 fn from_message(message: StoreEnvelope, _: ()) -> Self {
300 Self::Envelope(message)
301 }
302}
303
304impl FromMessage<StoreMetrics> for Store {
305 type Response = NoResponse;
306
307 fn from_message(message: StoreMetrics, _: ()) -> Self {
308 Self::Metrics(message)
309 }
310}
311
312impl FromMessage<Managed<StoreTraceItem>> for Store {
313 type Response = NoResponse;
314
315 fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
316 Self::TraceItem(message)
317 }
318}
319
320impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
321 type Response = NoResponse;
322
323 fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
324 Self::Span(message)
325 }
326}
327
328impl FromMessage<Managed<StoreProfileChunk>> for Store {
329 type Response = NoResponse;
330
331 fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
332 Self::ProfileChunk(message)
333 }
334}
335
336impl FromMessage<Managed<StoreReplay>> for Store {
337 type Response = NoResponse;
338
339 fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
340 Self::Replay(message)
341 }
342}
343
344impl FromMessage<Managed<StoreAttachment>> for Store {
345 type Response = NoResponse;
346
347 fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
348 Self::Attachment(message)
349 }
350}
351
352impl FromMessage<Managed<StoreUserReport>> for Store {
353 type Response = NoResponse;
354
355 fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
356 Self::UserReport(message)
357 }
358}
359
360impl FromMessage<Managed<StoreProfile>> for Store {
361 type Response = NoResponse;
362
363 fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
364 Self::Profile(message)
365 }
366}
367
368pub struct StoreService {
370 pool: StoreServicePool,
371 config: Arc<Config>,
372 global_config: GlobalConfigHandle,
373 outcome_aggregator: Addr<TrackOutcome>,
374 metric_outcomes: MetricOutcomes,
375 producer: Producer,
376}
377
378impl StoreService {
379 pub fn create(
380 pool: StoreServicePool,
381 config: Arc<Config>,
382 global_config: GlobalConfigHandle,
383 outcome_aggregator: Addr<TrackOutcome>,
384 metric_outcomes: MetricOutcomes,
385 ) -> anyhow::Result<Self> {
386 let producer = Producer::create(&config)?;
387 Ok(Self {
388 pool,
389 config,
390 global_config,
391 outcome_aggregator,
392 metric_outcomes,
393 producer,
394 })
395 }
396
397 fn handle_message(&self, message: Store) {
398 let ty = message.variant();
399 relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
400 let result = match message {
401 Store::Envelope(message) => self.handle_store_envelope(message),
402 Store::Metrics(message) => {
403 self.handle_store_metrics(message);
404 Ok(())
405 }
406 Store::TraceItem(message) => self.handle_store_trace_item(message),
407 Store::Span(message) => self.handle_store_span(message),
408 Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
409 Store::Replay(message) => self.handle_store_replay(message),
410 Store::Attachment(message) => self.handle_store_attachment(message),
411 Store::UserReport(message) => self.handle_user_report(message),
412 Store::Profile(message) => self.handle_profile(message),
413 };
414 if let Err(error) = result {
415 relay_log::error!(
416 error = &error as &dyn Error,
417 tags.message = ty,
418 "failed to store message"
419 );
420 }
421 })
422 }
423
424 fn handle_store_envelope(&self, message: StoreEnvelope) -> Result<(), Rejected<StoreError>> {
425 let StoreEnvelope { mut envelope } = message;
426
427 match self.store_envelope(&mut envelope) {
428 Ok(()) => {
429 envelope.accept();
430 Ok(())
431 }
432 Err(error) => {
433 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
437 Err(Managed::with_meta_from_managed_envelope(&envelope, ()).reject_err(error))
438 }
439 }
440 }
441
442 fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
443 let mut envelope = managed_envelope.take_envelope();
444 let received_at = managed_envelope.received_at();
445 let scoping = managed_envelope.scoping();
446
447 let retention = envelope.retention();
448
449 let event_id = envelope.event_id();
450 let event_item = envelope.as_mut().take_item_by(|item| {
451 matches!(
452 item.ty(),
453 ItemType::Event | ItemType::Transaction | ItemType::Security
454 )
455 });
456 let event_type = event_item.as_ref().map(|item| item.ty());
457
458 let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
462 KafkaTopic::Transactions
463 } else if envelope.get_item_by(is_slow_item).is_some() {
464 KafkaTopic::Attachments
465 } else {
466 KafkaTopic::Events
467 };
468
469 let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
470
471 let mut attachments = Vec::new();
472
473 for item in envelope.items() {
474 match item.ty() {
475 ItemType::Attachment => {
476 if let Some(attachment) = self.produce_attachment(
477 event_id.ok_or(StoreError::NoEventId)?,
478 scoping.project_id,
479 scoping.organization_id,
480 item,
481 send_individual_attachments,
482 retention,
483 )? {
484 attachments.push(attachment);
485 }
486 }
487 ItemType::UserReport => {
488 debug_assert!(event_topic == KafkaTopic::Attachments);
489 self.produce_user_report(
490 event_id.ok_or(StoreError::NoEventId)?,
491 scoping.project_id,
492 scoping.organization_id,
493 received_at,
494 item,
495 )?;
496 }
497 ItemType::UserReportV2 => {
498 let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
499 self.produce_user_report_v2(
500 event_id.ok_or(StoreError::NoEventId)?,
501 scoping.project_id,
502 scoping.organization_id,
503 received_at,
504 item,
505 remote_addr,
506 )?;
507 }
508 ItemType::Profile => self.produce_profile(
509 scoping.organization_id,
510 scoping.project_id,
511 scoping.key_id,
512 received_at,
513 retention,
514 item,
515 )?,
516 ItemType::CheckIn => {
517 let client = envelope.meta().client();
518 self.produce_check_in(
519 scoping.project_id,
520 scoping.organization_id,
521 received_at,
522 client,
523 retention,
524 item,
525 )?
526 }
527 ty @ (ItemType::Log | ItemType::Span) => {
528 debug_assert!(
529 false,
530 "received {ty} through an envelope, \
531 this item must be submitted via a specific store message instead"
532 );
533 relay_log::error!(
534 tags.project_key = %scoping.project_key,
535 "StoreService received unsupported item type '{ty}' in envelope"
536 );
537 }
538 other => {
539 let event_type = event_item.as_ref().map(|item| item.ty().as_str());
540 let item_types = envelope
541 .items()
542 .map(|item| item.ty().as_str())
543 .collect::<Vec<_>>();
544 let attachment_types = envelope
545 .items()
546 .map(|item| {
547 item.attachment_type()
548 .map(|t| t.to_string())
549 .unwrap_or_default()
550 })
551 .collect::<Vec<_>>();
552
553 relay_log::with_scope(
554 |scope| {
555 scope.set_extra("item_types", item_types.into());
556 scope.set_extra("attachment_types", attachment_types.into());
557 if other == &ItemType::FormData {
558 let payload = item.payload();
559 let form_data_keys = FormDataIter::new(&payload)
560 .map(|entry| entry.key())
561 .collect::<Vec<_>>();
562 scope.set_extra("form_data_keys", form_data_keys.into());
563 }
564 },
565 || {
566 relay_log::error!(
567 tags.project_key = %scoping.project_key,
568 tags.event_type = event_type.unwrap_or("none"),
569 "StoreService received unexpected item type: {other}"
570 )
571 },
572 )
573 }
574 }
575 }
576
577 if let Some(event_item) = event_item {
578 let event_id = event_id.ok_or(StoreError::NoEventId)?;
579 let project_id = scoping.project_id;
580 let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
581
582 self.produce(
583 event_topic,
584 KafkaMessage::Event(EventKafkaMessage {
585 payload: event_item.payload(),
586 start_time: safe_timestamp(received_at),
587 event_id,
588 project_id,
589 remote_addr,
590 attachments,
591 org_id: scoping.organization_id,
592 }),
593 )?;
594 } else {
595 debug_assert!(attachments.is_empty());
596 }
597
598 Ok(())
599 }
600
601 fn handle_store_metrics(&self, message: StoreMetrics) {
602 let StoreMetrics {
603 buckets,
604 scoping,
605 retention,
606 } = message;
607
608 let batch_size = self.config.metrics_max_batch_size_bytes();
609 let mut error = None;
610
611 let global_config = self.global_config.current().unwrap_or_default();
612 let mut encoder = BucketEncoder::new(&global_config);
613
614 let emit_sessions_to_eap = utils::is_rolled_out(
615 scoping.organization_id.value(),
616 global_config.options.sessions_eap_rollout_rate,
617 )
618 .is_keep();
619
620 let now = UnixTimestamp::now();
621 let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
622
623 for mut bucket in buckets {
624 let namespace = encoder.prepare(&mut bucket);
625
626 if let Some(received_at) = bucket.metadata.received_at {
627 let delay = now.as_secs().saturating_sub(received_at.as_secs());
628 let (total, count, max) = delay_stats.get_mut(namespace);
629 *total += delay;
630 *count += 1;
631 *max = (*max).max(delay);
632 }
633
634 for view in BucketsView::new(std::slice::from_ref(&bucket))
638 .by_size(batch_size)
639 .flatten()
640 {
641 let message =
642 self.create_metric_message(&scoping, &mut encoder, namespace, &view, retention);
643
644 let result =
645 message.and_then(|message| self.send_metric_message(namespace, message));
646
647 let outcome = match result {
648 Ok(()) => Outcome::Accepted,
649 Err(e) => {
650 error.get_or_insert(e);
651 Outcome::Invalid(DiscardReason::Internal)
652 }
653 };
654
655 self.metric_outcomes.track(scoping, &[view], outcome);
656 }
657
658 if emit_sessions_to_eap
659 && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
660 {
661 let message = KafkaMessage::for_item(scoping, trace_item);
662 let res = self.produce(KafkaTopic::Items, message);
663 if let Err(error) = res {
664 relay_log::error!(
665 error = &error as &dyn std::error::Error,
666 "failed to produce session metrics to EAP"
667 )
668 }
669 }
670 }
671
672 if let Some(error) = error {
673 relay_log::error!(
674 error = &error as &dyn std::error::Error,
675 "failed to produce metric buckets: {error}"
676 );
677 }
678
679 for (namespace, (total, count, max)) in delay_stats {
680 if count == 0 {
681 continue;
682 }
683 metric!(
684 counter(RelayCounters::MetricDelaySum) += total,
685 namespace = namespace.as_str()
686 );
687 metric!(
688 counter(RelayCounters::MetricDelayCount) += count,
689 namespace = namespace.as_str()
690 );
691 metric!(
692 gauge(RelayGauges::MetricDelayMax) = max,
693 namespace = namespace.as_str()
694 );
695 }
696 }
697
698 fn handle_store_trace_item(
699 &self,
700 message: Managed<StoreTraceItem>,
701 ) -> Result<(), Rejected<StoreError>> {
702 let scoping = message.scoping();
703 let received_at = message.received_at();
704
705 let eap_emits_outcomes = utils::is_rolled_out(
706 scoping.organization_id.value(),
707 self.global_config
708 .current()
709 .unwrap_or_default()
710 .options
711 .eap_outcomes_rollout_rate,
712 )
713 .is_keep();
714
715 let outcomes = message.try_accept(|mut item| {
716 let outcomes = match eap_emits_outcomes {
717 true => None,
718 false => item.trace_item.outcomes.take(),
719 };
720
721 let message = KafkaMessage::for_item(scoping, item.trace_item);
722 self.produce(KafkaTopic::Items, message).map(|()| outcomes)
723 })?;
724
725 if let Some(outcomes) = outcomes {
730 for (category, quantity) in outcomes.quantities() {
731 self.outcome_aggregator.send(TrackOutcome {
732 category,
733 event_id: None,
734 outcome: Outcome::Accepted,
735 quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
736 remote_addr: None,
737 scoping,
738 timestamp: received_at,
739 });
740 }
741 }
742
743 Ok(())
744 }
745
746 fn handle_store_span(
747 &self,
748 message: Managed<Box<StoreSpanV2>>,
749 ) -> Result<(), Rejected<StoreError>> {
750 let scoping = message.scoping();
751 let received_at = message.received_at();
752
753 let relay_emits_accepted_outcome = !utils::is_rolled_out(
754 scoping.organization_id.value(),
755 self.global_config
756 .current()
757 .unwrap_or_default()
758 .options
759 .eap_span_outcomes_rollout_rate,
760 )
761 .is_keep();
762
763 let meta = SpanMeta {
764 organization_id: scoping.organization_id,
765 project_id: scoping.project_id,
766 key_id: scoping.key_id,
767 event_id: message.event_id,
768 retention_days: message.retention_days,
769 downsampled_retention_days: message.downsampled_retention_days,
770 received: datetime_to_timestamp(received_at),
771 accepted_outcome_emitted: relay_emits_accepted_outcome,
772 performance_issues_spans: message.performance_issues_spans,
773 };
774
775 message.try_accept(|span| {
776 let item = Annotated::new(span.item);
777 let message = KafkaMessage::SpanV2 {
778 routing_key: span.routing_key,
779 headers: BTreeMap::from([(
780 "project_id".to_owned(),
781 scoping.project_id.to_string(),
782 )]),
783 message: SpanKafkaMessage {
784 meta,
785 span: SerializableAnnotated(&item),
786 },
787 org_id: scoping.organization_id,
788 };
789
790 self.produce(KafkaTopic::Spans, message)
791 })?;
792
793 relay_statsd::metric!(
794 counter(RelayCounters::SpanV2Produced) += 1,
795 via = "processing"
796 );
797
798 if relay_emits_accepted_outcome {
799 self.outcome_aggregator.send(TrackOutcome {
802 category: DataCategory::SpanIndexed,
803 event_id: None,
804 outcome: Outcome::Accepted,
805 quantity: 1,
806 remote_addr: None,
807 scoping,
808 timestamp: received_at,
809 });
810 }
811
812 Ok(())
813 }
814
815 fn handle_store_profile_chunk(
816 &self,
817 message: Managed<StoreProfileChunk>,
818 ) -> Result<(), Rejected<StoreError>> {
819 let scoping = message.scoping();
820 let received_at = message.received_at();
821
822 message.try_accept(|message| {
823 let message = ProfileChunkKafkaMessage {
824 organization_id: scoping.organization_id,
825 project_id: scoping.project_id,
826 received: safe_timestamp(received_at),
827 retention_days: message.retention_days,
828 headers: BTreeMap::from([(
829 "project_id".to_owned(),
830 scoping.project_id.to_string(),
831 )]),
832 payload: message.payload,
833 };
834
835 self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
836 })
837 }
838
839 fn handle_store_replay(
840 &self,
841 message: Managed<StoreReplay>,
842 ) -> Result<(), Rejected<StoreError>> {
843 let scoping = message.scoping();
844 let received_at = message.received_at();
845
846 message.try_accept(|replay| {
847 let kafka_msg =
848 KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
849 replay_id: replay.event_id,
850 key_id: scoping.key_id,
851 org_id: scoping.organization_id,
852 project_id: scoping.project_id,
853 received: safe_timestamp(received_at),
854 retention_days: replay.retention_days,
855 payload: &replay.recording,
856 replay_event: replay.event.as_deref(),
857 replay_video: replay.video.as_deref(),
858 relay_snuba_publish_disabled: true,
861 });
862 self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
863 })
864 }
865
866 fn handle_store_attachment(
867 &self,
868 message: Managed<StoreAttachment>,
869 ) -> Result<(), Rejected<StoreError>> {
870 let scoping = message.scoping();
871 message.try_accept(|attachment| {
872 let result = self.produce_attachment(
873 attachment.event_id,
874 scoping.project_id,
875 scoping.organization_id,
876 &attachment.attachment,
877 true,
879 attachment.retention,
880 );
881 debug_assert!(!matches!(result, Ok(Some(_))));
884 result.map(|_| ())
885 })
886 }
887
888 fn handle_user_report(
889 &self,
890 message: Managed<StoreUserReport>,
891 ) -> Result<(), Rejected<StoreError>> {
892 let scoping = message.scoping();
893 let received_at = message.received_at();
894
895 message.try_accept(|report| {
896 let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
897 project_id: scoping.project_id,
898 event_id: report.event_id,
899 start_time: safe_timestamp(received_at),
900 payload: report.report.payload(),
901 org_id: scoping.organization_id,
902 });
903 self.produce(KafkaTopic::Attachments, kafka_msg)
904 })
905 }
906
907 fn handle_profile(&self, message: Managed<StoreProfile>) -> Result<(), Rejected<StoreError>> {
908 let scoping = message.scoping();
909 let received_at = message.received_at();
910
911 message.try_accept(|profile| {
912 self.produce_profile(
913 scoping.organization_id,
914 scoping.project_id,
915 scoping.key_id,
916 received_at,
917 profile.retention_days,
918 &profile.profile,
919 )
920 })
921 }
922
923 fn create_metric_message<'a>(
924 &self,
925 scoping: &Scoping,
926 encoder: &'a mut BucketEncoder,
927 namespace: MetricNamespace,
928 view: &BucketView<'a>,
929 retention_days: u16,
930 ) -> Result<MetricKafkaMessage<'a>, StoreError> {
931 let value = match view.value() {
932 BucketViewValue::Counter(c) => MetricValue::Counter(c),
933 BucketViewValue::Distribution(data) => MetricValue::Distribution(
934 encoder
935 .encode_distribution(namespace, data)
936 .map_err(StoreError::EncodingFailed)?,
937 ),
938 BucketViewValue::Set(data) => MetricValue::Set(
939 encoder
940 .encode_set(namespace, data)
941 .map_err(StoreError::EncodingFailed)?,
942 ),
943 BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
944 };
945
946 Ok(MetricKafkaMessage {
947 org_id: scoping.organization_id,
948 project_id: scoping.project_id,
949 key_id: scoping.key_id,
950 name: view.name(),
951 value,
952 timestamp: view.timestamp(),
953 tags: view.tags(),
954 retention_days,
955 received_at: view.metadata().received_at,
956 })
957 }
958
959 fn produce(
960 &self,
961 topic: KafkaTopic,
962 message: KafkaMessage,
964 ) -> Result<(), StoreError> {
965 relay_log::trace!("Sending kafka message of type {}", message.variant());
966
967 let topic_name = self.producer.client.send_message(topic, &message)?;
968
969 match &message {
970 KafkaMessage::Metric {
971 message: metric, ..
972 } => {
973 metric!(
974 counter(RelayCounters::ProcessingMessageProduced) += 1,
975 event_type = message.variant(),
976 topic = topic_name,
977 metric_type = metric.value.variant(),
978 metric_encoding = metric.value.encoding().unwrap_or(""),
979 );
980 }
981 KafkaMessage::ReplayRecordingNotChunked(replay) => {
982 let has_video = replay.replay_video.is_some();
983
984 metric!(
985 counter(RelayCounters::ProcessingMessageProduced) += 1,
986 event_type = message.variant(),
987 topic = topic_name,
988 has_video = bool_to_str(has_video),
989 );
990 }
991 message => {
992 metric!(
993 counter(RelayCounters::ProcessingMessageProduced) += 1,
994 event_type = message.variant(),
995 topic = topic_name,
996 );
997 }
998 }
999
1000 Ok(())
1001 }
1002
1003 fn chunked_attachment_from_placeholder(
1004 &self,
1005 item: &Item,
1006 retention_days: u16,
1007 ) -> Result<ChunkedAttachment, StoreError> {
1008 debug_assert!(
1009 item.stored_key().is_none(),
1010 "AttachmentRef should not have been uploaded to objectstore"
1011 );
1012
1013 let payload = item.payload();
1014 let placeholder: AttachmentPlaceholder<'_> =
1015 serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
1016 let location = SignedLocation::<Final>::try_from_str(placeholder.location)
1017 .ok_or(StoreError::InvalidAttachmentRef)?
1018 .verify(Utc::now(), &self.config)
1019 .map_err(|_| StoreError::InvalidAttachmentRef)?;
1020
1021 let store_key = location.key;
1022
1023 Ok(ChunkedAttachment {
1024 id: Uuid::new_v4().to_string(),
1025 name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(),
1026 rate_limited: item.rate_limited(),
1027 content_type: placeholder.content_type,
1028 attachment_type: item.attachment_type().unwrap_or_default(),
1029 size: item.attachment_body_size(),
1030 retention_days,
1031 payload: AttachmentPayload::Stored(store_key),
1032 })
1033 }
1034
1035 fn chunked_attachment_from_attachment(
1036 &self,
1037 event_id: EventId,
1038 project_id: ProjectId,
1039 org_id: OrganizationId,
1040 item: &Item,
1041 send_individual_attachments: bool,
1042 retention_days: u16,
1043 ) -> Result<ChunkedAttachment, StoreError> {
1044 let id = Uuid::new_v4().to_string();
1045
1046 let payload = item.payload();
1047 let size = item.len();
1048 let max_chunk_size = self.config.attachment_chunk_size();
1049
1050 let payload = if size == 0 {
1051 AttachmentPayload::Chunked(0)
1052 } else if let Some(stored_key) = item.stored_key() {
1053 AttachmentPayload::Stored(stored_key.into())
1054 } else if send_individual_attachments && size < max_chunk_size {
1055 AttachmentPayload::Inline(payload)
1059 } else {
1060 let mut chunk_index = 0;
1061 let mut offset = 0;
1062 while offset < size {
1065 let chunk_size = std::cmp::min(max_chunk_size, size - offset);
1066 let chunk_message = AttachmentChunkKafkaMessage {
1067 payload: payload.slice(offset..offset + chunk_size),
1068 event_id,
1069 project_id,
1070 id: id.clone(),
1071 chunk_index,
1072 org_id,
1073 };
1074
1075 self.produce(
1076 KafkaTopic::Attachments,
1077 KafkaMessage::AttachmentChunk(chunk_message),
1078 )?;
1079 offset += chunk_size;
1080 chunk_index += 1;
1081 }
1082
1083 AttachmentPayload::Chunked(chunk_index)
1086 };
1087
1088 Ok(ChunkedAttachment {
1089 id,
1090 name: match item.filename() {
1091 Some(name) => name.to_owned(),
1092 None => UNNAMED_ATTACHMENT.to_owned(),
1093 },
1094 rate_limited: item.rate_limited(),
1095 content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
1096 attachment_type: item.attachment_type().unwrap_or_default(),
1097 size,
1098 retention_days,
1099 payload,
1100 })
1101 }
1102
1103 fn produce_attachment(
1115 &self,
1116 event_id: EventId,
1117 project_id: ProjectId,
1118 org_id: OrganizationId,
1119 item: &Item,
1120 send_individual_attachments: bool,
1121 retention_days: u16,
1122 ) -> Result<Option<ChunkedAttachment>, StoreError> {
1123 let attachment = if item.is_attachment_ref() {
1124 self.chunked_attachment_from_placeholder(item, retention_days)
1125 } else {
1126 self.chunked_attachment_from_attachment(
1127 event_id,
1128 project_id,
1129 org_id,
1130 item,
1131 send_individual_attachments,
1132 retention_days,
1133 )
1134 }?;
1135
1136 if send_individual_attachments {
1137 let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
1138 event_id,
1139 project_id,
1140 attachment,
1141 org_id,
1142 });
1143 self.produce(KafkaTopic::Attachments, message)?;
1144 Ok(None)
1145 } else {
1146 Ok(Some(attachment))
1147 }
1148 }
1149
1150 fn produce_user_report(
1151 &self,
1152 event_id: EventId,
1153 project_id: ProjectId,
1154 org_id: OrganizationId,
1155 received_at: DateTime<Utc>,
1156 item: &Item,
1157 ) -> Result<(), StoreError> {
1158 let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1159 project_id,
1160 event_id,
1161 start_time: safe_timestamp(received_at),
1162 payload: item.payload(),
1163 org_id,
1164 });
1165
1166 self.produce(KafkaTopic::Attachments, message)
1167 }
1168
1169 fn produce_user_report_v2(
1170 &self,
1171 event_id: EventId,
1172 project_id: ProjectId,
1173 org_id: OrganizationId,
1174 received_at: DateTime<Utc>,
1175 item: &Item,
1176 remote_addr: Option<String>,
1177 ) -> Result<(), StoreError> {
1178 let message = KafkaMessage::Event(EventKafkaMessage {
1179 project_id,
1180 event_id,
1181 payload: item.payload(),
1182 start_time: safe_timestamp(received_at),
1183 remote_addr,
1184 attachments: vec![],
1185 org_id,
1186 });
1187 self.produce(KafkaTopic::Feedback, message)
1188 }
1189
1190 fn send_metric_message(
1191 &self,
1192 namespace: MetricNamespace,
1193 message: MetricKafkaMessage,
1194 ) -> Result<(), StoreError> {
1195 let topic = match namespace {
1196 MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1197 MetricNamespace::Outcomes => {
1198 return self.send_metric_based_outcome(message);
1199 }
1200 MetricNamespace::Unsupported => {
1201 relay_log::error!(
1202 metric_message.name = message.name.as_ref(),
1203 "store service dropping unknown metric usecase"
1204 );
1205 return Ok(());
1206 }
1207 _ => KafkaTopic::MetricsGeneric,
1208 };
1209
1210 let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1211 self.produce(topic, KafkaMessage::Metric { headers, message })?;
1212 Ok(())
1213 }
1214
1215 fn send_metric_based_outcome(&self, message: MetricKafkaMessage) -> Result<(), StoreError> {
1216 let Some(outcome) = outcome::metric::to_outcome_id(message.name) else {
1217 relay_log::error!(
1218 mri = message.name.as_ref(),
1219 "invalid outcome metric, cannot infer outcome id from metric name"
1220 );
1221 return Ok(());
1222 };
1223 let quantity = match message.value {
1224 MetricValue::Counter(c) => c.to_f64() as u32,
1225 v => {
1226 relay_log::error!(
1227 mri = message.name.as_ref(),
1228 "invalid outcome metric, expected a counter got '{}'",
1229 v.variant()
1230 );
1231 return Ok(());
1232 }
1233 };
1234
1235 let outcome = OutcomeMessage {
1236 timestamp: message
1237 .timestamp
1238 .as_datetime()
1239 .unwrap_or_else(Utc::now)
1240 .to_rfc3339_opts(SecondsFormat::Micros, true),
1241 org_id: Some(message.org_id).filter(|id| id.value() != 0),
1242 project_id: message.project_id,
1243 key_id: message.key_id,
1244 outcome,
1245 reason: message.tags.get("reason").map(|s| s.as_str()),
1246 event_id: message.tags.get("event_id").map(|s| s.as_str()),
1247 remote_addr: message.tags.get("remote_addr").map(|s| s.as_str()),
1248 source: message.tags.get("source").map(|s| s.as_str()),
1249 category: message.tags.get("category").and_then(|s| s.parse().ok()),
1250 quantity: Some(quantity),
1251 };
1252
1253 let topic = match outcome.outcome.is_billing() {
1254 true => KafkaTopic::OutcomesBilling,
1255 false => KafkaTopic::Outcomes,
1256 };
1257
1258 self.produce(topic, KafkaMessage::Outcome(outcome))
1259 }
1260
1261 fn produce_profile(
1262 &self,
1263 organization_id: OrganizationId,
1264 project_id: ProjectId,
1265 key_id: Option<u64>,
1266 received_at: DateTime<Utc>,
1267 retention_days: u16,
1268 item: &Item,
1269 ) -> Result<(), StoreError> {
1270 let message = ProfileKafkaMessage {
1271 organization_id,
1272 project_id,
1273 key_id,
1274 received: safe_timestamp(received_at),
1275 retention_days,
1276 headers: BTreeMap::from([
1277 (
1278 "sampled".to_owned(),
1279 if item.sampled() { "true" } else { "false" }.to_owned(),
1280 ),
1281 ("project_id".to_owned(), project_id.to_string()),
1282 ]),
1283 payload: item.payload(),
1284 };
1285 self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1286 Ok(())
1287 }
1288
1289 fn produce_check_in(
1290 &self,
1291 project_id: ProjectId,
1292 org_id: OrganizationId,
1293 received_at: DateTime<Utc>,
1294 client: Option<&str>,
1295 retention_days: u16,
1296 item: &Item,
1297 ) -> Result<(), StoreError> {
1298 let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1299 message_type: CheckInMessageType::CheckIn,
1300 project_id,
1301 retention_days,
1302 start_time: safe_timestamp(received_at),
1303 sdk: client.map(str::to_owned),
1304 payload: item.payload(),
1305 routing_key_hint: item.routing_hint(),
1306 org_id,
1307 });
1308
1309 self.produce(KafkaTopic::Monitors, message)?;
1310
1311 Ok(())
1312 }
1313}
1314
1315impl Service for StoreService {
1316 type Interface = Store;
1317
1318 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1319 let this = Arc::new(self);
1320
1321 relay_log::info!("store forwarder started");
1322
1323 while let Some(message) = rx.recv().await {
1324 let task = StoreTask {
1325 service: Arc::clone(&this),
1326 message: Some(message),
1327 };
1328 this.pool.spawn_async(task).await;
1329 }
1330
1331 relay_log::info!("store forwarder stopped");
1332 }
1333}
1334
1335pub struct StoreTask {
1337 service: Arc<StoreService>,
1338 message: Option<Store>,
1339}
1340
1341impl Future for StoreTask {
1342 type Output = ();
1343
1344 fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> task::Poll<Self::Output> {
1345 let message = self
1346 .message
1347 .take()
1348 .expect("StoreTask polled after completion");
1349 let () = relay_log::with_scope(|_| {}, || self.service.handle_message(message));
1350 task::Poll::Ready(())
1351 }
1352}
1353
1354#[derive(Debug, Serialize)]
1356enum AttachmentPayload {
1357 #[serde(rename = "chunks")]
1362 Chunked(usize),
1363
1364 #[serde(rename = "data")]
1366 Inline(Bytes),
1367
1368 #[serde(rename = "stored_id")]
1370 Stored(String),
1371}
1372
1373#[derive(Debug, Serialize)]
1375struct ChunkedAttachment {
1376 id: String,
1380
1381 name: String,
1383
1384 rate_limited: bool,
1391
1392 #[serde(skip_serializing_if = "Option::is_none")]
1394 content_type: Option<String>,
1395
1396 #[serde(serialize_with = "serialize_attachment_type")]
1398 attachment_type: AttachmentType,
1399
1400 size: usize,
1402
1403 retention_days: u16,
1405
1406 #[serde(flatten)]
1408 payload: AttachmentPayload,
1409}
1410
1411fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1417where
1418 S: serde::Serializer,
1419 T: serde::Serialize,
1420{
1421 serde_json::to_value(t)
1422 .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1423 .serialize(serializer)
1424}
1425
1426#[derive(Debug, Serialize)]
1428struct EventKafkaMessage {
1429 payload: Bytes,
1431 start_time: u64,
1433 event_id: EventId,
1435 project_id: ProjectId,
1437 remote_addr: Option<String>,
1439 attachments: Vec<ChunkedAttachment>,
1441
1442 #[serde(skip)]
1444 org_id: OrganizationId,
1445}
1446
1447#[derive(Debug, Serialize)]
1449struct AttachmentChunkKafkaMessage {
1450 payload: Bytes,
1452 event_id: EventId,
1454 project_id: ProjectId,
1456 id: String,
1460 chunk_index: usize,
1462
1463 #[serde(skip)]
1465 org_id: OrganizationId,
1466}
1467
1468#[derive(Debug, Serialize)]
1473struct AttachmentKafkaMessage {
1474 event_id: EventId,
1476 project_id: ProjectId,
1478 attachment: ChunkedAttachment,
1480
1481 #[serde(skip)]
1483 org_id: OrganizationId,
1484}
1485
1486#[derive(Debug, Serialize)]
1487struct ReplayRecordingNotChunkedKafkaMessage<'a> {
1488 replay_id: EventId,
1489 key_id: Option<u64>,
1490 org_id: OrganizationId,
1491 project_id: ProjectId,
1492 received: u64,
1493 retention_days: u16,
1494 #[serde(with = "serde_bytes")]
1495 payload: &'a [u8],
1496 #[serde(with = "serde_bytes")]
1497 replay_event: Option<&'a [u8]>,
1498 #[serde(with = "serde_bytes")]
1499 replay_video: Option<&'a [u8]>,
1500 relay_snuba_publish_disabled: bool,
1501}
1502
1503#[derive(Debug, Serialize)]
1507struct UserReportKafkaMessage {
1508 project_id: ProjectId,
1510 start_time: u64,
1511 payload: Bytes,
1512
1513 #[serde(skip)]
1515 event_id: EventId,
1516 #[serde(skip)]
1518 org_id: OrganizationId,
1519}
1520
1521#[derive(Clone, Debug, Serialize)]
1522struct MetricKafkaMessage<'a> {
1523 org_id: OrganizationId,
1524 project_id: ProjectId,
1525 #[serde(skip)]
1526 key_id: Option<u64>,
1527 name: &'a MetricName,
1528 #[serde(flatten)]
1529 value: MetricValue<'a>,
1530 timestamp: UnixTimestamp,
1531 tags: &'a BTreeMap<String, String>,
1532 retention_days: u16,
1533 #[serde(skip_serializing_if = "Option::is_none")]
1534 received_at: Option<UnixTimestamp>,
1535}
1536
1537#[derive(Clone, Debug, Serialize)]
1538#[serde(tag = "type", content = "value")]
1539enum MetricValue<'a> {
1540 #[serde(rename = "c")]
1541 Counter(FiniteF64),
1542 #[serde(rename = "d")]
1543 Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
1544 #[serde(rename = "s")]
1545 Set(ArrayEncoding<'a, SetView<'a>>),
1546 #[serde(rename = "g")]
1547 Gauge(GaugeValue),
1548}
1549
1550impl MetricValue<'_> {
1551 fn variant(&self) -> &'static str {
1552 match self {
1553 Self::Counter(_) => "counter",
1554 Self::Distribution(_) => "distribution",
1555 Self::Set(_) => "set",
1556 Self::Gauge(_) => "gauge",
1557 }
1558 }
1559
1560 fn encoding(&self) -> Option<&'static str> {
1561 match self {
1562 Self::Distribution(ae) => Some(ae.name()),
1563 Self::Set(ae) => Some(ae.name()),
1564 _ => None,
1565 }
1566 }
1567}
1568
1569#[derive(Debug, Serialize, Clone)]
1571pub struct OutcomeMessage<'a> {
1572 timestamp: String,
1574 #[serde(skip_serializing_if = "Option::is_none")]
1576 org_id: Option<OrganizationId>,
1577 project_id: ProjectId,
1579 #[serde(skip_serializing_if = "Option::is_none")]
1581 key_id: Option<u64>,
1582 outcome: OutcomeId,
1584 #[serde(skip_serializing_if = "Option::is_none")]
1586 reason: Option<&'a str>,
1587 #[serde(skip_serializing_if = "Option::is_none")]
1589 event_id: Option<&'a str>,
1590 #[serde(skip_serializing_if = "Option::is_none")]
1592 remote_addr: Option<&'a str>,
1593 #[serde(skip_serializing_if = "Option::is_none")]
1595 source: Option<&'a str>,
1596 #[serde(skip_serializing_if = "Option::is_none")]
1598 category: Option<u8>,
1599 #[serde(skip_serializing_if = "Option::is_none")]
1601 quantity: Option<u32>,
1602}
1603
1604#[derive(Clone, Debug, Serialize)]
1605struct ProfileKafkaMessage {
1606 organization_id: OrganizationId,
1607 project_id: ProjectId,
1608 key_id: Option<u64>,
1609 received: u64,
1610 retention_days: u16,
1611 #[serde(skip)]
1612 headers: BTreeMap<String, String>,
1613 payload: Bytes,
1614}
1615
1616#[allow(dead_code)]
1622#[derive(Debug, Serialize)]
1623#[serde(rename_all = "snake_case")]
1624enum CheckInMessageType {
1625 ClockPulse,
1626 CheckIn,
1627}
1628
1629#[derive(Debug, Serialize)]
1630struct CheckInKafkaMessage {
1631 message_type: CheckInMessageType,
1633 payload: Bytes,
1635 start_time: u64,
1637 sdk: Option<String>,
1639 project_id: ProjectId,
1641 retention_days: u16,
1643
1644 #[serde(skip)]
1646 routing_key_hint: Option<Uuid>,
1647 #[serde(skip)]
1649 org_id: OrganizationId,
1650}
1651
1652#[derive(Debug, Serialize)]
1653struct SpanKafkaMessage<'a> {
1654 #[serde(flatten)]
1655 meta: SpanMeta,
1656 #[serde(flatten)]
1657 span: SerializableAnnotated<'a, SpanV2>,
1658}
1659
1660#[derive(Debug, Serialize)]
1661struct SpanMeta {
1662 organization_id: OrganizationId,
1663 project_id: ProjectId,
1664 #[serde(skip_serializing_if = "Option::is_none")]
1666 key_id: Option<u64>,
1667 #[serde(skip_serializing_if = "Option::is_none")]
1668 event_id: Option<EventId>,
1669 received: f64,
1671 retention_days: u16,
1673 downsampled_retention_days: u16,
1675 accepted_outcome_emitted: bool,
1677 #[serde(rename = "_performance_issues_spans", skip_serializing_if = "is_false")]
1679 performance_issues_spans: bool,
1680}
1681
/// Returns `true` if `val` is `false`.
///
/// Takes a reference because serde's `skip_serializing_if` passes the field by reference.
fn is_false(val: &bool) -> bool {
    !*val
}
1685
1686#[derive(Clone, Debug, Serialize)]
1687struct ProfileChunkKafkaMessage {
1688 organization_id: OrganizationId,
1689 project_id: ProjectId,
1690 received: u64,
1691 retention_days: u16,
1692 #[serde(skip)]
1693 headers: BTreeMap<String, String>,
1694 payload: Bytes,
1695}
1696
1697#[derive(Debug, Serialize)]
1699#[serde(tag = "type", rename_all = "snake_case")]
1700#[allow(clippy::large_enum_variant)]
1701enum KafkaMessage<'a> {
1702 Event(EventKafkaMessage),
1703 UserReport(UserReportKafkaMessage),
1704 Metric {
1705 #[serde(skip)]
1706 headers: BTreeMap<String, String>,
1707 #[serde(flatten)]
1708 message: MetricKafkaMessage<'a>,
1709 },
1710 CheckIn(CheckInKafkaMessage),
1711 Item {
1712 #[serde(skip)]
1713 headers: BTreeMap<String, String>,
1714 #[serde(skip)]
1715 item_type: TraceItemType,
1716 #[serde(skip)]
1717 message: TraceItem,
1718 },
1719 SpanV2 {
1720 #[serde(skip)]
1721 routing_key: Option<Uuid>,
1722 #[serde(skip)]
1723 headers: BTreeMap<String, String>,
1724 #[serde(flatten)]
1725 message: SpanKafkaMessage<'a>,
1726
1727 #[serde(skip)]
1729 org_id: OrganizationId,
1730 },
1731
1732 Attachment(AttachmentKafkaMessage),
1733 AttachmentChunk(AttachmentChunkKafkaMessage),
1734
1735 Profile(ProfileKafkaMessage),
1736 ProfileChunk(ProfileChunkKafkaMessage),
1737
1738 ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
1739
1740 Outcome(OutcomeMessage<'a>),
1741}
1742
1743impl KafkaMessage<'_> {
1744 fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1746 let item_type = item.item_type();
1747 KafkaMessage::Item {
1748 headers: BTreeMap::from([
1749 ("project_id".to_owned(), scoping.project_id.to_string()),
1750 ("item_type".to_owned(), (item_type as i32).to_string()),
1751 ]),
1752 message: item,
1753 item_type,
1754 }
1755 }
1756}
1757
1758impl Message for KafkaMessage<'_> {
1759 fn variant(&self) -> &'static str {
1760 match self {
1761 KafkaMessage::Event(_) => "event",
1762 KafkaMessage::UserReport(_) => "user_report",
1763 KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1764 MetricNamespace::Sessions => "metric_sessions",
1765 MetricNamespace::Spans => "metric_spans",
1766 MetricNamespace::Transactions => "metric_transactions",
1767 MetricNamespace::Custom => "metric_custom",
1768 MetricNamespace::Outcomes => "metric_outcomes",
1769 MetricNamespace::Unsupported => "metric_unsupported",
1770 },
1771 KafkaMessage::CheckIn(_) => "check_in",
1772 KafkaMessage::SpanV2 { .. } => "span",
1773 KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1774
1775 KafkaMessage::Attachment(_) => "attachment",
1776 KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1777
1778 KafkaMessage::Profile(_) => "profile",
1779 KafkaMessage::ProfileChunk(_) => "profile_chunk",
1780
1781 KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1782
1783 KafkaMessage::Outcome(_) => "outcome",
1784 }
1785 }
1786
1787 fn key(&self) -> Option<relay_kafka::Key> {
1789 match self {
1790 Self::Event(message) => Some((message.event_id.0, message.org_id)),
1791 Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
1792 Self::SpanV2 {
1793 routing_key,
1794 org_id,
1795 ..
1796 } => routing_key.map(|r| (r, *org_id)),
1797
1798 Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),
1803
1804 Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
1805 Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),
1806
1807 Self::Metric { .. }
1809 | Self::Item { .. }
1810 | Self::Profile(_)
1811 | Self::ProfileChunk(_)
1812 | Self::ReplayRecordingNotChunked(_)
1813 | Self::Outcome(_) => None,
1814 }
1815 .filter(|(uuid, _)| !uuid.is_nil())
1816 .map(|(uuid, org_id)| {
1817 let mut res = uuid.into_bytes();
1820 for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
1821 res[i] ^= b;
1822 }
1823 u128::from_be_bytes(res)
1824 })
1825 }
1826
1827 fn headers(&self) -> Option<&BTreeMap<String, String>> {
1828 match &self {
1829 KafkaMessage::Metric { headers, .. }
1830 | KafkaMessage::SpanV2 { headers, .. }
1831 | KafkaMessage::Item { headers, .. }
1832 | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
1833 | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),
1834
1835 KafkaMessage::Event(_)
1836 | KafkaMessage::UserReport(_)
1837 | KafkaMessage::CheckIn(_)
1838 | KafkaMessage::Attachment(_)
1839 | KafkaMessage::AttachmentChunk(_)
1840 | KafkaMessage::ReplayRecordingNotChunked(_)
1841 | KafkaMessage::Outcome(_) => None,
1842 }
1843 }
1844
1845 fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1846 match self {
1847 KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1848 KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1849 KafkaMessage::Item { message, .. } => {
1850 let mut payload = Vec::new();
1851 match message.encode(&mut payload) {
1852 Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1853 Err(_) => Err(ClientError::ProtobufEncodingFailed),
1854 }
1855 }
1856 KafkaMessage::Outcome(outcome) => serialize_as_json(outcome),
1857 KafkaMessage::Event(_)
1858 | KafkaMessage::UserReport(_)
1859 | KafkaMessage::CheckIn(_)
1860 | KafkaMessage::Attachment(_)
1861 | KafkaMessage::AttachmentChunk(_)
1862 | KafkaMessage::Profile(_)
1863 | KafkaMessage::ProfileChunk(_)
1864 | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1865 Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1866 Err(err) => Err(ClientError::InvalidMsgPack(err)),
1867 },
1868 }
1869 }
1870}
1871
1872fn serialize_as_json<T: serde::Serialize>(
1873 value: &T,
1874) -> Result<SerializationOutput<'_>, ClientError> {
1875 match serde_json::to_vec(value) {
1876 Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1877 Err(err) => Err(ClientError::InvalidJson(err)),
1878 }
1879}
1880
1881fn is_slow_item(item: &Item) -> bool {
1885 item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1886}
1887
/// Returns the literal `"true"` or `"false"` for `value` without allocating.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1891
1892fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1896 let ts = timestamp.timestamp();
1897 if ts >= 0 {
1898 return ts as u64;
1899 }
1900
1901 Utc::now().timestamp() as u64
1903}