relay_system/service/mod.rs
1use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::future::{BoxFuture, Shared};
12use tokio::sync::{mpsc, oneshot};
13use tokio::time::MissedTickBehavior;
14
15use crate::statsd::SystemGauges;
16use crate::{TaskId, spawn};
17
18mod registry;
19mod status;
20
21pub(crate) use self::registry::Registry as ServiceRegistry;
22pub use self::registry::{ServiceId, ServiceMetrics, ServicesMetrics};
23pub use self::status::{
24 ServiceError, ServiceJoinHandle, ServiceStatusError, ServiceStatusJoinHandle,
25};
26
/// Period at which the backlog metric of a service channel is recorded.
const BACKLOG_INTERVAL: Duration = Duration::from_secs(1);
29
/// The set of messages accepted by a [service](Service).
///
/// An interface is usually an enumeration with one variant per message, although a single
/// message type can also act as the interface itself. Every message that can be sent needs a
/// matching [`FromMessage`] implementation on the interface type.
///
/// # Implementing Interfaces
///
/// How an interface is written depends on the number of messages and on whether they return
/// values. The simplest form is a **single message** with **no return value**: the message type
/// doubles as the interface and declares `NoResponse` as its response:
///
/// ```
/// use relay_system::{FromMessage, Interface, NoResponse};
///
/// #[derive(Debug)]
/// pub struct MyMessage;
///
/// impl Interface for MyMessage {}
///
/// impl FromMessage<Self> for MyMessage {
///     type Response = NoResponse;
///
///     fn from_message(message: Self, _: ()) -> Self {
///         message
///     }
/// }
/// ```
///
/// For a **single message with a return value**, make the interface a wrapper around the message
/// and the returning [`Sender`]:
///
/// ```
/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
///
/// #[derive(Debug)]
/// pub struct MyMessage;
///
/// #[derive(Debug)]
/// pub struct MyInterface(MyMessage, Sender<bool>);
///
/// impl Interface for MyInterface {}
///
/// impl FromMessage<MyMessage> for MyInterface {
///     type Response = AsyncResponse<bool>;
///
///     fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
///         Self(message, sender)
///     }
/// }
/// ```
///
/// Interfaces with **multiple messages** of any kind are most commonly written as an enumeration
/// with one variant per message. Variants of messages with return values carry a `Sender`
/// again:
///
/// ```
/// use relay_system::{AsyncResponse, FromMessage, Interface, NoResponse, Sender};
///
/// #[derive(Debug)]
/// pub struct GetFlag;
///
/// #[derive(Debug)]
/// pub struct SetFlag(pub bool);
///
/// #[derive(Debug)]
/// pub enum MyInterface {
///     Get(GetFlag, Sender<bool>),
///     Set(SetFlag),
/// }
///
/// impl Interface for MyInterface {}
///
/// impl FromMessage<GetFlag> for MyInterface {
///     type Response = AsyncResponse<bool>;
///
///     fn from_message(message: GetFlag, sender: Sender<bool>) -> Self {
///         Self::Get(message, sender)
///     }
/// }
///
/// impl FromMessage<SetFlag> for MyInterface {
///     type Response = NoResponse;
///
///     fn from_message(message: SetFlag, _: ()) -> Self {
///         Self::Set(message)
///     }
/// }
/// ```
///
/// # Requirements
///
/// Interfaces travel to services through channels, which is why they must be both `Send` and
/// `'static`. Implementing `Debug` on all interfaces and their messages is highly encouraged.
pub trait Interface: Send + 'static {}
127
128/// Services without messages can use `()` as their interface.
129impl Interface for () {}
130
/// An error when [sending](Addr::send) a message to a service fails.
///
/// This is a unit marker without further details. It implements [`std::error::Error`] and
/// displays as a fixed message.
///
/// The type is field-less, so equality is total and `Eq` is derived alongside `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendError;

impl fmt::Display for SendError {
    /// Writes the fixed, human-readable error message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("failed to send message to service")
    }
}

impl std::error::Error for SendError {}
142
/// Describes how an [`Interface`] message is answered.
///
/// A response behavior determines how a service handles and responds to a message, for example
/// with an asynchronous response or as fire-and-forget without any response. The behavior of a
/// message is declared on the interface by its [`FromMessage`] implementation.
///
/// See [`FromMessage`] for more information on how to use this trait.
pub trait MessageResponse {
    /// The handle the service uses to send its response back to the waiting recipient.
    type Sender;

    /// The type returned from [`Addr::send`].
    ///
    /// Depending on the responder, this type is either synchronous or asynchronous.
    type Output;

    /// Creates the response channel for one interface message.
    ///
    /// Returns the sending half, which goes to the service, together with the output that is
    /// handed back to the caller.
    fn channel() -> (Self::Sender, Self::Output);
}
162
163/// The request when sending an asynchronous message to a service.
164///
165/// This is returned from [`Addr::send`] when the message responds asynchronously through
166/// [`AsyncResponse`]. It is a future that should be awaited. The message still runs to
167/// completion if this future is dropped.
168pub struct Request<T>(oneshot::Receiver<T>);
169
170impl<T> fmt::Debug for Request<T> {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 f.debug_struct("Request").finish_non_exhaustive()
173 }
174}
175
176impl<T> Future for Request<T> {
177 type Output = Result<T, SendError>;
178
179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
180 Pin::new(&mut self.0)
181 .poll(cx)
182 .map(|r| r.map_err(|_| SendError))
183 }
184}
185
186/// Sends a message response from a service back to the waiting [`Request`].
187///
188/// The sender is part of an [`AsyncResponse`] and should be moved into the service interface
189/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
190/// [`SendError`].
191pub struct Sender<T>(oneshot::Sender<T>);
192
193impl<T> fmt::Debug for Sender<T> {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 f.debug_struct("Sender")
196 .field("open", &!self.0.is_closed())
197 .finish()
198 }
199}
200
201impl<T> Sender<T> {
202 /// Sends the response value and closes the [`Request`].
203 ///
204 /// This silently drops the value if the request has been dropped.
205 pub fn send(self, value: T) {
206 self.0.send(value).ok();
207 }
208}
209
/// Response behavior that produces an asynchronous [`Request`].
///
/// The sender created for this behavior must be stored on the interface in
/// [`FromMessage::from_message`].
///
/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
pub struct AsyncResponse<T>(PhantomData<T>);

impl<T> fmt::Debug for AsyncResponse<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Implemented by hand so that no `T: Debug` bound is required.
        write!(f, "AsyncResponse")
    }
}
222
223impl<T> MessageResponse for AsyncResponse<T> {
224 type Sender = Sender<T>;
225 type Output = Request<T>;
226
227 fn channel() -> (Self::Sender, Self::Output) {
228 let (tx, rx) = oneshot::channel();
229 (Sender(tx), Request(rx))
230 }
231}
232
/// Response behavior of fire-and-forget messages that produce no output.
///
/// No sender is associated with this response, so the sender argument can be ignored when
/// implementing [`FromMessage`].
///
/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
#[derive(Debug)]
pub struct NoResponse;
246
247impl MessageResponse for NoResponse {
248 type Sender = ();
249 type Output = ();
250
251 fn channel() -> (Self::Sender, Self::Output) {
252 ((), ())
253 }
254}
255
256/// Initial response to a [`BroadcastRequest`].
257#[derive(Debug)]
258enum InitialResponse<T> {
259 /// The response value is immediately ready.
260 ///
261 /// The sender did not attach to a broadcast channel and instead resolved the requested value
262 /// immediately. The request is now ready and can resolve. See [`BroadcastChannel::attach`].
263 Ready(T),
264 /// The sender is attached to a channel that needs to be polled.
265 Poll(Shared<oneshot::Receiver<T>>),
266}
267
268/// States of a [`BroadcastRequest`].
269enum BroadcastState<T> {
270 /// The request is waiting for an initial response.
271 Pending(oneshot::Receiver<InitialResponse<T>>),
272 /// The request is attached to a [`BroadcastChannel`].
273 Attached(Shared<oneshot::Receiver<T>>),
274}
275
276/// The request when sending an asynchronous message to a service.
277///
278/// This is returned from [`Addr::send`] when the message responds asynchronously through
279/// [`BroadcastResponse`]. It is a future that should be awaited. The message still runs to
280/// completion if this future is dropped.
281///
282/// # Panics
283///
284/// This future is not fused and panics if it is polled again after it has resolved.
285pub struct BroadcastRequest<T>(BroadcastState<T>)
286where
287 T: Clone;
288
289impl<T: Clone> Future for BroadcastRequest<T> {
290 type Output = Result<T, SendError>;
291
292 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
293 Poll::Ready(loop {
294 match self.0 {
295 BroadcastState::Pending(ref mut pending) => {
296 match futures::ready!(Pin::new(pending).poll(cx)) {
297 Ok(InitialResponse::Ready(value)) => break Ok(value),
298 Ok(InitialResponse::Poll(shared)) => {
299 self.0 = BroadcastState::Attached(shared)
300 }
301 Err(_) => break Err(SendError),
302 }
303 }
304 BroadcastState::Attached(ref mut shared) => {
305 match futures::ready!(Pin::new(shared).poll(cx)) {
306 Ok(value) => break Ok(value),
307 Err(_) => break Err(SendError),
308 }
309 }
310 }
311 })
312 }
313}
314
315/// A channel that broadcasts values to attached [senders](BroadcastSender).
316///
317/// This is part of the [`BroadcastResponse`] message behavior to efficiently send delayed responses
318/// to a large number of senders. All requests that are attached to this channel via their senders
319/// resolve with the same value.
320///
321/// # Example
322///
323/// ```
324/// use relay_system::{BroadcastChannel, BroadcastSender};
325///
326/// struct MyService {
327/// channel: Option<BroadcastChannel<String>>,
328/// }
329///
330/// impl MyService {
331/// fn handle_message(&mut self, sender: BroadcastSender<String>) {
332/// if let Some(ref mut channel) = self.channel {
333/// channel.attach(sender);
334/// } else {
335/// self.channel = Some(sender.into_channel());
336/// }
337/// }
338///
339/// fn finish_compute(&mut self, value: String) {
340/// if let Some(channel) = self.channel.take() {
341/// channel.send(value);
342/// }
343/// }
344/// }
345/// ```
346#[derive(Debug)]
347pub struct BroadcastChannel<T>
348where
349 T: Clone,
350{
351 tx: oneshot::Sender<T>,
352 rx: Shared<oneshot::Receiver<T>>,
353}
354
355impl<T: Clone> BroadcastChannel<T> {
356 /// Creates a standalone channel.
357 ///
358 /// Use [`attach`](Self::attach) to add senders to this channel. Alternatively, create a channel
359 /// with [`BroadcastSender::into_channel`].
360 pub fn new() -> Self {
361 let (tx, rx) = oneshot::channel();
362 Self {
363 tx,
364 rx: rx.shared(),
365 }
366 }
367
368 /// Attaches a sender of another message to this channel to receive the same value.
369 ///
370 /// # Example
371 ///
372 /// ```
373 /// use relay_system::{BroadcastChannel, BroadcastResponse, BroadcastSender};
374 /// # use relay_system::MessageResponse;
375 ///
376 /// // This is usually done as part of `Addr::send`
377 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
378 ///
379 /// let mut channel = BroadcastChannel::new();
380 /// channel.attach(sender);
381 /// ```
382 pub fn attach(&mut self, sender: BroadcastSender<T>) {
383 sender.0.send(InitialResponse::Poll(self.rx.clone())).ok();
384 }
385
386 /// Sends a value to all attached senders and closes the channel.
387 ///
388 /// This method succeeds even if no senders are attached to this channel anymore. To check if
389 /// this channel is still active with senders attached, use [`is_attached`](Self::is_attached).
390 ///
391 /// # Example
392 ///
393 /// ```
394 /// use relay_system::BroadcastResponse;
395 /// # use relay_system::MessageResponse;
396 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
397 ///
398 /// // This is usually done as part of `Addr::send`
399 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
400 ///
401 /// let channel = sender.into_channel();
402 /// channel.send("test");
403 /// assert_eq!(rx.await, Ok("test"));
404 /// # })
405 /// ```
406 pub fn send(self, value: T) {
407 self.tx.send(value).ok();
408 }
409
410 /// Returns `true` if there are [requests](BroadcastRequest) waiting for this channel.
411 ///
412 /// The channel is not permanently closed when all waiting requests have detached. A new sender
413 /// can be attached using [`attach`](Self::attach) even after this method returns `false`.
414 ///
415 /// # Example
416 ///
417 /// ```
418 /// use relay_system::BroadcastResponse;
419 /// # use relay_system::MessageResponse;
420 ///
421 /// // This is usually done as part of `Addr::send`
422 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
423 ///
424 /// let channel = sender.into_channel();
425 /// assert!(channel.is_attached());
426 ///
427 /// drop(rx);
428 /// assert!(!channel.is_attached());
429 /// ```
430 pub fn is_attached(&self) -> bool {
431 self.rx.strong_count() > Some(1)
432 }
433}
434
435impl<T: Clone> Default for BroadcastChannel<T> {
436 fn default() -> Self {
437 Self::new()
438 }
439}
440
441/// Sends a message response from a service back to the waiting [`BroadcastRequest`].
442///
443/// The sender is part of an [`BroadcastResponse`] and should be moved into the service interface
444/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
445/// [`SendError`].
446///
447/// As opposed to the regular [`Sender`] for asynchronous messages, this sender can be converted
448/// into a [channel](Self::into_channel) that efficiently shares a common response for multiple
449/// requests to the same data value. This is useful if resolving or computing the value takes more
450/// time.
451///
452/// # Example
453///
454/// ```
455/// use relay_system::BroadcastResponse;
456/// # use relay_system::MessageResponse;
457/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
458///
459/// // This is usually done as part of `Addr::send`
460/// let (sender1, rx1) = BroadcastResponse::<&str>::channel();
461/// let (sender2, rx2) = BroadcastResponse::<&str>::channel();
462///
463/// // On the first time, convert the sender into a channel
464/// let mut channel = sender1.into_channel();
465///
466/// // The second time, attach the sender to the existing channel
467/// channel.attach(sender2);
468///
469/// // Send a value into the channel to resolve all requests simultaneously
470/// channel.send("test");
471/// assert_eq!(rx1.await, Ok("test"));
472/// assert_eq!(rx2.await, Ok("test"));
473/// # })
474/// ```
475#[derive(Debug)]
476pub struct BroadcastSender<T>(oneshot::Sender<InitialResponse<T>>)
477where
478 T: Clone;
479
480impl<T: Clone> BroadcastSender<T> {
481 /// Immediately resolve a ready value.
482 ///
483 /// This bypasses shared channels and directly sends the a value to the waiting
484 /// [request](BroadcastRequest). In terms of performance and behavior, using `send` is
485 /// equivalent to calling [`Sender::send`] for a regular [`AsyncResponse`].
486 ///
487 /// # Example
488 ///
489 /// ```
490 /// use relay_system::BroadcastResponse;
491 /// # use relay_system::MessageResponse;
492 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
493 ///
494 /// // This is usually done as part of `Addr::send`
495 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
496 ///
497 /// // sender is NOT converted into a channel!
498 ///
499 /// sender.send("test");
500 /// assert_eq!(rx.await, Ok("test"));
501 /// # })
502 /// ```
503 pub fn send(self, value: T) {
504 self.0.send(InitialResponse::Ready(value)).ok();
505 }
506
507 /// Creates a channel from this sender that can be shared with other senders.
508 ///
509 /// To add more senders to the created channel at a later point, use
510 /// [`attach`](BroadcastChannel::attach).
511 ///
512 /// # Example
513 ///
514 /// ```
515 /// use relay_system::{BroadcastChannel, BroadcastResponse};
516 /// # use relay_system::MessageResponse;
517 ///
518 /// // This is usually done as part of `Addr::send`
519 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
520 ///
521 /// let channel: BroadcastChannel<&str> = sender.into_channel();
522 /// ```
523 pub fn into_channel(self) -> BroadcastChannel<T> {
524 let mut channel = BroadcastChannel::new();
525 channel.attach(self);
526 channel
527 }
528}
529
/// A variation of [`AsyncResponse`] that efficiently broadcasts one response to many requests.
///
/// This behavior suits services that cache or debounce requests. Rather than answering every
/// equivalent request through its own sender, the service can create a [`BroadcastChannel`]
/// that resolves all pending requests at once when the value is ready.
///
/// As with `AsyncResponse`, the service receives a sender that can send a value directly back to
/// the waiting request. In addition, the sender can be converted into a channel or attached to
/// an existing one if the service expects more requests while it computes the response.
///
/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
pub struct BroadcastResponse<T: Clone>(PhantomData<T>);

impl<T: Clone> fmt::Debug for BroadcastResponse<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Implemented by hand so that no `T: Debug` bound is required.
        write!(f, "BroadcastResponse")
    }
}
552
553impl<T: Clone> MessageResponse for BroadcastResponse<T> {
554 type Sender = BroadcastSender<T>;
555 type Output = BroadcastRequest<T>;
556
557 fn channel() -> (Self::Sender, Self::Output) {
558 let (tx, rx) = oneshot::channel();
559 (
560 BroadcastSender(tx),
561 BroadcastRequest(BroadcastState::Pending(rx)),
562 )
563 }
564}
565
566/// Declares a message as part of an [`Interface`].
567///
568/// Messages have an associated `Response` type that determines the return value of sending the
569/// message. Within an interface, the responder can vary for each message. There are two provided
570/// responders.
571///
572/// # No Response
573///
574/// [`NoResponse`] is used for fire-and-forget messages that do not return any values. These
575/// messages do not spawn futures and cannot be awaited. It is neither possible to verify whether
576/// the message was delivered to the service.
577///
578/// When implementing `FromMessage` for such messages, the second argument can be ignored by
579/// convention:
580///
581/// ```
582/// use relay_system::{FromMessage, Interface, NoResponse};
583///
584/// struct MyMessage;
585///
586/// enum MyInterface {
587/// MyMessage(MyMessage),
588/// // ...
589/// }
590///
591/// impl Interface for MyInterface {}
592///
593/// impl FromMessage<MyMessage> for MyInterface {
594/// type Response = NoResponse;
595///
596/// fn from_message(message: MyMessage, _: ()) -> Self {
597/// Self::MyMessage(message)
598/// }
599/// }
600/// ```
601///
602/// # Asynchronous Responses
603///
604/// [`AsyncResponse`] is used for messages that resolve to some future value. This value is sent
605/// back by the service through a [`Sender`], which must be added into the interface:
606///
607/// ```
608/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
609///
610/// struct MyMessage;
611///
612/// enum MyInterface {
613/// MyMessage(MyMessage, Sender<bool>),
614/// // ...
615/// }
616///
617/// impl Interface for MyInterface {}
618///
619/// impl FromMessage<MyMessage> for MyInterface {
620/// type Response = AsyncResponse<bool>;
621///
622/// fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
623/// Self::MyMessage(message, sender)
624/// }
625/// }
626/// ```
627///
628/// # Broadcast Responses
629///
630/// [`BroadcastResponse`] is similar to the previous asynchronous response, but it additionally
631/// allows to efficiently handle duplicate requests for services that debounce equivalent requests
632/// or cache results. On the requesting side, this behavior is identical to the asynchronous
633/// behavior, but it provides more utilities to the implementing service.
634///
635/// ```
636/// use relay_system::{BroadcastResponse, BroadcastSender, FromMessage, Interface};
637///
638/// struct MyMessage;
639///
640/// enum MyInterface {
641/// MyMessage(MyMessage, BroadcastSender<bool>),
642/// // ...
643/// }
644///
645/// impl Interface for MyInterface {}
646///
647/// impl FromMessage<MyMessage> for MyInterface {
648/// type Response = BroadcastResponse<bool>;
649///
650/// fn from_message(message: MyMessage, sender: BroadcastSender<bool>) -> Self {
651/// Self::MyMessage(message, sender)
652/// }
653/// }
654/// ```
655///
656/// See [`Interface`] for more examples on how to build interfaces using this trait and [`Service`]
657/// documentation for patterns and advice to handle messages.
658pub trait FromMessage<M>: Interface {
659 /// The behavior declaring the return value when sending this message.
660 type Response: MessageResponse;
661
662 /// Converts the message into the service interface.
663 fn from_message(message: M, sender: <Self::Response as MessageResponse>::Sender) -> Self;
664}
665
666/// Abstraction over address types for service channels.
667trait SendDispatch<M>: Send + Sync {
668 /// The behavior declaring the return value when sending this message.
669 ///
670 /// When this is implemented for a type bound to an [`Interface`], this is the same behavior as
671 /// used in [`FromMessage::Response`].
672 type Response: MessageResponse;
673
674 /// Sends a message to the service and returns the response.
675 ///
676 /// See [`Addr::send`] for more information on a concrete type.
677 fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output;
678
679 /// Returns a trait object of this type.
680 fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>>;
681}
682
683/// An address to a [`Service`] implementing any interface that takes a given message.
684///
685/// This is similar to an [`Addr`], but it is bound to a single message rather than an interface. As
686/// such, this type is not meant for communicating with a service implementation, but rather as a
687/// handle to any service that can consume a given message. These can be back-channels or hooks that
688/// are configured externally through Inversion of Control (IoC).
689///
690/// Recipients are created through [`Addr::recipient`].
691pub struct Recipient<M, R> {
692 inner: Box<dyn SendDispatch<M, Response = R>>,
693}
694
695impl<M, R> Recipient<M, R>
696where
697 R: MessageResponse,
698{
699 /// Sends a message to the service and returns the response.
700 ///
701 /// This is equivalent to [`send`](Addr::send) on the originating address.
702 pub fn send(&self, message: M) -> R::Output {
703 self.inner.send(message)
704 }
705}
706
707// Manual implementation since `XSender` cannot require `Clone` for object safety.
708impl<M, R: MessageResponse> Clone for Recipient<M, R> {
709 fn clone(&self) -> Self {
710 Self {
711 inner: self.inner.to_trait_object(),
712 }
713 }
714}
715
716/// The address of a [`Service`].
717///
718/// Addresses allow to [send](Self::send) messages to a service that implements a corresponding
719/// [`Interface`] as long as the service is running.
720///
721/// Addresses can be freely cloned. When the last clone is dropped, the message channel of the
722/// service closes permanently, which signals to the service that it can shut down.
723pub struct Addr<I: Interface> {
724 tx: mpsc::UnboundedSender<I>,
725 queue_size: Arc<AtomicU64>,
726}
727
728impl<I: Interface> Addr<I> {
729 /// Sends a message to the service and returns the response.
730 ///
731 /// Depending on the message's response behavior, this either returns a future resolving to the
732 /// return value, or does not return anything for fire-and-forget messages. The communication
733 /// channel with the service is unbounded, so backlogs could occur when sending too many
734 /// messages.
735 ///
736 /// Sending asynchronous messages can fail with `Err(SendError)` if the service has shut down.
737 /// The result of asynchronous messages does not have to be awaited. The message will be
738 /// delivered and handled regardless:
739 pub fn send<M>(&self, message: M) -> <I::Response as MessageResponse>::Output
740 where
741 I: FromMessage<M>,
742 {
743 let (tx, rx) = I::Response::channel();
744 self.queue_size.fetch_add(1, Ordering::SeqCst);
745 self.tx.send(I::from_message(message, tx)).ok(); // it's ok to drop, the response will fail
746 rx
747 }
748
749 /// Returns a handle that can receive a given message independent of the interface.
750 ///
751 /// See [`Recipient`] for more information and examples.
752 pub fn recipient<M>(self) -> Recipient<M, I::Response>
753 where
754 I: FromMessage<M>,
755 {
756 Recipient {
757 inner: Box::new(self),
758 }
759 }
760
761 /// Returns wether the queue is currently empty.
762 pub fn is_empty(&self) -> bool {
763 self.len() == 0
764 }
765
766 /// Returns the current queue size.
767 pub fn len(&self) -> u64 {
768 self.queue_size.load(Ordering::Relaxed)
769 }
770
771 /// Custom address used for testing.
772 ///
773 /// Returns the receiving end of the channel for inspection.
774 pub fn custom() -> (Self, mpsc::UnboundedReceiver<I>) {
775 let (tx, rx) = mpsc::unbounded_channel();
776 (
777 Addr {
778 tx,
779 queue_size: Default::default(),
780 },
781 rx,
782 )
783 }
784
785 /// Dummy address used for testing.
786 pub fn dummy() -> Self {
787 Self::custom().0
788 }
789}
790
791impl<I: Interface> fmt::Debug for Addr<I> {
792 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
793 f.debug_struct("Addr")
794 .field("open", &!self.tx.is_closed())
795 .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
796 .finish()
797 }
798}
799
800// Manually derive `Clone` since we do not require `I: Clone` and the Clone derive adds this
801// constraint.
802impl<I: Interface> Clone for Addr<I> {
803 fn clone(&self) -> Self {
804 Self {
805 tx: self.tx.clone(),
806 queue_size: self.queue_size.clone(),
807 }
808 }
809}
810
811impl<I, M> SendDispatch<M> for Addr<I>
812where
813 I: Interface + FromMessage<M>,
814{
815 type Response = <I as FromMessage<M>>::Response;
816
817 fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
818 Addr::send(self, message)
819 }
820
821 fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
822 Box::new(self.clone())
823 }
824}
825
826/// Inbound channel for messages sent through an [`Addr`].
827///
828/// This channel is meant to be polled in a [`Service`].
829///
830/// Instances are created automatically when [spawning](ServiceSpawn) a service, or can be
831/// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped.
832pub struct Receiver<I: Interface> {
833 rx: mpsc::UnboundedReceiver<I>,
834 name: &'static str,
835 interval: tokio::time::Interval,
836 queue_size: Arc<AtomicU64>,
837}
838
839impl<I: Interface> Receiver<I> {
840 /// Receives the next value for this receiver.
841 ///
842 /// This method returns `None` if the channel has been closed and there are
843 /// no remaining messages in the channel's buffer. This indicates that no
844 /// further values can ever be received from this `Receiver`. The channel is
845 /// closed when all senders have been dropped.
846 ///
847 /// If there are no messages in the channel's buffer, but the channel has
848 /// not yet been closed, this method will sleep until a message is sent or
849 /// the channel is closed.
850 pub async fn recv(&mut self) -> Option<I> {
851 loop {
852 tokio::select! {
853 biased;
854
855 _ = self.interval.tick() => {
856 let backlog = self.queue_size.load(Ordering::Relaxed);
857 relay_statsd::metric!(
858 gauge(SystemGauges::ServiceBackPressure) = backlog,
859 service = self.name
860 );
861 },
862 message = self.rx.recv() => {
863 self.queue_size.fetch_sub(1, Ordering::SeqCst);
864 return message;
865 },
866 }
867 }
868 }
869}
870
871impl<I: Interface> fmt::Debug for Receiver<I> {
872 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873 f.debug_struct("Receiver")
874 .field("name", &self.name)
875 .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
876 .finish()
877 }
878}
879
880/// Creates an unbounded channel for communicating with a [`Service`].
881///
882/// The `Addr` as the sending part provides public access to the service, while the `Receiver`
883/// should remain internal to the service.
884pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
885 let queue_size = Arc::new(AtomicU64::new(0));
886 let (tx, rx) = mpsc::unbounded_channel();
887
888 let addr = Addr {
889 tx,
890 queue_size: queue_size.clone(),
891 };
892
893 let mut interval = tokio::time::interval(BACKLOG_INTERVAL);
894 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
895
896 let receiver = Receiver {
897 rx,
898 name,
899 interval,
900 queue_size,
901 };
902
903 (addr, receiver)
904}
905
906/// An asynchronous unit responding to messages.
907///
908/// Services receive messages conforming to some [`Interface`] through an [`Addr`] and handle them
909/// one by one. Internally, services are free to concurrently process these messages or not, most
910/// probably should.
911///
912/// Individual messages can have a response which will be sent once the message is handled by the
913/// service. The sender can asynchronously await the responses of such messages.
914///
915/// To start a service, create a service runner and call [`ServiceSpawnExt::start`].
916///
917/// # Implementing Services
918///
919/// The standard way to implement services is through the `run` function. It receives an inbound
920/// channel for all messages sent through the service's address. Note that this function is
921/// synchronous, so that this needs to spawn at least one task internally:
922///
923/// ```no_run
924/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceSpawnExt};
925/// # fn test(services: &dyn relay_system::ServiceSpawn) {
926/// struct MyMessage;
927///
928/// impl Interface for MyMessage {}
929///
930/// impl FromMessage<Self> for MyMessage {
931/// type Response = NoResponse;
932///
933/// fn from_message(message: Self, _: ()) -> Self {
934/// message
935/// }
936/// }
937///
938/// struct MyService;
939///
940/// impl Service for MyService {
941/// type Interface = MyMessage;
942///
943/// async fn run(self, mut rx: Receiver<Self::Interface>) {
944/// while let Some(message) = rx.recv().await {
945/// // handle the message
946/// }
947/// }
948/// }
949///
950/// let addr = services.start(MyService);
951/// # }
952/// ```
953///
954/// ## Debounce and Caching
955///
956/// Services that cache or debounce their responses can benefit from the [`BroadcastResponse`]
957/// behavior. To use this behavior, implement the message and interface identical to
958/// [`AsyncResponse`] above. This will provide a different sender type that can be converted into a
959/// channel to debounce responses. It is still possible to send values directly via the sender
960/// without a broadcast channel.
961///
962/// ```
963/// use std::collections::btree_map::{BTreeMap, Entry};
964/// use relay_system::{BroadcastChannel, BroadcastSender};
965///
966/// // FromMessage implementation using BroadcastResponse omitted for brevity.
967///
968/// struct MyService {
969/// cache: BTreeMap<u32, String>,
970/// channels: BTreeMap<u32, BroadcastChannel<String>>,
971/// }
972///
973/// impl MyService {
974/// fn handle_message(&mut self, id: u32, sender: BroadcastSender<String>) {
975/// if let Some(cached) = self.cache.get(&id) {
976/// sender.send(cached.clone());
977/// return;
978/// }
979///
980/// match self.channels.entry(id) {
981/// Entry::Vacant(entry) => {
982/// entry.insert(sender.into_channel());
983/// // Start async computation here.
984/// }
985/// Entry::Occupied(mut entry) => {
986/// entry.get_mut().attach(sender);
987/// }
988/// }
989/// }
990///
991/// fn finish_compute(&mut self, id: u32, value: String) {
992/// if let Some(channel) = self.channels.remove(&id) {
993/// channel.send(value.clone());
994/// }
995///
996/// self.cache.insert(id, value);
997/// }
998/// }
999/// ```
1000///
1001pub trait Service: Sized {
1002 /// The interface of messages this service implements.
1003 ///
1004 /// The interface can be a single message type or an enumeration of all the messages that
1005 /// can be handled by this service.
1006 type Interface: Interface;
1007
1008 /// Defines the main task of this service.
1009 ///
1010 /// `run` typically contains a loop that reads from `rx`, or a `select!` that reads
1011 /// from multiple sources at once.
1012 fn run(self, rx: Receiver<Self::Interface>) -> impl Future<Output = ()> + Send + 'static;
1013
1014 /// Starts the service in the current runtime and returns an address for it.
1015 ///
1016 /// The service runs in a detached tokio task that cannot be joined on. This is mainly useful
1017 /// for tests.
1018 fn start_detached(self) -> Addr<Self::Interface> {
1019 let (addr, rx) = channel(Self::name());
1020 spawn(TaskId::for_service::<Self>(), self.run(rx));
1021 addr
1022 }
1023
1024 /// Returns a unique name for this service implementation.
1025 ///
1026 /// This is used for internal diagnostics and uses the fully qualified type name of the service
1027 /// implementor by default.
1028 fn name() -> &'static str {
1029 std::any::type_name::<Self>()
1030 }
1031}
1032
/// The [`ServiceSpawn`] trait allows for starting a [`Service`] on an executor that will run them to completion.
///
/// Implementors receive services in their type erased form, as a [`ServiceObj`].
///
/// Often you want to spawn your service directly via [`ServiceSpawnExt`], which is implemented
/// for every [`ServiceSpawn`], including unsized ones.
pub trait ServiceSpawn {
    /// Starts the service on the executor.
    ///
    /// The passed [`ServiceObj`] bundles the service's name with its boxed main future.
    fn start_obj(&self, service: ServiceObj);
}
1040
1041/// Extension trait for [`ServiceSpawn`], providing more convenient methods to spawn a service.
1042pub trait ServiceSpawnExt {
1043 /// Starts a service and starts tracking its join handle, exposing an [`Addr`] for message passing.
1044 fn start<S: Service>(&self, service: S) -> Addr<S::Interface>;
1045
1046 /// Starts a service and starts tracking its join handle, given a predefined receiver.
1047 fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>);
1048}
1049
1050impl<T: ServiceSpawn + ?Sized> ServiceSpawnExt for T {
1051 fn start<S: Service>(&self, service: S) -> Addr<S::Interface> {
1052 let (addr, rx) = crate::channel(S::name());
1053 self.start_with(service, rx);
1054 addr
1055 }
1056
1057 fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>) {
1058 self.start_obj(ServiceObj::new(service, rx));
1059 }
1060}
1061
/// Type erased [`Service`].
///
/// Used to start a service using [`ServiceSpawn`] and usually not directly interacted with.
/// Use [`ServiceSpawnExt`] when possible instead.
pub struct ServiceObj {
    /// Name of the service, taken from [`Service::name`] when the object is created.
    name: &'static str,
    /// The boxed future returned by [`Service::run`] for the bundled service and receiver.
    future: BoxFuture<'static, ()>,
}
1070
1071impl ServiceObj {
1072 /// Creates a new bundled type erased [`Service`], that can be started using [`ServiceSpawn`].
1073 pub fn new<S: Service>(service: S, rx: Receiver<S::Interface>) -> Self {
1074 Self {
1075 name: S::name(),
1076 future: service.run(rx).boxed(),
1077 }
1078 }
1079
1080 /// Returns the name of the service.
1081 ///
1082 /// The name of the service is inferred from [`Service::name`].
1083 pub fn name(&self) -> &'static str {
1084 self.name
1085 }
1086}
1087
#[cfg(feature = "test")]
/// A [`ServiceSpawn`] implementation which spawns a service on the current Tokio runtime.
///
/// This type is only compiled when the `test` feature is enabled.
pub struct TokioServiceSpawn;
1091
#[cfg(feature = "test")]
impl ServiceSpawn for TokioServiceSpawn {
    /// Spawns the service's main future through the `crate::spawn!` macro.
    ///
    /// Only the future of the [`ServiceObj`] is used; its name is not passed on here.
    // NOTE(review): `#[track_caller]` presumably lets `spawn!` attribute the task to the
    // caller's source location -- confirm against the macro definition.
    #[track_caller]
    fn start_obj(&self, service: ServiceObj) {
        crate::spawn!(service.future);
    }
}
1099
#[cfg(test)]
mod tests {
    use super::*;

    /// Message without a response, used to fill the mock service's queue.
    struct MockMessage;

    impl Interface for MockMessage {}

    impl FromMessage<Self> for MockMessage {
        type Response = NoResponse;

        fn from_message(message: Self, _: ()) -> Self {
            message
        }
    }

    /// Service that sleeps for `2 * BACKLOG_INTERVAL` after every message it receives.
    struct MockService;

    impl Service for MockService {
        type Interface = MockMessage;

        async fn run(self, mut rx: Receiver<Self::Interface>) {
            // Handling a message is simulated by sleeping; the loop ends once `recv` yields
            // `None`.
            while rx.recv().await.is_some() {
                tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
            }
        }

        // Short, fixed name so that the `service:mock` tag asserted below is predictable.
        fn name() -> &'static str {
            "mock"
        }
    }

    /// Checks which `service.back_pressure` gauges are captured while a slow service works
    /// through a small backlog of messages.
    ///
    // NOTE(review): the code emitting the gauge is outside this module; presumably it lives
    // in the receiver created by `start_detached` -- confirm.
    #[test]
    fn test_backpressure_metrics() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();

        // Enter the runtime context so the service can be spawned and the clock paused
        // outside of `block_on`. With the clock paused, the sleeps below advance virtual time.
        let _guard = rt.enter();
        tokio::time::pause();

        // Mock service takes 2 * BACKLOG_INTERVAL for every message
        let addr = MockService.start_detached();

        // Advance the timer by a tiny offset to trigger the first metric emission.
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                tokio::time::sleep(Duration::from_millis(10)).await;
            })
        });

        // Nothing has been sent yet, so the reported backlog is zero.
        assert_eq!(captures, ["service.back_pressure:0|g|#service:mock"]);

        // Send messages and advance to 0.5 * INTERVAL. No metrics expected at this point.
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                addr.send(MockMessage); // will be pulled immediately
                addr.send(MockMessage);
                addr.send(MockMessage);

                tokio::time::sleep(BACKLOG_INTERVAL / 2).await;
            })
        });

        assert!(captures.is_empty());

        // Advance to 6.5 * INTERVAL. The service should pull the first message immediately, another
        // message every 2 INTERVALS. The messages are fully handled after 6 INTERVALS, but we
        // cannot observe that since the last message exits the queue at 4.
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                tokio::time::sleep(BACKLOG_INTERVAL * 6).await;
            })
        });

        // One gauge per wake-up of the service: the backlog shrinks by one message each time.
        assert_eq!(
            captures,
            [
                "service.back_pressure:2|g|#service:mock", // 2 * INTERVAL
                "service.back_pressure:1|g|#service:mock", // 4 * INTERVAL
                "service.back_pressure:0|g|#service:mock", // 6 * INTERVAL
            ]
        );
    }
}