relay_server/services/server/
mod.rs

1use std::net::{SocketAddr, TcpListener};
2use std::sync::Arc;
3use std::time::Duration;
4
5use axum::extract::Request;
6use axum::http::{header, HeaderName, HeaderValue};
7use axum::ServiceExt;
8use axum_server::Handle;
9use hyper_util::rt::TokioTimer;
10use relay_config::Config;
11use relay_system::{Controller, Service, Shutdown};
12use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
13use tokio::net::TcpSocket;
14use tower::ServiceBuilder;
15use tower_http::compression::predicate::SizeAbove;
16use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
17use tower_http::set_header::SetResponseHeaderLayer;
18
19use crate::constants;
20use crate::middlewares::{
21    self, BodyTimingLayer, CatchPanicLayer, NormalizePath, RequestDecompressionLayer,
22};
23use crate::service::ServiceState;
24use crate::statsd::{RelayCounters, RelayGauges};
25
26mod acceptor;
27mod concurrency_limit;
28mod io;
29
/// Number of TCP keep-alive retransmissions to attempt before the remote end is considered
/// unavailable.
const KEEPALIVE_RETRIES: u32 = 5;
33
/// Maximum time a client may take to transmit its complete request headers.
///
/// Connections whose headers have not fully arrived within this window are closed.
const CLIENT_HEADER_TIMEOUT: Duration = Duration::from_secs(5);
37
/// Response size threshold, in bytes, above which compression is applied.
///
/// Compressing small responses brings no benefit. In addition, the response of the envelope
/// endpoint (which returns the event id) must stay uncompressed because of a bug in the Unity SDK.
const COMPRESSION_MIN_SIZE: u16 = 128;
44
45/// Indicates the type of failure of the server.
46#[allow(clippy::enum_variant_names)]
47#[derive(Debug, thiserror::Error)]
48pub enum ServerError {
49    /// Binding failed.
50    #[error("bind to interface failed")]
51    BindFailed(#[from] std::io::Error),
52
53    /// TLS support was not compiled in.
54    #[error("SSL is no longer supported by Relay, please use a proxy in front")]
55    TlsNotSupported,
56}
57
58type App = NormalizePath<axum::Router>;
59
60/// Build the axum application with all routes and middleware.
61fn make_app(service: ServiceState) -> App {
62    // Build the router middleware into a single service which runs _after_ routing. Service
63    // builder order defines layers added first will be called first. This means:
64    //  - Requests go from top to bottom
65    //  - Responses go from bottom to top
66    let middleware = ServiceBuilder::new()
67        .layer(BodyTimingLayer)
68        .layer(axum::middleware::from_fn(middlewares::metrics))
69        .layer(CatchPanicLayer::custom(middlewares::handle_panic))
70        .layer(SetResponseHeaderLayer::overriding(
71            header::SERVER,
72            HeaderValue::from_static(constants::SERVER),
73        ))
74        .layer(SetResponseHeaderLayer::overriding(
75            HeaderName::from_static("cross-origin-resource-policy"),
76            HeaderValue::from_static("cross-origin"),
77        ))
78        .layer(NewSentryLayer::new_from_top())
79        .layer(SentryHttpLayer::with_transaction())
80        .layer(middlewares::trace_http_layer())
81        .map_request(middlewares::remove_empty_encoding)
82        .layer(RequestDecompressionLayer::new())
83        .layer(
84            CompressionLayer::new()
85                .compress_when(SizeAbove::new(COMPRESSION_MIN_SIZE).and(DefaultPredicate::new())),
86        );
87
88    let router = crate::endpoints::routes(service.config())
89        .layer(middleware)
90        .with_state(service);
91
92    // Add middlewares that need to run _before_ routing, which need to wrap the router. This are
93    // especially middlewares that modify the request path for the router:
94    NormalizePath::new(router)
95}
96
97fn listen(config: &Config) -> Result<TcpListener, ServerError> {
98    // Inform the user about a removed feature.
99    if config.tls_listen_addr().is_some()
100        || config.tls_identity_password().is_some()
101        || config.tls_identity_path().is_some()
102    {
103        return Err(ServerError::TlsNotSupported);
104    }
105
106    let addr = config.listen_addr();
107    let socket = match addr {
108        SocketAddr::V4(_) => TcpSocket::new_v4(),
109        SocketAddr::V6(_) => TcpSocket::new_v6(),
110    }?;
111
112    #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
113    socket.set_reuseport(true)?;
114    socket.bind(addr)?;
115    Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?)
116}
117
118async fn serve(listener: TcpListener, app: App, config: Arc<Config>) {
119    let handle = Handle::new();
120
121    let acceptor = self::acceptor::RelayAcceptor::new()
122        .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES)
123        .idle_timeout(config.idle_timeout());
124
125    let mut server = axum_server::from_tcp(listener)
126        .acceptor(acceptor)
127        .handle(handle.clone());
128
129    server
130        .http_builder()
131        .http1()
132        .timer(TokioTimer::new())
133        .half_close(true)
134        .keep_alive(true)
135        .header_read_timeout(CLIENT_HEADER_TIMEOUT)
136        .writev(true);
137
138    server
139        .http_builder()
140        .http2()
141        .timer(TokioTimer::new())
142        .keep_alive_timeout(config.keepalive_timeout());
143
144    let service = ServiceBuilder::new()
145        .layer(concurrency_limit::ConcurrencyLimitLayer::new(
146            config.max_connections(),
147        ))
148        .service(ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(app));
149
150    relay_system::spawn!(emit_active_connections_metric(
151        config.metrics_periodic_interval(),
152        handle.clone(),
153    ));
154
155    relay_system::spawn!(async move {
156        let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
157        relay_log::info!("Shutting down HTTP server");
158
159        match timeout {
160            Some(timeout) => handle.graceful_shutdown(Some(timeout)),
161            None => handle.shutdown(),
162        }
163    });
164
165    server
166        .serve(service)
167        .await
168        .expect("failed to start axum server");
169}
170
171/// HTTP server service.
172///
173/// This is the main HTTP server of Relay which hosts all [services](ServiceState) and dispatches
174/// incoming traffic to them. The server stops when a [`Shutdown`] is triggered.
175pub struct HttpServer {
176    config: Arc<Config>,
177    service: ServiceState,
178    listener: TcpListener,
179}
180
181impl HttpServer {
182    pub fn new(config: Arc<Config>, service: ServiceState) -> Result<Self, ServerError> {
183        let listener = listen(&config)?;
184
185        Ok(Self {
186            config,
187            service,
188            listener,
189        })
190    }
191}
192
193impl Service for HttpServer {
194    type Interface = ();
195
196    async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
197        let Self {
198            config,
199            service,
200            listener,
201        } = self;
202
203        relay_log::info!("spawning http server");
204        relay_log::info!("  listening on http://{}/", config.listen_addr());
205        relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
206
207        let app = make_app(service);
208        serve(listener, app, config).await;
209    }
210}
211
212async fn emit_active_connections_metric(interval: Option<Duration>, handle: Handle) {
213    let Some(mut ticker) = interval.map(tokio::time::interval) else {
214        return;
215    };
216
217    loop {
218        ticker.tick().await;
219        relay_statsd::metric!(
220            gauge(RelayGauges::ServerActiveConnections) = handle.connection_count() as u64
221        );
222    }
223}