objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8
9pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
10
11const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
12
13/// Service for storing and retrieving objects.
14///
15/// The Service contains the base configuration to connect to a service.
16/// It has to be further initialized with credentials using the
17/// [`for_organization`](Self::for_organization) and
18/// [`for_project`](Self::for_project) functions.
19#[derive(Debug)]
20pub struct ClientBuilder {
21    service_url: Arc<str>,
22    client: reqwest::Client,
23    propagate_traces: bool,
24
25    usecase: Arc<str>,
26    default_compression: Compression,
27    default_expiration_policy: ExpirationPolicy,
28}
29
30impl ClientBuilder {
31    /// Creates a new [`ClientBuilder`].
32    ///
33    /// This service instance is configured to target the given `service_url`.
34    /// It is also scoped for the given `usecase`.
35    ///
36    /// In order to get or put objects, one has to create a [`Client`] using the
37    /// [`for_organization`](Self::for_organization) function.
38    pub fn new(service_url: &str, usecase: &str) -> crate::Result<Self> {
39        let client = reqwest::Client::builder()
40            .user_agent(USER_AGENT)
41            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
42            // we are dealing with de/compression ourselves:
43            .no_brotli()
44            .no_deflate()
45            .no_gzip()
46            .no_zstd()
47            // The read timeout "applies to each read operation", so should work fine for larger
48            // transfers that are split into multiple chunks.
49            // We define both as 500ms which is still very conservative, given that we are in the same network,
50            // and expect our backends to respond in <100ms.
51            .connect_timeout(Duration::from_millis(500))
52            .read_timeout(Duration::from_millis(500))
53            .build()?;
54
55        Ok(Self {
56            service_url: service_url.trim_end_matches('/').into(),
57            client,
58            propagate_traces: false,
59
60            usecase: usecase.into(),
61            default_compression: Compression::Zstd,
62            default_expiration_policy: ExpirationPolicy::Manual,
63        })
64    }
65
66    /// This changes the default compression used for uploads.
67    pub fn default_compression(mut self, compression: Compression) -> Self {
68        self.default_compression = compression;
69        self
70    }
71
72    /// This sets a default expiration policy used for uploads.
73    pub fn default_expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
74        self.default_expiration_policy = expiration_policy;
75        self
76    }
77
78    /// This changes whether the `sentry-trace` header will be sent to Objectstore
79    /// to take advantage of Sentry's distributed tracing.
80    pub fn with_distributed_tracing(mut self, propagate_traces: bool) -> Self {
81        self.propagate_traces = propagate_traces;
82        self
83    }
84
85    fn make_client(&self, scope: String) -> Client {
86        Client {
87            service_url: self.service_url.clone(),
88            http: self.client.clone(),
89            propagate_traces: self.propagate_traces,
90
91            usecase: self.usecase.clone(),
92            scope,
93            default_compression: self.default_compression,
94            default_expiration_policy: self.default_expiration_policy,
95        }
96    }
97
98    /// Create a new [`Client`] and sets its `scope` based on the provided organization.
99    pub fn for_organization(&self, organization_id: u64) -> Client {
100        let scope = format!("org.{organization_id}");
101        self.make_client(scope)
102    }
103
104    /// Create a new [`Client`] and sets its `scope` based on the provided organization
105    /// and project.
106    pub fn for_project(&self, organization_id: u64, project_id: u64) -> Client {
107        let scope = format!("org.{organization_id}/proj.{project_id}");
108        self.make_client(scope)
109    }
110}
111
112/// A scoped objectstore client that can access objects in a specific use case and scope.
113#[derive(Debug)]
114pub struct Client {
115    pub(crate) http: reqwest::Client,
116    pub(crate) service_url: Arc<str>,
117    propagate_traces: bool,
118
119    pub(crate) usecase: Arc<str>,
120
121    /// The scope that this client operates within.
122    ///
123    /// Scopes are expected to be serialized ordered lists of key/value pairs. Each
124    /// pair is serialized with a `.` character between the key and value, and with
125    /// a `/` character between each pair. For example:
126    /// - `org.123/proj.456`
127    /// - `state.washington/city.seattle`
128    ///
129    /// It is recommended that both keys and values be restricted to alphanumeric
130    /// characters.
131    pub(crate) scope: String,
132    pub(crate) default_compression: Compression,
133    pub(crate) default_expiration_policy: ExpirationPolicy,
134}
135
136/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
137pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
138
139impl Client {
140    pub(crate) fn request<U: reqwest::IntoUrl>(
141        &self,
142        method: reqwest::Method,
143        uri: U,
144    ) -> crate::Result<reqwest::RequestBuilder> {
145        let mut builder = self.http.request(method, uri).query(&[
146            (PARAM_SCOPE, self.scope.as_ref()),
147            (PARAM_USECASE, self.usecase.as_ref()),
148        ]);
149
150        if self.propagate_traces {
151            let trace_headers =
152                sentry::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
153            for (header_name, value) in trace_headers.into_iter().flatten() {
154                builder = builder.header(header_name, value);
155            }
156        }
157
158        Ok(builder)
159    }
160
161    /// Deletes the object with the given `id`.
162    pub async fn delete(&self, id: &str) -> crate::Result<()> {
163        let delete_url = format!("{}/v1/{id}", self.service_url);
164
165        let _response = self
166            .request(reqwest::Method::DELETE, delete_url)?
167            .send()
168            .await?;
169
170        Ok(())
171    }
172}