relay_server/services/server/
mod.rs1use std::net::{SocketAddr, TcpListener};
2use std::sync::Arc;
3use std::time::Duration;
4
5use axum::extract::Request;
6use axum::http::{header, HeaderName, HeaderValue};
7use axum::ServiceExt;
8use axum_server::Handle;
9use hyper_util::rt::TokioTimer;
10use relay_config::Config;
11use relay_system::{Controller, Service, Shutdown};
12use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
13use tokio::net::TcpSocket;
14use tower::ServiceBuilder;
15use tower_http::compression::predicate::SizeAbove;
16use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
17use tower_http::set_header::SetResponseHeaderLayer;
18
19use crate::constants;
20use crate::middlewares::{
21 self, BodyTimingLayer, CatchPanicLayer, NormalizePath, RequestDecompressionLayer,
22};
23use crate::service::ServiceState;
24use crate::statsd::{RelayCounters, RelayGauges};
25
26mod acceptor;
27mod concurrency_limit;
28mod io;
29
/// Retry count passed to `RelayAcceptor::tcp_keepalive`, alongside the configured keepalive
/// timeout, when the acceptor is built in `serve`.
const KEEPALIVE_RETRIES: u32 = 5;

/// Timeout passed to the HTTP/1 builder's `header_read_timeout` in `serve`.
const CLIENT_HEADER_TIMEOUT: Duration = Duration::from_secs(5);

/// Threshold passed to `SizeAbove` in `make_app`: only responses larger than this many bytes
/// are considered for compression.
const COMPRESSION_MIN_SIZE: u16 = 128;
44
45#[allow(clippy::enum_variant_names)]
47#[derive(Debug, thiserror::Error)]
48pub enum ServerError {
49 #[error("bind to interface failed")]
51 BindFailed(#[from] std::io::Error),
52
53 #[error("SSL is no longer supported by Relay, please use a proxy in front")]
55 TlsNotSupported,
56}
57
58type App = NormalizePath<axum::Router>;
59
60fn make_app(service: ServiceState) -> App {
62 let middleware = ServiceBuilder::new()
67 .layer(BodyTimingLayer)
68 .layer(axum::middleware::from_fn(middlewares::metrics))
69 .layer(CatchPanicLayer::custom(middlewares::handle_panic))
70 .layer(SetResponseHeaderLayer::overriding(
71 header::SERVER,
72 HeaderValue::from_static(constants::SERVER),
73 ))
74 .layer(SetResponseHeaderLayer::overriding(
75 HeaderName::from_static("cross-origin-resource-policy"),
76 HeaderValue::from_static("cross-origin"),
77 ))
78 .layer(NewSentryLayer::new_from_top())
79 .layer(SentryHttpLayer::with_transaction())
80 .layer(middlewares::trace_http_layer())
81 .map_request(middlewares::remove_empty_encoding)
82 .layer(RequestDecompressionLayer::new())
83 .layer(
84 CompressionLayer::new()
85 .compress_when(SizeAbove::new(COMPRESSION_MIN_SIZE).and(DefaultPredicate::new())),
86 );
87
88 let router = crate::endpoints::routes(service.config())
89 .layer(middleware)
90 .with_state(service);
91
92 NormalizePath::new(router)
95}
96
97fn listen(config: &Config) -> Result<TcpListener, ServerError> {
98 if config.tls_listen_addr().is_some()
100 || config.tls_identity_password().is_some()
101 || config.tls_identity_path().is_some()
102 {
103 return Err(ServerError::TlsNotSupported);
104 }
105
106 let addr = config.listen_addr();
107 let socket = match addr {
108 SocketAddr::V4(_) => TcpSocket::new_v4(),
109 SocketAddr::V6(_) => TcpSocket::new_v6(),
110 }?;
111
112 #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
113 socket.set_reuseport(true)?;
114 socket.bind(addr)?;
115 Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?)
116}
117
118async fn serve(listener: TcpListener, app: App, config: Arc<Config>) {
119 let handle = Handle::new();
120
121 let acceptor = self::acceptor::RelayAcceptor::new()
122 .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES)
123 .idle_timeout(config.idle_timeout());
124
125 let mut server = axum_server::from_tcp(listener)
126 .acceptor(acceptor)
127 .handle(handle.clone());
128
129 server
130 .http_builder()
131 .http1()
132 .timer(TokioTimer::new())
133 .half_close(true)
134 .keep_alive(true)
135 .header_read_timeout(CLIENT_HEADER_TIMEOUT)
136 .writev(true);
137
138 server
139 .http_builder()
140 .http2()
141 .timer(TokioTimer::new())
142 .keep_alive_timeout(config.keepalive_timeout());
143
144 let service = ServiceBuilder::new()
145 .layer(concurrency_limit::ConcurrencyLimitLayer::new(
146 config.max_connections(),
147 ))
148 .service(ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(app));
149
150 relay_system::spawn!(emit_active_connections_metric(
151 config.metrics_periodic_interval(),
152 handle.clone(),
153 ));
154
155 relay_system::spawn!(async move {
156 let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
157 relay_log::info!("Shutting down HTTP server");
158
159 match timeout {
160 Some(timeout) => handle.graceful_shutdown(Some(timeout)),
161 None => handle.shutdown(),
162 }
163 });
164
165 server
166 .serve(service)
167 .await
168 .expect("failed to start axum server");
169}
170
171pub struct HttpServer {
176 config: Arc<Config>,
177 service: ServiceState,
178 listener: TcpListener,
179}
180
181impl HttpServer {
182 pub fn new(config: Arc<Config>, service: ServiceState) -> Result<Self, ServerError> {
183 let listener = listen(&config)?;
184
185 Ok(Self {
186 config,
187 service,
188 listener,
189 })
190 }
191}
192
193impl Service for HttpServer {
194 type Interface = ();
195
196 async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
197 let Self {
198 config,
199 service,
200 listener,
201 } = self;
202
203 relay_log::info!("spawning http server");
204 relay_log::info!(" listening on http://{}/", config.listen_addr());
205 relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
206
207 let app = make_app(service);
208 serve(listener, app, config).await;
209 }
210}
211
212async fn emit_active_connections_metric(interval: Option<Duration>, handle: Handle) {
213 let Some(mut ticker) = interval.map(tokio::time::interval) else {
214 return;
215 };
216
217 loop {
218 ticker.tick().await;
219 relay_statsd::metric!(
220 gauge(RelayGauges::ServerActiveConnections) = handle.connection_count() as u64
221 );
222 }
223}