relay_server/services/projects/cache/
project.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
5use relay_sampling::evaluation::ReservoirCounters;
6
7use crate::envelope::ItemType;
8use crate::managed::ManagedEnvelope;
9use crate::services::outcome::{DiscardReason, Outcome};
10use crate::services::projects::cache::state::SharedProject;
11use crate::services::projects::project::ProjectState;
12use crate::utils::{CheckLimits, EnvelopeLimiter};
13
14/// A loaded project.
15pub struct Project<'a> {
16    shared: SharedProject,
17    config: &'a Config,
18}
19
20impl<'a> Project<'a> {
21    pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
22        Self { shared, config }
23    }
24
25    /// Returns a reference to the currently cached project state.
26    pub fn state(&self) -> &ProjectState {
27        self.shared.project_state()
28    }
29
30    /// Returns a reference to the currently cached rate limits.
31    pub fn rate_limits(&self) -> &CachedRateLimits {
32        self.shared.cached_rate_limits()
33    }
34
35    /// Returns a reference to the currently reservoir counters.
36    pub fn reservoir_counters(&self) -> &ReservoirCounters {
37        self.shared.reservoir_counters()
38    }
39
40    /// Checks the envelope against project configuration and rate limits.
41    ///
42    /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
43    /// project state may be used, or otherwise the envelope is passed through unaltered.
44    ///
45    /// To check the envelope, this runs:
46    ///  - Validate origins and public keys
47    ///  - Quotas with a limit of `0`
48    ///  - Cached rate limits
49    pub async fn check_envelope(
50        &self,
51        mut envelope: ManagedEnvelope,
52    ) -> Result<CheckedEnvelope, DiscardReason> {
53        let state = match self.state() {
54            ProjectState::Enabled(state) => Some(Arc::clone(state)),
55            ProjectState::Disabled => {
56                // TODO(jjbayer): We should refactor this function to either return a Result or
57                // handle envelope rejections internally, but not both.
58                envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
59                return Err(DiscardReason::ProjectId);
60            }
61            ProjectState::Pending => None,
62        };
63
64        let mut scoping = envelope.scoping();
65
66        if let Some(ref state) = state {
67            scoping = state.scope_request(envelope.envelope().meta());
68            envelope.scope(scoping);
69
70            if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
71                envelope.reject(Outcome::Invalid(reason));
72                return Err(reason);
73            }
74        }
75
76        let current_limits = self.rate_limits().current_limits();
77
78        let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
79
80        // To get the correct span outcomes, we have to partially parse the event payload
81        // and count the spans contained in the transaction events.
82        // For performance reasons, we only do this if there is an active limit on `Transaction`.
83        if current_limits
84            .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)])
85        {
86            ensure_span_count(&mut envelope);
87        }
88
89        let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
90            let current_limits = Arc::clone(&current_limits);
91            async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
92        });
93
94        let (enforcement, mut rate_limits) = envelope_limiter
95            .compute(envelope.envelope_mut(), &scoping)
96            .await?;
97
98        enforcement.apply_with_outcomes(&mut envelope);
99
100        envelope.update();
101
102        // Special case: Expose active rate limits for all metric namespaces if there is at least
103        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
104        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
105        if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
106            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
107            metrics_scoping.namespace = MetricNamespaceScoping::Any;
108            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
109        }
110
111        let envelope = if envelope.envelope().is_empty() {
112            // Individual rate limits have already been issued above
113            envelope.reject(Outcome::RateLimited(None));
114            None
115        } else {
116            Some(envelope)
117        };
118
119        Ok(CheckedEnvelope {
120            envelope,
121            rate_limits,
122        })
123    }
124}
125
126/// A checked envelope and associated rate limits.
127///
128/// Items violating the rate limits have been removed from the envelope. If all items are removed
129/// from the envelope, `None` is returned in place of the envelope.
130#[derive(Debug)]
131pub struct CheckedEnvelope {
132    pub envelope: Option<ManagedEnvelope>,
133    pub rate_limits: RateLimits,
134}
135
136fn ensure_span_count(envelope: &mut ManagedEnvelope) {
137    if let Some(transaction_item) = envelope
138        .envelope_mut()
139        .items_mut()
140        .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
141    {
142        transaction_item.ensure_span_count();
143    }
144}
145
#[cfg(test)]
mod tests {
    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_event_schema::protocol::EventId;
    use serde_json::json;
    use smallvec::smallvec;

    use super::*;

    /// Transaction payload carrying two nested spans.
    const TRANSACTION_WITH_NESTED_SPANS: &str = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": [
    {
      "span_id": "bd429c44b67a3eb4",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    },
    {
      "span_id": "bd429c44b67a3eb5",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    }
  ]
}"#;

    /// Transaction payload with an empty list of spans.
    const TRANSACTION_WITHOUT_SPANS: &str = r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": []
}"#;

    /// Project config with a single quota that limits the `transaction` category to `0`.
    fn zero_transaction_quota() -> serde_json::Value {
        json!({
            "quotas": [{
               "id": "foo",
               "categories": ["transaction"],
               "window": 3600,
               "limit": 0,
               "reasonCode": "foo",
           }]
        })
    }

    /// Builds an enabled project with id `42` and one public key.
    ///
    /// If `data` is given, it is deserialized into the project config.
    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
        let public_key = PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: None,
        };

        let mut project_info = ProjectInfo {
            project_id: Some(ProjectId::new(42)),
            public_keys: smallvec![public_key],
            ..Default::default()
        };

        if let Some(project_config) = data {
            project_info.config = serde_json::from_value(project_config).unwrap();
        }

        let state = ProjectState::Enabled(project_info.into());
        Project::new(SharedProject::for_test(state), config)
    }

    /// Request metadata whose DSN uses the public key and project id of the test project.
    fn request_meta() -> RequestMeta {
        RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        )
    }

    /// Returns the span count stored on the first item of the envelope.
    fn get_span_count(managed_envelope: &ManagedEnvelope) -> usize {
        let first_item = managed_envelope.envelope().items().next().unwrap();
        first_item.span_count()
    }

    /// A rate limited transaction with two nested spans and no span count set up front is
    /// expected to produce span outcomes with a quantity of 3.
    // NOTE(review): presumably the extra span accounts for the transaction itself - confirm.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes() {
        let config = Config::default();
        let project = create_project(&config, Some(zero_transaction_quota()));

        let mut item = Item::new(ItemType::Transaction);
        item.set_payload(ContentType::Json, TRANSACTION_WITH_NESTED_SPANS);

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
        envelope.add_item(item);

        let (outcome_aggregator, mut outcomes) = relay_system::Addr::custom();
        let managed = ManagedEnvelope::new(envelope, outcome_aggregator.clone());

        assert_eq!(get_span_count(&managed), 0); // not written yet
        project.check_envelope(managed).await.unwrap();

        // Release the local sender once the check is done.
        drop(outcome_aggregator);

        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 3),
            (DataCategory::SpanIndexed, 3),
        ];

        for (category, quantity) in expected {
            let outcome = outcomes.recv().await.unwrap();
            assert_eq!(outcome.category, category);
            assert_eq!(outcome.quantity, quantity);
        }
    }

    /// A span count that is already set on the item (666) is expected to be used for the span
    /// outcomes (quantity 667), even though the payload contains no spans.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes_predefined() {
        let config = Config::default();
        let project = create_project(&config, Some(zero_transaction_quota()));

        let mut item = Item::new(ItemType::Transaction);
        item.set_span_count(Some(666));
        item.set_payload(ContentType::Json, TRANSACTION_WITHOUT_SPANS);

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
        envelope.add_item(item);

        let (outcome_aggregator, mut outcomes) = relay_system::Addr::custom();
        let managed = ManagedEnvelope::new(envelope, outcome_aggregator.clone());

        assert_eq!(get_span_count(&managed), 666);
        project.check_envelope(managed).await.unwrap();

        // Release the local sender once the check is done.
        drop(outcome_aggregator);

        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 667),
            (DataCategory::SpanIndexed, 667),
        ];

        for (category, quantity) in expected {
            let outcome = outcomes.recv().await.unwrap();
            assert_eq!(outcome.category, category);
            assert_eq!(outcome.quantity, quantity);
        }
    }
}