relay_server/services/processor/
span.rs1use prost::Message;
4use relay_dynamic_config::Feature;
5use relay_protocol::Annotated;
6use relay_quotas::DataCategory;
7use relay_spans::otel_trace::TracesData;
8
9use crate::envelope::{ContentType, Item, ItemType};
10use crate::integrations::{Integration, OtelFormat, SpansIntegration};
11use crate::managed::{ItemAction, TypedEnvelope};
12use crate::services::outcome::{DiscardReason, Outcome};
13use crate::services::processor::{SpanGroup, should_filter};
14
15#[cfg(feature = "processing")]
16mod processing;
17use crate::services::projects::project::ProjectInfo;
18#[cfg(feature = "processing")]
19pub use processing::*;
20use relay_config::Config;
21
22pub fn filter(
23 managed_envelope: &mut TypedEnvelope<SpanGroup>,
24 config: &Config,
25 project_info: &ProjectInfo,
26) {
27 let disabled = should_filter(config, project_info, Feature::StandaloneSpanIngestion);
28 if !disabled {
29 return;
30 }
31
32 managed_envelope.retain_items(|item| {
33 let is_span = matches!(item.ty(), &ItemType::Span)
34 || matches!(item.integration(), Some(Integration::Spans(_)));
35
36 match is_span {
37 true => {
38 relay_log::debug!("dropping span because feature is disabled");
39 ItemAction::DropSilently
40 }
41 false => ItemAction::Keep,
42 }
43 });
44}
45
46pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
47 let envelope = managed_envelope.envelope_mut();
48
49 for item in envelope.take_items_by(|item| {
50 matches!(
51 item.integration(),
52 Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
53 )
54 }) {
55 convert_traces_data(item, managed_envelope);
56 }
57}
58
59fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
60 let traces_data = match parse_traces_data(item) {
61 Ok(traces_data) => traces_data,
62 Err(reason) => {
63 track_invalid(managed_envelope, reason, 1);
66 return;
67 }
68 };
69 for resource_spans in traces_data.resource_spans {
70 for scope_spans in resource_spans.scope_spans {
71 for span in scope_spans.spans {
72 let span = relay_spans::otel_to_sentry_span_v1(
73 span,
74 resource_spans.resource.as_ref(),
75 scope_spans.scope.as_ref(),
76 );
77
78 let Ok(payload) = Annotated::new(span).to_json() else {
79 track_invalid(managed_envelope, DiscardReason::Internal, 1);
80 continue;
81 };
82
83 let mut item = Item::new(ItemType::Span);
84 item.set_payload(ContentType::Json, payload);
85 managed_envelope.envelope_mut().add_item(item);
86 }
87 }
88 }
89 managed_envelope.update(); }
91
92fn track_invalid(
93 managed_envelope: &mut TypedEnvelope<SpanGroup>,
94 reason: DiscardReason,
95 quantity: usize,
96) {
97 managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, quantity);
98 managed_envelope.track_outcome(
99 Outcome::Invalid(reason),
100 DataCategory::SpanIndexed,
101 quantity,
102 );
103}
104
105fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
106 let Some(Integration::Spans(SpansIntegration::OtelV1 { format })) = item.integration() else {
107 return Err(DiscardReason::ContentType);
108 };
109
110 match format {
111 OtelFormat::Json => serde_json::from_slice(&item.payload()).map_err(|e| {
112 relay_log::debug!(
113 error = &e as &dyn std::error::Error,
114 "Failed to parse traces data as JSON"
115 );
116 DiscardReason::InvalidJson
117 }),
118 OtelFormat::Protobuf => TracesData::decode(item.payload()).map_err(|e| {
119 relay_log::debug!(
120 error = &e as &dyn std::error::Error,
121 "Failed to parse traces data as protobuf"
122 );
123 DiscardReason::InvalidProtobuf
124 }),
125 }
126}
127
#[cfg(test)]
mod tests {

    use super::*;
    use crate::Envelope;
    use crate::managed::{ManagedEnvelope, TypedEnvelope};
    use crate::services::processor::ProcessingGroup;
    use bytes::Bytes;
    use relay_system::Addr;

    /// Checks that resource, scope and span attributes of an OTLP JSON payload all end up in
    /// the `data` of the converted span.
    #[test]
    fn attribute_denormalization() {
        // OTLP trace payload with one resource attribute, one instrumentation scope
        // attribute and a single span carrying one attribute.
        let otlp_json = r#"
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {
                                "key": "resource_key",
                                "value": {
                                    "stringValue": "resource_value"
                                }
                            }
                        ]
                    },
                    "scopeSpans": [
                        {
                            "scope": {
                                "name": "test_instrumentation",
                                "version": "0.0.1",
                                "attributes": [
                                    {
                                        "key": "scope_key",
                                        "value": {
                                            "stringValue": "scope_value"
                                        }
                                    }
                                ]
                            },
                            "spans": [
                                {
                                    "traceId": "89143b0763095bd9c9955e8175d1fb23",
                                    "spanId": "e342abb1214ca181",
                                    "attributes": [
                                        {
                                            "key": "span_key",
                                            "value": {
                                                "stringValue": "span_value"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        "#;

        // The integration item that carries the OTLP payload in JSON format.
        let mut otel_item = Item::new(ItemType::Integration);
        otel_item.set_payload(
            Integration::Spans(SpansIntegration::OtelV1 {
                format: OtelFormat::Json,
            })
            .into(),
            otlp_json,
        );

        // An envelope consisting only of a header, wrapped for the span processing group.
        let header =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(header).unwrap();
        let (outcome_aggregator, _) = Addr::custom();
        let managed = ManagedEnvelope::new(envelope, outcome_aggregator);
        let mut typed_envelope: TypedEnvelope<_> =
            (managed, ProcessingGroup::Span).try_into().unwrap();
        typed_envelope.envelope_mut().add_item(otel_item.clone());

        convert_traces_data(otel_item, &mut typed_envelope);

        // Pick the span item produced by the conversion.
        let span_item = typed_envelope
            .envelope()
            .items()
            .find(|candidate| *candidate.ty() == ItemType::Span)
            .expect("converted span missing from envelope");

        let span_json = serde_json::from_slice::<serde_json::Value>(&span_item.payload()).unwrap();
        insta::assert_json_snapshot!(span_json, @r#"
        {
          "data": {
            "instrumentation.name": "test_instrumentation",
            "instrumentation.scope_key": "scope_value",
            "instrumentation.version": "0.0.1",
            "resource.resource_key": "resource_value",
            "sentry.origin": "auto.otlp.spans",
            "span_key": "span_value"
          },
          "exclusive_time": 0.0,
          "links": [],
          "op": "default",
          "span_id": "e342abb1214ca181",
          "start_timestamp": 0.0,
          "status": "ok",
          "timestamp": 0.0,
          "trace_id": "89143b0763095bd9c9955e8175d1fb23"
        }
        "#);
    }
}