relay_server/services/projects/cache/
project.rs1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
5use relay_sampling::evaluation::ReservoirCounters;
6
7use crate::Envelope;
8use crate::envelope::ItemType;
9use crate::managed::{Managed, Rejected};
10use crate::services::outcome::{DiscardReason, Outcome};
11use crate::services::projects::cache::state::SharedProject;
12use crate::services::projects::project::ProjectState;
13use crate::utils::{CheckLimits, EnvelopeLimiter};
14
15pub struct Project<'a> {
17 shared: SharedProject,
18 config: &'a Config,
19}
20
21impl<'a> Project<'a> {
22 pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
23 Self { shared, config }
24 }
25
26 pub fn state(&self) -> &ProjectState {
28 self.shared.project_state()
29 }
30
31 pub fn rate_limits(&self) -> &CachedRateLimits {
33 self.shared.cached_rate_limits()
34 }
35
36 pub fn reservoir_counters(&self) -> &ReservoirCounters {
38 self.shared.reservoir_counters()
39 }
40
41 pub async fn check_envelope(
51 &self,
52 envelope: &mut Managed<Box<Envelope>>,
53 ) -> Result<RateLimits, Rejected<DiscardReason>> {
54 let state = match self.state() {
55 ProjectState::Enabled(state) => Some(Arc::clone(state)),
56 ProjectState::Disabled => {
57 let err = envelope
60 .reject_err(Outcome::Invalid(DiscardReason::ProjectId))
61 .map(|_| DiscardReason::ProjectId);
62 return Err(err);
63 }
64 ProjectState::Pending => None,
65 };
66
67 let mut scoping = envelope.scoping();
68
69 if let Some(ref state) = state {
70 scoping = state.scope_request(envelope.meta());
71 envelope.scope(scoping);
72
73 if let Err(reason) = state.check_envelope(envelope, self.config) {
74 return Err(envelope
75 .reject_err(Outcome::Invalid(reason))
76 .map(|_| reason));
77 }
78 }
79
80 let current_limits = self.rate_limits().current_limits();
81
82 let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
83
84 if current_limits
88 .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)])
89 {
90 ensure_span_count(envelope);
91 }
92
93 let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
94 let current_limits = Arc::clone(¤t_limits);
95 async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
96 });
97
98 let (enforcement, mut rate_limits) = envelope_limiter.compute(envelope, &scoping).await?;
99
100 enforcement.apply_to_managed(envelope);
101
102 if envelope.items().any(|i| i.ty().is_metrics()) {
106 let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
107 metrics_scoping.namespace = MetricNamespaceScoping::Any;
108 rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
109 }
110
111 Ok(rate_limits)
112 }
113}
114
115fn ensure_span_count(envelope: &mut Managed<Box<Envelope>>) {
116 envelope.modify(|envelope, records| {
117 if let Some(transaction_item) = envelope
118 .items_mut()
119 .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
120 {
121 records.lenient(DataCategory::Span);
123 records.lenient(DataCategory::SpanIndexed);
124 transaction_item.ensure_span_count();
125 }
126 });
127}
128
129#[cfg(test)]
130mod tests {
131 use crate::envelope::{ContentType, Envelope, Item};
132 use crate::extractors::RequestMeta;
133 use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
134 use relay_base_schema::project::{ProjectId, ProjectKey};
135 use relay_event_schema::protocol::EventId;
136 use serde_json::json;
137 use smallvec::smallvec;
138
139 use super::*;
140
141 fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
142 let mut project_info = ProjectInfo {
143 project_id: Some(ProjectId::new(42)),
144 ..Default::default()
145 };
146 project_info.public_keys = smallvec![PublicKeyConfig {
147 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
148 numeric_id: None,
149 }];
150
151 if let Some(data) = data {
152 project_info.config = serde_json::from_value(data).unwrap();
153 }
154
155 Project::new(
156 SharedProject::for_test(ProjectState::Enabled(project_info.into())),
157 config,
158 )
159 }
160
161 fn request_meta() -> RequestMeta {
162 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
163 .parse()
164 .unwrap();
165
166 RequestMeta::new(dsn)
167 }
168
169 fn get_span_count(envelope: &Envelope) -> usize {
170 envelope.items().next().unwrap().span_count()
171 }
172
173 #[tokio::test]
174 async fn test_track_nested_spans_outcomes() {
175 let config = Default::default();
176 let project = create_project(
177 &config,
178 Some(json!({
179 "quotas": [{
180 "id": "foo",
181 "categories": ["transaction"],
182 "window": 3600,
183 "limit": 0,
184 "reasonCode": "foo",
185 }]
186 })),
187 );
188
189 let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
190
191 let mut transaction = Item::new(ItemType::Transaction);
192 transaction.set_payload(
193 ContentType::Json,
194 r#"{
195 "event_id": "52df9022835246eeb317dbd739ccd059",
196 "type": "transaction",
197 "transaction": "I have a stale timestamp, but I'm recent!",
198 "start_timestamp": 1,
199 "timestamp": 2,
200 "contexts": {
201 "trace": {
202 "trace_id": "ff62a8b040f340bda5d830223def1d81",
203 "span_id": "bd429c44b67a3eb4"
204 }
205 },
206 "spans": [
207 {
208 "span_id": "bd429c44b67a3eb4",
209 "start_timestamp": 1,
210 "timestamp": null,
211 "trace_id": "ff62a8b040f340bda5d830223def1d81"
212 },
213 {
214 "span_id": "bd429c44b67a3eb5",
215 "start_timestamp": 1,
216 "timestamp": null,
217 "trace_id": "ff62a8b040f340bda5d830223def1d81"
218 }
219 ]
220}"#,
221 );
222
223 envelope.add_item(transaction);
224
225 let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
226
227 let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator);
228
229 assert_eq!(get_span_count(&managed_envelope), 0); project.check_envelope(&mut managed_envelope).await.unwrap();
231
232 let expected = [
233 (DataCategory::Transaction, 1),
234 (DataCategory::TransactionIndexed, 1),
235 (DataCategory::Span, 3),
236 (DataCategory::SpanIndexed, 3),
237 ];
238
239 for (expected_category, expected_quantity) in expected {
240 let outcome = outcome_aggregator_rx.recv().await.unwrap();
241 assert_eq!(outcome.category, expected_category);
242 assert_eq!(outcome.quantity, expected_quantity);
243 }
244 }
245
246 #[tokio::test]
247 async fn test_track_nested_spans_outcomes_predefined() {
248 let config = Default::default();
249 let project = create_project(
250 &config,
251 Some(json!({
252 "quotas": [{
253 "id": "foo",
254 "categories": ["transaction"],
255 "window": 3600,
256 "limit": 0,
257 "reasonCode": "foo",
258 }]
259 })),
260 );
261
262 let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
263
264 let mut transaction = Item::new(ItemType::Transaction);
265 transaction.set_span_count(Some(666));
266 transaction.set_payload(
267 ContentType::Json,
268 r#"{
269 "event_id": "52df9022835246eeb317dbd739ccd059",
270 "type": "transaction",
271 "transaction": "I have a stale timestamp, but I'm recent!",
272 "start_timestamp": 1,
273 "timestamp": 2,
274 "contexts": {
275 "trace": {
276 "trace_id": "ff62a8b040f340bda5d830223def1d81",
277 "span_id": "bd429c44b67a3eb4"
278 }
279 },
280 "spans": []
281}"#,
282 );
283
284 envelope.add_item(transaction);
285
286 let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
287
288 let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator);
289
290 assert_eq!(get_span_count(&managed_envelope), 666);
291 project.check_envelope(&mut managed_envelope).await.unwrap();
292
293 let expected = [
294 (DataCategory::Transaction, 1),
295 (DataCategory::TransactionIndexed, 1),
296 (DataCategory::Span, 667),
297 (DataCategory::SpanIndexed, 667),
298 ];
299
300 for (expected_category, expected_quantity) in expected {
301 let outcome = outcome_aggregator_rx.recv().await.unwrap();
302 assert_eq!(outcome.category, expected_category);
303 assert_eq!(outcome.quantity, expected_quantity);
304 }
305 }
306}