objectstore_server/web/
app.rs1use std::net::SocketAddr;
2use std::time::Duration;
3
4use anyhow::Result;
5use axum::ServiceExt;
6use axum::extract::Request;
7use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
8use tokio::net::TcpListener;
9use tower::ServiceBuilder;
10use tower_http::catch_panic::CatchPanicLayer;
11use tower_http::metrics::InFlightRequestsLayer;
12use tower_http::metrics::in_flight_requests::InFlightRequestsCounter;
13use tower_http::trace::{DefaultOnFailure, TraceLayer};
14use tracing::Level;
15
16use crate::endpoints;
17use crate::state::ServiceState;
18use crate::web::middleware as m;
19
20const IN_FLIGHT_INTERVAL: Duration = Duration::from_secs(1);
22
23#[derive(Debug)]
25pub struct App {
26 router: axum::Router,
27 in_flight_requests: InFlightRequestsCounter,
28 graceful_shutdown: bool,
29}
30
31impl App {
32 pub fn new(state: ServiceState) -> Self {
37 let (in_flight_layer, in_flight_requests) = InFlightRequestsLayer::pair();
38
39 let middleware = ServiceBuilder::new()
44 .layer(axum::middleware::from_fn(m::emit_request_metrics))
45 .layer(in_flight_layer)
46 .layer(CatchPanicLayer::custom(m::handle_panic))
47 .layer(m::set_server_header())
48 .layer(NewSentryLayer::new_from_top())
49 .layer(SentryHttpLayer::new().enable_transaction())
50 .layer(
51 TraceLayer::new_for_http()
52 .make_span_with(m::make_http_span)
53 .on_failure(DefaultOnFailure::new().level(Level::DEBUG)),
54 );
55
56 let router = endpoints::routes().layer(middleware).with_state(state);
57
58 App {
59 router,
60 in_flight_requests,
61 graceful_shutdown: false,
62 }
63 }
64
65 pub fn graceful_shutdown(mut self, enable: bool) -> Self {
69 self.graceful_shutdown = enable;
70 self
71 }
72
73 pub async fn serve(self, listener: TcpListener) -> Result<()> {
78 let Self {
79 router,
80 in_flight_requests,
81 graceful_shutdown,
82 } = self;
83
84 let service =
85 ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(router);
86
87 let guard = if graceful_shutdown {
88 Some(elegant_departure::get_shutdown_guard())
89 } else {
90 None
91 };
92
93 let server = async {
94 if let Some(ref guard) = guard {
95 axum::serve(listener, service)
96 .with_graceful_shutdown(guard.wait_owned())
97 .await
98 } else {
99 axum::serve(listener, service).await
100 }
101 };
102
103 let emitter = in_flight_requests.run_emitter(IN_FLIGHT_INTERVAL, |count| async move {
104 merni::gauge!("server.requests.in_flight": count);
105 });
106
107 let (serve_result, _) = tokio::join!(server, emitter);
108 serve_result?;
109
110 Ok(())
111 }
112}