use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, mem};
#[cfg(feature = "processing")]
use anyhow::Context;
use chrono::{DateTime, SecondsFormat, Utc};
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::{Config, EmitOutcomes};
use relay_dynamic_config::Feature;
use relay_event_schema::protocol::{ClientReport, DiscardedEvent, EventId};
use relay_filter::FilterStatKey;
#[cfg(feature = "processing")]
use relay_kafka::{ClientError, KafkaClient, KafkaTopic};
use relay_quotas::{DataCategory, ReasonCode, Scoping};
use relay_sampling::config::RuleId;
use relay_sampling::evaluation::MatchedRuleIds;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use serde::{Deserialize, Serialize};
#[cfg(feature = "processing")]
use crate::service::ServiceError;
use crate::services::processor::{EnvelopeProcessor, SubmitClientReports};
use crate::services::upstream::{Method, SendQuery, UpstreamQuery, UpstreamRelay};
use crate::statsd::RelayCounters;
use crate::utils::SleepHandle;
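/// Defines the structure of the HTTP outcomes requests sent to the upstream.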
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct SendOutcomes {
#[serde(default)]
pub outcomes: Vec<TrackRawOutcome>,
}
impl UpstreamQuery for SendOutcomes {
type Response = SendOutcomesResponse;
fn method(&self) -> Method {
Method::POST
}
fn path(&self) -> Cow<'static, str> {
Cow::Borrowed("/api/0/relays/outcomes/")
}
fn retry() -> bool {
true
}
fn route(&self) -> &'static str {
"outcomes"
}
}
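/// The response body for a [`SendOutcomes`] request, which carries no data.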
#[derive(Debug, Deserialize, Serialize)]
pub struct SendOutcomesResponse {}
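/// The numeric identifier of an outcome category, as reported upstream and to Kafka.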
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub struct OutcomeId(u8);
impl OutcomeId {
const ACCEPTED: OutcomeId = OutcomeId(0);
const FILTERED: OutcomeId = OutcomeId(1);
const RATE_LIMITED: OutcomeId = OutcomeId(2);
const INVALID: OutcomeId = OutcomeId(3);
const ABUSE: OutcomeId = OutcomeId(4);
const CLIENT_DISCARD: OutcomeId = OutcomeId(5);
const CARDINALITY_LIMITED: OutcomeId = OutcomeId(6);
pub fn as_u8(self) -> u8 {
self.0
}
}
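/// A common interface over [`TrackOutcome`] and [`TrackRawOutcome`], used to emit
/// consistent outcome metrics for both message types.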
trait TrackOutcomeLike {
fn reason(&self) -> Option<Cow<'_, str>>;
fn outcome_id(&self) -> OutcomeId;
fn tag_name(&self) -> &'static str {
match self.outcome_id() {
OutcomeId::ACCEPTED => "accepted",
OutcomeId::FILTERED => "filtered",
OutcomeId::RATE_LIMITED => "rate_limited",
OutcomeId::INVALID => "invalid",
OutcomeId::ABUSE => "abuse",
OutcomeId::CLIENT_DISCARD => "client_discard",
OutcomeId::CARDINALITY_LIMITED => "cardinality_limited",
_ => "<unknown>",
}
}
}
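/// Tracks an [`Outcome`] for a given data category and quantity.
///
/// This is the primary message for reporting outcomes to the outcome producer.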
#[derive(Clone, Debug, Hash)]
pub struct TrackOutcome {
pub timestamp: DateTime<Utc>,
pub scoping: Scoping,
pub outcome: Outcome,
pub event_id: Option<EventId>,
pub remote_addr: Option<IpAddr>,
pub category: DataCategory,
pub quantity: u32,
}
impl TrackOutcomeLike for TrackOutcome {
fn reason(&self) -> Option<Cow<'_, str>> {
self.outcome.to_reason()
}
fn outcome_id(&self) -> OutcomeId {
self.outcome.to_outcome_id()
}
}
impl Interface for TrackOutcome {}
impl FromMessage<Self> for TrackOutcome {
type Response = NoResponse;
fn from_message(message: Self, _: ()) -> Self {
message
}
}
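/// The verdict on an envelope item: accepted, or the reason it was dropped.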
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Outcome {
#[allow(dead_code)]
Accepted,
Filtered(FilterStatKey),
FilteredSampling(RuleCategories),
RateLimited(Option<ReasonCode>),
#[cfg(feature = "processing")]
CardinalityLimited(String),
Invalid(DiscardReason),
#[allow(dead_code)]
Abuse,
ClientDiscard(String),
}
impl Outcome {
pub fn to_outcome_id(&self) -> OutcomeId {
match self {
Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
#[cfg(feature = "processing")]
Outcome::CardinalityLimited(_) => OutcomeId::CARDINALITY_LIMITED,
Outcome::Invalid(_) => OutcomeId::INVALID,
Outcome::Abuse => OutcomeId::ABUSE,
Outcome::ClientDiscard(_) => OutcomeId::CLIENT_DISCARD,
Outcome::Accepted => OutcomeId::ACCEPTED,
}
}
pub fn to_reason(&self) -> Option<Cow<'_, str>> {
match self {
Outcome::Invalid(discard_reason) => Some(Cow::Borrowed(discard_reason.name())),
Outcome::Filtered(filter_key) => Some(filter_key.clone().name()),
Outcome::FilteredSampling(rule_ids) => Some(Cow::Owned(format!("Sampled:{rule_ids}"))),
Outcome::RateLimited(code_opt) => {
code_opt.as_ref().map(|code| Cow::Borrowed(code.as_str()))
}
#[cfg(feature = "processing")]
Outcome::CardinalityLimited(id) => Some(Cow::Borrowed(id)),
Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)),
Outcome::Abuse => None,
Outcome::Accepted => None,
}
}
pub fn is_unexpected(&self) -> bool {
matches!(
self,
Outcome::Invalid(
DiscardReason::Internal
| DiscardReason::ProjectState
| DiscardReason::ProjectStatePii,
)
)
}
}
impl fmt::Display for Outcome {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Outcome::Filtered(key) => write!(f, "filtered by {key}"),
Outcome::FilteredSampling(rule_ids) => write!(f, "sampling rule {rule_ids}"),
Outcome::RateLimited(None) => write!(f, "rate limited"),
Outcome::RateLimited(Some(reason)) => write!(f, "rate limited with reason {reason}"),
#[cfg(feature = "processing")]
Outcome::CardinalityLimited(id) => write!(f, "cardinality limited ({id})"),
Outcome::Invalid(DiscardReason::Internal) => write!(f, "internal error"),
Outcome::Invalid(reason) => write!(f, "invalid data ({reason})"),
Outcome::Abuse => write!(f, "abuse limit reached"),
Outcome::ClientDiscard(reason) => write!(f, "discarded by client ({reason})"),
Outcome::Accepted => write!(f, "accepted"),
}
}
}
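/// A coarse category for dynamic sampling rule IDs, used to compress the list of
/// matched rule IDs into short, stable reason codes.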
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RuleCategory {
BoostLowVolumeProjects,
BoostEnvironments,
IgnoreHealthChecks,
BoostKeyTransactions,
Recalibration,
BoostReplayId,
BoostLowVolumeTransactions,
BoostLatestReleases,
Custom,
Other,
}
impl RuleCategory {
fn as_str(&self) -> &'static str {
match self {
Self::BoostLowVolumeProjects => "1000",
Self::BoostEnvironments => "1001",
Self::IgnoreHealthChecks => "1002",
Self::BoostKeyTransactions => "1003",
Self::Recalibration => "1004",
Self::BoostReplayId => "1005",
Self::BoostLowVolumeTransactions => "1400",
Self::BoostLatestReleases => "1500",
Self::Custom => "3000",
Self::Other => "0",
}
}
}
impl From<RuleId> for RuleCategory {
fn from(value: RuleId) -> Self {
match value.0 {
1000 => Self::BoostLowVolumeProjects,
1001 => Self::BoostEnvironments,
1002 => Self::IgnoreHealthChecks,
1003 => Self::BoostKeyTransactions,
1004 => Self::Recalibration,
1005 => Self::BoostReplayId,
1400..=1499 => Self::BoostLowVolumeTransactions,
1500..=1599 => Self::BoostLatestReleases,
3000..=4999 => Self::Custom,
_ => Self::Other,
}
}
}
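/// An ordered set of [`RuleCategory`] values, rendered as a comma-separated list
/// in sampling outcome reasons.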
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuleCategories(pub BTreeSet<RuleCategory>);
impl fmt::Display for RuleCategories {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for (i, c) in self.0.iter().enumerate() {
if i > 0 {
write!(f, ",")?;
}
write!(f, "{}", c.as_str())?;
}
Ok(())
}
}
impl From<MatchedRuleIds> for RuleCategories {
fn from(value: MatchedRuleIds) -> Self {
RuleCategories(BTreeSet::from_iter(
value.0.into_iter().map(RuleCategory::from),
))
}
}
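/// The reason an event or envelope item was discarded as invalid.
///
/// These values are reported in the `reason` field of [`Outcome::Invalid`].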
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[allow(dead_code)]
pub enum DiscardReason {
Duplicate,
ProjectId,
AuthVersion,
AuthClient,
NoData,
EmptyEnvelope,
TooLarge,
DisallowedMethod,
ContentType,
MultiProjectId,
MissingMinidumpUpload,
InvalidMinidump,
SecurityReportType,
SecurityReport,
Cors,
Payload,
InvalidJson,
InvalidProtobuf,
InvalidMsgpack,
InvalidMultipart,
InvalidTransaction,
InvalidEnvelope,
InvalidCompression,
ProjectState,
ProjectStatePii,
DuplicateItem,
NoEventPayload,
Timestamp,
Internal,
ProcessUnreal,
TransactionSampled,
InvalidReplayEvent,
InvalidReplayEventNoPayload,
InvalidReplayEventPii,
InvalidReplayRecordingEvent,
InvalidReplayVideoEvent,
Profiling(&'static str),
InvalidSpan,
FeatureDisabled(Feature),
}
impl DiscardReason {
pub fn name(self) -> &'static str {
match self {
DiscardReason::Duplicate => "duplicate",
DiscardReason::ProjectId => "project_id",
DiscardReason::AuthVersion => "auth_version",
DiscardReason::AuthClient => "auth_client",
DiscardReason::NoData => "no_data",
DiscardReason::TooLarge => "too_large",
DiscardReason::DisallowedMethod => "disallowed_method",
DiscardReason::ContentType => "content_type",
DiscardReason::MultiProjectId => "multi_project_id",
DiscardReason::MissingMinidumpUpload => "missing_minidump_upload",
DiscardReason::InvalidMinidump => "invalid_minidump",
DiscardReason::SecurityReportType => "security_report_type",
DiscardReason::SecurityReport => "security_report",
DiscardReason::Cors => "cors",
DiscardReason::ProcessUnreal => "process_unreal",
DiscardReason::Payload => "payload",
DiscardReason::InvalidJson => "invalid_json",
DiscardReason::InvalidMultipart => "invalid_multipart",
DiscardReason::InvalidMsgpack => "invalid_msgpack",
DiscardReason::InvalidProtobuf => "invalid_proto",
DiscardReason::InvalidTransaction => "invalid_transaction",
DiscardReason::InvalidEnvelope => "invalid_envelope",
DiscardReason::InvalidCompression => "invalid_compression",
DiscardReason::Timestamp => "timestamp",
DiscardReason::ProjectState => "project_state",
DiscardReason::ProjectStatePii => "project_state_pii",
DiscardReason::DuplicateItem => "duplicate_item",
DiscardReason::NoEventPayload => "no_event_payload",
DiscardReason::Internal => "internal",
DiscardReason::TransactionSampled => "transaction_sampled",
DiscardReason::EmptyEnvelope => "empty_envelope",
DiscardReason::InvalidReplayEvent => "invalid_replay",
DiscardReason::InvalidReplayEventNoPayload => "invalid_replay_no_payload",
DiscardReason::InvalidReplayEventPii => "invalid_replay_pii_scrubber_failed",
DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording",
DiscardReason::InvalidReplayVideoEvent => "invalid_replay_video",
DiscardReason::Profiling(reason) => reason,
DiscardReason::InvalidSpan => "invalid_span",
DiscardReason::FeatureDisabled(_) => "feature_disabled",
}
}
}
impl fmt::Display for DiscardReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name())
}
}
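/// The raw, JSON-serializable form of an outcome, as submitted via HTTP or Kafka.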
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TrackRawOutcome {
timestamp: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
org_id: Option<OrganizationId>,
project_id: ProjectId,
#[serde(default, skip_serializing_if = "Option::is_none")]
key_id: Option<u64>,
outcome: OutcomeId,
#[serde(default, skip_serializing_if = "Option::is_none")]
reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
event_id: Option<EventId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
remote_addr: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
source: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub category: Option<u8>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quantity: Option<u32>,
}
impl TrackRawOutcome {
fn from_outcome(msg: TrackOutcome, config: &Config) -> Self {
let reason = msg.outcome.to_reason().map(|reason| reason.to_string());
let timestamp = msg.timestamp.to_rfc3339_opts(SecondsFormat::Micros, true);
let org_id = match msg.scoping.organization_id.value() {
0 => None,
id => Some(OrganizationId::new(id)),
};
let source = config.outcome_source().map(str::to_owned);
TrackRawOutcome {
timestamp,
org_id,
project_id: msg.scoping.project_id,
key_id: msg.scoping.key_id,
outcome: msg.outcome.to_outcome_id(),
reason,
event_id: msg.event_id,
remote_addr: msg.remote_addr.map(|addr| addr.to_string()),
source,
category: msg.category.value(),
quantity: Some(msg.quantity),
}
}
#[cfg(feature = "processing")]
fn is_billing(&self) -> bool {
matches!(self.outcome, OutcomeId::ACCEPTED | OutcomeId::RATE_LIMITED)
}
}
impl TrackOutcomeLike for TrackRawOutcome {
fn reason(&self) -> Option<Cow<'_, str>> {
self.reason.as_ref().map(|s| s.into())
}
fn outcome_id(&self) -> OutcomeId {
self.outcome
}
}
impl Interface for TrackRawOutcome {}
impl FromMessage<Self> for TrackRawOutcome {
type Response = NoResponse;
fn from_message(message: Self, _: ()) -> Self {
message
}
}
#[cfg(feature = "processing")]
#[derive(Debug, thiserror::Error)]
pub enum OutcomeError {
#[error("failed to send kafka message")]
SendFailed(ClientError),
#[error("json serialization error")]
SerializationError(serde_json::Error),
}
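/// A producer that batches outcomes and submits them to the upstream via HTTP.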
#[derive(Debug)]
struct HttpOutcomeProducer {
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
unsent_outcomes: Vec<TrackRawOutcome>,
flush_handle: SleepHandle,
}
impl HttpOutcomeProducer {
pub fn new(config: Arc<Config>, upstream_relay: Addr<UpstreamRelay>) -> Self {
Self {
config,
upstream_relay,
unsent_outcomes: Vec::new(),
flush_handle: SleepHandle::idle(),
}
}
fn send_batch(&mut self) {
self.flush_handle.reset();
if self.unsent_outcomes.is_empty() {
relay_log::warn!("unexpected send_batch scheduled with no outcomes to send");
return;
}
relay_log::trace!(
"sending outcome batch of size {}",
self.unsent_outcomes.len()
);
let request = SendOutcomes {
outcomes: mem::take(&mut self.unsent_outcomes),
};
let upstream_relay = self.upstream_relay.clone();
relay_system::spawn!(async move {
match upstream_relay.send(SendQuery(request)).await {
Ok(_) => relay_log::trace!("outcome batch sent"),
Err(error) => {
relay_log::error!(error = &error as &dyn Error, "outcome batch sending failed")
}
}
});
}
fn handle_message(&mut self, message: TrackRawOutcome) {
relay_log::trace!("batching outcome");
self.unsent_outcomes.push(message);
if self.unsent_outcomes.len() >= self.config.outcome_batch_size() {
self.send_batch();
} else if self.flush_handle.is_idle() {
self.flush_handle.set(self.config.outcome_batch_interval());
}
}
}
impl Service for HttpOutcomeProducer {
type Interface = TrackRawOutcome;
async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
loop {
tokio::select! {
biased;
() = &mut self.flush_handle => self.send_batch(),
Some(message) = rx.recv() => self.handle_message(message),
else => break,
}
}
}
}
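/// A producer that aggregates outcomes into client reports and hands them to
/// the envelope processor for submission.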
#[derive(Debug)]
struct ClientReportOutcomeProducer {
flush_interval: Duration,
unsent_reports: BTreeMap<Scoping, Vec<ClientReport>>,
flush_handle: SleepHandle,
envelope_processor: Addr<EnvelopeProcessor>,
}
impl ClientReportOutcomeProducer {
fn new(config: &Config, envelope_processor: Addr<EnvelopeProcessor>) -> Self {
Self {
flush_interval: Duration::from_secs(config.outcome_aggregator().flush_interval),
unsent_reports: BTreeMap::new(),
flush_handle: SleepHandle::idle(),
envelope_processor,
}
}
fn flush(&mut self) {
relay_log::trace!("flushing client reports");
self.flush_handle.reset();
let unsent_reports = mem::take(&mut self.unsent_reports);
for (scoping, client_reports) in unsent_reports.into_iter() {
self.envelope_processor.send(SubmitClientReports {
client_reports,
scoping,
});
}
}
fn handle_message(&mut self, msg: TrackOutcome) {
let mut client_report = ClientReport {
timestamp: Some(UnixTimestamp::from_secs(
msg.timestamp.timestamp().try_into().unwrap_or(0),
)),
..Default::default()
};
let discarded_events = match msg.outcome {
Outcome::Filtered(_) => &mut client_report.filtered_events,
Outcome::FilteredSampling(_) => &mut client_report.filtered_sampling_events,
Outcome::RateLimited(_) => &mut client_report.rate_limited_events,
_ => {
return;
}
};
let discarded_event = DiscardedEvent {
reason: msg.outcome.to_reason().unwrap_or_default().to_string(),
category: msg.category,
quantity: msg.quantity,
};
discarded_events.push(discarded_event);
self.unsent_reports
.entry(msg.scoping)
.or_default()
.push(client_report);
if self.flush_interval == Duration::ZERO {
self.flush();
} else if self.flush_handle.is_idle() {
self.flush_handle.set(self.flush_interval);
}
}
}
impl Service for ClientReportOutcomeProducer {
type Interface = TrackOutcome;
async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
loop {
tokio::select! {
biased;
() = &mut self.flush_handle => self.flush(),
Some(message) = rx.recv() => self.handle_message(message),
else => break,
}
}
}
}
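/// A producer that writes outcomes to the `Outcomes` and `OutcomesBilling`
/// Kafka topics.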
#[cfg(feature = "processing")]
#[derive(Debug)]
struct KafkaOutcomesProducer {
client: KafkaClient,
}
#[cfg(feature = "processing")]
impl KafkaOutcomesProducer {
pub fn create(config: &Config) -> anyhow::Result<Self> {
let mut client_builder = KafkaClient::builder();
for topic in &[KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
let kafka_config = &config.kafka_config(*topic).context(ServiceError::Kafka)?;
client_builder = client_builder
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics())
.context(ServiceError::Kafka)?;
}
Ok(Self {
client: client_builder.build(),
})
}
}
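/// Messages accepted by the [`OutcomeProducerService`].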
#[derive(Debug)]
pub enum OutcomeProducer {
TrackOutcome(TrackOutcome),
TrackRawOutcome(TrackRawOutcome),
}
impl Interface for OutcomeProducer {}
impl FromMessage<TrackOutcome> for OutcomeProducer {
type Response = NoResponse;
fn from_message(message: TrackOutcome, _: ()) -> Self {
Self::TrackOutcome(message)
}
}
impl FromMessage<TrackRawOutcome> for OutcomeProducer {
type Response = NoResponse;
fn from_message(message: TrackRawOutcome, _: ()) -> Self {
Self::TrackRawOutcome(message)
}
}
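/// Increments the `Outcomes` counter, tagged with the outcome, its reason, and
/// the backend the outcome is sent to.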
fn send_outcome_metric(message: &impl TrackOutcomeLike, to: &'static str) {
metric!(
counter(RelayCounters::Outcomes) += 1,
reason = message.reason().as_deref().unwrap_or(""),
outcome = message.tag_name(),
to = to,
);
}
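/// Routes outcome messages to the active producer backend.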
#[derive(Debug)]
enum OutcomeBroker {
ClientReport(Addr<TrackOutcome>),
Http(Addr<TrackRawOutcome>),
#[cfg(feature = "processing")]
Kafka(KafkaOutcomesProducer),
Disabled,
}
impl OutcomeBroker {
fn handle_message(&self, message: OutcomeProducer, config: &Config) {
match message {
OutcomeProducer::TrackOutcome(msg) => self.handle_track_outcome(msg, config),
OutcomeProducer::TrackRawOutcome(msg) => self.handle_track_raw_outcome(msg),
}
}
#[cfg(feature = "processing")]
fn send_kafka_message(
&self,
producer: &KafkaOutcomesProducer,
message: TrackRawOutcome,
) -> Result<(), OutcomeError> {
relay_log::trace!("Tracking kafka outcome: {message:?}");
let payload = serde_json::to_string(&message).map_err(OutcomeError::SerializationError)?;
let key = message.event_id.unwrap_or_default().0;
let topic = if message.is_billing() {
KafkaTopic::OutcomesBilling
} else {
KafkaTopic::Outcomes
};
let result = producer
.client
.send(topic, key.as_bytes(), None, "outcome", payload.as_bytes());
match result {
Ok(_) => Ok(()),
Err(kafka_error) => Err(OutcomeError::SendFailed(kafka_error)),
}
}
fn handle_track_outcome(&self, message: TrackOutcome, config: &Config) {
match self {
#[cfg(feature = "processing")]
Self::Kafka(kafka_producer) => {
send_outcome_metric(&message, "kafka");
let raw_message = TrackRawOutcome::from_outcome(message, config);
if let Err(error) = self.send_kafka_message(kafka_producer, raw_message) {
relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
}
}
Self::ClientReport(producer) => {
send_outcome_metric(&message, "client_report");
producer.send(message);
}
Self::Http(producer) => {
send_outcome_metric(&message, "http");
producer.send(TrackRawOutcome::from_outcome(message, config));
}
Self::Disabled => (),
}
}
fn handle_track_raw_outcome(&self, message: TrackRawOutcome) {
match self {
#[cfg(feature = "processing")]
Self::Kafka(kafka_producer) => {
send_outcome_metric(&message, "kafka");
if let Err(error) = self.send_kafka_message(kafka_producer, message) {
relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
}
}
Self::Http(producer) => {
send_outcome_metric(&message, "http");
producer.send(message);
}
Self::ClientReport(_) => (),
Self::Disabled => (),
}
}
}
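/// The configured producer backend, holding services that are not yet started.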
#[derive(Debug)]
enum ProducerInner {
#[cfg(feature = "processing")]
Kafka(KafkaOutcomesProducer),
Http(HttpOutcomeProducer),
ClientReport(ClientReportOutcomeProducer),
Disabled,
}
impl ProducerInner {
fn start(self) -> OutcomeBroker {
match self {
#[cfg(feature = "processing")]
ProducerInner::Kafka(inner) => OutcomeBroker::Kafka(inner),
ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start_detached()),
ProducerInner::ClientReport(inner) => {
OutcomeBroker::ClientReport(inner.start_detached())
}
ProducerInner::Disabled => OutcomeBroker::Disabled,
}
}
}
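/// Service that receives outcome messages and forwards them to the backend
/// selected by the `emit_outcomes` configuration.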
#[derive(Debug)]
pub struct OutcomeProducerService {
config: Arc<Config>,
inner: ProducerInner,
}
impl OutcomeProducerService {
pub fn create(
config: Arc<Config>,
upstream_relay: Addr<UpstreamRelay>,
envelope_processor: Addr<EnvelopeProcessor>,
) -> anyhow::Result<Self> {
let inner = match config.emit_outcomes() {
#[cfg(feature = "processing")]
EmitOutcomes::AsOutcomes if config.processing_enabled() => {
relay_log::info!("Configured to emit outcomes via kafka");
ProducerInner::Kafka(KafkaOutcomesProducer::create(&config)?)
}
EmitOutcomes::AsOutcomes => {
relay_log::info!("Configured to emit outcomes via http");
ProducerInner::Http(HttpOutcomeProducer::new(
Arc::clone(&config),
upstream_relay,
))
}
EmitOutcomes::AsClientReports => {
relay_log::info!("Configured to emit outcomes as client reports");
ProducerInner::ClientReport(ClientReportOutcomeProducer::new(
&config,
envelope_processor,
))
}
EmitOutcomes::None => {
relay_log::info!("Configured to drop all outcomes");
ProducerInner::Disabled
}
};
Ok(Self { config, inner })
}
}
impl Service for OutcomeProducerService {
type Interface = OutcomeProducer;
async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
let Self { config, inner } = self;
let broker = inner.start();
relay_log::info!("OutcomeProducer started.");
while let Some(message) = rx.recv().await {
broker.handle_message(message, &config);
}
relay_log::info!("OutcomeProducer stopped.");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rule_category_roundtrip() {
let input = "123,1004,1500,1403,1403,1404,1000";
let rule_ids = MatchedRuleIds::parse(input).unwrap();
let rule_categories = RuleCategories::from(rule_ids);
let serialized = rule_categories.to_string();
assert_eq!(&serialized, "1000,1004,1400,1500,0");
assert_eq!(
MatchedRuleIds::parse(&serialized).unwrap(),
MatchedRuleIds([1000, 1004, 1400, 1500, 0].map(RuleId).into())
);
}
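// The following checks are a minimal sketch grounded in the definitions above,
// not exhaustive coverage: outcome IDs, reason strings, and the rule-ID bucket
// boundaries of `From<RuleId> for RuleCategory`.
#[test]
fn outcome_ids_and_reasons() {
let invalid = Outcome::Invalid(DiscardReason::InvalidJson);
assert_eq!(invalid.to_outcome_id().as_u8(), 3);
assert_eq!(invalid.to_reason().as_deref(), Some("invalid_json"));
// Only internal and project-state discards count as unexpected.
assert!(!invalid.is_unexpected());
assert!(Outcome::Invalid(DiscardReason::Internal).is_unexpected());
}
#[test]
fn rule_category_boundaries() {
assert_eq!(RuleCategory::from(RuleId(1400)), RuleCategory::BoostLowVolumeTransactions);
assert_eq!(RuleCategory::from(RuleId(1499)), RuleCategory::BoostLowVolumeTransactions);
assert_eq!(RuleCategory::from(RuleId(3000)), RuleCategory::Custom);
assert_eq!(RuleCategory::from(RuleId(5000)), RuleCategory::Other);
}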
}