objectstore_server/web/
app.rs

1use std::net::SocketAddr;
2use std::time::Duration;
3
4use anyhow::Result;
5use axum::ServiceExt;
6use axum::extract::Request;
7use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
8use tokio::net::TcpListener;
9use tower::ServiceBuilder;
10use tower_http::catch_panic::CatchPanicLayer;
11use tower_http::metrics::InFlightRequestsLayer;
12use tower_http::metrics::in_flight_requests::InFlightRequestsCounter;
13use tower_http::trace::{DefaultOnFailure, TraceLayer};
14use tracing::Level;
15
16use crate::endpoints;
17use crate::state::ServiceState;
18use crate::web::middleware as m;
19
20/// Interval for emitting the in-flight requests gauge metric.
21const IN_FLIGHT_INTERVAL: Duration = Duration::from_secs(1);
22
23/// The objectstore web server application.
24#[derive(Debug)]
25pub struct App {
26    router: axum::Router,
27    in_flight_requests: InFlightRequestsCounter,
28    graceful_shutdown: bool,
29}
30
31impl App {
32    /// Creates a new application router for the given service state.
33    ///
34    /// The applications sets up middlewares and routes for the objectstore web API. Use
35    /// [`serve`](Self::serve) to run the server future.
36    pub fn new(state: ServiceState) -> Self {
37        let (in_flight_layer, in_flight_requests) = InFlightRequestsLayer::pair();
38
39        // Build the router middleware into a single service which runs _after_ routing. Service
40        // builder order defines layers added first will be called first. This means:
41        //  - Requests go from top to bottom
42        //  - Responses go from bottom to top
43        let middleware = ServiceBuilder::new()
44            .layer(axum::middleware::from_fn(m::emit_request_metrics))
45            .layer(in_flight_layer)
46            .layer(CatchPanicLayer::custom(m::handle_panic))
47            .layer(m::set_server_header())
48            .layer(NewSentryLayer::new_from_top())
49            .layer(SentryHttpLayer::new().enable_transaction())
50            .layer(
51                TraceLayer::new_for_http()
52                    .make_span_with(m::make_http_span)
53                    .on_failure(DefaultOnFailure::new().level(Level::DEBUG)),
54            );
55
56        let router = endpoints::routes().layer(middleware).with_state(state);
57
58        App {
59            router,
60            in_flight_requests,
61            graceful_shutdown: false,
62        }
63    }
64
65    /// Enables or disables graceful shutdown for the server.
66    ///
67    /// By default, graceful shutdown is disabled.
68    pub fn graceful_shutdown(mut self, enable: bool) -> Self {
69        self.graceful_shutdown = enable;
70        self
71    }
72
73    /// Runs the web server until graceful shutdown is triggered.
74    ///
75    /// This function creates a future that runs the server. The future must be spawned or awaited for
76    /// the server to continue running.
77    pub async fn serve(self, listener: TcpListener) -> Result<()> {
78        let Self {
79            router,
80            in_flight_requests,
81            graceful_shutdown,
82        } = self;
83
84        let service =
85            ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(router);
86
87        let guard = if graceful_shutdown {
88            Some(elegant_departure::get_shutdown_guard())
89        } else {
90            None
91        };
92
93        let server = async {
94            if let Some(ref guard) = guard {
95                axum::serve(listener, service)
96                    .with_graceful_shutdown(guard.wait_owned())
97                    .await
98            } else {
99                axum::serve(listener, service).await
100            }
101        };
102
103        let emitter = in_flight_requests.run_emitter(IN_FLIGHT_INTERVAL, |count| async move {
104            merni::gauge!("server.requests.in_flight": count);
105        });
106
107        let (serve_result, _) = tokio::join!(server, emitter);
108        serve_result?;
109
110        Ok(())
111    }
112}