1use std::fmt::{self, Debug};
2use std::sync::Arc;
3
4use itertools::Itertools;
5use relay_base_schema::metrics::MetricNamespace;
6use relay_base_schema::organization::OrganizationId;
7use relay_common::time::UnixTimestamp;
8use relay_log::protocol::value;
9use relay_redis::redis::{self, FromRedisValue, ParsingError, Script};
10use relay_redis::{AsyncRedisClient, RedisError, RedisScripts};
11use thiserror::Error;
12
13use crate::cache::OpportunisticQuotaCache;
14use crate::quota::{ItemScoping, Quota, QuotaScope};
15use crate::rate_limit::{RateLimit, RateLimits, RetryAfter};
16use crate::statsd::{QuotaCounters, QuotaTimers};
17use crate::{REJECT_ALL_SECS, cache};
18
19const GRACE: u64 = 60;
23
24#[derive(Debug, Error)]
26#[error("failed to communicate with redis")]
27pub struct RateLimitingError(
28 #[from]
29 #[source]
30 pub RedisError,
31);
32
33fn get_refunded_quota_key(counter_key: &str) -> String {
38 format!("r:{counter_key}")
39}
40
41struct OptionalDisplay<T>(Option<T>);
43
44impl<T> fmt::Display for OptionalDisplay<T>
45where
46 T: fmt::Display,
47{
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self.0 {
50 Some(ref value) => write!(f, "{value}"),
51 None => Ok(()),
52 }
53 }
54}
55
56#[derive(Debug, Clone)]
58pub struct OwnedRedisQuota {
59 quota: Quota,
61 scoping: ItemScoping,
63 prefix: Arc<str>,
65 window: u64,
67 quantity: u64,
69 timestamp: UnixTimestamp,
71}
72
73impl OwnedRedisQuota {
74 pub fn build_ref(&self) -> RedisQuota<'_> {
76 RedisQuota {
77 quota: &self.quota,
78 scoping: self.scoping,
79 prefix: Arc::clone(&self.prefix),
80 window: self.window,
81 quantity: self.quantity,
82 timestamp: self.timestamp,
83 }
84 }
85}
86
87#[derive(Debug, Clone, Eq, PartialEq)]
89pub struct RedisQuota<'a> {
90 quota: &'a Quota,
92 scoping: ItemScoping,
94 prefix: Arc<str>,
96 window: u64,
98 quantity: u64,
100 timestamp: UnixTimestamp,
102}
103
104impl<'a> RedisQuota<'a> {
105 pub fn new(
111 quota: &'a Quota,
112 quantity: u64,
113 scoping: ItemScoping,
114 timestamp: UnixTimestamp,
115 ) -> Option<Self> {
116 let prefix = quota.id.clone()?;
118 let window = quota.window?;
119
120 Some(Self {
121 quota,
122 scoping,
123 prefix,
124 quantity,
125 window,
126 timestamp,
127 })
128 }
129
130 pub fn build_owned(&self) -> OwnedRedisQuota {
133 OwnedRedisQuota {
134 quota: self.quota.clone(),
135 scoping: self.scoping,
136 prefix: Arc::clone(&self.prefix),
137 window: self.window,
138 quantity: self.quantity,
139 timestamp: self.timestamp,
140 }
141 }
142
143 pub fn window(&self) -> u64 {
145 self.window
146 }
147
148 pub fn prefix(&self) -> &str {
150 &self.prefix
151 }
152
153 pub fn quantity(&self) -> u64 {
155 self.quantity
156 }
157
158 pub fn limit(&self) -> i64 {
163 self.limit
164 .and_then(|limit| limit.try_into().ok())
166 .unwrap_or(-1)
167 }
168
169 fn shift(&self) -> u64 {
170 self.scoping.organization_id.value() % self.window
171 }
172
173 pub fn slot(&self) -> u64 {
177 (self.timestamp.as_secs() - self.shift()) / self.window
178 }
179
180 pub fn expiry(&self) -> UnixTimestamp {
182 let next_slot = self.slot() + 1;
183 let next_start = next_slot * self.window + self.shift();
184 UnixTimestamp::from_secs(next_start)
185 }
186
187 pub fn key_expiry(&self) -> u64 {
191 self.expiry().as_secs() + GRACE
192 }
193
194 pub fn key(&self) -> QuotaCacheKey {
200 let subscope = match self.quota.scope {
203 QuotaScope::Organization => None,
204 scope => self.scoping.scope_id(scope),
205 };
206
207 QuotaCacheKey {
208 id: Arc::clone(&self.prefix),
209 org: self.scoping.organization_id,
210 subscope,
211 namespace: self.namespace,
212 slot: self.slot(),
213 }
214 }
215
216 fn for_cache(&self) -> cache::Quota<QuotaCacheKey> {
218 cache::Quota {
219 limit: self.limit(),
220 window: self.window,
221 key: self.key(),
222 expiry: UnixTimestamp::from_secs(self.key_expiry()),
223 }
224 }
225}
226
227impl std::ops::Deref for RedisQuota<'_> {
228 type Target = Quota;
229
230 fn deref(&self) -> &Self::Target {
231 self.quota
232 }
233}
234
235#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
241pub struct QuotaCacheKey {
242 id: Arc<str>,
243 org: OrganizationId,
244 subscope: Option<u64>,
245 namespace: Option<MetricNamespace>,
246 slot: u64,
247}
248
249impl fmt::Display for QuotaCacheKey {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 write!(
252 f,
253 "quota:{id}{{{org}}}{subscope}{namespace}:{slot}",
254 id = self.id,
255 org = self.org,
256 subscope = OptionalDisplay(self.subscope),
257 namespace = OptionalDisplay(self.namespace),
258 slot = self.slot,
259 )
260 }
261}
262
263#[derive(Clone)]
274pub struct RedisRateLimiter {
275 client: AsyncRedisClient,
276 cache: Option<Arc<OpportunisticQuotaCache<QuotaCacheKey>>>,
277 script: &'static Script,
278 max_limit: Option<u64>,
279}
280
281impl RedisRateLimiter {
282 pub fn new(client: AsyncRedisClient) -> Self {
284 RedisRateLimiter {
285 client,
286 cache: None,
287 script: RedisScripts::load_is_rate_limited(),
288 max_limit: None,
289 }
290 }
291
292 pub fn max_limit(mut self, max_limit: Option<u64>) -> Self {
297 self.max_limit = max_limit;
298 self
299 }
300
301 pub fn cache(mut self, cache_ratio: Option<f32>, max: Option<f32>) -> Self {
309 self.cache = cache_ratio
310 .map(OpportunisticQuotaCache::new)
311 .map(|c| c.with_max(max))
312 .map(Arc::new);
313
314 self
315 }
316
317 pub async fn is_rate_limited<'a>(
334 &self,
335 quotas: impl IntoIterator<Item = &'a Quota>,
336 item_scoping: ItemScoping,
337 quantity: usize,
338 over_accept_once: bool,
339 ) -> Result<RateLimits, RateLimitingError> {
340 let timestamp = UnixTimestamp::now();
341 let mut invocation = self.script.prepare_invoke();
342 let mut tracked_quotas = Vec::new();
343 let mut rate_limits = RateLimits::new();
344
345 let quantity = u64::try_from(quantity).unwrap_or(u64::MAX);
346
347 for quota in quotas {
348 if !quota.matches(item_scoping) {
349 } else if quota.limit == Some(0) {
351 let retry_after = self.retry_after(REJECT_ALL_SECS);
355 rate_limits.add(RateLimit::from_quota(quota, *item_scoping, retry_after));
356 } else if let Some(mut quota) =
357 RedisQuota::new(quota, quantity, item_scoping, timestamp)
358 {
359 if let Some(cache) = &self.cache {
360 quota.quantity = match cache.check_quota(quota.for_cache(), quantity) {
361 cache::Action::Accept => continue,
362 cache::Action::Check(quantity) => quantity,
363 };
364 }
365
366 let redis_key = quota.key().to_string();
367 let refund_key = get_refunded_quota_key(&redis_key);
369
370 invocation.key(redis_key);
371 invocation.key(refund_key);
372
373 invocation.arg(quota.limit());
374 invocation.arg(quota.key_expiry());
375 invocation.arg(quota.quantity);
376 invocation.arg(over_accept_once);
377
378 tracked_quotas.push(quota);
379 } else {
380 relay_log::with_scope(
383 |scope| scope.set_extra("quota", value::to_value(quota).unwrap()),
384 || relay_log::warn!("skipping unsupported quota"),
385 )
386 }
387 }
388
389 if tracked_quotas.is_empty() || rate_limits.is_limited() {
392 return Ok(rate_limits);
393 }
394
395 let mut connection = self.client.get_connection().await?;
396 let result: ScriptResult = invocation
397 .invoke_async(&mut connection)
398 .await
399 .map_err(RedisError::Redis)?;
400
401 for (quota, state) in tracked_quotas.iter().zip(result.0) {
402 if state.is_rejected {
403 let cache_error = {
406 let remaining = quota.limit().saturating_sub(state.consumed).max(0) as u64;
407 let cache_quantity = quota.quantity.saturating_sub(quantity);
408
409 cache_quantity.saturating_sub(remaining)
410 };
411 relay_statsd::metric!(
412 counter(QuotaCounters::CacheError) += cache_error,
413 category = item_scoping.category.name(),
414 );
415
416 let retry_after = self.retry_after((quota.expiry() - timestamp).as_secs());
417 rate_limits.add(RateLimit::from_quota(quota, *item_scoping, retry_after));
418 } else if let Some(cache) = &self.cache {
419 cache.set_quota(quota.for_cache(), state.consumed);
422 }
423 }
424 drop(connection);
425
426 if let Some(cache) = &self.cache {
427 let vacuum_start = std::time::Instant::now();
428 if cache.try_vacuum(timestamp) {
429 relay_statsd::metric!(
430 timer(QuotaTimers::CacheVacuumDuration) = vacuum_start.elapsed()
431 );
432 }
433 }
434
435 Ok(rate_limits)
436 }
437
438 fn retry_after(&self, mut seconds: u64) -> RetryAfter {
442 if let Some(max_limit) = self.max_limit {
443 seconds = std::cmp::min(seconds, max_limit);
444 }
445
446 RetryAfter::from_secs(seconds)
447 }
448}
449
450#[derive(Debug)]
452struct ScriptResult(Vec<QuotaState>);
453
454impl FromRedisValue for ScriptResult {
455 fn from_redis_value(v: redis::Value) -> Result<Self, ParsingError> {
456 let seq = v.into_sequence().map_err(|v| {
457 format!("Expected a sequence from the rate limiting script (value was: {v:?})")
458 })?;
459
460 if !seq.len().is_multiple_of(2) {
461 return Err(format!(
462 "Expected an even number of values from the rate limiting script (value was: {seq:?})"
463 ).into());
464 }
465
466 let mut quotas = Vec::with_capacity(seq.len() / 2);
467 for (is_rejected, consumed) in seq.into_iter().tuples() {
468 quotas.push(QuotaState {
469 is_rejected: bool::from_redis_value(is_rejected)?,
470 consumed: i64::from_redis_value(consumed)?,
471 });
472 }
473
474 Ok(Self(quotas))
475 }
476}
477
478#[derive(Debug)]
480struct QuotaState {
481 is_rejected: bool,
483 consumed: i64,
485}
486
487#[cfg(test)]
488mod tests {
489 use std::time::{SystemTime, UNIX_EPOCH};
490
491 use super::*;
492 use crate::MetricNamespaceScoping;
493 use crate::quota::{DataCategories, DataCategory, ReasonCode, Scoping};
494 use crate::rate_limit::RateLimitScope;
495 use relay_base_schema::metrics::MetricNamespace;
496 use relay_base_schema::organization::OrganizationId;
497 use relay_base_schema::project::{ProjectId, ProjectKey};
498 use relay_redis::RedisConfigOptions;
499 use relay_redis::redis::AsyncCommands;
500 use smallvec::smallvec;
501
502 fn build_rate_limiter() -> RedisRateLimiter {
503 let url = std::env::var("RELAY_REDIS_URL")
504 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned());
505 let client =
506 AsyncRedisClient::single("test", &url, &RedisConfigOptions::default()).unwrap();
507
508 RedisRateLimiter {
509 client,
510 cache: None,
511 script: RedisScripts::load_is_rate_limited(),
512 max_limit: None,
513 }
514 }
515
516 #[tokio::test]
517 async fn test_zero_size_quotas() {
518 let quotas = &[
519 Quota {
520 id: None,
521 categories: DataCategories::new(),
522 scope: QuotaScope::Organization,
523 scope_id: None,
524 limit: Some(0),
525 window: None,
526 reason_code: Some(ReasonCode::new("get_lost")),
527 namespace: None,
528 },
529 Quota {
530 id: Some("42".into()),
531 categories: DataCategories::new(),
532 scope: QuotaScope::Organization,
533 scope_id: None,
534 limit: None,
535 window: Some(42),
536 reason_code: Some(ReasonCode::new("unlimited")),
537 namespace: None,
538 },
539 ];
540
541 let scoping = ItemScoping {
542 category: DataCategory::Error,
543 scoping: Scoping {
544 organization_id: OrganizationId::new(42),
545 project_id: ProjectId::new(43),
546 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
547 key_id: Some(44),
548 },
549 namespace: MetricNamespaceScoping::None,
550 };
551
552 let rate_limits: Vec<RateLimit> = build_rate_limiter()
553 .is_rate_limited(quotas, scoping, 1, false)
554 .await
555 .expect("rate limiting failed")
556 .into_iter()
557 .collect();
558
559 assert_eq!(
560 rate_limits,
561 vec![RateLimit {
562 categories: DataCategories::new(),
563 scope: RateLimitScope::Organization(OrganizationId::new(42)),
564 reason_code: Some(ReasonCode::new("get_lost")),
565 retry_after: rate_limits[0].retry_after,
566 namespaces: smallvec![],
567 }]
568 );
569 }
570
571 #[tokio::test]
573 async fn test_namespace_quota() {
574 let quota_limit = 5;
575 let get_quota = |namespace: Option<MetricNamespace>| -> Quota {
576 Quota {
577 id: Some(format!("test_simple_quota_{}", uuid::Uuid::new_v4()).into()),
578 categories: DataCategories::new(),
579 scope: QuotaScope::Organization,
580 scope_id: None,
581 limit: Some(quota_limit),
582 window: Some(600),
583 reason_code: Some(ReasonCode::new(format!("ns: {namespace:?}"))),
584 namespace,
585 }
586 };
587
588 let quotas = &[get_quota(None)];
589 let quota_with_namespace = &[get_quota(Some(MetricNamespace::Transactions))];
590
591 let scoping = ItemScoping {
592 category: DataCategory::Error,
593 scoping: Scoping {
594 organization_id: OrganizationId::new(42),
595 project_id: ProjectId::new(43),
596 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
597 key_id: Some(44),
598 },
599 namespace: MetricNamespaceScoping::Some(MetricNamespace::Transactions),
600 };
601
602 let rate_limiter = build_rate_limiter();
603
604 for i in 0..10 {
606 let rate_limits: Vec<RateLimit> = rate_limiter
607 .is_rate_limited(quotas, scoping, 1, false)
608 .await
609 .expect("rate limiting failed")
610 .into_iter()
611 .collect();
612
613 if i < quota_limit {
614 assert_eq!(rate_limits, vec![]);
615 } else {
616 assert_eq!(
617 rate_limits[0].reason_code,
618 Some(ReasonCode::new("ns: None"))
619 );
620 }
621 }
622
623 for i in 0..10 {
625 let rate_limits: Vec<RateLimit> = rate_limiter
626 .is_rate_limited(quota_with_namespace, scoping, 1, false)
627 .await
628 .expect("rate limiting failed")
629 .into_iter()
630 .collect();
631
632 if i < quota_limit {
633 assert_eq!(rate_limits, vec![]);
634 } else {
635 assert_eq!(
636 rate_limits[0].reason_code,
637 Some(ReasonCode::new("ns: Some(Transactions)"))
638 );
639 }
640 }
641 }
642
643 #[tokio::test]
644 async fn test_simple_quota() {
645 let quotas = &[Quota {
646 id: Some(format!("test_simple_quota_{}", uuid::Uuid::new_v4()).into()),
647 categories: DataCategories::new(),
648 scope: QuotaScope::Organization,
649 scope_id: None,
650 limit: Some(5),
651 window: Some(60),
652 reason_code: Some(ReasonCode::new("get_lost")),
653 namespace: None,
654 }];
655
656 let scoping = ItemScoping {
657 category: DataCategory::Error,
658 scoping: Scoping {
659 organization_id: OrganizationId::new(42),
660 project_id: ProjectId::new(43),
661 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
662 key_id: Some(44),
663 },
664 namespace: MetricNamespaceScoping::None,
665 };
666
667 let rate_limiter = build_rate_limiter();
668
669 for i in 0..10 {
670 let rate_limits: Vec<RateLimit> = rate_limiter
671 .is_rate_limited(quotas, scoping, 1, false)
672 .await
673 .expect("rate limiting failed")
674 .into_iter()
675 .collect();
676
677 if i >= 5 {
678 assert_eq!(
679 rate_limits,
680 vec![RateLimit {
681 categories: DataCategories::new(),
682 scope: RateLimitScope::Organization(OrganizationId::new(42)),
683 reason_code: Some(ReasonCode::new("get_lost")),
684 retry_after: rate_limits[0].retry_after,
685 namespaces: smallvec![],
686 }]
687 );
688 } else {
689 assert_eq!(rate_limits, vec![]);
690 }
691 }
692 }
693
694 #[tokio::test]
695 async fn test_quantity_0() {
696 let quotas = &[Quota {
697 id: Some(format!("test_quantity_0_{}", uuid::Uuid::new_v4()).into()),
698 categories: DataCategories::new(),
699 scope: QuotaScope::Organization,
700 scope_id: None,
701 limit: Some(1),
702 window: Some(60),
703 reason_code: Some(ReasonCode::new("get_lost")),
704 namespace: None,
705 }];
706
707 let scoping = ItemScoping {
708 category: DataCategory::Error,
709 scoping: Scoping {
710 organization_id: OrganizationId::new(42),
711 project_id: ProjectId::new(43),
712 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
713 key_id: Some(44),
714 },
715 namespace: MetricNamespaceScoping::None,
716 };
717
718 let rate_limiter = build_rate_limiter();
719
720 assert!(
722 !rate_limiter
723 .is_rate_limited(quotas, scoping, 1, false)
724 .await
725 .unwrap()
726 .is_limited()
727 );
728
729 assert!(
731 rate_limiter
732 .is_rate_limited(quotas, scoping, 1, false)
733 .await
734 .unwrap()
735 .is_limited()
736 );
737
738 assert!(
740 rate_limiter
741 .is_rate_limited(quotas, scoping, 0, false)
742 .await
743 .unwrap()
744 .is_limited()
745 );
746
747 assert!(
749 rate_limiter
750 .is_rate_limited(quotas, scoping, 1, false)
751 .await
752 .unwrap()
753 .is_limited()
754 );
755 }
756
757 #[tokio::test]
758 async fn test_quota_go_over() {
759 let quotas = &[Quota {
760 id: Some(format!("test_quota_go_over{}", uuid::Uuid::new_v4()).into()),
761 categories: DataCategories::new(),
762 scope: QuotaScope::Organization,
763 scope_id: None,
764 limit: Some(2),
765 window: Some(60),
766 reason_code: Some(ReasonCode::new("get_lost")),
767 namespace: None,
768 }];
769
770 let scoping = ItemScoping {
771 category: DataCategory::Error,
772 scoping: Scoping {
773 organization_id: OrganizationId::new(42),
774 project_id: ProjectId::new(43),
775 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
776 key_id: Some(44),
777 },
778 namespace: MetricNamespaceScoping::None,
779 };
780
781 let rate_limiter = build_rate_limiter();
782
783 let is_limited = rate_limiter
785 .is_rate_limited(quotas, scoping, 1, true)
786 .await
787 .unwrap()
788 .is_limited();
789 assert!(!is_limited);
790
791 let is_limited = rate_limiter
793 .is_rate_limited(quotas, scoping, 2, true)
794 .await
795 .unwrap()
796 .is_limited();
797 assert!(!is_limited);
798
799 let is_limited = rate_limiter
801 .is_rate_limited(quotas, scoping, 0, true)
802 .await
803 .unwrap()
804 .is_limited();
805 assert!(is_limited);
806
807 let is_limited = rate_limiter
809 .is_rate_limited(quotas, scoping, 1, true)
810 .await
811 .unwrap()
812 .is_limited();
813 assert!(is_limited);
814 }
815
816 #[tokio::test]
817 async fn test_bails_immediately_without_any_quota() {
818 let scoping = ItemScoping {
819 category: DataCategory::Error,
820 scoping: Scoping {
821 organization_id: OrganizationId::new(42),
822 project_id: ProjectId::new(43),
823 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
824 key_id: Some(44),
825 },
826 namespace: MetricNamespaceScoping::None,
827 };
828
829 let rate_limits: Vec<RateLimit> = build_rate_limiter()
830 .is_rate_limited(&[], scoping, 1, false)
831 .await
832 .expect("rate limiting failed")
833 .into_iter()
834 .collect();
835
836 assert_eq!(rate_limits, vec![]);
837 }
838
839 #[tokio::test]
840 async fn test_limited_with_unlimited_quota() {
841 let quotas = &[
842 Quota {
843 id: Some("q0".into()),
844 categories: DataCategories::new(),
845 scope: QuotaScope::Organization,
846 scope_id: None,
847 limit: None,
848 window: Some(1),
849 reason_code: Some(ReasonCode::new("project_quota0")),
850 namespace: None,
851 },
852 Quota {
853 id: Some("q1".into()),
854 categories: DataCategories::new(),
855 scope: QuotaScope::Organization,
856 scope_id: None,
857 limit: Some(1),
858 window: Some(1),
859 reason_code: Some(ReasonCode::new("project_quota1")),
860 namespace: None,
861 },
862 ];
863
864 let scoping = ItemScoping {
865 category: DataCategory::Error,
866 scoping: Scoping {
867 organization_id: OrganizationId::new(42),
868 project_id: ProjectId::new(43),
869 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
870 key_id: Some(44),
871 },
872 namespace: MetricNamespaceScoping::None,
873 };
874
875 let rate_limiter = build_rate_limiter();
876
877 for i in 0..1 {
878 let rate_limits: Vec<RateLimit> = rate_limiter
879 .is_rate_limited(quotas, scoping, 1, false)
880 .await
881 .expect("rate limiting failed")
882 .into_iter()
883 .collect();
884
885 if i == 0 {
886 assert_eq!(rate_limits, &[]);
887 } else {
888 assert_eq!(
889 rate_limits,
890 vec![RateLimit {
891 categories: DataCategories::new(),
892 scope: RateLimitScope::Organization(OrganizationId::new(42)),
893 reason_code: Some(ReasonCode::new("project_quota1")),
894 retry_after: rate_limits[0].retry_after,
895 namespaces: smallvec![],
896 }]
897 );
898 }
899 }
900 }
901
902 #[tokio::test]
903 async fn test_quota_with_quantity() {
904 let quotas = &[Quota {
905 id: Some(format!("test_quantity_quota_{}", uuid::Uuid::new_v4()).into()),
906 categories: DataCategories::new(),
907 scope: QuotaScope::Organization,
908 scope_id: None,
909 limit: Some(500),
910 window: Some(60),
911 reason_code: Some(ReasonCode::new("get_lost")),
912 namespace: None,
913 }];
914
915 let scoping = ItemScoping {
916 category: DataCategory::Error,
917 scoping: Scoping {
918 organization_id: OrganizationId::new(42),
919 project_id: ProjectId::new(43),
920 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
921 key_id: Some(44),
922 },
923 namespace: MetricNamespaceScoping::None,
924 };
925
926 let rate_limiter = build_rate_limiter();
927
928 for i in 0..10 {
929 let rate_limits: Vec<RateLimit> = rate_limiter
930 .is_rate_limited(quotas, scoping, 100, false)
931 .await
932 .expect("rate limiting failed")
933 .into_iter()
934 .collect();
935
936 if i >= 5 {
937 assert_eq!(
938 rate_limits,
939 vec![RateLimit {
940 categories: DataCategories::new(),
941 scope: RateLimitScope::Organization(OrganizationId::new(42)),
942 reason_code: Some(ReasonCode::new("get_lost")),
943 retry_after: rate_limits[0].retry_after,
944 namespaces: smallvec![],
945 }]
946 );
947 } else {
948 assert_eq!(rate_limits, vec![]);
949 }
950 }
951 }
952
953 #[tokio::test]
954 async fn test_get_redis_key_scoped() {
955 let quota = Quota {
956 id: Some("foo".into()),
957 categories: DataCategories::new(),
958 scope: QuotaScope::Project,
959 scope_id: Some("42".into()),
960 window: Some(2),
961 limit: Some(0),
962 reason_code: None,
963 namespace: None,
964 };
965
966 let scoping = ItemScoping {
967 category: DataCategory::Error,
968 scoping: Scoping {
969 organization_id: OrganizationId::new(69420),
970 project_id: ProjectId::new(42),
971 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
972 key_id: Some(4711),
973 },
974 namespace: MetricNamespaceScoping::None,
975 };
976
977 let timestamp = UnixTimestamp::from_secs(123_123_123);
978 let redis_quota = RedisQuota::new("a, 0, scoping, timestamp).unwrap();
979 assert_eq!(redis_quota.key().to_string(), "quota:foo{69420}42:61561561");
980 }
981
982 #[tokio::test]
983 async fn test_get_redis_key_unscoped() {
984 let quota = Quota {
985 id: Some("foo".into()),
986 categories: DataCategories::new(),
987 scope: QuotaScope::Organization,
988 scope_id: None,
989 window: Some(10),
990 limit: Some(0),
991 reason_code: None,
992 namespace: None,
993 };
994
995 let scoping = ItemScoping {
996 category: DataCategory::Error,
997 scoping: Scoping {
998 organization_id: OrganizationId::new(69420),
999 project_id: ProjectId::new(42),
1000 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1001 key_id: Some(4711),
1002 },
1003 namespace: MetricNamespaceScoping::None,
1004 };
1005
1006 let timestamp = UnixTimestamp::from_secs(234_531);
1007 let redis_quota = RedisQuota::new("a, 0, scoping, timestamp).unwrap();
1008 assert_eq!(redis_quota.key().to_string(), "quota:foo{69420}:23453");
1009 }
1010
1011 #[tokio::test]
1012 async fn test_large_redis_limit_large() {
1013 let quota = Quota {
1014 id: Some("foo".into()),
1015 categories: DataCategories::new(),
1016 scope: QuotaScope::Organization,
1017 scope_id: None,
1018 window: Some(10),
1019 limit: Some(9223372036854775808), reason_code: None,
1021 namespace: None,
1022 };
1023
1024 let scoping = ItemScoping {
1025 category: DataCategory::Error,
1026 scoping: Scoping {
1027 organization_id: OrganizationId::new(69420),
1028 project_id: ProjectId::new(42),
1029 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1030 key_id: Some(4711),
1031 },
1032 namespace: MetricNamespaceScoping::None,
1033 };
1034
1035 let timestamp = UnixTimestamp::from_secs(234_531);
1036 let redis_quota = RedisQuota::new("a, 0, scoping, timestamp).unwrap();
1037 assert_eq!(redis_quota.limit(), -1);
1038 }
1039
1040 #[tokio::test]
1041 async fn test_is_rate_limited_script() {
1042 let now = SystemTime::now()
1043 .duration_since(UNIX_EPOCH)
1044 .map(|duration| duration.as_secs())
1045 .unwrap();
1046
1047 let rate_limiter = build_rate_limiter();
1048 let mut conn = rate_limiter.client.get_connection().await.unwrap();
1049
1050 let foo = format!("foo___{now}");
1052 let r_foo = format!("r:foo___{now}");
1053 let bar = format!("bar___{now}");
1054 let r_bar = format!("r:bar___{now}");
1055 let apple = format!("apple___{now}");
1056 let orange = format!("orange___{now}");
1057 let baz = format!("baz___{now}");
1058
1059 let script = RedisScripts::load_is_rate_limited();
1060
1061 macro_rules! assert_invocation {
1062 ($invocation:expr, $($tt:tt)*) => {{
1063 let result = $invocation
1064 .invoke_async::<ScriptResult>(&mut conn)
1065 .await
1066 .unwrap();
1067
1068 insta::assert_debug_snapshot!(result, $($tt)*);
1069 }};
1070 }
1071
1072 let mut invocation = script.prepare_invoke();
1073 invocation
1074 .key(&foo) .key(&r_foo) .key(&bar) .key(&r_bar) .arg(1) .arg(now + 60) .arg(1) .arg(false) .arg(2) .arg(now + 120) .arg(1) .arg(false); let mut invocation2 = script.prepare_invoke();
1090 invocation2
1091 .key(&bar) .key(&r_bar) .arg(2) .arg(now + 120) .arg(1) .arg(false); assert_invocation!(invocation, @r"
1100 ScriptResult(
1101 [
1102 QuotaState {
1103 is_rejected: false,
1104 consumed: 1,
1105 },
1106 QuotaState {
1107 is_rejected: false,
1108 consumed: 1,
1109 },
1110 ],
1111 )
1112 "
1113 );
1114
1115 assert_invocation!(invocation, @r"
1118 ScriptResult(
1119 [
1120 QuotaState {
1121 is_rejected: true,
1122 consumed: 1,
1123 },
1124 QuotaState {
1125 is_rejected: false,
1126 consumed: 1,
1127 },
1128 ],
1129 )
1130 "
1131 );
1132
1133 assert_invocation!(invocation, @r"
1135 ScriptResult(
1136 [
1137 QuotaState {
1138 is_rejected: true,
1139 consumed: 1,
1140 },
1141 QuotaState {
1142 is_rejected: false,
1143 consumed: 1,
1144 },
1145 ],
1146 )
1147 "
1148 );
1149
1150 assert_invocation!(invocation2, @r"
1153 ScriptResult(
1154 [
1155 QuotaState {
1156 is_rejected: false,
1157 consumed: 2,
1158 },
1159 ],
1160 )
1161 "
1162 );
1163
1164 assert_invocation!(invocation2, @r"
1166 ScriptResult(
1167 [
1168 QuotaState {
1169 is_rejected: true,
1170 consumed: 2,
1171 },
1172 ],
1173 )
1174 "
1175 );
1176
1177 assert_invocation!(invocation, @r"
1179 ScriptResult(
1180 [
1181 QuotaState {
1182 is_rejected: true,
1183 consumed: 1,
1184 },
1185 QuotaState {
1186 is_rejected: true,
1187 consumed: 2,
1188 },
1189 ],
1190 )
1191 "
1192 );
1193
1194 assert_eq!(conn.get::<_, String>(&foo).await.unwrap(), "1");
1195 let ttl: u64 = conn.ttl(&foo).await.unwrap();
1196 assert!(ttl >= 59);
1197 assert!(ttl <= 60);
1198
1199 assert_eq!(conn.get::<_, String>(&bar).await.unwrap(), "2");
1200 let ttl: u64 = conn.ttl(&bar).await.unwrap();
1201 assert!(ttl >= 119);
1202 assert!(ttl <= 120);
1203
1204 let () = conn.get(r_foo).await.unwrap();
1206 let () = conn.get(r_bar).await.unwrap();
1207
1208 let () = conn.set(&apple, 5).await.unwrap();
1210
1211 let mut invocation = script.prepare_invoke();
1212 invocation
1213 .key(&orange) .key(&baz) .arg(1) .arg(now + 60) .arg(1) .arg(false);
1219
1220 assert_invocation!(invocation, @r"
1222 ScriptResult(
1223 [
1224 QuotaState {
1225 is_rejected: false,
1226 consumed: 1,
1227 },
1228 ],
1229 )
1230 "
1231 );
1232
1233 assert_invocation!(invocation, @r"
1235 ScriptResult(
1236 [
1237 QuotaState {
1238 is_rejected: true,
1239 consumed: 1,
1240 },
1241 ],
1242 )
1243 "
1244 );
1245
1246 assert_invocation!(invocation, @r"
1248 ScriptResult(
1249 [
1250 QuotaState {
1251 is_rejected: true,
1252 consumed: 1,
1253 },
1254 ],
1255 )
1256 "
1257 );
1258
1259 let mut invocation = script.prepare_invoke();
1260 invocation
1261 .key(&orange) .key(&apple) .arg(1) .arg(now + 60) .arg(1) .arg(false);
1267
1268 assert_invocation!(invocation, @r"
1270 ScriptResult(
1271 [
1272 QuotaState {
1273 is_rejected: false,
1274 consumed: -3,
1275 },
1276 ],
1277 )
1278 "
1279 );
1280 }
1281
1282 #[tokio::test]
1284 async fn test_quota_with_cache() {
1285 let quotas = &[Quota {
1286 id: Some(format!("test_simple_quota_{}", uuid::Uuid::new_v4()).into()),
1287 categories: DataCategories::new(),
1288 scope: QuotaScope::Organization,
1289 scope_id: None,
1290 limit: Some(50),
1291 window: Some(60),
1292 reason_code: Some(ReasonCode::new("get_lost")),
1293 namespace: None,
1294 }];
1295
1296 let scoping = ItemScoping {
1297 category: DataCategory::Error,
1298 scoping: Scoping {
1299 organization_id: OrganizationId::new(42),
1300 project_id: ProjectId::new(43),
1301 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1302 key_id: Some(44),
1303 },
1304 namespace: MetricNamespaceScoping::None,
1305 };
1306
1307 let rate_limiter = build_rate_limiter().cache(Some(0.1), Some(0.9));
1310
1311 for _ in 0..50 {
1312 let rate_limits = rate_limiter
1313 .is_rate_limited(quotas, scoping, 1, false)
1314 .await
1315 .unwrap();
1316
1317 assert!(rate_limits.is_empty());
1318 }
1319
1320 let rate_limits: Vec<RateLimit> = rate_limiter
1321 .is_rate_limited(quotas, scoping, 1, false)
1322 .await
1323 .expect("rate limiting failed")
1324 .into_iter()
1325 .collect();
1326
1327 assert_eq!(
1328 rate_limits,
1329 vec![RateLimit {
1330 categories: DataCategories::new(),
1331 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1332 reason_code: Some(ReasonCode::new("get_lost")),
1333 retry_after: rate_limits[0].retry_after,
1334 namespaces: smallvec![],
1335 }]
1336 );
1337 }
1338
1339 #[tokio::test]
1340 async fn test_quota_with_cache_slightly_over_account() {
1341 let window = 60;
1342 let limit = 50 * window;
1343
1344 let quotas = &[Quota {
1345 id: Some(format!("test_simple_quota_{}", uuid::Uuid::new_v4()).into()),
1346 categories: DataCategories::new(),
1347 scope: QuotaScope::Organization,
1348 scope_id: None,
1349 limit: Some(limit),
1350 window: Some(window),
1351 reason_code: Some(ReasonCode::new("get_lost")),
1352 namespace: None,
1353 }];
1354
1355 let scoping = ItemScoping {
1356 category: DataCategory::Error,
1357 scoping: Scoping {
1358 organization_id: OrganizationId::new(42),
1359 project_id: ProjectId::new(43),
1360 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1361 key_id: Some(44),
1362 },
1363 namespace: MetricNamespaceScoping::None,
1364 };
1365
1366 let rate_limiter1 = build_rate_limiter().cache(Some(0.1), None);
1368 let rate_limiter2 = build_rate_limiter().cache(Some(0.1), None);
1369
1370 let rate_limits = rate_limiter1
1372 .is_rate_limited(quotas, scoping, 1, false)
1373 .await
1374 .unwrap();
1375 assert!(rate_limits.is_empty());
1376 let rate_limits = rate_limiter1
1378 .is_rate_limited(quotas, scoping, 3, false)
1379 .await
1380 .unwrap();
1381 assert!(rate_limits.is_empty());
1382
1383 let rate_limits = rate_limiter2
1385 .is_rate_limited(quotas, scoping, limit as usize - 1, false)
1386 .await
1387 .unwrap();
1388 assert!(rate_limits.is_empty());
1389
1390 let rate_limits = rate_limiter1
1392 .is_rate_limited(quotas, scoping, 1, false)
1393 .await
1394 .unwrap();
1395 assert!(rate_limits.is_empty());
1396
1397 let rate_limits: Vec<RateLimit> = rate_limiter1
1399 .is_rate_limited(quotas, scoping, 1, false)
1400 .await
1401 .unwrap()
1402 .into_iter()
1403 .collect();
1404
1405 assert_eq!(
1406 rate_limits,
1407 vec![RateLimit {
1408 categories: DataCategories::new(),
1409 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1410 reason_code: Some(ReasonCode::new("get_lost")),
1411 retry_after: rate_limits[0].retry_after,
1412 namespaces: smallvec![],
1413 }]
1414 );
1415 }
1416}