objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::metadata::{Compression, ExpirationPolicy};
8use objectstore_types::scope;
9use reqwest::RequestBuilder;
10use url::Url;
11
12use crate::auth::TokenProvider;
13
14const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
15
16#[derive(Debug)]
17struct ClientBuilderInner {
18    service_url: Url,
19    propagate_traces: bool,
20    reqwest_builder: reqwest::ClientBuilder,
21    token: Option<TokenProvider>,
22}
23
24impl ClientBuilderInner {
25    /// Applies defaults that cannot be overridden by the caller.
26    fn apply_defaults(mut self) -> Self {
27        self.reqwest_builder = self
28            .reqwest_builder
29            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
30            // we are dealing with de/compression ourselves:
31            .no_brotli()
32            .no_deflate()
33            .no_gzip()
34            .no_zstd();
35        self
36    }
37}
38
39/// Builder to create a [`Client`].
40#[must_use = "call .build() on this ClientBuilder to create a Client"]
41#[derive(Debug)]
42pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
43
44impl ClientBuilder {
45    /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
46    ///
47    /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
48    /// and Scope in order to create a [`Session`].
49    pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
50        let service_url = match service_url.into_url() {
51            Ok(url) => url,
52            Err(err) => return Self(Err(err.into())),
53        };
54        if service_url.cannot_be_a_base() {
55            return ClientBuilder(Err(crate::Error::InvalidUrl {
56                message: "service_url cannot be a base".to_owned(),
57            }));
58        }
59
60        let reqwest_builder = reqwest::Client::builder()
61            // We define just a connection timeout by default but do not limit reads. A connect
62            // timeout of 100ms is still very conservative, but should provide a sensible upper
63            // bound to expected request latencies.
64            .connect_timeout(Duration::from_millis(100))
65            .user_agent(USER_AGENT);
66
67        Self(Ok(ClientBuilderInner {
68            service_url,
69            propagate_traces: false,
70            reqwest_builder,
71            token: None,
72        }))
73    }
74
75    /// Changes whether the `sentry-trace` header will be sent to Objectstore
76    /// to take advantage of Sentry's distributed tracing.
77    ///
78    /// By default, tracing headers will not be propagated.
79    pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
80        if let Ok(ref mut inner) = self.0 {
81            inner.propagate_traces = propagate_traces;
82        }
83        self
84    }
85
86    /// Defines a read timeout for the [`reqwest::Client`].
87    ///
88    /// The read timeout is defined to be "between consecutive read operations", for example between
89    /// chunks of a streaming response. For more fine-grained configuration of this and other
90    /// timeouts, use [`Self::configure_reqwest`].
91    ///
92    /// By default, no read timeout and a connect timeout of 100ms is set.
93    pub fn timeout(self, timeout: Duration) -> Self {
94        let Ok(mut inner) = self.0 else { return self };
95        inner.reqwest_builder = inner.reqwest_builder.read_timeout(timeout);
96        Self(Ok(inner))
97    }
98
99    /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
100    ///
101    /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
102    pub fn configure_reqwest<F>(self, closure: F) -> Self
103    where
104        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
105    {
106        let Ok(mut inner) = self.0 else { return self };
107        inner.reqwest_builder = closure(inner.reqwest_builder);
108        Self(Ok(inner))
109    }
110
111    /// Sets the authentication token to use for requests to Objectstore.
112    ///
113    /// Accepts anything that implements `Into<TokenProvider>`:
114    /// - A [`TokenGenerator`](crate::TokenGenerator) — for internal services that have access to
115    ///   an EdDSA keypair. The generator signs a fresh JWT for each request.
116    /// - A `String` or `&str` — a pre-signed JWT, used as-is for every request.
117    pub fn token(self, token: impl Into<TokenProvider>) -> Self {
118        let Ok(mut inner) = self.0 else { return self };
119        inner.token = Some(token.into());
120        Self(Ok(inner))
121    }
122
123    /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
124    ///
125    /// # Errors
126    ///
127    /// This method fails if:
128    /// - the given `service_url` is invalid or cannot be used as a base URL
129    /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
130    ///   more information on when this can happen.
131    pub fn build(self) -> crate::Result<Client> {
132        let inner = self.0?.apply_defaults();
133
134        Ok(Client {
135            inner: Arc::new(ClientInner {
136                reqwest: inner.reqwest_builder.build()?,
137                service_url: inner.service_url,
138                propagate_traces: inner.propagate_traces,
139                token: inner.token,
140            }),
141        })
142    }
143}
144
145/// An identifier for a workload in Objectstore, along with defaults to use for all
146/// operations within that Usecase.
147///
148/// Usecases need to be statically defined in Objectstore's configuration server-side.
149/// Objectstore can make decisions based on the Usecase. For example, choosing the most
150/// suitable storage backend.
151#[derive(Debug, Clone)]
152pub struct Usecase {
153    name: Arc<str>,
154    compression: Compression,
155    expiration_policy: ExpirationPolicy,
156}
157
158impl Usecase {
159    /// Creates a new Usecase.
160    pub fn new(name: &str) -> Self {
161        Self {
162            name: name.into(),
163            compression: Compression::Zstd,
164            expiration_policy: Default::default(),
165        }
166    }
167
168    /// Returns the name of this usecase.
169    #[inline]
170    pub fn name(&self) -> &str {
171        &self.name
172    }
173
174    /// Returns the compression algorithm to use for operations within this usecase.
175    #[inline]
176    pub fn compression(&self) -> Compression {
177        self.compression
178    }
179
180    /// Sets the compression algorithm to use for operations within this usecase.
181    ///
182    /// It's still possible to override this default on each operation's builder.
183    ///
184    /// By default, [`Compression::Zstd`] is used.
185    pub fn with_compression(self, compression: Compression) -> Self {
186        Self {
187            compression,
188            ..self
189        }
190    }
191
192    /// Returns the expiration policy to use by default for operations within this usecase.
193    #[inline]
194    pub fn expiration_policy(&self) -> ExpirationPolicy {
195        self.expiration_policy
196    }
197
198    /// Sets the expiration policy to use for operations within this usecase.
199    ///
200    /// It's still possible to override this default on each operation's builder.
201    ///
202    /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
203    /// expire.
204    pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
205        Self {
206            expiration_policy,
207            ..self
208        }
209    }
210
211    /// Creates a new custom [`Scope`].
212    ///
213    /// Add parts to it using [`Scope::push`].
214    ///
215    /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
216    /// so prefer using those methods rather than creating your own custom [`Scope`].
217    pub fn scope(&self) -> Scope {
218        Scope::new(self.clone())
219    }
220
221    /// Creates a new [`Scope`] tied to the given organization.
222    pub fn for_organization(&self, organization: u64) -> Scope {
223        Scope::for_organization(self.clone(), organization)
224    }
225
226    /// Creates a new [`Scope`] tied to the given organization and project.
227    pub fn for_project(&self, organization: u64, project: u64) -> Scope {
228        Scope::for_project(self.clone(), organization, project)
229    }
230}
231
232#[derive(Debug)]
233pub(crate) struct ScopeInner {
234    usecase: Usecase,
235    scopes: scope::Scopes,
236}
237
238impl ScopeInner {
239    #[inline]
240    pub(crate) fn usecase(&self) -> &Usecase {
241        &self.usecase
242    }
243
244    #[inline]
245    pub(crate) fn scopes(&self) -> &scope::Scopes {
246        &self.scopes
247    }
248}
249
250/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
251/// [`Usecase`].
252///
253/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
254/// [`Usecase::scope`] for custom scopes.
255#[derive(Debug)]
256pub struct Scope(pub(crate) crate::Result<ScopeInner>);
257
258impl Scope {
259    /// Creates a new root-level Scope for the given usecase.
260    ///
261    /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
262    pub fn new(usecase: Usecase) -> Self {
263        Self(Ok(ScopeInner {
264            usecase,
265            scopes: scope::Scopes::empty(),
266        }))
267    }
268
269    fn for_organization(usecase: Usecase, organization: u64) -> Self {
270        Self::new(usecase).push("org", organization)
271    }
272
273    fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
274        Self::for_organization(usecase, organization).push("project", project)
275    }
276
277    /// Extends this Scope by creating a new sub-scope nested within it.
278    pub fn push<V>(self, key: &str, value: V) -> Self
279    where
280        V: std::fmt::Display,
281    {
282        let result = self.0.and_then(|mut inner| {
283            inner.scopes.push(key, value)?;
284            Ok(inner)
285        });
286
287        Self(result)
288    }
289
290    /// Creates a session for this scope using the given client.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
295    pub fn session(self, client: &Client) -> crate::Result<Session> {
296        client.session(self)
297    }
298}
299
300#[derive(Debug)]
301pub(crate) struct ClientInner {
302    reqwest: reqwest::Client,
303    service_url: Url,
304    propagate_traces: bool,
305    token: Option<TokenProvider>,
306}
307
308/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
309///
310/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
311/// and Scope in order to create a [`Session`].
312///
313/// If your Objectstore instance enforces authorization checks, you must provide
314/// authentication via [`ClientBuilder::token`]. It accepts anything that converts
315/// into a [`TokenProvider`]:
316///
317/// - **[`TokenGenerator`](crate::TokenGenerator)** — for internal services that have access to
318///   an EdDSA keypair. The generator signs a fresh JWT for each request, scoped to the
319///   specific usecase and scope being accessed.
320/// - **`String` / `&str`** — a pre-signed JWT, used as-is for every request.
321///   Use this for external services that receive a token from another source.
322///
323/// # Examples
324///
325/// Internal service with a keypair:
326///
327/// ```no_run
328/// use std::time::Duration;
329/// use objectstore_client::{Client, SecretKey, TokenGenerator, Usecase};
330/// use objectstore_types::auth::Permission;
331///
332/// # async fn example() -> objectstore_client::Result<()> {
333/// let token_generator = TokenGenerator::new(SecretKey {
334///         secret_key: "<safely inject secret key>".into(),
335///         kid: "my-service".into(),
336///     })?
337///     .expiry_seconds(30)
338///     .permissions(&[Permission::ObjectRead]);
339///
340/// let client = Client::builder("http://localhost:8888/")
341///     .timeout(Duration::from_secs(1))
342///     .propagate_traces(true)
343///     .token(token_generator)
344///     .build()?;
345/// # Ok(())
346/// # }
347/// ```
348///
349/// External service with a pre-signed JWT (obtained via
350/// [`TokenGenerator::sign`](crate::TokenGenerator::sign)):
351///
352/// ```no_run
353/// use objectstore_client::{Client, SecretKey, TokenGenerator, Usecase};
354///
355/// # fn example() -> objectstore_client::Result<()> {
356/// let scope = Usecase::new("my_app").for_project(42, 1337);
357/// let token = TokenGenerator::new(SecretKey {
358///     secret_key: "<private key>".into(),
359///     kid: "my-service".into(),
360/// })?.sign(&scope)?;
361///
362/// let client = Client::builder("http://localhost:8888/")
363///     .token(token)
364///     .build()?;
365/// # Ok(())
366/// # }
367/// ```
368#[derive(Debug, Clone)]
369pub struct Client {
370    inner: Arc<ClientInner>,
371}
372
373impl Client {
374    /// Creates a new [`Client`], configured with the given `service_url` and default
375    /// configuration.
376    ///
377    /// Use [`Client::builder`] for more fine-grained configuration.
378    ///
379    /// # Errors
380    ///
381    /// This method fails if [`ClientBuilder::build`] fails.
382    pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
383        ClientBuilder::new(service_url).build()
384    }
385
386    /// Convenience function to create a [`ClientBuilder`].
387    pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
388        ClientBuilder::new(service_url)
389    }
390
391    /// Creates a session for the given scope using this client.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
396    pub fn session(&self, scope: Scope) -> crate::Result<Session> {
397        scope.0.map(|inner| Session {
398            scope: inner.into(),
399            client: self.inner.clone(),
400        })
401    }
402}
403
404/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
405///
406/// Create a Session using [`Client::session`] or [`Scope::session`].
407#[derive(Debug, Clone)]
408pub struct Session {
409    pub(crate) scope: Arc<ScopeInner>,
410    pub(crate) client: Arc<ClientInner>,
411}
412
413/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
414pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
415
416impl Session {
417    /// Generates a GET url to the object with the given `key`.
418    ///
419    /// This can then be used by downstream services to fetch the given object.
420    /// NOTE however that the service does not strictly follow HTTP semantics,
421    /// in particular in relation to `Accept-Encoding`.
422    pub fn object_url(&self, object_key: &str) -> Url {
423        let mut url = self.client.service_url.clone();
424
425        // `path_segments_mut` can only error if the url is cannot-be-a-base,
426        // and we check that in `ClientBuilder::new`, therefore this will never panic.
427        let mut segments = url.path_segments_mut().unwrap();
428        segments
429            .push("v1")
430            .push("objects")
431            .push(&self.scope.usecase.name)
432            .push(&self.scope.scopes.as_api_path().to_string())
433            .extend(object_key.split("/"));
434        drop(segments);
435
436        url
437    }
438
439    fn batch_url(&self) -> Url {
440        let mut url = self.client.service_url.clone();
441
442        // `path_segments_mut` can only error if the url is cannot-be-a-base,
443        // and we check that in `ClientBuilder::new`, therefore this will never panic.
444        let mut segments = url.path_segments_mut().unwrap();
445        segments
446            .push("v1")
447            .push("objects:batch")
448            .push(self.scope.usecase().name())
449            .push(&self.scope.scopes.as_api_path().to_string())
450            .push(""); // trailing slash
451        drop(segments);
452
453        url
454    }
455
456    fn prepare_builder(&self, mut builder: RequestBuilder) -> crate::Result<RequestBuilder> {
457        match &self.client.token {
458            Some(TokenProvider::Generator(generator)) => {
459                let token = generator.sign_for_scope(&self.scope)?;
460                builder = builder.bearer_auth(token);
461            }
462            Some(TokenProvider::Static(token)) => {
463                builder = builder.bearer_auth(token);
464            }
465            None => {}
466        }
467        if self.client.propagate_traces {
468            let trace_headers =
469                sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
470            for (header_name, value) in trace_headers.into_iter().flatten() {
471                builder = builder.header(header_name, value);
472            }
473        }
474        Ok(builder)
475    }
476
477    pub(crate) fn request(
478        &self,
479        method: reqwest::Method,
480        object_key: &str,
481    ) -> crate::Result<RequestBuilder> {
482        let url = self.object_url(object_key);
483        let builder = self.client.reqwest.request(method, url);
484        self.prepare_builder(builder)
485    }
486
487    pub(crate) fn batch_request(&self) -> crate::Result<RequestBuilder> {
488        let url = self.batch_url();
489        let builder = self.client.reqwest.post(url);
490        self.prepare_builder(builder)
491    }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Object URLs are built from the usecase, the scope parts and the split object key.
    #[test]
    fn test_object_url() {
        let session = Usecase::new("testing")
            .for_project(12345, 1337)
            .push("app_slug", "email_app")
            .session(&Client::new("http://127.0.0.1:8888/").unwrap())
            .unwrap();

        let url = session.object_url("foo/bar");

        assert_eq!(
            url.as_str(),
            "http://127.0.0.1:8888/v1/objects/testing/org=12345;project=1337;app_slug=email_app/foo/bar"
        )
    }

    /// A path prefix on the service URL is preserved in front of the API path.
    #[test]
    fn test_object_url_with_base_path() {
        let session = Usecase::new("testing")
            .for_project(12345, 1337)
            .session(&Client::new("http://127.0.0.1:8888/api/prefix").unwrap())
            .unwrap();

        let url = session.object_url("foo/bar");

        assert_eq!(
            url.as_str(),
            "http://127.0.0.1:8888/api/prefix/v1/objects/testing/org=12345;project=1337/foo/bar"
        )
    }
}