relay_server/services/projects/cache/
project.rs1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_dynamic_config::Feature;
5use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
6use relay_sampling::evaluation::ReservoirCounters;
7
8use crate::envelope::ItemType;
9use crate::managed::ManagedEnvelope;
10use crate::services::outcome::{DiscardReason, Outcome};
11use crate::services::projects::cache::state::SharedProject;
12use crate::services::projects::project::ProjectState;
13use crate::statsd::RelayTimers;
14use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter};
15
16pub struct Project<'a> {
18 shared: SharedProject,
19 config: &'a Config,
20}
21
22impl<'a> Project<'a> {
23 pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
24 Self { shared, config }
25 }
26
27 pub fn state(&self) -> &ProjectState {
29 self.shared.project_state()
30 }
31
32 pub fn rate_limits(&self) -> &CachedRateLimits {
34 self.shared.cached_rate_limits()
35 }
36
37 pub fn reservoir_counters(&self) -> &ReservoirCounters {
39 self.shared.reservoir_counters()
40 }
41
42 pub async fn check_envelope(
52 &self,
53 mut envelope: ManagedEnvelope,
54 ) -> Result<CheckedEnvelope, DiscardReason> {
55 let state = match self.state() {
56 ProjectState::Enabled(state) => Some(Arc::clone(state)),
57 ProjectState::Disabled => {
58 envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
61 return Err(DiscardReason::ProjectId);
62 }
63 ProjectState::Pending => None,
64 };
65
66 let mut scoping = envelope.scoping();
67
68 if let Some(ref state) = state {
69 scoping = state.scope_request(envelope.envelope().meta());
70 envelope.scope(scoping);
71
72 if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
73 envelope.reject(Outcome::Invalid(reason));
74 return Err(reason);
75 }
76 }
77
78 let current_limits = self.rate_limits().current_limits();
79
80 let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
81 let current_limits_clone = current_limits.clone();
82 let envelope_limiter =
83 EnvelopeLimiter::new(CheckLimits::NonIndexed, move |item_scoping, _| {
84 let current_limits_clone = current_limits_clone.clone();
85
86 async move { Ok(current_limits_clone.check_with_quotas(quotas, item_scoping)) }
87 });
88
89 let (mut enforcement, mut rate_limits) = envelope_limiter
90 .compute(envelope.envelope_mut(), &scoping)
91 .await?;
92
93 let check_nested_spans = state
94 .as_ref()
95 .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
96
97 if check_nested_spans {
100 relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
101 sync_spans_to_enforcement(&envelope, &mut enforcement);
102 });
103 }
104
105 enforcement.apply_with_outcomes(&mut envelope);
106
107 envelope.update();
108
109 if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
113 let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
114 metrics_scoping.namespace = MetricNamespaceScoping::Any;
115 rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
116 }
117
118 let envelope = if envelope.envelope().is_empty() {
119 envelope.reject(Outcome::RateLimited(None));
121 None
122 } else {
123 Some(envelope)
124 };
125
126 Ok(CheckedEnvelope {
127 envelope,
128 rate_limits,
129 })
130 }
131}
132
133#[derive(Debug)]
138pub struct CheckedEnvelope {
139 pub envelope: Option<ManagedEnvelope>,
140 pub rate_limits: RateLimits,
141}
142
143fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
149 if !enforcement.is_event_active() {
150 return;
151 }
152
153 let spans_count = count_nested_spans(envelope);
154 if spans_count == 0 {
155 return;
156 }
157
158 if enforcement.event.is_active() {
159 enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
160 }
161
162 if enforcement.event_indexed.is_active() {
163 enforcement.spans_indexed = enforcement
164 .event_indexed
165 .clone_for(DataCategory::SpanIndexed, spans_count);
166 }
167}
168
169fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
171 #[derive(Debug, serde::Deserialize)]
172 struct PartialEvent {
173 spans: crate::utils::SeqCount,
174 }
175
176 envelope
177 .envelope()
178 .items()
179 .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
180 .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
181 .map_or(0, |event| event.spans.0 + 1)
184}
185
#[cfg(test)]
mod tests {
    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_event_schema::protocol::EventId;
    use serde_json::json;
    use smallvec::smallvec;

    use super::*;

    /// Builds an enabled project with id 42 and a single public key.
    ///
    /// If `data` is given, it is deserialized into the project config.
    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
        let public_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap();

        let mut info = ProjectInfo {
            project_id: Some(ProjectId::new(42)),
            public_keys: smallvec![PublicKeyConfig {
                public_key,
                numeric_id: None,
            }],
            ..Default::default()
        };

        if let Some(raw_config) = data {
            info.config = serde_json::from_value(raw_config).unwrap();
        }

        let state = ProjectState::Enabled(info.into());
        Project::new(SharedProject::for_test(state), config)
    }

    /// Creates request meta for a DSN that uses the same key and project id as
    /// `create_project`.
    fn request_meta() -> RequestMeta {
        RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        )
    }

    /// A transaction dropped by a zero-limit quota must also emit span outcomes.
    ///
    /// The payload nests two spans and the expected span quantity is three, matching the
    /// `+ 1` applied by `count_nested_spans`.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes() {
        const TRANSACTION_PAYLOAD: &str = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": [
    {
      "span_id": "bd429c44b67a3eb4",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    },
    {
      "span_id": "bd429c44b67a3eb5",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    }
  ]
}"#;

        let config = Config::default();
        let project_config = json!({
            "features": [
                "organizations:indexed-spans-extraction"
            ],
            "quotas": [{
               "id": "foo",
               "categories": ["transaction"],
               "window": 3600,
               "limit": 0,
               "reasonCode": "foo",
           }]
        });
        let project = create_project(&config, Some(project_config));

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_payload(ContentType::Json, TRANSACTION_PAYLOAD);

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
        let (test_store, _) = relay_system::Addr::custom();

        let managed_envelope =
            ManagedEnvelope::new(envelope, outcome_aggregator.clone(), test_store);

        project.check_envelope(managed_envelope).await.unwrap();
        // Release the last local handle to the aggregator before reading the outcomes.
        drop(outcome_aggregator);

        let expected_outcomes = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 3),
            (DataCategory::SpanIndexed, 3),
        ];

        for (category, quantity) in expected_outcomes {
            let outcome = outcome_aggregator_rx.recv().await.unwrap();
            assert_eq!(outcome.category, category);
            assert_eq!(outcome.quantity, quantity);
        }
    }
}