objectstore_server/extractors/
id.rs

1use std::borrow::Cow;
2
3use axum::extract::rejection::PathRejection;
4use axum::extract::{FromRequestParts, Path};
5use axum::http::request::Parts;
6use axum::response::{IntoResponse, Response};
7use objectstore_service::id::{ObjectContext, ObjectId};
8use objectstore_types::scope::{EMPTY_SCOPES, Scope, Scopes};
9use serde::{Deserialize, de};
10
11use crate::extractors::Xt;
12use crate::extractors::downstream_service::DownstreamService;
13use crate::state::ServiceState;
14
15#[derive(Debug)]
16pub enum ObjectRejection {
17    Path(PathRejection),
18    Killswitched,
19    RateLimited,
20}
21
22impl IntoResponse for ObjectRejection {
23    fn into_response(self) -> Response {
24        match self {
25            ObjectRejection::Path(rejection) => rejection.into_response(),
26            ObjectRejection::Killswitched => (
27                axum::http::StatusCode::FORBIDDEN,
28                "Object access is disabled for this scope through killswitches",
29            )
30                .into_response(),
31            ObjectRejection::RateLimited => (
32                axum::http::StatusCode::TOO_MANY_REQUESTS,
33                "Object access is rate limited",
34            )
35                .into_response(),
36        }
37    }
38}
39
40impl From<PathRejection> for ObjectRejection {
41    fn from(rejection: PathRejection) -> Self {
42        ObjectRejection::Path(rejection)
43    }
44}
45
46impl FromRequestParts<ServiceState> for Xt<ObjectId> {
47    type Rejection = ObjectRejection;
48
49    async fn from_request_parts(
50        parts: &mut Parts,
51        state: &ServiceState,
52    ) -> Result<Self, Self::Rejection> {
53        let Path(params) = Path::<ObjectParams>::from_request_parts(parts, state).await?;
54        let id = ObjectId::from_parts(params.usecase, params.scopes, params.key);
55
56        populate_sentry_context(id.context());
57        sentry::configure_scope(|s| s.set_extra("key", id.key().into()));
58
59        let service = DownstreamService::from_request_parts(parts, state)
60            .await
61            .unwrap();
62
63        if state
64            .config
65            .killswitches
66            .matches(id.context(), service.as_str())
67        {
68            tracing::debug!("Request rejected due to killswitches");
69            return Err(ObjectRejection::Killswitched);
70        }
71
72        if !state.rate_limiter.check(id.context()) {
73            tracing::debug!("Request rejected due to rate limits");
74            return Err(ObjectRejection::RateLimited);
75        }
76
77        Ok(Xt(id))
78    }
79}
80
81/// Path parameters used for object-level endpoints.
82///
83/// This is meant to be used with the axum `Path` extractor.
84#[derive(Clone, Debug, Deserialize)]
85struct ObjectParams {
86    usecase: String,
87    #[serde(deserialize_with = "deserialize_scopes")]
88    scopes: Scopes,
89    key: String,
90}
91
92/// Deserializes a `Scopes` instance from a string representation.
93///
94/// The string representation is a semicolon-separated list of `key=value` pairs, following the
95/// Matrix URIs proposal. An empty scopes string (`"_"`) represents no scopes.
96fn deserialize_scopes<'de, D>(deserializer: D) -> Result<Scopes, D::Error>
97where
98    D: de::Deserializer<'de>,
99{
100    let s = Cow::<str>::deserialize(deserializer)?;
101    if s == EMPTY_SCOPES {
102        return Ok(Scopes::empty());
103    }
104
105    let scopes = s
106        .split(';')
107        .map(|s| {
108            let (key, value) = s
109                .split_once("=")
110                .ok_or_else(|| de::Error::custom("scope must be 'key=value'"))?;
111
112            Scope::create(key, value).map_err(de::Error::custom)
113        })
114        .collect::<Result<_, _>>()?;
115
116    Ok(scopes)
117}
118
119impl FromRequestParts<ServiceState> for Xt<ObjectContext> {
120    type Rejection = ObjectRejection;
121
122    async fn from_request_parts(
123        parts: &mut Parts,
124        state: &ServiceState,
125    ) -> Result<Self, Self::Rejection> {
126        let Path(params) = Path::<ContextParams>::from_request_parts(parts, state).await?;
127        let context = ObjectContext {
128            usecase: params.usecase,
129            scopes: params.scopes,
130        };
131
132        populate_sentry_context(&context);
133
134        let service = DownstreamService::from_request_parts(parts, state)
135            .await
136            .unwrap();
137
138        if state
139            .config
140            .killswitches
141            .matches(&context, service.as_str())
142        {
143            tracing::debug!("Request rejected due to killswitches");
144            return Err(ObjectRejection::Killswitched);
145        }
146
147        if !state.rate_limiter.check(&context) {
148            tracing::debug!("Request rejected due to rate limits");
149            return Err(ObjectRejection::RateLimited);
150        }
151
152        Ok(Xt(context))
153    }
154}
155
156/// Path parameters for extracting an [`ObjectContext`] from a request path.
157///
158/// Works on both collection-level (`/objects/{usecase}/{scopes}`) and object-level
159/// (`/objects/{usecase}/{scopes}/{*key}`) routes — the extra `key` parameter is ignored.
160#[derive(Clone, Debug, Deserialize)]
161pub(super) struct ContextParams {
162    pub usecase: String,
163    #[serde(deserialize_with = "deserialize_scopes")]
164    pub scopes: Scopes,
165}
166
167fn populate_sentry_context(context: &ObjectContext) {
168    sentry::configure_scope(|s| {
169        s.set_tag("usecase", &context.usecase);
170        for scope in &context.scopes {
171            s.set_tag(&format!("scope.{}", scope.name()), scope.value());
172        }
173    });
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use serde::de::IntoDeserializer;
    use serde::de::value::{CowStrDeserializer, Error as DeError};
    use std::borrow::Cow;

    /// Runs `deserialize_scopes` on `input` through serde's in-memory string deserializer.
    fn deser_scopes(input: &str) -> Result<Scopes, DeError> {
        let source: CowStrDeserializer<'_, DeError> = Cow::Borrowed(input).into_deserializer();
        deserialize_scopes(source)
    }

    #[test]
    fn parse_single_scope() {
        let parsed = deser_scopes("org=123").unwrap();
        assert_eq!(parsed.get_value("org"), Some("123"));
    }

    #[test]
    fn parse_multiple_scopes() {
        let parsed = deser_scopes("org=123;project=456").unwrap();
        for (name, value) in [("org", "123"), ("project", "456")] {
            assert_eq!(parsed.get_value(name), Some(value));
        }
    }

    #[test]
    fn parse_empty_scopes() {
        assert!(deser_scopes("_").unwrap().is_empty());
    }

    #[test]
    fn parse_missing_equals() {
        assert!(deser_scopes("org123").is_err());
    }

    #[test]
    fn parse_invalid_scope_chars() {
        assert!(deser_scopes("org=hello world").is_err());
    }

    #[test]
    fn parse_empty_key_or_value() {
        for input in ["=value", "key="] {
            assert!(deser_scopes(input).is_err());
        }
    }

    // --- Extractor integration tests ---

    use std::collections::BTreeMap;
    use std::sync::Arc;

    use axum::Router;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::{get, post};
    use objectstore_service::{StorageConfig, StorageService};
    use tempfile::TempDir;
    use tower::ServiceExt;

    use crate::auth::PublicKeyDirectory;
    use crate::config::{Config, Storage};
    use crate::killswitches::{Killswitch, Killswitches};
    use crate::rate_limits::{RateLimiter, RateLimits, ThroughputLimits};
    use crate::state::{ServiceState, Services};
    use crate::web::RequestCounter;

    /// Header set by the service-scoped killswitch test.
    const SERVICE_HEADER: &str = "x-downstream-service";

    /// Builds a service state backed by a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the state is in use.
    async fn test_state(mut config: Config) -> (ServiceState, TempDir) {
        let tempdir = TempDir::new().unwrap();
        let root = tempdir.path();
        config.high_volume_storage = Storage::FileSystem {
            path: root.join("high-volume"),
        };
        config.long_term_storage = Storage::FileSystem {
            path: root.join("long-term"),
        };

        let backend = StorageConfig::FileSystem { path: root };
        let service = StorageService::new(backend.clone(), backend)
            .await
            .unwrap();
        let key_directory = PublicKeyDirectory::try_from(&config.auth).unwrap();
        let rate_limiter = RateLimiter::new(config.rate_limits.clone());

        let services = Services {
            config,
            service,
            key_directory,
            rate_limiter,
            request_counter: RequestCounter::new(0),
        };
        (Arc::new(services), tempdir)
    }

    /// Config with a single killswitch matching the given usecase and/or service pattern.
    fn killswitch_config(usecase: Option<&str>, service: Option<&str>) -> Config {
        let killswitch = Killswitch {
            usecase: usecase.map(Into::into),
            scopes: BTreeMap::new(),
            service: service.map(Into::into),
            service_matcher: Default::default(),
        };
        Config {
            killswitches: Killswitches(vec![killswitch]),
            ..Config::default()
        }
    }

    /// Config allowing one request per second globally, with no burst.
    fn single_request_config() -> Config {
        let throughput = ThroughputLimits {
            global_rps: Some(1),
            burst: 0,
            ..ThroughputLimits::default()
        };
        let rate_limits = RateLimits {
            throughput,
            ..RateLimits::default()
        };
        Config {
            rate_limits,
            ..Config::default()
        }
    }

    /// Echoes the extracted object id so tests can assert on it.
    async fn handle_object_id(Xt(id): Xt<ObjectId>) -> String {
        let context = id.context();
        format!(
            "usecase={} key={} scopes_empty={}",
            context.usecase,
            id.key(),
            context.scopes.is_empty(),
        )
    }

    /// Echoes the extracted object context so tests can assert on it.
    async fn handle_object_context(Xt(ctx): Xt<ObjectContext>) -> String {
        let scopes_empty = ctx.scopes.is_empty();
        format!("usecase={} scopes_empty={}", ctx.usecase, scopes_empty)
    }

    /// Router exposing one object-level `GET` route and one collection-level `POST` route.
    fn test_router(state: ServiceState) -> Router {
        let object_route = get(handle_object_id);
        let collection_route = post(handle_object_context);

        Router::new()
            .route("/objects/{usecase}/{scopes}/{*key}", object_route)
            .route("/objects/{usecase}/{scopes}/", collection_route)
            .with_state(state)
    }

    /// Body-less `GET` request for `uri`.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder().uri(uri).body(Body::empty()).unwrap()
    }

    /// Body-less `GET` request for `uri`, optionally carrying the downstream service header.
    fn get_request_from(uri: &str, service: Option<&str>) -> Request<Body> {
        let mut builder = Request::builder().uri(uri);
        if let Some(name) = service {
            builder = builder.header(SERVICE_HEADER, name);
        }
        builder.body(Body::empty()).unwrap()
    }

    /// Body-less `POST` request for `uri`.
    fn post_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("POST")
            .uri(uri)
            .body(Body::empty())
            .unwrap()
    }

    /// Drives a single request through the router and returns its response.
    async fn send(app: Router, request: Request<Body>) -> axum::http::Response<Body> {
        app.oneshot(request).await.unwrap()
    }

    /// Reads the whole response body as UTF-8 text.
    async fn response_body(response: axum::http::Response<Body>) -> String {
        let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        String::from_utf8(raw.to_vec()).unwrap()
    }

    // Extraction tests

    #[tokio::test]
    async fn extract_object_id_parses_path() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let request = get_request("/objects/myusecase/org=123;project=456/my-key");
        let response = send(test_router(state), request).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        for fragment in ["usecase=myusecase", "key=my-key", "scopes_empty=false"] {
            assert!(body.contains(fragment));
        }
    }

    #[tokio::test]
    async fn extract_object_id_with_empty_scopes() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let request = get_request("/objects/myusecase/_/my-key");
        let response = send(test_router(state), request).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        assert!(body.contains("scopes_empty=true"));
    }

    #[tokio::test]
    async fn extract_object_context_parses_path() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let request = post_request("/objects/myusecase/org=123;project=456/");
        let response = send(test_router(state), request).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        for fragment in ["usecase=myusecase", "scopes_empty=false"] {
            assert!(body.contains(fragment));
        }
    }

    #[tokio::test]
    async fn extract_object_context_with_empty_scopes() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let request = post_request("/objects/myusecase/_/");
        let response = send(test_router(state), request).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        assert!(body.contains("scopes_empty=true"));
    }

    #[tokio::test]
    async fn extract_object_id_invalid_scopes() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let request = get_request("/objects/myusecase/invalid-no-equals/key");
        let response = send(test_router(state), request).await;

        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    }

    // Killswitch tests

    #[tokio::test]
    async fn extract_object_id_killswitched() {
        let config = killswitch_config(Some("blocked"), None);
        let (state, _tempdir) = test_state(config).await;
        let app = test_router(state);

        let blocked = send(app.clone(), get_request("/objects/blocked/org=1/key")).await;
        assert_eq!(blocked.status(), StatusCode::FORBIDDEN);

        let allowed = send(app, get_request("/objects/allowed/org=1/key")).await;
        assert_eq!(allowed.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn extract_object_context_killswitched() {
        let config = killswitch_config(Some("blocked"), None);
        let (state, _tempdir) = test_state(config).await;
        let app = test_router(state);

        let blocked = send(app.clone(), post_request("/objects/blocked/org=1/")).await;
        assert_eq!(blocked.status(), StatusCode::FORBIDDEN);

        let allowed = send(app, post_request("/objects/allowed/org=1/")).await;
        assert_eq!(allowed.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn extract_object_id_killswitched_with_service() {
        const URI: &str = "/objects/any/org=1/key";

        let config = killswitch_config(None, Some("test-*"));
        let (state, _tempdir) = test_state(config).await;
        let app = test_router(state);

        // Matching service header → 403
        let matching = get_request_from(URI, Some("test-service"));
        let response = send(app.clone(), matching).await;
        assert_eq!(response.status(), StatusCode::FORBIDDEN);

        // Non-matching service header → 200
        let non_matching = get_request_from(URI, Some("other-service"));
        let response = send(app.clone(), non_matching).await;
        assert_eq!(response.status(), StatusCode::OK);

        // No service header → 200
        let anonymous = get_request_from(URI, None);
        let response = send(app, anonymous).await;
        assert_eq!(response.status(), StatusCode::OK);
    }

    // Rate limiter tests

    #[tokio::test]
    async fn extract_object_id_rate_limited() {
        let (state, _tempdir) = test_state(single_request_config()).await;
        let app = test_router(state);

        let first = send(app.clone(), get_request("/objects/test/org=1/key")).await;
        assert_eq!(first.status(), StatusCode::OK);

        let second = send(app, get_request("/objects/test/org=1/key")).await;
        assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    #[tokio::test]
    async fn extract_object_context_rate_limited() {
        let (state, _tempdir) = test_state(single_request_config()).await;
        let app = test_router(state);

        let first = send(app.clone(), post_request("/objects/test/org=1/")).await;
        assert_eq!(first.status(), StatusCode::OK);

        let second = send(app, post_request("/objects/test/org=1/")).await;
        assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS);
    }
}