relay_system/runtime/
spawn.rs

1use futures::Future;
2use tokio::task::JoinHandle;
3
4use crate::statsd::SystemCounters;
5use crate::{Service, ServiceObj};
6
7/// Spawns an instrumented task with an automatically generated [`TaskId`].
8///
9/// Returns a [`JoinHandle`].
10#[macro_export]
11macro_rules! spawn {
12    ($future:expr) => {{
13        static _PARTS: ::std::sync::OnceLock<(String, String, String)> =
14            ::std::sync::OnceLock::new();
15        let (id, file, line) = _PARTS.get_or_init(|| {
16            let caller = *::std::panic::Location::caller();
17            let id = format!("{}:{}", caller.file(), caller.line());
18            (id, caller.file().to_owned(), caller.line().to_string())
19        });
20        $crate::spawn(
21            $crate::TaskId::_from_location(id.as_str(), file.as_str(), line.as_str()),
22            $future,
23        )
24    }};
25}
26
27/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
28///
29/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
30#[allow(clippy::disallowed_methods)]
31pub fn spawn<F>(task_id: TaskId, future: F) -> JoinHandle<F::Output>
32where
33    F: Future + Send + 'static,
34    F::Output: Send + 'static,
35{
36    tokio::spawn(Task::new(task_id, future))
37}
38
39/// Spawns a new asynchronous task in a specific runtime, returning a [`JoinHandle`] for it.
40///
41/// This is in instrumented spawn variant of Tokio's [`Handle::spawn`](tokio::runtime::Handle::spawn).
42#[allow(clippy::disallowed_methods)]
43pub fn spawn_in<F>(
44    handle: &tokio::runtime::Handle,
45    task_id: TaskId,
46    future: F,
47) -> JoinHandle<F::Output>
48where
49    F: Future + Send + 'static,
50    F::Output: Send + 'static,
51{
52    handle.spawn(Task::new(task_id, future))
53}
54
55/// An identifier for tasks spawned by [`spawn()`], used to log metrics.
56#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
57pub struct TaskId {
58    id: &'static str,
59    file: Option<&'static str>,
60    line: Option<&'static str>,
61}
62
63impl TaskId {
64    /// Create a task ID based on the service's name.
65    pub fn for_service<S: Service>() -> Self {
66        Self {
67            id: S::name(),
68            file: None,
69            line: None,
70        }
71    }
72
73    #[doc(hidden)]
74    pub fn _from_location(id: &'static str, file: &'static str, line: &'static str) -> Self {
75        Self {
76            id,
77            file: Some(file),
78            line: Some(line),
79        }
80    }
81
82    pub(crate) fn id(&self) -> &'static str {
83        self.id
84    }
85
86    fn emit_metric(&self, metric: SystemCounters) {
87        let Self { id, file, line } = self;
88        relay_statsd::metric!(
89            counter(metric) += 1,
90            id = id,
91            file = file.unwrap_or_default(),
92            line = line.unwrap_or_default()
93        );
94    }
95}
96
97impl From<&ServiceObj> for TaskId {
98    fn from(value: &ServiceObj) -> Self {
99        Self {
100            id: value.name(),
101            file: None,
102            line: None,
103        }
104    }
105}
106
107pin_project_lite::pin_project! {
108    /// Wraps a future and emits related task metrics.
109    struct Task<T> {
110        id: TaskId,
111        #[pin]
112        inner: T,
113    }
114
115    impl<T> PinnedDrop for Task<T> {
116        fn drop(this: Pin<&mut Self>) {
117            this.id.emit_metric(SystemCounters::RuntimeTaskTerminated);
118        }
119    }
120}
121
122impl<T> Task<T> {
123    fn new(id: TaskId, inner: T) -> Self {
124        id.emit_metric(SystemCounters::RuntimeTaskCreated);
125        Self { id, inner }
126    }
127}
128
129impl<T: Future> Future for Task<T> {
130    type Output = T::Output;
131
132    fn poll(
133        self: std::pin::Pin<&mut Self>,
134        cx: &mut std::task::Context<'_>,
135    ) -> std::task::Poll<Self::Output> {
136        let this = self.project();
137        this.inner.poll(cx)
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use insta::assert_debug_snapshot;
144
145    use crate::{Service, TaskId};
146
147    #[test]
148    fn test_spawn_spawns_a_future() {
149        let rt = tokio::runtime::Builder::new_current_thread()
150            .build()
151            .unwrap();
152
153        let captures = relay_statsd::with_capturing_test_client(|| {
154            rt.block_on(async {
155                let _ = crate::spawn!(async {}).await;
156            })
157        });
158
159        #[cfg(not(windows))]
160        assert_debug_snapshot!(captures, @r###"
161        [
162            "runtime.task.spawn.created:1|c|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155",
163            "runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155",
164        ]
165        "###);
166        #[cfg(windows)]
167        assert_debug_snapshot!(captures, @r###"
168        [
169            "runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155",
170            "runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155",
171        ]
172        "###);
173    }
174
175    #[test]
176    fn test_spawn_with_custom_id() {
177        struct Foo;
178        impl Service for Foo {
179            type Interface = ();
180            async fn run(self, _rx: crate::Receiver<Self::Interface>) {}
181        }
182
183        let rt = tokio::runtime::Builder::new_current_thread()
184            .build()
185            .unwrap();
186
187        let captures = relay_statsd::with_capturing_test_client(|| {
188            rt.block_on(async {
189                let _ = crate::spawn(TaskId::for_service::<Foo>(), async {}).await;
190            })
191        });
192
193        assert_debug_snapshot!(captures, @r###"
194        [
195            "runtime.task.spawn.created:1|c|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:",
196            "runtime.task.spawn.terminated:1|c|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:",
197        ]
198        "###);
199    }
200}