relay_server/services/processor/
span.rs1use prost::Message;
4use relay_dynamic_config::Feature;
5use relay_event_normalization::span::tag_extraction;
6use relay_event_schema::protocol::{Event, Span};
7use relay_protocol::Annotated;
8use relay_quotas::DataCategory;
9use relay_spans::otel_trace::TracesData;
10
11use crate::envelope::{ContentType, Item, ItemType};
12use crate::integrations::{Integration, OtelFormat, SpansIntegration};
13use crate::managed::{ItemAction, TypedEnvelope};
14use crate::services::outcome::{DiscardReason, Outcome};
15use crate::services::processor::{SpanGroup, should_filter};
16
17#[cfg(feature = "processing")]
18mod processing;
19use crate::services::projects::project::ProjectInfo;
20#[cfg(feature = "processing")]
21pub use processing::*;
22use relay_config::Config;
23
24pub fn filter(
25 managed_envelope: &mut TypedEnvelope<SpanGroup>,
26 config: &Config,
27 project_info: &ProjectInfo,
28) {
29 let disabled = should_filter(config, project_info, Feature::StandaloneSpanIngestion);
30 if !disabled {
31 return;
32 }
33
34 managed_envelope.retain_items(|item| {
35 let is_span = matches!(item.ty(), &ItemType::Span)
36 || matches!(item.integration(), Some(Integration::Spans(_)));
37
38 match is_span {
39 true => {
40 relay_log::debug!("dropping span because feature is disabled");
41 ItemAction::DropSilently
42 }
43 false => ItemAction::Keep,
44 }
45 });
46}
47
48pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
49 let envelope = managed_envelope.envelope_mut();
50
51 for item in envelope.take_items_by(|item| {
52 matches!(
53 item.integration(),
54 Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
55 )
56 }) {
57 convert_traces_data(item, managed_envelope);
58 }
59}
60
61fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
62 let traces_data = match parse_traces_data(item) {
63 Ok(traces_data) => traces_data,
64 Err(reason) => {
65 track_invalid(managed_envelope, reason, 1);
68 return;
69 }
70 };
71 for resource_spans in traces_data.resource_spans {
72 for scope_spans in resource_spans.scope_spans {
73 for span in scope_spans.spans {
74 let span = relay_spans::otel_to_sentry_span_v1(
75 span,
76 resource_spans.resource.as_ref(),
77 scope_spans.scope.as_ref(),
78 );
79
80 let Ok(payload) = Annotated::new(span).to_json() else {
81 track_invalid(managed_envelope, DiscardReason::Internal, 1);
82 continue;
83 };
84
85 let mut item = Item::new(ItemType::Span);
86 item.set_payload(ContentType::Json, payload);
87 managed_envelope.envelope_mut().add_item(item);
88 }
89 }
90 }
91 managed_envelope.update(); }
93
94fn track_invalid(
95 managed_envelope: &mut TypedEnvelope<SpanGroup>,
96 reason: DiscardReason,
97 quantity: usize,
98) {
99 managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, quantity);
100 managed_envelope.track_outcome(
101 Outcome::Invalid(reason),
102 DataCategory::SpanIndexed,
103 quantity,
104 );
105}
106
107fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
108 let Some(Integration::Spans(SpansIntegration::OtelV1 { format })) = item.integration() else {
109 return Err(DiscardReason::ContentType);
110 };
111
112 match format {
113 OtelFormat::Json => serde_json::from_slice(&item.payload()).map_err(|e| {
114 relay_log::debug!(
115 error = &e as &dyn std::error::Error,
116 "Failed to parse traces data as JSON"
117 );
118 DiscardReason::InvalidJson
119 }),
120 OtelFormat::Protobuf => TracesData::decode(item.payload()).map_err(|e| {
121 relay_log::debug!(
122 error = &e as &dyn std::error::Error,
123 "Failed to parse traces data as protobuf"
124 );
125 DiscardReason::InvalidProtobuf
126 }),
127 }
128}
129
130pub fn extract_transaction_span(
134 event: &Event,
135 max_tag_value_size: usize,
136 span_allowed_hosts: &[String],
137) -> Option<Span> {
138 let mut spans = [Span::from(event).into()];
139
140 tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size, span_allowed_hosts);
141 tag_extraction::extract_segment_span_tags(event, &mut spans);
142
143 spans.into_iter().next().and_then(Annotated::into_value)
144}
145
#[cfg(test)]
mod tests {

    use super::*;
    use crate::Envelope;
    use crate::managed::{ManagedEnvelope, TypedEnvelope};
    use crate::services::processor::ProcessingGroup;
    use bytes::Bytes;
    use relay_system::Addr;

    /// Checks that resource, scope and span attributes all end up in the converted span's data.
    #[test]
    fn attribute_denormalization() {
        // OTLP JSON payload: one resource attribute, one scope attribute, one span attribute.
        let otlp_json = r#"
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {
                                "key": "resource_key",
                                "value": {
                                    "stringValue": "resource_value"
                                }
                            }
                        ]
                    },
                    "scopeSpans": [
                        {
                            "scope": {
                                "name": "test_instrumentation",
                                "version": "0.0.1",
                                "attributes": [
                                    {
                                        "key": "scope_key",
                                        "value": {
                                            "stringValue": "scope_value"
                                        }
                                    }
                                ]
                            },
                            "spans": [
                                {
                                    "traceId": "89143b0763095bd9c9955e8175d1fb23",
                                    "spanId": "e342abb1214ca181",
                                    "attributes": [
                                        {
                                            "key": "span_key",
                                            "value": {
                                                "stringValue": "span_value"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        "#;

        // Build a typed span envelope that only has a header so far.
        let header =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let (outcome_aggregator, _) = Addr::custom();
        let managed = ManagedEnvelope::new(
            Envelope::parse_bytes(header).unwrap(),
            outcome_aggregator,
        );
        let mut typed_envelope: TypedEnvelope<_> =
            (managed, ProcessingGroup::Span).try_into().unwrap();

        // Wrap the OTLP payload in an integration item and add a copy of it to the envelope.
        let mut integration_item = Item::new(ItemType::Integration);
        integration_item.set_payload(
            Integration::Spans(SpansIntegration::OtelV1 {
                format: OtelFormat::Json,
            })
            .into(),
            otlp_json,
        );
        typed_envelope
            .envelope_mut()
            .add_item(integration_item.clone());

        convert_traces_data(integration_item, &mut typed_envelope);

        // The conversion must have appended a span item to the envelope.
        let span_item = typed_envelope
            .envelope()
            .items()
            .find(|candidate| *candidate.ty() == ItemType::Span)
            .expect("converted span missing from envelope");

        let span_json: serde_json::Value = serde_json::from_slice(&span_item.payload()).unwrap();
        insta::assert_json_snapshot!(span_json, @r#"
        {
          "data": {
            "instrumentation.name": "test_instrumentation",
            "instrumentation.scope_key": "scope_value",
            "instrumentation.version": "0.0.1",
            "resource.resource_key": "resource_value",
            "span_key": "span_value"
          },
          "exclusive_time": 0.0,
          "is_remote": false,
          "links": [],
          "op": "default",
          "span_id": "e342abb1214ca181",
          "start_timestamp": 0.0,
          "status": "ok",
          "timestamp": 0.0,
          "trace_id": "89143b0763095bd9c9955e8175d1fb23"
        }
        "#);
    }
}