relay_server/services/processor/
span.rs1use std::sync::Arc;
4
5use opentelemetry_proto::tonic::common::v1::any_value::Value;
6use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
7use prost::Message;
8use relay_dynamic_config::Feature;
9use relay_event_normalization::span::tag_extraction;
10use relay_event_schema::protocol::{Event, Span};
11use relay_protocol::Annotated;
12use relay_quotas::DataCategory;
13use relay_spans::otel_trace::TracesData;
14
15use crate::envelope::{ContentType, Item, ItemType};
16use crate::services::outcome::{DiscardReason, Outcome};
17use crate::services::processor::{should_filter, SpanGroup};
18use crate::utils::ItemAction;
19use crate::utils::TypedEnvelope;
20
21#[cfg(feature = "processing")]
22mod processing;
23use crate::services::projects::project::ProjectInfo;
24#[cfg(feature = "processing")]
25pub use processing::*;
26use relay_config::Config;
27
28pub fn filter(
29 managed_envelope: &mut TypedEnvelope<SpanGroup>,
30 config: Arc<Config>,
31 project_info: Arc<ProjectInfo>,
32) {
33 let disabled = should_filter(&config, &project_info, Feature::StandaloneSpanIngestion);
34 let otel_disabled = should_filter(&config, &project_info, Feature::OtelEndpoint);
35
36 managed_envelope.retain_items(|item| {
37 if disabled && item.is_span() {
38 relay_log::debug!("dropping span because feature is disabled");
39 ItemAction::DropSilently
40 } else if otel_disabled && item.ty() == &ItemType::OtelTracesData {
41 relay_log::debug!("dropping otel trace because feature is disabled");
42 ItemAction::DropSilently
43 } else {
44 ItemAction::Keep
45 }
46 });
47}
48
49pub fn convert_otel_traces_data(managed_envelope: &mut TypedEnvelope<SpanGroup>) {
50 let envelope = managed_envelope.envelope_mut();
51
52 for item in envelope.take_items_by(|item| item.ty() == &ItemType::OtelTracesData) {
53 convert_traces_data(item, managed_envelope);
54 }
55}
56
57fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGroup>) {
58 let traces_data = match parse_traces_data(item) {
59 Ok(traces_data) => traces_data,
60 Err(reason) => {
61 track_invalid(managed_envelope, reason);
64 return;
65 }
66 };
67 for resource_spans in traces_data.resource_spans {
68 for scope_spans in resource_spans.scope_spans {
69 for mut span in scope_spans.spans {
70 if let Some(ref scope) = scope_spans.scope {
72 if !scope.name.is_empty() {
73 span.attributes.push(KeyValue {
74 key: "instrumentation.name".to_owned(),
75 value: Some(AnyValue {
76 value: Some(Value::StringValue(scope.name.clone())),
77 }),
78 })
79 }
80 if !scope.version.is_empty() {
81 span.attributes.push(KeyValue {
82 key: "instrumentation.version".to_owned(),
83 value: Some(AnyValue {
84 value: Some(Value::StringValue(scope.version.clone())),
85 }),
86 })
87 }
88 scope.attributes.iter().for_each(|a| {
89 span.attributes.push(KeyValue {
90 key: format!("instrumentation.{}", a.key),
91 value: a.value.clone(),
92 });
93 });
94 }
95 if let Some(ref resource) = resource_spans.resource {
96 resource.attributes.iter().for_each(|a| {
97 span.attributes.push(KeyValue {
98 key: format!("resource.{}", a.key),
99 value: a.value.clone(),
100 });
101 });
102 }
103
104 let Ok(payload) = serde_json::to_vec(&span) else {
105 track_invalid(managed_envelope, DiscardReason::Internal);
106 continue;
107 };
108 let mut item = Item::new(ItemType::OtelSpan);
109 item.set_payload(ContentType::Json, payload);
110 managed_envelope.envelope_mut().add_item(item);
111 }
112 }
113 }
114 managed_envelope.update(); }
116
117fn track_invalid(managed_envelope: &mut TypedEnvelope<SpanGroup>, reason: DiscardReason) {
118 managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::Span, 1);
119 managed_envelope.track_outcome(Outcome::Invalid(reason), DataCategory::SpanIndexed, 1);
120}
121
122fn parse_traces_data(item: Item) -> Result<TracesData, DiscardReason> {
123 match item.content_type() {
124 Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| {
125 relay_log::debug!(
126 error = &e as &dyn std::error::Error,
127 "Failed to parse traces data as JSON"
128 );
129 DiscardReason::InvalidJson
130 }),
131 Some(&ContentType::Protobuf) => TracesData::decode(item.payload()).map_err(|e| {
132 relay_log::debug!(
133 error = &e as &dyn std::error::Error,
134 "Failed to parse traces data as protobuf"
135 );
136 DiscardReason::InvalidProtobuf
137 }),
138 _ => Err(DiscardReason::ContentType),
139 }
140}
141
142pub fn extract_transaction_span(
146 event: &Event,
147 max_tag_value_size: usize,
148 span_allowed_hosts: &[String],
149) -> Option<Span> {
150 let mut spans = [Span::from(event).into()];
151
152 tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size, span_allowed_hosts);
153 tag_extraction::extract_segment_span_tags(event, &mut spans);
154
155 spans.into_iter().next().and_then(Annotated::into_value)
156}
157
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;
    use crate::services::processor::ProcessingGroup;
    use crate::utils::{ManagedEnvelope, TypedEnvelope};
    use crate::Envelope;
    use bytes::Bytes;
    use relay_spans::otel_trace::Span as OtelSpan;
    use relay_system::Addr;

    /// Verifies that `convert_traces_data` copies scope and resource information onto each
    /// converted span as prefixed attributes, while keeping the span's own attributes.
    #[test]
    fn attribute_denormalization() {
        // One resource with one scope and one span, each carrying a single string attribute.
        let traces_data = r#"
 {
 "resourceSpans": [
 {
 "resource": {
 "attributes": [
 {
 "key": "resource_key",
 "value": {
 "stringValue": "resource_value"
 }
 }
 ]
 },
 "scopeSpans": [
 {
 "scope": {
 "name": "test_instrumentation",
 "version": "0.0.1",
 "attributes": [
 {
 "key": "scope_key",
 "value": {
 "stringValue": "scope_value"
 }
 }
 ]
 },
 "spans": [
 {
 "attributes": [
 {
 "key": "span_key",
 "value": {
 "stringValue": "span_value"
 }
 }
 ]
 }
 ]
 }
 ]
 }
 ]
 }
 "#;

        // Build an otherwise empty span envelope to receive the converted items.
        let bytes =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(bytes).unwrap();
        let (test_store, _) = Addr::custom();
        let (outcome_aggregator, _) = Addr::custom();
        let managed_envelope = ManagedEnvelope::new(
            envelope,
            outcome_aggregator,
            test_store,
            ProcessingGroup::Span,
        );
        let mut typed_envelope: TypedEnvelope<_> = managed_envelope.try_into().unwrap();
        let mut item = Item::new(ItemType::OtelTracesData);
        item.set_payload(ContentType::Json, traces_data);
        typed_envelope.envelope_mut().add_item(item.clone());

        convert_traces_data(item, &mut typed_envelope);

        let item = typed_envelope
            .envelope()
            .items()
            .find(|i| *i.ty() == ItemType::OtelSpan)
            .expect("converted span missing from envelope");
        let attributes = serde_json::from_slice::<OtelSpan>(&item.payload())
            .expect("unable to deserialize otel span")
            .attributes
            .into_iter()
            .map(|kv| (kv.key, kv.value.unwrap()))
            .collect::<BTreeMap<_, _>>();

        // Looks up an attribute by key and returns its string value, panicking with a
        // descriptive message if it is missing or not a string.
        let attribute_value = |key: &str| -> String {
            let any_value = attributes
                .get(key)
                .unwrap_or_else(|| panic!("attribute {} missing", key));
            match &any_value.value {
                Some(Value::StringValue(str)) => str.clone(),
                _ => panic!("attribute {} not a string", key),
            }
        };

        assert_eq!(
            attribute_value("span_key"),
            "span_value",
            "original span attribute should be present"
        );
        assert_eq!(
            attribute_value("instrumentation.name"),
            "test_instrumentation",
            "instrumentation name should be in attributes"
        );
        assert_eq!(
            attribute_value("instrumentation.version"),
            "0.0.1",
            "instrumentation version should be in attributes"
        );
        assert_eq!(
            attribute_value("resource.resource_key"),
            "resource_value",
            "resource attribute should be copied with prefix"
        );
        assert_eq!(
            attribute_value("instrumentation.scope_key"),
            "scope_value",
            "instrumentation scope attribute should be copied with prefix"
        );
    }
}