relay_server/services/projects/cache/
project.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_dynamic_config::Feature;
5use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
6use relay_sampling::evaluation::ReservoirCounters;
7
8use crate::envelope::ItemType;
9use crate::services::outcome::{DiscardReason, Outcome};
10use crate::services::projects::cache::state::SharedProject;
11use crate::services::projects::project::ProjectState;
12use crate::statsd::RelayTimers;
13use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ManagedEnvelope};
14
15/// A loaded project.
16pub struct Project<'a> {
17    shared: SharedProject,
18    config: &'a Config,
19}
20
21impl<'a> Project<'a> {
22    pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
23        Self { shared, config }
24    }
25
26    /// Returns a reference to the currently cached project state.
27    pub fn state(&self) -> &ProjectState {
28        self.shared.project_state()
29    }
30
31    /// Returns a reference to the currently cached rate limits.
32    pub fn rate_limits(&self) -> &CachedRateLimits {
33        self.shared.cached_rate_limits()
34    }
35
36    /// Returns a reference to the currently reservoir counters.
37    pub fn reservoir_counters(&self) -> &ReservoirCounters {
38        self.shared.reservoir_counters()
39    }
40
41    /// Checks the envelope against project configuration and rate limits.
42    ///
43    /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
44    /// project state may be used, or otherwise the envelope is passed through unaltered.
45    ///
46    /// To check the envelope, this runs:
47    ///  - Validate origins and public keys
48    ///  - Quotas with a limit of `0`
49    ///  - Cached rate limits
50    pub async fn check_envelope(
51        &self,
52        mut envelope: ManagedEnvelope,
53    ) -> Result<CheckedEnvelope, DiscardReason> {
54        let state = match self.state() {
55            ProjectState::Enabled(state) => Some(Arc::clone(state)),
56            ProjectState::Disabled => {
57                // TODO(jjbayer): We should refactor this function to either return a Result or
58                // handle envelope rejections internally, but not both.
59                envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
60                return Err(DiscardReason::ProjectId);
61            }
62            ProjectState::Pending => None,
63        };
64
65        let mut scoping = envelope.scoping();
66
67        if let Some(ref state) = state {
68            scoping = state.scope_request(envelope.envelope().meta());
69            envelope.scope(scoping);
70
71            if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
72                envelope.reject(Outcome::Invalid(reason));
73                return Err(reason);
74            }
75        }
76
77        let current_limits = self.rate_limits().current_limits();
78
79        let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
80        let current_limits_clone = current_limits.clone();
81        let envelope_limiter =
82            EnvelopeLimiter::new(CheckLimits::NonIndexed, move |item_scoping, _| {
83                let current_limits_clone = current_limits_clone.clone();
84
85                async move { Ok(current_limits_clone.check_with_quotas(quotas, item_scoping)) }
86            });
87
88        let (mut enforcement, mut rate_limits) = envelope_limiter
89            .compute(envelope.envelope_mut(), &scoping)
90            .await?;
91
92        let check_nested_spans = state
93            .as_ref()
94            .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
95
96        // If we can extract spans from the event, we want to try and count the number of nested
97        // spans to correctly emit negative outcomes in case the transaction itself is dropped.
98        if check_nested_spans {
99            relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
100                sync_spans_to_enforcement(&envelope, &mut enforcement);
101            });
102        }
103
104        enforcement.apply_with_outcomes(&mut envelope);
105
106        envelope.update();
107
108        // Special case: Expose active rate limits for all metric namespaces if there is at least
109        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
110        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
111        if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
112            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
113            metrics_scoping.namespace = MetricNamespaceScoping::Any;
114            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
115        }
116
117        let envelope = if envelope.envelope().is_empty() {
118            // Individual rate limits have already been issued above
119            envelope.reject(Outcome::RateLimited(None));
120            None
121        } else {
122            Some(envelope)
123        };
124
125        Ok(CheckedEnvelope {
126            envelope,
127            rate_limits,
128        })
129    }
130}
131
132/// A checked envelope and associated rate limits.
133///
134/// Items violating the rate limits have been removed from the envelope. If all items are removed
135/// from the envelope, `None` is returned in place of the envelope.
136#[derive(Debug)]
137pub struct CheckedEnvelope {
138    pub envelope: Option<ManagedEnvelope>,
139    pub rate_limits: RateLimits,
140}
141
142/// Adds category limits for the nested spans inside a transaction.
143///
144/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted
145/// as top-level spans, thus if we limited a transaction, we want to count and emit negative
146/// outcomes for each of the spans nested inside that transaction.
147fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
148    if !enforcement.is_event_active() {
149        return;
150    }
151
152    let spans_count = count_nested_spans(envelope);
153    if spans_count == 0 {
154        return;
155    }
156
157    if enforcement.event.is_active() {
158        enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
159    }
160
161    if enforcement.event_indexed.is_active() {
162        enforcement.spans_indexed = enforcement
163            .event_indexed
164            .clone_for(DataCategory::SpanIndexed, spans_count);
165    }
166}
167
168/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
169fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
170    #[derive(Debug, serde::Deserialize)]
171    struct PartialEvent {
172        spans: crate::utils::SeqCount,
173    }
174
175    envelope
176        .envelope()
177        .items()
178        .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
179        .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
180        // We do + 1, since we count the transaction itself because it will be extracted
181        // as a span and counted during the slow path of rate limiting.
182        .map_or(0, |event| event.spans.0 + 1)
183}
184
185#[cfg(test)]
186mod tests {
187    use crate::envelope::{ContentType, Envelope, Item};
188    use crate::extractors::RequestMeta;
189    use crate::services::processor::ProcessingGroup;
190    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
191    use relay_base_schema::project::{ProjectId, ProjectKey};
192    use relay_event_schema::protocol::EventId;
193    use serde_json::json;
194    use smallvec::smallvec;
195
196    use super::*;
197
198    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
199        let mut project_info = ProjectInfo {
200            project_id: Some(ProjectId::new(42)),
201            ..Default::default()
202        };
203        project_info.public_keys = smallvec![PublicKeyConfig {
204            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
205            numeric_id: None,
206        }];
207
208        if let Some(data) = data {
209            project_info.config = serde_json::from_value(data).unwrap();
210        }
211
212        Project::new(
213            SharedProject::for_test(ProjectState::Enabled(project_info.into())),
214            config,
215        )
216    }
217
218    fn request_meta() -> RequestMeta {
219        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
220            .parse()
221            .unwrap();
222
223        RequestMeta::new(dsn)
224    }
225
226    #[tokio::test]
227    async fn test_track_nested_spans_outcomes() {
228        let config = Default::default();
229        let project = create_project(
230            &config,
231            Some(json!({
232                "features": [
233                    "organizations:indexed-spans-extraction"
234                ],
235                "quotas": [{
236                   "id": "foo",
237                   "categories": ["transaction"],
238                   "window": 3600,
239                   "limit": 0,
240                   "reasonCode": "foo",
241               }]
242            })),
243        );
244
245        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
246
247        let mut transaction = Item::new(ItemType::Transaction);
248        transaction.set_payload(
249            ContentType::Json,
250            r#"{
251  "event_id": "52df9022835246eeb317dbd739ccd059",
252  "type": "transaction",
253  "transaction": "I have a stale timestamp, but I'm recent!",
254  "start_timestamp": 1,
255  "timestamp": 2,
256  "contexts": {
257    "trace": {
258      "trace_id": "ff62a8b040f340bda5d830223def1d81",
259      "span_id": "bd429c44b67a3eb4"
260    }
261  },
262  "spans": [
263    {
264      "span_id": "bd429c44b67a3eb4",
265      "start_timestamp": 1,
266      "timestamp": null,
267      "trace_id": "ff62a8b040f340bda5d830223def1d81"
268    },
269    {
270      "span_id": "bd429c44b67a3eb5",
271      "start_timestamp": 1,
272      "timestamp": null,
273      "trace_id": "ff62a8b040f340bda5d830223def1d81"
274    }
275  ]
276}"#,
277        );
278
279        envelope.add_item(transaction);
280
281        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
282        let (test_store, _) = relay_system::Addr::custom();
283
284        let managed_envelope = ManagedEnvelope::new(
285            envelope,
286            outcome_aggregator.clone(),
287            test_store,
288            ProcessingGroup::Transaction,
289        );
290
291        project.check_envelope(managed_envelope).await.unwrap();
292        drop(outcome_aggregator);
293
294        let expected = [
295            (DataCategory::Transaction, 1),
296            (DataCategory::TransactionIndexed, 1),
297            (DataCategory::Span, 3),
298            (DataCategory::SpanIndexed, 3),
299        ];
300
301        for (expected_category, expected_quantity) in expected {
302            let outcome = outcome_aggregator_rx.recv().await.unwrap();
303            assert_eq!(outcome.category, expected_category);
304            assert_eq!(outcome.quantity, expected_quantity);
305        }
306    }
307}