from __future__ import annotations
import time
from abc import abstractmethod
from collections.abc import Generator, Mapping
from contextlib import contextmanager
from typing import Protocol, runtime_checkable
# Type alias for metric tags: a read-only mapping of string keys to string
# values, accepted by every `MetricsBackend` method via its `tags` parameter.
Tags = Mapping[str, str]
@runtime_checkable
class MetricsBackend(Protocol):
    """
    Protocol that defines the interface for metrics backends.

    The protocol is runtime-checkable, so ``isinstance(obj, MetricsBackend)``
    can be used to test whether an object provides the three methods below.
    Classes that inherit from this protocol explicitly must override every
    method, since each one is declared abstract.
    """

    @abstractmethod
    def increment(
        self,
        name: str,
        value: int | float = 1,
        tags: Tags | None = None,
    ) -> None:
        """
        Increments a counter metric by a given value.

        :param name: Name of the counter metric.
        :param value: Amount to add to the counter. Defaults to ``1``.
        :param tags: Optional tags to attach to the metric.
        """
        raise NotImplementedError

    @abstractmethod
    def gauge(self, name: str, value: int | float, tags: Tags | None = None) -> None:
        """
        Sets a gauge metric to the given value.

        :param name: Name of the gauge metric.
        :param value: Value to set the gauge to.
        :param tags: Optional tags to attach to the metric.
        """
        raise NotImplementedError

    @abstractmethod
    def distribution(
        self,
        name: str,
        value: int | float,
        tags: Tags | None = None,
        unit: str | None = None,
    ) -> None:
        """
        Records a distribution metric.

        :param name: Name of the distribution metric.
        :param value: Value to record.
        :param tags: Optional tags to attach to the metric.
        :param unit: Optional unit of ``value`` (for example ``"byte"``).
        """
        raise NotImplementedError
class NoOpMetricsBackend(MetricsBackend):
    """
    Default metrics backend that does not record anything.

    Every method accepts the same arguments as the corresponding
    `MetricsBackend` method and returns ``None`` without doing any work.
    """

    def increment(
        self,
        name: str,
        value: int | float = 1,
        tags: Tags | None = None,
    ) -> None:
        """Accepts a counter increment and discards it."""
        pass

    def gauge(self, name: str, value: int | float, tags: Tags | None = None) -> None:
        """Accepts a gauge value and discards it."""
        pass

    def distribution(
        self,
        name: str,
        value: int | float,
        tags: Tags | None = None,
        unit: str | None = None,
    ) -> None:
        """Accepts a distribution value and discards it."""
        pass
class StorageMetricEmitter:
    """
    Emits metrics about a single storage operation to a `MetricsBackend`.

    All metrics are sent through ``backend.distribution`` under the name
    ``storage.<operation>.<metric>`` and are tagged with the ``usecase``.
    The ``record_*`` methods also remember the value they were given, so that
    the ``maybe_record_*`` methods can later emit derived metrics from them.
    """

    def __init__(self, backend: MetricsBackend, operation: str, usecase: str):
        """
        :param backend: Backend that receives the metrics.
        :param operation: Operation name, used as part of each metric name.
        :param usecase: Value of the ``usecase`` tag attached to each metric.
        """
        self.backend = backend
        self.operation = operation
        self.usecase = usecase

        # These may be set during or after the enclosed operation
        # NOTE: `start` is initialised here but not assigned by any method of
        # this class.
        self.start: int | None = None
        self.elapsed: float | None = None
        self.uncompressed_size: int | None = None
        self.compressed_size: int | None = None
        self.compression: str = "unknown"

    def record_latency(self, elapsed: float) -> None:
        """
        Emits ``storage.<operation>.latency`` and remembers ``elapsed``.

        :param elapsed: Duration of the operation.
        """
        tags = {"usecase": self.usecase}
        self.backend.distribution(
            f"storage.{self.operation}.latency", elapsed, tags=tags
        )
        self.elapsed = elapsed

    def record_uncompressed_size(self, value: int) -> None:
        """
        Emits ``storage.<operation>.size`` for the uncompressed object.

        The metric is tagged with ``compression="none"`` and sent with the
        ``"byte"`` unit. The value is remembered as ``uncompressed_size``.

        :param value: Uncompressed size of the object.
        """
        tags = {"usecase": self.usecase, "compression": "none"}
        self.backend.distribution(
            f"storage.{self.operation}.size", value, tags=tags, unit="byte"
        )
        self.uncompressed_size = value

    def record_compressed_size(self, value: int, compression: str = "unknown") -> None:
        """
        Emits ``storage.<operation>.size`` for the compressed object.

        The metric is tagged with the given ``compression`` and sent with the
        ``"byte"`` unit. Both the value and the compression are remembered.

        :param value: Compressed size of the object.
        :param compression: Value of the ``compression`` tag.
        """
        tags = {"usecase": self.usecase, "compression": compression}
        self.backend.distribution(
            f"storage.{self.operation}.size", value, tags=tags, unit="byte"
        )
        self.compressed_size = value
        self.compression = compression

    def maybe_record_compression_ratio(self) -> None:
        """
        Emits ``storage.<operation>.compression_ratio`` if both sizes are known.

        The ratio is ``compressed_size / uncompressed_size``. Nothing is
        emitted when either size is ``None`` or ``0``.
        """
        if not self.uncompressed_size or not self.compressed_size:
            return

        tags = {"usecase": self.usecase, "compression": self.compression}
        self.backend.distribution(
            f"storage.{self.operation}.compression_ratio",
            self.compressed_size / self.uncompressed_size,
            tags=tags,
        )

    def maybe_record_throughputs(self) -> None:
        """
        Emits throughput metrics for every known size.

        For the uncompressed size (tagged ``compression="none"``) and then the
        compressed size (tagged with the recorded compression), this emits
        ``storage.<operation>.throughput`` (``size / elapsed``) followed by
        ``storage.<operation>.inverse_throughput`` (``elapsed / size``).

        Nothing is emitted when no positive latency has been recorded, and a
        size that is ``None`` or ``0`` is skipped.
        """
        if not self.elapsed or self.elapsed <= 0:
            return

        sizes = []
        if self.uncompressed_size:
            sizes.append((self.uncompressed_size, "none"))
        if self.compressed_size:
            sizes.append((self.compressed_size, self.compression))

        for size, compression in sizes:
            tags = {"usecase": self.usecase, "compression": compression}
            self.backend.distribution(
                f"storage.{self.operation}.throughput", size / self.elapsed, tags=tags
            )
            self.backend.distribution(
                f"storage.{self.operation}.inverse_throughput",
                self.elapsed / size,
                tags=tags,
            )
@contextmanager
def measure_storage_operation(
    backend: MetricsBackend,
    operation: str,
    usecase: str,
    uncompressed_size: int | None = None,
    compressed_size: int | None = None,
    compression: str = "unknown",
) -> Generator[StorageMetricEmitter]:
    """
    Context manager which records the latency of the enclosed storage operation.
    Can also record the compressed or uncompressed size of an object, the
    compression ratio, the throughput, and the inverse throughput.

    Yields a `StorageMetricEmitter` because for some operations (GET) the size
    is not known until the inside of the enclosed block.

    :param backend: Backend that receives the metrics.
    :param operation: Operation name, passed on to the emitter.
    :param usecase: Usecase name, passed on to the emitter.
    :param uncompressed_size: Uncompressed size, recorded before the block
        runs. Skipped when ``None`` or ``0``.
    :param compressed_size: Compressed size, recorded before the block runs.
        Skipped when ``None`` or ``0``.
    :param compression: Compression tag recorded with ``compressed_size``.
    """
    emitter = StorageMetricEmitter(backend, operation, usecase)

    if uncompressed_size:
        emitter.record_uncompressed_size(uncompressed_size)
    if compressed_size:
        emitter.record_compressed_size(compressed_size, compression)

    start = time.monotonic()

    # Yield an emitter in case the size becomes known inside the enclosed block
    try:
        yield emitter
    finally:
        # Runs whether or not the enclosed block raised, so the latency is
        # recorded in both cases.
        elapsed = time.monotonic() - start
        emitter.record_latency(elapsed)

        # If `uncompressed_size` and/or `compressed_size` have been set, we have
        # extra metrics we can send.
        emitter.maybe_record_compression_ratio()
        emitter.maybe_record_throughputs()