relay_server/services/projects/cache/
project.rs1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_dynamic_config::Feature;
5use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits};
6use relay_sampling::evaluation::ReservoirCounters;
7
8use crate::envelope::ItemType;
9use crate::services::outcome::{DiscardReason, Outcome};
10use crate::services::projects::cache::state::SharedProject;
11use crate::services::projects::project::ProjectState;
12use crate::statsd::RelayTimers;
13use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ManagedEnvelope};
14
15pub struct Project<'a> {
17 shared: SharedProject,
18 config: &'a Config,
19}
20
21impl<'a> Project<'a> {
22 pub(crate) fn new(shared: SharedProject, config: &'a Config) -> Self {
23 Self { shared, config }
24 }
25
26 pub fn state(&self) -> &ProjectState {
28 self.shared.project_state()
29 }
30
31 pub fn rate_limits(&self) -> &CachedRateLimits {
33 self.shared.cached_rate_limits()
34 }
35
36 pub fn reservoir_counters(&self) -> &ReservoirCounters {
38 self.shared.reservoir_counters()
39 }
40
41 pub async fn check_envelope(
51 &self,
52 mut envelope: ManagedEnvelope,
53 ) -> Result<CheckedEnvelope, DiscardReason> {
54 let state = match self.state() {
55 ProjectState::Enabled(state) => Some(Arc::clone(state)),
56 ProjectState::Disabled => {
57 envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
60 return Err(DiscardReason::ProjectId);
61 }
62 ProjectState::Pending => None,
63 };
64
65 let mut scoping = envelope.scoping();
66
67 if let Some(ref state) = state {
68 scoping = state.scope_request(envelope.envelope().meta());
69 envelope.scope(scoping);
70
71 if let Err(reason) = state.check_envelope(envelope.envelope(), self.config) {
72 envelope.reject(Outcome::Invalid(reason));
73 return Err(reason);
74 }
75 }
76
77 let current_limits = self.rate_limits().current_limits();
78
79 let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]);
80 let current_limits_clone = current_limits.clone();
81 let envelope_limiter =
82 EnvelopeLimiter::new(CheckLimits::NonIndexed, move |item_scoping, _| {
83 let current_limits_clone = current_limits_clone.clone();
84
85 async move { Ok(current_limits_clone.check_with_quotas(quotas, item_scoping)) }
86 });
87
88 let (mut enforcement, mut rate_limits) = envelope_limiter
89 .compute(envelope.envelope_mut(), &scoping)
90 .await?;
91
92 let check_nested_spans = state
93 .as_ref()
94 .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
95
96 if check_nested_spans {
99 relay_statsd::metric!(timer(RelayTimers::CheckNestedSpans), {
100 sync_spans_to_enforcement(&envelope, &mut enforcement);
101 });
102 }
103
104 enforcement.apply_with_outcomes(&mut envelope);
105
106 envelope.update();
107
108 if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
112 let mut metrics_scoping = scoping.item(DataCategory::MetricBucket);
113 metrics_scoping.namespace = MetricNamespaceScoping::Any;
114 rate_limits.merge(current_limits.check_with_quotas(quotas, metrics_scoping));
115 }
116
117 let envelope = if envelope.envelope().is_empty() {
118 envelope.reject(Outcome::RateLimited(None));
120 None
121 } else {
122 Some(envelope)
123 };
124
125 Ok(CheckedEnvelope {
126 envelope,
127 rate_limits,
128 })
129 }
130}
131
132#[derive(Debug)]
137pub struct CheckedEnvelope {
138 pub envelope: Option<ManagedEnvelope>,
139 pub rate_limits: RateLimits,
140}
141
142fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
148 if !enforcement.is_event_active() {
149 return;
150 }
151
152 let spans_count = count_nested_spans(envelope);
153 if spans_count == 0 {
154 return;
155 }
156
157 if enforcement.event.is_active() {
158 enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
159 }
160
161 if enforcement.event_indexed.is_active() {
162 enforcement.spans_indexed = enforcement
163 .event_indexed
164 .clone_for(DataCategory::SpanIndexed, spans_count);
165 }
166}
167
168fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
170 #[derive(Debug, serde::Deserialize)]
171 struct PartialEvent {
172 spans: crate::utils::SeqCount,
173 }
174
175 envelope
176 .envelope()
177 .items()
178 .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
179 .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
180 .map_or(0, |event| event.spans.0 + 1)
183}
184
185#[cfg(test)]
186mod tests {
187 use crate::envelope::{ContentType, Envelope, Item};
188 use crate::extractors::RequestMeta;
189 use crate::services::processor::ProcessingGroup;
190 use crate::services::projects::project::{ProjectInfo, PublicKeyConfig};
191 use relay_base_schema::project::{ProjectId, ProjectKey};
192 use relay_event_schema::protocol::EventId;
193 use serde_json::json;
194 use smallvec::smallvec;
195
196 use super::*;
197
198 fn create_project(config: &Config, data: Option<serde_json::Value>) -> Project<'_> {
199 let mut project_info = ProjectInfo {
200 project_id: Some(ProjectId::new(42)),
201 ..Default::default()
202 };
203 project_info.public_keys = smallvec![PublicKeyConfig {
204 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
205 numeric_id: None,
206 }];
207
208 if let Some(data) = data {
209 project_info.config = serde_json::from_value(data).unwrap();
210 }
211
212 Project::new(
213 SharedProject::for_test(ProjectState::Enabled(project_info.into())),
214 config,
215 )
216 }
217
218 fn request_meta() -> RequestMeta {
219 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
220 .parse()
221 .unwrap();
222
223 RequestMeta::new(dsn)
224 }
225
226 #[tokio::test]
227 async fn test_track_nested_spans_outcomes() {
228 let config = Default::default();
229 let project = create_project(
230 &config,
231 Some(json!({
232 "features": [
233 "organizations:indexed-spans-extraction"
234 ],
235 "quotas": [{
236 "id": "foo",
237 "categories": ["transaction"],
238 "window": 3600,
239 "limit": 0,
240 "reasonCode": "foo",
241 }]
242 })),
243 );
244
245 let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
246
247 let mut transaction = Item::new(ItemType::Transaction);
248 transaction.set_payload(
249 ContentType::Json,
250 r#"{
251 "event_id": "52df9022835246eeb317dbd739ccd059",
252 "type": "transaction",
253 "transaction": "I have a stale timestamp, but I'm recent!",
254 "start_timestamp": 1,
255 "timestamp": 2,
256 "contexts": {
257 "trace": {
258 "trace_id": "ff62a8b040f340bda5d830223def1d81",
259 "span_id": "bd429c44b67a3eb4"
260 }
261 },
262 "spans": [
263 {
264 "span_id": "bd429c44b67a3eb4",
265 "start_timestamp": 1,
266 "timestamp": null,
267 "trace_id": "ff62a8b040f340bda5d830223def1d81"
268 },
269 {
270 "span_id": "bd429c44b67a3eb5",
271 "start_timestamp": 1,
272 "timestamp": null,
273 "trace_id": "ff62a8b040f340bda5d830223def1d81"
274 }
275 ]
276}"#,
277 );
278
279 envelope.add_item(transaction);
280
281 let (outcome_aggregator, mut outcome_aggregator_rx) = relay_system::Addr::custom();
282 let (test_store, _) = relay_system::Addr::custom();
283
284 let managed_envelope = ManagedEnvelope::new(
285 envelope,
286 outcome_aggregator.clone(),
287 test_store,
288 ProcessingGroup::Transaction,
289 );
290
291 project.check_envelope(managed_envelope).await.unwrap();
292 drop(outcome_aggregator);
293
294 let expected = [
295 (DataCategory::Transaction, 1),
296 (DataCategory::TransactionIndexed, 1),
297 (DataCategory::Span, 3),
298 (DataCategory::SpanIndexed, 3),
299 ];
300
301 for (expected_category, expected_quantity) in expected {
302 let outcome = outcome_aggregator_rx.recv().await.unwrap();
303 assert_eq!(outcome.category, expected_category);
304 assert_eq!(outcome.quantity, expected_quantity);
305 }
306 }
307}