relay_server/services/server/
mod.rs

1use std::net::{SocketAddr, TcpListener};
2use std::sync::Arc;
3use std::time::Duration;
4
5use axum::ServiceExt;
6use axum::extract::Request;
7use axum::http::{HeaderName, HeaderValue, header};
8use axum_server::Handle;
9use hyper_util::rt::TokioTimer;
10use relay_config::Config;
11use relay_system::{Controller, Service, Shutdown};
12use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
13use tokio::net::TcpSocket;
14use tower::ServiceBuilder;
15use tower_http::compression::predicate::SizeAbove;
16use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
17use tower_http::set_header::SetResponseHeaderLayer;
18
19use crate::constants;
20use crate::middlewares::{self, CatchPanicLayer, NormalizePath, RequestDecompressionLayer};
21use crate::service::ServiceState;
22use crate::statsd::{RelayCounters, RelayGauges};
23
24mod acceptor;
25mod concurrency_limit;
26mod io;
27
28/// Set the number of keep-alive retransmissions to be carried out before declaring that remote end
29/// is not available.
30const KEEPALIVE_RETRIES: u32 = 5;
31
32/// Set a timeout for reading client request headers. If a client does not transmit the entire
33/// header within this time, the connection is closed.
34const CLIENT_HEADER_TIMEOUT: Duration = Duration::from_secs(5);
35
36/// Only compress responses above this configured size, in bytes.
37///
38/// Small responses don't benefit from compression,
39/// additionally the envelope endpoint which returns the event id
40/// should not be compressed due to a bug in the Unity SDK.
41const COMPRESSION_MIN_SIZE: u16 = 128;
42
43/// Indicates the type of failure of the server.
44#[allow(clippy::enum_variant_names)]
45#[derive(Debug, thiserror::Error)]
46pub enum ServerError {
47    /// Binding failed.
48    #[error("bind to interface failed")]
49    BindFailed(#[from] std::io::Error),
50
51    /// TLS support was not compiled in.
52    #[error("SSL is no longer supported by Relay, please use a proxy in front")]
53    TlsNotSupported,
54}
55
56type App = NormalizePath<axum::Router>;
57
58/// Build the axum application with all routes and middleware.
59fn make_app(service: ServiceState, f: impl FnOnce(&Config) -> axum::Router<ServiceState>) -> App {
60    // Build the router middleware into a single service which runs _after_ routing. Service
61    // builder order defines layers added first will be called first. This means:
62    //  - Requests go from top to bottom
63    //  - Responses go from bottom to top
64    let middleware = ServiceBuilder::new()
65        .layer(axum::middleware::from_fn(middlewares::metrics))
66        .layer(CatchPanicLayer::custom(middlewares::handle_panic))
67        .layer(SetResponseHeaderLayer::overriding(
68            header::SERVER,
69            HeaderValue::from_static(constants::SERVER),
70        ))
71        .layer(SetResponseHeaderLayer::overriding(
72            HeaderName::from_static("cross-origin-resource-policy"),
73            HeaderValue::from_static("cross-origin"),
74        ))
75        .layer(NewSentryLayer::new_from_top())
76        .layer(SentryHttpLayer::new().enable_transaction())
77        .layer(middlewares::trace_http_layer())
78        .map_request(middlewares::remove_empty_encoding)
79        .layer(RequestDecompressionLayer::new())
80        .layer(
81            CompressionLayer::new()
82                .compress_when(SizeAbove::new(COMPRESSION_MIN_SIZE).and(DefaultPredicate::new())),
83        );
84
85    let router = f(service.config()).layer(middleware).with_state(service);
86
87    // Add middlewares that need to run _before_ routing, which need to wrap the router. This are
88    // especially middlewares that modify the request path for the router:
89    NormalizePath::new(router)
90}
91
92fn listen(addr: SocketAddr, config: &Config) -> Result<TcpListener, ServerError> {
93    let socket = match addr {
94        SocketAddr::V4(_) => TcpSocket::new_v4(),
95        SocketAddr::V6(_) => TcpSocket::new_v6(),
96    }?;
97
98    #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
99    socket.set_reuseport(true)?;
100    socket.bind(addr)?;
101    Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?)
102}
103
104async fn serve(listener: TcpListener, app: App, config: &Config) -> std::io::Result<()> {
105    let handle = Handle::new();
106
107    let acceptor = self::acceptor::RelayAcceptor::new()
108        .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES)
109        .idle_timeout(config.idle_timeout());
110
111    let mut server = axum_server::from_tcp(listener)
112        .acceptor(acceptor)
113        .handle(handle.clone());
114
115    server
116        .http_builder()
117        .http1()
118        .timer(TokioTimer::new())
119        .half_close(true)
120        .keep_alive(true)
121        .header_read_timeout(CLIENT_HEADER_TIMEOUT)
122        .writev(true);
123
124    server
125        .http_builder()
126        .http2()
127        .timer(TokioTimer::new())
128        .keep_alive_timeout(config.keepalive_timeout());
129
130    let service = ServiceBuilder::new()
131        .layer(concurrency_limit::ConcurrencyLimitLayer::new(
132            config.max_connections(),
133        ))
134        .service(ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(app));
135
136    relay_system::spawn!(emit_active_connections_metric(
137        config.metrics_periodic_interval(),
138        handle.clone(),
139    ));
140
141    relay_system::spawn!(async move {
142        let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
143        relay_log::info!("Shutting down HTTP server");
144
145        match timeout {
146            Some(timeout) => handle.graceful_shutdown(Some(timeout)),
147            None => handle.shutdown(),
148        }
149    });
150
151    server.serve(service).await
152}
153
154/// HTTP server service.
155///
156/// This is the main HTTP server of Relay which hosts all [services](ServiceState) and dispatches
157/// incoming traffic to them. The server stops when a [`Shutdown`] is triggered.
158pub struct HttpServer {
159    config: Arc<Config>,
160    service: ServiceState,
161    listener: TcpListener,
162    internal_listener: Option<TcpListener>,
163}
164
165impl HttpServer {
166    pub fn new(config: Arc<Config>, service: ServiceState) -> Result<Self, ServerError> {
167        // Inform the user about a removed feature.
168        if config.tls_listen_addr().is_some()
169            || config.tls_identity_password().is_some()
170            || config.tls_identity_path().is_some()
171        {
172            return Err(ServerError::TlsNotSupported);
173        }
174
175        let listener = listen(config.listen_addr(), &config)?;
176        let internal_listener = match config.listen_addr_internal() {
177            Some(addr) => Some(listen(addr, &config)?),
178            None => None,
179        };
180
181        Ok(Self {
182            config,
183            service,
184            listener,
185            internal_listener,
186        })
187    }
188}
189
190impl Service for HttpServer {
191    type Interface = ();
192
193    async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
194        let Self {
195            config,
196            service,
197            listener,
198            internal_listener,
199        } = self;
200
201        let listen_addr = config.listen_addr();
202
203        relay_log::info!("spawning http server");
204        relay_log::info!("  listening on http://{listen_addr}/");
205        if let Some(internal_addr) = config.listen_addr_internal() {
206            relay_log::info!("  listening on http://{internal_addr}/ [internal]");
207        }
208        relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
209
210        if let Some(internal_listener) = internal_listener {
211            let public = make_app(service.clone(), crate::endpoints::public_routes);
212            let internal = make_app(service, crate::endpoints::internal_routes);
213
214            tokio::try_join!(
215                serve(listener, public, &config),
216                serve(internal_listener, internal, &config),
217            )
218            .map(drop)
219        } else {
220            let app = make_app(service, crate::endpoints::all_routes);
221            serve(listener, app, &config).await
222        }
223        .expect("axum listener to not fail")
224    }
225}
226
227async fn emit_active_connections_metric(interval: Option<Duration>, handle: Handle) {
228    let Some(mut ticker) = interval.map(tokio::time::interval) else {
229        return;
230    };
231
232    let addr = handle.listening().await.map(|addr| addr.to_string());
233
234    loop {
235        ticker.tick().await;
236        relay_statsd::metric!(
237            gauge(RelayGauges::ServerActiveConnections) = handle.connection_count() as u64,
238            addr = addr.as_deref().unwrap_or("unknown"),
239        );
240    }
241}