use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use relay_base_schema::data_category::DataCategory;
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
    Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue,
    MetricName, MetricNamespace, SetView,
};
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::Deserializer;
use uuid::Uuid;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup};
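
/// Fallback attachment name used when an item carries no filename.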
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
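
/// Errors returned by the store service when producing payloads to Kafka fails.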
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
#[error("failed to send the message to kafka: {0}")]
SendFailed(#[from] ClientError),
#[error("failed to encode data: {0}")]
EncodingFailed(std::io::Error),
#[error("failed to store event because event id was missing")]
NoEventId,
}
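
/// Kafka producer configured for all topics handled by the store service, excluding outcomes.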
struct Producer {
client: KafkaClient,
}
impl Producer {
pub fn create(config: &Config) -> anyhow::Result<Self> {
let mut client_builder = KafkaClient::builder();
for topic in KafkaTopic::iter().filter(|t| {
**t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
}) {
let kafka_config = &config.kafka_config(*topic)?;
client_builder = client_builder
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics())
.context(ServiceError::Kafka)?;
}
Ok(Self {
client: client_builder.build(),
})
}
}
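
/// Publishes a processed envelope to Kafka.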
#[derive(Debug)]
pub struct StoreEnvelope {
pub envelope: TypedEnvelope<Processed>,
}
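
/// Publishes a batch of metric buckets to Kafka.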
#[derive(Clone, Debug)]
pub struct StoreMetrics {
pub buckets: Vec<Bucket>,
pub scoping: Scoping,
pub retention: u16,
}
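
/// Service interface of the store service, accepting envelopes and metric buckets.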
#[derive(Debug)]
pub enum Store {
Envelope(StoreEnvelope),
Metrics(StoreMetrics),
}
impl Store {
fn variant(&self) -> &'static str {
match self {
Store::Envelope(_) => "envelope",
Store::Metrics(_) => "metrics",
}
}
}
impl Interface for Store {}
impl FromMessage<StoreEnvelope> for Store {
type Response = NoResponse;
fn from_message(message: StoreEnvelope, _: ()) -> Self {
Self::Envelope(message)
}
}
impl FromMessage<StoreMetrics> for Store {
type Response = NoResponse;
fn from_message(message: StoreMetrics, _: ()) -> Self {
Self::Metrics(message)
}
}
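
/// Service implementing the [`Store`] interface: serializes accepted payloads and produces
/// them to the appropriate Kafka topics.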
pub struct StoreService {
workers: WorkerGroup,
config: Arc<Config>,
global_config: GlobalConfigHandle,
outcome_aggregator: Addr<TrackOutcome>,
metric_outcomes: MetricOutcomes,
producer: Producer,
}
impl StoreService {
pub fn create(
pool: ThreadPool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
outcome_aggregator: Addr<TrackOutcome>,
metric_outcomes: MetricOutcomes,
) -> anyhow::Result<Self> {
let producer = Producer::create(&config)?;
Ok(Self {
workers: WorkerGroup::new(pool),
config,
global_config,
outcome_aggregator,
metric_outcomes,
producer,
})
}
fn handle_message(&self, message: Store) {
let ty = message.variant();
relay_statsd::metric!(timer(RelayTimers::StoreServiceDuration), message = ty, {
match message {
Store::Envelope(message) => self.handle_store_envelope(message),
Store::Metrics(message) => self.handle_store_metrics(message),
}
})
}
fn handle_store_envelope(&self, message: StoreEnvelope) {
let StoreEnvelope {
envelope: mut managed,
} = message;
let scoping = managed.scoping();
let envelope = managed.take_envelope();
match self.store_envelope(envelope, managed.received_at(), scoping) {
Ok(()) => managed.accept(),
Err(error) => {
managed.reject(Outcome::Invalid(DiscardReason::Internal));
relay_log::error!(
error = &error as &dyn Error,
tags.project_key = %scoping.project_key,
"failed to store envelope"
);
}
}
}
fn store_envelope(
&self,
mut envelope: Box<Envelope>,
received_at: DateTime<Utc>,
scoping: Scoping,
) -> Result<(), StoreError> {
let retention = envelope.retention();
let event_id = envelope.event_id();
let event_item = envelope.as_mut().take_item_by(|item| {
matches!(
item.ty(),
ItemType::Event | ItemType::Transaction | ItemType::Security
)
});
let event_type = event_item.as_ref().map(|item| item.ty());
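        // Route the envelope: payloads containing attachments or user reports go through the
        // attachments topic, transactions to the transactions topic, everything else to events.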
let topic = if envelope.get_item_by(is_slow_item).is_some() {
KafkaTopic::Attachments
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
KafkaTopic::Transactions
} else {
KafkaTopic::Events
};
let send_individual_attachments = matches!(event_type, None | Some(&ItemType::Transaction));
let mut attachments = Vec::new();
let mut replay_event = None;
let mut replay_recording = None;
for item in envelope.items() {
match item.ty() {
ItemType::Attachment => {
debug_assert!(topic == KafkaTopic::Attachments);
if let Some(attachment) = self.produce_attachment(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
item,
send_individual_attachments,
)? {
attachments.push(attachment);
}
}
ItemType::UserReport => {
debug_assert!(topic == KafkaTopic::Attachments);
self.produce_user_report(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
received_at,
item,
)?;
}
ItemType::UserReportV2 => {
let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
self.produce_user_report_v2(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
received_at,
item,
remote_addr,
)?;
}
ItemType::Profile => self.produce_profile(
scoping.organization_id,
scoping.project_id,
scoping.key_id,
received_at,
retention,
item,
)?,
ItemType::ReplayVideo => {
self.produce_replay_video(
event_id,
scoping,
item.payload(),
received_at,
retention,
)?;
}
ItemType::ReplayRecording => {
replay_recording = Some(item);
}
ItemType::ReplayEvent => {
if item.replay_combined_payload() {
replay_event = Some(item);
}
self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
received_at,
retention,
&item.payload(),
)?;
}
ItemType::CheckIn => {
let client = envelope.meta().client();
self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
}
ItemType::Span => {
self.produce_span(scoping, received_at, event_id, retention, item)?
}
ItemType::ProfileChunk => self.produce_profile_chunk(
scoping.organization_id,
scoping.project_id,
received_at,
retention,
item,
)?,
other => {
let event_type = event_item.as_ref().map(|item| item.ty().as_str());
let item_types = envelope
.items()
.map(|item| item.ty().as_str())
.collect::<Vec<_>>();
let attachment_types = envelope
.items()
.map(|item| {
item.attachment_type()
.map(|t| t.to_string())
.unwrap_or_default()
})
.collect::<Vec<_>>();
relay_log::with_scope(
|scope| {
scope.set_extra("item_types", item_types.into());
scope.set_extra("attachment_types", attachment_types.into());
if other == &ItemType::FormData {
let payload = item.payload();
let form_data_keys = FormDataIter::new(&payload)
.map(|entry| entry.key())
.collect::<Vec<_>>();
scope.set_extra("form_data_keys", form_data_keys.into());
}
},
|| {
relay_log::error!(
tags.project_key = %scoping.project_key,
tags.event_type = event_type.unwrap_or("none"),
"StoreService received unexpected item type: {other}"
)
},
)
}
}
}
if let Some(recording) = replay_recording {
let replay_event = replay_event.map(|rv| rv.payload());
self.produce_replay_recording(
event_id,
scoping,
&recording.payload(),
replay_event.as_deref(),
None,
received_at,
retention,
)?;
}
if let Some(event_item) = event_item {
let event_id = event_id.ok_or(StoreError::NoEventId)?;
let project_id = scoping.project_id;
let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
self.produce(
topic,
KafkaMessage::Event(EventKafkaMessage {
payload: event_item.payload(),
start_time: safe_timestamp(received_at),
event_id,
project_id,
remote_addr,
attachments,
}),
)?;
} else {
debug_assert!(attachments.is_empty());
}
Ok(())
}
fn handle_store_metrics(&self, message: StoreMetrics) {
let StoreMetrics {
buckets,
scoping,
retention,
} = message;
let batch_size = self.config.metrics_max_batch_size_bytes();
let mut error = None;
let global_config = self.global_config.current();
let mut encoder = BucketEncoder::new(&global_config);
let now = UnixTimestamp::now();
let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
for mut bucket in buckets {
let namespace = encoder.prepare(&mut bucket);
if let Some(received_at) = bucket.metadata.received_at {
let delay = now.as_secs().saturating_sub(received_at.as_secs());
let (total, count, max) = delay_stats.get_mut(namespace);
*total += delay;
*count += 1;
*max = (*max).max(delay);
}
for view in BucketsView::new(std::slice::from_ref(&bucket))
.by_size(batch_size)
.flatten()
{
let message = self.create_metric_message(
scoping.organization_id,
scoping.project_id,
&mut encoder,
namespace,
&view,
retention,
);
let result =
message.and_then(|message| self.send_metric_message(namespace, message));
let outcome = match result {
Ok(()) => Outcome::Accepted,
Err(e) => {
error.get_or_insert(e);
Outcome::Invalid(DiscardReason::Internal)
}
};
self.metric_outcomes.track(scoping, &[view], outcome);
}
}
if let Some(error) = error {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to produce metric buckets: {error}"
);
}
for (namespace, (total, count, max)) in delay_stats {
if count == 0 {
continue;
}
metric!(
counter(RelayCounters::MetricDelaySum) += total,
namespace = namespace.as_str()
);
metric!(
counter(RelayCounters::MetricDelayCount) += count,
namespace = namespace.as_str()
);
metric!(
gauge(RelayGauges::MetricDelayMax) = max,
namespace = namespace.as_str()
);
}
}
fn create_metric_message<'a>(
&self,
organization_id: OrganizationId,
project_id: ProjectId,
encoder: &'a mut BucketEncoder,
namespace: MetricNamespace,
view: &BucketView<'a>,
retention_days: u16,
) -> Result<MetricKafkaMessage<'a>, StoreError> {
let value = match view.value() {
BucketViewValue::Counter(c) => MetricValue::Counter(c),
BucketViewValue::Distribution(data) => MetricValue::Distribution(
encoder
.encode_distribution(namespace, data)
.map_err(StoreError::EncodingFailed)?,
),
BucketViewValue::Set(data) => MetricValue::Set(
encoder
.encode_set(namespace, data)
.map_err(StoreError::EncodingFailed)?,
),
BucketViewValue::Gauge(g) => MetricValue::Gauge(g),
};
Ok(MetricKafkaMessage {
org_id: organization_id,
project_id,
name: view.name(),
value,
timestamp: view.timestamp(),
tags: view.tags(),
retention_days,
received_at: view.metadata().received_at,
})
}
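
    /// Produces the message to the given Kafka topic and records a produced-message counter.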
fn produce(
&self,
topic: KafkaTopic,
message: KafkaMessage,
) -> Result<(), StoreError> {
relay_log::trace!("Sending kafka message of type {}", message.variant());
let topic_name = self.producer.client.send_message(topic, &message)?;
match &message {
KafkaMessage::Metric {
message: metric, ..
} => {
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = message.variant(),
topic = topic_name,
metric_type = metric.value.variant(),
metric_encoding = metric.value.encoding().unwrap_or(""),
);
}
KafkaMessage::Span { message: span, .. } => {
let is_segment = span.is_segment;
let has_parent = span.parent_span_id.is_some();
let platform = VALID_PLATFORMS.iter().find(|p| *p == &span.platform);
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = message.variant(),
topic = topic_name,
platform = platform.unwrap_or(&""),
is_segment = bool_to_str(is_segment),
has_parent = bool_to_str(has_parent),
);
}
KafkaMessage::ReplayRecordingNotChunked(replay) => {
let has_video = replay.replay_video.is_some();
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = message.variant(),
topic = topic_name,
has_video = bool_to_str(has_video),
);
}
message => {
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = message.variant(),
topic = topic_name,
);
}
}
Ok(())
}
fn produce_attachment(
&self,
event_id: EventId,
project_id: ProjectId,
item: &Item,
send_individual_attachments: bool,
) -> Result<Option<ChunkedAttachment>, StoreError> {
let id = Uuid::new_v4().to_string();
let mut chunk_index = 0;
let payload = item.payload();
let size = item.len();
let max_chunk_size = self.config.attachment_chunk_size();
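        // Inline the payload if the attachment is sent individually and fits into a single
        // chunk; otherwise split it into `AttachmentChunk` messages produced up-front.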
let data = if send_individual_attachments && size < max_chunk_size {
(size > 0).then_some(payload)
} else {
let mut offset = 0;
while offset < size {
let chunk_size = std::cmp::min(max_chunk_size, size - offset);
let chunk_message = AttachmentChunkKafkaMessage {
payload: payload.slice(offset..offset + chunk_size),
event_id,
project_id,
id: id.clone(),
chunk_index,
};
self.produce(
KafkaTopic::Attachments,
KafkaMessage::AttachmentChunk(chunk_message),
)?;
offset += chunk_size;
chunk_index += 1;
}
None
};
let attachment = ChunkedAttachment {
id,
name: match item.filename() {
Some(name) => name.to_owned(),
None => UNNAMED_ATTACHMENT.to_owned(),
},
content_type: item
.content_type()
.map(|content_type| content_type.as_str().to_owned()),
attachment_type: item.attachment_type().cloned().unwrap_or_default(),
chunks: chunk_index,
data,
size: Some(size),
rate_limited: Some(item.rate_limited()),
};
if send_individual_attachments {
let message = KafkaMessage::Attachment(AttachmentKafkaMessage {
event_id,
project_id,
attachment,
});
self.produce(KafkaTopic::Attachments, message)?;
Ok(None)
} else {
Ok(Some(attachment))
}
}
fn produce_user_report(
&self,
event_id: EventId,
project_id: ProjectId,
received_at: DateTime<Utc>,
item: &Item,
) -> Result<(), StoreError> {
let message = KafkaMessage::UserReport(UserReportKafkaMessage {
project_id,
event_id,
start_time: safe_timestamp(received_at),
payload: item.payload(),
});
self.produce(KafkaTopic::Attachments, message)
}
fn produce_user_report_v2(
&self,
event_id: EventId,
project_id: ProjectId,
received_at: DateTime<Utc>,
item: &Item,
remote_addr: Option<String>,
) -> Result<(), StoreError> {
let message = KafkaMessage::Event(EventKafkaMessage {
project_id,
event_id,
payload: item.payload(),
start_time: safe_timestamp(received_at),
remote_addr,
attachments: vec![],
});
self.produce(KafkaTopic::Feedback, message)
}
fn send_metric_message(
&self,
namespace: MetricNamespace,
message: MetricKafkaMessage,
) -> Result<(), StoreError> {
let topic = match namespace {
MetricNamespace::Sessions => KafkaTopic::MetricsSessions,
MetricNamespace::Unsupported => {
relay_log::with_scope(
|scope| scope.set_extra("metric_message.name", message.name.as_ref().into()),
|| relay_log::error!("store service dropping unknown metric usecase"),
);
return Ok(());
}
_ => KafkaTopic::MetricsGeneric,
};
let headers = BTreeMap::from([("namespace".to_string(), namespace.to_string())]);
self.produce(topic, KafkaMessage::Metric { headers, message })?;
Ok(())
}
fn produce_profile(
&self,
organization_id: OrganizationId,
project_id: ProjectId,
key_id: Option<u64>,
received_at: DateTime<Utc>,
retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
let message = ProfileKafkaMessage {
organization_id,
project_id,
key_id,
received: safe_timestamp(received_at),
retention_days,
headers: BTreeMap::from([(
"sampled".to_string(),
if item.sampled() { "true" } else { "false" }.to_owned(),
)]),
payload: item.payload(),
};
self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
Ok(())
}
fn produce_replay_event(
&self,
replay_id: EventId,
project_id: ProjectId,
received_at: DateTime<Utc>,
retention_days: u16,
payload: &[u8],
) -> Result<(), StoreError> {
let message = ReplayEventKafkaMessage {
replay_id,
project_id,
retention_days,
start_time: safe_timestamp(received_at),
payload,
};
self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
Ok(())
}
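
    /// Produces a replay recording to Kafka, dropping it with a `TooLarge` outcome if the
    /// combined payload exceeds the configured maximum replay message size.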
#[allow(clippy::too_many_arguments)]
fn produce_replay_recording(
&self,
event_id: Option<EventId>,
scoping: Scoping,
payload: &[u8],
replay_event: Option<&[u8]>,
replay_video: Option<&[u8]>,
received_at: DateTime<Utc>,
retention: u16,
) -> Result<(), StoreError> {
let max_payload_size = self.config.max_replay_message_size();
        // Estimate the size of the combined message, reserving headroom for message metadata.
        let mut payload_size = 2000;
        payload_size += replay_event.as_ref().map_or(0, |b| b.len());
payload_size += replay_video.as_ref().map_or(0, |b| b.len());
payload_size += payload.len();
if payload_size >= max_payload_size {
relay_log::warn!("replay_recording over maximum size.");
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::Replay,
event_id,
outcome: Outcome::Invalid(DiscardReason::TooLarge),
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
return Ok(());
}
let message =
KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
replay_id: event_id.ok_or(StoreError::NoEventId)?,
project_id: scoping.project_id,
key_id: scoping.key_id,
org_id: scoping.organization_id,
received: safe_timestamp(received_at),
retention_days: retention,
payload,
replay_event,
replay_video,
});
self.produce(KafkaTopic::ReplayRecordings, message)?;
Ok(())
}
fn produce_replay_video(
&self,
event_id: Option<EventId>,
scoping: Scoping,
payload: Bytes,
received_at: DateTime<Utc>,
retention: u16,
) -> Result<(), StoreError> {
#[derive(Deserialize)]
struct VideoEvent<'a> {
replay_event: &'a [u8],
replay_recording: &'a [u8],
replay_video: &'a [u8],
}
let Ok(VideoEvent {
replay_video,
replay_event,
replay_recording,
}) = rmp_serde::from_slice::<VideoEvent>(&payload)
else {
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::Replay,
event_id,
outcome: Outcome::Invalid(DiscardReason::InvalidReplayEvent),
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
return Ok(());
};
self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
received_at,
retention,
replay_event,
)?;
self.produce_replay_recording(
event_id,
scoping,
replay_recording,
Some(replay_event),
Some(replay_video),
received_at,
retention,
)
}
fn produce_check_in(
&self,
project_id: ProjectId,
received_at: DateTime<Utc>,
client: Option<&str>,
retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
message_type: CheckInMessageType::CheckIn,
project_id,
retention_days,
start_time: safe_timestamp(received_at),
sdk: client.map(str::to_owned),
payload: item.payload(),
routing_key_hint: item.routing_hint(),
});
self.produce(KafkaTopic::Monitors, message)?;
Ok(())
}
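
    /// Deserializes the span payload, fills in scoping, retention, and timing fields, and
    /// produces it to the spans topic, emitting an indexed-span outcome.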
fn produce_span(
&self,
scoping: Scoping,
received_at: DateTime<Utc>,
event_id: Option<EventId>,
retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
relay_log::trace!("Producing span");
let payload = item.payload();
let d = &mut Deserializer::from_slice(&payload);
let mut span: SpanKafkaMessage = match serde_path_to_error::deserialize(d) {
Ok(span) => span,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to parse span"
);
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
return Ok(());
}
};
span.duration_ms =
((span.end_timestamp_precise - span.start_timestamp_precise) * 1e3) as u32;
span.event_id = event_id;
span.organization_id = scoping.organization_id.value();
span.project_id = scoping.project_id.value();
span.retention_days = retention_days;
span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;
if let Some(measurements) = &mut span.measurements {
measurements.retain(|_, v| {
v.as_ref()
.and_then(|v| v.value)
.map_or(false, f64::is_finite)
});
}
self.produce(
KafkaTopic::Spans,
KafkaMessage::Span {
headers: BTreeMap::from([
("project_id".to_string(), scoping.project_id.to_string()),
(
"ingest_in_eap".to_string(),
item.ingest_span_in_eap().to_string(),
),
]),
message: span,
},
)?;
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Accepted,
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
Ok(())
}
fn produce_profile_chunk(
&self,
organization_id: OrganizationId,
project_id: ProjectId,
received_at: DateTime<Utc>,
retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
let message = ProfileChunkKafkaMessage {
organization_id,
project_id,
received: safe_timestamp(received_at),
retention_days,
payload: item.payload(),
};
self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))?;
Ok(())
}
}
impl Service for StoreService {
type Interface = Store;
async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
let this = Arc::new(self);
relay_log::info!("store forwarder started");
while let Some(message) = rx.recv().await {
let service = Arc::clone(&this);
this.workers
.spawn(move || service.handle_message(message))
.await;
}
relay_log::info!("store forwarder stopped");
}
}
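
/// Attachment metadata sent to Kafka, with the payload either inlined in `data` or split into
/// separately produced chunks.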
#[derive(Debug, Serialize)]
struct ChunkedAttachment {
id: String,
name: String,
#[serde(skip_serializing_if = "Option::is_none")]
content_type: Option<String>,
#[serde(serialize_with = "serialize_attachment_type")]
attachment_type: AttachmentType,
chunks: usize,
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<Bytes>,
#[serde(skip_serializing_if = "Option::is_none")]
size: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
rate_limited: Option<bool>,
}
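
/// Serializes a value through its JSON representation, so the attachment type is encoded as
/// the same string in both JSON and MessagePack output.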
fn serialize_attachment_type<S, T>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
T: serde::Serialize,
{
serde_json::to_value(t)
.map_err(|e| serde::ser::Error::custom(e.to_string()))?
.serialize(serializer)
}
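
/// Serializes a map while omitting entries whose value is `None`.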
pub fn serialize_btreemap_skip_nulls<S>(
map: &Option<BTreeMap<&str, Option<String>>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let Some(map) = map else {
return serializer.serialize_none();
};
let mut m = serializer.serialize_map(Some(map.len()))?;
for (key, value) in map.iter() {
if let Some(value) = value {
m.serialize_entry(key, value)?;
}
}
m.end()
}
#[derive(Debug, Serialize)]
struct EventKafkaMessage {
payload: Bytes,
start_time: u64,
event_id: EventId,
project_id: ProjectId,
remote_addr: Option<String>,
attachments: Vec<ChunkedAttachment>,
}
#[derive(Debug, Serialize)]
struct ReplayEventKafkaMessage<'a> {
payload: &'a [u8],
start_time: u64,
replay_id: EventId,
project_id: ProjectId,
retention_days: u16,
}
#[derive(Debug, Serialize)]
struct AttachmentChunkKafkaMessage {
payload: Bytes,
event_id: EventId,
project_id: ProjectId,
id: String,
chunk_index: usize,
}
#[derive(Debug, Serialize)]
struct AttachmentKafkaMessage {
event_id: EventId,
project_id: ProjectId,
attachment: ChunkedAttachment,
}
#[derive(Debug, Serialize)]
struct ReplayRecordingNotChunkedKafkaMessage<'a> {
replay_id: EventId,
key_id: Option<u64>,
org_id: OrganizationId,
project_id: ProjectId,
received: u64,
retention_days: u16,
#[serde(with = "serde_bytes")]
payload: &'a [u8],
#[serde(with = "serde_bytes")]
replay_event: Option<&'a [u8]>,
#[serde(with = "serde_bytes")]
replay_video: Option<&'a [u8]>,
}
#[derive(Debug, Serialize)]
struct UserReportKafkaMessage {
project_id: ProjectId,
start_time: u64,
payload: Bytes,
#[serde(skip)]
event_id: EventId,
}
#[derive(Clone, Debug, Serialize)]
struct MetricKafkaMessage<'a> {
org_id: OrganizationId,
project_id: ProjectId,
name: &'a MetricName,
#[serde(flatten)]
value: MetricValue<'a>,
timestamp: UnixTimestamp,
tags: &'a BTreeMap<String, String>,
retention_days: u16,
#[serde(skip_serializing_if = "Option::is_none")]
received_at: Option<UnixTimestamp>,
}
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", content = "value")]
enum MetricValue<'a> {
#[serde(rename = "c")]
Counter(FiniteF64),
#[serde(rename = "d")]
Distribution(ArrayEncoding<'a, &'a [FiniteF64]>),
#[serde(rename = "s")]
Set(ArrayEncoding<'a, SetView<'a>>),
#[serde(rename = "g")]
Gauge(GaugeValue),
}
impl MetricValue<'_> {
fn variant(&self) -> &'static str {
match self {
Self::Counter(_) => "counter",
Self::Distribution(_) => "distribution",
Self::Set(_) => "set",
Self::Gauge(_) => "gauge",
}
}
fn encoding(&self) -> Option<&'static str> {
match self {
Self::Distribution(ae) => Some(ae.name()),
Self::Set(ae) => Some(ae.name()),
_ => None,
}
}
}
#[derive(Clone, Debug, Serialize)]
struct ProfileKafkaMessage {
organization_id: OrganizationId,
project_id: ProjectId,
key_id: Option<u64>,
received: u64,
retention_days: u16,
#[serde(skip)]
headers: BTreeMap<String, String>,
payload: Bytes,
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
ClockPulse,
CheckIn,
}
#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
#[serde(skip)]
routing_key_hint: Option<Uuid>,
message_type: CheckInMessageType,
payload: Bytes,
start_time: u64,
sdk: Option<String>,
project_id: ProjectId,
retention_days: u16,
}
#[derive(Debug, Deserialize, Serialize)]
struct SpanMeasurement {
#[serde(default, skip_serializing_if = "Option::is_none")]
value: Option<f64>,
}
#[derive(Debug, Deserialize, Serialize)]
struct SpanKafkaMessage<'a> {
#[serde(default, skip_serializing_if = "Option::is_none")]
description: Option<&'a RawValue>,
#[serde(default)]
duration_ms: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
event_id: Option<EventId>,
#[serde(rename(deserialize = "exclusive_time"))]
exclusive_time_ms: f64,
#[serde(default)]
is_segment: bool,
#[serde(default, skip_serializing_if = "none_or_empty_object")]
data: Option<&'a RawValue>,
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement>>>,
#[serde(default)]
organization_id: u64,
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
origin: Option<Cow<'a, str>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_span_id: Option<&'a str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
profile_id: Option<&'a str>,
#[serde(default)]
project_id: u64,
received: f64,
#[serde(default)]
retention_days: u16,
#[serde(default, skip_serializing_if = "Option::is_none")]
segment_id: Option<&'a str>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
serialize_with = "serialize_btreemap_skip_nulls"
)]
sentry_tags: Option<BTreeMap<&'a str, Option<String>>>,
span_id: &'a str,
#[serde(default, skip_serializing_if = "none_or_empty_object")]
tags: Option<&'a RawValue>,
trace_id: EventId,
#[serde(default)]
start_timestamp_ms: u64,
#[serde(rename(deserialize = "start_timestamp"))]
start_timestamp_precise: f64,
#[serde(rename(deserialize = "timestamp"))]
end_timestamp_precise: f64,
#[serde(borrow, default, skip_serializing)]
    platform: Cow<'a, str>,
}
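
/// Returns `true` if the raw JSON value is missing or an empty object (`{}`).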
fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
match value {
None => true,
Some(raw) => raw.get() == "{}",
}
}
#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
organization_id: OrganizationId,
project_id: ProjectId,
received: u64,
retention_days: u16,
payload: Bytes,
}
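
/// All message types produced to Kafka by the store service.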
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage<'a> {
Event(EventKafkaMessage),
Attachment(AttachmentKafkaMessage),
AttachmentChunk(AttachmentChunkKafkaMessage),
UserReport(UserReportKafkaMessage),
Metric {
#[serde(skip)]
headers: BTreeMap<String, String>,
#[serde(flatten)]
message: MetricKafkaMessage<'a>,
},
Profile(ProfileKafkaMessage),
ReplayEvent(ReplayEventKafkaMessage<'a>),
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
CheckIn(CheckInKafkaMessage),
Span {
#[serde(skip)]
headers: BTreeMap<String, String>,
#[serde(flatten)]
message: SpanKafkaMessage<'a>,
},
ProfileChunk(ProfileChunkKafkaMessage),
}
impl Message for KafkaMessage<'_> {
fn variant(&self) -> &'static str {
match self {
KafkaMessage::Event(_) => "event",
KafkaMessage::Attachment(_) => "attachment",
KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
KafkaMessage::UserReport(_) => "user_report",
KafkaMessage::Metric { message, .. } => match message.name.namespace() {
MetricNamespace::Sessions => "metric_sessions",
MetricNamespace::Transactions => "metric_transactions",
MetricNamespace::Spans => "metric_spans",
MetricNamespace::Custom => "metric_custom",
MetricNamespace::Stats => "metric_metric_stats",
MetricNamespace::Unsupported => "metric_unsupported",
},
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
KafkaMessage::CheckIn(_) => "check_in",
KafkaMessage::Span { .. } => "span",
KafkaMessage::ProfileChunk(_) => "profile_chunk",
}
}
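
    /// Returns the partitioning key for this message, based on the event or replay ID where
    /// available and falling back to a random UUID otherwise.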
fn key(&self) -> [u8; 16] {
let mut uuid = match self {
Self::Event(message) => message.event_id.0,
Self::Attachment(message) => message.event_id.0,
Self::AttachmentChunk(message) => message.event_id.0,
Self::UserReport(message) => message.event_id.0,
Self::ReplayEvent(message) => message.replay_id.0,
Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),
Self::Profile(_)
| Self::Span { .. }
| Self::ReplayRecordingNotChunked(_)
| Self::ProfileChunk(_) => Uuid::nil(),
Self::Metric { .. } => Uuid::nil(),
};
if uuid.is_nil() {
uuid = Uuid::new_v4();
}
*uuid.as_bytes()
}
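
    /// Returns the Kafka headers to send with this message, if any are set.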
fn headers(&self) -> Option<&BTreeMap<String, String>> {
match &self {
KafkaMessage::Metric { headers, .. } => {
if !headers.is_empty() {
return Some(headers);
}
None
}
KafkaMessage::Profile(profile) => {
if !profile.headers.is_empty() {
return Some(&profile.headers);
}
None
}
KafkaMessage::Span { headers, .. } => {
if !headers.is_empty() {
return Some(headers);
}
None
}
_ => None,
}
}
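
    /// Serializes the message payload: metrics, replay events, and spans as JSON, all other
    /// messages as MessagePack.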
fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
match self {
KafkaMessage::Metric { message, .. } => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
KafkaMessage::ReplayEvent(message) => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
KafkaMessage::Span { message, .. } => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
_ => rmp_serde::to_vec_named(&self)
.map(Cow::Owned)
.map_err(ClientError::InvalidMsgPack),
}
}
}
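
/// Returns `true` for item types that must be routed through the attachments topic.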
fn is_slow_item(item: &Item) -> bool {
item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}
fn bool_to_str(value: bool) -> &'static str {
if value {
"true"
} else {
"false"
}
}
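
/// Converts a timestamp to seconds since the Unix epoch, substituting the current time for
/// timestamps before the epoch.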
fn safe_timestamp(timestamp: DateTime<Utc>) -> u64 {
let ts = timestamp.timestamp();
if ts >= 0 {
return ts as u64;
}
Utc::now().timestamp() as u64
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn disallow_outcomes() {
let config = Config::default();
let producer = Producer::create(&config).unwrap();
for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
let res = producer
.client
.send(topic, b"0123456789abcdef", None, "foo", b"");
assert!(matches!(res, Err(ClientError::InvalidTopicName)));
}
}
}