relay_system/runtime/
spawn.rs1use futures::Future;
2use tokio::task::JoinHandle;
3
4use crate::statsd::SystemCounters;
5use crate::{Service, ServiceObj};
6
/// Spawns an instrumented task whose [`TaskId`](crate::TaskId) is derived from the source
/// location of the macro invocation.
///
/// The identifier has the form `file:line`. It is computed on first use and cached in a
/// `static` private to each expansion, which is what allows the parts to be passed on as
/// `&'static str`.
///
/// Expands to a call to [`spawn`](crate::spawn()) and evaluates to the handle it returns.
#[macro_export]
macro_rules! spawn {
    ($future:expr) => {{
        // Per call site cache of `(id, file, line)`.
        static _PARTS: ::std::sync::OnceLock<(String, String, String)> =
            ::std::sync::OnceLock::new();

        let (id, file, line) = _PARTS.get_or_init(|| {
            // Queried inside the expansion so that it reports the location of the
            // `spawn!` invocation.
            let location = ::std::panic::Location::caller();
            let file = location.file().to_owned();
            let line = location.line().to_string();
            (format!("{}:{}", file, line), file, line)
        });

        $crate::spawn(
            $crate::TaskId::_from_location(id.as_str(), file.as_str(), line.as_str()),
            $future,
        )
    }};
}
26
27#[allow(clippy::disallowed_methods)]
31pub fn spawn<F>(task_id: TaskId, future: F) -> JoinHandle<F::Output>
32where
33 F: Future + Send + 'static,
34 F::Output: Send + 'static,
35{
36 tokio::spawn(Task::new(task_id, future))
37}
38
39#[allow(clippy::disallowed_methods)]
43pub fn spawn_in<F>(
44 handle: &tokio::runtime::Handle,
45 task_id: TaskId,
46 future: F,
47) -> JoinHandle<F::Output>
48where
49 F: Future + Send + 'static,
50 F::Output: Send + 'static,
51{
52 handle.spawn(Task::new(task_id, future))
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
57pub struct TaskId {
58 id: &'static str,
59 file: Option<&'static str>,
60 line: Option<&'static str>,
61}
62
63impl TaskId {
64 pub fn for_service<S: Service>() -> Self {
66 Self {
67 id: S::name(),
68 file: None,
69 line: None,
70 }
71 }
72
73 #[doc(hidden)]
74 pub fn _from_location(id: &'static str, file: &'static str, line: &'static str) -> Self {
75 Self {
76 id,
77 file: Some(file),
78 line: Some(line),
79 }
80 }
81
82 pub(crate) fn id(&self) -> &'static str {
83 self.id
84 }
85
86 fn emit_metric(&self, metric: SystemCounters) {
87 let Self { id, file, line } = self;
88 relay_statsd::metric!(
89 counter(metric) += 1,
90 id = id,
91 file = file.unwrap_or_default(),
92 line = line.unwrap_or_default()
93 );
94 }
95}
96
97impl From<&ServiceObj> for TaskId {
98 fn from(value: &ServiceObj) -> Self {
99 Self {
100 id: value.name(),
101 file: None,
102 line: None,
103 }
104 }
105}
106
107pin_project_lite::pin_project! {
108 struct Task<T> {
110 id: TaskId,
111 #[pin]
112 inner: T,
113 }
114
115 impl<T> PinnedDrop for Task<T> {
116 fn drop(this: Pin<&mut Self>) {
117 this.id.emit_metric(SystemCounters::RuntimeTaskTerminated);
118 }
119 }
120}
121
122impl<T> Task<T> {
123 fn new(id: TaskId, inner: T) -> Self {
124 id.emit_metric(SystemCounters::RuntimeTaskCreated);
125 Self { id, inner }
126 }
127}
128
129impl<T: Future> Future for Task<T> {
130 type Output = T::Output;
131
132 fn poll(
133 self: std::pin::Pin<&mut Self>,
134 cx: &mut std::task::Context<'_>,
135 ) -> std::task::Poll<Self::Output> {
136 let this = self.project();
137 this.inner.poll(cx)
138 }
139}
140
#[cfg(test)]
mod tests {
    use insta::assert_debug_snapshot;

    use crate::{Service, TaskId};

    /// `spawn!` must tag both lifecycle metrics with the location of its own invocation.
    #[test]
    fn test_spawn_spawns_a_future() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();

        // The expected tags are derived from `file!()` and `line!()` instead of being pinned
        // in a snapshot. A pinned line number breaks whenever anything above this test gains
        // or loses a line, and a pinned path needs one snapshot per path separator.
        //
        // The `spawn!` invocation must stay exactly three lines below this `line!()`.
        let spawn_line = line!() + 3;
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                let _ = crate::spawn!(async {}).await;
            })
        });

        let file = file!();
        let tags = format!("#id:{file}:{spawn_line},file:{file},line:{spawn_line}");
        let expected = vec![
            format!("runtime.task.spawn.created:1|c|{tags}"),
            format!("runtime.task.spawn.terminated:1|c|{tags}"),
        ];
        assert_eq!(captures, expected);
    }

    /// An id created for a service carries the service name and empty location tags.
    #[test]
    fn test_spawn_with_custom_id() {
        struct Foo;
        impl Service for Foo {
            type Interface = ();
            async fn run(self, _rx: crate::Receiver<Self::Interface>) {}
        }

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();

        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                let _ = crate::spawn(TaskId::for_service::<Foo>(), async {}).await;
            })
        });

        assert_debug_snapshot!(captures, @r###"
        [
            "runtime.task.spawn.created:1|c|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:",
            "runtime.task.spawn.terminated:1|c|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:",
        ]
        "###);
    }
}