relay_server/services/projects/cache/
project.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_dynamic_config::Feature;
5use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
6use relay_sampling::evaluation::ReservoirCounters;
7
8use crate::envelope::ItemType;
9use crate::managed::ManagedEnvelope;
10use crate::services::outcome::{DiscardReason, Outcome};
11use crate::services::projects::cache::state::SharedProject;
12use crate::services::projects::project::ProjectState;
13use crate::statsd::RelayTimers;
14use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter};
15
16/// A loaded project.
17pub struct Project<'a> {
18    shared: SharedProject,
19    config: &'a Config,
20}
21
22impl<'a> Project<'a> {
23    pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
24        Self { shared, config }
25    }
26
27    /// Returns a reference to the currently cached project state.
28    pub fn state(&self) -> &ProjectState {
29        self.shared.project_state()
30    }
31
32    /// Returns a reference to the currently cached rate limits.
33    pub fn rate_limits(&self) -> &CachedRateLimits {
34        self.shared.cached_rate_limits()
35    }
36
37    /// Returns a reference to the currently reservoir counters.
38    pub fn reservoir_counters(&self) -> &ReservoirCounters {
39        self.shared.reservoir_counters()
40    }
41
42    /// Checks the envelope against project configuration and rate limits.
43    ///
44    /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated
45    /// project state may be used, or otherwise the envelope is passed through unaltered.
46    ///
47    /// To check the envelope, this runs:
48    ///  - Validate origins and public keys
49    ///  - Quotas with a limit of `0`
50    ///  - Cached rate limits
51    pub async fn check_envelope(
52        &self,
53        mut envelope: ManagedEnvelope,
54    ) -> Result<CheckedEnvelope, DiscardReason> {
55        let state = match self.state() {
56            ProjectState::Enabled(state) => Some(Arc::clone(state)),
57            ProjectState::Disabled => {
58                // TODO(jjbayer): We should refactor this function to either return a Result or
59                // handle envelope rejections internally, but not both.
60                envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
61                return Err(DiscardReason::ProjectId);
62            }
63            ProjectState::Pending => None,
64        };
65
66        let mut scoping = envelope.scoping();
67
68        if let Some(ref state) = state {
69            scoping = state.scope_request(envelope.envelope().meta());
70            envelope.scope(scoping);
71
72            if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
73                envelope.reject(Outcome::Invalid(reason));
74                return Err(reason);
75            }
76        }
77
78        let current_limits = self.rate_limits().current_limits();
79
80        let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
81        let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| {
82            let current_limits = Arc::clone(&current_limits);
83            async move { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }
84        });
85
86        let (mut enforcement, mut rate_limits) = envelope_limiter
87            .compute(envelope.envelope_mut(), &scoping)
88            .await?;
89
90        let check_nested_spans = state
91            .as_ref()
92            .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
93
94        // If we can extract spans from the event, we want to try and count the number of nested
95        // spans to correctly emit negative outcomes in case the transaction itself is dropped.
96        if check_nested_spans {
97            relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
98                sync_spans_to_enforcement(&envelope, &mut enforcement);
99            });
100        }
101
102        enforcement.apply_with_outcomes(&mut envelope);
103
104        envelope.update();
105
106        // Special case: Expose active rate limits for all metric namespaces if there is at least
107        // one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
108        // because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
109        if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
110            let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
111            metrics_scoping.namespace = MetricNamespaceScoping::Any;
112            rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
113        }
114
115        let envelope = if envelope.envelope().is_empty() {
116            // Individual rate limits have already been issued above
117            envelope.reject(Outcome::RateLimited(None));
118            None
119        } else {
120            Some(envelope)
121        };
122
123        Ok(CheckedEnvelope {
124            envelope,
125            rate_limits,
126        })
127    }
128}
129
130/// A checked envelope and associated rate limits.
131///
132/// Items violating the rate limits have been removed from the envelope. If all items are removed
133/// from the envelope, `None` is returned in place of the envelope.
134#[derive(Debug)]
135pub struct CheckedEnvelope {
136    pub envelope: Option<ManagedEnvelope>,
137    pub rate_limits: RateLimits,
138}
139
140/// Adds category limits for the nested spans inside a transaction.
141///
142/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted
143/// as top-level spans, thus if we limited a transaction, we want to count and emit negative
144/// outcomes for each of the spans nested inside that transaction.
145fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
146    if !enforcement.is_event_active() {
147        return;
148    }
149
150    let spans_count = count_nested_spans(envelope);
151    if spans_count == 0 {
152        return;
153    }
154
155    if enforcement.event.is_active() {
156        enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
157    }
158
159    if enforcement.event_indexed.is_active() {
160        enforcement.spans_indexed = enforcement
161            .event_indexed
162            .clone_for(DataCategory::SpanIndexed, spans_count);
163    }
164}
165
166/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
167fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
168    #[derive(Debug, serde::Deserialize)]
169    struct PartialEvent {
170        spans: crate::utils::SeqCount,
171    }
172
173    envelope
174        .envelope()
175        .items()
176        .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
177        .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
178        // We do + 1, since we count the transaction itself because it will be extracted
179        // as a span and counted during the slow path of rate limiting.
180        .map_or(0, |event| event.spans.0 + 1)
181}
182
183#[cfg(test)]
184mod tests {
185    use crate::envelope::{ContentType, Envelope, Item};
186    use crate::extractors::RequestMeta;
187    use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
188    use relay_base_schema::project::{ProjectId, ProjectKey};
189    use relay_event_schema::protocol::EventId;
190    use serde_json::json;
191    use smallvec::smallvec;
192
193    use super::*;
194
195    fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
196        let mut project_info = ProjectInfo {
197            project_id: Some(ProjectId::new(42)),
198            ..Default::default()
199        };
200        project_info.public_keys = smallvec![PublicKeyConfig {
201            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
202            numeric_id: None,
203        }];
204
205        if let Some(data) = data {
206            project_info.config = serde_json::from_value(data).unwrap();
207        }
208
209        Project::new(
210            SharedProject::for_test(ProjectState::Enabled(project_info.into())),
211            config,
212        )
213    }
214
215    fn request_meta() -> RequestMeta {
216        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
217            .parse()
218            .unwrap();
219
220        RequestMeta::new(dsn)
221    }
222
223    #[tokio::test]
224    async fn test_track_nested_spans_outcomes() {
225        let config = Default::default();
226        let project = create_project(
227            &config,
228            Some(json!({
229                "features": [
230                    "organizations:indexed-spans-extraction"
231                ],
232                "quotas": [{
233                   "id": "foo",
234                   "categories": ["transaction"],
235                   "window": 3600,
236                   "limit": 0,
237                   "reasonCode": "foo",
238               }]
239            })),
240        );
241
242        let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
243
244        let mut transaction = Item::new(ItemType::Transaction);
245        transaction.set_payload(
246            ContentType::Json,
247            r#"{
248  "event_id": "52df9022835246eeb317dbd739ccd059",
249  "type": "transaction",
250  "transaction": "I have a stale timestamp, but I'm recent!",
251  "start_timestamp": 1,
252  "timestamp": 2,
253  "contexts": {
254    "trace": {
255      "trace_id": "ff62a8b040f340bda5d830223def1d81",
256      "span_id": "bd429c44b67a3eb4"
257    }
258  },
259  "spans": [
260    {
261      "span_id": "bd429c44b67a3eb4",
262      "start_timestamp": 1,
263      "timestamp": null,
264      "trace_id": "ff62a8b040f340bda5d830223def1d81"
265    },
266    {
267      "span_id": "bd429c44b67a3eb5",
268      "start_timestamp": 1,
269      "timestamp": null,
270      "trace_id": "ff62a8b040f340bda5d830223def1d81"
271    }
272  ]
273}"#,
274        );
275
276        envelope.add_item(transaction);
277
278        let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
279
280        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator.clone());
281
282        project.check_envelope(managed_envelope).await.unwrap();
283        drop(outcome_aggregator);
284
285        let expected = [
286            (DataCategory::Transaction, 1),
287            (DataCategory::TransactionIndexed, 1),
288            (DataCategory::Span, 3),
289            (DataCategory::SpanIndexed, 3),
290        ];
291
292        for (expected_category, expected_quantity) in expected {
293            let outcome = outcome_aggregator_rx.recv().await.unwrap();
294            assert_eq!(outcome.category, expected_category);
295            assert_eq!(outcome.quantity, expected_quantity);
296        }
297    }
298}