relay_server/services/projects/cache/
project.rs1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
5use relay_sampling::evaluation::ReservoirCounters;
6
7use crate::envelope::ItemType;
8use crate::managed::ManagedEnvelope;
9use crate::services::outcome::{DiscardReason, Outcome};
10use crate::services::projects::cache::state::SharedProject;
11use crate::services::projects::project::ProjectState;
12use crate::utils::{CheckLimits, EnvelopeLimiter};
13
14pub struct Project<'a> {
16 shared: SharedProject,
17 config: &'a Config,
18}
19
20impl<'a> Project<'a> {
21 pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
22 Self { shared, config }
23 }
24
25 pub fn state(&self) -> &ProjectState {
27 self.shared.project_state()
28 }
29
30 pub fn rate_limits(&self) -> &CachedRateLimits {
32 self.shared.cached_rate_limits()
33 }
34
35 pub fn reservoir_counters(&self) -> &ReservoirCounters {
37 self.shared.reservoir_counters()
38 }
39
40 pub async fn check_envelope(
50 &self,
51 mut envelope: ManagedEnvelope,
52 ) -> Result<CheckedEnvelope, DiscardReason> {
53 let state = match self.state() {
54 ProjectState::Enabled(state) => Some(Arc::clone(state)),
55 ProjectState::Disabled => {
56 envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
59 return Err(DiscardReason::ProjectId);
60 }
61 ProjectState::Pending => None,
62 };
63
64 let mut scoping = envelope.scoping();
65
66 if let Some(ref state) = state {
67 scoping = state.scope_request(envelope.envelope().meta());
68 envelope.scope(scoping);
69
70 if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
71 envelope.reject(Outcome::Invalid(reason));
72 return Err(reason);
73 }
74 }
75
76 let current_limits = self.rate_limits().current_limits();
77
78 let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
79
80 if current_limits
84 .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)])
85 {
86 ensure_span_count(&mut envelope);
87 }
88
89 let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
90 let current_limits = Arc::clone(¤t_limits);
91 async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
92 });
93
94 let (enforcement, mut rate_limits) = envelope_limiter
95 .compute(envelope.envelope_mut(), &scoping)
96 .await?;
97
98 enforcement.apply_with_outcomes(&mut envelope);
99
100 envelope.update();
101
102 if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
106 let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
107 metrics_scoping.namespace = MetricNamespaceScoping::Any;
108 rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
109 }
110
111 let envelope = if envelope.envelope().is_empty() {
112 envelope.reject(Outcome::RateLimited(None));
114 None
115 } else {
116 Some(envelope)
117 };
118
119 Ok(CheckedEnvelope {
120 envelope,
121 rate_limits,
122 })
123 }
124}
125
126#[derive(Debug)]
131pub struct CheckedEnvelope {
132 pub envelope: Option<ManagedEnvelope>,
133 pub rate_limits: RateLimits,
134}
135
136fn ensure_span_count(envelope: &mut ManagedEnvelope) {
137 if let Some(transaction_item) = envelope
138 .envelope_mut()
139 .items_mut()
140 .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
141 {
142 transaction_item.ensure_span_count();
143 }
144}
145
#[cfg(test)]
mod tests {
    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_event_schema::protocol::EventId;
    use serde_json::json;
    use smallvec::smallvec;

    use super::*;

    /// Builds an enabled test project with id 42 and a single public key.
    ///
    /// When `data` is given, it is deserialized into the project's config.
    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
        let mut project_info = ProjectInfo {
            project_id: Some(ProjectId::new(42)),
            ..Default::default()
        };
        project_info.public_keys = smallvec![PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: None,
        }];

        if let Some(data) = data {
            project_info.config = serde_json::from_value(data).unwrap();
        }

        Project::new(
            SharedProject::for_test(ProjectState::Enabled(project_info.into())),
            config,
        )
    }

    /// Creates request meta from a DSN that uses the same key and project id as
    /// [`create_project`].
    fn request_meta() -> RequestMeta {
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        RequestMeta::new(dsn)
    }

    /// Returns the span count of the first item in the envelope.
    ///
    /// Panics if the envelope has no items.
    fn get_span_count(managed_envelope: &ManagedEnvelope) -> usize {
        managed_envelope
            .envelope()
            .items()
            .next()
            .unwrap()
            .span_count()
    }

    /// A rate limited transaction with two nested spans and no predefined span count must
    /// produce span outcomes with a quantity of 3.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes() {
        let config = Default::default();
        let project = create_project(
            &config,
            Some(json!({
                "quotas": [{
                   "id": "foo",
                   "categories": ["transaction"],
                   "window": 3600,
                   "limit": 0,
                   "reasonCode": "foo",
               }]
            })),
        );

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_payload(
            ContentType::Json,
            r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": [
    {
      "span_id": "bd429c44b67a3eb4",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    },
    {
      "span_id": "bd429c44b67a3eb5",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    }
  ]
}"#,
        );

        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();

        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone());

        // No span count has been set on the item before the check.
        assert_eq!(get_span_count(&managed_envelope), 0);
        project.check_envelope(managed_envelope).await.unwrap();

        drop(outcome_aggregator);

        // The expected span quantity is the two nested spans plus one.
        // NOTE(review): the extra one is presumably the transaction itself -- confirm.
        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 3),
            (DataCategory::SpanIndexed, 3),
        ];

        for (expected_category, expected_quantity) in expected {
            let outcome = outcome_aggregator_rx.recv().await.unwrap();
            assert_eq!(outcome.category, expected_category);
            assert_eq!(outcome.quantity, expected_quantity);
        }
    }

    /// A span count already set on the item (666) is used for the span outcomes, even though
    /// the payload itself contains no spans.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes_predefined() {
        let config = Default::default();
        let project = create_project(
            &config,
            Some(json!({
                "quotas": [{
                   "id": "foo",
                   "categories": ["transaction"],
                   "window": 3600,
                   "limit": 0,
                   "reasonCode": "foo",
               }]
            })),
        );

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_span_count(Some(666));
        transaction.set_payload(
            ContentType::Json,
            r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": []
}"#,
        );

        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();

        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone());

        // The predefined span count is visible before the check.
        assert_eq!(get_span_count(&managed_envelope), 666);
        project.check_envelope(managed_envelope).await.unwrap();

        drop(outcome_aggregator);

        // The expected span quantity is the predefined count plus one.
        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 667),
            (DataCategory::SpanIndexed, 667),
        ];

        for (expected_category, expected_quantity) in expected {
            let outcome = outcome_aggregator_rx.recv().await.unwrap();
            assert_eq!(outcome.category, expected_category);
            assert_eq!(outcome.quantity, expected_quantity);
        }
    }
}