objectstore_client/
client.rs1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::Compression;
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16 service_url: Url,
17 propagate_traces: bool,
18 reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22 fn apply_defaults(mut self) -> Self {
24 self.reqwest_builder = self
25 .reqwest_builder
26 .no_brotli()
29 .no_deflate()
30 .no_gzip()
31 .no_zstd();
32 self
33 }
34}
35
36#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42 pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47 let service_url = match service_url.into_url() {
48 Ok(url) => url,
49 Err(err) => return Self(Err(err.into())),
50 };
51 if service_url.cannot_be_a_base() {
52 return ClientBuilder(Err(crate::Error::InvalidUrl {
53 message: "service_url cannot be a base".to_owned(),
54 }));
55 }
56
57 let reqwest_builder = reqwest::Client::builder()
58 .connect_timeout(Duration::from_millis(500))
64 .read_timeout(Duration::from_millis(500))
65 .user_agent(USER_AGENT);
66
67 Self(Ok(ClientBuilderInner {
68 service_url,
69 propagate_traces: false,
70 reqwest_builder,
71 }))
72 }
73
74 pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
79 if let Ok(ref mut inner) = self.0 {
80 inner.propagate_traces = propagate_traces;
81 }
82 self
83 }
84
85 pub fn timeout(self, timeout: Duration) -> Self {
90 let Ok(mut inner) = self.0 else { return self };
91 inner.reqwest_builder = inner
92 .reqwest_builder
93 .connect_timeout(timeout)
94 .read_timeout(timeout);
95 Self(Ok(inner))
96 }
97
98 pub fn configure_reqwest<F>(self, closure: F) -> Self
102 where
103 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
104 {
105 let Ok(mut inner) = self.0 else { return self };
106 inner.reqwest_builder = closure(inner.reqwest_builder);
107 Self(Ok(inner))
108 }
109
110 pub fn build(self) -> crate::Result<Client> {
119 let inner = self.0?.apply_defaults();
120
121 Ok(Client {
122 inner: Arc::new(ClientInner {
123 reqwest: inner.reqwest_builder.build()?,
124 service_url: inner.service_url,
125 propagate_traces: inner.propagate_traces,
126 }),
127 })
128 }
129}
130
131#[derive(Debug, Clone)]
138pub struct Usecase {
139 name: Arc<str>,
140 compression: Compression,
141 expiration_policy: ExpirationPolicy,
142}
143
144impl Usecase {
145 pub fn new(name: &str) -> Self {
147 Self {
148 name: name.into(),
149 compression: Compression::Zstd,
150 expiration_policy: Default::default(),
151 }
152 }
153
154 #[inline]
156 pub fn name(&self) -> &str {
157 &self.name
158 }
159
160 #[inline]
162 pub fn compression(&self) -> Compression {
163 self.compression
164 }
165
166 pub fn with_compression(self, compression: Compression) -> Self {
172 Self {
173 compression,
174 ..self
175 }
176 }
177
178 #[inline]
180 pub fn expiration_policy(&self) -> ExpirationPolicy {
181 self.expiration_policy
182 }
183
184 pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
191 Self {
192 expiration_policy,
193 ..self
194 }
195 }
196
197 pub fn scope(&self) -> Scope {
204 Scope::new(self.clone())
205 }
206
207 pub fn for_organization(&self, organization: u64) -> Scope {
209 Scope::for_organization(self.clone(), organization)
210 }
211
212 pub fn for_project(&self, organization: u64, project: u64) -> Scope {
214 Scope::for_project(self.clone(), organization, project)
215 }
216}
217
218#[derive(Debug)]
219pub(crate) struct ScopeInner {
220 usecase: Usecase,
221 scope: String,
222}
223
224impl ScopeInner {
225 #[inline]
226 pub(crate) fn usecase(&self) -> &Usecase {
227 &self.usecase
228 }
229}
230
231impl std::fmt::Display for ScopeInner {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 f.write_str(&self.scope)
234 }
235}
236
237#[derive(Debug)]
243pub struct Scope(crate::Result<ScopeInner>);
244
245impl Scope {
246 pub fn new(usecase: Usecase) -> Self {
250 Self(Ok(ScopeInner {
251 usecase,
252 scope: String::new(),
253 }))
254 }
255
256 fn for_organization(usecase: Usecase, organization: u64) -> Self {
257 let scope = format!("org.{}", organization);
258 Self(Ok(ScopeInner { usecase, scope }))
259 }
260
261 fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
262 let scope = format!("org.{}/project.{}", organization, project);
263 Self(Ok(ScopeInner { usecase, scope }))
264 }
265
266 pub fn push<V>(self, key: &str, value: V) -> Self
268 where
269 V: std::fmt::Display,
270 {
271 let result = self.0.and_then(|mut inner| {
272 Self::validate_key(key)?;
273
274 let value = value.to_string();
275 Self::validate_value(&value)?;
276
277 if !inner.scope.is_empty() {
278 inner.scope.push('/');
279 }
280 inner.scope.push_str(key);
281 inner.scope.push('.');
282 inner.scope.push_str(&value);
283
284 Ok(inner)
285 });
286
287 Self(result)
288 }
289
290 const ALLOWED_CHARS: &[u8] =
294 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
295
296 fn validate_key(key: &str) -> crate::Result<()> {
298 if key.is_empty() {
299 return Err(crate::Error::InvalidScope {
300 message: "Scope key cannot be empty".to_string(),
301 });
302 }
303 if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
304 Ok(())
305 } else {
306 Err(crate::Error::InvalidScope {
307 message: format!("Invalid scope key '{key}'."),
308 })
309 }
310 }
311
312 fn validate_value(value: &str) -> crate::Result<()> {
314 if value.is_empty() {
315 return Err(crate::Error::InvalidScope {
316 message: "Scope value cannot be empty".to_string(),
317 });
318 }
319 if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
320 Ok(())
321 } else {
322 Err(crate::Error::InvalidScope {
323 message: format!("Invalid scope value '{value}'."),
324 })
325 }
326 }
327
328 pub fn session(self, client: &Client) -> crate::Result<Session> {
334 client.session(self)
335 }
336}
337
338#[derive(Debug)]
339pub(crate) struct ClientInner {
340 reqwest: reqwest::Client,
341 service_url: Url,
342 propagate_traces: bool,
343}
344
345#[derive(Debug, Clone)]
371pub struct Client {
372 inner: Arc<ClientInner>,
373}
374
375impl Client {
376 pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
385 ClientBuilder::new(service_url).build()
386 }
387
388 pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
390 ClientBuilder::new(service_url)
391 }
392
393 pub fn session(&self, scope: Scope) -> crate::Result<Session> {
399 scope.0.map(|inner| Session {
400 scope: inner.into(),
401 client: self.inner.clone(),
402 })
403 }
404}
405
406#[derive(Debug, Clone)]
410pub struct Session {
411 pub(crate) scope: Arc<ScopeInner>,
412 pub(crate) client: Arc<ClientInner>,
413}
414
415pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
417
418impl Session {
419 pub fn object_url(&self, object_key: &str) -> Url {
425 let mut url = self.client.service_url.clone();
426
427 let mut segments = url.path_segments_mut().unwrap();
430 segments
431 .push("v1")
432 .extend(self.scope.usecase.name.split("/"));
433 if !self.scope.scope.is_empty() {
434 segments.extend(self.scope.scope.split("/"));
435 }
436 segments.push("objects").extend(object_key.split("/"));
437 drop(segments);
438
439 url
440 }
441
442 pub(crate) fn request(
443 &self,
444 method: reqwest::Method,
445 object_key: &str,
446 ) -> reqwest::RequestBuilder {
447 let url = self.object_url(object_key);
448
449 let mut builder = self.client.reqwest.request(method, url);
450
451 if self.client.propagate_traces {
452 let trace_headers =
453 sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
454 for (header_name, value) in trace_headers.into_iter().flatten() {
455 builder = builder.header(header_name, value);
456 }
457 }
458
459 builder
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolves the object key `foo/bar` for `scope` on a client at `service_url`.
    fn resolve(service_url: &str, scope: Scope) -> String {
        let client = Client::new(service_url).unwrap();
        let session = client.session(scope).unwrap();
        session.object_url("foo/bar").to_string()
    }

    /// A root service URL is followed directly by the versioned object path.
    #[test]
    fn test_object_url() {
        let scope = Usecase::new("testing")
            .for_project(12345, 1337)
            .push("app_slug", "email_app");

        assert_eq!(
            resolve("http://127.0.0.1:8888/", scope),
            "http://127.0.0.1:8888/v1/testing/org.12345/project.1337/app_slug.email_app/objects/foo/bar"
        )
    }

    /// An existing path on the service URL is kept as a prefix.
    #[test]
    fn test_object_url_with_base_path() {
        let scope = Usecase::new("testing").for_project(12345, 1337);

        assert_eq!(
            resolve("http://127.0.0.1:8888/api/prefix", scope),
            "http://127.0.0.1:8888/api/prefix/v1/testing/org.12345/project.1337/objects/foo/bar"
        )
    }
}