relay_server/services/projects/cache/
project.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_dynamic_config::Feature;
5use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
6use relay_sampling::evaluation::ReservoirCounters;
7
8use crate::envelope::ItemType;
9use crate::managed::ManagedEnvelope;
10use crate::services::outcome::{DiscardReason, Outcome};
11use crate::services::projects::cache::state::SharedProject;
12use crate::services::projects::project::ProjectState;
13use crate::statsd::RelayTimers;
14use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter};
15
16/// A loaded project.
17pub struct Project<'a> {
18    shared: SharedProject,
19    config: &'a Config,
20}
21
22impl<'a> Project<'a> {
23    pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
24        Self { shared, config }
25    }
26
27    /// Returns a reference to the currently cached project state.
28    pub fn state(&self) -> &ProjectState {
29        self.shared.project_state()
30    }
31
32    /// Returns a reference to the currently cached rate limits.
33    pub fn rate_limits(&self) -> &CachedRateLimits {
34        self.shared.cached_rate_limits()
35    }
36
37    /// Returns a reference to the currently reservoir counters.
38    pub fn reservoir_counters(&self) -> &ReservoirCounters {
39        self.shared.reservoir_counters()
40    }
41
42    /// Checks the envelope against project configuration and rate limits.
43    ///
44    /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
45    /// project state may be used, or otherwise the envelope is passed through unaltered.
46    ///
47    /// To check the envelope, this runs:
48    ///  - Validate origins and public keys
49    ///  - Quotas with a limit of `0`
50    ///  - Cached rate limits
51    pub async fn check_envelope(
52        &self,
53        mut envelope: ManagedEnvelope,
54    ) -> Result<CheckedEnvelope, DiscardReason> {
55        let state = match self.state() {
56            ProjectState::Enabled(state) => Some(Arc::clone(state)),
57            ProjectState::Disabled => {
58                // TODO(jjbayer): We should refactor this function to either return a Result or
59                // handle envelope rejections internally, but not both.
60                envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
61                return Err(DiscardReason::ProjectId);
62            }
63            ProjectState::Pending => None,
64        };
65
66        let mut scoping = envelope.scoping();
67
68        if let Some(ref state) = state {
69            scoping = state.scope_request(envelope.envelope().meta());
70            envelope.scope(scoping);
71
72            if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
73                envelope.reject(Outcome::Invalid(reason));
74                return Err(reason);
75            }
76        }
77
78        let current_limits = self.rate_limits().current_limits();
79
80        let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
81        let current_limits_clone = current_limits.clone();
82        let envelope_limiter =
83            EnvelopeLimiter::new(CheckLimits::NonIndexed, move |item_scoping, _| {
84                let current_limits_clone = current_limits_clone.clone();
85
86                async move { Ok(current_limits_clone.check_with_quotas(quotas, item_scoping)) }
87            });
88
89        let (mut enforcement, mut rate_limits) = envelope_limiter
90            .compute(envelope.envelope_mut(), &scoping)
91            .await?;
92
93        let check_nested_spans = state
94            .as_ref()
95            .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
96
97        // If we can extract spans from the event, we want to try and count the number of nested
98        // spans to correctly emit negative outcomes in case the transaction itself is dropped.
99        if check_nested_spans {
100            relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
101                sync_spans_to_enforcement(&envelope, &mut enforcement);
102            });
103        }
104
105        enforcement.apply_with_outcomes(&mut envelope);
106
107        envelope.update();
108
109        // Special case: Expose active rate limits for all metric namespaces if there is at least
110        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
111        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
112        if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
113            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
114            metrics_scoping.namespace = MetricNamespaceScoping::Any;
115            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
116        }
117
118        let envelope = if envelope.envelope().is_empty() {
119            // Individual rate limits have already been issued above
120            envelope.reject(Outcome::RateLimited(None));
121            None
122        } else {
123            Some(envelope)
124        };
125
126        Ok(CheckedEnvelope {
127            envelope,
128            rate_limits,
129        })
130    }
131}
132
133/// A checked envelope and associated rate limits.
134///
135/// Items violating the rate limits have been removed from the envelope. If all items are removed
136/// from the envelope, `None` is returned in place of the envelope.
137#[derive(Debug)]
138pub struct CheckedEnvelope {
139    pub envelope: Option<ManagedEnvelope>,
140    pub rate_limits: RateLimits,
141}
142
143/// Adds category limits for the nested spans inside a transaction.
144///
145/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted
146/// as top-level spans, thus if we limited a transaction, we want to count and emit negative
147/// outcomes for each of the spans nested inside that transaction.
148fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
149    if !enforcement.is_event_active() {
150        return;
151    }
152
153    let spans_count = count_nested_spans(envelope);
154    if spans_count == 0 {
155        return;
156    }
157
158    if enforcement.event.is_active() {
159        enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
160    }
161
162    if enforcement.event_indexed.is_active() {
163        enforcement.spans_indexed = enforcement
164            .event_indexed
165            .clone_for(DataCategory::SpanIndexed, spans_count);
166    }
167}
168
169/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
170fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
171    #[derive(Debug, serde::Deserialize)]
172    struct PartialEvent {
173        spans: crate::utils::SeqCount,
174    }
175
176    envelope
177        .envelope()
178        .items()
179        .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
180        .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
181        // We do + 1, since we count the transaction itself because it will be extracted
182        // as a span and counted during the slow path of rate limiting.
183        .map_or(0, |event| event.spans.0 + 1)
184}
185
186#[cfg(test)]
187mod tests {
188    use crate::envelope::{ContentType, Envelope, Item};
189    use crate::extractors::RequestMeta;
190    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
191    use relay_base_schema::project::{ProjectId, ProjectKey};
192    use relay_event_schema::protocol::EventId;
193    use serde_json::json;
194    use smallvec::smallvec;
195
196    use super::*;
197
198    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
199        let mut project_info = ProjectInfo {
200            project_id: Some(ProjectId::new(42)),
201            ..Default::default()
202        };
203        project_info.public_keys = smallvec![PublicKeyConfig {
204            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
205            numeric_id: None,
206        }];
207
208        if let Some(data) = data {
209            project_info.config = serde_json::from_value(data).unwrap();
210        }
211
212        Project::new(
213            SharedProject::for_test(ProjectState::Enabled(project_info.into())),
214            config,
215        )
216    }
217
218    fn request_meta() -> RequestMeta {
219        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
220            .parse()
221            .unwrap();
222
223        RequestMeta::new(dsn)
224    }
225
226    #[tokio::test]
227    async fn test_track_nested_spans_outcomes() {
228        let config = Default::default();
229        let project = create_project(
230            &config,
231            Some(json!({
232                "features": [
233                    "organizations:indexed-spans-extraction"
234                ],
235                "quotas": [{
236                   "id": "foo",
237                   "categories": ["transaction"],
238                   "window": 3600,
239                   "limit": 0,
240                   "reasonCode": "foo",
241               }]
242            })),
243        );
244
245        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
246
247        let mut transaction = Item::new(ItemType::Transaction);
248        transaction.set_payload(
249            ContentType::Json,
250            r#"{
251  "event_id": "52df9022835246eeb317dbd739ccd059",
252  "type": "transaction",
253  "transaction": "I have a stale timestamp, but I'm recent!",
254  "start_timestamp": 1,
255  "timestamp": 2,
256  "contexts": {
257    "trace": {
258      "trace_id": "ff62a8b040f340bda5d830223def1d81",
259      "span_id": "bd429c44b67a3eb4"
260    }
261  },
262  "spans": [
263    {
264      "span_id": "bd429c44b67a3eb4",
265      "start_timestamp": 1,
266      "timestamp": null,
267      "trace_id": "ff62a8b040f340bda5d830223def1d81"
268    },
269    {
270      "span_id": "bd429c44b67a3eb5",
271      "start_timestamp": 1,
272      "timestamp": null,
273      "trace_id": "ff62a8b040f340bda5d830223def1d81"
274    }
275  ]
276}"#,
277        );
278
279        envelope.add_item(transaction);
280
281        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
282        let (test_store, _) = relay_system::Addr::custom();
283
284        let managed_envelope =
285            ManagedEnvelope::new(envelope, outcome_aggregator.clone(), test_store);
286
287        project.check_envelope(managed_envelope).await.unwrap();
288        drop(outcome_aggregator);
289
290        let expected = [
291            (DataCategory::Transaction, 1),
292            (DataCategory::TransactionIndexed, 1),
293            (DataCategory::Span, 3),
294            (DataCategory::SpanIndexed, 3),
295        ];
296
297        for (expected_category, expected_quantity) in expected {
298            let outcome = outcome_aggregator_rx.recv().await.unwrap();
299            assert_eq!(outcome.category, expected_category);
300            assert_eq!(outcome.quantity, expected_quantity);
301        }
302    }
303}