use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::BoxFuture;
use prost::Message as _;
use sentry_protos::snuba::v1::{TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;

use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, SpanV2, datetime_to_timestamp};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message, SerializationOutput};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, GaugeValue, MetricName,
    MetricNamespace, SetView,
};
use relay_protocol::{Annotated, FiniteF64, SerializableAnnotated};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;

use crate::envelope::{AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, TypedEnvelope};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

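/// Fallback name used for attachments without a filename.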
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

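/// Errors raised by the [`StoreService`] while producing data to Kafka.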
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to send the message to kafka: {0}")]
    SendFailed(#[from] ClientError),
    #[error("failed to encode data: {0}")]
    EncodingFailed(std::io::Error),
    #[error("failed to store event because event id was missing")]
    NoEventId,
}

impl OutcomeError for StoreError {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        (Some(Outcome::Invalid(DiscardReason::Internal)), self)
    }
}

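/// Kafka producer wrapping the client for all store topics, excluding outcomes.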
struct Producer {
    client: KafkaClient,
}

impl Producer {
    pub fn create(config: &Config) -> anyhow::Result<Self> {
        let mut client_builder = KafkaClient::builder();

        for topic in KafkaTopic::iter().filter(|t| {
            **t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
        }) {
            let kafka_configs = config.kafka_configs(*topic)?;
            client_builder = client_builder
                .add_kafka_topic_config(*topic, &kafka_configs, config.kafka_validate_topics())
                .map_err(|e| ServiceError::Kafka(e.to_string()))?;
        }

        Ok(Self {
            client: client_builder.build(),
        })
    }
}

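/// Publishes a processed envelope to Kafka.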
#[derive(Debug)]
pub struct StoreEnvelope {
    pub envelope: TypedEnvelope<Processed>,
}

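/// Publishes a batch of metric buckets to Kafka.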
#[derive(Clone, Debug)]
pub struct StoreMetrics {
    pub buckets: Vec<Bucket>,
    pub scoping: Scoping,
    pub retention: u16,
}

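/// Publishes a [`TraceItem`] to the items topic, emitting accepted outcomes on success.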
#[derive(Debug)]
pub struct StoreTraceItem {
    /// The trace item to produce to Kafka.
    pub trace_item: TraceItem,
    /// Data categories and quantities represented by this item.
    pub quantities: Quantities,
}

impl Counted for StoreTraceItem {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

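/// Publishes a V2 span to the spans topic.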
#[derive(Debug)]
pub struct StoreSpanV2 {
    /// Routing key used for Kafka partitioning.
    pub routing_key: Option<Uuid>,
    /// Retention in days.
    pub retention_days: u16,
    /// Retention of the downsampled tier, in days.
    pub downsampled_retention_days: u16,
    /// The span to produce to Kafka.
    pub item: SpanV2,
}

impl Counted for StoreSpanV2 {
    fn quantities(&self) -> Quantities {
        smallvec::smallvec![(DataCategory::SpanIndexed, 1)]
    }
}

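/// Publishes a profile chunk to the profiles topic.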
#[derive(Debug)]
pub struct StoreProfileChunk {
    /// Retention in days.
    pub retention_days: u16,
    /// The serialized profile chunk payload.
    pub payload: Bytes,
    /// Data categories and quantities represented by this chunk.
    pub quantities: Quantities,
}

impl Counted for StoreProfileChunk {
    fn quantities(&self) -> Quantities {
        self.quantities.clone()
    }
}

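/// The asynchronous thread pool used by the [`StoreService`].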
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

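/// Service interface for the store forwarder, which produces all received
/// messages to Kafka.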
#[derive(Debug)]
pub enum Store {
    /// Publishes an envelope to Kafka.
    Envelope(StoreEnvelope),
    /// Publishes metric buckets to Kafka.
    Metrics(StoreMetrics),
    /// Publishes a trace item to Kafka.
    TraceItem(Managed<StoreTraceItem>),
    /// Publishes a span to Kafka.
    Span(Managed<Box<StoreSpanV2>>),
    /// Publishes a profile chunk to Kafka.
    ProfileChunk(Managed<StoreProfileChunk>),
}

impl Store {
    fn variant(&self) -> &'static str {
        match self {
            Store::Envelope(_) => "envelope",
            Store::Metrics(_) => "metrics",
            Store::TraceItem(_) => "log",
            Store::Span(_) => "span",
            Store::ProfileChunk(_) => "profile_chunk",
        }
    }
}

impl Interface for Store {}

impl FromMessage<StoreEnvelope> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreEnvelope, _: ()) -> Self {
        Self::Envelope(message)
    }
}

impl FromMessage<StoreMetrics> for Store {
    type Response = NoResponse;

    fn from_message(message: StoreMetrics, _: ()) -> Self {
        Self::Metrics(message)
    }
}

impl FromMessage<Managed<StoreTraceItem>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreTraceItem>, _: ()) -> Self {
        Self::TraceItem(message)
    }
}

impl FromMessage<Managed<Box<StoreSpanV2>>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<Box<StoreSpanV2>>, _: ()) -> Self {
        Self::Span(message)
    }
}

impl FromMessage<Managed<StoreProfileChunk>> for Store {
    type Response = NoResponse;

    fn from_message(message: Managed<StoreProfileChunk>, _: ()) -> Self {
        Self::ProfileChunk(message)
    }
}

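/// Service implementing the [`Store`] interface.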
pub struct StoreService {
    pool: StoreServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    outcome_aggregator: Addr<TrackOutcome>,
    metric_outcomes: MetricOutcomes,
    producer: Producer,
}

impl StoreService {
    pub fn create(
        pool: StoreServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        outcome_aggregator: Addr<TrackOutcome>,
        metric_outcomes: MetricOutcomes,
    ) -> anyhow::Result<Self> {
        let producer = Producer::create(&config)?;
        Ok(Self {
            pool,
            config,
            global_config,
            outcome_aggregator,
            metric_outcomes,
            producer,
        })
    }

    fn handle_message(&self, message: Store) {
        let ty = message.variant();
        relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
            match message {
                Store::Envelope(message) => self.handle_store_envelope(message),
                Store::Metrics(message) => self.handle_store_metrics(message),
                Store::TraceItem(message) => self.handle_store_trace_item(message),
                Store::Span(message) => self.handle_store_span(message),
                Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
            }
        })
    }

    fn handle_store_envelope(&self, message: StoreEnvelope) {
        let StoreEnvelope {
            envelope: mut managed,
        } = message;

        let scoping = managed.scoping();

        match self.store_envelope(&mut managed) {
            Ok(()) => managed.accept(),
            Err(error) => {
                managed.reject(Outcome::Invalid(DiscardReason::Internal));
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store envelope"
                );
            }
        }
    }

    fn store_envelope(&self, managed_envelope: &mut ManagedEnvelope) -> Result<(), StoreError> {
        let mut envelope = managed_envelope.take_envelope();
        let received_at = managed_envelope.received_at();
        let scoping = managed_envelope.scoping();

        let retention = envelope.retention();
        let downsampled_retention = envelope.downsampled_retention();

        let event_id = envelope.event_id();
        let event_item = envelope.as_mut().take_item_by(|item| {
            matches!(
                item.ty(),
                ItemType::Event | ItemType::Transaction | ItemType::Security
            )
        });
        let event_type = event_item.as_ref().map(|item| item.ty());

        let event_topic = if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
            KafkaTopic::Transactions
        } else if envelope.get_item_by(is_slow_item).is_some() {
            KafkaTopic::Attachments
        } else {
            KafkaTopic::Events
        };

        let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));

        let mut attachments = Vec::new();
        let mut replay_event = None;
        let mut replay_recording = None;

        let options = &self.global_config.current().options;

        let replay_relay_snuba_publish_disabled =
            utils::sample(options.replay_relay_snuba_publish_disabled_sample_rate).is_keep();

        for item in envelope.items() {
            let content_type = item.content_type();
            match item.ty() {
                ItemType::Attachment => {
                    if let Some(attachment) = self.produce_attachment(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        item,
                        send_individual_attachments,
                    )? {
                        attachments.push(attachment);
                    }
                }
                ItemType::UserReport => {
                    debug_assert!(event_topic == KafkaTopic::Attachments);
                    self.produce_user_report(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                    )?;
                }
                ItemType::UserReportV2 => {
                    let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
                    self.produce_user_report_v2(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        item,
                        remote_addr,
                    )?;
                }
                ItemType::Profile => self.produce_profile(
                    scoping.organization_id,
                    scoping.project_id,
                    scoping.key_id,
                    received_at,
                    retention,
                    item,
                )?,
                ItemType::ReplayVideo => {
                    self.produce_replay_video(
                        event_id,
                        scoping,
                        item.payload(),
                        received_at,
                        retention,
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::ReplayRecording => {
                    replay_recording = Some(item);
                }
                ItemType::ReplayEvent => {
                    replay_event = Some(item);
                    self.produce_replay_event(
                        event_id.ok_or(StoreError::NoEventId)?,
                        scoping.project_id,
                        received_at,
                        retention,
                        &item.payload(),
                        replay_relay_snuba_publish_disabled,
                    )?;
                }
                ItemType::CheckIn => {
                    let client = envelope.meta().client();
                    self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
                }
                ItemType::Span if content_type == Some(&ContentType::Json) => self.produce_span(
                    scoping,
                    received_at,
                    event_id,
                    retention,
                    downsampled_retention,
                    item,
                )?,
                ty @ ItemType::Log => {
                    debug_assert!(
                        false,
                        "received {ty} through an envelope, \
                         this item must be submitted via a specific store message instead"
                    );
                    relay_log::error!(
                        tags.project_key = %scoping.project_key,
                        "StoreService received unsupported item type '{ty}' in envelope"
                    );
                }
                other => {
                    let event_type = event_item.as_ref().map(|item| item.ty().as_str());
                    let item_types = envelope
                        .items()
                        .map(|item| item.ty().as_str())
                        .collect::<Vec<_>>();
                    let attachment_types = envelope
                        .items()
                        .map(|item| {
                            item.attachment_type()
                                .map(|t| t.to_string())
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>();

                    relay_log::with_scope(
                        |scope| {
                            scope.set_extra("item_types", item_types.into());
                            scope.set_extra("attachment_types", attachment_types.into());
                            if other == &ItemType::FormData {
                                let payload = item.payload();
                                let form_data_keys = FormDataIter::new(&payload)
                                    .map(|entry| entry.key())
                                    .collect::<Vec<_>>();
                                scope.set_extra("form_data_keys", form_data_keys.into());
                            }
                        },
                        || {
                            relay_log::error!(
                                tags.project_key = %scoping.project_key,
                                tags.event_type = event_type.unwrap_or("none"),
                                "StoreService received unexpected item type: {other}"
                            )
                        },
                    )
                }
            }
        }

        if let Some(recording) = replay_recording {
            let replay_event = replay_event.map(|rv| rv.payload());
            self.produce_replay_recording(
                event_id,
                scoping,
                &recording.payload(),
                replay_event.as_deref(),
                None,
                received_at,
                retention,
                replay_relay_snuba_publish_disabled,
            )?;
        }

        if let Some(event_item) = event_item {
            let event_id = event_id.ok_or(StoreError::NoEventId)?;
            let project_id = scoping.project_id;
            let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());

            self.produce(
                event_topic,
                KafkaMessage::Event(EventKafkaMessage {
                    payload: event_item.payload(),
                    start_time: safe_timestamp(received_at),
                    event_id,
                    project_id,
                    remote_addr,
                    attachments,
                }),
            )?;
        } else {
            debug_assert!(attachments.is_empty());
        }

        Ok(())
    }

    fn handle_store_metrics(&self, message: StoreMetrics) {
        let StoreMetrics {
            buckets,
            scoping,
            retention,
        } = message;

        let batch_size = self.config.metrics_max_batch_size_bytes();
        let mut error = None;

        let global_config = self.global_config.current();
        let mut encoder = BucketEncoder::new(&global_config);

        let now = UnixTimestamp::now();
        let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

        for mut bucket in buckets {
            let namespace = encoder.prepare(&mut bucket);

            if let Some(received_at) = bucket.metadata.received_at {
                let delay = now.as_secs().saturating_sub(received_at.as_secs());
                let (total, count, max) = delay_stats.get_mut(namespace);
                *total += delay;
                *count += 1;
                *max = (*max).max(delay);
            }

            for view in BucketsView::new(std::slice::from_ref(&bucket))
                .by_size(batch_size)
                .flatten()
            {
                let message = self.create_metric_message(
                    scoping.organization_id,
                    scoping.project_id,
                    &mut encoder,
                    namespace,
                    &view,
                    retention,
                );

                let result =
                    message.and_then(|message| self.send_metric_message(namespace, message));

                let outcome = match result {
                    Ok(()) => Outcome::Accepted,
                    Err(e) => {
                        error.get_or_insert(e);
                        Outcome::Invalid(DiscardReason::Internal)
                    }
                };

                self.metric_outcomes.track(scoping, &[view], outcome);
            }
        }

        if let Some(error) = error {
            relay_log::error!(
                error = &error as &dyn std::error::Error,
                "failed to produce metric buckets: {error}"
            );
        }

        for (namespace, (total, count, max)) in delay_stats {
            if count == 0 {
                continue;
            }
            metric!(
                counter(RelayCounters::MetricDelaySum) += total,
                namespace = namespace.as_str()
            );
            metric!(
                counter(RelayCounters::MetricDelayCount) += count,
                namespace = namespace.as_str()
            );
            metric!(
                gauge(RelayGauges::MetricDelayMax) = max,
                namespace = namespace.as_str()
            );
        }
    }

    fn handle_store_trace_item(&self, message: Managed<StoreTraceItem>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let quantities = message.try_accept(|item| {
            let item_type = item.trace_item.item_type();
            let message = KafkaMessage::Item {
                headers: BTreeMap::from([
                    ("project_id".to_owned(), scoping.project_id.to_string()),
                    ("item_type".to_owned(), (item_type as i32).to_string()),
                ]),
                message: item.trace_item,
                item_type,
            };

            self.produce(KafkaTopic::Items, message)
                .map(|()| item.quantities)
        });

        if let Ok(quantities) = quantities {
            for (category, quantity) in quantities {
                self.outcome_aggregator.send(TrackOutcome {
                    category,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
        }
    }

    fn handle_store_span(&self, message: Managed<Box<StoreSpanV2>>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let meta = SpanMeta {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
            key_id: scoping.key_id,
            event_id: None,
            retention_days: message.retention_days,
            downsampled_retention_days: message.downsampled_retention_days,
            received: datetime_to_timestamp(received_at),
        };

        let result = message.try_accept(|span| {
            let item = Annotated::new(span.item);
            let message = KafkaMessage::SpanV2 {
                routing_key: span.routing_key,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message: SpanKafkaMessage {
                    meta,
                    span: SerializableAnnotated(&item),
                },
            };

            self.produce(KafkaTopic::Spans, message)
        });

        match result {
            Ok(()) => {
                relay_statsd::metric!(
                    counter(RelayCounters::SpanV2Produced) += 1,
                    via = "processing"
                );

                self.outcome_aggregator.send(TrackOutcome {
                    category: DataCategory::SpanIndexed,
                    event_id: None,
                    outcome: Outcome::Accepted,
                    quantity: 1,
                    remote_addr: None,
                    scoping,
                    timestamp: received_at,
                });
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %scoping.project_key,
                    "failed to store span"
                );
            }
        }
    }

    fn handle_store_profile_chunk(&self, message: Managed<StoreProfileChunk>) {
        let scoping = message.scoping();
        let received_at = message.received_at();

        let _ = message.try_accept(|message| {
            let message = ProfileChunkKafkaMessage {
                organization_id: scoping.organization_id,
                project_id: scoping.project_id,
                received: safe_timestamp(received_at),
                retention_days: message.retention_days,
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                payload: message.payload,
            };

            self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
        });
    }

    fn create_metric_message<'a>(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        encoder: &'a mut BucketEncoder,
        namespace: MetricNamespace,
        view: &BucketView<'a>,
        retention_days: u16,
    ) -> Result<MetricKafkaMessage<'a>, StoreError> {
        let value = match view.value() {
            BucketViewValue::Counter(c) => MetricValue::Counter(c),
            BucketViewValue::Distribution(data) => MetricValue::Distribution(
                encoder
                    .encode_distribution(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Set(data) => MetricValue::Set(
                encoder
                    .encode_set(namespace, data)
                    .map_err(StoreError::EncodingFailed)?,
            ),
            BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
        };

        Ok(MetricKafkaMessage {
            org_id: organization_id,
            project_id,
            name: view.name(),
            value,
            timestamp: view.timestamp(),
            tags: view.tags(),
            retention_days,
            received_at: view.metadata().received_at,
        })
    }

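    /// Produces a message to the given Kafka topic and records a
    /// `ProcessingMessageProduced` counter tagged by message variant.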
    fn produce(
        &self,
        topic: KafkaTopic,
        message: KafkaMessage,
    ) -> Result<(), StoreError> {
        relay_log::trace!("Sending kafka message of type {}", message.variant());

        let topic_name = self.producer.client.send_message(topic, &message)?;

        match &message {
            KafkaMessage::Metric {
                message: metric, ..
            } => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    metric_type = metric.value.variant(),
                    metric_encoding = metric.value.encoding().unwrap_or(""),
                );
            }
            KafkaMessage::ReplayRecordingNotChunked(replay) => {
                let has_video = replay.replay_video.is_some();

                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                    has_video = bool_to_str(has_video),
                );
            }
            message => {
                metric!(
                    counter(RelayCounters::ProcessingMessageProduced) += 1,
                    event_type = message.variant(),
                    topic = topic_name,
                );
            }
        }

        Ok(())
    }

    /// Produces Kafka messages for an attachment.
    ///
    /// Depending on size and flags, the payload is inlined, referenced by a stored
    /// key, or split into chunks on the attachments topic. When
    /// `send_individual_attachments` is set, a standalone attachment message is
    /// produced and `None` is returned; otherwise the attachment is returned so it
    /// can be sent as part of the event message.
    fn produce_attachment(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        item: &Item,
        send_individual_attachments: bool,
    ) -> Result<Option<ChunkedAttachment>, StoreError> {
        let id = Uuid::new_v4().to_string();

        let payload = item.payload();
        let size = item.len();
        let max_chunk_size = self.config.attachment_chunk_size();

        let payload = if size == 0 {
            AttachmentPayload::Chunked(0)
        } else if let Some(stored_key) = item.stored_key() {
            AttachmentPayload::Stored(stored_key.into())
        } else if send_individual_attachments && size < max_chunk_size {
            AttachmentPayload::Inline(payload)
        } else {
            let mut chunk_index = 0;
            let mut offset = 0;
            while offset < size {
                let chunk_size = std::cmp::min(max_chunk_size, size - offset);
                let chunk_message = AttachmentChunkKafkaMessage {
                    payload: payload.slice(offset..offset + chunk_size),
                    event_id,
                    project_id,
                    id: id.clone(),
                    chunk_index,
                };

                self.produce(
                    KafkaTopic::Attachments,
                    KafkaMessage::AttachmentChunk(chunk_message),
                )?;
                offset += chunk_size;
                chunk_index += 1;
            }

            AttachmentPayload::Chunked(chunk_index)
        };

        let attachment = ChunkedAttachment {
            id,
            name: match item.filename() {
                Some(name) => name.to_owned(),
                None => UNNAMED_ATTACHMENT.to_owned(),
            },
            rate_limited: item.rate_limited(),
            content_type: item
                .content_type()
                .map(|content_type| content_type.as_str().to_owned()),
            attachment_type: item.attachment_type().cloned().unwrap_or_default(),
            size,
            payload,
        };

        if send_individual_attachments {
            let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
                event_id,
                project_id,
                attachment,
            });
            self.produce(KafkaTopic::Attachments, message)?;
            Ok(None)
        } else {
            Ok(Some(attachment))
        }
    }

    fn produce_user_report(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
            project_id,
            event_id,
            start_time: safe_timestamp(received_at),
            payload: item.payload(),
        });

        self.produce(KafkaTopic::Attachments, message)
    }

    fn produce_user_report_v2(
        &self,
        event_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        item: &Item,
        remote_addr: Option<String>,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::Event(EventKafkaMessage {
            project_id,
            event_id,
            payload: item.payload(),
            start_time: safe_timestamp(received_at),
            remote_addr,
            attachments: vec![],
        });
        self.produce(KafkaTopic::Feedback, message)
    }

    fn send_metric_message(
        &self,
        namespace: MetricNamespace,
        message: MetricKafkaMessage,
    ) -> Result<(), StoreError> {
        let topic = match namespace {
            MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
            MetricNamespace::Unsupported => {
                relay_log::with_scope(
                    |scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
                    || relay_log::error!("store service dropping unknown metric usecase"),
                );
                return Ok(());
            }
            _ => KafkaTopic::MetricsGeneric,
        };
        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

        self.produce(topic, KafkaMessage::Metric { headers, message })?;
        Ok(())
    }

    fn produce_profile(
        &self,
        organization_id: OrganizationId,
        project_id: ProjectId,
        key_id: Option<u64>,
        received_at: DateTime<Utc>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = ProfileKafkaMessage {
            organization_id,
            project_id,
            key_id,
            received: safe_timestamp(received_at),
            retention_days,
            headers: BTreeMap::from([
                (
                    "sampled".to_owned(),
                    if item.sampled() { "true" } else { "false" }.to_owned(),
                ),
                ("project_id".to_owned(), project_id.to_string()),
            ]),
            payload: item.payload(),
        };
        self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
        Ok(())
    }

    fn produce_replay_event(
        &self,
        replay_id: EventId,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        retention_days: u16,
        payload: &[u8],
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        if relay_snuba_publish_disabled {
            return Ok(());
        }

        let message = ReplayEventKafkaMessage {
            replay_id,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            payload,
        };
        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn produce_replay_recording(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: &[u8],
        replay_event: Option<&[u8]>,
        replay_video: Option<&[u8]>,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        let max_payload_size = self.config.max_replay_message_size();

        // Budget a fixed 2000-byte overhead for the message metadata before
        // adding the sizes of the payload components.
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
        payload_size += replay_video.as_ref().map_or(0, |b| b.len());
        payload_size += payload.len();

        if payload_size >= max_payload_size {
            relay_log::debug!("replay_recording over maximum size.");
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::TooLarge(
                    DiscardItemType::ReplayRecording,
                )),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        }

        let message =
            KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
                replay_id: event_id.ok_or(StoreError::NoEventId)?,
                project_id: scoping.project_id,
                key_id: scoping.key_id,
                org_id: scoping.organization_id,
                received: safe_timestamp(received_at),
                retention_days: retention,
                payload,
                replay_event,
                replay_video,
                relay_snuba_publish_disabled,
            });

        self.produce(KafkaTopic::ReplayRecordings, message)?;

        Ok(())
    }

    fn produce_replay_video(
        &self,
        event_id: Option<EventId>,
        scoping: Scoping,
        payload: Bytes,
        received_at: DateTime<Utc>,
        retention: u16,
        relay_snuba_publish_disabled: bool,
    ) -> Result<(), StoreError> {
        #[derive(Deserialize)]
        struct VideoEvent<'a> {
            replay_event: &'a [u8],
            replay_recording: &'a [u8],
            replay_video: &'a [u8],
        }

        let Ok(VideoEvent {
            replay_video,
            replay_event,
            replay_recording,
        }) = rmp_serde::from_slice::<VideoEvent>(&payload)
        else {
            self.outcome_aggregator.send(TrackOutcome {
                category: DataCategory::Replay,
                event_id,
                outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                quantity: 1,
                remote_addr: None,
                scoping,
                timestamp: received_at,
            });
            return Ok(());
        };

        self.produce_replay_event(
            event_id.ok_or(StoreError::NoEventId)?,
            scoping.project_id,
            received_at,
            retention,
            replay_event,
            relay_snuba_publish_disabled,
        )?;

        self.produce_replay_recording(
            event_id,
            scoping,
            replay_recording,
            Some(replay_event),
            Some(replay_video),
            received_at,
            retention,
            relay_snuba_publish_disabled,
        )
    }

    fn produce_check_in(
        &self,
        project_id: ProjectId,
        received_at: DateTime<Utc>,
        client: Option<&str>,
        retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
            message_type: CheckInMessageType::CheckIn,
            project_id,
            retention_days,
            start_time: safe_timestamp(received_at),
            sdk: client.map(str::to_owned),
            payload: item.payload(),
            routing_key_hint: item.routing_hint(),
        });

        self.produce(KafkaTopic::Monitors, message)?;

        Ok(())
    }

    fn produce_span(
        &self,
        scoping: Scoping,
        received_at: DateTime<Utc>,
        event_id: Option<EventId>,
        retention_days: u16,
        downsampled_retention_days: u16,
        item: &Item,
    ) -> Result<(), StoreError> {
        debug_assert_eq!(item.ty(), &ItemType::Span);
        debug_assert_eq!(item.content_type(), Some(&ContentType::Json));

        let Scoping {
            organization_id,
            project_id,
            project_key: _,
            key_id,
        } = scoping;

        let payload = item.payload();
        let message = SpanKafkaMessageRaw {
            meta: SpanMeta {
                organization_id,
                project_id,
                key_id,
                event_id,
                retention_days,
                downsampled_retention_days,
                received: datetime_to_timestamp(received_at),
            },
            span: serde_json::from_slice(&payload)
                .map_err(|e| StoreError::EncodingFailed(e.into()))?,
        };

        debug_assert!(message.span.contains_key("attributes"));
        relay_statsd::metric!(
            counter(RelayCounters::SpanV2Produced) += 1,
            via = "envelope"
        );

        self.produce(
            KafkaTopic::Spans,
            KafkaMessage::SpanRaw {
                routing_key: item.routing_hint(),
                headers: BTreeMap::from([(
                    "project_id".to_owned(),
                    scoping.project_id.to_string(),
                )]),
                message,
            },
        )?;

        self.outcome_aggregator.send(TrackOutcome {
            category: DataCategory::SpanIndexed,
            event_id: None,
            outcome: Outcome::Accepted,
            quantity: 1,
            remote_addr: None,
            scoping,
            timestamp: received_at,
        });

        Ok(())
    }
}

impl Service for StoreService {
    type Interface = Store;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let this = Arc::new(self);

        relay_log::info!("store forwarder started");

        while let Some(message) = rx.recv().await {
            let service = Arc::clone(&this);
            this.pool
                .spawn_async(async move { service.handle_message(message) }.boxed())
                .await;
        }

        relay_log::info!("store forwarder stopped");
    }
}

#[derive(Debug, Serialize)]
enum AttachmentPayload {
    /// Number of chunks produced to the attachments topic.
    #[serde(rename = "chunks")]
    Chunked(usize),

    /// The attachment payload inlined into the message.
    #[serde(rename = "data")]
    Inline(Bytes),

    /// Key of a payload that is already persisted in the object store.
    #[serde(rename = "stored_id")]
    Stored(String),
}

#[derive(Debug, Serialize)]
struct ChunkedAttachment {
    /// Unique identifier of the attachment.
    id: String,

    /// File name of the attachment.
    name: String,

    /// Whether the attachment was rate limited.
    rate_limited: bool,

    /// Content type of the attachment, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    content_type: Option<String>,

    /// The type of the attachment.
    #[serde(serialize_with = "serialize_attachment_type")]
    attachment_type: AttachmentType,

    /// Size of the attachment payload in bytes.
    size: usize,

    /// How the payload is transported: chunked, inline, or stored.
    #[serde(flatten)]
    payload: AttachmentPayload,
}

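/// Serializes a value by routing it through `serde_json::Value`, so the serde
/// attributes on [`AttachmentType`] yield the same representation regardless of
/// the outer serializer.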
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
    T: serde::Serialize,
{
    serde_json::to_value(t)
        .map_err(|e| serde::ser::Error::custom(e.to_string()))?
        .serialize(serializer)
}

#[derive(Debug, Serialize)]
struct EventKafkaMessage {
    payload: Bytes,
    start_time: u64,
    event_id: EventId,
    project_id: ProjectId,
    remote_addr: Option<String>,
    attachments: Vec<ChunkedAttachment>,
}

#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
    payload: &'a [u8],
    start_time: u64,
    replay_id: EventId,
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
    payload: Bytes,
    event_id: EventId,
    project_id: ProjectId,
    id: String,
    chunk_index: usize,
}

#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
    event_id: EventId,
    project_id: ProjectId,
    attachment: ChunkedAttachment,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
    replay_id: EventId,
    key_id: Option<u64>,
    org_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(with = "serde_bytes")]
    payload: &'a [u8],
    #[serde(with = "serde_bytes")]
    replay_event: Option<&'a [u8]>,
    #[serde(with = "serde_bytes")]
    replay_video: Option<&'a [u8]>,
    relay_snuba_publish_disabled: bool,
}

#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
    project_id: ProjectId,
    start_time: u64,
    payload: Bytes,

    #[serde(skip)]
    event_id: EventId,
}

#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
    org_id: OrganizationId,
    project_id: ProjectId,
    name: &'a MetricName,
    #[serde(flatten)]
    value: MetricValue<'a>,
    timestamp: UnixTimestamp,
    tags: &'a BTreeMap<String, String>,
    retention_days: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_at: Option<UnixTimestamp>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
    #[serde(rename = "c")]
    Counter(FiniteF64),
    #[serde(rename = "d")]
    Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
    #[serde(rename = "s")]
    Set(ArrayEncoding<'a, SetView<'a>>),
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}

impl MetricValue<'_> {
    fn variant(&self) -> &'static str {
        match self {
            Self::Counter(_) => "counter",
            Self::Distribution(_) => "distribution",
            Self::Set(_) => "set",
            Self::Gauge(_) => "gauge",
        }
    }

    fn encoding(&self) -> Option<&'static str> {
        match self {
            Self::Distribution(ae) => Some(ae.name()),
            Self::Set(ae) => Some(ae.name()),
            _ => None,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    key_id: Option<u64>,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
    ClockPulse,
    CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
    #[serde(skip)]
    routing_key_hint: Option<Uuid>,

    message_type: CheckInMessageType,
    payload: Bytes,
    start_time: u64,
    sdk: Option<String>,
    project_id: ProjectId,
    retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessageRaw<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: BTreeMap<&'a str, &'a RawValue>,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
    #[serde(flatten)]
    meta: SpanMeta,
    #[serde(flatten)]
    span: SerializableAnnotated<'a, SpanV2>,
}

#[derive(Debug, Serialize)]
struct SpanMeta {
    organization_id: OrganizationId,
    project_id: ProjectId,
    #[serde(skip_serializing_if = "Option::is_none")]
    key_id: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    event_id: Option<EventId>,
    received: f64,
    retention_days: u16,
    downsampled_retention_days: u16,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: OrganizationId,
    project_id: ProjectId,
    received: u64,
    retention_days: u16,
    #[serde(skip)]
    headers: BTreeMap<String, String>,
    payload: Bytes,
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
    Event(EventKafkaMessage),
    UserReport(UserReportKafkaMessage),
    Metric {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: MetricKafkaMessage<'a>,
    },
    CheckIn(CheckInKafkaMessage),
    Item {
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(skip)]
        item_type: TraceItemType,
        #[serde(skip)]
        message: TraceItem,
    },
    SpanRaw {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessageRaw<'a>,
    },
    SpanV2 {
        #[serde(skip)]
        routing_key: Option<Uuid>,
        #[serde(skip)]
        headers: BTreeMap<String, String>,
        #[serde(flatten)]
        message: SpanKafkaMessage<'a>,
    },

    Attachment(AttachmentKafkaMessage),
    AttachmentChunk(AttachmentChunkKafkaMessage),

    Profile(ProfileKafkaMessage),
    ProfileChunk(ProfileChunkKafkaMessage),

    ReplayEvent(ReplayEventKafkaMessage<'a>),
    ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Event(_) => "event",
            KafkaMessage::UserReport(_) => "user_report",
            KafkaMessage::Metric { message, .. } => match message.name.namespace() {
                MetricNamespace::Sessions => "metric_sessions",
                MetricNamespace::Transactions => "metric_transactions",
                MetricNamespace::Spans => "metric_spans",
                MetricNamespace::Custom => "metric_custom",
                MetricNamespace::Unsupported => "metric_unsupported",
            },
            KafkaMessage::CheckIn(_) => "check_in",
            KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span",
            KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),

            KafkaMessage::Attachment(_) => "attachment",
            KafkaMessage::AttachmentChunk(_) => "attachment_chunk",

            KafkaMessage::Profile(_) => "profile",
            KafkaMessage::ProfileChunk(_) => "profile_chunk",

            KafkaMessage::ReplayEvent(_) => "replay_event",
            KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
        }
    }

    fn key(&self) -> Option<relay_kafka::Key> {
        match self {
            Self::Event(message) => Some(message.event_id.0),
            Self::UserReport(message) => Some(message.event_id.0),
            Self::SpanRaw { routing_key, .. } | Self::SpanV2 { routing_key, .. } => *routing_key,

            Self::CheckIn(message) => message.routing_key_hint,

            Self::Attachment(message) => Some(message.event_id.0),
            Self::AttachmentChunk(message) => Some(message.event_id.0),
            Self::ReplayEvent(message) => Some(message.replay_id.0),

            Self::Metric { .. }
            | Self::Item { .. }
            | Self::Profile(_)
            | Self::ProfileChunk(_)
            | Self::ReplayRecordingNotChunked(_) => None,
        }
        .filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.as_u128())
    }

    fn headers(&self) -> Option<&BTreeMap<String, String>> {
        match &self {
            KafkaMessage::Metric { headers, .. }
            | KafkaMessage::SpanRaw { headers, .. }
            | KafkaMessage::SpanV2 { headers, .. }
            | KafkaMessage::Item { headers, .. }
            | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. })
            | KafkaMessage::ProfileChunk(ProfileChunkKafkaMessage { headers, .. }) => Some(headers),

            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::ReplayEvent(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => None,
        }
    }

    fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
        match self {
            KafkaMessage::Metric { message, .. } => serialize_as_json(message),
            KafkaMessage::ReplayEvent(message) => serialize_as_json(message),
            KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message),
            KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message),
            KafkaMessage::Item { message, .. } => {
                let mut payload = Vec::new();
                match message.encode(&mut payload) {
                    Ok(_) => Ok(SerializationOutput::Protobuf(Cow::Owned(payload))),
                    Err(_) => Err(ClientError::ProtobufEncodingFailed),
                }
            }
            KafkaMessage::Event(_)
            | KafkaMessage::UserReport(_)
            | KafkaMessage::CheckIn(_)
            | KafkaMessage::Attachment(_)
            | KafkaMessage::AttachmentChunk(_)
            | KafkaMessage::Profile(_)
            | KafkaMessage::ProfileChunk(_)
            | KafkaMessage::ReplayRecordingNotChunked(_) => match rmp_serde::to_vec_named(&self) {
                Ok(x) => Ok(SerializationOutput::MsgPack(Cow::Owned(x))),
                Err(err) => Err(ClientError::InvalidMsgPack(err)),
            },
        }
    }
}

fn serialize_as_json<T: serde::Serialize>(
    value: &T,
) -> Result<SerializationOutput<'_>, ClientError> {
    match serde_json::to_vec(value) {
        Ok(vec) => Ok(SerializationOutput::Json(Cow::Owned(vec))),
        Err(err) => Err(ClientError::InvalidJson(err)),
    }
}

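/// Returns `true` for item types that are slow to consume (attachments and user
/// reports) and are therefore routed to the attachments topic.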
fn is_slow_item(item: &Item) -> bool {
    item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

fn bool_to_str(value: bool) -> &'static str {
    if value { "true" } else { "false" }
}

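/// Returns the UNIX timestamp of `timestamp` in seconds, substituting the
/// current time for pre-epoch timestamps that would underflow `u64`.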
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
    let ts = timestamp.timestamp();
    if ts >= 0 {
        return ts as u64;
    }

    Utc::now().timestamp() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disallow_outcomes() {
        let config = Config::default();
        let producer = Producer::create(&config).unwrap();

        for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
            let res = producer
                .client
                .send(topic, Some(0x0123456789abcdef), None, "foo", b"");

            assert!(matches!(res, Err(ClientError::InvalidTopicName)));
        }
    }
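
    // A minimal sanity check of `safe_timestamp`, sketched under the assumption
    // that the chrono version in use provides `DateTime::from_timestamp`:
    // non-negative timestamps pass through unchanged, while pre-epoch
    // timestamps fall back to the current time instead of underflowing.
    #[test]
    fn safe_timestamp_clamps_pre_epoch() {
        let ts = DateTime::from_timestamp(1_700_000_000, 0).unwrap();
        assert_eq!(safe_timestamp(ts), 1_700_000_000);

        let pre_epoch = DateTime::from_timestamp(-1, 0).unwrap();
        // The fallback is "now", which is strictly positive on a sane clock.
        assert!(safe_timestamp(pre_epoch) > 0);
    }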
}