objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::{Compression, ExpirationPolicy, scope};
8use url::Url;
9
10use crate::auth::TokenGenerator;
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16    service_url: Url,
17    propagate_traces: bool,
18    reqwest_builder: reqwest::ClientBuilder,
19    token_generator: Option<TokenGenerator>,
20}
21
22impl ClientBuilderInner {
23    /// Applies defaults that cannot be overridden by the caller.
24    fn apply_defaults(mut self) -> Self {
25        self.reqwest_builder = self
26            .reqwest_builder
27            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
28            // we are dealing with de/compression ourselves:
29            .no_brotli()
30            .no_deflate()
31            .no_gzip()
32            .no_zstd();
33        self
34    }
35}
36
37/// Builder to create a [`Client`].
38#[must_use = "call .build() on this ClientBuilder to create a Client"]
39#[derive(Debug)]
40pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
41
42impl ClientBuilder {
43    /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
44    ///
45    /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
46    /// and Scope in order to create a [`Session`].
47    pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
48        let service_url = match service_url.into_url() {
49            Ok(url) => url,
50            Err(err) => return Self(Err(err.into())),
51        };
52        if service_url.cannot_be_a_base() {
53            return ClientBuilder(Err(crate::Error::InvalidUrl {
54                message: "service_url cannot be a base".to_owned(),
55            }));
56        }
57
58        let reqwest_builder = reqwest::Client::builder()
59            // We define just a connection timeout by default but do not limit reads. A connect
60            // timeout of 100ms is still very conservative, but should provide a sensible upper
61            // bound to expected request latencies.
62            .connect_timeout(Duration::from_millis(100))
63            .user_agent(USER_AGENT);
64
65        Self(Ok(ClientBuilderInner {
66            service_url,
67            propagate_traces: false,
68            reqwest_builder,
69            token_generator: None,
70        }))
71    }
72
73    /// Changes whether the `sentry-trace` header will be sent to Objectstore
74    /// to take advantage of Sentry's distributed tracing.
75    ///
76    /// By default, tracing headers will not be propagated.
77    pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
78        if let Ok(ref mut inner) = self.0 {
79            inner.propagate_traces = propagate_traces;
80        }
81        self
82    }
83
84    /// Defines a read timeout for the [`reqwest::Client`].
85    ///
86    /// The read timeout is defined to be "between consecutive read operations", for example between
87    /// chunks of a streaming response. For more fine-grained configuration of this and other
88    /// timeouts, use [`Self::configure_reqwest`].
89    ///
90    /// By default, no read timeout and a connect timeout of 100ms is set.
91    pub fn timeout(self, timeout: Duration) -> Self {
92        let Ok(mut inner) = self.0 else { return self };
93        inner.reqwest_builder = inner.reqwest_builder.read_timeout(timeout);
94        Self(Ok(inner))
95    }
96
97    /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
98    ///
99    /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
100    pub fn configure_reqwest<F>(self, closure: F) -> Self
101    where
102        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
103    {
104        let Ok(mut inner) = self.0 else { return self };
105        inner.reqwest_builder = closure(inner.reqwest_builder);
106        Self(Ok(inner))
107    }
108
109    /// Sets a [`TokenGenerator`] that will be used to sign authorization tokens before
110    /// sending requests to Objectstore.
111    pub fn token_generator(self, token_generator: TokenGenerator) -> Self {
112        let Ok(mut inner) = self.0 else { return self };
113        inner.token_generator = Some(token_generator);
114        Self(Ok(inner))
115    }
116
117    /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
118    ///
119    /// # Errors
120    ///
121    /// This method fails if:
122    /// - the given `service_url` is invalid or cannot be used as a base URL
123    /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
124    ///   more information on when this can happen.
125    pub fn build(self) -> crate::Result<Client> {
126        let inner = self.0?.apply_defaults();
127
128        Ok(Client {
129            inner: Arc::new(ClientInner {
130                reqwest: inner.reqwest_builder.build()?,
131                service_url: inner.service_url,
132                propagate_traces: inner.propagate_traces,
133                token_generator: inner.token_generator,
134            }),
135        })
136    }
137}
138
139/// An identifier for a workload in Objectstore, along with defaults to use for all
140/// operations within that Usecase.
141///
142/// Usecases need to be statically defined in Objectstore's configuration server-side.
143/// Objectstore can make decisions based on the Usecase. For example, choosing the most
144/// suitable storage backend.
145#[derive(Debug, Clone)]
146pub struct Usecase {
147    name: Arc<str>,
148    compression: Compression,
149    expiration_policy: ExpirationPolicy,
150}
151
152impl Usecase {
153    /// Creates a new Usecase.
154    pub fn new(name: &str) -> Self {
155        Self {
156            name: name.into(),
157            compression: Compression::Zstd,
158            expiration_policy: Default::default(),
159        }
160    }
161
162    /// Returns the name of this usecase.
163    #[inline]
164    pub fn name(&self) -> &str {
165        &self.name
166    }
167
168    /// Returns the compression algorithm to use for operations within this usecase.
169    #[inline]
170    pub fn compression(&self) -> Compression {
171        self.compression
172    }
173
174    /// Sets the compression algorithm to use for operations within this usecase.
175    ///
176    /// It's still possible to override this default on each operation's builder.
177    ///
178    /// By default, [`Compression::Zstd`] is used.
179    pub fn with_compression(self, compression: Compression) -> Self {
180        Self {
181            compression,
182            ..self
183        }
184    }
185
186    /// Returns the expiration policy to use by default for operations within this usecase.
187    #[inline]
188    pub fn expiration_policy(&self) -> ExpirationPolicy {
189        self.expiration_policy
190    }
191
192    /// Sets the expiration policy to use for operations within this usecase.
193    ///
194    /// It's still possible to override this default on each operation's builder.
195    ///
196    /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
197    /// expire.
198    pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
199        Self {
200            expiration_policy,
201            ..self
202        }
203    }
204
205    /// Creates a new custom [`Scope`].
206    ///
207    /// Add parts to it using [`Scope::push`].
208    ///
209    /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
210    /// so prefer using those methods rather than creating your own custom [`Scope`].
211    pub fn scope(&self) -> Scope {
212        Scope::new(self.clone())
213    }
214
215    /// Creates a new [`Scope`] tied to the given organization.
216    pub fn for_organization(&self, organization: u64) -> Scope {
217        Scope::for_organization(self.clone(), organization)
218    }
219
220    /// Creates a new [`Scope`] tied to the given organization and project.
221    pub fn for_project(&self, organization: u64, project: u64) -> Scope {
222        Scope::for_project(self.clone(), organization, project)
223    }
224}
225
226#[derive(Debug)]
227pub(crate) struct ScopeInner {
228    usecase: Usecase,
229    scopes: scope::Scopes,
230}
231
232impl ScopeInner {
233    #[inline]
234    pub(crate) fn usecase(&self) -> &Usecase {
235        &self.usecase
236    }
237
238    #[inline]
239    pub(crate) fn scopes(&self) -> &scope::Scopes {
240        &self.scopes
241    }
242}
243
244/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
245/// [`Usecase`].
246///
247/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
248/// [`Usecase::scope`] for custom scopes.
249#[derive(Debug)]
250pub struct Scope(crate::Result<ScopeInner>);
251
252impl Scope {
253    /// Creates a new root-level Scope for the given usecase.
254    ///
255    /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
256    pub fn new(usecase: Usecase) -> Self {
257        Self(Ok(ScopeInner {
258            usecase,
259            scopes: scope::Scopes::empty(),
260        }))
261    }
262
263    fn for_organization(usecase: Usecase, organization: u64) -> Self {
264        Self::new(usecase).push("org", organization)
265    }
266
267    fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
268        Self::for_organization(usecase, organization).push("project", project)
269    }
270
271    /// Extends this Scope by creating a new sub-scope nested within it.
272    pub fn push<V>(self, key: &str, value: V) -> Self
273    where
274        V: std::fmt::Display,
275    {
276        let result = self.0.and_then(|mut inner| {
277            inner.scopes.push(key, value)?;
278            Ok(inner)
279        });
280
281        Self(result)
282    }
283
284    /// Creates a session for this scope using the given client.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
289    pub fn session(self, client: &Client) -> crate::Result<Session> {
290        client.session(self)
291    }
292}
293
294#[derive(Debug)]
295pub(crate) struct ClientInner {
296    reqwest: reqwest::Client,
297    service_url: Url,
298    propagate_traces: bool,
299    token_generator: Option<TokenGenerator>,
300}
301
302/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
303///
304/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
305/// and Scope in order to create a [`Session`].
306///
307/// If your Objectstore instance enforces authorization checks, you must provide a
308/// [`TokenGenerator`] on creation.
309///
310/// # Example
311///
312/// ```no_run
313/// use std::time::Duration;
314/// use objectstore_client::{Client, SecretKey, TokenGenerator, Usecase};
315/// use objectstore_types::Permission;
316///
317/// # async fn example() -> objectstore_client::Result<()> {
318/// let token_generator = TokenGenerator::new(SecretKey {
319///         secret_key: "<safely inject secret key>".into(),
320///         kid: "my-service".into(),
321///     })?
322///     .expiry_seconds(30)
323///     .permissions(&[Permission::ObjectRead]);
324///
325/// let client = Client::builder("http://localhost:8888/")
326///     .timeout(Duration::from_secs(1))
327///     .propagate_traces(true)
328///     .token_generator(token_generator)
329///     .build()?;
330///
331/// let session = Usecase::new("my_app")
332///     .for_project(12345, 1337)
333///     .session(&client)?;
334///
335/// let response = session.put("hello world").send().await?;
336///
337/// # Ok(())
338/// # }
339/// ```
340#[derive(Debug, Clone)]
341pub struct Client {
342    inner: Arc<ClientInner>,
343}
344
345impl Client {
346    /// Creates a new [`Client`], configured with the given `service_url` and default
347    /// configuration.
348    ///
349    /// Use [`Client::builder`] for more fine-grained configuration.
350    ///
351    /// # Errors
352    ///
353    /// This method fails if [`ClientBuilder::build`] fails.
354    pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
355        ClientBuilder::new(service_url).build()
356    }
357
358    /// Convenience function to create a [`ClientBuilder`].
359    pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
360        ClientBuilder::new(service_url)
361    }
362
363    /// Creates a session for the given scope using this client.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
368    pub fn session(&self, scope: Scope) -> crate::Result<Session> {
369        scope.0.map(|inner| Session {
370            scope: inner.into(),
371            client: self.inner.clone(),
372        })
373    }
374}
375
376/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
377///
378/// Create a Session using [`Client::session`] or [`Scope::session`].
379#[derive(Debug, Clone)]
380pub struct Session {
381    pub(crate) scope: Arc<ScopeInner>,
382    pub(crate) client: Arc<ClientInner>,
383}
384
385/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
386pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
387
388impl Session {
389    /// Generates a GET url to the object with the given `key`.
390    ///
391    /// This can then be used by downstream services to fetch the given object.
392    /// NOTE however that the service does not strictly follow HTTP semantics,
393    /// in particular in relation to `Accept-Encoding`.
394    pub fn object_url(&self, object_key: &str) -> Url {
395        let mut url = self.client.service_url.clone();
396
397        // `path_segments_mut` can only error if the url is cannot-be-a-base,
398        // and we check that in `ClientBuilder::new`, therefore this will never panic.
399        let mut segments = url.path_segments_mut().unwrap();
400        segments
401            .push("v1")
402            .push("objects")
403            .push(&self.scope.usecase.name)
404            .push(&self.scope.scopes.as_api_path().to_string())
405            .extend(object_key.split("/"));
406        drop(segments);
407
408        url
409    }
410
411    pub(crate) fn request(
412        &self,
413        method: reqwest::Method,
414        object_key: &str,
415    ) -> crate::Result<reqwest::RequestBuilder> {
416        let url = self.object_url(object_key);
417
418        let mut builder = self.client.reqwest.request(method, url);
419
420        if let Some(token_generator) = &self.client.token_generator {
421            let token = token_generator.sign_for_scope(&self.scope)?;
422            builder = builder.bearer_auth(token);
423        }
424
425        if self.client.propagate_traces {
426            let trace_headers =
427                sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
428            for (header_name, value) in trace_headers.into_iter().flatten() {
429                builder = builder.header(header_name, value);
430            }
431        }
432
433        Ok(builder)
434    }
435}
436
437#[cfg(test)]
438mod tests {
439    use super::*;
440
441    #[test]
442    fn test_object_url() {
443        let client = Client::new("http://127.0.0.1:8888/").unwrap();
444        let usecase = Usecase::new("testing");
445        let scope = usecase
446            .for_project(12345, 1337)
447            .push("app_slug", "email_app");
448        let session = client.session(scope).unwrap();
449
450        assert_eq!(
451            session.object_url("foo/bar").to_string(),
452            "http://127.0.0.1:8888/v1/objects/testing/org=12345;project=1337;app_slug=email_app/foo/bar"
453        )
454    }
455
456    #[test]
457    fn test_object_url_with_base_path() {
458        let client = Client::new("http://127.0.0.1:8888/api/prefix").unwrap();
459        let usecase = Usecase::new("testing");
460        let scope = usecase.for_project(12345, 1337);
461        let session = client.session(scope).unwrap();
462
463        assert_eq!(
464            session.object_url("foo/bar").to_string(),
465            "http://127.0.0.1:8888/api/prefix/v1/objects/testing/org=12345;project=1337/foo/bar"
466        )
467    }
468}