relay_server/services/projects/cache/
project.rs1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
5
6use crate::Envelope;
7use crate::envelope::ItemType;
8use crate::managed::{Managed, Rejected};
9use crate::services::outcome::{DiscardReason, Outcome};
10use crate::services::projects::cache::state::SharedProject;
11use crate::services::projects::project::ProjectState;
12use crate::utils::{CheckLimits, EnvelopeLimiter};
13
14pub struct Project<'a> {
16 shared: SharedProject,
17 config: &'a Config,
18}
19
20impl<'a> Project<'a> {
21 pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
22 Self { shared, config }
23 }
24
25 pub fn state(&self) -> &ProjectState {
27 self.shared.project_state()
28 }
29
30 pub fn rate_limits(&self) -> &CachedRateLimits {
32 self.shared.cached_rate_limits()
33 }
34
35 pub async fn check_envelope(
45 &self,
46 envelope: &mut Managed<Box<Envelope>>,
47 ) -> Result<RateLimits, Rejected<DiscardReason>> {
48 let state = match self.state() {
49 ProjectState::Enabled(state) => Some(Arc::clone(state)),
50 ProjectState::Dummy => None,
51 ProjectState::Disabled => {
52 let err = envelope
55 .reject_err(Outcome::Invalid(DiscardReason::ProjectId))
56 .map(|_| DiscardReason::ProjectId);
57 return Err(err);
58 }
59 ProjectState::Pending => None,
60 };
61
62 let mut scoping = envelope.scoping();
63
64 if let Some(ref state) = state {
65 scoping = state.scope_request(envelope.meta());
66 envelope.scope(scoping);
67
68 if let Err(reason) = state.check_envelope(envelope, self.config) {
69 return Err(envelope
70 .reject_err(Outcome::Invalid(reason))
71 .map(|_| reason));
72 }
73 }
74
75 let current_limits = self.rate_limits().current_limits();
76
77 let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
78
79 if current_limits
83 .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)])
84 {
85 ensure_span_count(envelope);
86 }
87
88 let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
89 let current_limits = Arc::clone(¤t_limits);
90 async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
91 });
92
93 let (enforcement, mut rate_limits) = envelope_limiter.compute(envelope, &scoping).await?;
94
95 enforcement.apply_to_managed(envelope);
96
97 if envelope.items().any(|i| i.ty().is_metrics()) {
101 let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
102 metrics_scoping.namespace = MetricNamespaceScoping::Any;
103 rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
104 }
105
106 Ok(rate_limits)
107 }
108}
109
110fn ensure_span_count(envelope: &mut Managed<Box<Envelope>>) {
111 envelope.modify(|envelope, records| {
112 if let Some(transaction_item) = envelope
113 .items_mut()
114 .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
115 {
116 records.lenient(DataCategory::Span);
118 records.lenient(DataCategory::SpanIndexed);
119 transaction_item.ensure_span_count();
120 }
121 });
122}
123
#[cfg(test)]
mod tests {
    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_event_schema::protocol::EventId;
    use serde_json::json;
    use smallvec::smallvec;

    use super::*;

    /// Builds an enabled project with id 42 and a single public key.
    ///
    /// If `data` is given, it is deserialized into the project config.
    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
        let mut project_info = ProjectInfo {
            project_id: Some(ProjectId::new(42)),
            public_keys: smallvec![PublicKeyConfig {
                public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
                numeric_id: None,
            }],
            ..Default::default()
        };

        if let Some(data) = data {
            project_info.config = serde_json::from_value(data).unwrap();
        }

        let shared = SharedProject::for_test(ProjectState::Enabled(project_info.into()));
        Project::new(shared, config)
    }

    /// Builds a project whose only quota limits the transaction category to zero.
    fn rate_limited_project(config: &Config) -> Project<'_> {
        let project_config = json!({
            "quotas": [{
                "id": "foo",
                "categories": ["transaction"],
                "window": 3600,
                "limit": 0,
                "reasonCode": "foo",
            }]
        });

        create_project(config, Some(project_config))
    }

    /// Request meta whose DSN matches the key used by `create_project`.
    fn request_meta() -> RequestMeta {
        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        RequestMeta::new(dsn)
    }

    /// Span count stored on the first item of the envelope.
    fn get_span_count(envelope: &Envelope) -> usize {
        let first_item = envelope.items().next().unwrap();
        first_item.span_count() as usize
    }

    /// A rate limited transaction reports its nested spans in the span outcomes.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes() {
        let config = Default::default();
        let project = rate_limited_project(&config);

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_payload(
            ContentType::Json,
            r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": [
    {
      "span_id": "bd429c44b67a3eb4",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    },
    {
      "span_id": "bd429c44b67a3eb5",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    }
  ]
}"#,
        );

        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();

        let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator);

        // No span count has been determined before the check runs.
        assert_eq!(get_span_count(&managed_envelope), 0);
        project.check_envelope(&mut managed_envelope).await.unwrap();

        // Two nested spans in the payload are reported as three spans.
        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 3),
            (DataCategory::SpanIndexed, 3),
        ];

        for (expected_category, expected_quantity) in expected {
            let outcome = outcome_aggregator_rx.recv().await.unwrap();
            assert_eq!(outcome.category, expected_category);
            assert_eq!(outcome.quantity, expected_quantity);
        }
    }

    /// A span count already set on the item is used instead of the payload contents.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes_predefined() {
        let config = Default::default();
        let project = rate_limited_project(&config);

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_span_count(Some(666));
        transaction.set_payload(
            ContentType::Json,
            r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": []
}"#,
        );

        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();

        let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator);

        assert_eq!(get_span_count(&managed_envelope), 666);
        project.check_envelope(&mut managed_envelope).await.unwrap();

        // The predefined count of 666 is reported as 667 spans.
        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 667),
            (DataCategory::SpanIndexed, 667),
        ];

        for (expected_category, expected_quantity) in expected {
            let outcome = outcome_aggregator_rx.recv().await.unwrap();
            assert_eq!(outcome.category, expected_category);
            assert_eq!(outcome.quantity, expected_quantity);
        }
    }
}