use std::io;
use bytes::Bytes;
use relay_config::HttpEncoding;
use reqwest::header::{HeaderMap, HeaderValue};
pub use reqwest::StatusCode;
use serde::de::DeserializeOwned;
/// Errors produced by the HTTP request and response wrappers in this module.
///
/// The `Reqwest`, `Io` and `Json` variants carry `#[from]`, so the matching
/// source errors convert into `HttpError` automatically when propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum HttpError {
/// A response body grew past the caller-supplied size limit
/// (returned by `Response::bytes`).
#[error("payload too large")]
Overflow,
/// An error reported by `reqwest`.
#[error("could not send request")]
Reqwest(#[from] reqwest::Error),
/// An underlying I/O error.
#[error("failed to stream payload")]
Io(#[from] io::Error),
/// The response body could not be deserialized as JSON
/// (returned by `Response::json`).
#[error("failed to parse JSON response")]
Json(#[from] serde_json::Error),
}
impl HttpError {
pub fn is_network_error(&self) -> bool {
match self {
Self::Io(_) => true,
Self::Reqwest(error) => error.is_timeout(),
Self::Json(_) => false,
HttpError::Overflow => false,
}
}
}
/// Newtype wrapper around a built `reqwest::Request`.
///
/// Instances are produced by `RequestBuilder::finish`.
pub struct Request(pub reqwest::Request);
/// Builder for a [`Request`] that wraps a `reqwest::RequestBuilder` and can be
/// configured through `&mut self` methods.
pub struct RequestBuilder {
// Held in an `Option` so the private `build` helper can move the by-value
// `reqwest::RequestBuilder` out, transform it, and store the result back.
builder: Option<reqwest::RequestBuilder>,
}
impl RequestBuilder {
    /// Creates a builder that wraps the given `reqwest::RequestBuilder`.
    pub fn reqwest(builder: reqwest::RequestBuilder) -> Self {
        Self {
            builder: Some(builder),
        }
    }

    /// Consumes the builder and produces the final [`Request`].
    ///
    /// # Errors
    ///
    /// Returns `HttpError::Reqwest` if the underlying `reqwest` builder fails to
    /// build the request.
    ///
    /// # Panics
    ///
    /// Panics if the inner builder is absent. Every constructor and mutator in
    /// this `impl` leaves it populated on normal return.
    pub fn finish(self) -> Result<Request, HttpError> {
        let inner = self.builder.unwrap();
        let request = inner.build()?;
        Ok(Request(request))
    }

    /// Applies `f` to the wrapped `reqwest` builder, if present, and stores the result.
    ///
    /// `reqwest::RequestBuilder` methods take `self` by value, so the builder is
    /// moved out of the `Option`, transformed, and put back.
    fn build<F>(&mut self, f: F) -> &mut Self
    where
        F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
    {
        if let Some(inner) = self.builder.take() {
            self.builder = Some(f(inner));
        }
        self
    }

    /// Adds a header with the given name and raw value to the request.
    pub fn header(&mut self, key: impl AsRef<str>, value: impl AsRef<[u8]>) -> &mut Self {
        self.build(|inner| {
            let name = key.as_ref();
            let raw = value.as_ref();
            inner.header(name, raw)
        })
    }

    /// Adds a header only when `value` is `Some`; otherwise leaves the request untouched.
    pub fn header_opt(
        &mut self,
        key: impl AsRef<str>,
        value: Option<impl AsRef<[u8]>>,
    ) -> &mut Self {
        if let Some(value) = value {
            self.header(key, value);
        }
        self
    }

    /// Sets the `content-encoding` header from the name of `encoding`, if it has one.
    pub fn content_encoding(&mut self, encoding: HttpEncoding) -> &mut Self {
        let name = encoding.name();
        self.header_opt("content-encoding", name)
    }

    /// Sets the request body.
    pub fn body(&mut self, body: Bytes) -> &mut Self {
        self.build(move |inner| inner.body(body))
    }
}
/// Newtype wrapper around a `reqwest::Response` that adds size-limited body
/// readers and header accessors.
pub struct Response(pub reqwest::Response);
impl Response {
    /// Returns the HTTP status code of the response.
    pub fn status(&self) -> StatusCode {
        let Self(inner) = self;
        inner.status()
    }

    /// Reads the response body to its end, discarding every chunk.
    ///
    /// # Errors
    ///
    /// Returns `HttpError::Reqwest` if reading a chunk fails.
    pub async fn consume(&mut self) -> Result<(), HttpError> {
        loop {
            if self.0.chunk().await?.is_none() {
                return Ok(());
            }
        }
    }

    /// Returns the raw bytes of the first value stored for header `key`, if any.
    pub fn get_header(&self, key: impl AsRef<str>) -> Option<&[u8]> {
        self.0
            .headers()
            .get(key.as_ref())
            .map(|value| value.as_bytes())
    }

    /// Returns the raw bytes of every value stored for header `key`.
    ///
    /// The vector is empty when the header is absent.
    pub fn get_all_headers(&self, key: impl AsRef<str>) -> Vec<&[u8]> {
        let mut values = Vec::new();
        for value in self.0.headers().get_all(key.as_ref()) {
            values.push(value.as_bytes());
        }
        values
    }

    /// Returns the full header map of the response.
    pub fn headers(&self) -> &HeaderMap<HeaderValue> {
        let Self(inner) = self;
        inner.headers()
    }

    /// Reads the whole response body into memory, accepting at most `limit` bytes.
    ///
    /// # Errors
    ///
    /// Returns `HttpError::Overflow` as soon as the accumulated body would exceed
    /// `limit`, and `HttpError::Reqwest` if reading a chunk fails.
    pub async fn bytes(self, limit: usize) -> Result<Vec<u8>, HttpError> {
        let mut response = self.0;
        // Cap the up-front allocation so a large `limit` does not reserve memory
        // for a body that may turn out to be small.
        let initial_capacity = limit.min(8192);
        let mut buffer = Vec::with_capacity(initial_capacity);

        loop {
            match response.chunk().await? {
                Some(chunk) if (buffer.len() + chunk.len()) > limit => {
                    return Err(HttpError::Overflow);
                }
                Some(chunk) => buffer.extend_from_slice(&chunk),
                None => return Ok(buffer),
            }
        }
    }

    /// Reads the body, limited to `limit` bytes, and deserializes it from JSON into `T`.
    ///
    /// # Errors
    ///
    /// Returns any error from [`Response::bytes`], or `HttpError::Json` if the
    /// body is not valid JSON for `T`.
    pub async fn json<T>(self, limit: usize) -> Result<T, HttpError>
    where
        T: 'static + DeserializeOwned,
    {
        let payload = self.bytes(limit).await?;
        // The `#[from]` conversion on `HttpError::Json` maps the serde error.
        let parsed = serde_json::from_slice(&payload)?;
        Ok(parsed)
    }
}