Skip to main content

relay_server/services/projects/cache/
project.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
5
6use crate::Envelope;
7use crate::envelope::ItemType;
8use crate::managed::{Managed, Rejected};
9use crate::services::outcome::{DiscardReason, Outcome};
10use crate::services::projects::cache::state::SharedProject;
11use crate::services::projects::project::ProjectState;
12use crate::utils::{CheckLimits, EnvelopeLimiter};
13
14/// A loaded project.
15pub struct Project<'a> {
16    shared: SharedProject,
17    config: &'a Config,
18}
19
20impl<'a> Project<'a> {
21    pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
22        Self { shared, config }
23    }
24
25    /// Returns a reference to the currently cached project state.
26    pub fn state(&self) -> &ProjectState {
27        self.shared.project_state()
28    }
29
30    /// Returns a reference to the currently cached rate limits.
31    pub fn rate_limits(&self) -> &CachedRateLimits {
32        self.shared.cached_rate_limits()
33    }
34
35    /// Checks the envelope against project configuration and rate limits.
36    ///
37    /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
38    /// project state may be used, or otherwise the envelope is passed through unaltered.
39    ///
40    /// To check the envelope, this runs:
41    ///  - Validate origins and public keys
42    ///  - Quotas with a limit of `0`
43    ///  - Cached rate limits
44    pub async fn check_envelope(
45        &self,
46        envelope: &mut Managed<Box<Envelope>>,
47    ) -> Result<RateLimits, Rejected<DiscardReason>> {
48        let state = match self.state() {
49            ProjectState::Enabled(state) => Some(Arc::clone(state)),
50            ProjectState::Dummy => None,
51            ProjectState::Disabled => {
52                // TODO(jjbayer): We should refactor this function to either return a Result or
53                // handle envelope rejections internally, but not both.
54                let err = envelope
55                    .reject_err(Outcome::Invalid(DiscardReason::ProjectId))
56                    .map(|_| DiscardReason::ProjectId);
57                return Err(err);
58            }
59            ProjectState::Pending => None,
60        };
61
62        let mut scoping = envelope.scoping();
63
64        if let Some(ref state) = state {
65            scoping = state.scope_request(envelope.meta());
66            envelope.scope(scoping);
67
68            if let Err(reason) = state.check_envelope(envelope, self.config) {
69                return Err(envelope
70                    .reject_err(Outcome::Invalid(reason))
71                    .map(|_| reason));
72            }
73        }
74
75        let current_limits = self.rate_limits().current_limits();
76
77        let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
78
79        // To get the correct span outcomes, we have to partially parse the event payload
80        // and count the spans contained in the transaction events.
81        // For performance reasons, we only do this if there is an active limit on `Transaction`.
82        if current_limits
83            .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Transaction)])
84        {
85            ensure_span_count(envelope);
86        }
87
88        let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
89            let current_limits = Arc::clone(&current_limits);
90            async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
91        });
92
93        let (enforcement, mut rate_limits) = envelope_limiter.compute(envelope, &scoping).await?;
94
95        enforcement.apply_to_managed(envelope);
96
97        // Special case: Expose active rate limits for all metric namespaces if there is at least
98        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
99        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
100        if envelope.items().any(|i| i.ty().is_metrics()) {
101            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
102            metrics_scoping.namespace = MetricNamespaceScoping::Any;
103            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
104        }
105
106        Ok(rate_limits)
107    }
108}
109
110fn ensure_span_count(envelope: &mut Managed<Box<Envelope>>) {
111    envelope.modify(|envelope, records| {
112        if let Some(transaction_item) = envelope
113            .items_mut()
114            .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
115        {
116            // We're actively 'correcting' span counts -> there will be differences.
117            records.lenient(DataCategory::Span);
118            records.lenient(DataCategory::SpanIndexed);
119            transaction_item.ensure_span_count();
120        }
121    });
122}
123
124#[cfg(test)]
125mod tests {
126    use crate::envelope::{ContentType, Envelope, Item};
127    use crate::extractors::RequestMeta;
128    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
129    use relay_base_schema::project::{ProjectId, ProjectKey};
130    use relay_event_schema::protocol::EventId;
131    use serde_json::json;
132    use smallvec::smallvec;
133
134    use super::*;
135
136    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
137        let mut project_info = ProjectInfo {
138            project_id: Some(ProjectId::new(42)),
139            ..Default::default()
140        };
141        project_info.public_keys = smallvec![PublicKeyConfig {
142            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
143            numeric_id: None,
144        }];
145
146        if let Some(data) = data {
147            project_info.config = serde_json::from_value(data).unwrap();
148        }
149
150        Project::new(
151            SharedProject::for_test(ProjectState::Enabled(project_info.into())),
152            config,
153        )
154    }
155
156    fn request_meta() -> RequestMeta {
157        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
158            .parse()
159            .unwrap();
160
161        RequestMeta::new(dsn)
162    }
163
164    fn get_span_count(envelope: &Envelope) -> usize {
165        envelope.items().next().unwrap().span_count() as usize
166    }
167
168    #[tokio::test]
169    async fn test_track_nested_spans_outcomes() {
170        let config = Default::default();
171        let project = create_project(
172            &config,
173            Some(json!({
174                "quotas": [{
175                   "id": "foo",
176                   "categories": ["transaction"],
177                   "window": 3600,
178                   "limit": 0,
179                   "reasonCode": "foo",
180               }]
181            })),
182        );
183
184        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
185
186        let mut transaction = Item::new(ItemType::Transaction);
187        transaction.set_payload(
188            ContentType::Json,
189            r#"{
190  "event_id": "52df9022835246eeb317dbd739ccd059",
191  "type": "transaction",
192  "transaction": "I have a stale timestamp, but I'm recent!",
193  "start_timestamp": 1,
194  "timestamp": 2,
195  "contexts": {
196    "trace": {
197      "trace_id": "ff62a8b040f340bda5d830223def1d81",
198      "span_id": "bd429c44b67a3eb4"
199    }
200  },
201  "spans": [
202    {
203      "span_id": "bd429c44b67a3eb4",
204      "start_timestamp": 1,
205      "timestamp": null,
206      "trace_id": "ff62a8b040f340bda5d830223def1d81"
207    },
208    {
209      "span_id": "bd429c44b67a3eb5",
210      "start_timestamp": 1,
211      "timestamp": null,
212      "trace_id": "ff62a8b040f340bda5d830223def1d81"
213    }
214  ]
215}"#,
216        );
217
218        envelope.add_item(transaction);
219
220        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
221
222        let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator);
223
224        assert_eq!(get_span_count(&managed_envelope), 0); // not written yet
225        project.check_envelope(&mut managed_envelope).await.unwrap();
226
227        let expected = [
228            (DataCategory::Transaction, 1),
229            (DataCategory::TransactionIndexed, 1),
230            (DataCategory::Span, 3),
231            (DataCategory::SpanIndexed, 3),
232        ];
233
234        for (expected_category, expected_quantity) in expected {
235            let outcome = outcome_aggregator_rx.recv().await.unwrap();
236            assert_eq!(outcome.category, expected_category);
237            assert_eq!(outcome.quantity, expected_quantity);
238        }
239    }
240
241    #[tokio::test]
242    async fn test_track_nested_spans_outcomes_predefined() {
243        let config = Default::default();
244        let project = create_project(
245            &config,
246            Some(json!({
247                "quotas": [{
248                   "id": "foo",
249                   "categories": ["transaction"],
250                   "window": 3600,
251                   "limit": 0,
252                   "reasonCode": "foo",
253               }]
254            })),
255        );
256
257        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
258
259        let mut transaction = Item::new(ItemType::Transaction);
260        transaction.set_span_count(Some(666));
261        transaction.set_payload(
262            ContentType::Json,
263            r#"{
264  "event_id": "52df9022835246eeb317dbd739ccd059",
265  "type": "transaction",
266  "transaction": "I have a stale timestamp, but I'm recent!",
267  "start_timestamp": 1,
268  "timestamp": 2,
269  "contexts": {
270    "trace": {
271      "trace_id": "ff62a8b040f340bda5d830223def1d81",
272      "span_id": "bd429c44b67a3eb4"
273    }
274  },
275  "spans": []
276}"#,
277        );
278
279        envelope.add_item(transaction);
280
281        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
282
283        let mut managed_envelope = Managed::from_envelope(envelope, outcome_aggregator);
284
285        assert_eq!(get_span_count(&managed_envelope), 666);
286        project.check_envelope(&mut managed_envelope).await.unwrap();
287
288        let expected = [
289            (DataCategory::Transaction, 1),
290            (DataCategory::TransactionIndexed, 1),
291            (DataCategory::Span, 667),
292            (DataCategory::SpanIndexed, 667),
293        ];
294
295        for (expected_category, expected_quantity) in expected {
296            let outcome = outcome_aggregator_rx.recv().await.unwrap();
297            assert_eq!(outcome.category, expected_category);
298            assert_eq!(outcome.quantity, expected_quantity);
299        }
300    }
301}