relay_system/runtime/
spawn.rs

1use futures::Future;
2use tokio::task::JoinHandle;
3
4use crate::statsd::SystemGauges;
5use crate::{Service, ServiceObj};
6
7/// Spawns an instrumented task with an automatically generated [`TaskId`].
8///
9/// Returns a [`JoinHandle`].
10#[macro_export]
11macro_rules! spawn {
12    ($future:expr) => {{
13        static _PARTS: ::std::sync::OnceLock<(String, String, String)> =
14            ::std::sync::OnceLock::new();
15        let (id, file, line) = _PARTS.get_or_init(|| {
16            let caller = *::std::panic::Location::caller();
17            let id = format!("{}:{}", caller.file(), caller.line());
18            (id, caller.file().to_owned(), caller.line().to_string())
19        });
20        $crate::spawn(
21            $crate::TaskId::_from_location(id.as_str(), file.as_str(), line.as_str()),
22            $future,
23        )
24    }};
25}
26
27/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
28///
29/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
30#[allow(clippy::disallowed_methods)]
31pub fn spawn<F>(task_id: TaskId, future: F) -> JoinHandle<F::Output>
32where
33    F: Future + Send + 'static,
34    F::Output: Send + 'static,
35{
36    tokio::spawn(Task::new(task_id, future))
37}
38
39/// Spawns a new asynchronous task in a specific runtime, returning a [`JoinHandle`] for it.
40///
41/// This is in instrumented spawn variant of Tokio's [`Handle::spawn`](tokio::runtime::Handle::spawn).
42#[allow(clippy::disallowed_methods)]
43pub fn spawn_in<F>(
44    handle: &tokio::runtime::Handle,
45    task_id: TaskId,
46    future: F,
47) -> JoinHandle<F::Output>
48where
49    F: Future + Send + 'static,
50    F::Output: Send + 'static,
51{
52    handle.spawn(Task::new(task_id, future))
53}
54
55/// An identifier for tasks spawned by [`spawn()`], used to log metrics.
56#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
57pub struct TaskId {
58    id: &'static str,
59    file: Option<&'static str>,
60    line: Option<&'static str>,
61}
62
63impl TaskId {
64    /// Create a task ID based on the service's name.
65    pub fn for_service<S: Service>() -> Self {
66        Self {
67            id: S::name(),
68            file: None,
69            line: None,
70        }
71    }
72
73    #[doc(hidden)]
74    pub fn _from_location(id: &'static str, file: &'static str, line: &'static str) -> Self {
75        Self {
76            id,
77            file: Some(file),
78            line: Some(line),
79        }
80    }
81
82    pub(crate) fn id(&self) -> &'static str {
83        self.id
84    }
85}
86
87impl From<&ServiceObj> for TaskId {
88    fn from(value: &ServiceObj) -> Self {
89        Self {
90            id: value.name(),
91            file: None,
92            line: None,
93        }
94    }
95}
96
97pin_project_lite::pin_project! {
98    /// Wraps a future and emits related task metrics.
99    struct Task<T> {
100        id: TaskId,
101        #[pin]
102        inner: T,
103    }
104
105    impl<T> PinnedDrop for Task<T> {
106        fn drop(this: Pin<&mut Self>) {
107            relay_statsd::metric!(
108                gauge(SystemGauges::RuntimeTaskCount) -= 1,
109                id = this.id.id,
110                file = this.id.file.unwrap_or_default(),
111                line = this.id.line.unwrap_or_default()
112            );
113        }
114    }
115}
116
117impl<T> Task<T> {
118    fn new(id: TaskId, inner: T) -> Self {
119        relay_statsd::metric!(
120            gauge(SystemGauges::RuntimeTaskCount) += 1,
121            id = id.id,
122            file = id.file.unwrap_or_default(),
123            line = id.line.unwrap_or_default()
124        );
125        Self { id, inner }
126    }
127}
128
129impl<T: Future> Future for Task<T> {
130    type Output = T::Output;
131
132    fn poll(
133        self: std::pin::Pin<&mut Self>,
134        cx: &mut std::task::Context<'_>,
135    ) -> std::task::Poll<Self::Output> {
136        let this = self.project();
137        this.inner.poll(cx)
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use insta::assert_debug_snapshot;
144
145    use crate::{Service, TaskId};
146
147    #[test]
148    fn test_spawn_spawns_a_future() {
149        let rt = tokio::runtime::Builder::new_current_thread()
150            .build()
151            .unwrap();
152
153        let captures = relay_statsd::with_capturing_test_client(|| {
154            rt.block_on(async {
155                let _ = crate::spawn!(async {}).await;
156            })
157        });
158
159        #[cfg(not(windows))]
160        assert_debug_snapshot!(captures, @r#"
161        [
162            "runtime.tasks:+1|g|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155",
163            "runtime.tasks:-1|g|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155",
164        ]
165        "#);
166        #[cfg(windows)]
167        assert_debug_snapshot!(captures, @r###"
168        [
169            "runtime.tasks:+1|g|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155",
170            "runtime.tasks:-1|g|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155",
171        ]
172        "###);
173    }
174
175    #[test]
176    fn test_spawn_with_custom_id() {
177        struct Foo;
178        impl Service for Foo {
179            type Interface = ();
180            async fn run(self, _rx: crate::Receiver<Self::Interface>) {}
181        }
182
183        let rt = tokio::runtime::Builder::new_current_thread()
184            .build()
185            .unwrap();
186
187        let captures = relay_statsd::with_capturing_test_client(|| {
188            rt.block_on(async {
189                let _ = crate::spawn(TaskId::for_service::<Foo>(), async {}).await;
190            })
191        });
192
193        assert_debug_snapshot!(captures, @r#"
194        [
195            "runtime.tasks:+1|g|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:",
196            "runtime.tasks:-1|g|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:",
197        ]
198        "#);
199    }
200}