// relay_server/services/server/acceptor.rs
use std::io;
use std::time::Duration;
use axum_server::accept::Accept;
use socket2::TcpKeepalive;
use tokio::net::TcpStream;
use crate::services::server::io::IdleTimeout;
use crate::statsd::RelayCounters;
/// Connection acceptor that prepares freshly accepted TCP streams.
///
/// The default value carries no keep-alive configuration and no idle timeout.
#[derive(Debug, Clone, Default)]
pub struct RelayAcceptor {
    /// Keep-alive parameters applied to every accepted socket.
    ///
    /// When `None`, the keep-alive settings of the socket are left untouched.
    tcp_keepalive: Option<TcpKeepalive>,
    /// Timeout handed to the [`IdleTimeout`] wrapper created for every accepted stream.
    idle_timeout: Option<Duration>,
}
impl RelayAcceptor {
    /// Creates an acceptor with no TCP keep-alive configuration and no idle timeout.
    pub fn new() -> Self {
        Self::default()
    }

    /// Configures TCP keep-alive for accepted connections.
    ///
    /// `timeout` is used as the keep-alive time and, on platforms where the
    /// option is compiled in, also as the keep-alive interval. `retries` is the
    /// keep-alive retry count, again only on platforms where it is compiled in.
    ///
    /// A zero `timeout` clears any previously configured keep-alive.
    pub fn tcp_keepalive(mut self, timeout: Duration, retries: u32) -> Self {
        self.tcp_keepalive = if timeout.is_zero() {
            None
        } else {
            let keepalive = socket2::TcpKeepalive::new().with_time(timeout);

            // The interval option is not compiled in for these targets.
            #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
            let keepalive = keepalive.with_interval(timeout);

            // The retry count is additionally skipped on Windows.
            #[cfg(not(any(
                target_os = "openbsd",
                target_os = "redox",
                target_os = "solaris",
                target_os = "windows"
            )))]
            let keepalive = keepalive.with_retries(retries);

            Some(keepalive)
        };

        self
    }

    /// Sets the idle timeout passed to the `IdleTimeout` wrapper of each accepted stream.
    ///
    /// The value is stored as-is; how `None` is interpreted is up to `IdleTimeout`.
    pub fn idle_timeout(self, idle_timeout: Option<Duration>) -> Self {
        Self {
            idle_timeout,
            ..self
        }
    }
}
impl<S> Accept<TcpStream, S> for RelayAcceptor {
    type Stream = IdleTimeout<TcpStream>;
    type Service = S;
    type Future = std::future::Ready<io::Result<(Self::Stream, Self::Service)>>;

    /// Applies socket options to an accepted stream and wraps it in [`IdleTimeout`].
    ///
    /// Socket options are best effort: a failure to set keep-alive or
    /// `TCP_NODELAY` is logged at trace level and reported through the tags of
    /// the `ServerSocketAccept` counter, but the connection is still accepted.
    /// The returned future is immediately ready and always resolves to `Ok`.
    fn accept(&self, stream: TcpStream, service: S) -> Self::Future {
        // Keep-alive is only touched when it has been configured; an absent
        // configuration is reported as "ok".
        let keepalive_error = self
            .tcp_keepalive
            .as_ref()
            .and_then(|config| socket2::SockRef::from(&stream).set_tcp_keepalive(config).err());
        let keepalive = match keepalive_error {
            Some(e) => {
                relay_log::trace!("error trying to set TCP keepalive: {e}");
                "error"
            }
            None => "ok",
        };

        let nodelay = match stream.set_nodelay(true) {
            Ok(()) => "ok",
            Err(e) => {
                relay_log::trace!("failed to set TCP_NODELAY: {e}");
                "error"
            }
        };

        relay_statsd::metric!(
            counter(RelayCounters::ServerSocketAccept) += 1,
            keepalive = keepalive,
            nodelay = nodelay
        );

        std::future::ready(Ok((IdleTimeout::new(stream, self.idle_timeout), service)))
    }
}