from __future__ import annotations
from collections.abc import Mapping
from dataclasses import asdict, dataclass
from io import BytesIO
from typing import IO, Any, Literal, NamedTuple, cast
from urllib.parse import urlparse
import sentry_sdk
import urllib3
import zstandard
from urllib3.connectionpool import HTTPConnectionPool
from objectstore_client.auth import TokenGenerator
from objectstore_client.metadata import (
HEADER_EXPIRATION,
HEADER_META_PREFIX,
Compression,
ExpirationPolicy,
Metadata,
format_expiration,
)
from objectstore_client.metrics import (
MetricsBackend,
NoOpMetricsBackend,
measure_storage_operation,
)
from objectstore_client.scope import Scope
class GetResponse(NamedTuple):
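    """The object's metadata and a readable payload stream, as returned by `Session.get`."""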
metadata: Metadata
payload: IO[bytes]
class RequestError(Exception):
"""Exception raised if an API call to Objectstore fails."""
def __init__(self, message: str, status: int, response: str):
super().__init__(message)
self.status = status
self.response = response
class Usecase:
"""
An identifier for a workload in Objectstore, along with defaults to use for all
operations within that Usecase.
Usecases need to be statically defined in Objectstore's configuration server-side.
Objectstore can make decisions based on the Usecase. For example, choosing the most
suitable storage backend.
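
    For example, a minimal sketch (the "attachments" name is illustrative and
    must match a Usecase configured server-side):

    .. code-block:: python

        attachments = Usecase("attachments", compression="zstd")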
"""
name: str
_compression: Compression
_expiration_policy: ExpirationPolicy | None
def __init__(
self,
name: str,
compression: Compression = "zstd",
expiration_policy: ExpirationPolicy | None = None,
):
self.name = name
self._compression = compression
self._expiration_policy = expiration_policy
# Connect timeout used unless overridden in connection parameters.
DEFAULT_CONNECT_TIMEOUT = 0.1
@dataclass
class _ConnectionDefaults:
retries: urllib3.Retry = urllib3.Retry(connect=3, read=0)
"""We only retry connection problems, as we cannot rewind our compression stream."""
timeout: urllib3.Timeout = urllib3.Timeout(
connect=DEFAULT_CONNECT_TIMEOUT, read=None
)
"""
The read timeout is defined to be "between consecutive read operations", which
should mean one chunk of the response, with a large response being split into
multiple chunks.
By default, the client limits the connection phase to 100ms, and has no read
timeout.
"""
class Client:
"""
A client for Objectstore. Constructing it initializes a connection pool.
Args:
base_url: The base URL of the Objectstore server (e.g.,
"http://objectstore:8888"). metrics_backend: Optional metrics backend for
tracking storage operations. Defaults to ``NoOpMetricsBackend`` if not
provided.
propagate_traces: Whether to propagate Sentry trace headers in requests to
objectstore. Defaults to ``False``.
retries: Number of connection retries for failed requests.
Defaults to ``3`` if not specified. **Note:** only connection failures are
retried, not read failures (as compression streams cannot be rewound).
timeout_ms: Read timeout in milliseconds for API requests. The read timeout
is the maximum time to wait between consecutive read operations on the
socket (i.e., between receiving chunks of data). Defaults to no read timeout
if not specified. The connection timeout is always 100ms. To override the
connection timeout, pass a custom ``urllib3.Timeout`` object via
``connection_kwargs``. For example:
.. code-block:: python
client = Client(
"http://objectstore:8888", connection_kwargs={
"timeout": urllib3.Timeout(connect=1.0, read=5.0)
}
)
connection_kwargs: Additional keyword arguments to pass to the underlying
urllib3 connection pool (e.g., custom headers, SSL settings, advanced
timeouts).
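
    A minimal construction sketch (values are illustrative):

    .. code-block:: python

        client = Client(
            "http://objectstore:8888",
            retries=3,
            timeout_ms=5_000,
        )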
"""
def __init__(
self,
base_url: str,
metrics_backend: MetricsBackend | None = None,
propagate_traces: bool = False,
retries: int | None = None,
timeout_ms: float | None = None,
connection_kwargs: Mapping[str, Any] | None = None,
token_generator: TokenGenerator | None = None,
):
connection_kwargs_to_use = asdict(_ConnectionDefaults())
        if retries is not None:
            connection_kwargs_to_use["retries"] = urllib3.Retry(
                connect=retries,
                # we only retry connection problems, as we cannot rewind our
                # compression stream
                read=0,
            )
        if timeout_ms is not None:
            connection_kwargs_to_use["timeout"] = urllib3.Timeout(
                connect=DEFAULT_CONNECT_TIMEOUT, read=timeout_ms / 1000
            )
if connection_kwargs:
connection_kwargs_to_use = {**connection_kwargs_to_use, **connection_kwargs}
self._pool = urllib3.connectionpool.connection_from_url(
base_url, **connection_kwargs_to_use
)
self._base_path = urlparse(base_url).path
self._metrics_backend = metrics_backend or NoOpMetricsBackend()
self._propagate_traces = propagate_traces
self._token_generator = token_generator
def session(self, usecase: Usecase, **scopes: str | int | bool) -> Session:
"""
Create a [Session] with the Objectstore server, tied to a specific [Usecase] and
[Scope].
A Scope is a (possibly nested) namespace within a Usecase, given as a sequence
of key-value pairs passed as kwargs.
IMPORTANT: the order of the kwargs matters!
The admitted characters for keys and values are: `A-Za-z0-9_-()$!+*'`.
Users are free to choose the scope structure that best suits their Usecase.
The combination of Usecase and Scope will determine the physical key/path of the
blob in the underlying storage backend.
For most usecases, it's recommended to use the organization and project ID as
the first components of the scope, as follows:
```
client.session(usecase, org=organization_id, project=project_id, ...)
```
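
        For example, with concrete (illustrative) IDs:

        ```
        session = client.session(usecase, org=1, project=42)
        ```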
"""
return Session(
self._pool,
self._base_path,
self._metrics_backend,
self._propagate_traces,
usecase,
Scope(**scopes),
self._token_generator,
)
class Session:
"""
A session with the Objectstore server, scoped to a specific [Usecase] and Scope.
This should never be constructed directly, use [Client.session].
"""
def __init__(
self,
pool: HTTPConnectionPool,
base_path: str,
metrics_backend: MetricsBackend,
propagate_traces: bool,
usecase: Usecase,
scope: Scope,
token_generator: TokenGenerator | None,
):
self._pool = pool
self._base_path = base_path
self._metrics_backend = metrics_backend
self._propagate_traces = propagate_traces
self._usecase = usecase
self._scope = scope
self._token_generator = token_generator
def _make_headers(self) -> dict[str, str]:
headers = dict(self._pool.headers)
if self._propagate_traces:
headers.update(
dict(sentry_sdk.get_current_scope().iter_trace_propagation_headers())
)
if self._token_generator:
token = self._token_generator.sign_for_scope(
self._usecase.name, self._scope
)
headers["Authorization"] = f"Bearer {token}"
return headers
def _make_url(self, key: str | None, full: bool = False) -> str:
relative_path = f"/v1/objects/{self._usecase.name}/{self._scope}/{key or ''}"
path = self._base_path.rstrip("/") + relative_path
        if full:
            # Use the pool's scheme so an https base URL yields an https object URL.
            return f"{self._pool.scheme}://{self._pool.host}:{self._pool.port}{path}"
return path
def put(
self,
contents: bytes | IO[bytes],
key: str | None = None,
compression: Compression | Literal["none"] | None = None,
content_type: str | None = None,
metadata: dict[str, str] | None = None,
expiration_policy: ExpirationPolicy | None = None,
) -> str:
"""
Uploads the given `contents` to blob storage.
If no `key` is provided, one will be automatically generated and returned
from this function.
The client will select the configured `default_compression` if none is given
explicitly.
This can be overridden by explicitly giving a `compression` argument.
Providing `"none"` as the argument will instruct the client to not apply
any compression to this upload, which is useful for uncompressible formats.
You can use the utility function `objectstore_client.utils.guess_mime_type`
to attempt to guess a `content_type` based on magic bytes.
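
        A usage sketch (the payload and metadata are illustrative):

        .. code-block:: python

            key = session.put(
                b"hello world",
                content_type="text/plain",
                metadata={"filename": "hello.txt"},
            )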
"""
headers = self._make_headers()
body = BytesIO(contents) if isinstance(contents, bytes) else contents
original_body: IO[bytes] = body
compression = compression or self._usecase._compression
if compression == "zstd":
cctx = zstandard.ZstdCompressor()
body = cctx.stream_reader(original_body)
headers["Content-Encoding"] = "zstd"
if content_type:
headers["Content-Type"] = content_type
expiration_policy = expiration_policy or self._usecase._expiration_policy
if expiration_policy:
headers[HEADER_EXPIRATION] = format_expiration(expiration_policy)
if metadata:
for k, v in metadata.items():
headers[f"{HEADER_META_PREFIX}{k}"] = v
if key == "":
key = None
with measure_storage_operation(
self._metrics_backend, "put", self._usecase.name
) as metric_emitter:
response = self._pool.request(
"POST" if not key else "PUT",
self._make_url(key),
body=body,
headers=headers,
preload_content=True,
decode_content=True,
)
raise_for_status(response)
res = response.json()
# Must do this after streaming `body` as that's what is responsible
# for advancing the seek position in both streams
metric_emitter.record_uncompressed_size(original_body.tell())
if compression and compression != "none":
metric_emitter.record_compressed_size(body.tell(), compression)
return res["key"]
def get(self, key: str, decompress: bool = True) -> GetResponse:
"""
This fetches the blob with the given `key`, returning an `IO` stream that
can be read.
By default, content that was uploaded compressed will be automatically
decompressed, unless `decompress=True` is passed.
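
        A usage sketch, reading back a previously uploaded object:

        .. code-block:: python

            result = session.get(key)
            payload = result.payload.read()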
"""
headers = self._make_headers()
with measure_storage_operation(
self._metrics_backend, "get", self._usecase.name
):
response = self._pool.request(
"GET",
self._make_url(key),
preload_content=False,
decode_content=False,
headers=headers,
)
raise_for_status(response)
            # The urllib3 response object exposes a file-like `read()` interface,
            # so it can be treated as an `IO[bytes]` stream without buffering the
            # whole payload up front.
stream = cast(IO[bytes], response)
metadata = Metadata.from_headers(response.headers)
if metadata.compression and decompress:
if metadata.compression != "zstd":
raise NotImplementedError(
"Transparent decoding of anything but `zstd` is not implemented yet"
)
metadata.compression = None
dctx = zstandard.ZstdDecompressor()
stream = dctx.stream_reader(stream, read_across_frames=True)
return GetResponse(metadata, stream)
def object_url(self, key: str) -> str:
"""
Generates a GET url to the object with the given `key`.
This can then be used by downstream services to fetch the given object.
NOTE however that the service does not strictly follow HTTP semantics,
in particular in relation to `Accept-Encoding`.
"""
return self._make_url(key, full=True)
def delete(self, key: str) -> None:
"""
Deletes the blob with the given `key`.
"""
headers = self._make_headers()
with measure_storage_operation(
self._metrics_backend, "delete", self._usecase.name
):
response = self._pool.request(
"DELETE",
self._make_url(key),
headers=headers,
)
raise_for_status(response)
def raise_for_status(response: urllib3.BaseHTTPResponse) -> None:
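    """Raises a `RequestError` if the response has an error status (>= 400)."""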
if response.status >= 400:
res = (response.data or response.read() or b"").decode("utf-8", "replace")
raise RequestError(
f"Objectstore request failed with status {response.status}",
response.status,
res,
)