use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use prost_types::Timestamp;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Deserializer, Value as JsonValue};
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::FiniteF64;
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use crate::managed::{Counted, Managed, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::FormDataIter;

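/// Fallback name for attachments without a filename.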
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

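/// Errors returned by the [`StoreService`] when producing messages to Kafka.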
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

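/// Kafka producer wrapping the clients for all topics used by the store service.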
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
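            // Outcomes are sent by dedicated producers and must never be produced by the
            // store service (see the `disallow_outcomes` test below).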
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

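/// Publishes an envelope to the Sentry core application through Kafka.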
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

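/// Publishes a batch of metric buckets to Kafka.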
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

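/// Publishes a log item to Kafka, encoded as a trace item.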
#[derive(Debug)]
pub struct StoreLog {
    pub trace_item: TraceItem,
    pub quantities: Quantities,
}

impl Counted for StoreLog {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

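/// The asynchronous thread pool used by the store service.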
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

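/// Service interface of the [`StoreService`].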
#[derive(Debug)]
pub enum Store {
    Envelope(StoreEnvelope),
    Metrics(StoreMetrics),
    Log(Managed<StoreLog>),
}

impl Store {
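    /// Returns the variant name used as the `message` tag in service metrics.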
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::Log(_) => "log",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreLog>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreLog>, _: ()) -> Self {
        Self::Log(message)
    }
}

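/// Service implementing the [`Store`] interface.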
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::Log(message) => self.handle_store_log(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();
        let envelope = managed.take_envelope();

        match self.store_envelope(envelope, managed.received_at(), scoping) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(
        &self,
        mut envelope: Box<Envelope>,
        received_at: DateTime<Utc>,
        scoping: Scoping,
    ) -> Result<(), StoreError> {
        let retention = envelope.retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        let topic = if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        for item in envelope.items() {
            match item.ty() {
                ItemType::Attachment => {
                    debug_assert!(topic == KafkaTopic::Attachments);
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span => {
                    self.produce_span(scoping, received_at, event_id, retention, item)?
                }
                ItemType::Log => self.produce_log(scoping, received_at, retention, item)?,
                ItemType::ProfileChunk => self.produce_profile_chunk(
                    scoping.organization_id,
                    scoping.project_id,
                    received_at,
                    retention,
                    item,
                )?,
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
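            // The payload of the associated replay event, if any, is attached to the
            // recording message so consumers can process both together.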
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

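            // A view over a single bucket: `by_size` splits the bucket into multiple
            // slices only if it exceeds the maximum batch size.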
            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_log(&self, message: Managed<StoreLog>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|log| {
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Log as i32).to_string(),
                    ),
                ]),
                message: log.trace_item,
                item_type: TraceItemType::Log,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| log.quantities)
        });

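        // Emit accepted outcomes only after the log has been successfully produced.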
        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }


    fn produce(
        &self,
        topic: KafkaTopic,
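        // The message is taken by value so it cannot accidentally be produced twice.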
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::Span { message: span, .. } => {
                let is_segment = span.is_segment;
                let has_parent = span.parent_span_id.is_some();
                let platform = VALID_PLATFORMS.iter().find(|p| *p == &span.platform);

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    platform = platform.unwrap_or(&""),
                    is_segment = bool_to_str(is_segment),
                    has_parent = bool_to_str(has_parent),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

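    /// Produces Kafka messages for the content of an attachment.
    ///
    /// The payload is split into chunks of at most `attachment_chunk_size` bytes and
    /// produced as `AttachmentChunk` messages, followed by the attachment metadata. When
    /// `send_individual_attachments` is set, a standalone `Attachment` message is produced
    /// instead of returning the metadata, and payloads that fit into a single message are
    /// sent inline without chunking.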
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let mut chunk_index = 0;
        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let data = if send_individual_attachments && size < max_chunk_size {
            (size > 0).then_some(payload)
        } else {
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }
            None
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            chunks: chunk_index,
            data,
            size: Some(size),
            rate_limited: Some(item.rate_limited()),
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([(
                "sampled".to_owned(),
                if item.sampled() { "true" } else { "false" }.to_owned(),
            )]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
    ) -> Result<(), StoreError> {
        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

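    /// Produces a replay recording, optionally bundled with a replay event and video, to
    /// the replay recordings topic.
    ///
    /// Messages exceeding the configured maximum size are dropped with a `TooLarge`
    /// outcome instead of being produced.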
    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
    ) -> Result<(), StoreError> {
        let max_payload_size = self.config.max_replay_message_size();

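        // Reserve 2000 bytes for the message metadata on top of the actual payloads.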
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Producing span");

        let payload = item.payload();
        let d = &mut Deserializer::from_slice(&payload);
        let mut span: SpanKafkaMessage = match serde_path_to_error::deserialize(d) {
            Ok(span) => span,
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "failed to parse span"
                );
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                return Ok(());
            }
        };

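        // Discard measurements that carry no value.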
        if let Some(measurements) = &mut span.measurements {
            measurements.retain(|_, v| v.as_ref().and_then(|v| v.value).is_some());
        }

        span.backfill_data();
        span.duration_ms =
            ((span.end_timestamp_precise - span.start_timestamp_precise) * 1e3) as u32;
        span.event_id = event_id;
        span.organization_id = scoping.organization_id.value();
        span.project_id = scoping.project_id.value();
        span.retention_days = retention_days;
        span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;

        if self.config.produce_protobuf_spans() {
            self.inner_produce_protobuf_span(
                scoping,
                received_at,
                event_id,
                retention_days,
                span.clone(),
            )?;
        }

        if self.config.produce_json_spans() {
            self.inner_produce_json_span(scoping, span)?;
        }

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn inner_produce_json_span(
        &self,
        scoping: Scoping,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::Span {
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: span,
            },
        )?;

        Ok(())
    }

    fn inner_produce_protobuf_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        span: SpanKafkaMessage,
    ) -> Result<(), StoreError> {
        let mut trace_item = TraceItem {
            item_type: TraceItemType::Span.into(),
            organization_id: scoping.organization_id.value(),
            project_id: scoping.project_id.value(),
            received: Some(Timestamp {
                seconds: safe_timestamp(received_at) as i64,
                nanos: 0,
            }),
            retention_days: retention_days.into(),
            timestamp: Some(Timestamp {
                seconds: span.start_timestamp_precise as i64,
                nanos: 0,
            }),
            trace_id: span.trace_id.to_string(),
            item_id: u128::from_str_radix(&span.span_id, 16)
                .unwrap_or_default()
                .to_le_bytes()
                .to_vec(),
            attributes: Default::default(),
            client_sample_rate: span.client_sample_rate.unwrap_or_default(),
            server_sample_rate: span.server_sample_rate.unwrap_or_default(),
        };

        if let Some(data) = span.data {
            for (key, raw_value) in data {
                let Some(raw_value) = raw_value else {
                    continue;
                };

                let json_value = match Deserialize::deserialize(raw_value) {
                    Ok(v) => v,
                    Err(error) => {
                        relay_log::error!(
                            error = &error as &dyn std::error::Error,
                            raw_value = %raw_value,
                            "failed to parse JSON value"
                        );
                        continue;
                    }
                };

                let any_value = match json_value {
                    JsonValue::String(string) => AnyValue {
                        value: Some(Value::StringValue(string)),
                    },
                    JsonValue::Number(number) => {
                        if number.is_i64() || number.is_u64() {
                            AnyValue {
                                value: Some(Value::IntValue(number.as_i64().unwrap_or_default())),
                            }
                        } else {
                            AnyValue {
                                value: Some(Value::DoubleValue(
                                    number.as_f64().unwrap_or_default(),
                                )),
                            }
                        }
                    }
                    JsonValue::Bool(bool) => AnyValue {
                        value: Some(Value::BoolValue(bool)),
                    },
                    JsonValue::Array(array) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&array).unwrap_or_default(),
                        )),
                    },
                    JsonValue::Object(object) => AnyValue {
                        value: Some(Value::StringValue(
                            serde_json::to_string(&object).unwrap_or_default(),
                        )),
                    },
                    _ => continue,
                };

                trace_item.attributes.insert(key.into(), any_value);
            }
        }

        if let Some(description) = span.description {
            trace_item.attributes.insert(
                "sentry.raw_description".into(),
                AnyValue {
                    value: Some(Value::StringValue(description.into())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.duration_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.duration_ms.into())),
            },
        );

        if let Some(event_id) = event_id {
            trace_item.attributes.insert(
                "sentry.event_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(event_id.0.as_simple().to_string())),
                },
            );
        }

        trace_item.attributes.insert(
            "sentry.is_segment".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_segment)),
            },
        );

        trace_item.attributes.insert(
            "sentry.exclusive_time_ms".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.exclusive_time_ms)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.start_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.end_timestamp_precise".into(),
            AnyValue {
                value: Some(Value::DoubleValue(span.end_timestamp_precise)),
            },
        );

        trace_item.attributes.insert(
            "sentry.start_timestamp_ms".into(),
            AnyValue {
                value: Some(Value::IntValue(span.start_timestamp_ms as i64)),
            },
        );

        trace_item.attributes.insert(
            "sentry.is_remote".into(),
            AnyValue {
                value: Some(Value::BoolValue(span.is_remote)),
            },
        );

        if let Some(parent_span_id) = span.parent_span_id {
            trace_item.attributes.insert(
                "sentry.parent_span_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(parent_span_id.into_owned())),
                },
            );
        }

        if let Some(profile_id) = span.profile_id {
            trace_item.attributes.insert(
                "sentry.profile_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(profile_id.into_owned())),
                },
            );
        }

        if let Some(segment_id) = span.segment_id {
            trace_item.attributes.insert(
                "sentry.segment_id".into(),
                AnyValue {
                    value: Some(Value::StringValue(segment_id.into_owned())),
                },
            );
        }

        if let Some(origin) = span.origin {
            trace_item.attributes.insert(
                "sentry.origin".into(),
                AnyValue {
                    value: Some(Value::StringValue(origin.into_owned())),
                },
            );
        }

        if let Some(kind) = span.kind {
            trace_item.attributes.insert(
                "sentry.kind".into(),
                AnyValue {
                    value: Some(Value::StringValue(kind.into_owned())),
                },
            );
        }

        self.produce(
            KafkaTopic::Items,
            KafkaMessage::Item {
                headers: BTreeMap::from([
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Span as i32).to_string(),
                    ),
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                ]),
                item_type: TraceItemType::Span,
                message: trace_item,
            },
        )?;

        Ok(())
    }

    fn produce_log(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Producing log");

        let payload = item.payload();
        let d = &mut Deserializer::from_slice(&payload);
        let logs: LogKafkaMessages = match serde_path_to_error::deserialize(d) {
            Ok(logs) => logs,
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "failed to parse log"
                );
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::LogItem,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidLog),
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::LogByte,
                    event_id: None,
                    outcome: Outcome::Invalid(DiscardReason::InvalidLog),
                    quantity: payload.len() as u32,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
                return Ok(());
            }
        };

        let num_logs = logs.items.len() as u32;
        for log in logs.items {
            let timestamp_seconds = log.timestamp as i64;
            let timestamp_nanos = (log.timestamp.fract() * 1e9) as u32;
            let item_id = u128::from_be_bytes(
                *Uuid::new_v7(uuid::Timestamp::from_unix(
                    uuid::NoContext,
                    timestamp_seconds as u64,
                    timestamp_nanos,
                ))
                .as_bytes(),
            )
            .to_le_bytes()
            .to_vec();
            let mut trace_item = TraceItem {
                item_type: TraceItemType::Log.into(),
                organization_id: scoping.organization_id.value(),
                project_id: scoping.project_id.value(),
                received: Some(Timestamp {
                    seconds: safe_timestamp(received_at) as i64,
                    nanos: 0,
                }),
                retention_days: retention_days.into(),
                timestamp: Some(Timestamp {
                    seconds: timestamp_seconds,
                    nanos: 0,
                }),
                trace_id: log.trace_id.to_string(),
                item_id,
                attributes: Default::default(),
                client_sample_rate: 1.0,
                server_sample_rate: 1.0,
            };

            for (name, attribute) in log.attributes.unwrap_or_default() {
                if let Some(attribute_value) = attribute {
                    if let Some(v) = attribute_value.value {
                        let any_value = match v {
                            LogAttributeValue::String(value) => AnyValue {
                                value: Some(Value::StringValue(value)),
                            },
                            LogAttributeValue::Int(value) => AnyValue {
                                value: Some(Value::IntValue(value)),
                            },
                            LogAttributeValue::Bool(value) => AnyValue {
                                value: Some(Value::BoolValue(value)),
                            },
                            LogAttributeValue::Double(value) => AnyValue {
                                value: Some(Value::DoubleValue(value)),
                            },
                            LogAttributeValue::Unknown(_) => continue,
                        };

                        trace_item.attributes.insert(name.into(), any_value);
                    }
                }
            }

            let message = KafkaMessage::Item {
                item_type: TraceItemType::Log,
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    (
                        "item_type".to_owned(),
                        (TraceItemType::Log as i32).to_string(),
                    ),
                ]),
                message: trace_item,
            };

            self.produce(KafkaTopic::Items, message)?;
        }

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::LogItem,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: num_logs,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });
        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::LogByte,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: payload.len() as u32,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }

    fn produce_profile_chunk(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileChunkKafkaMessage {
            organization_id,
            project_id,
            received: safe_timestamp(received_at),
            retention_days,
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
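            // Handle the message asynchronously on the dedicated store pool.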
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

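/// Attachment metadata sent with an event message or as a standalone attachment, after
/// the payload has been produced inline or in separate chunks.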
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    id: String,

    name: String,

    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    chunks: usize,

    #[serde(skip_serializing_if = "Option::is_none")]
    data: Option<Bytes>,

    #[serde(skip_serializing_if = "Option::is_none")]
    size: Option<usize>,

    #[serde(skip_serializing_if = "Option::is_none")]
    rate_limited: Option<bool>,
}

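/// Serializes the attachment type through `serde_json` so that unit variants are encoded
/// as plain strings even when the surrounding message is serialized as MsgPack.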
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

fn serialize_btreemap_skip_nulls<K, S, T>(
    map: &Option<BTreeMap<K, Option<T>>>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    K: Serialize,
    S: serde::Serializer,
    T: serde::Serialize,
{
    let Some(map) = map else {
        return serializer.serialize_none();
    };
    let mut m = serializer.serialize_map(Some(map.len()))?;
    for (key, value) in map.iter() {
        if let Some(value) = value {
            m.serialize_entry(key, value)?;
        }
    }
    m.end()
}

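/// Container payload for event messages.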
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    payload: Bytes,
    start_time: u64,
    event_id: EventId,
    project_id: ProjectId,
    remote_addr: Option<String>,
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    payload: &'a [u8],
    start_time: u64,
    replay_id: EventId,
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    payload: Bytes,
    event_id: EventId,
    project_id: ProjectId,
    id: String,
    chunk_index: usize,
}

#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    event_id: EventId,
    project_id: ProjectId,
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
}

#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    message_type: CheckInMessageType,
    payload: Bytes,
    start_time: u64,
    sdk: Option<String>,
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanLink<'a> {
    pub trace_id: &'a str,
    pub span_id: &'a str,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sampled: Option<bool>,
    #[serde(borrow)]
    pub attributes: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanMeasurement<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    value: Option<&'a RawValue>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct SpanKafkaMessage<'a> {
    #[serde(skip_serializing_if = "Option::is_none", borrow)]
    description: Option<Cow<'a, str>>,
    #[serde(default)]
    duration_ms: u32,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    #[serde(rename(deserialize = "exclusive_time"))]
    exclusive_time_ms: f64,
    #[serde(default)]
    is_segment: bool,
    #[serde(default)]
    is_remote: bool,

    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    data: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    kind: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "none_or_empty_vec")]
    links: Option<Vec<SpanLink<'a>>>,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement<'a>>>>,
    #[serde(default)]
    organization_id: u64,
    #[serde(borrow, default, skip_serializing_if = "Option::is_none")]
    origin: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    parent_span_id: Option<Cow<'a, str>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    profile_id: Option<Cow<'a, str>>,
    #[serde(default)]
    project_id: u64,
    received: f64,
    #[serde(default)]
    retention_days: u16,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    segment_id: Option<Cow<'a, str>>,
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_btreemap_skip_nulls"
    )]
    #[serde(borrow)]
    sentry_tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    span_id: Cow<'a, str>,
    #[serde(skip_serializing_if = "none_or_empty_map", borrow)]
    tags: Option<BTreeMap<Cow<'a, str>, Option<&'a RawValue>>>,
    trace_id: EventId,

    #[serde(default)]
    start_timestamp_ms: u64,
    #[serde(rename(deserialize = "start_timestamp"))]
    start_timestamp_precise: f64,
    #[serde(rename(deserialize = "timestamp"))]
    end_timestamp_precise: f64,

    #[serde(borrow, default, skip_serializing)]
    platform: Cow<'a, str>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    client_sample_rate: Option<f64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    server_sample_rate: Option<f64>,

    #[serde(
        default,
        rename = "_meta",
        skip_serializing_if = "none_or_empty_object"
    )]
    meta: Option<&'a RawValue>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    _performance_issues_spans: Option<bool>,
}

impl SpanKafkaMessage<'_> {
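    /// Backfills the `data` map from `measurements`, `tags`, and `sentry_tags`, and lifts
    /// `client_sample_rate` and `server_sample_rate` out of the measurements.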
    fn backfill_data(&mut self) {
        let data = self.data.get_or_insert_default();

        if let Some(measurements) = &self.measurements {
            for (key, value) in measurements {
                let Some(value) = value.as_ref().and_then(|v| v.value) else {
                    continue;
                };

                match key.as_ref() {
                    "client_sample_rate" => {
                        if let Ok(client_sample_rate) = Deserialize::deserialize(value) {
                            self.client_sample_rate = Some(client_sample_rate);
                        }
                    }
                    "server_sample_rate" => {
                        if let Ok(server_sample_rate) = Deserialize::deserialize(value) {
                            self.server_sample_rate = Some(server_sample_rate);
                        }
                    }
                    _ => {
                        data.entry(key.clone()).or_insert(Some(value));
                    }
                }
            }
        }

        if let Some(tags) = &self.tags {
            for (key, value) in tags {
                let Some(value) = *value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    key.clone()
                };

                data.entry(key).or_insert(Some(value));
            }
        }

        if let Some(sentry_tags) = &self.sentry_tags {
            for (key, value) in sentry_tags {
                let Some(value) = *value else {
                    continue;
                };

                let key = if *key == "description" {
                    Cow::Borrowed("sentry.normalized_description")
                } else {
                    Cow::Owned(format!("sentry.{key}"))
                };

                data.entry(key).or_insert(Some(value));
            }
        }
    }
}

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", content = "value")]
enum LogAttributeValue {
    #[serde(rename = "string")]
    String(String),
    #[serde(rename = "boolean")]
    Bool(bool),
    #[serde(rename = "integer")]
    Int(i64),
    #[serde(rename = "double")]
    Double(f64),
    #[serde(rename = "unknown")]
    Unknown(()),
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct LogAttribute {
    #[serde(flatten)]
    value: Option<LogAttributeValue>,
}

#[derive(Debug, Deserialize)]
struct LogKafkaMessages<'a> {
    #[serde(borrow)]
    items: Vec<LogKafkaMessage<'a>>,
}

#[derive(Debug, Deserialize)]
struct LogKafkaMessage<'a> {
    trace_id: EventId,
    #[serde(default)]
    timestamp: f64,
    #[serde(borrow, default)]
    attributes: Option<BTreeMap<&'a str, Option<LogAttribute>>>,
}

fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
    match value {
        None => true,
        Some(raw) => raw.get() == "{}",
    }
}

fn none_or_empty_vec<T>(value: &Option<Vec<T>>) -> bool {
    match &value {
        Some(vec) => vec.is_empty(),
        None => true,
    }
}

fn none_or_empty_map<S, T>(value: &Option<BTreeMap<S, T>>) -> bool {
    value.as_ref().is_none_or(BTreeMap::is_empty)
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    payload: Bytes,
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    Profile(ProfileKafkaMessage),
    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    Span {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },
    ProfileChunk(ProfileChunkKafkaMessage),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Stats => "metric_metric_stats",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::Span { .. } => "span",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
        }
    }

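    /// Returns the partitioning key for this Kafka message, if any.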
    fn key(&self) -> Option<[u8; 16]> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),
            Self::Span { message, .. } => Some(message.trace_id.0),

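            // Check-ins are routed by the hint taken from the envelope item, if present.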
            Self::CheckIn(message) => message.routing_key_hint,

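            // Metrics are spread across partitions randomly.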
            Self::Metric { .. } => Some(Uuid::new_v4()),

            Self::Profile(_)
            | Self::ReplayRecordingNotChunked(_)
            | Self::ProfileChunk(_)
            | Self::Item { .. } => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.into_bytes())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::Span { headers, .. }
            | KafkaMessage::Item { headers, .. } => {
                if !headers.is_empty() {
                    return Some(headers);
                }
                None
            }
            KafkaMessage::Profile(profile) => {
                if !profile.headers.is_empty() {
                    return Some(&profile.headers);
                }
                None
            }
            _ => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::Span { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            _ => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

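/// Determines whether the given item is considered slow.
///
/// Slow items must be routed to the `Attachments` topic.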
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

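/// Converts a `DateTime` to a UNIX timestamp in seconds, falling back to the current time
/// for timestamps before the UNIX epoch.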
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(*b"0123456789abcdef"), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
}