relay_system/service/
mod.rs

1use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::future::{BoxFuture, Shared};
12use tokio::sync::{mpsc, oneshot};
13use tokio::time::MissedTickBehavior;
14
15use crate::statsd::SystemGauges;
16use crate::{TaskId, spawn};
17
18mod concurrent;
19mod registry;
20mod simple;
21mod status;
22
23pub use self::concurrent::{ConcurrentService, LoadShed};
24pub(crate) use self::registry::Registry as ServiceRegistry;
25pub use self::registry::{ServiceId, ServiceMetrics, ServicesMetrics};
26pub use self::simple::SimpleService;
27pub use self::status::{
28    ServiceError, ServiceJoinHandle, ServiceStatusError, ServiceStatusJoinHandle,
29};
30
31/// Interval for recording backlog metrics on service channels.
32const BACKLOG_INTERVAL: Duration = Duration::from_secs(1);
33
34/// A message interface for [services](Service).
35///
36/// Most commonly, this interface is an enumeration of messages, but it can also be implemented on a
37/// single message. For each individual message, this type needs to implement the [`FromMessage`]
38/// trait.
39///
40/// # Implementating Interfaces
41///
42/// There are three main ways to implement interfaces, which depends on the number of messages and
43/// their return values. The simplest way is an interface consisting of a **single message** with
44/// **no return value**. For this case, use the message directly as interface and choose
45/// `NoResponse` as response:
46///
47/// ```
48/// use relay_system::{FromMessage, Interface, NoResponse};
49///
50/// #[derive(Debug)]
51/// pub struct MyMessage;
52///
53/// impl Interface for MyMessage {}
54///
55/// impl FromMessage<Self> for MyMessage {
56///     type Response = NoResponse;
57///
58///     fn from_message(message: Self, _: ()) -> Self {
59///         message
60///     }
61/// }
62/// ```
63///
64/// If there is a **single message with a return value**, implement the interface as a wrapper for
65/// the message and the return [`Sender`]:
66///
67/// ```
68/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
69///
70/// #[derive(Debug)]
71/// pub struct MyMessage;
72///
73/// #[derive(Debug)]
74/// pub struct MyInterface(MyMessage, Sender<bool>);
75///
76/// impl Interface for MyInterface {}
77///
78/// impl FromMessage<MyMessage> for MyInterface {
79///     type Response = AsyncResponse<bool>;
80///
81///     fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
82///         Self(message, sender)
83///     }
84/// }
85/// ```
86///
87/// Finally, interfaces with **multiple messages** of any kind can most commonly be implemented
88/// through an enumeration for every message. The variants of messages with return values need a
89/// `Sender` again:
90///
91/// ```
92/// use relay_system::{AsyncResponse, FromMessage, Interface, NoResponse, Sender};
93///
94/// #[derive(Debug)]
95/// pub struct GetFlag;
96///
97/// #[derive(Debug)]
98/// pub struct SetFlag(pub bool);
99///
100/// #[derive(Debug)]
101/// pub enum MyInterface {
102///     Get(GetFlag, Sender<bool>),
103///     Set(SetFlag),
104/// }
105///
106/// impl Interface for MyInterface {}
107///
108/// impl FromMessage<GetFlag> for MyInterface {
109///     type Response = AsyncResponse<bool>;
110///
111///     fn from_message(message: GetFlag, sender: Sender<bool>) -> Self {
112///         Self::Get(message, sender)
113///     }
114/// }
115///
116/// impl FromMessage<SetFlag> for MyInterface {
117///     type Response = NoResponse;
118///
119///     fn from_message(message: SetFlag, _: ()) -> Self {
120///         Self::Set(message)
121///     }
122/// }
123/// ```
124///
125/// # Requirements
126///
127/// Interfaces are meant to be sent to services via channels. As such, they need to be both `Send`
128/// and `'static`. It is highly encouraged to implement `Debug` on all interfaces and their
129/// messages.
130pub trait Interface: Send + 'static {}
131
132/// Services without messages can use `()` as their interface.
133impl Interface for () {}
134
135/// An error when [sending](Addr::send) a message to a service fails.
136#[derive(Clone, Copy, Debug, PartialEq)]
137pub struct SendError;
138
139impl fmt::Display for SendError {
140    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141        write!(f, "failed to send message to service")
142    }
143}
144
145impl std::error::Error for SendError {}
146
147/// Response behavior of an [`Interface`] message.
148///
149/// It defines how a service handles and responds to interface messages, such as through
150/// asynchronous responses or fire-and-forget without responding. [`FromMessage`] implementations
151/// declare this behavior on the interface.
152///
153/// See [`FromMessage`] for more information on how to use this trait.
154pub trait MessageResponse {
155    /// Sends responses from the service back to the waiting recipient.
156    type Sender;
157
158    /// The type returned from [`Addr::send`].
159    ///
160    /// This type can be either synchronous and asynchronous based on the responder.
161    type Output;
162
163    /// Returns the response channel for an interface message.
164    fn channel() -> (Self::Sender, Self::Output);
165}
166
167/// The request when sending an asynchronous message to a service.
168///
169/// This is returned from [`Addr::send`] when the message responds asynchronously through
170/// [`AsyncResponse`]. It is a future that should be awaited. The message still runs to
171/// completion if this future is dropped.
172pub struct Request<T>(oneshot::Receiver<T>);
173
174impl<T> fmt::Debug for Request<T> {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        f.debug_struct("Request").finish_non_exhaustive()
177    }
178}
179
180impl<T> Future for Request<T> {
181    type Output = Result<T, SendError>;
182
183    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
184        Pin::new(&mut self.0)
185            .poll(cx)
186            .map(|r| r.map_err(|_| SendError))
187    }
188}
189
190/// Sends a message response from a service back to the waiting [`Request`].
191///
192/// The sender is part of an [`AsyncResponse`] and should be moved into the service interface
193/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
194/// [`SendError`].
195pub struct Sender<T>(oneshot::Sender<T>);
196
197impl<T> fmt::Debug for Sender<T> {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        f.debug_struct("Sender")
200            .field("open", &!self.0.is_closed())
201            .finish()
202    }
203}
204
205impl<T> Sender<T> {
206    /// Sends the response value and closes the [`Request`].
207    ///
208    /// This silently drops the value if the request has been dropped.
209    pub fn send(self, value: T) {
210        self.0.send(value).ok();
211    }
212}
213
214/// Message response resulting in an asynchronous [`Request`].
215///
216/// The sender must be placed on the interface in [`FromMessage::from_message`].
217///
218/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
219pub struct AsyncResponse<T>(PhantomData<T>);
220
221impl<T> fmt::Debug for AsyncResponse<T> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        f.write_str("AsyncResponse")
224    }
225}
226
227impl<T> MessageResponse for AsyncResponse<T> {
228    type Sender = Sender<T>;
229    type Output = Request<T>;
230
231    fn channel() -> (Self::Sender, Self::Output) {
232        let (tx, rx) = oneshot::channel();
233        (Sender(tx), Request(rx))
234    }
235}
236
237/// Message response for fire-and-forget messages with no output.
238///
239/// There is no sender associated to this response. When implementing [`FromMessage`], the sender
240/// can be ignored.
241///
242/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
243pub struct NoResponse;
244
245impl fmt::Debug for NoResponse {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        f.write_str("NoResponse")
248    }
249}
250
251impl MessageResponse for NoResponse {
252    type Sender = ();
253    type Output = ();
254
255    fn channel() -> (Self::Sender, Self::Output) {
256        ((), ())
257    }
258}
259
260/// Initial response to a [`BroadcastRequest`].
261#[derive(Debug)]
262enum InitialResponse<T> {
263    /// The response value is immediately ready.
264    ///
265    /// The sender did not attach to a broadcast channel and instead resolved the requested value
266    /// immediately. The request is now ready and can resolve. See [`BroadcastChannel::attach`].
267    Ready(T),
268    /// The sender is attached to a channel that needs to be polled.
269    Poll(Shared<oneshot::Receiver<T>>),
270}
271
272/// States of a [`BroadcastRequest`].
273enum BroadcastState<T> {
274    /// The request is waiting for an initial response.
275    Pending(oneshot::Receiver<InitialResponse<T>>),
276    /// The request is attached to a [`BroadcastChannel`].
277    Attached(Shared<oneshot::Receiver<T>>),
278}
279
280/// The request when sending an asynchronous message to a service.
281///
282/// This is returned from [`Addr::send`] when the message responds asynchronously through
283/// [`BroadcastResponse`]. It is a future that should be awaited. The message still runs to
284/// completion if this future is dropped.
285///
286/// # Panics
287///
288/// This future is not fused and panics if it is polled again after it has resolved.
289pub struct BroadcastRequest<T>(BroadcastState<T>)
290where
291    T: Clone;
292
293impl<T: Clone> Future for BroadcastRequest<T> {
294    type Output = Result<T, SendError>;
295
296    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
297        Poll::Ready(loop {
298            match self.0 {
299                BroadcastState::Pending(ref mut pending) => {
300                    match futures::ready!(Pin::new(pending).poll(cx)) {
301                        Ok(InitialResponse::Ready(value)) => break Ok(value),
302                        Ok(InitialResponse::Poll(shared)) => {
303                            self.0 = BroadcastState::Attached(shared)
304                        }
305                        Err(_) => break Err(SendError),
306                    }
307                }
308                BroadcastState::Attached(ref mut shared) => {
309                    match futures::ready!(Pin::new(shared).poll(cx)) {
310                        Ok(value) => break Ok(value),
311                        Err(_) => break Err(SendError),
312                    }
313                }
314            }
315        })
316    }
317}
318
319/// A channel that broadcasts values to attached [senders](BroadcastSender).
320///
321/// This is part of the [`BroadcastResponse`] message behavior to efficiently send delayed responses
322/// to a large number of senders. All requests that are attached to this channel via their senders
323/// resolve with the same value.
324///
325/// # Example
326///
327/// ```
328/// use relay_system::{BroadcastChannel, BroadcastSender};
329///
330/// struct MyService {
331///     channel: Option<BroadcastChannel<String>>,
332/// }
333///
334/// impl MyService {
335///     fn handle_message(&mut self, sender: BroadcastSender<String>) {
336///         if let Some(ref mut channel) = self.channel {
337///             channel.attach(sender);
338///         } else {
339///             self.channel = Some(sender.into_channel());
340///         }
341///     }
342///
343///     fn finish_compute(&mut self, value: String) {
344///         if let Some(channel) = self.channel.take() {
345///             channel.send(value);
346///         }
347///     }
348/// }
349/// ```
350#[derive(Debug)]
351pub struct BroadcastChannel<T>
352where
353    T: Clone,
354{
355    tx: oneshot::Sender<T>,
356    rx: Shared<oneshot::Receiver<T>>,
357}
358
359impl<T: Clone> BroadcastChannel<T> {
360    /// Creates a standalone channel.
361    ///
362    /// Use [`attach`](Self::attach) to add senders to this channel. Alternatively, create a channel
363    /// with [`BroadcastSender::into_channel`].
364    pub fn new() -> Self {
365        let (tx, rx) = oneshot::channel();
366        Self {
367            tx,
368            rx: rx.shared(),
369        }
370    }
371
372    /// Attaches a sender of another message to this channel to receive the same value.
373    ///
374    /// # Example
375    ///
376    /// ```
377    /// use relay_system::{BroadcastChannel, BroadcastResponse, BroadcastSender};
378    /// # use relay_system::MessageResponse;
379    ///
380    /// // This is usually done as part of `Addr::send`
381    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
382    ///
383    /// let mut channel = BroadcastChannel::new();
384    /// channel.attach(sender);
385    /// ```
386    pub fn attach(&mut self, sender: BroadcastSender<T>) {
387        sender.0.send(InitialResponse::Poll(self.rx.clone())).ok();
388    }
389
390    /// Sends a value to all attached senders and closes the channel.
391    ///
392    /// This method succeeds even if no senders are attached to this channel anymore. To check if
393    /// this channel is still active with senders attached, use [`is_attached`](Self::is_attached).
394    ///
395    /// # Example
396    ///
397    /// ```
398    /// use relay_system::BroadcastResponse;
399    /// # use relay_system::MessageResponse;
400    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
401    ///
402    /// // This is usually done as part of `Addr::send`
403    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
404    ///
405    /// let channel = sender.into_channel();
406    /// channel.send("test");
407    /// assert_eq!(rx.await, Ok("test"));
408    /// # })
409    /// ```
410    pub fn send(self, value: T) {
411        self.tx.send(value).ok();
412    }
413
414    /// Returns `true` if there are [requests](BroadcastRequest) waiting for this channel.
415    ///
416    /// The channel is not permanently closed when all waiting requests have detached. A new sender
417    /// can be attached using [`attach`](Self::attach) even after this method returns `false`.
418    ///
419    /// # Example
420    ///
421    /// ```
422    /// use relay_system::BroadcastResponse;
423    /// # use relay_system::MessageResponse;
424    ///
425    /// // This is usually done as part of `Addr::send`
426    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
427    ///
428    /// let channel = sender.into_channel();
429    /// assert!(channel.is_attached());
430    ///
431    /// drop(rx);
432    /// assert!(!channel.is_attached());
433    /// ```
434    pub fn is_attached(&self) -> bool {
435        self.rx.strong_count() > Some(1)
436    }
437}
438
439impl<T: Clone> Default for BroadcastChannel<T> {
440    fn default() -> Self {
441        Self::new()
442    }
443}
444
445/// Sends a message response from a service back to the waiting [`BroadcastRequest`].
446///
447/// The sender is part of an [`BroadcastResponse`] and should be moved into the service interface
448/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
449/// [`SendError`].
450///
451/// As opposed to the regular [`Sender`] for asynchronous messages, this sender can be converted
452/// into a [channel](Self::into_channel) that efficiently shares a common response for multiple
453/// requests to the same data value. This is useful if resolving or computing the value takes more
454/// time.
455///
456/// # Example
457///
458/// ```
459/// use relay_system::BroadcastResponse;
460/// # use relay_system::MessageResponse;
461/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
462///
463/// // This is usually done as part of `Addr::send`
464/// let (sender1, rx1) = BroadcastResponse::<&str>::channel();
465/// let (sender2, rx2) = BroadcastResponse::<&str>::channel();
466///
467/// // On the first time, convert the sender into a channel
468/// let mut channel = sender1.into_channel();
469///
470/// // The second time, attach the sender to the existing channel
471/// channel.attach(sender2);
472///
473/// // Send a value into the channel to resolve all requests simultaneously
474/// channel.send("test");
475/// assert_eq!(rx1.await, Ok("test"));
476/// assert_eq!(rx2.await, Ok("test"));
477/// # })
478/// ```
479#[derive(Debug)]
480pub struct BroadcastSender<T>(oneshot::Sender<InitialResponse<T>>)
481where
482    T: Clone;
483
484impl<T: Clone> BroadcastSender<T> {
485    /// Immediately resolve a ready value.
486    ///
487    /// This bypasses shared channels and directly sends the a value to the waiting
488    /// [request](BroadcastRequest). In terms of performance and behavior, using `send` is
489    /// equivalent to calling [`Sender::send`] for a regular [`AsyncResponse`].
490    ///
491    /// # Example
492    ///
493    /// ```
494    /// use relay_system::BroadcastResponse;
495    /// # use relay_system::MessageResponse;
496    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
497    ///
498    /// // This is usually done as part of `Addr::send`
499    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
500    ///
501    /// // sender is NOT converted into a channel!
502    ///
503    /// sender.send("test");
504    /// assert_eq!(rx.await, Ok("test"));
505    /// # })
506    /// ```
507    pub fn send(self, value: T) {
508        self.0.send(InitialResponse::Ready(value)).ok();
509    }
510
511    /// Creates a channel from this sender that can be shared with other senders.
512    ///
513    /// To add more senders to the created channel at a later point, use
514    /// [`attach`](BroadcastChannel::attach).
515    ///
516    /// # Example
517    ///
518    /// ```
519    /// use relay_system::{BroadcastChannel, BroadcastResponse};
520    /// # use relay_system::MessageResponse;
521    ///
522    /// // This is usually done as part of `Addr::send`
523    /// let (sender, rx) = BroadcastResponse::<&str>::channel();
524    ///
525    /// let channel: BroadcastChannel<&str> = sender.into_channel();
526    /// ```
527    pub fn into_channel(self) -> BroadcastChannel<T> {
528        let mut channel = BroadcastChannel::new();
529        channel.attach(self);
530        channel
531    }
532}
533
534/// Variation of [`AsyncResponse`] that efficiently broadcasts responses to many requests.
535///
536/// This response behavior is useful for services that cache or debounce requests. Instead of
537/// responding to each equivalent request via its individual sender, the broadcast behavior allows
538/// to create a [`BroadcastChannel`] that efficiently resolves all pending requests once the value
539/// is ready.
540///
541/// Similar to `AsyncResponse`, the service receives a sender that it can use to send a value
542/// directly back to the waiting request. Additionally, the sender can be converted into a channel
543/// or attached to an already existing channel, if the service expects more requests while computing
544/// the response.
545///
546/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
547pub struct BroadcastResponse<T>(PhantomData<T>)
548where
549    T: Clone;
550
551impl<T: Clone> fmt::Debug for BroadcastResponse<T> {
552    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
553        f.write_str("BroadcastResponse")
554    }
555}
556
557impl<T: Clone> MessageResponse for BroadcastResponse<T> {
558    type Sender = BroadcastSender<T>;
559    type Output = BroadcastRequest<T>;
560
561    fn channel() -> (Self::Sender, Self::Output) {
562        let (tx, rx) = oneshot::channel();
563        (
564            BroadcastSender(tx),
565            BroadcastRequest(BroadcastState::Pending(rx)),
566        )
567    }
568}
569
570/// Declares a message as part of an [`Interface`].
571///
572/// Messages have an associated `Response` type that determines the return value of sending the
573/// message. Within an interface, the responder can vary for each message. There are two provided
574/// responders.
575///
576/// # No Response
577///
578/// [`NoResponse`] is used for fire-and-forget messages that do not return any values. These
579/// messages do not spawn futures and cannot be awaited. It is neither possible to verify whether
580/// the message was delivered to the service.
581///
582/// When implementing `FromMessage` for such messages, the second argument can be ignored by
583/// convention:
584///
585/// ```
586/// use relay_system::{FromMessage, Interface, NoResponse};
587///
588/// struct MyMessage;
589///
590/// enum MyInterface {
591///     MyMessage(MyMessage),
592///     // ...
593/// }
594///
595/// impl Interface for MyInterface {}
596///
597/// impl FromMessage<MyMessage> for MyInterface {
598///     type Response = NoResponse;
599///
600///     fn from_message(message: MyMessage, _: ()) -> Self {
601///         Self::MyMessage(message)
602///     }
603/// }
604/// ```
605///
606/// # Asynchronous Responses
607///
608/// [`AsyncResponse`] is used for messages that resolve to some future value. This value is sent
609/// back by the service through a [`Sender`], which must be added into the interface:
610///
611/// ```
612/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
613///
614/// struct MyMessage;
615///
616/// enum MyInterface {
617///     MyMessage(MyMessage, Sender<bool>),
618///     // ...
619/// }
620///
621/// impl Interface for MyInterface {}
622///
623/// impl FromMessage<MyMessage> for MyInterface {
624///     type Response = AsyncResponse<bool>;
625///
626///     fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
627///         Self::MyMessage(message, sender)
628///     }
629/// }
630/// ```
631///
632/// # Broadcast Responses
633///
634/// [`BroadcastResponse`] is similar to the previous asynchronous response, but it additionally
635/// allows to efficiently handle duplicate requests for services that debounce equivalent requests
636/// or cache results. On the requesting side, this behavior is identical to the asynchronous
637/// behavior, but it provides more utilities to the implementing service.
638///
639/// ```
640/// use relay_system::{BroadcastResponse, BroadcastSender, FromMessage, Interface};
641///
642/// struct MyMessage;
643///
644/// enum MyInterface {
645///     MyMessage(MyMessage, BroadcastSender<bool>),
646///     // ...
647/// }
648///
649/// impl Interface for MyInterface {}
650///
651/// impl FromMessage<MyMessage> for MyInterface {
652///     type Response = BroadcastResponse<bool>;
653///
654///     fn from_message(message: MyMessage, sender: BroadcastSender<bool>) -> Self {
655///         Self::MyMessage(message, sender)
656///     }
657/// }
658/// ```
659///
660/// See [`Interface`] for more examples on how to build interfaces using this trait and [`Service`]
661/// documentation for patterns and advice to handle messages.
662pub trait FromMessage<M>: Interface {
663    /// The behavior declaring the return value when sending this message.
664    type Response: MessageResponse;
665
666    /// Converts the message into the service interface.
667    fn from_message(message: M, sender: <Self::Response as MessageResponse>::Sender) -> Self;
668}
669
670/// Abstraction over address types for service channels.
671trait SendDispatch<M>: Send + Sync {
672    /// The behavior declaring the return value when sending this message.
673    ///
674    /// When this is implemented for a type bound to an [`Interface`], this is the same behavior as
675    /// used in [`FromMessage::Response`].
676    type Response: MessageResponse;
677
678    /// Sends a message to the service and returns the response.
679    ///
680    /// See [`Addr::send`] for more information on a concrete type.
681    fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output;
682
683    /// Returns a trait object of this type.
684    fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>>;
685}
686
687/// An address to a [`Service`] implementing any interface that takes a given message.
688///
689/// This is similar to an [`Addr`], but it is bound to a single message rather than an interface. As
690/// such, this type is not meant for communicating with a service implementation, but rather as a
691/// handle to any service that can consume a given message. These can be back-channels or hooks that
692/// are configured externally through Inversion of Control (IoC).
693///
694/// Recipients are created through [`Addr::recipient`].
695pub struct Recipient<M, R> {
696    inner: Box<dyn SendDispatch<M, Response = R>>,
697}
698
699impl<M, R> Recipient<M, R>
700where
701    R: MessageResponse,
702{
703    /// Sends a message to the service and returns the response.
704    ///
705    /// This is equivalent to [`send`](Addr::send) on the originating address.
706    pub fn send(&self, message: M) -> R::Output {
707        self.inner.send(message)
708    }
709}
710
711impl<M: 'static + Send> Recipient<M, NoResponse> {
712    /// Creates a new recipient from a tokio channel. Used in tests.
713    pub fn new(tx: tokio::sync::mpsc::UnboundedSender<M>) -> Self {
714        Self {
715            inner: Box::new(tx),
716        }
717    }
718}
719
720// Manual implementation since `XSender` cannot require `Clone` for object safety.
721impl<M, R: MessageResponse> Clone for Recipient<M, R> {
722    fn clone(&self) -> Self {
723        Self {
724            inner: self.inner.to_trait_object(),
725        }
726    }
727}
728
729/// The address of a [`Service`].
730///
731/// Addresses allow to [send](Self::send) messages to a service that implements a corresponding
732/// [`Interface`] as long as the service is running.
733///
734/// Addresses can be freely cloned. When the last clone is dropped, the message channel of the
735/// service closes permanently, which signals to the service that it can shut down.
736pub struct Addr<I: Interface> {
737    tx: mpsc::UnboundedSender<I>,
738    queue_size: Arc<AtomicU64>,
739}
740
741impl<I: Interface> Addr<I> {
742    /// Sends a message to the service and returns the response.
743    ///
744    /// Depending on the message's response behavior, this either returns a future resolving to the
745    /// return value, or does not return anything for fire-and-forget messages. The communication
746    /// channel with the service is unbounded, so backlogs could occur when sending too many
747    /// messages.
748    ///
749    /// Sending asynchronous messages can fail with `Err(SendError)` if the service has shut down.
750    /// The result of asynchronous messages does not have to be awaited. The message will be
751    /// delivered and handled regardless:
752    pub fn send<M>(&self, message: M) -> <I::Response as MessageResponse>::Output
753    where
754        I: FromMessage<M>,
755    {
756        let (tx, rx) = I::Response::channel();
757        self.queue_size.fetch_add(1, Ordering::SeqCst);
758        self.tx.send(I::from_message(message, tx)).ok(); // it's ok to drop, the response will fail
759        rx
760    }
761
762    /// Returns a handle that can receive a given message independent of the interface.
763    ///
764    /// See [`Recipient`] for more information and examples.
765    pub fn recipient<M>(self) -> Recipient<M, I::Response>
766    where
767        I: FromMessage<M>,
768    {
769        Recipient {
770            inner: Box::new(self),
771        }
772    }
773
774    /// Returns wether the queue is currently empty.
775    pub fn is_empty(&self) -> bool {
776        self.len() == 0
777    }
778
779    /// Returns the current queue size.
780    pub fn len(&self) -> u64 {
781        self.queue_size.load(Ordering::Relaxed)
782    }
783
784    /// Custom address used for testing.
785    ///
786    /// Returns the receiving end of the channel for inspection.
787    pub fn custom() -> (Self, mpsc::UnboundedReceiver<I>) {
788        let (tx, rx) = mpsc::unbounded_channel();
789        (
790            Addr {
791                tx,
792                queue_size: Default::default(),
793            },
794            rx,
795        )
796    }
797
798    /// Dummy address used for testing.
799    pub fn dummy() -> Self {
800        Self::custom().0
801    }
802}
803
804impl<I: Interface> fmt::Debug for Addr<I> {
805    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
806        f.debug_struct("Addr")
807            .field("open", &!self.tx.is_closed())
808            .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
809            .finish()
810    }
811}
812
813// Manually derive `Clone` since we do not require `I: Clone` and the Clone derive adds this
814// constraint.
815impl<I: Interface> Clone for Addr<I> {
816    fn clone(&self) -> Self {
817        Self {
818            tx: self.tx.clone(),
819            queue_size: self.queue_size.clone(),
820        }
821    }
822}
823
824impl<I, M> SendDispatch<M> for Addr<I>
825where
826    I: Interface + FromMessage<M>,
827{
828    type Response = <I as FromMessage<M>>::Response;
829
830    fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
831        Addr::send(self, message)
832    }
833
834    fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
835        Box::new(self.clone())
836    }
837}
838
839// Used in tests.
840impl<M: 'static + Send> SendDispatch<M> for tokio::sync::mpsc::UnboundedSender<M> {
841    type Response = NoResponse;
842
843    fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
844        self.send(message).ok();
845    }
846
847    fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
848        Box::new(self.clone())
849    }
850}
851
852/// Inbound channel for messages sent through an [`Addr`].
853///
854/// This channel is meant to be polled in a [`Service`].
855///
856/// Instances are created automatically when [spawning](ServiceSpawn) a service, or can be
857/// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped.
858pub struct Receiver<I: Interface> {
859    rx: mpsc::UnboundedReceiver<I>,
860    name: &'static str,
861    interval: tokio::time::Interval,
862    queue_size: Arc<AtomicU64>,
863}
864
865impl<I: Interface> Receiver<I> {
866    /// Receives the next value for this receiver.
867    ///
868    /// This method returns `None` if the channel has been closed and there are
869    /// no remaining messages in the channel's buffer. This indicates that no
870    /// further values can ever be received from this `Receiver`. The channel is
871    /// closed when all senders have been dropped.
872    ///
873    /// If there are no messages in the channel's buffer, but the channel has
874    /// not yet been closed, this method will sleep until a message is sent or
875    /// the channel is closed.
876    pub async fn recv(&mut self) -> Option<I> {
877        loop {
878            tokio::select! {
879                biased;
880
881                _ = self.interval.tick() => {
882                    let backlog = self.queue_size.load(Ordering::Relaxed);
883                    relay_statsd::metric!(
884                        gauge(SystemGauges::ServiceBackPressure) = backlog,
885                        service = self.name
886                    );
887                },
888                message = self.rx.recv() => {
889                    self.queue_size.fetch_sub(1, Ordering::SeqCst);
890                    return message;
891                },
892            }
893        }
894    }
895}
896
897impl<I: Interface> fmt::Debug for Receiver<I> {
898    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
899        f.debug_struct("Receiver")
900            .field("name", &self.name)
901            .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
902            .finish()
903    }
904}
905
906/// Creates an unbounded channel for communicating with a [`Service`].
907///
908/// The `Addr` as the sending part provides public access to the service, while the `Receiver`
909/// should remain internal to the service.
910pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
911    let queue_size = Arc::new(AtomicU64::new(0));
912    let (tx, rx) = mpsc::unbounded_channel();
913
914    let addr = Addr {
915        tx,
916        queue_size: queue_size.clone(),
917    };
918
919    let mut interval = tokio::time::interval(BACKLOG_INTERVAL);
920    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
921
922    let receiver = Receiver {
923        rx,
924        name,
925        interval,
926        queue_size,
927    };
928
929    (addr, receiver)
930}
931
932/// An asynchronous unit responding to messages.
933///
934/// Services receive messages conforming to some [`Interface`] through an [`Addr`] and handle them
935/// one by one. Internally, services are free to concurrently process these messages or not, most
936/// probably should.
937///
938/// Individual messages can have a response which will be sent once the message is handled by the
939/// service. The sender can asynchronously await the responses of such messages.
940///
941/// To start a service, create a service runner and call [`ServiceSpawnExt::start`].
942///
943/// # Implementing Services
944///
945/// The standard way to implement services is through the `run` function. It receives an inbound
946/// channel for all messages sent through the service's address. Note that this function is
947/// synchronous, so that this needs to spawn at least one task internally:
948///
949/// ```no_run
950/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceSpawnExt};
951/// # fn test(services: &dyn relay_system::ServiceSpawn) {
952/// struct MyMessage;
953///
954/// impl Interface for MyMessage {}
955///
956/// impl FromMessage<Self> for MyMessage {
957///     type Response = NoResponse;
958///
959///     fn from_message(message: Self, _: ()) -> Self {
960///         message
961///     }
962/// }
963///
964/// struct MyService;
965///
966/// impl Service for MyService {
967///     type Interface = MyMessage;
968///
969///     async fn run(self, mut rx: Receiver<Self::Interface>) {
970///         while let Some(message) = rx.recv().await {
971///             // handle the message
972///         }
973///     }
974/// }
975///
976/// let addr = services.start(MyService);
977/// # }
978/// ```
979///
980/// ## Debounce and Caching
981///
982/// Services that cache or debounce their responses can benefit from the [`BroadcastResponse`]
983/// behavior. To use this behavior, implement the message and interface identical to
984/// [`AsyncResponse`] above. This will provide a different sender type that can be converted into a
985/// channel to debounce responses. It is still possible to send values directly via the sender
986/// without a broadcast channel.
987///
988/// ```
989/// use std::collections::btree_map::{BTreeMap, Entry};
990/// use relay_system::{BroadcastChannel, BroadcastSender};
991///
992/// // FromMessage implementation using BroadcastResponse omitted for brevity.
993///
994/// struct MyService {
995///     cache: BTreeMap<u32, String>,
996///     channels: BTreeMap<u32, BroadcastChannel<String>>,
997/// }
998///
999/// impl MyService {
1000///     fn handle_message(&mut self, id: u32, sender: BroadcastSender<String>) {
1001///         if let Some(cached) = self.cache.get(&id) {
1002///             sender.send(cached.clone());
1003///             return;
1004///         }
1005///
1006///         match self.channels.entry(id) {
1007///             Entry::Vacant(entry) => {
1008///                 entry.insert(sender.into_channel());
1009///                 // Start async computation here.
1010///             }
1011///             Entry::Occupied(mut entry) => {
1012///                 entry.get_mut().attach(sender);
1013///             }
1014///         }
1015///     }
1016///
1017///     fn finish_compute(&mut self, id: u32, value: String) {
1018///         if let Some(channel) = self.channels.remove(&id) {
1019///             channel.send(value.clone());
1020///         }
1021///
1022///         self.cache.insert(id, value);
1023///     }
1024/// }
1025/// ```
1026///
1027pub trait Service: Sized {
1028    /// The interface of messages this service implements.
1029    ///
1030    /// The interface can be a single message type or an enumeration of all the messages that
1031    /// can be handled by this service.
1032    type Interface: Interface;
1033
1034    /// Defines the main task of this service.
1035    ///
1036    /// `run` typically contains a loop that reads from `rx`, or a `select!` that reads
1037    /// from multiple sources at once.
1038    fn run(self, rx: Receiver<Self::Interface>) -> impl Future<Output = ()> + Send + 'static;
1039
1040    /// Starts the service in the current runtime and returns an address for it.
1041    ///
1042    /// The service runs in a detached tokio task that cannot be joined on. This is mainly useful
1043    /// for tests.
1044    fn start_detached(self) -> Addr<Self::Interface> {
1045        let (addr, rx) = channel(Self::name());
1046        spawn(TaskId::for_service::<Self>(), self.run(rx));
1047        addr
1048    }
1049
1050    /// Returns a unique name for this service implementation.
1051    ///
1052    /// This is used for internal diagnostics and uses the fully qualified type name of the service
1053    /// implementor by default.
1054    fn name() -> &'static str {
1055        std::any::type_name::<Self>()
1056    }
1057}
1058
1059/// The [`ServiceSpawn`] trait allows for starting a [`Service`] on an executor that will run them to completion.
1060///
1061/// Often you want to spawn your service directly via [`ServiceSpawnExt`].
1062pub trait ServiceSpawn {
1063    /// Starts the service on the executor.
1064    fn start_obj(&self, service: ServiceObj);
1065}
1066
1067/// Extension trait for [`ServiceSpawn`], providing more convenient methods to spawn a service.
1068pub trait ServiceSpawnExt {
1069    /// Starts a service and starts tracking its join handle, exposing an [`Addr`] for message passing.
1070    fn start<S: Service>(&self, service: S) -> Addr<S::Interface>;
1071
1072    /// Starts a service and starts tracking its join handle, given a predefined receiver.
1073    fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>);
1074}
1075
1076impl<T: ServiceSpawn + ?Sized> ServiceSpawnExt for T {
1077    fn start<S: Service>(&self, service: S) -> Addr<S::Interface> {
1078        let (addr, rx) = crate::channel(S::name());
1079        self.start_with(service, rx);
1080        addr
1081    }
1082
1083    fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>) {
1084        self.start_obj(ServiceObj::new(service, rx));
1085    }
1086}
1087
1088/// Type erased [`Service`].
1089///
1090/// Used to start a service using [`ServiceSpawn`] and usually not directly interacted with.
1091/// Use [`ServiceSpawnExt`] when possible instead.
1092pub struct ServiceObj {
1093    name: &'static str,
1094    future: BoxFuture<'static, ()>,
1095}
1096
1097impl ServiceObj {
1098    /// Creates a new bundled type erased [`Service`], that can be started using [`ServiceSpawn`].
1099    pub fn new<S: Service>(service: S, rx: Receiver<S::Interface>) -> Self {
1100        Self {
1101            name: S::name(),
1102            future: service.run(rx).boxed(),
1103        }
1104    }
1105
1106    /// Returns the name of the service.
1107    ///
1108    /// The name of the service is inferred from [`Service::name`].
1109    pub fn name(&self) -> &'static str {
1110        self.name
1111    }
1112}
1113
1114#[cfg(feature = "test")]
1115/// A [`ServiceSpawn`] implementation which spawns a service on the current Tokio runtime.
1116pub struct TokioServiceSpawn;
1117
1118#[cfg(feature = "test")]
1119impl ServiceSpawn for TokioServiceSpawn {
1120    #[track_caller]
1121    fn start_obj(&self, service: ServiceObj) {
1122        crate::spawn!(service.future);
1123    }
1124}
1125
1126#[cfg(test)]
1127mod tests {
1128    use super::*;
1129
1130    struct MockMessage;
1131
1132    impl Interface for MockMessage {}
1133
1134    impl FromMessage<Self> for MockMessage {
1135        type Response = NoResponse;
1136
1137        fn from_message(message: Self, _: ()) -> Self {
1138            message
1139        }
1140    }
1141
1142    struct MockService;
1143
1144    impl Service for MockService {
1145        type Interface = MockMessage;
1146
1147        async fn run(self, mut rx: Receiver<Self::Interface>) {
1148            while rx.recv().await.is_some() {
1149                tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
1150            }
1151        }
1152
1153        fn name() -> &'static str {
1154            "mock"
1155        }
1156    }
1157
1158    #[test]
1159    fn test_backpressure_metrics() {
1160        let rt = tokio::runtime::Builder::new_current_thread()
1161            .enable_time()
1162            .build()
1163            .unwrap();
1164
1165        let _guard = rt.enter();
1166        tokio::time::pause();
1167
1168        // Mock service takes 2 * BACKLOG_INTERVAL for every message
1169        let addr = MockService.start_detached();
1170
1171        // Advance the timer by a tiny offset to trigger the first metric emission.
1172        let captures = relay_statsd::with_capturing_test_client(|| {
1173            rt.block_on(async {
1174                tokio::time::sleep(Duration::from_millis(10)).await;
1175            })
1176        });
1177
1178        assert_eq!(captures, ["service.back_pressure:0|g|#service:mock"]);
1179
1180        // Send messages and advance to 0.5 * INTERVAL. No metrics expected at this point.
1181        let captures = relay_statsd::with_capturing_test_client(|| {
1182            rt.block_on(async {
1183                addr.send(MockMessage); // will be pulled immediately
1184                addr.send(MockMessage);
1185                addr.send(MockMessage);
1186
1187                tokio::time::sleep(BACKLOG_INTERVAL / 2).await;
1188            })
1189        });
1190
1191        assert!(captures.is_empty());
1192
1193        // Advance to 6.5 * INTERVAL. The service should pull the first message immediately, another
1194        // message every 2 INTERVALS. The messages are fully handled after 6 INTERVALS, but we
1195        // cannot observe that since the last message exits the queue at 4.
1196        let captures = relay_statsd::with_capturing_test_client(|| {
1197            rt.block_on(async {
1198                tokio::time::sleep(BACKLOG_INTERVAL * 6).await;
1199            })
1200        });
1201
1202        assert_eq!(
1203            captures,
1204            [
1205                "service.back_pressure:2|g|#service:mock", // 2 * INTERVAL
1206                "service.back_pressure:1|g|#service:mock", // 4 * INTERVAL
1207                "service.back_pressure:0|g|#service:mock", // 6 * INTERVAL
1208            ]
1209        );
1210    }
1211}