relay_server/services/server/
mod.rs1use std::net::{SocketAddr, TcpListener};
2use std::sync::Arc;
3use std::time::Duration;
4
5use axum::ServiceExt;
6use axum::extract::Request;
7use axum::http::{HeaderName, HeaderValue, header};
8use axum_server::Handle;
9use hyper_util::rt::TokioTimer;
10use relay_config::Config;
11use relay_system::{Controller, Service, Shutdown};
12use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
13use tokio::net::TcpSocket;
14use tower::ServiceBuilder;
15use tower_http::compression::predicate::SizeAbove;
16use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
17use tower_http::set_header::SetResponseHeaderLayer;
18
19use crate::constants;
20use crate::middlewares::{self, CatchPanicLayer, NormalizePath, RequestDecompressionLayer};
21use crate::service::ServiceState;
22use crate::statsd::{RelayCounters, RelayGauges};
23
24mod acceptor;
25mod concurrency_limit;
26mod io;
27
/// Retry count handed to the connection acceptor's TCP keepalive setup in `serve`,
/// alongside the configured keepalive timeout.
const KEEPALIVE_RETRIES: u32 = 5;

/// Time allowed for reading a client's request headers.
///
/// Passed to the HTTP/1 builder's `header_read_timeout` in `serve`.
const CLIENT_HEADER_TIMEOUT: Duration = Duration::from_secs(5);

/// Minimum response size, in bytes, given to the `SizeAbove` compression predicate
/// in `make_app`.
const COMPRESSION_MIN_SIZE: u16 = 128;
42
43#[allow(clippy::enum_variant_names)]
45#[derive(Debug, thiserror::Error)]
46pub enum ServerError {
47 #[error("bind to interface failed")]
49 BindFailed(#[from] std::io::Error),
50
51 #[error("SSL is no longer supported by Relay, please use a proxy in front")]
53 TlsNotSupported,
54}
55
56type App = NormalizePath<axum::Router>;
57
58fn make_app(service: ServiceState, f: impl FnOnce(&Config) -> axum::Router<ServiceState>) -> App {
60 let middleware = ServiceBuilder::new()
65 .layer(axum::middleware::from_fn(middlewares::metrics))
66 .layer(CatchPanicLayer::custom(middlewares::handle_panic))
67 .layer(SetResponseHeaderLayer::overriding(
68 header::SERVER,
69 HeaderValue::from_static(constants::SERVER),
70 ))
71 .layer(SetResponseHeaderLayer::overriding(
72 HeaderName::from_static("cross-origin-resource-policy"),
73 HeaderValue::from_static("cross-origin"),
74 ))
75 .layer(NewSentryLayer::new_from_top())
76 .layer(SentryHttpLayer::new().enable_transaction())
77 .layer(middlewares::trace_http_layer())
78 .map_request(middlewares::remove_empty_encoding)
79 .layer(RequestDecompressionLayer::new())
80 .layer(
81 CompressionLayer::new()
82 .compress_when(SizeAbove::new(COMPRESSION_MIN_SIZE).and(DefaultPredicate::new())),
83 );
84
85 let router = f(service.config()).layer(middleware).with_state(service);
86
87 NormalizePath::new(router)
90}
91
92fn listen(addr: SocketAddr, config: &Config) -> Result<TcpListener, ServerError> {
93 let socket = match addr {
94 SocketAddr::V4(_) => TcpSocket::new_v4(),
95 SocketAddr::V6(_) => TcpSocket::new_v6(),
96 }?;
97
98 #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
99 socket.set_reuseport(true)?;
100 socket.bind(addr)?;
101 Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?)
102}
103
104async fn serve(listener: TcpListener, app: App, config: &Config) -> std::io::Result<()> {
105 let handle = Handle::new();
106
107 let acceptor = self::acceptor::RelayAcceptor::new()
108 .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES)
109 .idle_timeout(config.idle_timeout());
110
111 let mut server = axum_server::from_tcp(listener)
112 .acceptor(acceptor)
113 .handle(handle.clone());
114
115 server
116 .http_builder()
117 .http1()
118 .timer(TokioTimer::new())
119 .half_close(true)
120 .keep_alive(true)
121 .header_read_timeout(CLIENT_HEADER_TIMEOUT)
122 .writev(true);
123
124 server
125 .http_builder()
126 .http2()
127 .timer(TokioTimer::new())
128 .keep_alive_timeout(config.keepalive_timeout());
129
130 let service = ServiceBuilder::new()
131 .layer(concurrency_limit::ConcurrencyLimitLayer::new(
132 config.max_connections(),
133 ))
134 .service(ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(app));
135
136 relay_system::spawn!(emit_active_connections_metric(
137 config.metrics_periodic_interval(),
138 handle.clone(),
139 ));
140
141 relay_system::spawn!(async move {
142 let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
143 relay_log::info!("Shutting down HTTP server");
144
145 match timeout {
146 Some(timeout) => handle.graceful_shutdown(Some(timeout)),
147 None => handle.shutdown(),
148 }
149 });
150
151 server.serve(service).await
152}
153
154pub struct HttpServer {
159 config: Arc<Config>,
160 service: ServiceState,
161 listener: TcpListener,
162 internal_listener: Option<TcpListener>,
163}
164
165impl HttpServer {
166 pub fn new(config: Arc<Config>, service: ServiceState) -> Result<Self, ServerError> {
167 if config.tls_listen_addr().is_some()
169 || config.tls_identity_password().is_some()
170 || config.tls_identity_path().is_some()
171 {
172 return Err(ServerError::TlsNotSupported);
173 }
174
175 let listener = listen(config.listen_addr(), &config)?;
176 let internal_listener = match config.listen_addr_internal() {
177 Some(addr) => Some(listen(addr, &config)?),
178 None => None,
179 };
180
181 Ok(Self {
182 config,
183 service,
184 listener,
185 internal_listener,
186 })
187 }
188}
189
190impl Service for HttpServer {
191 type Interface = ();
192
193 async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
194 let Self {
195 config,
196 service,
197 listener,
198 internal_listener,
199 } = self;
200
201 let listen_addr = config.listen_addr();
202
203 relay_log::info!("spawning http server");
204 relay_log::info!(" listening on http://{listen_addr}/");
205 if let Some(internal_addr) = config.listen_addr_internal() {
206 relay_log::info!(" listening on http://{internal_addr}/ [internal]");
207 }
208 relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
209
210 if let Some(internal_listener) = internal_listener {
211 let public = make_app(service.clone(), crate::endpoints::public_routes);
212 let internal = make_app(service, crate::endpoints::internal_routes);
213
214 tokio::try_join!(
215 serve(listener, public, &config),
216 serve(internal_listener, internal, &config),
217 )
218 .map(drop)
219 } else {
220 let app = make_app(service, crate::endpoints::all_routes);
221 serve(listener, app, &config).await
222 }
223 .expect("axum listener to not fail")
224 }
225}
226
227async fn emit_active_connections_metric(interval: Option<Duration>, handle: Handle) {
228 let Some(mut ticker) = interval.map(tokio::time::interval) else {
229 return;
230 };
231
232 let addr = handle.listening().await.map(|addr| addr.to_string());
233
234 loop {
235 ticker.tick().await;
236 relay_statsd::metric!(
237 gauge(RelayGauges::ServerActiveConnections) = handle.connection_count() as u64,
238 addr = addr.as_deref().unwrap_or("unknown"),
239 );
240 }
241}