relay_server/services/projects/cache/
project.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
5use relay_sampling::evaluation::ReservoirCounters;
6
7use crate::Envelope;
8use crate::envelope::ItemType;
9use crate::managed::{Managed, Rejected};
10use crate::services::outcome::{DiscardReason, Outcome};
11use crate::services::projects::cache::state::SharedProject;
12use crate::services::projects::project::ProjectState;
13use crate::utils::{CheckLimits, EnvelopeLimiter};
14
15/// A loaded project.
16pub struct Project<'a> {
17    shared: SharedProject,
18    config: &'a Config,
19}
20
21impl<'a> Project<'a> {
22    pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
23        Self { shared, config }
24    }
25
26    /// Returns a reference to the currently cached project state.
27    pub fn state(&self) -> &ProjectState {
28        self.shared.project_state()
29    }
30
31    /// Returns a reference to the currently cached rate limits.
32    pub fn rate_limits(&self) -> &CachedRateLimits {
33        self.shared.cached_rate_limits()
34    }
35
36    /// Returns a reference to the currently reservoir counters.
37    pub fn reservoir_counters(&self) -> &ReservoirCounters {
38        self.shared.reservoir_counters()
39    }
40
41    /// Checks the envelope against project configuration and rate limits.
42    ///
43    /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
44    /// project state may be used, or otherwise the envelope is passed through unaltered.
45    ///
46    /// To check the envelope, this runs:
47    ///  - Validate origins and public keys
48    ///  - Quotas with a limit of `0`
49    ///  - Cached rate limits
50    pub async fn check_envelope(
51        &self,
52        envelope: &mut Managed<Box<Envelope>>,
53    ) -> Result<RateLimits, Rejected<DiscardReason>> {
54        let state = match self.state() {
55            ProjectState::Enabled(state) => Some(Arc::clone(state)),
56            ProjectState::Disabled => {
57                // TODO(jjbayer): We should refactor this function to either return a Result or
58                // handle envelope rejections internally, but not both.
59                let err = envelope
60                    .reject_err(Outcome::Invalid(DiscardReason::ProjectId))
61                    .map(|_| DiscardReason::ProjectId);
62                return Err(err);
63            }
64            ProjectState::Pending => None,
65        };
66
67        let mut scoping = envelope.scoping();
68
69        if let Some(ref state) = state {
70            scoping = state.scope_request(envelope.meta());
71            envelope.scope(scoping);
72
73            if let Err(reason) = state.check_envelope(envelope, self.config) {
74                return Err(envelope
75                    .reject_err(Outcome::Invalid(reason))
76                    .map(|_| reason));
77            }
78        }
79
80        let current_limits = self.rate_limits().current_limits();
81
82        let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
83
84        // To get the correct span outcomes, we have to partially parse the event payload
85        // and count the spans contained in the transaction events.
86        // For performance reasons, we only do this if there is an active limit on `Transaction`.
87        if current_limits
88            .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)])
89        {
90            ensure_span_count(envelope);
91        }
92
93        let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
94            let current_limits = Arc::clone(&current_limits);
95            async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
96        });
97
98        let (enforcement, mut rate_limits) = envelope_limiter.compute(envelope, &scoping).await?;
99
100        enforcement.apply_to_managed(envelope);
101
102        // Special case: Expose active rate limits for all metric namespaces if there is at least
103        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
104        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
105        if envelope.items().any(|i| i.ty().is_metrics()) {
106            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
107            metrics_scoping.namespace = MetricNamespaceScoping::Any;
108            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
109        }
110
111        Ok(rate_limits)
112    }
113}
114
115fn ensure_span_count(envelope: &mut Managed<Box<Envelope>>) {
116    envelope.modify(|envelope, records| {
117        if let Some(transaction_item) = envelope
118            .items_mut()
119            .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
120        {
121            // We're actively 'correcting' span counts -> there will be differences.
122            records.lenient(DataCategory::Span);
123            records.lenient(DataCategory::SpanIndexed);
124            transaction_item.ensure_span_count();
125        }
126    });
127}
128
#[cfg(test)]
mod tests {
    use crate::envelope::{ContentType, Envelope, Item};
    use crate::extractors::RequestMeta;
    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_event_schema::protocol::EventId;
    use serde_json::json;
    use smallvec::smallvec;

    use super::*;

    /// Builds an enabled project with id `42` and a single public key.
    ///
    /// If `data` is given, it is deserialized into the project configuration.
    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
        let mut project_info = ProjectInfo {
            project_id: Some(ProjectId::new(42)),
            public_keys: smallvec![PublicKeyConfig {
                public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
                numeric_id: None,
            }],
            ..Default::default()
        };

        if let Some(data) = data {
            project_info.config = serde_json::from_value(data).unwrap();
        }

        let state = ProjectState::Enabled(project_info.into());
        Project::new(SharedProject::for_test(state), config)
    }

    /// Request meta whose DSN uses the public key and project id from `create_project`.
    fn request_meta() -> RequestMeta {
        RequestMeta::new(
            "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
                .parse()
                .unwrap(),
        )
    }

    /// Returns the span count stored on the first item of the envelope.
    fn get_span_count(envelope: &Envelope) -> usize {
        let first_item = envelope.items().next().unwrap();
        first_item.span_count()
    }

    /// Project configuration with a single quota on transactions and a limit of `0`.
    fn transaction_quota_config() -> serde_json::Value {
        json!({
            "quotas": [{
               "id": "foo",
               "categories": ["transaction"],
               "window": 3600,
               "limit": 0,
               "reasonCode": "foo",
           }]
        })
    }

    /// The span count is derived from the payload: two nested spans plus the transaction.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes() {
        let config = Config::default();
        let project = create_project(&config, Some(transaction_quota_config()));

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_payload(
            ContentType::Json,
            r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": [
    {
      "span_id": "bd429c44b67a3eb4",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    },
    {
      "span_id": "bd429c44b67a3eb5",
      "start_timestamp": 1,
      "timestamp": null,
      "trace_id": "ff62a8b040f340bda5d830223def1d81"
    }
  ]
}"#,
        );

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcomes_rx) = relay_system::Addr::custom();
        let mut managed = Managed::from_envelope(envelope, outcome_aggregator);

        assert_eq!(get_span_count(&managed), 0); // not written yet
        project.check_envelope(&mut managed).await.unwrap();

        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 3),
            (DataCategory::SpanIndexed, 3),
        ];

        for (category, quantity) in expected {
            let outcome = outcomes_rx.recv().await.unwrap();
            assert_eq!(outcome.category, category);
            assert_eq!(outcome.quantity, quantity);
        }
    }

    /// A span count already set on the item is used instead of counting the payload's spans.
    #[tokio::test]
    async fn test_track_nested_spans_outcomes_predefined() {
        let config = Config::default();
        let project = create_project(&config, Some(transaction_quota_config()));

        let mut transaction = Item::new(ItemType::Transaction);
        transaction.set_span_count(Some(666));
        transaction.set_payload(
            ContentType::Json,
            r#"{
  "event_id": "52df9022835246eeb317dbd739ccd059",
  "type": "transaction",
  "transaction": "I have a stale timestamp, but I'm recent!",
  "start_timestamp": 1,
  "timestamp": 2,
  "contexts": {
    "trace": {
      "trace_id": "ff62a8b040f340bda5d830223def1d81",
      "span_id": "bd429c44b67a3eb4"
    }
  },
  "spans": []
}"#,
        );

        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
        envelope.add_item(transaction);

        let (outcome_aggregator, mut outcomes_rx) = relay_system::Addr::custom();
        let mut managed = Managed::from_envelope(envelope, outcome_aggregator);

        assert_eq!(get_span_count(&managed), 666);
        project.check_envelope(&mut managed).await.unwrap();

        let expected = [
            (DataCategory::Transaction, 1),
            (DataCategory::TransactionIndexed, 1),
            (DataCategory::Span, 667),
            (DataCategory::SpanIndexed, 667),
        ];

        for (category, quantity) in expected {
            let outcome = outcomes_rx.recv().await.unwrap();
            assert_eq!(outcome.category, category);
            assert_eq!(outcome.quantity, quantity);
        }
    }
}