relay_system/service/
mod.rs

1use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::future::{BoxFuture, Shared};
12use tokio::sync::{mpsc, oneshot};
13use tokio::time::MissedTickBehavior;
14
15use crate::statsd::SystemGauges;
16use crate::{TaskId, spawn};
17
18mod registry;
19mod status;
20
21pub(crate) use self::registry::Registry as ServiceRegistry;
22pub use self::registry::{ServiceId, ServiceMetrics, ServicesMetrics};
23pub use self::status::{
24    ServiceError, ServiceJoinHandle, ServiceStatusError, ServiceStatusJoinHandle,
25};
26
/// How often the backlog of a service channel is recorded as a metric.
const BACKLOG_INTERVAL: Duration = Duration::from_millis(1_000);
29
30/// A message interface for [services](Service).
31///
32/// Most commonly, this interface is an enumeration of messages, but it can also be implemented on a
33/// single message. For each individual message, this type needs to implement the [`FromMessage`]
34/// trait.
35///
/// # Implementing Interfaces
37///
38/// There are three main ways to implement interfaces, which depends on the number of messages and
39/// their return values. The simplest way is an interface consisting of a **single message** with
40/// **no return value**. For this case, use the message directly as interface and choose
41/// `NoResponse` as response:
42///
43/// ```
44/// use relay_system::{FromMessage, Interface, NoResponse};
45///
46/// #[derive(Debug)]
47/// pub struct MyMessage;
48///
49/// impl Interface for MyMessage {}
50///
51/// impl FromMessage<Self> for MyMessage {
52///     type Response = NoResponse;
53///
54///     fn from_message(message: Self, _: ()) -> Self {
55///         message
56///     }
57/// }
58/// ```
59///
60/// If there is a **single message with a return value**, implement the interface as a wrapper for
61/// the message and the return [`Sender`]:
62///
63/// ```
64/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
65///
66/// #[derive(Debug)]
67/// pub struct MyMessage;
68///
69/// #[derive(Debug)]
70/// pub struct MyInterface(MyMessage, Sender<bool>);
71///
72/// impl Interface for MyInterface {}
73///
74/// impl FromMessage<MyMessage> for MyInterface {
75///     type Response = AsyncResponse<bool>;
76///
77///     fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
78///         Self(message, sender)
79///     }
80/// }
81/// ```
82///
83/// Finally, interfaces with **multiple messages** of any kind can most commonly be implemented
84/// through an enumeration for every message. The variants of messages with return values need a
85/// `Sender` again:
86///
87/// ```
88/// use relay_system::{AsyncResponse, FromMessage, Interface, NoResponse, Sender};
89///
90/// #[derive(Debug)]
91/// pub struct GetFlag;
92///
93/// #[derive(Debug)]
94/// pub struct SetFlag(pub bool);
95///
96/// #[derive(Debug)]
97/// pub enum MyInterface {
98///     Get(GetFlag, Sender<bool>),
99///     Set(SetFlag),
100/// }
101///
102/// impl Interface for MyInterface {}
103///
104/// impl FromMessage<GetFlag> for MyInterface {
105///     type Response = AsyncResponse<bool>;
106///
107///     fn from_message(message: GetFlag, sender: Sender<bool>) -> Self {
108///         Self::Get(message, sender)
109///     }
110/// }
111///
112/// impl FromMessage<SetFlag> for MyInterface {
113///     type Response = NoResponse;
114///
115///     fn from_message(message: SetFlag, _: ()) -> Self {
116///         Self::Set(message)
117///     }
118/// }
119/// ```
120///
121/// # Requirements
122///
123/// Interfaces are meant to be sent to services via channels. As such, they need to be both `Send`
124/// and `'static`. It is highly encouraged to implement `Debug` on all interfaces and their
125/// messages.
pub trait Interface: 'static + Send {}

/// The unit type serves as the interface of services that do not take any messages.
impl Interface for () {}
130
/// An error when [sending](Addr::send) a message to a service fails.
///
/// The error carries no further information. It is a plain marker value that can be copied and
/// compared freely.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendError;

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("failed to send message to service")
    }
}

impl std::error::Error for SendError {}
142
/// Describes how a service replies to a message of an [`Interface`].
///
/// A response behavior determines what the sender of a message gets back, for example a future
/// that resolves to an asynchronous response, or nothing at all for fire-and-forget messages.
/// [`FromMessage`] implementations select this behavior for each message of an interface.
///
/// See [`FromMessage`] for more information on how to use this trait.
pub trait MessageResponse {
    /// The half given to the service to send the response back to the waiting recipient.
    type Sender;

    /// The half given to the caller, returned from [`Addr::send`].
    ///
    /// Depending on the responder, this type is either synchronous or asynchronous.
    type Output;

    /// Creates the response sender and the matching output for a single interface message.
    fn channel() -> (Self::Sender, Self::Output);
}
162
163/// The request when sending an asynchronous message to a service.
164///
165/// This is returned from [`Addr::send`] when the message responds asynchronously through
166/// [`AsyncResponse`]. It is a future that should be awaited. The message still runs to
167/// completion if this future is dropped.
168pub struct Request<T>(oneshot::Receiver<T>);
169
170impl<T> fmt::Debug for Request<T> {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        f.debug_struct("Request").finish_non_exhaustive()
173    }
174}
175
176impl<T> Future for Request<T> {
177    type Output = Result<T, SendError>;
178
179    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
180        Pin::new(&mut self.0)
181            .poll(cx)
182            .map(|r| r.map_err(|_| SendError))
183    }
184}
185
186/// Sends a message response from a service back to the waiting [`Request`].
187///
188/// The sender is part of an [`AsyncResponse`] and should be moved into the service interface
189/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
190/// [`SendError`].
191pub struct Sender<T>(oneshot::Sender<T>);
192
193impl<T> fmt::Debug for Sender<T> {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        f.debug_struct("Sender")
196            .field("open", &!self.0.is_closed())
197            .finish()
198    }
199}
200
201impl<T> Sender<T> {
202    /// Sends the response value and closes the [`Request`].
203    ///
204    /// This silently drops the value if the request has been dropped.
205    pub fn send(self, value: T) {
206        self.0.send(value).ok();
207    }
208}
209
210/// Message response resulting in an asynchronous [`Request`].
211///
212/// The sender must be placed on the interface in [`FromMessage::from_message`].
213///
214/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
215pub struct AsyncResponse<T>(PhantomData<T>);
216
217impl<T> fmt::Debug for AsyncResponse<T> {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        f.write_str("AsyncResponse")
220    }
221}
222
223impl<T> MessageResponse for AsyncResponse<T> {
224    type Sender = Sender<T>;
225    type Output = Request<T>;
226
227    fn channel() -> (Self::Sender, Self::Output) {
228        let (tx, rx) = oneshot::channel();
229        (Sender(tx), Request(rx))
230    }
231}
232
233/// Message response for fire-and-forget messages with no output.
234///
235/// There is no sender associated to this response. When implementing [`FromMessage`], the sender
236/// can be ignored.
237///
238/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
239pub struct NoResponse;
240
241impl fmt::Debug for NoResponse {
242    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243        f.write_str("NoResponse")
244    }
245}
246
247impl MessageResponse for NoResponse {
248    type Sender = ();
249    type Output = ();
250
251    fn channel() -> (Self::Sender, Self::Output) {
252        ((), ())
253    }
254}
255
256/// Initial response to a [`BroadcastRequest`].
257#[derive(Debug)]
258enum InitialResponse<T> {
259    /// The response value is immediately ready.
260    ///
261    /// The sender did not attach to a broadcast channel and instead resolved the requested value
262    /// immediately. The request is now ready and can resolve. See [`BroadcastChannel::attach`].
263    Ready(T),
264    /// The sender is attached to a channel that needs to be polled.
265    Poll(Shared<oneshot::Receiver<T>>),
266}
267
268/// States of a [`BroadcastRequest`].
269enum BroadcastState<T> {
270    /// The request is waiting for an initial response.
271    Pending(oneshot::Receiver<InitialResponse<T>>),
272    /// The request is attached to a [`BroadcastChannel`].
273    Attached(Shared<oneshot::Receiver<T>>),
274}
275
276/// The request when sending an asynchronous message to a service.
277///
278/// This is returned from [`Addr::send`] when the message responds asynchronously through
279/// [`BroadcastResponse`]. It is a future that should be awaited. The message still runs to
280/// completion if this future is dropped.
281///
282/// # Panics
283///
284/// This future is not fused and panics if it is polled again after it has resolved.
285pub struct BroadcastRequest<T>(BroadcastState<T>)
286where
287    T: Clone;
288
289impl<T: Clone> Future for BroadcastRequest<T> {
290    type Output = Result<T, SendError>;
291
292    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
293        Poll::Ready(loop {
294            match self.0 {
295                BroadcastState::Pending(ref mut pending) => {
296                    match futures::ready!(Pin::new(pending).poll(cx)) {
297                        Ok(InitialResponse::Ready(value)) => break Ok(value),
298                        Ok(InitialResponse::Poll(shared)) => {
299                            self.0 = BroadcastState::Attached(shared)
300                        }
301                        Err(_) => break Err(SendError),
302                    }
303                }
304                BroadcastState::Attached(ref mut shared) => {
305                    match futures::ready!(Pin::new(shared).poll(cx)) {
306                        Ok(value) => break Ok(value),
307                        Err(_) => break Err(SendError),
308                    }
309                }
310            }
311        })
312    }
313}
314
315/// A channel that broadcasts values to attached [senders](BroadcastSender).
316///
317/// This is part of the [`BroadcastResponse`] message behavior to efficiently send delayed responses
318/// to a large number of senders. All requests that are attached to this channel via their senders
319/// resolve with the same value.
320///
321/// # Example
322///
323/// ```
324/// use relay_system::{BroadcastChannel, BroadcastSender};
325///
326/// struct MyService {
327///     channel: Option<BroadcastChannel<String>>,
328/// }
329///
330/// impl MyService {
331///     fn handle_message(&mut self, sender: BroadcastSender<String>) {
332///         if let Some(ref mut channel) = self.channel {
333///             channel.attach(sender);
334///         } else {
335///             self.channel = Some(sender.into_channel());
336///         }
337///     }
338///
339///     fn finish_compute(&mut self, value: String) {
340///         if let Some(channel) = self.channel.take() {
341///             channel.send(value);
342///         }
343///     }
344/// }
345/// ```
346#[derive(Debug)]
347pub struct BroadcastChannel<T>
348where
349    T: Clone,
350{
351    tx: oneshot::Sender<T>,
352    rx: Shared<oneshot::Receiver<T>>,
353}
354
355impl<T: Clone> BroadcastChannel<T> {
356    /// Creates a standalone channel.
357    ///
358    /// Use [`attach`](Self::attach) to add senders to this channel. Alternatively, create a channel
359    /// with [`BroadcastSender::into_channel`].
360    pub fn new() -> Self {
361        let (tx, rx) = oneshot::channel();
362        Self {
363            tx,
364            rx: rx.shared(),
365        }
366    }
367
368    /// Attaches a sender of another message to this channel to receive the same value.
369    ///
370    /// # Example
371    ///
372    /// ```
373    /// use relay_system::{BroadcastChannel, BroadcastResponse, BroadcastSender};
374    /// # use relay_system::MessageResponse;
375    ///
376    /// // This is usually done as part of `Addr::send`
377    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
378    ///
379    /// let mut channel = BroadcastChannel::new();
380    /// channel.attach(sender);
381    /// ```
382    pub fn attach(&mut self, sender: BroadcastSender<T>) {
383        sender.0.send(InitialResponse::Poll(self.rx.clone())).ok();
384    }
385
386    /// Sends a value to all attached senders and closes the channel.
387    ///
388    /// This method succeeds even if no senders are attached to this channel anymore. To check if
389    /// this channel is still active with senders attached, use [`is_attached`](Self::is_attached).
390    ///
391    /// # Example
392    ///
393    /// ```
394    /// use relay_system::BroadcastResponse;
395    /// # use relay_system::MessageResponse;
396    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
397    ///
398    /// // This is usually done as part of `Addr::send`
399    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
400    ///
401    /// let channel = sender.into_channel();
402    /// channel.send("test");
403    /// assert_eq!(rx.await, Ok("test"));
404    /// # })
405    /// ```
406    pub fn send(self, value: T) {
407        self.tx.send(value).ok();
408    }
409
410    /// Returns `true` if there are [requests](BroadcastRequest) waiting for this channel.
411    ///
412    /// The channel is not permanently closed when all waiting requests have detached. A new sender
413    /// can be attached using [`attach`](Self::attach) even after this method returns `false`.
414    ///
415    /// # Example
416    ///
417    /// ```
418    /// use relay_system::BroadcastResponse;
419    /// # use relay_system::MessageResponse;
420    ///
421    /// // This is usually done as part of `Addr::send`
422    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
423    ///
424    /// let channel = sender.into_channel();
425    /// assert!(channel.is_attached());
426    ///
427    /// drop(rx);
428    /// assert!(!channel.is_attached());
429    /// ```
430    pub fn is_attached(&self) -> bool {
431        self.rx.strong_count() > Some(1)
432    }
433}
434
435impl<T: Clone> Default for BroadcastChannel<T> {
436    fn default() -> Self {
437        Self::new()
438    }
439}
440
441/// Sends a message response from a service back to the waiting [`BroadcastRequest`].
442///
/// The sender is part of a [`BroadcastResponse`] and should be moved into the service interface
444/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
445/// [`SendError`].
446///
447/// As opposed to the regular [`Sender`] for asynchronous messages, this sender can be converted
448/// into a [channel](Self::into_channel) that efficiently shares a common response for multiple
449/// requests to the same data value. This is useful if resolving or computing the value takes more
450/// time.
451///
452/// # Example
453///
454/// ```
455/// use relay_system::BroadcastResponse;
456/// # use relay_system::MessageResponse;
457/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
458///
459/// // This is usually done as part of `Addr::send`
460/// let (sender1, rx1) = BroadcastResponse::<&str>::channel();
461/// let (sender2, rx2) = BroadcastResponse::<&str>::channel();
462///
463/// // On the first time, convert the sender into a channel
464/// let mut channel = sender1.into_channel();
465///
466/// // The second time, attach the sender to the existing channel
467/// channel.attach(sender2);
468///
469/// // Send a value into the channel to resolve all requests simultaneously
470/// channel.send("test");
471/// assert_eq!(rx1.await, Ok("test"));
472/// assert_eq!(rx2.await, Ok("test"));
473/// # })
474/// ```
475#[derive(Debug)]
476pub struct BroadcastSender<T>(oneshot::Sender<InitialResponse<T>>)
477where
478    T: Clone;
479
480impl<T: Clone> BroadcastSender<T> {
481    /// Immediately resolve a ready value.
482    ///
483    /// This bypasses shared channels and directly sends the a value to the waiting
484    /// [request](BroadcastRequest). In terms of performance and behavior, using `send` is
485    /// equivalent to calling [`Sender::send`] for a regular [`AsyncResponse`].
486    ///
487    /// # Example
488    ///
489    /// ```
490    /// use relay_system::BroadcastResponse;
491    /// # use relay_system::MessageResponse;
492    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
493    ///
494    /// // This is usually done as part of `Addr::send`
495    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
496    ///
497    /// // sender is NOT converted into a channel!
498    ///
499    /// sender.send("test");
500    /// assert_eq!(rx.await, Ok("test"));
501    /// # })
502    /// ```
503    pub fn send(self, value: T) {
504        self.0.send(InitialResponse::Ready(value)).ok();
505    }
506
507    /// Creates a channel from this sender that can be shared with other senders.
508    ///
509    /// To add more senders to the created channel at a later point, use
510    /// [`attach`](BroadcastChannel::attach).
511    ///
512    /// # Example
513    ///
514    /// ```
515    /// use relay_system::{BroadcastChannel, BroadcastResponse};
516    /// # use relay_system::MessageResponse;
517    ///
518    /// // This is usually done as part of `Addr::send`
519    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
520    ///
521    /// let channel: BroadcastChannel<&str> = sender.into_channel();
522    /// ```
523    pub fn into_channel(self) -> BroadcastChannel<T> {
524        let mut channel = BroadcastChannel::new();
525        channel.attach(self);
526        channel
527    }
528}
529
530/// Variation of [`AsyncResponse`] that efficiently broadcasts responses to many requests.
531///
532/// This response behavior is useful for services that cache or debounce requests. Instead of
533/// responding to each equivalent request via its individual sender, the broadcast behavior allows
534/// to create a [`BroadcastChannel`] that efficiently resolves all pending requests once the value
535/// is ready.
536///
537/// Similar to `AsyncResponse`, the service receives a sender that it can use to send a value
538/// directly back to the waiting request. Additionally, the sender can be converted into a channel
539/// or attached to an already existing channel, if the service expects more requests while computing
540/// the response.
541///
542/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
543pub struct BroadcastResponse<T>(PhantomData<T>)
544where
545    T: Clone;
546
547impl<T: Clone> fmt::Debug for BroadcastResponse<T> {
548    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549        f.write_str("BroadcastResponse")
550    }
551}
552
553impl<T: Clone> MessageResponse for BroadcastResponse<T> {
554    type Sender = BroadcastSender<T>;
555    type Output = BroadcastRequest<T>;
556
557    fn channel() -> (Self::Sender, Self::Output) {
558        let (tx, rx) = oneshot::channel();
559        (
560            BroadcastSender(tx),
561            BroadcastRequest(BroadcastState::Pending(rx)),
562        )
563    }
564}
565
566/// Declares a message as part of an [`Interface`].
567///
568/// Messages have an associated `Response` type that determines the return value of sending the
/// message. Within an interface, the responder can vary for each message. There are three provided
/// responders.
571///
572/// # No Response
573///
574/// [`NoResponse`] is used for fire-and-forget messages that do not return any values. These
/// messages do not spawn futures and cannot be awaited. It is also not possible to verify whether
/// the message was delivered to the service.
577///
578/// When implementing `FromMessage` for such messages, the second argument can be ignored by
579/// convention:
580///
581/// ```
582/// use relay_system::{FromMessage, Interface, NoResponse};
583///
584/// struct MyMessage;
585///
586/// enum MyInterface {
587///     MyMessage(MyMessage),
588///     // ...
589/// }
590///
591/// impl Interface for MyInterface {}
592///
593/// impl FromMessage<MyMessage> for MyInterface {
594///     type Response = NoResponse;
595///
596///     fn from_message(message: MyMessage, _: ()) -> Self {
597///         Self::MyMessage(message)
598///     }
599/// }
600/// ```
601///
602/// # Asynchronous Responses
603///
604/// [`AsyncResponse`] is used for messages that resolve to some future value. This value is sent
605/// back by the service through a [`Sender`], which must be added into the interface:
606///
607/// ```
608/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
609///
610/// struct MyMessage;
611///
612/// enum MyInterface {
613///     MyMessage(MyMessage, Sender<bool>),
614///     // ...
615/// }
616///
617/// impl Interface for MyInterface {}
618///
619/// impl FromMessage<MyMessage> for MyInterface {
620///     type Response = AsyncResponse<bool>;
621///
622///     fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
623///         Self::MyMessage(message, sender)
624///     }
625/// }
626/// ```
627///
628/// # Broadcast Responses
629///
630/// [`BroadcastResponse`] is similar to the previous asynchronous response, but it additionally
631/// allows to efficiently handle duplicate requests for services that debounce equivalent requests
632/// or cache results. On the requesting side, this behavior is identical to the asynchronous
633/// behavior, but it provides more utilities to the implementing service.
634///
635/// ```
636/// use relay_system::{BroadcastResponse, BroadcastSender, FromMessage, Interface};
637///
638/// struct MyMessage;
639///
640/// enum MyInterface {
641///     MyMessage(MyMessage, BroadcastSender<bool>),
642///     // ...
643/// }
644///
645/// impl Interface for MyInterface {}
646///
647/// impl FromMessage<MyMessage> for MyInterface {
648///     type Response = BroadcastResponse<bool>;
649///
650///     fn from_message(message: MyMessage, sender: BroadcastSender<bool>) -> Self {
651///         Self::MyMessage(message, sender)
652///     }
653/// }
654/// ```
655///
656/// See [`Interface`] for more examples on how to build interfaces using this trait and [`Service`]
657/// documentation for patterns and advice to handle messages.
658pub trait FromMessage<M>: Interface {
659    /// The behavior declaring the return value when sending this message.
660    type Response: MessageResponse;
661
662    /// Converts the message into the service interface.
663    fn from_message(message: M, sender: <Self::Response as MessageResponse>::Sender) -> Self;
664}
665
666/// Abstraction over address types for service channels.
667trait SendDispatch<M>: Send + Sync {
668    /// The behavior declaring the return value when sending this message.
669    ///
670    /// When this is implemented for a type bound to an [`Interface`], this is the same behavior as
671    /// used in [`FromMessage::Response`].
672    type Response: MessageResponse;
673
674    /// Sends a message to the service and returns the response.
675    ///
676    /// See [`Addr::send`] for more information on a concrete type.
677    fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output;
678
679    /// Returns a trait object of this type.
680    fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>>;
681}
682
683/// An address to a [`Service`] implementing any interface that takes a given message.
684///
685/// This is similar to an [`Addr`], but it is bound to a single message rather than an interface. As
686/// such, this type is not meant for communicating with a service implementation, but rather as a
687/// handle to any service that can consume a given message. These can be back-channels or hooks that
688/// are configured externally through Inversion of Control (IoC).
689///
690/// Recipients are created through [`Addr::recipient`].
691pub struct Recipient<M, R> {
692    inner: Box<dyn SendDispatch<M, Response = R>>,
693}
694
695impl<M, R> Recipient<M, R>
696where
697    R: MessageResponse,
698{
699    /// Sends a message to the service and returns the response.
700    ///
701    /// This is equivalent to [`send`](Addr::send) on the originating address.
702    pub fn send(&self, message: M) -> R::Output {
703        self.inner.send(message)
704    }
705}
706
707// Manual implementation since `XSender` cannot require `Clone` for object safety.
708impl<M, R: MessageResponse> Clone for Recipient<M, R> {
709    fn clone(&self) -> Self {
710        Self {
711            inner: self.inner.to_trait_object(),
712        }
713    }
714}
715
716/// The address of a [`Service`].
717///
718/// Addresses allow to [send](Self::send) messages to a service that implements a corresponding
719/// [`Interface`] as long as the service is running.
720///
721/// Addresses can be freely cloned. When the last clone is dropped, the message channel of the
722/// service closes permanently, which signals to the service that it can shut down.
723pub struct Addr<I: Interface> {
724    tx: mpsc::UnboundedSender<I>,
725    queue_size: Arc<AtomicU64>,
726}
727
728impl<I: Interface> Addr<I> {
729    /// Sends a message to the service and returns the response.
730    ///
731    /// Depending on the message's response behavior, this either returns a future resolving to the
732    /// return value, or does not return anything for fire-and-forget messages. The communication
733    /// channel with the service is unbounded, so backlogs could occur when sending too many
734    /// messages.
735    ///
736    /// Sending asynchronous messages can fail with `Err(SendError)` if the service has shut down.
737    /// The result of asynchronous messages does not have to be awaited. The message will be
738    /// delivered and handled regardless:
739    pub fn send<M>(&self, message: M) -> <I::Response as MessageResponse>::Output
740    where
741        I: FromMessage<M>,
742    {
743        let (tx, rx) = I::Response::channel();
744        self.queue_size.fetch_add(1, Ordering::SeqCst);
745        self.tx.send(I::from_message(message, tx)).ok(); // it's ok to drop, the response will fail
746        rx
747    }
748
749    /// Returns a handle that can receive a given message independent of the interface.
750    ///
751    /// See [`Recipient`] for more information and examples.
752    pub fn recipient<M>(self) -> Recipient<M, I::Response>
753    where
754        I: FromMessage<M>,
755    {
756        Recipient {
757            inner: Box::new(self),
758        }
759    }
760
761    /// Returns wether the queue is currently empty.
762    pub fn is_empty(&self) -> bool {
763        self.len() == 0
764    }
765
766    /// Returns the current queue size.
767    pub fn len(&self) -> u64 {
768        self.queue_size.load(Ordering::Relaxed)
769    }
770
771    /// Custom address used for testing.
772    ///
773    /// Returns the receiving end of the channel for inspection.
774    pub fn custom() -> (Self, mpsc::UnboundedReceiver<I>) {
775        let (tx, rx) = mpsc::unbounded_channel();
776        (
777            Addr {
778                tx,
779                queue_size: Default::default(),
780            },
781            rx,
782        )
783    }
784
785    /// Dummy address used for testing.
786    pub fn dummy() -> Self {
787        Self::custom().0
788    }
789}
790
791impl<I: Interface> fmt::Debug for Addr<I> {
792    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
793        f.debug_struct("Addr")
794            .field("open", &!self.tx.is_closed())
795            .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
796            .finish()
797    }
798}
799
800// Manually derive `Clone` since we do not require `I: Clone` and the Clone derive adds this
801// constraint.
802impl<I: Interface> Clone for Addr<I> {
803    fn clone(&self) -> Self {
804        Self {
805            tx: self.tx.clone(),
806            queue_size: self.queue_size.clone(),
807        }
808    }
809}
810
811impl<I, M> SendDispatch<M> for Addr<I>
812where
813    I: Interface + FromMessage<M>,
814{
815    type Response = <I as FromMessage<M>>::Response;
816
817    fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
818        Addr::send(self, message)
819    }
820
821    fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
822        Box::new(self.clone())
823    }
824}
825
826/// Inbound channel for messages sent through an [`Addr`].
827///
828/// This channel is meant to be polled in a [`Service`].
829///
830/// Instances are created automatically when [spawning](ServiceSpawn) a service, or can be
831/// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped.
832pub struct Receiver<I: Interface> {
833    rx: mpsc::UnboundedReceiver<I>,
834    name: &'static str,
835    interval: tokio::time::Interval,
836    queue_size: Arc<AtomicU64>,
837}
838
839impl<I: Interface> Receiver<I> {
840    /// Receives the next value for this receiver.
841    ///
842    /// This method returns `None` if the channel has been closed and there are
843    /// no remaining messages in the channel's buffer. This indicates that no
844    /// further values can ever be received from this `Receiver`. The channel is
845    /// closed when all senders have been dropped.
846    ///
847    /// If there are no messages in the channel's buffer, but the channel has
848    /// not yet been closed, this method will sleep until a message is sent or
849    /// the channel is closed.
850    pub async fn recv(&mut self) -> Option<I> {
851        loop {
852            tokio::select! {
853                biased;
854
855                _ = self.interval.tick() => {
856                    let backlog = self.queue_size.load(Ordering::Relaxed);
857                    relay_statsd::metric!(
858                        gauge(SystemGauges::ServiceBackPressure) = backlog,
859                        service = self.name
860                    );
861                },
862                message = self.rx.recv() => {
863                    self.queue_size.fetch_sub(1, Ordering::SeqCst);
864                    return message;
865                },
866            }
867        }
868    }
869}
870
871impl<I: Interface> fmt::Debug for Receiver<I> {
872    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873        f.debug_struct("Receiver")
874            .field("name", &self.name)
875            .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
876            .finish()
877    }
878}
879
880/// Creates an unbounded channel for communicating with a [`Service`].
881///
882/// The `Addr` as the sending part provides public access to the service, while the `Receiver`
883/// should remain internal to the service.
884pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
885    let queue_size = Arc::new(AtomicU64::new(0));
886    let (tx, rx) = mpsc::unbounded_channel();
887
888    let addr = Addr {
889        tx,
890        queue_size: queue_size.clone(),
891    };
892
893    let mut interval = tokio::time::interval(BACKLOG_INTERVAL);
894    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
895
896    let receiver = Receiver {
897        rx,
898        name,
899        interval,
900        queue_size,
901    };
902
903    (addr, receiver)
904}
905
906/// An asynchronous unit responding to messages.
907///
/// Services receive messages conforming to some [`Interface`] through an [`Addr`] and handle them
/// one by one. Internally, services are free to process these messages concurrently or
/// sequentially, though most services probably should process them concurrently.
911///
912/// Individual messages can have a response which will be sent once the message is handled by the
913/// service. The sender can asynchronously await the responses of such messages.
914///
915/// To start a service, create a service runner and call [`ServiceSpawnExt::start`].
916///
917/// # Implementing Services
918///
/// The standard way to implement services is through the `run` function. It receives an inbound
/// channel for all messages sent through the service's address. The future returned by `run` is
/// the main task of the service; it is spawned when the service is started, so `run` does not
/// need to spawn a task itself:
922///
923/// ```no_run
924/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceSpawnExt};
925/// # fn test(services: &dyn relay_system::ServiceSpawn) {
926/// struct MyMessage;
927///
928/// impl Interface for MyMessage {}
929///
930/// impl FromMessage<Self> for MyMessage {
931///     type Response = NoResponse;
932///
933///     fn from_message(message: Self, _: ()) -> Self {
934///         message
935///     }
936/// }
937///
938/// struct MyService;
939///
940/// impl Service for MyService {
941///     type Interface = MyMessage;
942///
943///     async fn run(self, mut rx: Receiver<Self::Interface>) {
944///         while let Some(message) = rx.recv().await {
945///             // handle the message
946///         }
947///     }
948/// }
949///
950/// let addr = services.start(MyService);
951/// # }
952/// ```
953///
954/// ## Debounce and Caching
955///
956/// Services that cache or debounce their responses can benefit from the [`BroadcastResponse`]
/// behavior. To use this behavior, implement the message and interface the same way as for
/// [`AsyncResponse`]. This will provide a different sender type that can be converted into a
959/// channel to debounce responses. It is still possible to send values directly via the sender
960/// without a broadcast channel.
961///
962/// ```
963/// use std::collections::btree_map::{BTreeMap, Entry};
964/// use relay_system::{BroadcastChannel, BroadcastSender};
965///
966/// // FromMessage implementation using BroadcastResponse omitted for brevity.
967///
968/// struct MyService {
969///     cache: BTreeMap<u32, String>,
970///     channels: BTreeMap<u32, BroadcastChannel<String>>,
971/// }
972///
973/// impl MyService {
974///     fn handle_message(&mut self, id: u32, sender: BroadcastSender<String>) {
975///         if let Some(cached) = self.cache.get(&id) {
976///             sender.send(cached.clone());
977///             return;
978///         }
979///
980///         match self.channels.entry(id) {
981///             Entry::Vacant(entry) => {
982///                 entry.insert(sender.into_channel());
983///                 // Start async computation here.
984///             }
985///             Entry::Occupied(mut entry) => {
986///                 entry.get_mut().attach(sender);
987///             }
988///         }
989///     }
990///
991///     fn finish_compute(&mut self, id: u32, value: String) {
992///         if let Some(channel) = self.channels.remove(&id) {
993///             channel.send(value.clone());
994///         }
995///
996///         self.cache.insert(id, value);
997///     }
998/// }
999/// ```
1000///
1001pub trait Service: Sized {
1002    /// The interface of messages this service implements.
1003    ///
1004    /// The interface can be a single message type or an enumeration of all the messages that
1005    /// can be handled by this service.
1006    type Interface: Interface;
1007
1008    /// Defines the main task of this service.
1009    ///
1010    /// `run` typically contains a loop that reads from `rx`, or a `select!` that reads
1011    /// from multiple sources at once.
1012    fn run(self, rx: Receiver<Self::Interface>) -> impl Future<Output = ()> + Send + 'static;
1013
1014    /// Starts the service in the current runtime and returns an address for it.
1015    ///
1016    /// The service runs in a detached tokio task that cannot be joined on. This is mainly useful
1017    /// for tests.
1018    fn start_detached(self) -> Addr<Self::Interface> {
1019        let (addr, rx) = channel(Self::name());
1020        spawn(TaskId::for_service::<Self>(), self.run(rx));
1021        addr
1022    }
1023
1024    /// Returns a unique name for this service implementation.
1025    ///
1026    /// This is used for internal diagnostics and uses the fully qualified type name of the service
1027    /// implementor by default.
1028    fn name() -> &'static str {
1029        std::any::type_name::<Self>()
1030    }
1031}
1032
/// The [`ServiceSpawn`] trait allows for starting a [`Service`] on an executor that will run it to
/// completion.
///
/// Implementors receive the service in its type-erased [`ServiceObj`] form. Often you want to
/// spawn your service directly via [`ServiceSpawnExt`] instead.
pub trait ServiceSpawn {
    /// Starts the type-erased service on the executor.
    fn start_obj(&self, service: ServiceObj);
}
1040
/// Extension trait for [`ServiceSpawn`], providing more convenient methods to spawn a service.
///
/// The methods accept a concrete [`Service`] rather than a type-erased [`ServiceObj`].
pub trait ServiceSpawnExt {
    /// Starts a service and starts tracking its join handle, exposing an [`Addr`] for message passing.
    ///
    /// The returned address is the sending half of the channel the service receives from.
    fn start<S: Service>(&self, service: S) -> Addr<S::Interface>;

    /// Starts a service and starts tracking its join handle, given a predefined receiver.
    ///
    /// Use this when the channel was created ahead of time; the matching [`Addr`] stays with the
    /// caller.
    fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>);
}
1049
1050impl<T: ServiceSpawn + ?Sized> ServiceSpawnExt for T {
1051    fn start<S: Service>(&self, service: S) -> Addr<S::Interface> {
1052        let (addr, rx) = crate::channel(S::name());
1053        self.start_with(service, rx);
1054        addr
1055    }
1056
1057    fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>) {
1058        self.start_obj(ServiceObj::new(service, rx));
1059    }
1060}
1061
/// Type erased [`Service`].
///
/// Used to start a service using [`ServiceSpawn`] and usually not directly interacted with.
/// Use [`ServiceSpawnExt`] when possible instead.
pub struct ServiceObj {
    /// Name of the service, taken from [`Service::name`] on construction.
    name: &'static str,
    /// The boxed future returned by [`Service::run`]; polling it runs the service.
    future: BoxFuture<'static, ()>,
}
1070
1071impl ServiceObj {
1072    /// Creates a new bundled type erased [`Service`], that can be started using [`ServiceSpawn`].
1073    pub fn new<S: Service>(service: S, rx: Receiver<S::Interface>) -> Self {
1074        Self {
1075            name: S::name(),
1076            future: service.run(rx).boxed(),
1077        }
1078    }
1079
1080    /// Returns the name of the service.
1081    ///
1082    /// The name of the service is inferred from [`Service::name`].
1083    pub fn name(&self) -> &'static str {
1084        self.name
1085    }
1086}
1087
#[cfg(feature = "test")]
/// A [`ServiceSpawn`] implementation which spawns a service on the current Tokio runtime.
///
/// Only available when the `test` feature is enabled.
pub struct TokioServiceSpawn;
1091
#[cfg(feature = "test")]
impl ServiceSpawn for TokioServiceSpawn {
    /// Spawns the service's future through the `crate::spawn!` macro.
    ///
    /// Only the future is passed on; the service name is not used here.
    // NOTE(review): `#[track_caller]` presumably lets `spawn!` attribute the task to the call site
    // of `start_obj` — confirm against the macro definition.
    #[track_caller]
    fn start_obj(&self, service: ServiceObj) {
        crate::spawn!(service.future);
    }
}
1099
#[cfg(test)]
mod tests {
    use super::*;

    /// Message without payload or response, used as the whole interface of [`MockService`].
    struct MockMessage;

    impl Interface for MockMessage {}

    impl FromMessage<Self> for MockMessage {
        type Response = NoResponse;

        fn from_message(message: Self, _: ()) -> Self {
            message
        }
    }

    /// Service that sleeps for two backlog intervals after receiving each message.
    struct MockService;

    impl Service for MockService {
        type Interface = MockMessage;

        async fn run(self, mut rx: Receiver<Self::Interface>) {
            while rx.recv().await.is_some() {
                // Simulate slow message handling so that a backlog builds up in the channel.
                tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
            }
        }

        // A short, fixed name keeps the expected metric strings below readable.
        fn name() -> &'static str {
            "mock"
        }
    }

    /// Checks the back pressure gauge emitted by `Receiver::recv` while a slow service works
    /// through a queue of three messages.
    #[test]
    fn test_backpressure_metrics() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();

        // Enter the runtime so the clock can be paused and the service spawned outside of
        // `block_on`. With a paused clock, the sleeps below do not wait in real time.
        let _guard = rt.enter();
        tokio::time::pause();

        // Mock service takes 2 * BACKLOG_INTERVAL for every message
        let addr = MockService.start_detached();

        // Advance the timer by a tiny offset to trigger the first metric emission.
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                tokio::time::sleep(Duration::from_millis(10)).await;
            })
        });

        assert_eq!(captures, ["service.back_pressure:0|g|#service:mock"]);

        // Send messages and advance to 0.5 * INTERVAL. No metrics expected at this point.
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                addr.send(MockMessage); // will be pulled immediately
                addr.send(MockMessage);
                addr.send(MockMessage);

                tokio::time::sleep(BACKLOG_INTERVAL / 2).await;
            })
        });

        assert!(captures.is_empty());

        // Advance to 6.5 * INTERVAL. The service should pull the first message immediately, another
        // message every 2 INTERVALS. The messages are fully handled after 6 INTERVALS, but we
        // cannot observe that since the last message exits the queue at 4.
        let captures = relay_statsd::with_capturing_test_client(|| {
            rt.block_on(async {
                tokio::time::sleep(BACKLOG_INTERVAL * 6).await;
            })
        });

        // The gauge is only emitted from within `recv`, so a new value shows up each time the
        // service returns from its sleep and waits for the next message.
        assert_eq!(
            captures,
            [
                "service.back_pressure:2|g|#service:mock", // 2 * INTERVAL
                "service.back_pressure:1|g|#service:mock", // 4 * INTERVAL
                "service.back_pressure:0|g|#service:mock", // 6 * INTERVAL
            ]
        );
    }
}