relay_server/services/processor/
span.rs1use std::sync::Arc;
4
5use opentelemetry_proto::tonic::common::v1::any_value::Value;
6use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
7use prost::Message;
8use relay_dynamic_config::Feature;
9use relay_event_normalization::span::tag_extraction;
10use relay_event_schema::protocol::{Event, Span, SpanV2};
11use relay_protocol::Annotated;
12use relay_quotas::DataCategory;
13use relay_spans::otel_trace::TracesData;
14
15use crate::envelope::{ContentType, Item, ItemContainer, ItemType};
16use crate::managed::{ItemAction, TypedEnvelope};
17use crate::services::outcome::{DiscardReason, Outcome};
18use crate::services::processor::{SpanGroup, should_filter};
19use crate::statsd::RelayTimers;
20
21#[cfg(feature = "processing")]
22mod processing;
23use crate::services::projects::project::ProjectInfo;
24#[cfg(feature = "processing")]
25pub use processing::*;
26use relay_config::Config;
27
28use super::ProcessingError;
29
30pub fn filter(
31 managed_envelope: &mut TypedEnvelope<SpanGroup>,
32 config: Arc<Config>,
33 project_info: Arc<ProjectInfo>,
34) {
35 let disabled = should_filter(&config, &project_info, Feature::StandaloneSpanIngestion);
36 let otel_disabled = should_filter(&config, &project_info, Feature::OtelEndpoint);
37
38 managed_envelope.retain_items(|item| {
39 if disabled && item.is_span() {
40 relay_log::debug!("dropping span because feature is disabled");
41 ItemAction::DropSilently
42 } else if otel_disabled && item.ty() == &ItemType::OtelTracesData {
43 relay_log::debug!("dropping otel trace because feature is disabled");
44 ItemAction::DropSilently
45 } else {
46 ItemAction::Keep
47 }
48 });
49}
50
51pub fn expand_v2_spans(
56 managed_envelope: &mut TypedEnvelope<SpanGroup>,
57) -> Result<(), ProcessingError> {
58 let span_v2_items = managed_envelope
59 .envelope_mut()
60 .take_items_by(ItemContainer::<SpanV2>::is_container);
61
62 if span_v2_items.len() > 1 {
71 return Err(ProcessingError::DuplicateItem(ItemType::Span));
72 }
73
74 if span_v2_items.is_empty() {
75 return Ok(());
76 }
77
78 let now = std::time::Instant::now();
79
80 for span_v2_item in span_v2_items {
81 let spans_v2 = match ItemContainer::parse(&span_v2_item) {
82 Ok(spans_v2) => spans_v2,
83 Err(err) => {
84 relay_log::debug!("failed to parse V2 spans: {err}");
85 track_invalid(
86 managed_envelope,
87 DiscardReason::InvalidSpan,
88 span_v2_item.item_count().unwrap_or(1) as usize,
89 );
90 continue;
91 }
92 };
93
94 for span_v2 in spans_v2.into_items() {
95 let span_v1 = span_v2.value.map_value(relay_spans::span_v2_to_span_v1);
96 match span_v1.to_json() {
97 Ok(payload) => {
98 let mut new_item = Item::new(ItemType::Span);
99 new_item.set_payload(ContentType::Json, payload);
100 managed_envelope.envelope_mut().add_item(new_item);
101 }
102 Err(err) => {
103 relay_log::debug!("failed to serialize span: {}", err);
104 track_invalid(managed_envelope, DiscardReason::Internal, 1);
105 }
106 }
107 }
108 }
109
110 relay_statsd::metric!(timer(RelayTimers::SpanV2Expansion) = now.elapsed());
111
112 Ok(())
113}
114
115pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
116 let envelope = managed_envelope.envelope_mut();
117
118 for item in envelope.take_items_by(|item| item.ty() == &ItemType::OtelTracesData) {
119 convert_traces_data(item, managed_envelope);
120 }
121}
122
123fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
124 let traces_data = match parse_traces_data(item) {
125 Ok(traces_data) => traces_data,
126 Err(reason) => {
127 track_invalid(managed_envelope, reason, 1);
130 return;
131 }
132 };
133 for resource_spans in traces_data.resource_spans {
134 for scope_spans in resource_spans.scope_spans {
135 for mut span in scope_spans.spans {
136 if let Some(ref scope) = scope_spans.scope {
138 if !scope.name.is_empty() {
139 span.attributes.push(KeyValue {
140 key: "instrumentation.name".to_owned(),
141 value: Some(AnyValue {
142 value: Some(Value::StringValue(scope.name.clone())),
143 }),
144 })
145 }
146 if !scope.version.is_empty() {
147 span.attributes.push(KeyValue {
148 key: "instrumentation.version".to_owned(),
149 value: Some(AnyValue {
150 value: Some(Value::StringValue(scope.version.clone())),
151 }),
152 })
153 }
154 scope.attributes.iter().for_each(|a| {
155 span.attributes.push(KeyValue {
156 key: format!("instrumentation.{}", a.key),
157 value: a.value.clone(),
158 });
159 });
160 }
161 if let Some(ref resource) = resource_spans.resource {
162 resource.attributes.iter().for_each(|a| {
163 span.attributes.push(KeyValue {
164 key: format!("resource.{}", a.key),
165 value: a.value.clone(),
166 });
167 });
168 }
169
170 let Ok(payload) = serde_json::to_vec(&span) else {
171 track_invalid(managed_envelope, DiscardReason::Internal, 1);
172 continue;
173 };
174 let mut item = Item::new(ItemType::OtelSpan);
175 item.set_payload(ContentType::Json, payload);
176 managed_envelope.envelope_mut().add_item(item);
177 }
178 }
179 }
180 managed_envelope.update(); }
182
183fn track_invalid(
184 managed_envelope: &mut TypedEnvelope<SpanGroup>,
185 reason: DiscardReason,
186 quantity: usize,
187) {
188 managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, quantity);
189 managed_envelope.track_outcome(
190 Outcome::Invalid(reason),
191 DataCategory::SpanIndexed,
192 quantity,
193 );
194}
195
196fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
197 match item.content_type() {
198 Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| {
199 relay_log::debug!(
200 error = &e as &dyn std::error::Error,
201 "Failed to parse traces data as JSON"
202 );
203 DiscardReason::InvalidJson
204 }),
205 Some(&ContentType::Protobuf) => TracesData::decode(item.payload()).map_err(|e| {
206 relay_log::debug!(
207 error = &e as &dyn std::error::Error,
208 "Failed to parse traces data as protobuf"
209 );
210 DiscardReason::InvalidProtobuf
211 }),
212 _ => Err(DiscardReason::ContentType),
213 }
214}
215
216pub fn extract_transaction_span(
220 event: &Event,
221 max_tag_value_size: usize,
222 span_allowed_hosts: &[String],
223) -> Option<Span> {
224 let mut spans = [Span::from(event).into()];
225
226 tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size, span_allowed_hosts);
227 tag_extraction::extract_segment_span_tags(event, &mut spans);
228
229 spans.into_iter().next().and_then(Annotated::into_value)
230}
231
232#[cfg(test)]
233mod tests {
234 use std::collections::BTreeMap;
235
236 use super::*;
237 use crate::Envelope;
238 use crate::managed::{ManagedEnvelope, TypedEnvelope};
239 use crate::services::processor::ProcessingGroup;
240 use bytes::Bytes;
241 use relay_spans::otel_trace::Span as OtelSpan;
242 use relay_system::Addr;
243
244 #[test]
245 fn attribute_denormalization() {
246 let traces_data = r#"
251 {
252 "resourceSpans": [
253 {
254 "resource": {
255 "attributes": [
256 {
257 "key": "resource_key",
258 "value": {
259 "stringValue": "resource_value"
260 }
261 }
262 ]
263 },
264 "scopeSpans": [
265 {
266 "scope": {
267 "name": "test_instrumentation",
268 "version": "0.0.1",
269 "attributes": [
270 {
271 "key": "scope_key",
272 "value": {
273 "stringValue": "scope_value"
274 }
275 }
276 ]
277 },
278 "spans": [
279 {
280 "attributes": [
281 {
282 "key": "span_key",
283 "value": {
284 "stringValue": "span_value"
285 }
286 }
287 ]
288 }
289 ]
290 }
291 ]
292 }
293 ]
294 }
295 "#;
296
297 let bytes =
299 Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
300 let envelope = Envelope::parse_bytes(bytes).unwrap();
301 let (test_store, _) = Addr::custom();
302 let (outcome_aggregator, _) = Addr::custom();
303 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
304 let mut typed_envelope: TypedEnvelope<_> = (managed_envelope, ProcessingGroup::Span)
305 .try_into()
306 .unwrap();
307 let mut item = Item::new(ItemType::OtelTracesData);
308 item.set_payload(ContentType::Json, traces_data);
309 typed_envelope.envelope_mut().add_item(item.clone());
310
311 convert_traces_data(item, &mut typed_envelope);
313
314 let item = typed_envelope
317 .envelope()
318 .items()
319 .find(|i| *i.ty() == ItemType::OtelSpan)
320 .expect("converted span missing from envelope");
321 let attributes = serde_json::from_slice::<OtelSpan>(&item.payload())
322 .expect("unable to deserialize otel span")
323 .attributes
324 .into_iter()
325 .map(|kv| (kv.key, kv.value.unwrap()))
326 .collect::<BTreeMap<_, _>>();
327 let attribute_value = |key: &str| -> String {
328 match attributes
329 .get(key)
330 .unwrap_or_else(|| panic!("attribute {key} missing"))
331 .to_owned()
332 .value
333 {
334 Some(Value::StringValue(str)) => str,
335 _ => panic!("attribute {key} not a string"),
336 }
337 };
338 assert_eq!(
339 attribute_value("span_key"),
340 "span_value".to_owned(),
341 "original span attribute should be present"
342 );
343 assert_eq!(
344 attribute_value("instrumentation.name"),
345 "test_instrumentation".to_owned(),
346 "instrumentation name should be in attributes"
347 );
348 assert_eq!(
349 attribute_value("instrumentation.version"),
350 "0.0.1".to_owned(),
351 "instrumentation version should be in attributes"
352 );
353 assert_eq!(
354 attribute_value("resource.resource_key"),
355 "resource_value".to_owned(),
356 "resource attribute should be copied with prefix"
357 );
358 assert_eq!(
359 attribute_value("instrumentation.scope_key"),
360 "scope_value".to_owned(),
361 "instruementation scope attribute should be copied with prefix"
362 );
363 }
364}