relay_server/services/processor/
span.rs

1//! Processor code related to standalone spans.
2
3use prost::Message;
4use relay_dynamic_config::Feature;
5use relay_protocol::Annotated;
6use relay_quotas::DataCategory;
7use relay_spans::otel_trace::TracesData;
8
9use crate::envelope::{ContentType, Item, ItemType};
10use crate::integrations::{Integration, OtelFormat, SpansIntegration};
11use crate::managed::{ItemAction, TypedEnvelope};
12use crate::services::outcome::{DiscardReason, Outcome};
13use crate::services::processor::{SpanGroup, should_filter};
14
15#[cfg(feature = "processing")]
16mod processing;
17use crate::services::projects::project::ProjectInfo;
18#[cfg(feature = "processing")]
19pub use processing::*;
20use relay_config::Config;
21
22pub fn filter(
23    managed_envelope: &mut TypedEnvelope<SpanGroup>,
24    config: &Config,
25    project_info: &ProjectInfo,
26) {
27    let disabled = should_filter(config, project_info, Feature::StandaloneSpanIngestion);
28    if !disabled {
29        return;
30    }
31
32    managed_envelope.retain_items(|item| {
33        let is_span = matches!(item.ty(), &ItemType::Span)
34            || matches!(item.integration(), Some(Integration::Spans(_)));
35
36        match is_span {
37            true => {
38                relay_log::debug!("dropping span because feature is disabled");
39                ItemAction::DropSilently
40            }
41            false => ItemAction::Keep,
42        }
43    });
44}
45
46pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
47    let envelope = managed_envelope.envelope_mut();
48
49    for item in envelope.take_items_by(|item| {
50        matches!(
51            item.integration(),
52            Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
53        )
54    }) {
55        convert_traces_data(item, managed_envelope);
56    }
57}
58
59fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
60    let traces_data = match parse_traces_data(item) {
61        Ok(traces_data) => traces_data,
62        Err(reason) => {
63            // NOTE: logging quantity=1 is semantically wrong, but we cannot know the real quantity
64            // without parsing.
65            track_invalid(managed_envelope, reason, 1);
66            return;
67        }
68    };
69    for resource_spans in traces_data.resource_spans {
70        for scope_spans in resource_spans.scope_spans {
71            for span in scope_spans.spans {
72                let span = relay_spans::otel_to_sentry_span_v1(
73                    span,
74                    resource_spans.resource.as_ref(),
75                    scope_spans.scope.as_ref(),
76                );
77
78                let Ok(payload) = Annotated::new(span).to_json() else {
79                    track_invalid(managed_envelope, DiscardReason::Internal, 1);
80                    continue;
81                };
82
83                let mut item = Item::new(ItemType::Span);
84                item.set_payload(ContentType::Json, payload);
85                managed_envelope.envelope_mut().add_item(item);
86            }
87        }
88    }
89    managed_envelope.update(); // update envelope summary
90}
91
92fn track_invalid(
93    managed_envelope: &mut TypedEnvelope<SpanGroup>,
94    reason: DiscardReason,
95    quantity: usize,
96) {
97    managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, quantity);
98    managed_envelope.track_outcome(
99        Outcome::Invalid(reason),
100        DataCategory::SpanIndexed,
101        quantity,
102    );
103}
104
105fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
106    let Some(Integration::Spans(SpansIntegration::OtelV1 { format })) = item.integration() else {
107        return Err(DiscardReason::ContentType);
108    };
109
110    match format {
111        OtelFormat::Json => serde_json::from_slice(&item.payload()).map_err(|e| {
112            relay_log::debug!(
113                error = &e as &dyn std::error::Error,
114                "Failed to parse traces data as JSON"
115            );
116            DiscardReason::InvalidJson
117        }),
118        OtelFormat::Protobuf => TracesData::decode(item.payload()).map_err(|e| {
119            relay_log::debug!(
120                error = &e as &dyn std::error::Error,
121                "Failed to parse traces data as protobuf"
122            );
123            DiscardReason::InvalidProtobuf
124        }),
125    }
126}
127
#[cfg(test)]
mod tests {

    use super::*;
    use crate::Envelope;
    use crate::managed::{ManagedEnvelope, TypedEnvelope};
    use crate::services::processor::ProcessingGroup;
    use bytes::Bytes;
    use relay_system::Addr;

    /// Checks that resource and instrumentation scope attributes end up on the converted
    /// span alongside the span's own attributes.
    #[test]
    fn attribute_denormalization() {
        // OTLP trace payload made of a resource, an instrumentation scope and a span, each
        // carrying exactly one attribute.
        let traces_data = r#"
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {
                                "key": "resource_key",
                                "value": {
                                    "stringValue": "resource_value"
                                }
                            }
                        ]
                    },
                    "scopeSpans": [
                        {
                            "scope": {
                                "name": "test_instrumentation",
                                "version": "0.0.1",
                                "attributes": [
                                    {
                                        "key": "scope_key",
                                        "value": {
                                            "stringValue": "scope_value"
                                        }
                                    }
                                ]
                            },
                            "spans": [
                                {
                                    "traceId": "89143b0763095bd9c9955e8175d1fb23",
                                    "spanId": "e342abb1214ca181",
                                    "attributes": [
                                        {
                                            "key": "span_key",
                                            "value": {
                                                "stringValue": "span_value"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                }
            ]
        }
        "#;

        // Start from an envelope that only has headers and wrap it for the span group.
        let header_bytes =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let (outcome_aggregator, _) = Addr::custom();
        let managed = ManagedEnvelope::new(
            Envelope::parse_bytes(header_bytes).unwrap(),
            outcome_aggregator,
        );
        let mut typed_envelope: TypedEnvelope<_> =
            (managed, ProcessingGroup::Span).try_into().unwrap();

        // Attach the OTLP payload as a JSON-formatted spans integration item.
        let integration = Integration::Spans(SpansIntegration::OtelV1 {
            format: OtelFormat::Json,
        });
        let mut otel_item = Item::new(ItemType::Integration);
        otel_item.set_payload(integration.into(), traces_data);
        typed_envelope.envelope_mut().add_item(otel_item.clone());

        // Run the conversion, which should add `Span` item(s) to the envelope.
        convert_traces_data(otel_item, &mut typed_envelope);

        // The resource and scope attributes must be present on the resulting span.
        let span_item = typed_envelope
            .envelope()
            .items()
            .find(|candidate| matches!(candidate.ty(), &ItemType::Span))
            .expect("converted span missing from envelope");

        let payload = serde_json::from_slice::<serde_json::Value>(&span_item.payload()).unwrap();
        insta::assert_json_snapshot!(payload, @r#"
        {
          "data": {
            "instrumentation.name": "test_instrumentation",
            "instrumentation.scope_key": "scope_value",
            "instrumentation.version": "0.0.1",
            "resource.resource_key": "resource_value",
            "sentry.origin": "auto.otlp.spans",
            "span_key": "span_value"
          },
          "exclusive_time": 0.0,
          "links": [],
          "op": "default",
          "span_id": "e342abb1214ca181",
          "start_timestamp": 0.0,
          "status": "ok",
          "timestamp": 0.0,
          "trace_id": "89143b0763095bd9c9955e8175d1fb23"
        }
        "#);
    }
}