relay_server/services/projects/cache/
project.rs1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_dynamic_config::Feature;
5use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
6use relay_sampling::evaluation::ReservoirCounters;
7
8use crate::envelope::ItemType;
9use crate::managed::ManagedEnvelope;
10use crate::services::outcome::{DiscardReason, Outcome};
11use crate::services::projects::cache::state::SharedProject;
12use crate::services::projects::project::ProjectState;
13use crate::statsd::RelayTimers;
14use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter};
15
16pub struct Project<'a> {
18 shared: SharedProject,
19 config: &'a Config,
20}
21
22impl<'a> Project<'a> {
23 pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
24 Self { shared, config }
25 }
26
27 pub fn state(&self) -> &ProjectState {
29 self.shared.project_state()
30 }
31
32 pub fn rate_limits(&self) -> &CachedRateLimits {
34 self.shared.cached_rate_limits()
35 }
36
37 pub fn reservoir_counters(&self) -> &ReservoirCounters {
39 self.shared.reservoir_counters()
40 }
41
42 pub async fn check_envelope(
52 &self,
53 mut envelope: ManagedEnvelope,
54 ) -> Result<CheckedEnvelope, DiscardReason> {
55 let state = match self.state() {
56 ProjectState::Enabled(state) => Some(Arc::clone(state)),
57 ProjectState::Disabled => {
58 envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
61 return Err(DiscardReason::ProjectId);
62 }
63 ProjectState::Pending => None,
64 };
65
66 let mut scoping = envelope.scoping();
67
68 if let Some(ref state) = state {
69 scoping = state.scope_request(envelope.envelope().meta());
70 envelope.scope(scoping);
71
72 if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
73 envelope.reject(Outcome::Invalid(reason));
74 return Err(reason);
75 }
76 }
77
78 let current_limits = self.rate_limits().current_limits();
79
80 let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
81 let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
82 let current_limits = Arc::clone(¤t_limits);
83 async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
84 });
85
86 let (mut enforcement, mut rate_limits) = envelope_limiter
87 .compute(envelope.envelope_mut(), &scoping)
88 .await?;
89
90 let check_nested_spans = state
91 .as_ref()
92 .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
93
94 if check_nested_spans {
97 relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
98 sync_spans_to_enforcement(&envelope, &mut enforcement);
99 });
100 }
101
102 enforcement.apply_with_outcomes(&mut envelope);
103
104 envelope.update();
105
106 if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
110 let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
111 metrics_scoping.namespace = MetricNamespaceScoping::Any;
112 rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
113 }
114
115 let envelope = if envelope.envelope().is_empty() {
116 envelope.reject(Outcome::RateLimited(None));
118 None
119 } else {
120 Some(envelope)
121 };
122
123 Ok(CheckedEnvelope {
124 envelope,
125 rate_limits,
126 })
127 }
128}
129
130#[derive(Debug)]
135pub struct CheckedEnvelope {
136 pub envelope: Option<ManagedEnvelope>,
137 pub rate_limits: RateLimits,
138}
139
140fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
146 if !enforcement.is_event_active() {
147 return;
148 }
149
150 let spans_count = count_nested_spans(envelope);
151 if spans_count == 0 {
152 return;
153 }
154
155 if enforcement.event.is_active() {
156 enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
157 }
158
159 if enforcement.event_indexed.is_active() {
160 enforcement.spans_indexed = enforcement
161 .event_indexed
162 .clone_for(DataCategory::SpanIndexed, spans_count);
163 }
164}
165
166fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
168 #[derive(Debug, serde::Deserialize)]
169 struct PartialEvent {
170 spans: crate::utils::SeqCount,
171 }
172
173 envelope
174 .envelope()
175 .items()
176 .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
177 .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
178 .map_or(0, |event| event.spans.0 + 1)
181}
182
183#[cfg(test)]
184mod tests {
185 use crate::envelope::{ContentType, Envelope, Item};
186 use crate::extractors::RequestMeta;
187 use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
188 use relay_base_schema::project::{ProjectId, ProjectKey};
189 use relay_event_schema::protocol::EventId;
190 use serde_json::json;
191 use smallvec::smallvec;
192
193 use super::*;
194
195 fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
196 let mut project_info = ProjectInfo {
197 project_id: Some(ProjectId::new(42)),
198 ..Default::default()
199 };
200 project_info.public_keys = smallvec![PublicKeyConfig {
201 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
202 numeric_id: None,
203 }];
204
205 if let Some(data) = data {
206 project_info.config = serde_json::from_value(data).unwrap();
207 }
208
209 Project::new(
210 SharedProject::for_test(ProjectState::Enabled(project_info.into())),
211 config,
212 )
213 }
214
215 fn request_meta() -> RequestMeta {
216 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
217 .parse()
218 .unwrap();
219
220 RequestMeta::new(dsn)
221 }
222
223 #[tokio::test]
224 async fn test_track_nested_spans_outcomes() {
225 let config = Default::default();
226 let project = create_project(
227 &config,
228 Some(json!({
229 "features": [
230 "organizations:indexed-spans-extraction"
231 ],
232 "quotas": [{
233 "id": "foo",
234 "categories": ["transaction"],
235 "window": 3600,
236 "limit": 0,
237 "reasonCode": "foo",
238 }]
239 })),
240 );
241
242 let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
243
244 let mut transaction = Item::new(ItemType::Transaction);
245 transaction.set_payload(
246 ContentType::Json,
247 r#"{
248 "event_id": "52df9022835246eeb317dbd739ccd059",
249 "type": "transaction",
250 "transaction": "I have a stale timestamp, but I'm recent!",
251 "start_timestamp": 1,
252 "timestamp": 2,
253 "contexts": {
254 "trace": {
255 "trace_id": "ff62a8b040f340bda5d830223def1d81",
256 "span_id": "bd429c44b67a3eb4"
257 }
258 },
259 "spans": [
260 {
261 "span_id": "bd429c44b67a3eb4",
262 "start_timestamp": 1,
263 "timestamp": null,
264 "trace_id": "ff62a8b040f340bda5d830223def1d81"
265 },
266 {
267 "span_id": "bd429c44b67a3eb5",
268 "start_timestamp": 1,
269 "timestamp": null,
270 "trace_id": "ff62a8b040f340bda5d830223def1d81"
271 }
272 ]
273}"#,
274 );
275
276 envelope.add_item(transaction);
277
278 let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
279
280 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone());
281
282 project.check_envelope(managed_envelope).await.unwrap();
283 drop(outcome_aggregator);
284
285 let expected = [
286 (DataCategory::Transaction, 1),
287 (DataCategory::TransactionIndexed, 1),
288 (DataCategory::Span, 3),
289 (DataCategory::SpanIndexed, 3),
290 ];
291
292 for (expected_category, expected_quantity) in expected {
293 let outcome = outcome_aggregator_rx.recv().await.unwrap();
294 assert_eq!(outcome.category, expected_category);
295 assert_eq!(outcome.quantity, expected_quantity);
296 }
297 }
298}