relay_server/services/processor/
span.rs

1//! Processor code related to standalone spans.
2
3use std::sync::Arc;
4
5use opentelemetry_proto::tonic::common::v1::any_value::Value;
6use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
7use prost::Message;
8use relay_dynamic_config::Feature;
9use relay_event_normalization::span::tag_extraction;
10use relay_event_schema::protocol::{Event, Span};
11use relay_protocol::Annotated;
12use relay_quotas::DataCategory;
13use relay_spans::otel_trace::TracesData;
14
15use crate::envelope::{ContentType, Item, ItemType};
16use crate::services::outcome::{DiscardReason, Outcome};
17use crate::services::processor::{should_filter, SpanGroup};
18use crate::utils::ItemAction;
19use crate::utils::TypedEnvelope;
20
21#[cfg(feature = "processing")]
22mod processing;
23use crate::services::projects::project::ProjectInfo;
24#[cfg(feature = "processing")]
25pub use processing::*;
26use relay_config::Config;
27
28pub fn filter(
29    managed_envelope: &mut TypedEnvelope<SpanGroup>,
30    config: Arc<Config>,
31    project_info: Arc<ProjectInfo>,
32) {
33    let disabled = should_filter(&config, &project_info, Feature::StandaloneSpanIngestion);
34    let otel_disabled = should_filter(&config, &project_info, Feature::OtelEndpoint);
35
36    managed_envelope.retain_items(|item| {
37        if disabled && item.is_span() {
38            relay_log::debug!("dropping span because feature is disabled");
39            ItemAction::DropSilently
40        } else if otel_disabled && item.ty() == &ItemType::OtelTracesData {
41            relay_log::debug!("dropping otel trace because feature is disabled");
42            ItemAction::DropSilently
43        } else {
44            ItemAction::Keep
45        }
46    });
47}
48
49pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
50    let envelope = managed_envelope.envelope_mut();
51
52    for item in envelope.take_items_by(|item| item.ty() == &ItemType::OtelTracesData) {
53        convert_traces_data(item, managed_envelope);
54    }
55}
56
57fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
58    let traces_data = match parse_traces_data(item) {
59        Ok(traces_data) => traces_data,
60        Err(reason) => {
61            // NOTE: logging quantity=1 is semantically wrong, but we cannot know the real quantity
62            // without parsing.
63            track_invalid(managed_envelope, reason);
64            return;
65        }
66    };
67    for resource_spans in traces_data.resource_spans {
68        for scope_spans in resource_spans.scope_spans {
69            for mut span in scope_spans.spans {
70                // Denormalize instrumentation scope and resource attributes into every span.
71                if let Some(ref scope) = scope_spans.scope {
72                    if !scope.name.is_empty() {
73                        span.attributes.push(KeyValue {
74                            key: "instrumentation.name".to_owned(),
75                            value: Some(AnyValue {
76                                value: Some(Value::StringValue(scope.name.clone())),
77                            }),
78                        })
79                    }
80                    if !scope.version.is_empty() {
81                        span.attributes.push(KeyValue {
82                            key: "instrumentation.version".to_owned(),
83                            value: Some(AnyValue {
84                                value: Some(Value::StringValue(scope.version.clone())),
85                            }),
86                        })
87                    }
88                    scope.attributes.iter().for_each(|a| {
89                        span.attributes.push(KeyValue {
90                            key: format!("instrumentation.{}", a.key),
91                            value: a.value.clone(),
92                        });
93                    });
94                }
95                if let Some(ref resource) = resource_spans.resource {
96                    resource.attributes.iter().for_each(|a| {
97                        span.attributes.push(KeyValue {
98                            key: format!("resource.{}", a.key),
99                            value: a.value.clone(),
100                        });
101                    });
102                }
103
104                let Ok(payload) = serde_json::to_vec(&span) else {
105                    track_invalid(managed_envelope, DiscardReason::Internal);
106                    continue;
107                };
108                let mut item = Item::new(ItemType::OtelSpan);
109                item.set_payload(ContentType::Json, payload);
110                managed_envelope.envelope_mut().add_item(item);
111            }
112        }
113    }
114    managed_envelope.update(); // update envelope summary
115}
116
117fn track_invalid(managed_envelope: &mut TypedEnvelope<SpanGroup>, reason: DiscardReason) {
118    managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, 1);
119    managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::SpanIndexed, 1);
120}
121
122fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
123    match item.content_type() {
124        Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| {
125            relay_log::debug!(
126                error = &e as &dyn std::error::Error,
127                "Failed to parse traces data as JSON"
128            );
129            DiscardReason::InvalidJson
130        }),
131        Some(&ContentType::Protobuf) => TracesData::decode(item.payload()).map_err(|e| {
132            relay_log::debug!(
133                error = &e as &dyn std::error::Error,
134                "Failed to parse traces data as protobuf"
135            );
136            DiscardReason::InvalidProtobuf
137        }),
138        _ => Err(DiscardReason::ContentType),
139    }
140}
141
142/// Creates a span from the transaction and applies tag extraction on it.
143///
144/// Returns `None` when [`tag_extraction::extract_span_tags`] clears the span, which it shouldn't.
145pub fn extract_transaction_span(
146    event: &Event,
147    max_tag_value_size: usize,
148    span_allowed_hosts: &[String],
149) -> Option<Span> {
150    let mut spans = [Span::from(event).into()];
151
152    tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size, span_allowed_hosts);
153    tag_extraction::extract_segment_span_tags(event, &mut spans);
154
155    spans.into_iter().next().and_then(Annotated::into_value)
156}
157
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;
    use crate::services::processor::ProcessingGroup;
    use crate::utils::{ManagedEnvelope, TypedEnvelope};
    use crate::Envelope;
    use bytes::Bytes;
    use relay_spans::otel_trace::Span as OtelSpan;
    use relay_system::Addr;

    /// Verifies that converting OTLP traces data copies the instrumentation scope name, version
    /// and attributes, as well as the resource attributes, into the attributes of each span.
    #[test]
    fn attribute_denormalization() {
        // Construct an OTLP trace payload with:
        // - a resource with one attribute, containing:
        // - an instrumentation scope with one attribute, containing:
        // - a span with one attribute
        let traces_data = r#"
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {
                                "key": "resource_key",
                                "value": {
                                    "stringValue": "resource_value"
                                }
                            }
                        ]
                    },
                    "scopeSpans": [
                        {
                            "scope": {
                                "name": "test_instrumentation",
                                "version": "0.0.1",
                                "attributes": [
                                    {
                                        "key": "scope_key",
                                        "value": {
                                            "stringValue": "scope_value"
                                        }
                                    }
                                ]
                            },
                            "spans": [
                                {
                                    "attributes": [
                                        {
                                            "key": "span_key",
                                            "value": {
                                                "stringValue": "span_value"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        "#;

        // Build an envelope containing the OTLP trace data.
        let bytes =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(bytes).unwrap();
        let (test_store, _) = Addr::custom();
        let (outcome_aggregator, _) = Addr::custom();
        let managed_envelope = ManagedEnvelope::new(
            envelope,
            outcome_aggregator,
            test_store,
            ProcessingGroup::Span,
        );
        let mut typed_envelope: TypedEnvelope<_> = managed_envelope.try_into().unwrap();
        let mut item = Item::new(ItemType::OtelTracesData);
        item.set_payload(ContentType::Json, traces_data);
        typed_envelope.envelope_mut().add_item(item.clone());

        // Convert the OTLP trace data into `OtelSpan` item(s).
        convert_traces_data(item, &mut typed_envelope);

        // Assert that the attributes from the resource and instrumentation
        // scope were copied.
        let item = typed_envelope
            .envelope()
            .items()
            .find(|i| *i.ty() == ItemType::OtelSpan)
            .expect("converted span missing from envelope");
        let attributes = serde_json::from_slice::<OtelSpan>(&item.payload())
            .expect("unable to deserialize otel span")
            .attributes
            .into_iter()
            .map(|kv| (kv.key, kv.value.expect("attribute without a value")))
            .collect::<BTreeMap<_, _>>();

        // Looks up an attribute by key and returns its string value, panicking with a
        // descriptive message if the attribute is missing or not a string.
        let attribute_value = |key: &str| -> String {
            let any_value = attributes
                .get(key)
                .unwrap_or_else(|| panic!("attribute {} missing", key));
            match &any_value.value {
                Some(Value::StringValue(value)) => value.clone(),
                _ => panic!("attribute {} not a string", key),
            }
        };

        assert_eq!(
            attribute_value("span_key"),
            "span_value",
            "original span attribute should be present"
        );
        assert_eq!(
            attribute_value("instrumentation.name"),
            "test_instrumentation",
            "instrumentation name should be in attributes"
        );
        assert_eq!(
            attribute_value("instrumentation.version"),
            "0.0.1",
            "instrumentation version should be in attributes"
        );
        assert_eq!(
            attribute_value("resource.resource_key"),
            "resource_value",
            "resource attribute should be copied with prefix"
        );
        assert_eq!(
            attribute_value("instrumentation.scope_key"),
            "scope_value",
            "instrumentation scope attribute should be copied with prefix"
        );
    }
}
293}