relay_server/services/processor/
span.rs

1//! Processor code related to standalone spans.
2
3use std::sync::Arc;
4
5use opentelemetry_proto::tonic::common::v1::any_value::Value;
6use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
7use prost::Message;
8use relay_dynamic_config::Feature;
9use relay_event_normalization::span::tag_extraction;
10use relay_event_schema::protocol::{Event, Span, SpanV2};
11use relay_protocol::Annotated;
12use relay_quotas::DataCategory;
13use relay_spans::otel_trace::TracesData;
14
15use crate::envelope::{ContentType, Item, ItemContainer, ItemType};
16use crate::managed::{ItemAction, TypedEnvelope};
17use crate::services::outcome::{DiscardReason, Outcome};
18use crate::services::processor::{SpanGroup, should_filter};
19use crate::statsd::RelayTimers;
20
21#[cfg(feature = "processing")]
22mod processing;
23use crate::services::projects::project::ProjectInfo;
24#[cfg(feature = "processing")]
25pub use processing::*;
26use relay_config::Config;
27
28use super::ProcessingError;
29
30pub fn filter(
31    managed_envelope: &mut TypedEnvelope<SpanGroup>,
32    config: Arc<Config>,
33    project_info: Arc<ProjectInfo>,
34) {
35    let disabled = should_filter(&config, &project_info, Feature::StandaloneSpanIngestion);
36    let otel_disabled = should_filter(&config, &project_info, Feature::OtelEndpoint);
37
38    managed_envelope.retain_items(|item| {
39        if disabled && item.is_span() {
40            relay_log::debug!("dropping span because feature is disabled");
41            ItemAction::DropSilently
42        } else if otel_disabled && item.ty() == &ItemType::OtelTracesData {
43            relay_log::debug!("dropping otel trace because feature is disabled");
44            ItemAction::DropSilently
45        } else {
46            ItemAction::Keep
47        }
48    });
49}
50
51/// Expands V2 spans to V1 spans.
52///
53/// This expands one item (contanining multiple V2 spans) into several
54/// (containing one V1 span each).
55pub fn expand_v2_spans(
56    managed_envelope: &mut TypedEnvelope<SpanGroup>,
57) -> Result<(), ProcessingError> {
58    let span_v2_items = managed_envelope
59        .envelope_mut()
60        .take_items_by(ItemContainer::<SpanV2>::is_container);
61
62    // V2 spans must always be sent as an `ItemContainer`, currently it is not allowed to
63    // send multiple containers for V2 spans.
64    //
65    // This restriction may be lifted in the future, this is why this validation only happens
66    // when processing is enabled, allowing it to be changed easily in the future.
67    //
68    // This limit mostly exists to incentivise SDKs to batch multiple spans into a single container,
69    // technically it can be removed without issues.
70    if span_v2_items.len() > 1 {
71        return Err(ProcessingError::DuplicateItem(ItemType::Span));
72    }
73
74    if span_v2_items.is_empty() {
75        return Ok(());
76    }
77
78    let now = std::time::Instant::now();
79
80    for span_v2_item in span_v2_items {
81        let spans_v2 = match ItemContainer::parse(&span_v2_item) {
82            Ok(spans_v2) => spans_v2,
83            Err(err) => {
84                relay_log::debug!("failed to parse V2 spans: {err}");
85                track_invalid(
86                    managed_envelope,
87                    DiscardReason::InvalidSpan,
88                    span_v2_item.item_count().unwrap_or(1) as usize,
89                );
90                continue;
91            }
92        };
93
94        for span_v2 in spans_v2.into_items() {
95            let span_v1 = span_v2.value.map_value(relay_spans::span_v2_to_span_v1);
96            match span_v1.to_json() {
97                Ok(payload) => {
98                    let mut new_item = Item::new(ItemType::Span);
99                    new_item.set_payload(ContentType::Json, payload);
100                    managed_envelope.envelope_mut().add_item(new_item);
101                }
102                Err(err) => {
103                    relay_log::debug!("failed to serialize span: {}", err);
104                    track_invalid(managed_envelope, DiscardReason::Internal, 1);
105                }
106            }
107        }
108    }
109
110    relay_statsd::metric!(timer(RelayTimers::SpanV2Expansion) = now.elapsed());
111
112    Ok(())
113}
114
115pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
116    let envelope = managed_envelope.envelope_mut();
117
118    for item in envelope.take_items_by(|item| item.ty() == &ItemType::OtelTracesData) {
119        convert_traces_data(item, managed_envelope);
120    }
121}
122
123fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
124    let traces_data = match parse_traces_data(item) {
125        Ok(traces_data) => traces_data,
126        Err(reason) => {
127            // NOTE: logging quantity=1 is semantically wrong, but we cannot know the real quantity
128            // without parsing.
129            track_invalid(managed_envelope, reason, 1);
130            return;
131        }
132    };
133    for resource_spans in traces_data.resource_spans {
134        for scope_spans in resource_spans.scope_spans {
135            for mut span in scope_spans.spans {
136                // Denormalize instrumentation scope and resource attributes into every span.
137                if let Some(ref scope) = scope_spans.scope {
138                    if !scope.name.is_empty() {
139                        span.attributes.push(KeyValue {
140                            key: "instrumentation.name".to_owned(),
141                            value: Some(AnyValue {
142                                value: Some(Value::StringValue(scope.name.clone())),
143                            }),
144                        })
145                    }
146                    if !scope.version.is_empty() {
147                        span.attributes.push(KeyValue {
148                            key: "instrumentation.version".to_owned(),
149                            value: Some(AnyValue {
150                                value: Some(Value::StringValue(scope.version.clone())),
151                            }),
152                        })
153                    }
154                    scope.attributes.iter().for_each(|a| {
155                        span.attributes.push(KeyValue {
156                            key: format!("instrumentation.{}", a.key),
157                            value: a.value.clone(),
158                        });
159                    });
160                }
161                if let Some(ref resource) = resource_spans.resource {
162                    resource.attributes.iter().for_each(|a| {
163                        span.attributes.push(KeyValue {
164                            key: format!("resource.{}", a.key),
165                            value: a.value.clone(),
166                        });
167                    });
168                }
169
170                let Ok(payload) = serde_json::to_vec(&span) else {
171                    track_invalid(managed_envelope, DiscardReason::Internal, 1);
172                    continue;
173                };
174                let mut item = Item::new(ItemType::OtelSpan);
175                item.set_payload(ContentType::Json, payload);
176                managed_envelope.envelope_mut().add_item(item);
177            }
178        }
179    }
180    managed_envelope.update(); // update envelope summary
181}
182
183fn track_invalid(
184    managed_envelope: &mut TypedEnvelope<SpanGroup>,
185    reason: DiscardReason,
186    quantity: usize,
187) {
188    managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, quantity);
189    managed_envelope.track_outcome(
190        Outcome::Invalid(reason),
191        DataCategory::SpanIndexed,
192        quantity,
193    );
194}
195
196fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
197    match item.content_type() {
198        Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| {
199            relay_log::debug!(
200                error = &e as &dyn std::error::Error,
201                "Failed to parse traces data as JSON"
202            );
203            DiscardReason::InvalidJson
204        }),
205        Some(&ContentType::Protobuf) => TracesData::decode(item.payload()).map_err(|e| {
206            relay_log::debug!(
207                error = &e as &dyn std::error::Error,
208                "Failed to parse traces data as protobuf"
209            );
210            DiscardReason::InvalidProtobuf
211        }),
212        _ => Err(DiscardReason::ContentType),
213    }
214}
215
216/// Creates a span from the transaction and applies tag extraction on it.
217///
218/// Returns `None` when [`tag_extraction::extract_span_tags`] clears the span, which it shouldn't.
219pub fn extract_transaction_span(
220    event: &Event,
221    max_tag_value_size: usize,
222    span_allowed_hosts: &[String],
223) -> Option<Span> {
224    let mut spans = [Span::from(event).into()];
225
226    tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size, span_allowed_hosts);
227    tag_extraction::extract_segment_span_tags(event, &mut spans);
228
229    spans.into_iter().next().and_then(Annotated::into_value)
230}
231
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;
    use crate::Envelope;
    use crate::managed::{ManagedEnvelope, TypedEnvelope};
    use crate::services::processor::ProcessingGroup;
    use bytes::Bytes;
    use relay_spans::otel_trace::Span as OtelSpan;
    use relay_system::Addr;

    /// Resource and instrumentation scope data must be copied onto the converted span.
    #[test]
    fn attribute_denormalization() {
        // OTLP trace payload nesting, from the outside in:
        // - a resource with one attribute,
        // - an instrumentation scope with a name, a version and one attribute,
        // - a span with one attribute.
        let traces_data = r#"
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {
                                "key": "resource_key",
                                "value": {
                                    "stringValue": "resource_value"
                                }
                            }
                        ]
                    },
                    "scopeSpans": [
                        {
                            "scope": {
                                "name": "test_instrumentation",
                                "version": "0.0.1",
                                "attributes": [
                                    {
                                        "key": "scope_key",
                                        "value": {
                                            "stringValue": "scope_value"
                                        }
                                    }
                                ]
                            },
                            "spans": [
                                {
                                    "attributes": [
                                        {
                                            "key": "span_key",
                                            "value": {
                                                "stringValue": "span_value"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        "#;

        // Wrap the OTLP payload in a typed span envelope.
        let header =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(header).unwrap();
        let (test_store, _) = Addr::custom();
        let (outcome_aggregator, _) = Addr::custom();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
        let mut typed_envelope: TypedEnvelope<_> = (managed_envelope, ProcessingGroup::Span)
            .try_into()
            .unwrap();
        let mut item = Item::new(ItemType::OtelTracesData);
        item.set_payload(ContentType::Json, traces_data);
        typed_envelope.envelope_mut().add_item(item.clone());

        // Run the conversion that produces the `OtelSpan` item(s).
        convert_traces_data(item, &mut typed_envelope);

        // Index the converted span's attributes by key.
        let converted = typed_envelope
            .envelope()
            .items()
            .find(|candidate| *candidate.ty() == ItemType::OtelSpan)
            .expect("converted span missing from envelope");
        let otel_span: OtelSpan = serde_json::from_slice(&converted.payload())
            .expect("unable to deserialize otel span");
        let attributes: BTreeMap<_, _> = otel_span
            .attributes
            .into_iter()
            .map(|kv| (kv.key, kv.value.unwrap()))
            .collect();

        let attribute_value = |key: &str| -> String {
            let any_value = attributes
                .get(key)
                .unwrap_or_else(|| panic!("attribute {key} missing"));
            match &any_value.value {
                Some(Value::StringValue(text)) => text.clone(),
                _ => panic!("attribute {key} not a string"),
            }
        };

        // (attribute key, expected value, failure message)
        let expectations = [
            (
                "span_key",
                "span_value",
                "original span attribute should be present",
            ),
            (
                "instrumentation.name",
                "test_instrumentation",
                "instrumentation name should be in attributes",
            ),
            (
                "instrumentation.version",
                "0.0.1",
                "instrumentation version should be in attributes",
            ),
            (
                "resource.resource_key",
                "resource_value",
                "resource attribute should be copied with prefix",
            ),
            (
                "instrumentation.scope_key",
                "scope_value",
                "instruementation scope attribute should be copied with prefix",
            ),
        ];
        for (key, expected, message) in expectations {
            assert_eq!(attribute_value(key), expected, "{message}");
        }
    }
}