use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use prost_types::Timestamp;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Deserializer, Value as JsonValue};
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::FiniteF64;
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter, PickResult};

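/// Fallback name used for attachments without a filename.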
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

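/// Errors raised by the store service while producing payloads to Kafka.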
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

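/// Kafka producer for all store topics, with the outcome topics deliberately excluded.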
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

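        // Outcomes are never produced from this service; skip their topics so that any
        // attempt to send to them fails (see the `disallow_outcomes` test below).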
        for topic in KafkaTopic::iter()
            .filter(|t| **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling)
        {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

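/// Store message publishing a processed envelope to Kafka.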
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

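/// Store message publishing a batch of metric buckets to Kafka.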
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

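/// Store message publishing a single trace item to Kafka.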
#[derive(Debug)]
pub struct StoreTraceItem {
    pub trace_item: TraceItem,
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

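/// The asynchronous pool used by the store service to run produce tasks.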
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

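/// Service interface of the [`StoreService`].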
#[derive(Debug)]
pub enum Store {
    Envelope(StoreEnvelope),
    Metrics(StoreMetrics),
    TraceItem(Managed<StoreTraceItem>),
}

impl Store {
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

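/// Service implementing the [`Store`] interface, which produces all accepted data to Kafka.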
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();
        let envelope = managed.take_envelope();

        match self.store_envelope(envelope, managed.received_at(), scoping) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(
        &self,
        mut envelope: Box<Envelope>,
        received_at: DateTime<Utc>,
        scoping: Scoping,
    ) -> Result<(), StoreError> {
        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

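        // Transactions are routed to their own topic. Envelopes that contain "slow" items
        // (attachments, user reports) go through the attachments topic instead of the
        // events topic.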
        let event_topic = if event_type == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        let replay_relay_snuba_publish_disabled = utils::sample(
            self.global_config
                .current()
                .options
                .replay_relay_snuba_publish_disabled_sample_rate,
        )
        .is_keep();

        for item in envelope.items() {
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                         this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

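            // Split buckets that exceed the maximum batch size into multiple messages.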
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

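    /// Publishes a trace item to the items topic and emits `Accepted` outcomes on success.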
    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

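        // Outcomes are only emitted after a successful produce; failures are rejected
        // through the `Managed` wrapper above.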
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

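    /// Produces the message to the given topic and counts it in the produce metrics.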
    fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::Span { message: span, .. } => {
                let is_segment = span.is_segment;
                let has_parent = span.parent_span_id.is_some();
                let platform = VALID_PLATFORMS.iter().find(|p| *p == &span.platform);

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    platform = platform.unwrap_or(&""),
                    is_segment = bool_to_str(is_segment),
                    has_parent = bool_to_str(has_parent),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

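    /// Produces Kafka messages for the content of an attachment item.
    ///
    /// The payload is split into chunks of at most `attachment_chunk_size` bytes, each sent
    /// as a separate `attachment_chunk` message. With `send_individual_attachments`, small
    /// payloads are inlined and a standalone `attachment` message is produced; otherwise,
    /// the attachment metadata is returned to be embedded into the event message.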
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        let max_payload_size = self.config.max_replay_message_size();

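        // Estimate the size of the combined message, starting with a fixed headroom
        // (presumably for metadata and encoding overhead).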
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Producing span");

        let payload = item.payload();
        let d = &mut Deserializer::from_slice(&payload);
        let mut span: SpanKafkaMessage = match serde_path_to_error::deserialize(d) {
            Ok(span) => span,
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "failed to parse span"
                );
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                return Ok(());
            }
        };

        if let Some(measurements) = &mut span.measurements {
            measurements.retain(|_, v| v.as_ref().and_then(|v| v.value).is_some());
        }

        span.backfill_data();
        span.duration_ms =
            ((span.end_timestamp_precise - span.start_timestamp_precise) * 1e3) as u32;
        span.event_id = event_id;
        span.organization_id = scoping.organization_id.value();
        span.project_id = scoping.project_id.value();
        span.retention_days = retention_days;
        span.downsampled_retention_days = downsampled_retention_days;
        span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;
        span.key_id = scoping.key_id;

        let spans_target = self.spans_target(span.organization_id);
        if spans_target.is_protobuf() {
            self.inner_produce_protobuf_span(
                scoping,
                received_at,
                event_id,
                retention_days,
                span.clone(),
            )?;
        }

        if spans_target.is_json() {
            self.inner_produce_json_span(scoping, span)?;
        }

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

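    /// Decides whether spans of the given organization are produced as JSON, as protobuf
    /// trace items, or both.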
    fn spans_target(&self, org_id: u64) -> SpansTarget {
        let config = self.config.span_producers();
        if config.produce_json_orgs.contains(&org_id) {
            return SpansTarget::Json;
        }
        if let Some(rate) = config.produce_json_sample_rate {
            return match utils::is_rolled_out(org_id, rate) {
                PickResult::Keep => SpansTarget::Json,
                PickResult::Discard => SpansTarget::Protobuf,
            };
        }

        match (config.produce_json, config.produce_protobuf) {
            (true, true) => SpansTarget::Both,
            (true, false) => SpansTarget::Json,
            (false, true) => SpansTarget::Protobuf,
            (false, false) => SpansTarget::default(),
        }
    }

    fn inner_produce_json_span(
        &self,
        scoping: Scoping,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::Span {
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: span,
            },
        )?;

        Ok(())
    }

    fn inner_produce_protobuf_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        _retention_days: u16,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        let mut trace_item = TraceItem {
            item_type: TraceItemType::Span.into(),
            organization_id: scoping.organization_id.value(),
            project_id: scoping.project_id.value(),
            received: Some(Timestamp {
                seconds: safe_timestamp(received_at) as i64,
                nanos: 0,
            }),
            retention_days: span.retention_days.into(),
            downsampled_retention_days: span.downsampled_retention_days.into(),
            timestamp: Some(Timestamp {
                seconds: span.start_timestamp_precise as i64,
                nanos: 0,
            }),
            trace_id: span.trace_id.to_string(),
            item_id: u128::from_str_radix(&span.span_id, 16)
                .unwrap_or_default()
                .to_le_bytes()
                .to_vec(),
            attributes: Default::default(),
            client_sample_rate: span.client_sample_rate.unwrap_or_default(),
            server_sample_rate: span.server_sample_rate.unwrap_or_default(),
        };

        if let Some(data) = span.data {
            for (key, raw_value) in data {
                let Some(raw_value) = raw_value else {
                    continue;
                };

                let json_value = match Deserialize::deserialize(raw_value) {
                    Ok(v) => v,
                    Err(error) => {
                        relay_log::error!(
                            error = &error as &dyn std::error::Error,
                            raw_value = %raw_value,
                            "failed to parse JSON value"
                        );
                        continue;
                    }
                };

                let any_value = match json_value {
                    JsonValue::String(string) => AnyValue {
                        value: Some(Value::StringValue(string)),
                    },
                    JsonValue::Number(number) => {
                        if number.is_i64() || number.is_u64() {
                            AnyValue {
                                value: Some(Value::IntValue(number.as_i64().unwrap_or_default())),
                            }
                        } else {
                            AnyValue {
                                value: Some(Value::DoubleValue(
                                    number.as_f64().unwrap_or_default(),
                                )),
                            }
                        }
                    }
                    JsonValue::Bool(bool) => AnyValue {
                        value: Some(Value::BoolValue(bool)),
                    },
                    JsonValue::Array(array) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&array).unwrap_or_default(),
                        )),
                    },
                    JsonValue::Object(object) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&object).unwrap_or_default(),
                        )),
                    },
                    _ => continue,
                };

                trace_item.attributes.insert(key.into(), any_value);
            }
        }

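        // Mirror well-known span fields as `sentry.`-prefixed attributes on the trace item.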
        if let Some(description) = span.description {
            trace_item.attributes.insert(
                "sentry.raw_description".into(),
                AnyValue {
                    value: Some(Value::StringValue(description.into_owned())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.duration_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.duration_ms.into())),
            },
        );

        if let Some(event_id) = event_id {
            trace_item.attributes.insert(
                "sentry.event_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(event_id.0.as_simple().to_string())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.is_segment".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_segment)),
            },
        );

        trace_item.attributes.insert(
            "sentry.exclusive_time_ms".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.exclusive_time_ms)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.start_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.end_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.end_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.start_timestamp_ms as i64)),
            },
        );

        trace_item.attributes.insert(
            "sentry.is_remote".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_remote)),
            },
        );

        if let Some(parent_span_id) = span.parent_span_id {
            trace_item.attributes.insert(
                "sentry.parent_span_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(parent_span_id.into_owned())),
                },
            );
        }

        if let Some(profile_id) = span.profile_id {
            trace_item.attributes.insert(
                "sentry.profile_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(profile_id.into_owned())),
                },
            );
        }

        if let Some(segment_id) = span.segment_id {
            trace_item.attributes.insert(
                "sentry.segment_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(segment_id.into_owned())),
                },
            );
        }

        if let Some(origin) = span.origin {
            trace_item.attributes.insert(
                "sentry.origin".into(),
                AnyValue {
                    value: Some(Value::StringValue(origin.into_owned())),
                },
            );
        }

        if let Some(kind) = span.kind {
            trace_item.attributes.insert(
                "sentry.kind".into(),
                AnyValue {
                    value: Some(Value::StringValue(kind.into_owned())),
                },
            );
        }

        self.produce(
            KafkaTopic::Items,
            KafkaMessage::Item {
                headers: BTreeMap::from([
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Span as i32).to_string(),
                    ),
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                ]),
                item_type: TraceItemType::Span,
                message: trace_item,
            },
        )?;

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

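/// The payload of an attachment: chunked, inline, or stored externally.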
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    #[serde(rename = "chunks")]
    Chunked(usize),

    #[serde(rename = "data")]
    Inline(Bytes),

    #[serde(rename = "stored_id")]
    #[allow(unused)]
    Stored(String),
}

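/// Attachment metadata and payload sent to the attachments consumer.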
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    id: String,

    name: String,

    rate_limited: bool,

    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    size: usize,

    #[serde(flatten)]
    payload: AttachmentPayload,
}

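/// Serializes the attachment type by round-tripping it through its serde representation.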
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

fn serialize_btreemap_skip_nulls<K, S, T>(
    map: &Option<BTreeMap<K, Option<T>>>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    K: Serialize,
    S: serde::Serializer,
    T: serde::Serialize,
{
    let Some(map) = map else {
        return serializer.serialize_none();
    };
    let mut m = serializer.serialize_map(Some(map.len()))?;
    for (key, value) in map.iter() {
        if let Some(value) = value {
            m.serialize_entry(key, value)?;
        }
    }
    m.end()
}

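/// Container payload for event messages.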
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    payload: Bytes,
    start_time: u64,
    event_id: EventId,
    project_id: ProjectId,
    remote_addr: Option<String>,
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    payload: &'a [u8],
    start_time: u64,
    replay_id: EventId,
    project_id: ProjectId,
    retention_days: u16,
}

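/// Container payload for chunks of attachments.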
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    payload: Bytes,
    event_id: EventId,
    project_id: ProjectId,
    id: String,
    chunk_index: usize,
}

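/// A standalone attachment that is produced separately from its event.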
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    event_id: EventId,
    project_id: ProjectId,
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

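/// User report payload for an event, ready for consumption from Kafka.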
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

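/// Discriminates the message type on the monitors (check-in) topic.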
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    message_type: CheckInMessageType,
    payload: Bytes,
    start_time: u64,
    sdk: Option<String>,
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanLink<'a> {
    pub trace_id: &'a str,
    pub span_id: &'a str,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sampled: Option<bool>,
    #[serde(borrow)]
    pub attributes: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanMeasurement<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    value: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanKafkaMessage<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    description: Option<Cow<'a, str>>,
    #[serde(default)]
    duration_ms: u32,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    #[serde(rename(deserialize = "exclusive_time"))]
    exclusive_time_ms: f64,
    #[serde(default)]
    is_segment: bool,
    #[serde(default)]
    is_remote: bool,

    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    data: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    kind: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "none_or_empty_vec")]
    links: Option<Vec<SpanLink<'a>>>,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement<'a>>>>,
    #[serde(default)]
    organization_id: u64,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    origin: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    parent_span_id: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    profile_id: Option<Cow<'a, str>>,
    #[serde(default)]
    project_id: u64,
    received: f64,
    #[serde(default)]
    retention_days: u16,
    #[serde(default)]
    downsampled_retention_days: u16,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    segment_id: Option<Cow<'a, str>>,
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_btreemap_skip_nulls"
    )]
    #[serde(borrow)]
    sentry_tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    span_id: Cow<'a, str>,
    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    trace_id: EventId,

    #[serde(default)]
    start_timestamp_ms: u64,
    #[serde(rename(deserialize = "start_timestamp"))]
    start_timestamp_precise: f64,
    #[serde(rename(deserialize = "timestamp"))]
    end_timestamp_precise: f64,

    #[serde(borrow, default, skip_serializing)]
    platform: Cow<'a, str>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    client_sample_rate: Option<f64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    server_sample_rate: Option<f64>,

    #[serde(
        default,
        rename = "_meta",
        skip_serializing_if = "none_or_empty_object"
    )]
    meta: Option<&'a RawValue>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    _performance_issues_spans: Option<bool>,

    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
}

impl SpanKafkaMessage<'_> {
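    /// Backfills `data` from `measurements`, `tags`, and `sentry_tags`.
    ///
    /// Existing keys in `data` always take precedence. Measurement values for
    /// `client_sample_rate` and `server_sample_rate` are additionally lifted into the
    /// dedicated fields, tag keys named `description` map to
    /// `sentry.normalized_description`, and all other `sentry_tags` keys are prefixed
    /// with `sentry.`.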
    fn backfill_data(&mut self) {
        let data = self.data.get_or_insert_default();

        if let Some(measurements) = &self.measurements {
            for (key, value) in measurements {
                let Some(value) = value.as_ref().and_then(|v| v.value) else {
                    continue;
                };

                match key.as_ref() {
                    "client_sample_rate" => {
                        data.entry(Cow::Borrowed("sentry.client_sample_rate"))
                            .or_insert(Some(value));

                        if let Ok(client_sample_rate) = Deserialize::deserialize(value) {
                            self.client_sample_rate = Some(client_sample_rate);
                        }
                    }
                    "server_sample_rate" => {
                        data.entry(Cow::Borrowed("sentry.server_sample_rate"))
                            .or_insert(Some(value));

                        if let Ok(server_sample_rate) = Deserialize::deserialize(value) {
                            self.server_sample_rate = Some(server_sample_rate);
                        }
                    }
                    _ => {
                        data.entry(key.clone()).or_insert(Some(value));
                    }
                }
            }
        }

        if let Some(tags) = &self.tags {
            for (key, value) in tags {
                let Some(value) = value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    key.clone()
                };

                data.entry(key).or_insert(Some(value));
            }
        }

        if let Some(sentry_tags) = &self.sentry_tags {
            for (key, value) in sentry_tags {
                let Some(value) = value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    Cow::Owned(format!("sentry.{key}"))
                };

                data.entry(key).or_insert(Some(value));
            }
        }
    }
}

fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
    match value {
        None => true,
        Some(raw) => raw.get() == "{}",
    }
}

fn none_or_empty_vec<T>(value: &Option<Vec<T>>) -> bool {
    match &value {
        Some(vec) => vec.is_empty(),
        None => true,
    }
}

fn none_or_empty_map<S, T>(value: &Option<BTreeMap<S, T>>) -> bool {
    value.as_ref().is_none_or(BTreeMap::is_empty)
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

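/// An enum over all possible ingest messages.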
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    Span {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::Span { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::Span { message, .. } => Some(message.trace_id.0),

            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            _ => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::Span { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. }) => Some(headers),
            _ => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::Span { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            _ => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

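/// Determines whether an item is "slow" and must therefore be routed through the
/// attachments topic.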
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

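/// Returns the UNIX timestamp of `timestamp` in seconds, falling back to the current time
/// for dates before the epoch.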
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    Utc::now().timestamp() as u64
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum SpansTarget {
    #[default]
    Protobuf,
    Json,
    Both,
}

impl SpansTarget {
    fn is_protobuf(&self) -> bool {
        matches!(self, SpansTarget::Protobuf | SpansTarget::Both)
    }

    fn is_json(&self) -> bool {
        matches!(self, SpansTarget::Json | SpansTarget::Both)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }

    #[test]
    fn backfill() {
        let json = r#"{
            "description": "/api/0/relays/projectconfigs/",
            "duration_ms": 152,
            "exclusive_time": 0.228,
            "is_segment": true,
            "data": {
                "sentry.environment": "development",
                "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
                "thread.name": "uWSGIWorker1Core0",
                "thread.id": "8522009600",
                "sentry.segment.name": "/api/0/relays/projectconfigs/",
                "sentry.sdk.name": "sentry.python.django",
                "sentry.sdk.version": "2.7.0",
                "my.float.field": 101.2,
                "my.int.field": 2000,
                "my.neg.field": -100,
                "my.neg.float.field": -101.2,
                "my.true.bool.field": true,
                "my.false.bool.field": false,
                "my.dict.field": {
                    "id": 42,
                    "name": "test"
                },
                "my.u64.field": 9447000002305251000,
                "my.array.field": [1, 2, ["nested", "array"]]
            },
            "measurements": {
                "num_of_spans": {"value": 50.0},
                "client_sample_rate": {"value": 0.1},
                "server_sample_rate": {"value": 0.2}
            },
            "profile_id": "56c7d1401ea14ad7b4ac86de46baebae",
            "organization_id": 1,
            "origin": "auto.http.django",
            "project_id": 1,
            "received": 1721319572.877828,
            "retention_days": 90,
            "segment_id": "8873a98879faf06d",
            "sentry_tags": {
                "description": "normalized_description",
                "category": "http",
                "environment": "development",
                "op": "http.server",
                "platform": "python",
                "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
                "sdk.name": "sentry.python.django",
                "sdk.version": "2.7.0",
                "status": "ok",
                "status_code": "200",
                "thread.id": "8522009600",
                "thread.name": "uWSGIWorker1Core0",
                "trace.status": "ok",
                "transaction": "/api/0/relays/projectconfigs/",
                "transaction.method": "POST",
                "transaction.op": "http.server",
                "user": "ip:127.0.0.1"
            },
            "span_id": "8873a98879faf06d",
            "tags": {
                "http.status_code": "200",
                "relay_endpoint_version": "3",
                "relay_id": "88888888-4444-4444-8444-cccccccccccc",
                "relay_no_cache": "False",
                "relay_protocol_version": "3",
                "relay_use_post_or_schedule": "True",
                "relay_use_post_or_schedule_rejected": "version",
                "server_name": "D23CXQ4GK2.local",
                "spans_over_limit": "False"
            },
            "trace_id": "d099bf9ad5a143cf8f83a98081d0ed3b",
            "start_timestamp_ms": 1721319572616,
            "start_timestamp": 1721319572.616648,
            "timestamp": 1721319572.768806
        }"#;
        let mut span: SpanKafkaMessage = serde_json::from_str(json).unwrap();
        span.backfill_data();

        assert_eq!(
            serde_json::to_string_pretty(&span.data).unwrap(),
            r#"{
  "http.status_code": "200",
  "my.array.field": [1, 2, ["nested", "array"]],
  "my.dict.field": {
                    "id": 42,
                    "name": "test"
                },
  "my.false.bool.field": false,
  "my.float.field": 101.2,
  "my.int.field": 2000,
  "my.neg.field": -100,
  "my.neg.float.field": -101.2,
  "my.true.bool.field": true,
  "my.u64.field": 9447000002305251000,
  "num_of_spans": 50.0,
  "relay_endpoint_version": "3",
  "relay_id": "88888888-4444-4444-8444-cccccccccccc",
  "relay_no_cache": "False",
  "relay_protocol_version": "3",
  "relay_use_post_or_schedule": "True",
  "relay_use_post_or_schedule_rejected": "version",
  "sentry.category": "http",
  "sentry.client_sample_rate": 0.1,
  "sentry.environment": "development",
  "sentry.normalized_description": "normalized_description",
  "sentry.op": "http.server",
  "sentry.platform": "python",
  "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
  "sentry.sdk.name": "sentry.python.django",
  "sentry.sdk.version": "2.7.0",
  "sentry.segment.name": "/api/0/relays/projectconfigs/",
  "sentry.server_sample_rate": 0.2,
  "sentry.status": "ok",
  "sentry.status_code": "200",
  "sentry.thread.id": "8522009600",
  "sentry.thread.name": "uWSGIWorker1Core0",
  "sentry.trace.status": "ok",
  "sentry.transaction": "/api/0/relays/projectconfigs/",
  "sentry.transaction.method": "POST",
  "sentry.transaction.op": "http.server",
  "sentry.user": "ip:127.0.0.1",
  "server_name": "D23CXQ4GK2.local",
  "spans_over_limit": "False",
  "thread.id": "8522009600",
  "thread.name": "uWSGIWorker1Core0"
}"#
        );
    }
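
    // The following tests are illustrative sketches added during editing; they are not part
    // of the original suite. They exercise two small helpers defined in this module and
    // assume a chrono version that provides `DateTime::from_timestamp`.

    #[test]
    fn safe_timestamp_clamps_pre_epoch_dates() {
        // Pre-epoch timestamps must not underflow the `u64` cast; `safe_timestamp` falls
        // back to the current time instead.
        let before = Utc::now().timestamp() as u64;
        let pre_epoch = DateTime::<Utc>::from_timestamp(-100, 0).unwrap();
        assert!(safe_timestamp(pre_epoch) >= before);

        // Valid timestamps pass through unchanged.
        let valid = DateTime::<Utc>::from_timestamp(1721319572, 0).unwrap();
        assert_eq!(safe_timestamp(valid), 1721319572);
    }

    #[test]
    fn spans_target_flags() {
        // The default target is protobuf; `Both` enables both encodings.
        assert_eq!(SpansTarget::default(), SpansTarget::Protobuf);
        assert!(SpansTarget::Both.is_json() && SpansTarget::Both.is_protobuf());
        assert!(SpansTarget::Json.is_json() && !SpansTarget::Json.is_protobuf());
        assert!(SpansTarget::Protobuf.is_protobuf() && !SpansTarget::Protobuf.is_json());
    }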
}