objectstore_server/extractors/
id.rs1use std::borrow::Cow;
2
3use axum::extract::rejection::PathRejection;
4use axum::extract::{FromRequestParts, Path};
5use axum::http::request::Parts;
6use axum::response::{IntoResponse, Response};
7use objectstore_service::id::{ObjectContext, ObjectId};
8use objectstore_types::scope::{EMPTY_SCOPES, Scope, Scopes};
9use serde::{Deserialize, de};
10
11use crate::extractors::Xt;
12use crate::extractors::downstream_service::DownstreamService;
13use crate::state::ServiceState;
14
15#[derive(Debug)]
16pub enum ObjectRejection {
17 Path(PathRejection),
18 Killswitched,
19 RateLimited,
20}
21
22impl IntoResponse for ObjectRejection {
23 fn into_response(self) -> Response {
24 match self {
25 ObjectRejection::Path(rejection) => rejection.into_response(),
26 ObjectRejection::Killswitched => (
27 axum::http::StatusCode::FORBIDDEN,
28 "Object access is disabled for this scope through killswitches",
29 )
30 .into_response(),
31 ObjectRejection::RateLimited => (
32 axum::http::StatusCode::TOO_MANY_REQUESTS,
33 "Object access is rate limited",
34 )
35 .into_response(),
36 }
37 }
38}
39
40impl From<PathRejection> for ObjectRejection {
41 fn from(rejection: PathRejection) -> Self {
42 ObjectRejection::Path(rejection)
43 }
44}
45
46impl FromRequestParts<ServiceState> for Xt<ObjectId> {
47 type Rejection = ObjectRejection;
48
49 async fn from_request_parts(
50 parts: &mut Parts,
51 state: &ServiceState,
52 ) -> Result<Self, Self::Rejection> {
53 let Path(params) = Path::<ObjectParams>::from_request_parts(parts, state).await?;
54 let id = ObjectId::from_parts(params.usecase, params.scopes, params.key);
55
56 populate_sentry_context(id.context());
57 sentry::configure_scope(|s| s.set_extra("key", id.key().into()));
58
59 let service = DownstreamService::from_request_parts(parts, state)
60 .await
61 .unwrap();
62
63 if state
64 .config
65 .killswitches
66 .matches(id.context(), service.as_str())
67 {
68 tracing::debug!("Request rejected due to killswitches");
69 return Err(ObjectRejection::Killswitched);
70 }
71
72 if !state.rate_limiter.check(id.context()) {
73 tracing::debug!("Request rejected due to rate limits");
74 return Err(ObjectRejection::RateLimited);
75 }
76
77 Ok(Xt(id))
78 }
79}
80
81#[derive(Clone, Debug, Deserialize)]
85struct ObjectParams {
86 usecase: String,
87 #[serde(deserialize_with = "deserialize_scopes")]
88 scopes: Scopes,
89 key: String,
90}
91
92fn deserialize_scopes<'de, D>(deserializer: D) -> Result<Scopes, D::Error>
97where
98 D: de::Deserializer<'de>,
99{
100 let s = Cow::<str>::deserialize(deserializer)?;
101 if s == EMPTY_SCOPES {
102 return Ok(Scopes::empty());
103 }
104
105 let scopes = s
106 .split(';')
107 .map(|s| {
108 let (key, value) = s
109 .split_once("=")
110 .ok_or_else(|| de::Error::custom("scope must be 'key=value'"))?;
111
112 Scope::create(key, value).map_err(de::Error::custom)
113 })
114 .collect::<Result<_, _>>()?;
115
116 Ok(scopes)
117}
118
119impl FromRequestParts<ServiceState> for Xt<ObjectContext> {
120 type Rejection = ObjectRejection;
121
122 async fn from_request_parts(
123 parts: &mut Parts,
124 state: &ServiceState,
125 ) -> Result<Self, Self::Rejection> {
126 let Path(params) = Path::<ContextParams>::from_request_parts(parts, state).await?;
127 let context = ObjectContext {
128 usecase: params.usecase,
129 scopes: params.scopes,
130 };
131
132 populate_sentry_context(&context);
133
134 let service = DownstreamService::from_request_parts(parts, state)
135 .await
136 .unwrap();
137
138 if state
139 .config
140 .killswitches
141 .matches(&context, service.as_str())
142 {
143 tracing::debug!("Request rejected due to killswitches");
144 return Err(ObjectRejection::Killswitched);
145 }
146
147 if !state.rate_limiter.check(&context) {
148 tracing::debug!("Request rejected due to rate limits");
149 return Err(ObjectRejection::RateLimited);
150 }
151
152 Ok(Xt(context))
153 }
154}
155
156#[derive(Clone, Debug, Deserialize)]
161pub(super) struct ContextParams {
162 pub usecase: String,
163 #[serde(deserialize_with = "deserialize_scopes")]
164 pub scopes: Scopes,
165}
166
167fn populate_sentry_context(context: &ObjectContext) {
168 sentry::configure_scope(|s| {
169 s.set_tag("usecase", &context.usecase);
170 for scope in &context.scopes {
171 s.set_tag(&format!("scope.{}", scope.name()), scope.value());
172 }
173 });
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use serde::de::IntoDeserializer;
    use serde::de::value::{CowStrDeserializer, Error as DeError};
    use std::borrow::Cow;
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use axum::Router;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::{get, post};
    use objectstore_service::{StorageConfig, StorageService};
    use tempfile::TempDir;
    use tower::ServiceExt;

    use crate::auth::PublicKeyDirectory;
    use crate::config::{Config, Storage};
    use crate::killswitches::{Killswitch, Killswitches};
    use crate::rate_limits::{RateLimiter, RateLimits, ThroughputLimits};
    use crate::state::{ServiceState, Services};
    use crate::web::RequestCounter;

    // ---------------------------------------------------------------------
    // Unit tests for `deserialize_scopes`
    // ---------------------------------------------------------------------

    /// Runs `deserialize_scopes` on a plain string input.
    fn deser_scopes(input: &str) -> Result<Scopes, DeError> {
        let source: CowStrDeserializer<DeError> = Cow::Borrowed(input).into_deserializer();
        deserialize_scopes(source)
    }

    #[test]
    fn parse_single_scope() {
        let parsed = deser_scopes("org=123").unwrap();
        assert_eq!(parsed.get_value("org"), Some("123"));
    }

    #[test]
    fn parse_multiple_scopes() {
        let parsed = deser_scopes("org=123;project=456").unwrap();
        assert_eq!(parsed.get_value("org"), Some("123"));
        assert_eq!(parsed.get_value("project"), Some("456"));
    }

    #[test]
    fn parse_empty_scopes() {
        let parsed = deser_scopes("_").unwrap();
        assert!(parsed.is_empty());
    }

    #[test]
    fn parse_missing_equals() {
        assert!(deser_scopes("org123").is_err());
    }

    #[test]
    fn parse_invalid_scope_chars() {
        assert!(deser_scopes("org=hello world").is_err());
    }

    #[test]
    fn parse_empty_key_or_value() {
        assert!(deser_scopes("=value").is_err());
        assert!(deser_scopes("key=").is_err());
    }

    // ---------------------------------------------------------------------
    // Shared fixtures for the extractor tests
    // ---------------------------------------------------------------------

    /// Builds a service state backed by a temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the state is used.
    async fn test_state(mut config: Config) -> (ServiceState, TempDir) {
        let tempdir = TempDir::new().unwrap();
        let root = tempdir.path();

        config.high_volume_storage = Storage::FileSystem {
            path: root.join("high-volume"),
        };
        config.long_term_storage = Storage::FileSystem {
            path: root.join("long-term"),
        };

        let fs_config = StorageConfig::FileSystem { path: root };
        let service = StorageService::new(fs_config.clone(), fs_config)
            .await
            .unwrap();
        let key_directory = PublicKeyDirectory::try_from(&config.auth).unwrap();
        let rate_limiter = RateLimiter::new(config.rate_limits.clone());

        let state = Arc::new(Services {
            config,
            service,
            key_directory,
            rate_limiter,
            request_counter: RequestCounter::new(0),
        });
        (state, tempdir)
    }

    /// Handler that echoes the parts of the extracted `ObjectId`.
    async fn handle_object_id(Xt(id): Xt<ObjectId>) -> String {
        let context = id.context();
        format!(
            "usecase={} key={} scopes_empty={}",
            context.usecase,
            id.key(),
            context.scopes.is_empty(),
        )
    }

    /// Handler that echoes the parts of the extracted `ObjectContext`.
    async fn handle_object_context(Xt(ctx): Xt<ObjectContext>) -> String {
        format!(
            "usecase={} scopes_empty={}",
            ctx.usecase,
            ctx.scopes.is_empty(),
        )
    }

    /// Router exposing one route per extractor under test.
    fn test_router(state: ServiceState) -> Router {
        Router::new()
            .route("/objects/{usecase}/{scopes}/{*key}", get(handle_object_id))
            .route("/objects/{usecase}/{scopes}/", post(handle_object_context))
            .with_state(state)
    }

    /// Reads the whole response body as a UTF-8 string.
    async fn response_body(response: axum::http::Response<Body>) -> String {
        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        String::from_utf8(bytes.to_vec()).unwrap()
    }

    /// Builds a body-less request with the default method for `uri`.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder().uri(uri).body(Body::empty()).unwrap()
    }

    /// Builds a body-less `POST` request for `uri`.
    fn post_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("POST")
            .uri(uri)
            .body(Body::empty())
            .unwrap()
    }

    /// Sends a single request through the router and returns the response.
    async fn send(app: Router, request: Request<Body>) -> axum::http::Response<Body> {
        app.oneshot(request).await.unwrap()
    }

    /// Config with a single killswitch restricted by the given usecase and service.
    fn killswitch_config(
        usecase: Option<&'static str>,
        service: Option<&'static str>,
    ) -> Config {
        Config {
            killswitches: Killswitches(vec![Killswitch {
                usecase: usecase.map(Into::into),
                scopes: BTreeMap::new(),
                service: service.map(Into::into),
                service_matcher: Default::default(),
            }]),
            ..Config::default()
        }
    }

    /// Config with a global limit of one request per second and no burst.
    fn rate_limited_config() -> Config {
        Config {
            rate_limits: RateLimits {
                throughput: ThroughputLimits {
                    global_rps: Some(1),
                    burst: 0,
                    ..ThroughputLimits::default()
                },
                ..RateLimits::default()
            },
            ..Config::default()
        }
    }

    // ---------------------------------------------------------------------
    // Path parsing
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn extract_object_id_parses_path() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let app = test_router(state);

        let request = get_request("/objects/myusecase/org=123;project=456/my-key");
        let response = send(app, request).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        assert!(body.contains("usecase=myusecase"));
        assert!(body.contains("key=my-key"));
        assert!(body.contains("scopes_empty=false"));
    }

    #[tokio::test]
    async fn extract_object_id_with_empty_scopes() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let app = test_router(state);

        let response = send(app, get_request("/objects/myusecase/_/my-key")).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        assert!(body.contains("scopes_empty=true"));
    }

    #[tokio::test]
    async fn extract_object_context_parses_path() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let app = test_router(state);

        let request = post_request("/objects/myusecase/org=123;project=456/");
        let response = send(app, request).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        assert!(body.contains("usecase=myusecase"));
        assert!(body.contains("scopes_empty=false"));
    }

    #[tokio::test]
    async fn extract_object_context_with_empty_scopes() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let app = test_router(state);

        let response = send(app, post_request("/objects/myusecase/_/")).await;

        assert_eq!(response.status(), StatusCode::OK);
        let body = response_body(response).await;
        assert!(body.contains("scopes_empty=true"));
    }

    #[tokio::test]
    async fn extract_object_id_invalid_scopes() {
        let (state, _tempdir) = test_state(Config::default()).await;
        let app = test_router(state);

        let request = get_request("/objects/myusecase/invalid-no-equals/key");
        let response = send(app, request).await;

        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    }

    // ---------------------------------------------------------------------
    // Killswitches
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn extract_object_id_killswitched() {
        let config = killswitch_config(Some("blocked"), None);
        let (state, _tempdir) = test_state(config).await;
        let app = test_router(state);

        // The killswitched usecase is rejected ...
        let response = send(app.clone(), get_request("/objects/blocked/org=1/key")).await;
        assert_eq!(response.status(), StatusCode::FORBIDDEN);

        // ... while any other usecase still passes.
        let response = send(app, get_request("/objects/allowed/org=1/key")).await;
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn extract_object_context_killswitched() {
        let config = killswitch_config(Some("blocked"), None);
        let (state, _tempdir) = test_state(config).await;
        let app = test_router(state);

        // The killswitched usecase is rejected ...
        let response = send(app.clone(), post_request("/objects/blocked/org=1/")).await;
        assert_eq!(response.status(), StatusCode::FORBIDDEN);

        // ... while any other usecase still passes.
        let response = send(app, post_request("/objects/allowed/org=1/")).await;
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn extract_object_id_killswitched_with_service() {
        let config = killswitch_config(None, Some("test-*"));
        let (state, _tempdir) = test_state(config).await;
        let app = test_router(state);

        let request_from = |service: &'static str| {
            Request::builder()
                .uri("/objects/any/org=1/key")
                .header("x-downstream-service", service)
                .body(Body::empty())
                .unwrap()
        };

        // A downstream service matching the killswitch pattern is rejected.
        let response = send(app.clone(), request_from("test-service")).await;
        assert_eq!(response.status(), StatusCode::FORBIDDEN);

        // A downstream service that does not match the pattern passes.
        let response = send(app.clone(), request_from("other-service")).await;
        assert_eq!(response.status(), StatusCode::OK);

        // A request without the downstream service header passes as well.
        let response = send(app, get_request("/objects/any/org=1/key")).await;
        assert_eq!(response.status(), StatusCode::OK);
    }

    // ---------------------------------------------------------------------
    // Rate limits
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn extract_object_id_rate_limited() {
        let (state, _tempdir) = test_state(rate_limited_config()).await;
        let app = test_router(state);

        // The first request passes ...
        let response = send(app.clone(), get_request("/objects/test/org=1/key")).await;
        assert_eq!(response.status(), StatusCode::OK);

        // ... and the one sent right after it is rejected.
        let response = send(app, get_request("/objects/test/org=1/key")).await;
        assert_eq!(response.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    #[tokio::test]
    async fn extract_object_context_rate_limited() {
        let (state, _tempdir) = test_state(rate_limited_config()).await;
        let app = test_router(state);

        // The first request passes ...
        let response = send(app.clone(), post_request("/objects/test/org=1/")).await;
        assert_eq!(response.status(), StatusCode::OK);

        // ... and the one sent right after it is rejected.
        let response = send(app, post_request("/objects/test/org=1/")).await;
        assert_eq!(response.status(), StatusCode::TOO_MANY_REQUESTS);
    }
}