relay_server/services/server/
acceptor.rs1use std::io;
2use std::time::Duration;
3
4use axum_server::accept::Accept;
5use socket2::TcpKeepalive;
6use tokio::net::TcpStream;
7
8use crate::services::server::io::IdleTimeout;
9use crate::statsd::RelayCounters;
10
11#[derive(Clone, Debug, Default)]
12pub struct RelayAcceptor {
13 tcp_keepalive: Option<TcpKeepalive>,
14 idle_timeout: Option<Duration>,
15}
16
17impl RelayAcceptor {
18 pub fn new() -> Self {
20 Default::default()
21 }
22
23 pub fn tcp_keepalive(mut self, timeout: Duration, retries: u32) -> Self {
30 if timeout.is_zero() {
31 self.tcp_keepalive = None;
32 return self;
33 }
34
35 let mut keepalive = socket2::TcpKeepalive::new().with_time(timeout);
36 #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
37 {
38 keepalive = keepalive.with_interval(timeout);
39 }
40
41 let _retries = retries;
42 #[cfg(not(any(
43 target_os = "openbsd",
44 target_os = "redox",
45 target_os = "solaris",
46 target_os = "windows"
47 )))]
48 {
49 keepalive = keepalive.with_retries(_retries);
50 }
51 self.tcp_keepalive = Some(keepalive);
52
53 self
54 }
55
56 pub fn idle_timeout(mut self, idle_timeout: Option<Duration>) -> Self {
64 self.idle_timeout = idle_timeout;
65 self
66 }
67}
68
69impl<S> Accept<TcpStream, S> for RelayAcceptor {
70 type Stream = IdleTimeout<TcpStream>;
71 type Service = S;
72 type Future = std::future::Ready<io::Result<(Self::Stream, Self::Service)>>;
73
74 fn accept(&self, stream: TcpStream, service: S) -> Self::Future {
75 let mut keepalive = "ok";
76 let mut nodelay = "ok";
77
78 if let Some(tcp_keepalive) = &self.tcp_keepalive {
79 let sock_ref = socket2::SockRef::from(&stream);
80 if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) {
81 relay_log::trace!("error trying to set TCP keepalive: {e}");
82 keepalive = "error";
83 }
84 }
85
86 if let Err(e) = stream.set_nodelay(true) {
87 relay_log::trace!("failed to set TCP_NODELAY: {e}");
88 nodelay = "error";
89 }
90
91 relay_statsd::metric!(
92 counter(RelayCounters::ServerSocketAccept) += 1,
93 keepalive = keepalive,
94 nodelay = nodelay
95 );
96
97 let stream = IdleTimeout::new(stream, self.idle_timeout);
98 std::future::ready(Ok((stream, service)))
99 }
100}