relay_server/services/server/
acceptor.rs

1use std::io;
2use std::time::Duration;
3
4use axum_server::accept::Accept;
5use socket2::TcpKeepalive;
6use tokio::net::TcpStream;
7
8use crate::services::server::io::IdleTimeout;
9use crate::statsd::RelayCounters;
10
11#[derive(Clone, Debug, Default)]
12pub struct RelayAcceptor {
13    tcp_keepalive: Option<TcpKeepalive>,
14    idle_timeout: Option<Duration>,
15}
16
17impl RelayAcceptor {
18    /// Creates a new [`RelayAcceptor`] which only configures `TCP_NODELAY`.
19    pub fn new() -> Self {
20        Default::default()
21    }
22
23    /// Configures the acceptor to enable TCP keep-alive.
24    ///
25    /// The `timeout` is used to configure the keep-alive time as well as interval.
26    /// A zero duration timeout disables TCP keep-alive.
27    ///
28    /// `retries` configures the amount of keep-alive probes.
29    pub fn tcp_keepalive(mut self, timeout: Duration, retries: u32) -> Self {
30        if timeout.is_zero() {
31            self.tcp_keepalive = None;
32            return self;
33        }
34
35        let mut keepalive = socket2::TcpKeepalive::new().with_time(timeout);
36        #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
37        {
38            keepalive = keepalive.with_interval(timeout);
39        }
40
41        let _retries = retries;
42        #[cfg(not(any(
43            target_os = "openbsd",
44            target_os = "redox",
45            target_os = "solaris",
46            target_os = "windows"
47        )))]
48        {
49            keepalive = keepalive.with_retries(_retries);
50        }
51        self.tcp_keepalive = Some(keepalive);
52
53        self
54    }
55
56    /// Configures an idle timeout for the connection.
57    ///
58    /// Whenever there is no activity on a connection for the specified timeout,
59    /// the connection is closed.
60    ///
61    /// Note: This limits the total idle time of a duration and unlike read and write timeouts
62    /// also limits the time a connection is kept alive without requests.
63    pub fn idle_timeout(mut self, idle_timeout: Option<Duration>) -> Self {
64        self.idle_timeout = idle_timeout;
65        self
66    }
67}
68
69impl<S> Accept<TcpStream, S> for RelayAcceptor {
70    type Stream = IdleTimeout<TcpStream>;
71    type Service = S;
72    type Future = std::future::Ready<io::Result<(Self::Stream, Self::Service)>>;
73
74    fn accept(&self, stream: TcpStream, service: S) -> Self::Future {
75        let mut keepalive = "ok";
76        let mut nodelay = "ok";
77
78        if let Some(tcp_keepalive) = &self.tcp_keepalive {
79            let sock_ref = socket2::SockRef::from(&stream);
80            if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) {
81                relay_log::trace!("error trying to set TCP keepalive: {e}");
82                keepalive = "error";
83            }
84        }
85
86        if let Err(e) = stream.set_nodelay(true) {
87            relay_log::trace!("failed to set TCP_NODELAY: {e}");
88            nodelay = "error";
89        }
90
91        relay_statsd::metric!(
92            counter(RelayCounters::ServerSocketAccept) += 1,
93            keepalive = keepalive,
94            nodelay = nodelay
95        );
96
97        let stream = IdleTimeout::new(stream, self.idle_timeout);
98        std::future::ready(Ok((stream, service)))
99    }
100}