relay_server/services/processor/
span.rs

1//! Processor code related to standalone spans.
2
3use prost::Message;
4use relay_dynamic_config::Feature;
5use relay_event_normalization::span::tag_extraction;
6use relay_event_schema::protocol::{Event, Span};
7use relay_protocol::Annotated;
8use relay_quotas::DataCategory;
9use relay_spans::otel_trace::TracesData;
10
11use crate::envelope::{ContentType, Item, ItemType};
12use crate::integrations::{Integration, OtelFormat, SpansIntegration};
13use crate::managed::{ItemAction, TypedEnvelope};
14use crate::services::outcome::{DiscardReason, Outcome};
15use crate::services::processor::{SpanGroup, should_filter};
16
17#[cfg(feature = "processing")]
18mod processing;
19use crate::services::projects::project::ProjectInfo;
20#[cfg(feature = "processing")]
21pub use processing::*;
22use relay_config::Config;
23
24pub fn filter(
25    managed_envelope: &mut TypedEnvelope<SpanGroup>,
26    config: &Config,
27    project_info: &ProjectInfo,
28) {
29    let disabled = should_filter(config, project_info, Feature::StandaloneSpanIngestion);
30    if !disabled {
31        return;
32    }
33
34    managed_envelope.retain_items(|item| {
35        let is_span = matches!(item.ty(), &ItemType::Span)
36            || matches!(item.integration(), Some(Integration::Spans(_)));
37
38        match is_span {
39            true => {
40                relay_log::debug!("dropping span because feature is disabled");
41                ItemAction::DropSilently
42            }
43            false => ItemAction::Keep,
44        }
45    });
46}
47
48pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
49    let envelope = managed_envelope.envelope_mut();
50
51    for item in envelope.take_items_by(|item| {
52        matches!(
53            item.integration(),
54            Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
55        )
56    }) {
57        convert_traces_data(item, managed_envelope);
58    }
59}
60
61fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
62    let traces_data = match parse_traces_data(item) {
63        Ok(traces_data) => traces_data,
64        Err(reason) => {
65            // NOTE: logging quantity=1 is semantically wrong, but we cannot know the real quantity
66            // without parsing.
67            track_invalid(managed_envelope, reason, 1);
68            return;
69        }
70    };
71    for resource_spans in traces_data.resource_spans {
72        for scope_spans in resource_spans.scope_spans {
73            for span in scope_spans.spans {
74                let span = relay_spans::otel_to_sentry_span_v1(
75                    span,
76                    resource_spans.resource.as_ref(),
77                    scope_spans.scope.as_ref(),
78                );
79
80                let Ok(payload) = Annotated::new(span).to_json() else {
81                    track_invalid(managed_envelope, DiscardReason::Internal, 1);
82                    continue;
83                };
84
85                let mut item = Item::new(ItemType::Span);
86                item.set_payload(ContentType::Json, payload);
87                managed_envelope.envelope_mut().add_item(item);
88            }
89        }
90    }
91    managed_envelope.update(); // update envelope summary
92}
93
94fn track_invalid(
95    managed_envelope: &mut TypedEnvelope<SpanGroup>,
96    reason: DiscardReason,
97    quantity: usize,
98) {
99    managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, quantity);
100    managed_envelope.track_outcome(
101        Outcome::Invalid(reason),
102        DataCategory::SpanIndexed,
103        quantity,
104    );
105}
106
107fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
108    let Some(Integration::Spans(SpansIntegration::OtelV1 { format })) = item.integration() else {
109        return Err(DiscardReason::ContentType);
110    };
111
112    match format {
113        OtelFormat::Json => serde_json::from_slice(&item.payload()).map_err(|e| {
114            relay_log::debug!(
115                error = &e as &dyn std::error::Error,
116                "Failed to parse traces data as JSON"
117            );
118            DiscardReason::InvalidJson
119        }),
120        OtelFormat::Protobuf => TracesData::decode(item.payload()).map_err(|e| {
121            relay_log::debug!(
122                error = &e as &dyn std::error::Error,
123                "Failed to parse traces data as protobuf"
124            );
125            DiscardReason::InvalidProtobuf
126        }),
127    }
128}
129
130/// Creates a span from the transaction and applies tag extraction on it.
131///
132/// Returns `None` when [`tag_extraction::extract_span_tags`] clears the span, which it shouldn't.
133pub fn extract_transaction_span(
134    event: &Event,
135    max_tag_value_size: usize,
136    span_allowed_hosts: &[String],
137) -> Option<Span> {
138    let mut spans = [Span::from(event).into()];
139
140    tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size, span_allowed_hosts);
141    tag_extraction::extract_segment_span_tags(event, &mut spans);
142
143    spans.into_iter().next().and_then(Annotated::into_value)
144}
145
#[cfg(test)]
mod tests {

    use super::*;
    use crate::Envelope;
    use crate::managed::{ManagedEnvelope, TypedEnvelope};
    use crate::services::processor::ProcessingGroup;
    use bytes::Bytes;
    use relay_system::Addr;

    /// Checks that resource and instrumentation scope attributes of an OTLP payload end up in
    /// the data of the converted span.
    #[test]
    fn attribute_denormalization() {
        // The OTLP trace payload nests three levels, each with a single attribute:
        // resource -> instrumentation scope -> span.
        let traces_data = r#"
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {
                                "key": "resource_key",
                                "value": {
                                    "stringValue": "resource_value"
                                }
                            }
                        ]
                    },
                    "scopeSpans": [
                        {
                            "scope": {
                                "name": "test_instrumentation",
                                "version": "0.0.1",
                                "attributes": [
                                    {
                                        "key": "scope_key",
                                        "value": {
                                            "stringValue": "scope_value"
                                        }
                                    }
                                ]
                            },
                            "spans": [
                                {
                                    "traceId": "89143b0763095bd9c9955e8175d1fb23",
                                    "spanId": "e342abb1214ca181",
                                    "attributes": [
                                        {
                                            "key": "span_key",
                                            "value": {
                                                "stringValue": "span_value"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        "#;

        // Start from an envelope that only has headers and wrap it for the span group.
        let header_bytes =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let (outcome_aggregator, _) = Addr::custom();
        let managed_envelope = ManagedEnvelope::new(
            Envelope::parse_bytes(header_bytes).unwrap(),
            outcome_aggregator,
        );
        let mut typed_envelope: TypedEnvelope<SpanGroup> =
            TypedEnvelope::try_from((managed_envelope, ProcessingGroup::Span)).unwrap();

        // Attach the OTLP payload as a JSON-formatted spans integration item.
        let integration = Integration::Spans(SpansIntegration::OtelV1 {
            format: OtelFormat::Json,
        });
        let mut otel_item = Item::new(ItemType::Integration);
        otel_item.set_payload(integration.into(), traces_data);
        typed_envelope.envelope_mut().add_item(otel_item.clone());

        // Run the conversion, which should append `Span` item(s) to the envelope.
        convert_traces_data(otel_item, &mut typed_envelope);

        // The converted span must carry the resource and scope attributes alongside its own.
        let span_item = typed_envelope
            .envelope()
            .items()
            .find(|candidate| matches!(candidate.ty(), ItemType::Span))
            .expect("converted span missing from envelope");

        let payload: serde_json::Value = serde_json::from_slice(&span_item.payload()).unwrap();
        insta::assert_json_snapshot!(payload, @r#"
        {
          "data": {
            "instrumentation.name": "test_instrumentation",
            "instrumentation.scope_key": "scope_value",
            "instrumentation.version": "0.0.1",
            "resource.resource_key": "resource_value",
            "span_key": "span_value"
          },
          "exclusive_time": 0.0,
          "is_remote": false,
          "links": [],
          "op": "default",
          "span_id": "e342abb1214ca181",
          "start_timestamp": 0.0,
          "status": "ok",
          "timestamp": 0.0,
          "trace_id": "89143b0763095bd9c9955e8175d1fb23"
        }
        "#);
    }
}
263}