1use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::error::Error;
7use std::sync::Arc;
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use futures::future::BoxFuture;
13use prost::Message as _;
14use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
15use serde::Serialize;
16use serde_json::value::RawValue;
17use uuid::Uuid;
18
19use relay_base_schema::data_category::DataCategory;
20use relay_base_schema::organization::OrganizationId;
21use relay_base_schema::project::ProjectId;
22use relay_common::time::UnixTimestamp;
23use relay_config::Config;
24use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
25use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
26use relay_metrics::{
27 Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
28 MetricNamespace, SetView,
29};
30use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
31use relay_quotas::Scoping;
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
34use relay_threading::AsyncPool;
35
36use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
37use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities};
38use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
39use crate::service::ServiceError;
40use crate::services::global_config::GlobalConfigHandle;
41use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
42use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
43use crate::utils::{self, FormDataIter};
44
/// Session helpers; provides `to_trace_item`, which is used when forwarding metric buckets.
mod sessions;
46
/// Fallback name used for attachments whose item carries no filename.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
49
/// Errors that can occur while forwarding data to Kafka.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The Kafka client failed to send a message.
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    /// Encoding a payload failed.
    ///
    /// Used for metric value encoding errors and for span payloads that fail to parse as JSON.
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    /// An item that requires an event id was found, but the envelope carries none.
    #[error("failed to store event because event id was missing")]
    NoEventId,
}
59
60impl OutcomeError for StoreError {
61 type Error = Self;
62
63 fn consume(self) -> (Option<Outcome>, Self::Error) {
64 (Some(Outcome::Invalid(DiscardReason::Internal)), self)
65 }
66}
67
/// Thin wrapper around the Kafka client used by the store service.
struct Producer {
    /// Kafka client configured with the topics registered in `Producer::create`.
    client: KafkaClient,
}
71
72impl Producer {
73 pub fn create(config: &Config) -> anyhow::Result<Self> {
74 let mut client_builder = KafkaClient::builder();
75
76 for topic in KafkaTopic::iter().filter(|t| {
77 **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
80 }) {
81 let kafka_configs = config.kafka_configs(*topic)?;
82 client_builder = client_builder
83 .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
84 .map_err(|e| ServiceError::Kafka(e.to_string()))?;
85 }
86
87 Ok(Self {
88 client: client_builder.build(),
89 })
90 }
91}
92
/// Message asking the store service to forward the items of an envelope to Kafka.
#[derive(Debug)]
pub struct StoreEnvelope {
    /// The managed envelope; it is accepted after a successful store and rejected otherwise.
    pub envelope: ManagedEnvelope,
}
98
/// Message asking the store service to forward a list of metric buckets to Kafka.
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    /// Buckets to encode and produce.
    pub buckets: Vec<Bucket>,
    /// Scoping used for the produced messages and for outcome tracking.
    pub scoping: Scoping,
    /// Retention forwarded as `retention_days` on every metric message.
    pub retention: u16,
}
106
/// Message asking the store service to publish a trace item to the `Items` Kafka topic.
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item that is produced to Kafka.
    pub trace_item: TraceItem,
}
113
114impl Counted for StoreTraceItem {
115 fn quantities(&self) -> Quantities {
116 self.trace_item.quantities()
117 }
118}
119
/// Message asking the store service to publish a span to the `Spans` Kafka topic.
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Optional key passed along as the `routing_key` of the Kafka message.
    pub routing_key: Option<Uuid>,
    /// Retention forwarded in the span's meta data.
    pub retention_days: u16,
    /// Downsampled retention forwarded in the span's meta data.
    pub downsampled_retention_days: u16,
    /// The span that is serialized and produced.
    pub item: SpanV2,
}
132
133impl Counted for StoreSpanV2 {
134 fn quantities(&self) -> Quantities {
135 smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
136 }
137}
138
/// Message asking the store service to publish a profile chunk to the `Profiles` Kafka topic.
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Retention forwarded as `retention_days` on the Kafka message.
    pub retention_days: u16,
    /// Payload of the chunk, forwarded unchanged.
    pub payload: Bytes,
    /// Quantities reported through the `Counted` implementation of this message.
    pub quantities: Quantities,
}
151
152impl Counted for StoreProfileChunk {
153 fn quantities(&self) -> Quantities {
154 self.quantities.clone()
155 }
156}
157
/// Message asking the store service to publish a replay to the `ReplayRecordings` Kafka topic.
#[derive(Debug)]
pub struct StoreReplay {
    /// Id of the replay; forwarded as `replay_id` on the Kafka message.
    pub event_id: EventId,
    /// Retention forwarded as `retention_days` on the Kafka message.
    pub retention_days: u16,
    /// Payload of the replay recording.
    pub recording: Bytes,
    /// Optional replay event payload.
    pub event: Option<Bytes>,
    /// Optional replay video payload.
    pub video: Option<Bytes>,
    /// Quantities reported through the `Counted` implementation of this message.
    pub quantities: Quantities,
}
176
177impl Counted for StoreReplay {
178 fn quantities(&self) -> Quantities {
179 self.quantities.clone()
180 }
181}
182
/// Message asking the store service to publish a single attachment to Kafka.
#[derive(Debug)]
pub struct StoreAttachment {
    /// Id of the event the attachment belongs to.
    pub event_id: EventId,
    /// The attachment item to produce.
    pub attachment: Item,
    /// Quantities reported through the `Counted` implementation of this message.
    pub quantities: Quantities,
}
193
194impl Counted for StoreAttachment {
195 fn quantities(&self) -> Quantities {
196 self.quantities.clone()
197 }
198}
199
/// Message asking the store service to publish a user report to the `Attachments` Kafka topic.
#[derive(Debug)]
pub struct StoreUserReport {
    /// Id of the event the report belongs to.
    pub event_id: EventId,
    /// The user report item; its payload is forwarded unchanged.
    pub report: Item,
}
208
209impl Counted for StoreUserReport {
210 fn quantities(&self) -> Quantities {
211 smallvec::smallvec![(DataCategory::UserReportV2, 1)]
212 }
213}
214
/// Message asking the store service to publish a profile to the `Profiles` Kafka topic.
#[derive(Debug)]
pub struct StoreProfile {
    /// Retention forwarded as `retention_days` on the Kafka message.
    pub retention_days: u16,
    /// The profile item to produce.
    pub profile: Item,
    /// Quantities reported through the `Counted` implementation of this message.
    pub quantities: Quantities,
}
225
226impl Counted for StoreProfile {
227 fn quantities(&self) -> Quantities {
228 self.quantities.clone()
229 }
230}
231
/// Async pool of boxed futures on which the store service spawns its message handlers.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
234
/// Messages accepted by the store service.
#[derive(Debug)]
pub enum Store {
    /// Forward the items of an envelope.
    Envelope(StoreEnvelope),
    /// Forward a list of metric buckets.
    Metrics(StoreMetrics),
    /// Forward a trace item.
    TraceItem(Managed<StoreTraceItem>),
    /// Forward a span.
    Span(Managed<Box<StoreSpanV2>>),
    /// Forward a profile chunk.
    ProfileChunk(Managed<StoreProfileChunk>),
    /// Forward a replay.
    Replay(Managed<StoreReplay>),
    /// Forward a single attachment.
    Attachment(Managed<StoreAttachment>),
    /// Forward a user report.
    UserReport(Managed<StoreUserReport>),
    /// Forward a profile.
    Profile(Managed<StoreProfile>),
}
263
264impl Store {
265 fn variant(&self) -> &'static str {
267 match self {
268 Store::Envelope(_) => "envelope",
269 Store::Metrics(_) => "metrics",
270 Store::TraceItem(_) => "trace_item",
271 Store::Span(_) => "span",
272 Store::ProfileChunk(_) => "profile_chunk",
273 Store::Replay(_) => "replay",
274 Store::Attachment(_) => "attachment",
275 Store::UserReport(_) => "user_report",
276 Store::Profile(_) => "profile",
277 }
278 }
279}
280
// `Store` is the message interface of `StoreService` (see its `Service::Interface`).
impl Interface for Store {}
282
283impl FromMessage<StoreEnvelope> for Store {
284 type Response = NoResponse;
285
286 fn from_message(message: StoreEnvelope, _: ()) -> Self {
287 Self::Envelope(message)
288 }
289}
290
291impl FromMessage<StoreMetrics> for Store {
292 type Response = NoResponse;
293
294 fn from_message(message: StoreMetrics, _: ()) -> Self {
295 Self::Metrics(message)
296 }
297}
298
299impl FromMessage<Managed<StoreTraceItem>> for Store {
300 type Response = NoResponse;
301
302 fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
303 Self::TraceItem(message)
304 }
305}
306
307impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
308 type Response = NoResponse;
309
310 fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
311 Self::Span(message)
312 }
313}
314
315impl FromMessage<Managed<StoreProfileChunk>> for Store {
316 type Response = NoResponse;
317
318 fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
319 Self::ProfileChunk(message)
320 }
321}
322
323impl FromMessage<Managed<StoreReplay>> for Store {
324 type Response = NoResponse;
325
326 fn from_message(message: Managed<StoreReplay>, _: ()) -> Self {
327 Self::Replay(message)
328 }
329}
330
331impl FromMessage<Managed<StoreAttachment>> for Store {
332 type Response = NoResponse;
333
334 fn from_message(message: Managed<StoreAttachment>, _: ()) -> Self {
335 Self::Attachment(message)
336 }
337}
338
339impl FromMessage<Managed<StoreUserReport>> for Store {
340 type Response = NoResponse;
341
342 fn from_message(message: Managed<StoreUserReport>, _: ()) -> Self {
343 Self::UserReport(message)
344 }
345}
346
347impl FromMessage<Managed<StoreProfile>> for Store {
348 type Response = NoResponse;
349
350 fn from_message(message: Managed<StoreProfile>, _: ()) -> Self {
351 Self::Profile(message)
352 }
353}
354
/// Service that forwards envelopes, metric buckets and other items to Kafka.
pub struct StoreService {
    /// Pool on which the handler for each incoming message is spawned.
    pool: StoreServicePool,
    /// Static configuration, read for metric batch size and attachment chunk size.
    config: Arc<Config>,
    /// Handle to the global config, read for rollout rates and the metric bucket encoder.
    global_config: GlobalConfigHandle,
    /// Recipient of the accepted outcomes emitted by this service.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Tracks outcomes for produced metric buckets.
    metric_outcomes: MetricOutcomes,
    /// Kafka producer used for all messages.
    producer: Producer,
}
364
365impl StoreService {
366 pub fn create(
367 pool: StoreServicePool,
368 config: Arc<Config>,
369 global_config: GlobalConfigHandle,
370 outcome_aggregator: Addr<TrackOutcome>,
371 metric_outcomes: MetricOutcomes,
372 ) -> anyhow::Result<Self> {
373 let producer = Producer::create(&config)?;
374 Ok(Self {
375 pool,
376 config,
377 global_config,
378 outcome_aggregator,
379 metric_outcomes,
380 producer,
381 })
382 }
383
384 fn handle_message(&self, message: Store) {
385 let ty = message.variant();
386 relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
387 match message {
388 Store::Envelope(message) => self.handle_store_envelope(message),
389 Store::Metrics(message) => self.handle_store_metrics(message),
390 Store::TraceItem(message) => self.handle_store_trace_item(message),
391 Store::Span(message) => self.handle_store_span(message),
392 Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
393 Store::Replay(message) => self.handle_store_replay(message),
394 Store::Attachment(message) => self.handle_store_attachment(message),
395 Store::UserReport(message) => self.handle_user_report(message),
396 Store::Profile(message) => self.handle_profile(message),
397 }
398 })
399 }
400
401 fn handle_store_envelope(&self, message: StoreEnvelope) {
402 let StoreEnvelope { mut envelope } = message;
403
404 let scoping = envelope.scoping();
405 match self.store_envelope(&mut envelope) {
406 Ok(()) => envelope.accept(),
407 Err(error) => {
408 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
409 relay_log::error!(
410 error = &error as &dyn Error,
411 tags.project_key = %scoping.project_key,
412 "failed to store envelope"
413 );
414 }
415 }
416 }
417
/// Forwards all supported items of the envelope to their Kafka topics.
///
/// The event item (event, transaction or security report), if present, is taken out of the
/// envelope first and produced last, after all remaining items have been handled.
///
/// # Errors
///
/// Returns [`StoreError::NoEventId`] if an item that needs an event id is present but the
/// envelope has none, and propagates errors from producing individual items. Processing stops
/// at the first error. Unsupported item types are only logged and do not fail the call.
fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
    let mut envelope = managed_envelope.take_envelope();
    let received_at = managed_envelope.received_at();
    let scoping = managed_envelope.scoping();

    let retention = envelope.retention();
    let downsampled_retention = envelope.downsampled_retention();

    let event_id = envelope.event_id();
    // Remove the event-like item so the loop below only sees the remaining items.
    let event_item = envelope.as_mut().take_item_by(|item| {
        matches!(
            item.ty(),
            ItemType::Event | ItemType::Transaction | ItemType::Security
        )
    });
    let event_type = event_item.as_ref().map(|item| item.ty());

    // Transactions go to their own topic. Otherwise, the presence of any item matching
    // `is_slow_item` routes the event to the attachments topic instead of the events topic.
    let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
        KafkaTopic::Transactions
    } else if envelope.get_item_by(is_slow_item).is_some() {
        KafkaTopic::Attachments
    } else {
        KafkaTopic::Events
    };

    // Without an event item, or with a transaction, attachments are produced as individual
    // messages. Otherwise they are collected and embedded into the event message below.
    let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

    let mut attachments = Vec::new();

    for item in envelope.items() {
        let content_type = item.content_type();
        match item.ty() {
            ItemType::Attachment => {
                if let Some(attachment) = self.produce_attachment(
                    event_id.ok_or(StoreError::NoEventId)?,
                    scoping.project_id,
                    scoping.organization_id,
                    item,
                    send_individual_attachments,
                )? {
                    attachments.push(attachment);
                }
            }
            ItemType::UserReport => {
                debug_assert!(event_topic == KafkaTopic::Attachments);
                self.produce_user_report(
                    event_id.ok_or(StoreError::NoEventId)?,
                    scoping.project_id,
                    scoping.organization_id,
                    received_at,
                    item,
                )?;
            }
            ItemType::UserReportV2 => {
                let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                self.produce_user_report_v2(
                    event_id.ok_or(StoreError::NoEventId)?,
                    scoping.project_id,
                    scoping.organization_id,
                    received_at,
                    item,
                    remote_addr,
                )?;
            }
            ItemType::Profile => self.produce_profile(
                scoping.organization_id,
                scoping.project_id,
                scoping.key_id,
                received_at,
                retention,
                item,
            )?,
            ItemType::CheckIn => {
                let client = envelope.meta().client();
                self.produce_check_in(
                    scoping.project_id,
                    scoping.organization_id,
                    received_at,
                    client,
                    retention,
                    item,
                )?
            }
            // Only JSON spans are produced here; spans with any other content type fall
            // through to the catch-all arm below.
            ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span(
                scoping,
                received_at,
                event_id,
                retention,
                downsampled_retention,
                item,
            )?,
            // Logs are not supported in envelopes: panic in debug builds, log an error otherwise.
            ty @ ItemType::Log => {
                debug_assert!(
                    false,
                    "received {ty} through an envelope, \
                     this item must be submitted via a specific store message instead"
                );
                relay_log::error!(
                    tags.project_key = %scoping.project_key,
                    "StoreService received unsupported item type '{ty}' in envelope"
                );
            }
            // Any other item type is unexpected; log it with the envelope's item and
            // attachment types attached as extra context.
            other => {
                let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                let item_types = envelope
                    .items()
                    .map(|item| item.ty().as_str())
                    .collect::<Vec<_>>();
                let attachment_types = envelope
                    .items()
                    .map(|item| {
                        item.attachment_type()
                            .map(|t| t.to_string())
                            .unwrap_or_default()
                    })
                    .collect::<Vec<_>>();

                relay_log::with_scope(
                    |scope| {
                        scope.set_extra("item_types", item_types.into());
                        scope.set_extra("attachment_types", attachment_types.into());
                        if other == &ItemType::FormData {
                            let payload = item.payload();
                            let form_data_keys = FormDataIter::new(&payload)
                                .map(|entry| entry.key())
                                .collect::<Vec<_>>();
                            scope.set_extra("form_data_keys", form_data_keys.into());
                        }
                    },
                    || {
                        relay_log::error!(
                            tags.project_key = %scoping.project_key,
                            tags.event_type = event_type.unwrap_or("none"),
                            "StoreService received unexpected item type: {other}"
                        )
                    },
                )
            }
        }
    }

    if let Some(event_item) = event_item {
        let event_id = event_id.ok_or(StoreError::NoEventId)?;
        let project_id = scoping.project_id;
        let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

        // The event message is produced after all other items and carries the attachments
        // collected above.
        self.produce(
            event_topic,
            KafkaMessage::Event(EventKafkaMessage {
                payload: event_item.payload(),
                start_time: safe_timestamp(received_at),
                event_id,
                project_id,
                remote_addr,
                attachments,
                org_id: scoping.organization_id,
            }),
        )?;
    } else {
        // Without an event item all attachments were sent individually, so none were collected.
        debug_assert!(attachments.is_empty());
    }

    Ok(())
}
585
/// Encodes and produces metric buckets, tracking an outcome for every produced view.
///
/// Besides producing the buckets, this records per-namespace delay statistics (time between a
/// bucket's `received_at` and now) and, when the sessions EAP rollout applies to the
/// organization, additionally produces a trace item derived from the bucket.
///
/// Only the first error encountered while producing is logged; all views are still attempted.
fn handle_store_metrics(&self, message: StoreMetrics) {
    let StoreMetrics {
        buckets,
        scoping,
        retention,
    } = message;

    let batch_size = self.config.metrics_max_batch_size_bytes();
    let mut error = None;

    let global_config = self.global_config.current();
    let mut encoder = BucketEncoder::new(&global_config);

    let emit_sessions_to_eap = utils::is_rolled_out(
        scoping.organization_id.value(),
        global_config.options.sessions_eap_rollout_rate,
    )
    .is_keep();

    let now = UnixTimestamp::now();
    // Per namespace: (sum of delays, number of buckets with a timestamp, maximum delay).
    let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

    for mut bucket in buckets {
        let namespace = encoder.prepare(&mut bucket);

        if let Some(received_at) = bucket.metadata.received_at {
            let delay = now.as_secs().saturating_sub(received_at.as_secs());
            let (total, count, max) = delay_stats.get_mut(namespace);
            *total += delay;
            *count += 1;
            *max = (*max).max(delay);
        }

        // The bucket is split into views according to `batch_size`; each view becomes its
        // own Kafka message.
        for view in BucketsView::new(std::slice::from_ref(&bucket))
            .by_size(batch_size)
            .flatten()
        {
            let message = self.create_metric_message(
                scoping.organization_id,
                scoping.project_id,
                &mut encoder,
                namespace,
                &view,
                retention,
            );

            let result =
                message.and_then(|message| self.send_metric_message(namespace, message));

            let outcome = match result {
                Ok(()) => Outcome::Accepted,
                Err(e) => {
                    // Keep only the first error for logging after the loop.
                    error.get_or_insert(e);
                    Outcome::Invalid(DiscardReason::Internal)
                }
            };

            self.metric_outcomes.track(scoping, &[view], outcome);
        }

        if emit_sessions_to_eap
            && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
        {
            let message = KafkaMessage::for_item(scoping, trace_item);
            // The result is ignored here; `produce` logs failures itself.
            let _ = self.produce(KafkaTopic::Items, message);
        }
    }

    if let Some(error) = error {
        relay_log::error!(
            error = &error as &dyn std::error::Error,
            "failed to produce metric buckets: {error}"
        );
    }

    for (namespace, (total, count, max)) in delay_stats {
        // Skip namespaces for which no bucket carried a `received_at` timestamp.
        if count == 0 {
            continue;
        }
        metric!(
            counter(RelayCounters::MetricDelaySum) += total,
            namespace = namespace.as_str()
        );
        metric!(
            counter(RelayCounters::MetricDelayCount) += count,
            namespace = namespace.as_str()
        );
        metric!(
            gauge(RelayGauges::MetricDelayMax) = max,
            namespace = namespace.as_str()
        );
    }
}
682
683 fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
684 let scoping = message.scoping();
685 let received_at = message.received_at();
686
687 let eap_emits_outcomes = utils::is_rolled_out(
688 scoping.organization_id.value(),
689 self.global_config
690 .current()
691 .options
692 .eap_outcomes_rollout_rate,
693 )
694 .is_keep();
695
696 let outcomes = message.try_accept(|mut item| {
697 let outcomes = match eap_emits_outcomes {
698 true => None,
699 false => item.trace_item.outcomes.take(),
700 };
701
702 let message = KafkaMessage::for_item(scoping, item.trace_item);
703 self.produce(KafkaTopic::Items, message).map(|()| outcomes)
704 });
705
706 if let Ok(Some(outcomes)) = outcomes {
711 for (category, quantity) in outcomes.quantities() {
712 self.outcome_aggregator.send(TrackOutcome {
713 category,
714 event_id: None,
715 outcome: Outcome::Accepted,
716 quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
717 remote_addr: None,
718 scoping,
719 timestamp: received_at,
720 });
721 }
722 }
723 }
724
725 fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
726 let scoping = message.scoping();
727 let received_at = message.received_at();
728
729 let relay_emits_accepted_outcome = !utils::is_rolled_out(
730 scoping.organization_id.value(),
731 self.global_config
732 .current()
733 .options
734 .eap_span_outcomes_rollout_rate,
735 )
736 .is_keep();
737
738 let meta = SpanMeta {
739 organization_id: scoping.organization_id,
740 project_id: scoping.project_id,
741 key_id: scoping.key_id,
742 event_id: None,
743 retention_days: message.retention_days,
744 downsampled_retention_days: message.downsampled_retention_days,
745 received: datetime_to_timestamp(received_at),
746 accepted_outcome_emitted: relay_emits_accepted_outcome,
747 };
748
749 let result = message.try_accept(|span| {
750 let item = Annotated::new(span.item);
751 let message = KafkaMessage::SpanV2 {
752 routing_key: span.routing_key,
753 headers: BTreeMap::from([(
754 "project_id".to_owned(),
755 scoping.project_id.to_string(),
756 )]),
757 message: SpanKafkaMessage {
758 meta,
759 span: SerializableAnnotated(&item),
760 },
761 org_id: scoping.organization_id,
762 };
763
764 self.produce(KafkaTopic::Spans, message)
765 });
766
767 if result.is_ok() {
768 relay_statsd::metric!(
769 counter(RelayCounters::SpanV2Produced) += 1,
770 via = "processing"
771 );
772
773 if relay_emits_accepted_outcome {
774 self.outcome_aggregator.send(TrackOutcome {
777 category: DataCategory::SpanIndexed,
778 event_id: None,
779 outcome: Outcome::Accepted,
780 quantity: 1,
781 remote_addr: None,
782 scoping,
783 timestamp: received_at,
784 });
785 }
786 }
787 }
788
789 fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
790 let scoping = message.scoping();
791 let received_at = message.received_at();
792
793 let _ = message.try_accept(|message| {
794 let message = ProfileChunkKafkaMessage {
795 organization_id: scoping.organization_id,
796 project_id: scoping.project_id,
797 received: safe_timestamp(received_at),
798 retention_days: message.retention_days,
799 headers: BTreeMap::from([(
800 "project_id".to_owned(),
801 scoping.project_id.to_string(),
802 )]),
803 payload: message.payload,
804 };
805
806 self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
807 });
808 }
809
810 fn handle_store_replay(&self, message: Managed<StoreReplay>) {
811 let scoping = message.scoping();
812 let received_at = message.received_at();
813
814 let _ = message.try_accept(|replay| {
815 let kafka_msg =
816 KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
817 replay_id: replay.event_id,
818 key_id: scoping.key_id,
819 org_id: scoping.organization_id,
820 project_id: scoping.project_id,
821 received: safe_timestamp(received_at),
822 retention_days: replay.retention_days,
823 payload: &replay.recording,
824 replay_event: replay.event.as_deref(),
825 replay_video: replay.video.as_deref(),
826 relay_snuba_publish_disabled: true,
829 });
830 self.produce(KafkaTopic::ReplayRecordings, kafka_msg)
831 });
832 }
833
834 fn handle_store_attachment(&self, message: Managed<StoreAttachment>) {
835 let scoping = message.scoping();
836 let _ = message.try_accept(|attachment| {
837 let result = self.produce_attachment(
838 attachment.event_id,
839 scoping.project_id,
840 scoping.organization_id,
841 &attachment.attachment,
842 true,
844 );
845 debug_assert!(!matches!(result, Ok(Some(_))));
848 result
849 });
850 }
851
852 fn handle_user_report(&self, message: Managed<StoreUserReport>) {
853 let scoping = message.scoping();
854 let received_at = message.received_at();
855
856 let _ = message.try_accept(|report| {
857 let kafka_msg = KafkaMessage::UserReport(UserReportKafkaMessage {
858 project_id: scoping.project_id,
859 event_id: report.event_id,
860 start_time: safe_timestamp(received_at),
861 payload: report.report.payload(),
862 org_id: scoping.organization_id,
863 });
864 self.produce(KafkaTopic::Attachments, kafka_msg)
865 });
866 }
867
868 fn handle_profile(&self, message: Managed<StoreProfile>) {
869 let scoping = message.scoping();
870 let received_at = message.received_at();
871
872 let _ = message.try_accept(|profile| {
873 self.produce_profile(
874 scoping.organization_id,
875 scoping.project_id,
876 scoping.key_id,
877 received_at,
878 profile.retention_days,
879 &profile.profile,
880 )
881 });
882 }
883
884 fn create_metric_message<'a>(
885 &self,
886 organization_id: OrganizationId,
887 project_id: ProjectId,
888 encoder: &'a mut BucketEncoder,
889 namespace: MetricNamespace,
890 view: &BucketView<'a>,
891 retention_days: u16,
892 ) -> Result<MetricKafkaMessage<'a>, StoreError> {
893 let value = match view.value() {
894 BucketViewValue::Counter(c) => MetricValue::Counter(c),
895 BucketViewValue::Distribution(data) => MetricValue::Distribution(
896 encoder
897 .encode_distribution(namespace, data)
898 .map_err(StoreError::EncodingFailed)?,
899 ),
900 BucketViewValue::Set(data) => MetricValue::Set(
901 encoder
902 .encode_set(namespace, data)
903 .map_err(StoreError::EncodingFailed)?,
904 ),
905 BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
906 };
907
908 Ok(MetricKafkaMessage {
909 org_id: organization_id,
910 project_id,
911 name: view.name(),
912 value,
913 timestamp: view.timestamp(),
914 tags: view.tags(),
915 retention_days,
916 received_at: view.metadata().received_at,
917 })
918 }
919
920 fn produce(
921 &self,
922 topic: KafkaTopic,
923 message: KafkaMessage,
925 ) -> Result<(), StoreError> {
926 relay_log::trace!("Sending kafka message of type {}", message.variant());
927
928 let topic_name = self
929 .producer
930 .client
931 .send_message(topic, &message)
932 .inspect_err(|err| {
933 relay_log::error!(
934 error = err as &dyn Error,
935 tags.topic = ?topic,
936 tags.message = message.variant(),
937 "failed to produce to Kafka"
938 )
939 })?;
940
941 match &message {
942 KafkaMessage::Metric {
943 message: metric, ..
944 } => {
945 metric!(
946 counter(RelayCounters::ProcessingMessageProduced) += 1,
947 event_type = message.variant(),
948 topic = topic_name,
949 metric_type = metric.value.variant(),
950 metric_encoding = metric.value.encoding().unwrap_or(""),
951 );
952 }
953 KafkaMessage::ReplayRecordingNotChunked(replay) => {
954 let has_video = replay.replay_video.is_some();
955
956 metric!(
957 counter(RelayCounters::ProcessingMessageProduced) += 1,
958 event_type = message.variant(),
959 topic = topic_name,
960 has_video = bool_to_str(has_video),
961 );
962 }
963 message => {
964 metric!(
965 counter(RelayCounters::ProcessingMessageProduced) += 1,
966 event_type = message.variant(),
967 topic = topic_name,
968 );
969 }
970 }
971
972 Ok(())
973 }
974
975 fn produce_attachment(
987 &self,
988 event_id: EventId,
989 project_id: ProjectId,
990 org_id: OrganizationId,
991 item: &Item,
992 send_individual_attachments: bool,
993 ) -> Result<Option<ChunkedAttachment>, StoreError> {
994 let id = Uuid::new_v4().to_string();
995
996 let payload = item.payload();
997 let size = item.len();
998 let max_chunk_size = self.config.attachment_chunk_size();
999
1000 let payload = if size == 0 {
1001 AttachmentPayload::Chunked(0)
1002 } else if let Some(stored_key) = item.stored_key() {
1003 AttachmentPayload::Stored(stored_key.into())
1004 } else if send_individual_attachments && size < max_chunk_size {
1005 AttachmentPayload::Inline(payload)
1009 } else {
1010 let mut chunk_index = 0;
1011 let mut offset = 0;
1012 while offset < size {
1015 let chunk_size = std::cmp::min(max_chunk_size, size - offset);
1016 let chunk_message = AttachmentChunkKafkaMessage {
1017 payload: payload.slice(offset..offset + chunk_size),
1018 event_id,
1019 project_id,
1020 id: id.clone(),
1021 chunk_index,
1022 org_id,
1023 };
1024
1025 self.produce(
1026 KafkaTopic::Attachments,
1027 KafkaMessage::AttachmentChunk(chunk_message),
1028 )?;
1029 offset += chunk_size;
1030 chunk_index += 1;
1031 }
1032
1033 AttachmentPayload::Chunked(chunk_index)
1036 };
1037
1038 let attachment = ChunkedAttachment {
1039 id,
1040 name: match item.filename() {
1041 Some(name) => name.to_owned(),
1042 None => UNNAMED_ATTACHMENT.to_owned(),
1043 },
1044 rate_limited: item.rate_limited(),
1045 content_type: item.raw_content_type().map(|s| s.to_ascii_lowercase()),
1046 attachment_type: item.attachment_type().unwrap_or_default(),
1047 size,
1048 payload,
1049 };
1050
1051 if send_individual_attachments {
1052 let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
1053 event_id,
1054 project_id,
1055 attachment,
1056 org_id,
1057 });
1058 self.produce(KafkaTopic::Attachments, message)?;
1059 Ok(None)
1060 } else {
1061 Ok(Some(attachment))
1062 }
1063 }
1064
1065 fn produce_user_report(
1066 &self,
1067 event_id: EventId,
1068 project_id: ProjectId,
1069 org_id: OrganizationId,
1070 received_at: DateTime<Utc>,
1071 item: &Item,
1072 ) -> Result<(), StoreError> {
1073 let message = KafkaMessage::UserReport(UserReportKafkaMessage {
1074 project_id,
1075 event_id,
1076 start_time: safe_timestamp(received_at),
1077 payload: item.payload(),
1078 org_id,
1079 });
1080
1081 self.produce(KafkaTopic::Attachments, message)
1082 }
1083
1084 fn produce_user_report_v2(
1085 &self,
1086 event_id: EventId,
1087 project_id: ProjectId,
1088 org_id: OrganizationId,
1089 received_at: DateTime<Utc>,
1090 item: &Item,
1091 remote_addr: Option<String>,
1092 ) -> Result<(), StoreError> {
1093 let message = KafkaMessage::Event(EventKafkaMessage {
1094 project_id,
1095 event_id,
1096 payload: item.payload(),
1097 start_time: safe_timestamp(received_at),
1098 remote_addr,
1099 attachments: vec![],
1100 org_id,
1101 });
1102 self.produce(KafkaTopic::Feedback, message)
1103 }
1104
1105 fn send_metric_message(
1106 &self,
1107 namespace: MetricNamespace,
1108 message: MetricKafkaMessage,
1109 ) -> Result<(), StoreError> {
1110 let topic = match namespace {
1111 MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
1112 MetricNamespace::Unsupported => {
1113 relay_log::with_scope(
1114 |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
1115 || relay_log::error!("store service dropping unknown metric usecase"),
1116 );
1117 return Ok(());
1118 }
1119 _ => KafkaTopic::MetricsGeneric,
1120 };
1121
1122 let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
1123 self.produce(topic, KafkaMessage::Metric { headers, message })?;
1124 Ok(())
1125 }
1126
1127 fn produce_profile(
1128 &self,
1129 organization_id: OrganizationId,
1130 project_id: ProjectId,
1131 key_id: Option<u64>,
1132 received_at: DateTime<Utc>,
1133 retention_days: u16,
1134 item: &Item,
1135 ) -> Result<(), StoreError> {
1136 let message = ProfileKafkaMessage {
1137 organization_id,
1138 project_id,
1139 key_id,
1140 received: safe_timestamp(received_at),
1141 retention_days,
1142 headers: BTreeMap::from([
1143 (
1144 "sampled".to_owned(),
1145 if item.sampled() { "true" } else { "false" }.to_owned(),
1146 ),
1147 ("project_id".to_owned(), project_id.to_string()),
1148 ]),
1149 payload: item.payload(),
1150 };
1151 self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
1152 Ok(())
1153 }
1154
1155 fn produce_check_in(
1156 &self,
1157 project_id: ProjectId,
1158 org_id: OrganizationId,
1159 received_at: DateTime<Utc>,
1160 client: Option<&str>,
1161 retention_days: u16,
1162 item: &Item,
1163 ) -> Result<(), StoreError> {
1164 let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
1165 message_type: CheckInMessageType::CheckIn,
1166 project_id,
1167 retention_days,
1168 start_time: safe_timestamp(received_at),
1169 sdk: client.map(str::to_owned),
1170 payload: item.payload(),
1171 routing_key_hint: item.routing_hint(),
1172 org_id,
1173 });
1174
1175 self.produce(KafkaTopic::Monitors, message)?;
1176
1177 Ok(())
1178 }
1179
1180 fn produce_span(
1181 &self,
1182 scoping: Scoping,
1183 received_at: DateTime<Utc>,
1184 event_id: Option<EventId>,
1185 retention_days: u16,
1186 downsampled_retention_days: u16,
1187 item: &Item,
1188 ) -> Result<(), StoreError> {
1189 debug_assert_eq!(item.ty(), &ItemType::Span);
1190 debug_assert_eq!(item.content_type(), Some(ContentType::Json));
1191
1192 let Scoping {
1193 organization_id,
1194 project_id,
1195 project_key: _,
1196 key_id,
1197 } = scoping;
1198
1199 let relay_emits_accepted_outcome = !utils::is_rolled_out(
1200 scoping.organization_id.value(),
1201 self.global_config
1202 .current()
1203 .options
1204 .eap_span_outcomes_rollout_rate,
1205 )
1206 .is_keep();
1207
1208 let payload = item.payload();
1209 let message = SpanKafkaMessageRaw {
1210 meta: SpanMeta {
1211 organization_id,
1212 project_id,
1213 key_id,
1214 event_id,
1215 retention_days,
1216 downsampled_retention_days,
1217 received: datetime_to_timestamp(received_at),
1218 accepted_outcome_emitted: relay_emits_accepted_outcome,
1219 },
1220 span: serde_json::from_slice(&payload)
1221 .map_err(|e| StoreError::EncodingFailed(e.into()))?,
1222 };
1223
1224 debug_assert!(message.span.contains_key("attributes"));
1226 relay_statsd::metric!(
1227 counter(RelayCounters::SpanV2Produced) += 1,
1228 via = "envelope"
1229 );
1230
1231 self.produce(
1232 KafkaTopic::Spans,
1233 KafkaMessage::SpanRaw {
1234 routing_key: item.routing_hint(),
1235 headers: BTreeMap::from([(
1236 "project_id".to_owned(),
1237 scoping.project_id.to_string(),
1238 )]),
1239 message,
1240 org_id: organization_id,
1241 },
1242 )?;
1243
1244 if relay_emits_accepted_outcome {
1245 self.outcome_aggregator.send(TrackOutcome {
1248 category: DataCategory::SpanIndexed,
1249 event_id: None,
1250 outcome: Outcome::Accepted,
1251 quantity: 1,
1252 remote_addr: None,
1253 scoping,
1254 timestamp: received_at,
1255 });
1256 }
1257
1258 Ok(())
1259 }
1260}
1261
1262impl Service for StoreService {
1263 type Interface = Store;
1264
1265 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1266 let this = Arc::new(self);
1267
1268 relay_log::info!("store forwarder started");
1269
1270 while let Some(message) = rx.recv().await {
1271 let service = Arc::clone(&this);
1272 this.pool
1275 .spawn_async(async move { service.handle_message(message) }.boxed())
1276 .await;
1277 }
1278
1279 relay_log::info!("store forwarder stopped");
1280 }
1281}
1282
/// The payload part of a `ChunkedAttachment`.
///
/// `ChunkedAttachment` flattens this enum into its own map, so the selected variant shows up
/// there under exactly one of the keys `chunks`, `data` or `stored_id`.
#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// Serialized under the key `chunks`.
    ///
    /// NOTE(review): presumably the number of chunks sent separately for this attachment —
    /// confirm against the code that constructs this variant.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// Raw bytes, serialized under the key `data`.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// Serialized under the key `stored_id`.
    ///
    /// NOTE(review): presumably an identifier of an already stored payload — confirm against
    /// the code that constructs this variant.
    #[serde(rename = "stored_id")]
    Stored(String),
}
1301
/// Attachment description embedded in `EventKafkaMessage::attachments` and
/// `AttachmentKafkaMessage::attachment`.
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// Identifier of the attachment.
    ///
    /// NOTE(review): presumably matches `AttachmentChunkKafkaMessage::id` for chunked
    /// payloads — confirm against the producer code.
    id: String,

    /// Name of the attachment.
    name: String,

    /// Serialized as-is.
    ///
    /// NOTE(review): presumably marks attachments that were rate limited — confirm the exact
    /// consumer semantics.
    rate_limited: bool,

    /// Content type of the attachment; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// Attachment type, serialized through `serialize_attachment_type` rather than the
    /// serializer's default enum handling.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// Size of the attachment.
    ///
    /// NOTE(review): presumably in bytes — confirm.
    size: usize,

    /// The payload; flattened, so its key (`chunks`, `data` or `stored_id`) appears next to
    /// the other fields of this struct.
    #[serde(flatten)]
    payload: AttachmentPayload,
}
1336
1337fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
1343where
1344 S: serde::Serializer,
1345 T: serde::Serialize,
1346{
1347 serde_json::to_value(t)
1348 .map_err(|e| serde::ser::Error::custom(e.to_string()))?
1349 .serialize(serializer)
1350}
1351
/// Body of the `KafkaMessage::Event` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack.
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    /// Raw payload bytes.
    payload: Bytes,
    /// NOTE(review): presumably a unix timestamp in seconds — confirm against the producer.
    start_time: u64,
    /// Event identifier; its UUID is also used by `KafkaMessage::key`.
    event_id: EventId,
    /// Project the event belongs to.
    project_id: ProjectId,
    /// Remote address, if any.
    remote_addr: Option<String>,
    /// Attachments sent along with the event.
    attachments: Vec<ChunkedAttachment>,

    /// Not serialized; mixed into the message key by `KafkaMessage::key`.
    #[serde(skip)]
    org_id: OrganizationId,
}
1372
/// Body of the `KafkaMessage::AttachmentChunk` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack.
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    /// Raw bytes of this chunk.
    payload: Bytes,
    /// Event identifier; its UUID is also used by `KafkaMessage::key`.
    event_id: EventId,
    /// Project the chunk belongs to.
    project_id: ProjectId,
    /// NOTE(review): presumably the id of the attachment this chunk is part of (see
    /// `ChunkedAttachment::id`) — confirm against the producer.
    id: String,
    /// Index of this chunk.
    ///
    /// NOTE(review): presumably zero-based within the attachment — confirm.
    chunk_index: usize,

    /// Not serialized; mixed into the message key by `KafkaMessage::key`.
    #[serde(skip)]
    org_id: OrganizationId,
}
1393
/// Body of the `KafkaMessage::Attachment` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack.
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    /// Event identifier; its UUID is also used by `KafkaMessage::key`.
    event_id: EventId,
    /// Project the attachment belongs to.
    project_id: ProjectId,
    /// Description of the attachment, including its payload.
    attachment: ChunkedAttachment,

    /// Not serialized; mixed into the message key by `KafkaMessage::key`.
    #[serde(skip)]
    org_id: OrganizationId,
}
1411
/// Body of the `KafkaMessage::ReplayRecordingNotChunked` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack and `KafkaMessage::key`
/// returns no key for it. All byte slices are borrowed for `'a` and serialized through
/// `serde_bytes`.
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    /// Identifier of the replay.
    replay_id: EventId,
    /// Optional key id.
    key_id: Option<u64>,
    /// Organization the replay belongs to. Unlike in most sibling messages, this field is
    /// serialized.
    org_id: OrganizationId,
    /// Project the replay belongs to.
    project_id: ProjectId,
    /// NOTE(review): presumably a unix timestamp in seconds — confirm against the producer.
    received: u64,
    /// NOTE(review): presumably the retention period in days — confirm.
    retention_days: u16,
    /// Recording payload.
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    /// Optional replay event bytes.
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    /// Optional replay video bytes.
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    /// Serialized as-is.
    ///
    /// NOTE(review): semantics are defined by the consumer — confirm before relying on it.
    relay_snuba_publish_disabled: bool,
}
1428
/// Body of the `KafkaMessage::UserReport` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack.
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    /// Project the user report belongs to.
    project_id: ProjectId,
    /// NOTE(review): presumably a unix timestamp in seconds — confirm against the producer.
    start_time: u64,
    /// Raw payload bytes.
    payload: Bytes,

    /// Not serialized; its UUID is used by `KafkaMessage::key`.
    #[serde(skip)]
    event_id: EventId,
    /// Not serialized; mixed into the message key by `KafkaMessage::key`.
    #[serde(skip)]
    org_id: OrganizationId,
}
1446
/// Body of the `KafkaMessage::Metric` variant.
///
/// `KafkaMessage::serialize` encodes only this struct (not the surrounding enum) as JSON.
/// The name and tags are borrowed for `'a`.
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    /// Organization the metric belongs to.
    org_id: OrganizationId,
    /// Project the metric belongs to.
    project_id: ProjectId,
    /// Name of the metric; its namespace selects the name reported by
    /// `KafkaMessage::variant`.
    name: &'a MetricName,
    /// The metric value; flattened, so the `type` and `value` keys of `MetricValue` appear
    /// next to the other fields.
    #[serde(flatten)]
    value: MetricValue<'a>,
    /// Timestamp of the metric.
    timestamp: UnixTimestamp,
    /// Tags of the metric.
    tags: &'a BTreeMap<String, String>,
    /// NOTE(review): presumably the retention period in days — confirm.
    retention_days: u16,
    /// Optional receive timestamp; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}
1460
/// The value of a metric as sent in `MetricKafkaMessage`.
///
/// Serialized in serde's adjacently tagged form: the variant tag is written under the key
/// `type` (`c`, `d`, `s` or `g`) and the contained data under the key `value`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    /// Counter value, tagged `c`.
    #[serde(rename = "c")]
    Counter(FiniteF64),
    /// Distribution values wrapped in an `ArrayEncoding`, tagged `d`.
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    /// Set values wrapped in an `ArrayEncoding`, tagged `s`.
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    /// Gauge value, tagged `g`.
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
1473
1474impl MetricValue<'_> {
1475 fn variant(&self) -> &'static str {
1476 match self {
1477 Self::Counter(_) => "counter",
1478 Self::Distribution(_) => "distribution",
1479 Self::Set(_) => "set",
1480 Self::Gauge(_) => "gauge",
1481 }
1482 }
1483
1484 fn encoding(&self) -> Option<&'static str> {
1485 match self {
1486 Self::Distribution(ae) => Some(ae.name()),
1487 Self::Set(ae) => Some(ae.name()),
1488 _ => None,
1489 }
1490 }
1491}
1492
/// Body of the `KafkaMessage::Profile` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack and `KafkaMessage::key`
/// returns no key for it.
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    /// Organization the profile belongs to.
    organization_id: OrganizationId,
    /// Project the profile belongs to.
    project_id: ProjectId,
    /// Optional key id.
    key_id: Option<u64>,
    /// NOTE(review): presumably a unix timestamp in seconds — confirm against the producer.
    received: u64,
    /// NOTE(review): presumably the retention period in days — confirm.
    retention_days: u16,
    /// Not serialized; returned by `KafkaMessage::headers` as the message headers.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw payload bytes.
    payload: Bytes,
}
1504
/// Value of `CheckInKafkaMessage::message_type`.
///
/// Variants are serialized in snake case, i.e. as `clock_pulse` and `check_in`.
/// `dead_code` is allowed on the enum, so not every variant has to be constructed.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    /// Serialized as `clock_pulse`.
    ClockPulse,
    /// Serialized as `check_in`.
    CheckIn,
}
1517
/// Body of the `KafkaMessage::CheckIn` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack.
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    /// Kind of message, serialized as `clock_pulse` or `check_in`.
    message_type: CheckInMessageType,
    /// Raw payload bytes.
    payload: Bytes,
    /// NOTE(review): presumably a unix timestamp in seconds — confirm against the producer.
    start_time: u64,
    /// NOTE(review): presumably identifies the SDK that sent the check-in — confirm.
    sdk: Option<String>,
    /// Project the check-in belongs to.
    project_id: ProjectId,
    /// NOTE(review): presumably the retention period in days — confirm.
    retention_days: u16,

    /// Not serialized; when present, `KafkaMessage::key` derives the message key from it.
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,
    /// Not serialized; mixed into the message key by `KafkaMessage::key`.
    #[serde(skip)]
    org_id: OrganizationId,
}
1540
/// Body of the `KafkaMessage::SpanRaw` variant.
///
/// Both fields are flattened, so the serialized JSON object contains the `SpanMeta` fields
/// and the top-level keys of the span side by side.
#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    /// Metadata Relay adds to the span.
    #[serde(flatten)]
    meta: SpanMeta,
    /// Top-level keys of the span mapped to their raw, unparsed JSON values, borrowed for
    /// `'a`.
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}
1548
/// Body of the `KafkaMessage::SpanV2` variant.
///
/// Both fields are flattened, so the serialized JSON object contains the `SpanMeta` fields
/// and the fields of the span side by side.
#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    /// Metadata Relay adds to the span.
    #[serde(flatten)]
    meta: SpanMeta,
    /// The annotated `SpanV2`, wrapped in `SerializableAnnotated` for serialization.
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}
1556
/// Metadata flattened into `SpanKafkaMessageRaw` and `SpanKafkaMessage`.
#[derive(Debug, Serialize)]
struct SpanMeta {
    /// Organization the span belongs to.
    organization_id: OrganizationId,
    /// Project the span belongs to.
    project_id: ProjectId,
    /// Optional key id; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    /// Optional event id; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    /// Receive time as a floating point timestamp; the raw span produce path in this file
    /// fills it with `datetime_to_timestamp(received_at)`.
    received: f64,
    /// NOTE(review): presumably the retention period in days — confirm.
    retention_days: u16,
    /// NOTE(review): presumably the retention period in days for downsampled data — confirm.
    downsampled_retention_days: u16,
    /// Whether Relay emits the `Accepted` outcome for this span itself; the raw span produce
    /// path in this file sends that outcome exactly when it sets this flag to `true`.
    accepted_outcome_emitted: bool,
}
1575
/// Body of the `KafkaMessage::ProfileChunk` variant.
///
/// `KafkaMessage::serialize` encodes this variant as named MsgPack and `KafkaMessage::key`
/// returns no key for it.
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    /// Organization the profile chunk belongs to.
    organization_id: OrganizationId,
    /// Project the profile chunk belongs to.
    project_id: ProjectId,
    /// NOTE(review): presumably a unix timestamp in seconds — confirm against the producer.
    received: u64,
    /// NOTE(review): presumably the retention period in days — confirm.
    retention_days: u16,
    /// Not serialized; returned by `KafkaMessage::headers` as the message headers.
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    /// Raw payload bytes.
    payload: Bytes,
}
1586
/// All message kinds the store service hands to the Kafka producer.
///
/// The enum is internally tagged: when the enum itself is serialized, the snake-case variant
/// name is written under the key `type`. `Message::serialize` does this (as named MsgPack)
/// for every variant except `Metric`, `SpanRaw` and `SpanV2`, whose inner `message` is
/// serialized as JSON on its own, and `Item`, which is encoded as protobuf.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    /// An event together with its attachment descriptions.
    Event(EventKafkaMessage),
    /// A user report.
    UserReport(UserReportKafkaMessage),
    /// A metric.
    Metric {
        /// Message headers; not part of the serialized payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        /// The payload that is serialized as JSON.
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    /// A check-in.
    CheckIn(CheckInKafkaMessage),
    /// A trace item; none of its fields go through serde, the `message` is protobuf-encoded
    /// by `Message::serialize`.
    Item {
        /// Message headers.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        /// Type of the trace item; its string name is reported by `Message::variant`.
        #[serde(skip)]
        item_type: TraceItemType,
        /// The trace item to encode.
        #[serde(skip)]
        message: TraceItem,
    },
    /// A span whose body is forwarded as raw JSON values.
    SpanRaw {
        /// When present, `Message::key` derives the message key from it.
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Message headers; not part of the serialized payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        /// The payload that is serialized as JSON.
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,

        /// Not serialized; mixed into the message key by `Message::key`.
        #[serde(skip)]
        org_id: OrganizationId,
    },
    /// A span serialized from the typed `SpanV2` protocol structure.
    SpanV2 {
        /// When present, `Message::key` derives the message key from it.
        #[serde(skip)]
        routing_key: Option<Uuid>,
        /// Message headers; not part of the serialized payload.
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        /// The payload that is serialized as JSON.
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,

        /// Not serialized; mixed into the message key by `Message::key`.
        #[serde(skip)]
        org_id: OrganizationId,
    },

    /// An attachment description including its payload.
    Attachment(AttachmentKafkaMessage),
    /// A single chunk of an attachment payload.
    AttachmentChunk(AttachmentChunkKafkaMessage),

    /// A profile.
    Profile(ProfileKafkaMessage),
    /// A profile chunk.
    ProfileChunk(ProfileChunkKafkaMessage),

    /// A replay recording sent as a single message.
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}
1642
1643impl KafkaMessage<'_> {
1644 fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
1646 let item_type = item.item_type();
1647 KafkaMessage::Item {
1648 headers: BTreeMap::from([
1649 ("project_id".to_owned(), scoping.project_id.to_string()),
1650 ("item_type".to_owned(), (item_type as i32).to_string()),
1651 ]),
1652 message: item,
1653 item_type,
1654 }
1655 }
1656}
1657
1658impl Message for KafkaMessage<'_> {
1659 fn variant(&self) -> &'static str {
1660 match self {
1661 KafkaMessage::Event(_) => "event",
1662 KafkaMessage::UserReport(_) => "user_report",
1663 KafkaMessage::Metric { message, .. } => match message.name.namespace() {
1664 MetricNamespace::Sessions => "metric_sessions",
1665 MetricNamespace::Transactions => "metric_transactions",
1666 MetricNamespace::Spans => "metric_spans",
1667 MetricNamespace::Custom => "metric_custom",
1668 MetricNamespace::Unsupported => "metric_unsupported",
1669 },
1670 KafkaMessage::CheckIn(_) => "check_in",
1671 KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
1672 KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1673
1674 KafkaMessage::Attachment(_) => "attachment",
1675 KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1676
1677 KafkaMessage::Profile(_) => "profile",
1678 KafkaMessage::ProfileChunk(_) => "profile_chunk",
1679
1680 KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
1681 }
1682 }
1683
1684 fn key(&self) -> Option<relay_kafka::Key> {
1686 match self {
1687 Self::Event(message) => Some((message.event_id.0, message.org_id)),
1688 Self::UserReport(message) => Some((message.event_id.0, message.org_id)),
1689 Self::SpanRaw {
1690 routing_key,
1691 org_id,
1692 ..
1693 }
1694 | Self::SpanV2 {
1695 routing_key,
1696 org_id,
1697 ..
1698 } => routing_key.map(|r| (r, *org_id)),
1699
1700 Self::CheckIn(message) => message.routing_key_hint.map(|r| (r, message.org_id)),
1705
1706 Self::Attachment(message) => Some((message.event_id.0, message.org_id)),
1707 Self::AttachmentChunk(message) => Some((message.event_id.0, message.org_id)),
1708
1709 Self::Metric { .. }
1711 | Self::Item { .. }
1712 | Self::Profile(_)
1713 | Self::ProfileChunk(_)
1714 | Self::ReplayRecordingNotChunked(_) => None,
1715 }
1716 .filter(|(uuid, _)| !uuid.is_nil())
1717 .map(|(uuid, org_id)| {
1718 let mut res = uuid.into_bytes();
1721 for (i, &b) in org_id.value().to_be_bytes().iter().enumerate() {
1722 res[i] ^= b;
1723 }
1724 u128::from_be_bytes(res)
1725 })
1726 }
1727
1728 fn headers(&self) -> Option<&BTreeMap<String, String>> {
1729 match &self {
1730 KafkaMessage::Metric { headers, .. }
1731 | KafkaMessage::SpanRaw { headers, .. }
1732 | KafkaMessage::SpanV2 { headers, .. }
1733 | KafkaMessage::Item { headers, .. }
1734 | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
1735 | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),
1736
1737 KafkaMessage::Event(_)
1738 | KafkaMessage::UserReport(_)
1739 | KafkaMessage::CheckIn(_)
1740 | KafkaMessage::Attachment(_)
1741 | KafkaMessage::AttachmentChunk(_)
1742 | KafkaMessage::ReplayRecordingNotChunked(_) => None,
1743 }
1744 }
1745
1746 fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
1747 match self {
1748 KafkaMessage::Metric { message, .. } => serialize_as_json(message),
1749 KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
1750 KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
1751 KafkaMessage::Item { message, .. } => {
1752 let mut payload = Vec::new();
1753 match message.encode(&mut payload) {
1754 Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
1755 Err(_) => Err(ClientError::ProtobufEncodingFailed),
1756 }
1757 }
1758 KafkaMessage::Event(_)
1759 | KafkaMessage::UserReport(_)
1760 | KafkaMessage::CheckIn(_)
1761 | KafkaMessage::Attachment(_)
1762 | KafkaMessage::AttachmentChunk(_)
1763 | KafkaMessage::Profile(_)
1764 | KafkaMessage::ProfileChunk(_)
1765 | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
1766 Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
1767 Err(err) => Err(ClientError::InvalidMsgPack(err)),
1768 },
1769 }
1770 }
1771}
1772
1773fn serialize_as_json<T: serde::Serialize>(
1774 value: &T,
1775) -> Result<SerializationOutput<'_>, ClientError> {
1776 match serde_json::to_vec(value) {
1777 Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
1778 Err(err) => Err(ClientError::InvalidJson(err)),
1779 }
1780}
1781
1782fn is_slow_item(item: &Item) -> bool {
1786 item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
1787}
1788
/// Returns the static string `"true"` or `"false"` for the given boolean.
fn bool_to_str(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
1792
1793fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
1797 let ts = timestamp.timestamp();
1798 if ts >= 0 {
1799 return ts as u64;
1800 }
1801
1802 Utc::now().timestamp() as u64
1804}
1805
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to either outcomes topic through the store producer's client must fail with
    /// `ClientError::InvalidTopicName`.
    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        let outcome_topics = [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling];
        for topic in outcome_topics {
            let result = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(result, Err(ClientError::InvalidTopicName)));
        }
    }
}