1use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27 Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28 MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::services::upload::SignedLocation;
43use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
44use crate::utils::{self, FormDataIter};
45
46mod sessions;
47
/// Fallback display name for attachments whose item carries no filename.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
50
/// Errors that can occur while producing data to Kafka.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The Kafka client failed to send a message.
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    /// Encoding a payload (e.g. a metric bucket value) failed.
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    /// An item required an event id, but the envelope did not have one.
    #[error("failed to store event because event id was missing")]
    NoEventId,
    /// An attachment reference placeholder was malformed or failed verification.
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,
}
62
63impl OutcomeError for StoreError {
64 type Error = Self;
65
66 fn consume(self) -> (Option<Outcome>, Self::Error) {
67 let outcome = match self {
68 StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => {
69 Some(Outcome::Invalid(DiscardReason::Internal))
70 }
71 StoreError::InvalidAttachmentRef => {
72 Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
73 }
74 };
75 (outcome, self)
76 }
77}
78
/// Wrapper around the Kafka client used by the store service to produce
/// messages to all configured topics.
struct Producer {
    client: KafkaClient,
}
82
83impl Producer {
84 pub fn create(config: &Config) -> anyhow::Result<Self> {
85 let mut client_builder = KafkaClient::builder();
86
87 for topic in KafkaTopic::iter().filter(|t| {
88 **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
91 }) {
92 let kafka_configs = config.kafka_configs(*topic)?;
93 client_builder = client_builder
94 .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
95 .map_err(|e| ServiceError::Kafka(e.to_string()))?;
96 }
97
98 Ok(Self {
99 client: client_builder.build(),
100 })
101 }
102}
103
/// Service message: produces all items of an envelope to Kafka.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The envelope to store, with scoping and outcome tracking attached.
    pub envelope: ManagedEnvelope,
}
109
/// Service message: produces a batch of metric buckets to Kafka.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// The metric buckets to produce.
    pub buckets: Vec<Bucket>,
    /// Organization/project scoping the buckets belong to.
    pub scoping: Scoping,
    /// Retention of the metrics in days.
    pub retention: u16,
}
117
/// Service message: produces a single trace item to the items topic.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The protobuf trace item to produce.
    pub trace_item: TraceItem,
}

impl Counted for StoreTraceItem {
    // Delegates quantity accounting to the trace item itself.
    fn quantities(&self) -> Quantities {
        self.trace_item.quantities()
    }
}
130
/// Service message: produces a V2 span to the spans topic.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional Kafka routing key for partition affinity.
    pub routing_key: Option<Uuid>,
    /// Standard retention in days.
    pub retention_days: u16,
    /// Retention in days for the downsampled tier.
    pub downsampled_retention_days: u16,
    /// The span payload.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    // Each message accounts for exactly one indexed span.
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}
149
/// Service message: produces a profile chunk to the profiles topic.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Retention in days.
    pub retention_days: u16,
    /// Raw chunk payload.
    pub payload: Bytes,
    /// Pre-computed outcome quantities for this chunk.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    // Quantities are pre-computed by the caller and simply cloned here.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
168
/// Service message: produces a replay recording (optionally with event and
/// video payloads) to the replay recordings topic.
#[derive(Debug)]
pub struct StoreReplay {
    /// The replay id.
    pub event_id: EventId,
    /// Retention in days.
    pub retention_days: u16,
    /// The recording payload.
    pub recording: Bytes,
    /// Optional replay event payload.
    pub event: Option<Bytes>,
    /// Optional replay video payload.
    pub video: Option<Bytes>,
    /// Pre-computed outcome quantities for this replay.
    pub quantities: Quantities,
}

impl Counted for StoreReplay {
    // Quantities are pre-computed by the caller and simply cloned here.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
193
/// Service message: produces a standalone attachment to the attachments topic.
#[derive(Debug)]
pub struct StoreAttachment {
    /// The event the attachment belongs to.
    pub event_id: EventId,
    /// The attachment item to produce.
    pub attachment: Item,
    /// Pre-computed outcome quantities for this attachment.
    pub quantities: Quantities,
    /// Retention in days.
    pub retention: u16,
}

impl Counted for StoreAttachment {
    // Quantities are pre-computed by the caller and simply cloned here.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
212
/// Service message: produces a user report to the attachments topic.
#[derive(Debug)]
pub struct StoreUserReport {
    /// The event the report refers to.
    pub event_id: EventId,
    /// The user report item to produce.
    pub report: Item,
}

impl Counted for StoreUserReport {
    // Each message accounts for one user report (v2 category).
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::UserReportV2, 1)]
    }
}
227
/// Service message: produces a standalone profile to the profiles topic.
#[derive(Debug)]
pub struct StoreProfile {
    /// Retention in days.
    pub retention_days: u16,
    /// The profile item to produce.
    pub profile: Item,
    /// Pre-computed outcome quantities for this profile.
    pub quantities: Quantities,
}

impl Counted for StoreProfile {
    // Quantities are pre-computed by the caller and simply cloned here.
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}
244
/// Async pool on which the store service spawns its work.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
247
/// Interface of the store service: all message kinds it can produce to Kafka.
#[derive(Debug)]
pub enum Store {
    /// Produce an entire envelope.
    Envelope(StoreEnvelope),
    /// Produce a batch of metric buckets.
    Metrics(StoreMetrics),
    /// Produce a single trace item.
    TraceItem(Managed<StoreTraceItem>),
    /// Produce a V2 span. Boxed to keep the enum small.
    Span(Managed<Box<StoreSpanV2>>),
    /// Produce a profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// Produce a replay recording.
    Replay(Managed<StoreReplay>),
    /// Produce a standalone attachment.
    Attachment(Managed<StoreAttachment>),
    /// Produce a user report.
    UserReport(Managed<StoreUserReport>),
    /// Produce a standalone profile.
    Profile(Managed<StoreProfile>),
}
276
impl Store {
    /// Returns the name of the message variant, used as a tag on service metrics.
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "trace_item",
            Store::Span(_) => "span",
            Store::ProfileChunk(_) => "profile_chunk",
            Store::Replay(_) => "replay",
            Store::Attachment(_) => "attachment",
            Store::UserReport(_) => "user_report",
            Store::Profile(_) => "profile",
        }
    }
}
293
// Marks `Store` as a service interface so it can be addressed via `Addr<Store>`.
impl Interface for Store {}
295
// Conversions from each concrete store message into the `Store` interface.
// Every impl simply wraps the message in the matching enum variant; none of
// these messages produce a response.

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

impl FromMessage<Managed<StoreReplay>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
        Self::Replay(message)
    }
}

impl FromMessage<Managed<StoreAttachment>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
        Self::Attachment(message)
    }
}

impl FromMessage<Managed<StoreUserReport>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
        Self::UserReport(message)
    }
}

impl FromMessage<Managed<StoreProfile>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
        Self::Profile(message)
    }
}
367
/// Service implementing the [`Store`] interface: forwards processed data to Kafka.
pub struct StoreService {
    // Pool used to spawn the service's async work.
    pool: StoreServicePool,
    // Static Relay configuration (Kafka topics, batch sizes, chunk sizes).
    config: Arc<Config>,
    // Handle to the dynamically updated global configuration (rollout rates).
    global_config: GlobalConfigHandle,
    // Sink for outcomes emitted by this service.
    outcome_aggregator: Addr<TrackOutcome>,
    // Tracks outcomes for metric buckets specifically.
    metric_outcomes: MetricOutcomes,
    // Kafka producer shared by all handlers.
    producer: Producer,
}
377
378impl StoreService {
    /// Creates a new store service, eagerly building the Kafka producer.
    ///
    /// Returns an error if the Kafka client cannot be configured for the
    /// required topics (see [`Producer::create`]).
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }
396
    /// Dispatches an incoming [`Store`] message to its specific handler.
    ///
    /// The time spent in each handler is recorded in the
    /// `StoreServiceDuration` timer, tagged with the message variant.
    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
                Store::Replay(message) => self.handle_store_replay(message),
                Store::Attachment(message) => self.handle_store_attachment(message),
                Store::UserReport(message) => self.handle_user_report(message),
                Store::Profile(message) => self.handle_profile(message),
            }
        })
    }
413
414 fn handle_store_envelope(&self, message: StoreEnvelope) {
415 let StoreEnvelope { mut envelope } = message;
416
417 let scoping = envelope.scoping();
418 match self.store_envelope(&mut envelope) {
419 Ok(()) => envelope.accept(),
420 Err(error) => {
421 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
422 relay_log::error!(
423 error = &error as &dyn Error,
424 tags.project_key = %scoping.project_key,
425 "failed to store envelope"
426 );
427 }
428 }
429 }
430
    /// Produces every item of the envelope to its appropriate Kafka topic.
    ///
    /// The event item (error/transaction/security), if present, is produced
    /// last so that any attachment metadata collected from the other items can
    /// be embedded into the event message.
    ///
    /// # Errors
    ///
    /// Fails if an item requires an event id that is missing, an attachment
    /// reference is invalid, or a Kafka produce fails.
    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        // Pull the event item out of the envelope so the remaining items can be
        // iterated and produced first.
        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        // Topic routing: transactions get their own topic; envelopes containing
        // a "slow" item (see `is_slow_item`, defined elsewhere in this file) go
        // through the attachments topic; everything else goes to events.
        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        // Without an event (or with a transaction), attachments are produced as
        // standalone messages instead of being embedded into the event payload.
        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        // Attachment metadata to embed into the event message, if any.
        let mut attachments = Vec::new();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    // Returns `Some` only when the attachment must be embedded
                    // into the event message rather than sent individually.
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        item,
                        send_individual_attachments,
                        retention,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(
                        scoping.project_id,
                        scoping.organization_id,
                        received_at,
                        client,
                        retention,
                        item,
                    )?
                }
                // Only JSON-encoded spans are produced from envelopes here.
                ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                // Logs must arrive via a dedicated store message; hitting this
                // arm indicates a bug upstream in the pipeline.
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                        this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                // Unexpected item types: log with rich context for debugging.
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        // Produce the event itself last, carrying any embedded attachments.
        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                    org_id: scoping.organization_id,
                }),
            )?;
        } else {
            // Attachments are only collected when there is an event to embed
            // them into.
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }
599
    /// Produces a batch of metric buckets, splitting oversized buckets and
    /// tracking per-namespace ingestion delay statistics.
    ///
    /// Each bucket is encoded, split into batches of at most the configured
    /// byte size, and produced individually; outcomes are tracked per view.
    /// The first produce error is remembered and logged once at the end so a
    /// single failure does not abort the whole batch.
    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        // Gradual rollout: additionally emit session metrics as EAP trace items.
        let emit_sessions_to_eap = utils::is_rolled_out(
            scoping.organization_id.value(),
            global_config.options.sessions_eap_rollout_rate,
        )
        .is_keep();

        let now = UnixTimestamp::now();
        // Per-namespace (sum, count, max) of ingestion delay in seconds.
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            // Accumulate delay between receive time and now for monitoring.
            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            // Split the bucket into size-bounded views and produce each one.
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        // Keep only the first error; continue with the rest.
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }

            // Best-effort: failures producing the EAP trace item are ignored.
            if emit_sessions_to_eap
                && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
            {
                let message = KafkaMessage::for_item(scoping, trace_item);
                let _ = self.produce(KafkaTopic::Items, message);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        // Flush the collected delay statistics as metrics, per namespace.
        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }
696
    /// Produces a trace item to the items topic.
    ///
    /// During the EAP outcomes rollout, either the downstream EAP consumer or
    /// Relay emits the accepted outcomes: when the org is rolled out, the
    /// outcomes stay attached to the trace item; otherwise they are taken off
    /// the item and emitted here after a successful produce.
    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let eap_emits_outcomes = utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_outcomes_rollout_rate,
        )
        .is_keep();

        let outcomes = message.try_accept(|mut item| {
            // `None` means the EAP consumer is responsible for outcomes.
            let outcomes = match eap_emits_outcomes {
                true => None,
                false => item.trace_item.outcomes.take(),
            };

            let message = KafkaMessage::for_item(scoping, item.trace_item);
            self.produce(KafkaTopic::Items, message).map(|()| outcomes)
        });

        // Emit accepted outcomes only if the produce succeeded and Relay owns
        // outcome emission for this org.
        if let Ok(Some(outcomes)) = outcomes {
            for (category, quantity) in outcomes.quantities() {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }
738
    /// Produces a V2 span to the spans topic.
    ///
    /// During the EAP span outcomes rollout, either Relay or the downstream
    /// consumer emits the accepted outcome; the message meta records which via
    /// `accepted_outcome_emitted` so the consumer can avoid double counting.
    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        // Inverted rollout: orgs NOT rolled out get the outcome from Relay.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
            accepted_outcome_emitted: relay_emits_accepted_outcome,
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
                org_id: scoping.organization_id,
            };

            self.produce(KafkaTopic::Spans, message)
        });

        if result.is_ok() {
            relay_statsd::metric!(
                counter(RelayCounters::SpanV2Produced) += 1,
                via = "processing"
            );

            // Only emit the accepted outcome when Relay owns outcome emission.
            if relay_emits_accepted_outcome {
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }
802
803 fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
804 let scoping = message.scoping();
805 let received_at = message.received_at();
806
807 let _ = message.try_accept(|message| {
808 let message = ProfileChunkKafkaMessage {
809 organization_id: scoping.organization_id,
810 project_id: scoping.project_id,
811 received: safe_timestamp(received_at),
812 retention_days: message.retention_days,
813 headers: BTreeMap::from([(
814 "project_id".to_owned(),
815 scoping.project_id.to_string(),
816 )]),
817 payload: message.payload,
818 };
819
820 self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
821 });
822 }
823
    /// Produces a replay recording, with optional event and video payloads,
    /// to the replay recordings topic as a single non-chunked message.
    fn handle_store_replay(&self, message: Managed<StoreReplay>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|replay| {
            let kafka_msg =
                KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                    replay_id: replay.event_id,
                    key_id: scoping.key_id,
                    org_id: scoping.organization_id,
                    project_id: scoping.project_id,
                    received: safe_timestamp(received_at),
                    retention_days: replay.retention_days,
                    payload: &replay.recording,
                    replay_event: replay.event.as_deref(),
                    replay_video: replay.video.as_deref(),
                    // Always set here; flag semantics are defined by the
                    // downstream consumer — NOTE(review): confirm intent.
                    relay_snuba_publish_disabled: true,
                });
            self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
        });
    }
847
    /// Produces a standalone attachment to the attachments topic.
    ///
    /// `send_individual_attachments` is always `true` here, so
    /// [`Self::produce_attachment`] produces the attachment as its own Kafka
    /// message and never returns metadata — which the debug assertion checks.
    fn handle_store_attachment(&self, message: Managed<StoreAttachment>) {
        let scoping = message.scoping();
        let _ = message.try_accept(|attachment| {
            let result = self.produce_attachment(
                attachment.event_id,
                scoping.project_id,
                scoping.organization_id,
                &attachment.attachment,
                true,
                attachment.retention,
            );
            debug_assert!(!matches!(result, Ok(Some(_))));
            result
        });
    }
866
867 fn handle_user_report(&self, message: Managed<StoreUserReport>) {
868 let scoping = message.scoping();
869 let received_at = message.received_at();
870
871 let _ = message.try_accept(|report| {
872 let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
873 project_id: scoping.project_id,
874 event_id: report.event_id,
875 start_time: safe_timestamp(received_at),
876 payload: report.report.payload(),
877 org_id: scoping.organization_id,
878 });
879 self.produce(KafkaTopic::Attachments, kafka_msg)
880 });
881 }
882
883 fn handle_profile(&self, message: Managed<StoreProfile>) {
884 let scoping = message.scoping();
885 let received_at = message.received_at();
886
887 let _ = message.try_accept(|profile| {
888 self.produce_profile(
889 scoping.organization_id,
890 scoping.project_id,
891 scoping.key_id,
892 received_at,
893 profile.retention_days,
894 &profile.profile,
895 )
896 });
897 }
898
    /// Builds a Kafka message for a single metric bucket view.
    ///
    /// Distribution and set values are encoded through the [`BucketEncoder`];
    /// counter and gauge values are passed through as-is.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::EncodingFailed`] if value encoding fails.
    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }
934
    /// Sends a single message to the given Kafka topic and records a
    /// `ProcessingMessageProduced` counter on success.
    ///
    /// Metric messages additionally tag the counter with the metric type and
    /// encoding; replay recordings tag whether a video payload is present.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::SendFailed`] if the Kafka client rejects the
    /// message; the failure is also logged with topic and variant tags.
    fn produce(
        &self,
        topic: KafkaTopic,
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self
            .producer
            .client
            .send_message(topic, &message)
            .inspect_err(|err| {
                relay_log::error!(
                    error = err as &dyn Error,
                    tags.topic = ?topic,
                    tags.message = message.variant(),
                    "failed to produce to Kafka"
                )
            })?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }
989
    /// Builds attachment metadata from an attachment-reference placeholder.
    ///
    /// The item payload is a JSON [`AttachmentPlaceholder`] containing a signed
    /// objectstore location. The location's signature is verified against the
    /// current time before its key is used as the stored payload reference.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::InvalidAttachmentRef`] if the payload does not
    /// parse, the location cannot be read, or signature verification fails.
    fn chunked_attachment_from_placeholder(
        &self,
        item: &Item,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        debug_assert!(
            item.stored_key().is_none(),
            "AttachmentRef should not have been uploaded to objectstore"
        );

        let payload = item.payload();
        let placeholder: AttachmentPlaceholder<'_> =
            serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
        let location = SignedLocation::try_from_str(placeholder.location)
            .ok_or(StoreError::InvalidAttachmentRef)?
            .verify(Utc::now(), &self.config)
            .map_err(|_| StoreError::InvalidAttachmentRef)?;

        let store_key = location.key;

        Ok(ChunkedAttachment {
            id: Uuid::new_v4().to_string(),
            name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(),
            rate_limited: item.rate_limited(),
            content_type: placeholder.content_type.map(|c| c.as_str().to_owned()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            // Size comes from the placeholder metadata, not the item payload.
            size: item.attachment_body_size(),
            retention_days,
            payload: AttachmentPayload::Stored(store_key),
        })
    }
1021
    /// Builds attachment metadata from a raw attachment item, choosing one of
    /// four payload strategies:
    ///
    /// - empty payload: zero chunks;
    /// - already uploaded to objectstore: reference the stored key;
    /// - small payload sent individually: inline the bytes into the message;
    /// - otherwise: split into chunks and produce each chunk to the
    ///   attachments topic as a side effect before returning.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::SendFailed`] if producing a chunk fails.
    fn chunked_attachment_from_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        org_id: OrganizationId,
        item: &Item,
        send_individual_attachments: bool,
        retention_days: u16,
    ) -> Result<ChunkedAttachment, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            // Produce the payload in chunks of at most `max_chunk_size`; the
            // consumer reassembles them by attachment `id` and `chunk_index`.
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                    org_id,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            // `chunk_index` now equals the total number of chunks produced.
            AttachmentPayload::Chunked(chunk_index)
        };

        Ok(ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
            attachment_type: item.attachment_type().unwrap_or_default(),
            size,
            retention_days,
            payload,
        })
    }
1089
    /// Produces an attachment, either as a standalone message or as metadata
    /// to be embedded into the event message.
    ///
    /// Attachment-reference items resolve their stored location; all other
    /// items go through the chunked/inline/stored path. When
    /// `send_individual_attachments` is set, the attachment is produced as its
    /// own Kafka message and `Ok(None)` is returned; otherwise `Ok(Some(..))`
    /// returns the metadata for the caller to embed into the event.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        org_id: OrganizationId,
        item: &Item,
        send_individual_attachments: bool,
        retention_days: u16,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let attachment = if item.is_attachment_ref() {
            self.chunked_attachment_from_placeholder(item, retention_days)
        } else {
            self.chunked_attachment_from_attachment(
                event_id,
                project_id,
                org_id,
                item,
                send_individual_attachments,
                retention_days,
            )
        }?;

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
                org_id,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }
1136
1137 fn produce_user_report(
1138 &self,
1139 event_id: EventId,
1140 project_id: ProjectId,
1141 org_id: OrganizationId,
1142 received_at: DateTime<Utc>,
1143 item: &Item,
1144 ) -> Result<(), StoreError> {
1145 let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1146 project_id,
1147 event_id,
1148 start_time: safe_timestamp(received_at),
1149 payload: item.payload(),
1150 org_id,
1151 });
1152
1153 self.produce(KafkaTopic::Attachments, message)
1154 }
1155
1156 fn produce_user_report_v2(
1157 &self,
1158 event_id: EventId,
1159 project_id: ProjectId,
1160 org_id: OrganizationId,
1161 received_at: DateTime<Utc>,
1162 item: &Item,
1163 remote_addr: Option<String>,
1164 ) -> Result<(), StoreError> {
1165 let message = KafkaMessage::Event(EventKafkaMessage {
1166 project_id,
1167 event_id,
1168 payload: item.payload(),
1169 start_time: safe_timestamp(received_at),
1170 remote_addr,
1171 attachments: vec![],
1172 org_id,
1173 });
1174 self.produce(KafkaTopic::Feedback, message)
1175 }
1176
1177 fn send_metric_message(
1178 &self,
1179 namespace: MetricNamespace,
1180 message: MetricKafkaMessage,
1181 ) -> Result<(), StoreError> {
1182 let topic = match namespace {
1183 MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1184 MetricNamespace::Unsupported => {
1185 relay_log::with_scope(
1186 |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1187 || relay_log::error!("store service dropping unknown metric usecase"),
1188 );
1189 return Ok(());
1190 }
1191 _ => KafkaTopic::MetricsGeneric,
1192 };
1193
1194 let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1195 self.produce(topic, KafkaMessage::Metric { headers, message })?;
1196 Ok(())
1197 }
1198
1199 fn produce_profile(
1200 &self,
1201 organization_id: OrganizationId,
1202 project_id: ProjectId,
1203 key_id: Option<u64>,
1204 received_at: DateTime<Utc>,
1205 retention_days: u16,
1206 item: &Item,
1207 ) -> Result<(), StoreError> {
1208 let message = ProfileKafkaMessage {
1209 organization_id,
1210 project_id,
1211 key_id,
1212 received: safe_timestamp(received_at),
1213 retention_days,
1214 headers: BTreeMap::from([
1215 (
1216 "sampled".to_owned(),
1217 if item.sampled() { "true" } else { "false" }.to_owned(),
1218 ),
1219 ("project_id".to_owned(), project_id.to_string()),
1220 ]),
1221 payload: item.payload(),
1222 };
1223 self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1224 Ok(())
1225 }
1226
1227 fn produce_check_in(
1228 &self,
1229 project_id: ProjectId,
1230 org_id: OrganizationId,
1231 received_at: DateTime<Utc>,
1232 client: Option<&str>,
1233 retention_days: u16,
1234 item: &Item,
1235 ) -> Result<(), StoreError> {
1236 let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1237 message_type: CheckInMessageType::CheckIn,
1238 project_id,
1239 retention_days,
1240 start_time: safe_timestamp(received_at),
1241 sdk: client.map(str::to_owned),
1242 payload: item.payload(),
1243 routing_key_hint: item.routing_hint(),
1244 org_id,
1245 });
1246
1247 self.produce(KafkaTopic::Monitors, message)?;
1248
1249 Ok(())
1250 }
1251
    /// Produces a JSON-encoded span item from an envelope to the spans topic.
    ///
    /// The span payload is re-parsed into a raw JSON map before producing.
    /// Depending on the EAP span outcomes rollout, either Relay emits the
    /// accepted outcome here or defers to the downstream consumer; the meta
    /// field `accepted_outcome_emitted` records which, to avoid double counting.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::EncodingFailed`] if the payload is not valid
    /// JSON, or [`StoreError::SendFailed`] if the Kafka produce fails.
    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        // Inverted rollout: orgs NOT rolled out get the outcome from Relay.
        let relay_emits_accepted_outcome = !utils::is_rolled_out(
            scoping.organization_id.value(),
            self.global_config
                .current()
                .options
                .eap_span_outcomes_rollout_rate,
        )
        .is_keep();

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
                accepted_outcome_emitted: relay_emits_accepted_outcome,
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        // Upstream processing is expected to have populated span attributes.
        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
                org_id: organization_id,
            },
        )?;

        // Only emit the accepted outcome when Relay owns outcome emission.
        if relay_emits_accepted_outcome {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::SpanIndexed,
                event_id: None,
                outcome: Outcome::Accepted,
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
        }

        Ok(())
    }
1332}
1333
impl Service for StoreService {
    type Interface = Store;

    /// Runs the store service loop: receives [`Store`] messages and dispatches
    /// each one to the async worker pool until the channel closes.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // The service is shared across pool tasks via `Arc`.
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            // `handle_message` itself is synchronous; it is wrapped in an
            // async block and boxed for the pool. The `.await` here waits for
            // the spawn — presumably applying backpressure when the pool is
            // saturated (TODO confirm against `AsyncPool::spawn_async`).
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}
1354
/// The payload portion of an attachment message, distinguished by how the
/// bytes are delivered downstream.
///
/// Each variant renames to a distinct field (`chunks`, `data`, `stored_id`),
/// which is flattened into [`ChunkedAttachment`].
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// The number of chunks the attachment was split into; the bytes travel
    /// separately as [`AttachmentChunkKafkaMessage`]s.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The attachment bytes inlined directly into this message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// An identifier of an attachment already stored elsewhere.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1373
/// Attachment metadata sent alongside events or as standalone attachment
/// messages.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    // Attachment identifier, unique within the event.
    id: String,

    // Display name of the attachment.
    name: String,

    // Whether the attachment body was removed due to rate limiting.
    rate_limited: bool,

    // MIME content type, omitted from the wire format when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    // Serialized through JSON to keep the representation format-independent;
    // see `serialize_attachment_type`.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    // Size of the attachment payload in bytes.
    size: usize,

    retention_days: u16,

    // Flattened so the payload variant's field (`chunks`/`data`/`stored_id`)
    // appears directly on this object.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1411
1412fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1418where
1419 S: serde::Serializer,
1420 T: serde::Serialize,
1421{
1422 serde_json::to_value(t)
1423 .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1424 .serialize(serializer)
1425}
1426
/// Kafka message for a processed event, including its attachment metadata.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    // Raw serialized event payload.
    payload: Bytes,
    // UNIX timestamp (seconds) when the event was received.
    start_time: u64,
    // Event id; also feeds the Kafka partition key (see `Message::key`).
    event_id: EventId,
    project_id: ProjectId,
    // Client IP address of the submitting SDK, if available.
    remote_addr: Option<String>,
    // Metadata for attachments shipped with this event.
    attachments: Vec<ChunkedAttachment>,

    // Not serialized; only used to derive the Kafka partition key.
    #[serde(skip)]
    org_id: OrganizationId,
}
1447
/// Kafka message carrying one chunk of a chunked attachment's bytes.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    // Raw bytes of this chunk.
    payload: Bytes,
    // Event this chunk belongs to; also feeds the partition key so all
    // chunks of one event land on the same partition.
    event_id: EventId,
    project_id: ProjectId,
    // Attachment identifier matching `ChunkedAttachment::id`.
    id: String,
    // Zero-based position of this chunk within the attachment.
    chunk_index: usize,

    // Not serialized; only used to derive the Kafka partition key.
    #[serde(skip)]
    org_id: OrganizationId,
}
1468
/// Kafka message for a standalone attachment, sent after its chunks.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    // Event the attachment belongs to; also feeds the partition key.
    event_id: EventId,
    project_id: ProjectId,
    // Attachment metadata; the byte delivery mode is in its payload variant.
    attachment: ChunkedAttachment,

    // Not serialized; only used to derive the Kafka partition key.
    #[serde(skip)]
    org_id: OrganizationId,
}
1486
/// Kafka message for a replay recording delivered whole (not chunked),
/// optionally bundled with its replay event and video payloads.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    // UNIX timestamp (seconds) when the recording was received.
    received: u64,
    retention_days: u16,
    // `serde_bytes` serializes these as compact binary instead of int arrays.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    // Signals consumers whether Relay's direct Snuba publishing is disabled.
    relay_snuba_publish_disabled: bool,
}
1503
/// Kafka message for a user feedback report attached to an event.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    // UNIX timestamp (seconds) when the report was received.
    start_time: u64,
    // Raw serialized user report payload.
    payload: Bytes,

    // The following fields are not serialized; they only derive the Kafka
    // partition key (see `Message::key`).
    #[serde(skip)]
    event_id: EventId,
    #[serde(skip)]
    org_id: OrganizationId,
}
1521
/// Kafka message for a single metric bucket.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    // Flattened so the value's `type`/`value` tag pair appears directly on
    // this object; see `MetricValue`.
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    // Omitted from the wire format when not set.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1535
/// The typed value of a metric bucket, tagged on the wire as
/// `{"type": "c" | "d" | "s" | "g", "value": ...}`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// Counter value (`"c"`).
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// Distribution values (`"d"`), possibly array-encoded.
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// Set values (`"s"`), possibly array-encoded.
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// Gauge value (`"g"`).
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1548
1549impl MetricValue<'_> {
1550 fn variant(&self) -> &'static str {
1551 match self {
1552 Self::Counter(_) => "counter",
1553 Self::Distribution(_) => "distribution",
1554 Self::Set(_) => "set",
1555 Self::Gauge(_) => "gauge",
1556 }
1557 }
1558
1559 fn encoding(&self) -> Option<&'static str> {
1560 match self {
1561 Self::Distribution(ae) => Some(ae.name()),
1562 Self::Set(ae) => Some(ae.name()),
1563 _ => None,
1564 }
1565 }
1566}
1567
/// Kafka message for a profile payload.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    // UNIX timestamp (seconds) when the profile was received.
    received: u64,
    retention_days: u16,
    // Sent as Kafka message headers, not part of the serialized body.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    // Raw serialized profile payload.
    payload: Bytes,
}
1579
/// Discriminates the kinds of messages on the monitors topic.
///
/// `ClockPulse` is currently unused by this service (hence `dead_code`), but
/// remains part of the wire protocol.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// A periodic clock tick with no check-in payload.
    ClockPulse,
    /// A monitor check-in submitted by an SDK.
    CheckIn,
}
1592
/// Kafka message for a monitor check-in.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    // Distinguishes check-ins from clock pulses on the shared topic.
    message_type: CheckInMessageType,
    // Raw serialized check-in payload.
    payload: Bytes,
    // UNIX timestamp (seconds) when the check-in was received.
    start_time: u64,
    // SDK client string, if reported.
    sdk: Option<String>,
    project_id: ProjectId,
    retention_days: u16,

    // The following fields are not serialized; they only derive the Kafka
    // partition key (see `Message::key`).
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    #[serde(skip)]
    org_id: OrganizationId,
}
1615
/// Kafka message for a span whose payload is forwarded as raw JSON fields.
///
/// Both parts are flattened, so meta fields and span fields appear side by
/// side on one JSON object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    // Span fields borrowed unparsed from the original payload.
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1623
/// Kafka message for a span carried as a typed, annotated [`SpanV2`].
///
/// Both parts are flattened, so meta fields and span fields appear side by
/// side on one JSON object.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1631
/// Relay-side metadata flattened into span Kafka messages.
#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    // Receive time as a fractional UNIX timestamp in seconds (produced via
    // `datetime_to_timestamp` at the call site).
    received: f64,
    retention_days: u16,
    downsampled_retention_days: u16,
    // True when Relay already emitted the Accepted outcome for this span,
    // so downstream must not emit it again.
    accepted_outcome_emitted: bool,
}
1650
/// Kafka message for a chunk of a continuous profile.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    // UNIX timestamp (seconds) when the chunk was received.
    received: u64,
    retention_days: u16,
    // Sent as Kafka message headers, not part of the serialized body.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    // Raw serialized profile chunk payload.
    payload: Bytes,
}
1661
/// The union of all messages this service produces to Kafka.
///
/// Serialized with an internal `type` tag in snake case. Fields marked
/// `#[serde(skip)]` never hit the wire body: `headers` become Kafka message
/// headers, while `routing_key`/`org_id`/`item_type` only drive partitioning
/// and topic routing (see the `Message` impl below).
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    /// A metric bucket; the namespace header selects the consumer route.
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    /// A generic trace item, encoded as protobuf rather than serde (all
    /// fields are skipped; see `Message::serialize`).
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    /// A span forwarded as raw JSON fields.
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,

        #[serde(skip)]
        org_id: OrganizationId,
    },
    /// A span forwarded as a typed `SpanV2`.
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        #[serde(skip)]
        org_id: OrganizationId,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1717
1718impl KafkaMessage<'_> {
1719 fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1721 let item_type = item.item_type();
1722 KafkaMessage::Item {
1723 headers: BTreeMap::from([
1724 ("project_id".to_owned(), scoping.project_id.to_string()),
1725 ("item_type".to_owned(), (item_type as i32).to_string()),
1726 ]),
1727 message: item,
1728 item_type,
1729 }
1730 }
1731}
1732
impl Message for KafkaMessage<'_> {
    /// Returns a static variant name used for metrics/logging; metrics are
    /// further split by namespace.
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    /// Computes the Kafka partition key for this message, if any.
    ///
    /// The key is the message's event/routing UUID XOR-ed with the
    /// organization id, so one organization's traffic spreads across
    /// partitions while identical ids still co-locate. Messages without a
    /// meaningful UUID (or a nil one) return `None` and use the producer's
    /// default partitioning.
    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some((message.event_id.0, message.org_id)),
            Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
            Self::SpanRaw {
                routing_key,
                org_id,
                ..
            }
            | Self::SpanV2 {
                routing_key,
                org_id,
                ..
            } => routing_key.map(|r| (r, *org_id)),

            Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),

            Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
            Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),

            // These messages carry no routing UUID and are partitioned by
            // the producer's default strategy.
            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        // A nil UUID carries no routing information; fall back to default
        // partitioning instead of pinning all such messages to one partition.
        .filter(|(uuid, _)| !uuid.is_nil())
        .map(|(uuid, org_id)| {
            // XOR the org id (big-endian, 8 bytes) into the first half of the
            // 16-byte UUID, then interpret the result as a u128 key.
            let mut res = uuid.into_bytes();
            for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
                res[i] ^= b;
            }
            u128::from_be_bytes(res)
        })
    }

    /// Returns the Kafka headers to attach, for variants that carry any.
    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    /// Serializes the message body for the wire:
    /// - metrics and spans as JSON,
    /// - trace items as protobuf,
    /// - everything else as named-field MessagePack.
    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}
1847
1848fn serialize_as_json<T: serde::Serialize>(
1849 value: &T,
1850) -> Result<SerializationOutput<'_>, ClientError> {
1851 match serde_json::to_vec(value) {
1852 Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1853 Err(err) => Err(ClientError::InvalidJson(err)),
1854 }
1855}
1856
1857fn is_slow_item(item: &Item) -> bool {
1861 item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1862}
1863
/// Converts a boolean into its lowercase string form (`"true"`/`"false"`),
/// as expected in Kafka header values.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1867
1868fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1872 let ts = timestamp.timestamp();
1873 if ts >= 0 {
1874 return ts as u64;
1875 }
1876
1877 Utc::now().timestamp() as u64
1879}
1880
#[cfg(test)]
mod tests {

    use super::*;

    /// The store producer must refuse to publish to the outcome topics,
    /// which are owned by a different producer.
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            // Sending must fail before reaching Kafka with a topic-name error.
            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}