objectstore_client/
client.rs1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8
9pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
10
11const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
12
13#[derive(Debug)]
20pub struct ClientBuilder {
21 service_url: Arc<str>,
22 client: reqwest::Client,
23 propagate_traces: bool,
24
25 usecase: Arc<str>,
26 default_compression: Compression,
27 default_expiration_policy: ExpirationPolicy,
28}
29
30impl ClientBuilder {
31 pub fn new(service_url: &str, usecase: &str) -> crate::Result<Self> {
39 let client = reqwest::Client::builder()
40 .user_agent(USER_AGENT)
41 .no_brotli()
44 .no_deflate()
45 .no_gzip()
46 .no_zstd()
47 .connect_timeout(Duration::from_millis(500))
52 .read_timeout(Duration::from_millis(500))
53 .build()?;
54
55 Ok(Self {
56 service_url: service_url.trim_end_matches('/').into(),
57 client,
58 propagate_traces: false,
59
60 usecase: usecase.into(),
61 default_compression: Compression::Zstd,
62 default_expiration_policy: ExpirationPolicy::Manual,
63 })
64 }
65
66 pub fn default_compression(mut self, compression: Compression) -> Self {
68 self.default_compression = compression;
69 self
70 }
71
72 pub fn default_expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
74 self.default_expiration_policy = expiration_policy;
75 self
76 }
77
78 pub fn with_distributed_tracing(mut self, propagate_traces: bool) -> Self {
81 self.propagate_traces = propagate_traces;
82 self
83 }
84
85 fn make_client(&self, scope: String) -> Client {
86 Client {
87 service_url: self.service_url.clone(),
88 http: self.client.clone(),
89 propagate_traces: self.propagate_traces,
90
91 usecase: self.usecase.clone(),
92 scope,
93 default_compression: self.default_compression,
94 default_expiration_policy: self.default_expiration_policy,
95 }
96 }
97
98 pub fn for_organization(&self, organization_id: u64) -> Client {
100 let scope = format!("org.{organization_id}");
101 self.make_client(scope)
102 }
103
104 pub fn for_project(&self, organization_id: u64, project_id: u64) -> Client {
107 let scope = format!("org.{organization_id}/proj.{project_id}");
108 self.make_client(scope)
109 }
110}
111
112#[derive(Debug)]
114pub struct Client {
115 pub(crate) http: reqwest::Client,
116 pub(crate) service_url: Arc<str>,
117 propagate_traces: bool,
118
119 pub(crate) usecase: Arc<str>,
120
121 pub(crate) scope: String,
132 pub(crate) default_compression: Compression,
133 pub(crate) default_expiration_policy: ExpirationPolicy,
134}
135
136pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
138
139impl Client {
140 pub(crate) fn request<U: reqwest::IntoUrl>(
141 &self,
142 method: reqwest::Method,
143 uri: U,
144 ) -> crate::Result<reqwest::RequestBuilder> {
145 let mut builder = self.http.request(method, uri).query(&[
146 (PARAM_SCOPE, self.scope.as_ref()),
147 (PARAM_USECASE, self.usecase.as_ref()),
148 ]);
149
150 if self.propagate_traces {
151 let trace_headers =
152 sentry::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
153 for (header_name, value) in trace_headers.into_iter().flatten() {
154 builder = builder.header(header_name, value);
155 }
156 }
157
158 Ok(builder)
159 }
160
161 pub async fn delete(&self, id: &str) -> crate::Result<()> {
163 let delete_url = format!("{}/v1/{id}", self.service_url);
164
165 let _response = self
166 .request(reqwest::Method::DELETE, delete_url)?
167 .send()
168 .await?;
169
170 Ok(())
171 }
172}