relay_system/service/mod.rs
1use std::fmt;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::future::{BoxFuture, Shared};
12use tokio::sync::{mpsc, oneshot};
13use tokio::time::MissedTickBehavior;
14
15use crate::statsd::SystemGauges;
16use crate::{TaskId, spawn};
17
18mod registry;
19mod status;
20
21pub(crate) use self::registry::Registry as ServiceRegistry;
22pub use self::registry::{ServiceId, ServiceMetrics, ServicesMetrics};
23pub use self::status::{
24 ServiceError, ServiceJoinHandle, ServiceStatusError, ServiceStatusJoinHandle,
25};
26
27/// Interval for recording backlog metrics on service channels.
28const BACKLOG_INTERVAL: Duration = Duration::from_secs(1);
29
30/// A message interface for [services](Service).
31///
32/// Most commonly, this interface is an enumeration of messages, but it can also be implemented on a
33/// single message. For each individual message, this type needs to implement the [`FromMessage`]
34/// trait.
35///
/// # Implementing Interfaces
37///
38/// There are three main ways to implement interfaces, which depends on the number of messages and
39/// their return values. The simplest way is an interface consisting of a **single message** with
40/// **no return value**. For this case, use the message directly as interface and choose
41/// `NoResponse` as response:
42///
43/// ```
44/// use relay_system::{FromMessage, Interface, NoResponse};
45///
46/// #[derive(Debug)]
47/// pub struct MyMessage;
48///
49/// impl Interface for MyMessage {}
50///
51/// impl FromMessage<Self> for MyMessage {
52/// type Response = NoResponse;
53///
54/// fn from_message(message: Self, _: ()) -> Self {
55/// message
56/// }
57/// }
58/// ```
59///
60/// If there is a **single message with a return value**, implement the interface as a wrapper for
61/// the message and the return [`Sender`]:
62///
63/// ```
64/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
65///
66/// #[derive(Debug)]
67/// pub struct MyMessage;
68///
69/// #[derive(Debug)]
70/// pub struct MyInterface(MyMessage, Sender<bool>);
71///
72/// impl Interface for MyInterface {}
73///
74/// impl FromMessage<MyMessage> for MyInterface {
75/// type Response = AsyncResponse<bool>;
76///
77/// fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
78/// Self(message, sender)
79/// }
80/// }
81/// ```
82///
83/// Finally, interfaces with **multiple messages** of any kind can most commonly be implemented
84/// through an enumeration for every message. The variants of messages with return values need a
85/// `Sender` again:
86///
87/// ```
88/// use relay_system::{AsyncResponse, FromMessage, Interface, NoResponse, Sender};
89///
90/// #[derive(Debug)]
91/// pub struct GetFlag;
92///
93/// #[derive(Debug)]
94/// pub struct SetFlag(pub bool);
95///
96/// #[derive(Debug)]
97/// pub enum MyInterface {
98/// Get(GetFlag, Sender<bool>),
99/// Set(SetFlag),
100/// }
101///
102/// impl Interface for MyInterface {}
103///
104/// impl FromMessage<GetFlag> for MyInterface {
105/// type Response = AsyncResponse<bool>;
106///
107/// fn from_message(message: GetFlag, sender: Sender<bool>) -> Self {
108/// Self::Get(message, sender)
109/// }
110/// }
111///
112/// impl FromMessage<SetFlag> for MyInterface {
113/// type Response = NoResponse;
114///
115/// fn from_message(message: SetFlag, _: ()) -> Self {
116/// Self::Set(message)
117/// }
118/// }
119/// ```
120///
121/// # Requirements
122///
123/// Interfaces are meant to be sent to services via channels. As such, they need to be both `Send`
124/// and `'static`. It is highly encouraged to implement `Debug` on all interfaces and their
125/// messages.
pub trait Interface: 'static + Send {}

/// The unit type serves as the interface of services that do not accept any messages.
impl Interface for () {}
130
/// An error when [sending](Addr::send) a message to a service fails.
///
/// Requests resolve with this error when the response can no longer be received, for instance
/// because the response sender was dropped without sending a value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendError;

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A plain string write, there is nothing to format.
        f.write_str("failed to send message to service")
    }
}

impl std::error::Error for SendError {}
142
/// Response behavior of an [`Interface`] message.
///
/// Implementations of this trait describe how a service answers a message: for instance with an
/// asynchronous response, or not at all for fire-and-forget messages. The behavior is declared
/// per message by the [`FromMessage`] implementation on the interface.
///
/// See [`FromMessage`] for more information on how to use this trait.
pub trait MessageResponse {
    /// The handle through which the service sends the response back to the waiting recipient.
    type Sender;

    /// The type returned from [`Addr::send`].
    ///
    /// Depending on the responder, this type is either synchronous or asynchronous.
    type Output;

    /// Creates the response channel for a single interface message.
    ///
    /// Returns the [`Sender`](Self::Sender) that is passed to the service together with the
    /// [`Output`](Self::Output) that is returned to the caller.
    fn channel() -> (Self::Sender, Self::Output);
}
162
163/// The request when sending an asynchronous message to a service.
164///
165/// This is returned from [`Addr::send`] when the message responds asynchronously through
166/// [`AsyncResponse`]. It is a future that should be awaited. The message still runs to
167/// completion if this future is dropped.
168pub struct Request<T>(oneshot::Receiver<T>);
169
170impl<T> fmt::Debug for Request<T> {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 f.debug_struct("Request").finish_non_exhaustive()
173 }
174}
175
176impl<T> Future for Request<T> {
177 type Output = Result<T, SendError>;
178
179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
180 Pin::new(&mut self.0)
181 .poll(cx)
182 .map(|r| r.map_err(|_| SendError))
183 }
184}
185
186/// Sends a message response from a service back to the waiting [`Request`].
187///
188/// The sender is part of an [`AsyncResponse`] and should be moved into the service interface
189/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
190/// [`SendError`].
191pub struct Sender<T>(oneshot::Sender<T>);
192
193impl<T> fmt::Debug for Sender<T> {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 f.debug_struct("Sender")
196 .field("open", &!self.0.is_closed())
197 .finish()
198 }
199}
200
201impl<T> Sender<T> {
202 /// Sends the response value and closes the [`Request`].
203 ///
204 /// This silently drops the value if the request has been dropped.
205 pub fn send(self, value: T) {
206 self.0.send(value).ok();
207 }
208}
209
/// Message response resulting in an asynchronous [`Request`].
///
/// The sender of this response must be stored on the interface in
/// [`FromMessage::from_message`].
///
/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
pub struct AsyncResponse<T>(PhantomData<T>);

impl<T> fmt::Debug for AsyncResponse<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less debug struct prints just the type name.
        f.debug_struct("AsyncResponse").finish()
    }
}
222
223impl<T> MessageResponse for AsyncResponse<T> {
224 type Sender = Sender<T>;
225 type Output = Request<T>;
226
227 fn channel() -> (Self::Sender, Self::Output) {
228 let (tx, rx) = oneshot::channel();
229 (Sender(tx), Request(rx))
230 }
231}
232
/// Message response for fire-and-forget messages with no output.
///
/// This response has no sender. Implementations of [`FromMessage`] can ignore the sender
/// argument.
///
/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
#[derive(Debug)]
pub struct NoResponse;
246
247impl MessageResponse for NoResponse {
248 type Sender = ();
249 type Output = ();
250
251 fn channel() -> (Self::Sender, Self::Output) {
252 ((), ())
253 }
254}
255
256/// Initial response to a [`BroadcastRequest`].
257#[derive(Debug)]
258enum InitialResponse<T> {
259 /// The response value is immediately ready.
260 ///
261 /// The sender did not attach to a broadcast channel and instead resolved the requested value
262 /// immediately. The request is now ready and can resolve. See [`BroadcastChannel::attach`].
263 Ready(T),
264 /// The sender is attached to a channel that needs to be polled.
265 Poll(Shared<oneshot::Receiver<T>>),
266}
267
268/// States of a [`BroadcastRequest`].
269enum BroadcastState<T> {
270 /// The request is waiting for an initial response.
271 Pending(oneshot::Receiver<InitialResponse<T>>),
272 /// The request is attached to a [`BroadcastChannel`].
273 Attached(Shared<oneshot::Receiver<T>>),
274}
275
276/// The request when sending an asynchronous message to a service.
277///
278/// This is returned from [`Addr::send`] when the message responds asynchronously through
279/// [`BroadcastResponse`]. It is a future that should be awaited. The message still runs to
280/// completion if this future is dropped.
281///
282/// # Panics
283///
284/// This future is not fused and panics if it is polled again after it has resolved.
285pub struct BroadcastRequest<T>(BroadcastState<T>)
286where
287 T: Clone;
288
289impl<T: Clone> Future for BroadcastRequest<T> {
290 type Output = Result<T, SendError>;
291
292 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
293 Poll::Ready(loop {
294 match self.0 {
295 BroadcastState::Pending(ref mut pending) => {
296 match futures::ready!(Pin::new(pending).poll(cx)) {
297 Ok(InitialResponse::Ready(value)) => break Ok(value),
298 Ok(InitialResponse::Poll(shared)) => {
299 self.0 = BroadcastState::Attached(shared)
300 }
301 Err(_) => break Err(SendError),
302 }
303 }
304 BroadcastState::Attached(ref mut shared) => {
305 match futures::ready!(Pin::new(shared).poll(cx)) {
306 Ok(value) => break Ok(value),
307 Err(_) => break Err(SendError),
308 }
309 }
310 }
311 })
312 }
313}
314
315/// A channel that broadcasts values to attached [senders](BroadcastSender).
316///
317/// This is part of the [`BroadcastResponse`] message behavior to efficiently send delayed responses
318/// to a large number of senders. All requests that are attached to this channel via their senders
319/// resolve with the same value.
320///
321/// # Example
322///
323/// ```
324/// use relay_system::{BroadcastChannel, BroadcastSender};
325///
326/// struct MyService {
327/// channel: Option<BroadcastChannel<String>>,
328/// }
329///
330/// impl MyService {
331/// fn handle_message(&mut self, sender: BroadcastSender<String>) {
332/// if let Some(ref mut channel) = self.channel {
333/// channel.attach(sender);
334/// } else {
335/// self.channel = Some(sender.into_channel());
336/// }
337/// }
338///
339/// fn finish_compute(&mut self, value: String) {
340/// if let Some(channel) = self.channel.take() {
341/// channel.send(value);
342/// }
343/// }
344/// }
345/// ```
346#[derive(Debug)]
347pub struct BroadcastChannel<T>
348where
349 T: Clone,
350{
351 tx: oneshot::Sender<T>,
352 rx: Shared<oneshot::Receiver<T>>,
353}
354
355impl<T: Clone> BroadcastChannel<T> {
356 /// Creates a standalone channel.
357 ///
358 /// Use [`attach`](Self::attach) to add senders to this channel. Alternatively, create a channel
359 /// with [`BroadcastSender::into_channel`].
360 pub fn new() -> Self {
361 let (tx, rx) = oneshot::channel();
362 Self {
363 tx,
364 rx: rx.shared(),
365 }
366 }
367
368 /// Attaches a sender of another message to this channel to receive the same value.
369 ///
370 /// # Example
371 ///
372 /// ```
373 /// use relay_system::{BroadcastChannel, BroadcastResponse, BroadcastSender};
374 /// # use relay_system::MessageResponse;
375 ///
376 /// // This is usually done as part of `Addr::send`
377 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
378 ///
379 /// let mut channel = BroadcastChannel::new();
380 /// channel.attach(sender);
381 /// ```
382 pub fn attach(&mut self, sender: BroadcastSender<T>) {
383 sender.0.send(InitialResponse::Poll(self.rx.clone())).ok();
384 }
385
386 /// Sends a value to all attached senders and closes the channel.
387 ///
388 /// This method succeeds even if no senders are attached to this channel anymore. To check if
389 /// this channel is still active with senders attached, use [`is_attached`](Self::is_attached).
390 ///
391 /// # Example
392 ///
393 /// ```
394 /// use relay_system::BroadcastResponse;
395 /// # use relay_system::MessageResponse;
396 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
397 ///
398 /// // This is usually done as part of `Addr::send`
399 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
400 ///
401 /// let channel = sender.into_channel();
402 /// channel.send("test");
403 /// assert_eq!(rx.await, Ok("test"));
404 /// # })
405 /// ```
406 pub fn send(self, value: T) {
407 self.tx.send(value).ok();
408 }
409
410 /// Returns `true` if there are [requests](BroadcastRequest) waiting for this channel.
411 ///
412 /// The channel is not permanently closed when all waiting requests have detached. A new sender
413 /// can be attached using [`attach`](Self::attach) even after this method returns `false`.
414 ///
415 /// # Example
416 ///
417 /// ```
418 /// use relay_system::BroadcastResponse;
419 /// # use relay_system::MessageResponse;
420 ///
421 /// // This is usually done as part of `Addr::send`
422 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
423 ///
424 /// let channel = sender.into_channel();
425 /// assert!(channel.is_attached());
426 ///
427 /// drop(rx);
428 /// assert!(!channel.is_attached());
429 /// ```
430 pub fn is_attached(&self) -> bool {
431 self.rx.strong_count() > Some(1)
432 }
433}
434
435impl<T: Clone> Default for BroadcastChannel<T> {
436 fn default() -> Self {
437 Self::new()
438 }
439}
440
441/// Sends a message response from a service back to the waiting [`BroadcastRequest`].
442///
/// The sender is part of a [`BroadcastResponse`] and should be moved into the service interface
444/// type. If this sender is dropped without calling [`send`](Self::send), the request fails with
445/// [`SendError`].
446///
447/// As opposed to the regular [`Sender`] for asynchronous messages, this sender can be converted
448/// into a [channel](Self::into_channel) that efficiently shares a common response for multiple
449/// requests to the same data value. This is useful if resolving or computing the value takes more
450/// time.
451///
452/// # Example
453///
454/// ```
455/// use relay_system::BroadcastResponse;
456/// # use relay_system::MessageResponse;
457/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
458///
459/// // This is usually done as part of `Addr::send`
460/// let (sender1, rx1) = BroadcastResponse::<&str>::channel();
461/// let (sender2, rx2) = BroadcastResponse::<&str>::channel();
462///
463/// // On the first time, convert the sender into a channel
464/// let mut channel = sender1.into_channel();
465///
466/// // The second time, attach the sender to the existing channel
467/// channel.attach(sender2);
468///
469/// // Send a value into the channel to resolve all requests simultaneously
470/// channel.send("test");
471/// assert_eq!(rx1.await, Ok("test"));
472/// assert_eq!(rx2.await, Ok("test"));
473/// # })
474/// ```
475#[derive(Debug)]
476pub struct BroadcastSender<T>(oneshot::Sender<InitialResponse<T>>)
477where
478 T: Clone;
479
480impl<T: Clone> BroadcastSender<T> {
481 /// Immediately resolve a ready value.
482 ///
483 /// This bypasses shared channels and directly sends the a value to the waiting
484 /// [request](BroadcastRequest). In terms of performance and behavior, using `send` is
485 /// equivalent to calling [`Sender::send`] for a regular [`AsyncResponse`].
486 ///
487 /// # Example
488 ///
489 /// ```
490 /// use relay_system::BroadcastResponse;
491 /// # use relay_system::MessageResponse;
492 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
493 ///
494 /// // This is usually done as part of `Addr::send`
495 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
496 ///
497 /// // sender is NOT converted into a channel!
498 ///
499 /// sender.send("test");
500 /// assert_eq!(rx.await, Ok("test"));
501 /// # })
502 /// ```
503 pub fn send(self, value: T) {
504 self.0.send(InitialResponse::Ready(value)).ok();
505 }
506
507 /// Creates a channel from this sender that can be shared with other senders.
508 ///
509 /// To add more senders to the created channel at a later point, use
510 /// [`attach`](BroadcastChannel::attach).
511 ///
512 /// # Example
513 ///
514 /// ```
515 /// use relay_system::{BroadcastChannel, BroadcastResponse};
516 /// # use relay_system::MessageResponse;
517 ///
518 /// // This is usually done as part of `Addr::send`
519 /// let (sender, rx) = BroadcastResponse::<&str>::channel();
520 ///
521 /// let channel: BroadcastChannel<&str> = sender.into_channel();
522 /// ```
523 pub fn into_channel(self) -> BroadcastChannel<T> {
524 let mut channel = BroadcastChannel::new();
525 channel.attach(self);
526 channel
527 }
528}
529
/// Variation of [`AsyncResponse`] that efficiently broadcasts responses to many requests.
///
/// Services that cache or debounce requests benefit from this response behavior. Rather than
/// answering every equivalent request through its own sender, the service can create a
/// [`BroadcastChannel`] that resolves all pending requests at once when the value is ready.
///
/// As with `AsyncResponse`, the service receives a sender through which it can send a value
/// directly back to the waiting request. In addition, the sender can be converted into a
/// channel or attached to an existing channel if the service expects more requests while the
/// response is being computed.
///
/// See [`FromMessage`] and [`Service`] for implementation advice and examples.
pub struct BroadcastResponse<T: Clone>(PhantomData<T>);

impl<T: Clone> fmt::Debug for BroadcastResponse<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less debug struct prints just the type name.
        f.debug_struct("BroadcastResponse").finish()
    }
}
552
553impl<T: Clone> MessageResponse for BroadcastResponse<T> {
554 type Sender = BroadcastSender<T>;
555 type Output = BroadcastRequest<T>;
556
557 fn channel() -> (Self::Sender, Self::Output) {
558 let (tx, rx) = oneshot::channel();
559 (
560 BroadcastSender(tx),
561 BroadcastRequest(BroadcastState::Pending(rx)),
562 )
563 }
564}
565
566/// Declares a message as part of an [`Interface`].
567///
/// Messages have an associated `Response` type that determines the return value of sending the
/// message. Within an interface, the responder can vary for each message. There are three
/// provided responders.
571///
572/// # No Response
573///
/// [`NoResponse`] is used for fire-and-forget messages that do not return any values. These
/// messages do not spawn futures and cannot be awaited. It is also not possible to verify
/// whether the message was delivered to the service.
577///
578/// When implementing `FromMessage` for such messages, the second argument can be ignored by
579/// convention:
580///
581/// ```
582/// use relay_system::{FromMessage, Interface, NoResponse};
583///
584/// struct MyMessage;
585///
586/// enum MyInterface {
587/// MyMessage(MyMessage),
588/// // ...
589/// }
590///
591/// impl Interface for MyInterface {}
592///
593/// impl FromMessage<MyMessage> for MyInterface {
594/// type Response = NoResponse;
595///
596/// fn from_message(message: MyMessage, _: ()) -> Self {
597/// Self::MyMessage(message)
598/// }
599/// }
600/// ```
601///
602/// # Asynchronous Responses
603///
604/// [`AsyncResponse`] is used for messages that resolve to some future value. This value is sent
605/// back by the service through a [`Sender`], which must be added into the interface:
606///
607/// ```
608/// use relay_system::{AsyncResponse, FromMessage, Interface, Sender};
609///
610/// struct MyMessage;
611///
612/// enum MyInterface {
613/// MyMessage(MyMessage, Sender<bool>),
614/// // ...
615/// }
616///
617/// impl Interface for MyInterface {}
618///
619/// impl FromMessage<MyMessage> for MyInterface {
620/// type Response = AsyncResponse<bool>;
621///
622/// fn from_message(message: MyMessage, sender: Sender<bool>) -> Self {
623/// Self::MyMessage(message, sender)
624/// }
625/// }
626/// ```
627///
628/// # Broadcast Responses
629///
630/// [`BroadcastResponse`] is similar to the previous asynchronous response, but it additionally
631/// allows to efficiently handle duplicate requests for services that debounce equivalent requests
632/// or cache results. On the requesting side, this behavior is identical to the asynchronous
633/// behavior, but it provides more utilities to the implementing service.
634///
635/// ```
636/// use relay_system::{BroadcastResponse, BroadcastSender, FromMessage, Interface};
637///
638/// struct MyMessage;
639///
640/// enum MyInterface {
641/// MyMessage(MyMessage, BroadcastSender<bool>),
642/// // ...
643/// }
644///
645/// impl Interface for MyInterface {}
646///
647/// impl FromMessage<MyMessage> for MyInterface {
648/// type Response = BroadcastResponse<bool>;
649///
650/// fn from_message(message: MyMessage, sender: BroadcastSender<bool>) -> Self {
651/// Self::MyMessage(message, sender)
652/// }
653/// }
654/// ```
655///
656/// See [`Interface`] for more examples on how to build interfaces using this trait and [`Service`]
657/// documentation for patterns and advice to handle messages.
658pub trait FromMessage<M>: Interface {
659 /// The behavior declaring the return value when sending this message.
660 type Response: MessageResponse;
661
662 /// Converts the message into the service interface.
663 fn from_message(message: M, sender: <Self::Response as MessageResponse>::Sender) -> Self;
664}
665
666/// Abstraction over address types for service channels.
667trait SendDispatch<M>: Send + Sync {
668 /// The behavior declaring the return value when sending this message.
669 ///
670 /// When this is implemented for a type bound to an [`Interface`], this is the same behavior as
671 /// used in [`FromMessage::Response`].
672 type Response: MessageResponse;
673
674 /// Sends a message to the service and returns the response.
675 ///
676 /// See [`Addr::send`] for more information on a concrete type.
677 fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output;
678
679 /// Returns a trait object of this type.
680 fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>>;
681}
682
683/// An address to a [`Service`] implementing any interface that takes a given message.
684///
685/// This is similar to an [`Addr`], but it is bound to a single message rather than an interface. As
686/// such, this type is not meant for communicating with a service implementation, but rather as a
687/// handle to any service that can consume a given message. These can be back-channels or hooks that
688/// are configured externally through Inversion of Control (IoC).
689///
690/// Recipients are created through [`Addr::recipient`].
691pub struct Recipient<M, R> {
692 inner: Box<dyn SendDispatch<M, Response = R>>,
693}
694
695impl<M, R> Recipient<M, R>
696where
697 R: MessageResponse,
698{
699 /// Sends a message to the service and returns the response.
700 ///
701 /// This is equivalent to [`send`](Addr::send) on the originating address.
702 pub fn send(&self, message: M) -> R::Output {
703 self.inner.send(message)
704 }
705}
706
707impl<M: 'static + Send> Recipient<M, NoResponse> {
708 /// Creates a new recipient from a tokio channel. Used in tests.
709 pub fn new(tx: tokio::sync::mpsc::UnboundedSender<M>) -> Self {
710 Self {
711 inner: Box::new(tx),
712 }
713 }
714}
715
716// Manual implementation since `XSender` cannot require `Clone` for object safety.
717impl<M, R: MessageResponse> Clone for Recipient<M, R> {
718 fn clone(&self) -> Self {
719 Self {
720 inner: self.inner.to_trait_object(),
721 }
722 }
723}
724
725/// The address of a [`Service`].
726///
727/// Addresses allow to [send](Self::send) messages to a service that implements a corresponding
728/// [`Interface`] as long as the service is running.
729///
730/// Addresses can be freely cloned. When the last clone is dropped, the message channel of the
731/// service closes permanently, which signals to the service that it can shut down.
732pub struct Addr<I: Interface> {
733 tx: mpsc::UnboundedSender<I>,
734 queue_size: Arc<AtomicU64>,
735}
736
737impl<I: Interface> Addr<I> {
738 /// Sends a message to the service and returns the response.
739 ///
740 /// Depending on the message's response behavior, this either returns a future resolving to the
741 /// return value, or does not return anything for fire-and-forget messages. The communication
742 /// channel with the service is unbounded, so backlogs could occur when sending too many
743 /// messages.
744 ///
745 /// Sending asynchronous messages can fail with `Err(SendError)` if the service has shut down.
746 /// The result of asynchronous messages does not have to be awaited. The message will be
747 /// delivered and handled regardless:
748 pub fn send<M>(&self, message: M) -> <I::Response as MessageResponse>::Output
749 where
750 I: FromMessage<M>,
751 {
752 let (tx, rx) = I::Response::channel();
753 self.queue_size.fetch_add(1, Ordering::SeqCst);
754 self.tx.send(I::from_message(message, tx)).ok(); // it's ok to drop, the response will fail
755 rx
756 }
757
758 /// Returns a handle that can receive a given message independent of the interface.
759 ///
760 /// See [`Recipient`] for more information and examples.
761 pub fn recipient<M>(self) -> Recipient<M, I::Response>
762 where
763 I: FromMessage<M>,
764 {
765 Recipient {
766 inner: Box::new(self),
767 }
768 }
769
770 /// Returns wether the queue is currently empty.
771 pub fn is_empty(&self) -> bool {
772 self.len() == 0
773 }
774
775 /// Returns the current queue size.
776 pub fn len(&self) -> u64 {
777 self.queue_size.load(Ordering::Relaxed)
778 }
779
780 /// Custom address used for testing.
781 ///
782 /// Returns the receiving end of the channel for inspection.
783 pub fn custom() -> (Self, mpsc::UnboundedReceiver<I>) {
784 let (tx, rx) = mpsc::unbounded_channel();
785 (
786 Addr {
787 tx,
788 queue_size: Default::default(),
789 },
790 rx,
791 )
792 }
793
794 /// Dummy address used for testing.
795 pub fn dummy() -> Self {
796 Self::custom().0
797 }
798}
799
800impl<I: Interface> fmt::Debug for Addr<I> {
801 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
802 f.debug_struct("Addr")
803 .field("open", &!self.tx.is_closed())
804 .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
805 .finish()
806 }
807}
808
809// Manually derive `Clone` since we do not require `I: Clone` and the Clone derive adds this
810// constraint.
811impl<I: Interface> Clone for Addr<I> {
812 fn clone(&self) -> Self {
813 Self {
814 tx: self.tx.clone(),
815 queue_size: self.queue_size.clone(),
816 }
817 }
818}
819
820impl<I, M> SendDispatch<M> for Addr<I>
821where
822 I: Interface + FromMessage<M>,
823{
824 type Response = <I as FromMessage<M>>::Response;
825
826 fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
827 Addr::send(self, message)
828 }
829
830 fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
831 Box::new(self.clone())
832 }
833}
834
835// Used in tests.
836impl<M: 'static + Send> SendDispatch<M> for tokio::sync::mpsc::UnboundedSender<M> {
837 type Response = NoResponse;
838
839 fn send(&self, message: M) -> <Self::Response as MessageResponse>::Output {
840 self.send(message).ok();
841 }
842
843 fn to_trait_object(&self) -> Box<dyn SendDispatch<M, Response = Self::Response>> {
844 Box::new(self.clone())
845 }
846}
847
848/// Inbound channel for messages sent through an [`Addr`].
849///
850/// This channel is meant to be polled in a [`Service`].
851///
852/// Instances are created automatically when [spawning](ServiceSpawn) a service, or can be
853/// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped.
854pub struct Receiver<I: Interface> {
855 rx: mpsc::UnboundedReceiver<I>,
856 name: &'static str,
857 interval: tokio::time::Interval,
858 queue_size: Arc<AtomicU64>,
859}
860
861impl<I: Interface> Receiver<I> {
862 /// Receives the next value for this receiver.
863 ///
864 /// This method returns `None` if the channel has been closed and there are
865 /// no remaining messages in the channel's buffer. This indicates that no
866 /// further values can ever be received from this `Receiver`. The channel is
867 /// closed when all senders have been dropped.
868 ///
869 /// If there are no messages in the channel's buffer, but the channel has
870 /// not yet been closed, this method will sleep until a message is sent or
871 /// the channel is closed.
872 pub async fn recv(&mut self) -> Option<I> {
873 loop {
874 tokio::select! {
875 biased;
876
877 _ = self.interval.tick() => {
878 let backlog = self.queue_size.load(Ordering::Relaxed);
879 relay_statsd::metric!(
880 gauge(SystemGauges::ServiceBackPressure) = backlog,
881 service = self.name
882 );
883 },
884 message = self.rx.recv() => {
885 self.queue_size.fetch_sub(1, Ordering::SeqCst);
886 return message;
887 },
888 }
889 }
890 }
891}
892
893impl<I: Interface> fmt::Debug for Receiver<I> {
894 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
895 f.debug_struct("Receiver")
896 .field("name", &self.name)
897 .field("queue_size", &self.queue_size.load(Ordering::Relaxed))
898 .finish()
899 }
900}
901
902/// Creates an unbounded channel for communicating with a [`Service`].
903///
904/// The `Addr` as the sending part provides public access to the service, while the `Receiver`
905/// should remain internal to the service.
906pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
907 let queue_size = Arc::new(AtomicU64::new(0));
908 let (tx, rx) = mpsc::unbounded_channel();
909
910 let addr = Addr {
911 tx,
912 queue_size: queue_size.clone(),
913 };
914
915 let mut interval = tokio::time::interval(BACKLOG_INTERVAL);
916 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
917
918 let receiver = Receiver {
919 rx,
920 name,
921 interval,
922 queue_size,
923 };
924
925 (addr, receiver)
926}
927
928/// An asynchronous unit responding to messages.
929///
/// Services receive messages conforming to some [`Interface`] through an [`Addr`] and handle them
/// one by one. Internally, services are free to process these messages concurrently or not; most
/// probably should.
933///
934/// Individual messages can have a response which will be sent once the message is handled by the
935/// service. The sender can asynchronously await the responses of such messages.
936///
937/// To start a service, create a service runner and call [`ServiceSpawnExt::start`].
938///
939/// # Implementing Services
940///
/// The standard way to implement services is through the `run` function. It receives an inbound
/// channel for all messages sent through the service's address. Note that this function is
/// asynchronous, so messages can be awaited directly in a loop:
944///
945/// ```no_run
946/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceSpawnExt};
947/// # fn test(services: &dyn relay_system::ServiceSpawn) {
948/// struct MyMessage;
949///
950/// impl Interface for MyMessage {}
951///
952/// impl FromMessage<Self> for MyMessage {
953/// type Response = NoResponse;
954///
955/// fn from_message(message: Self, _: ()) -> Self {
956/// message
957/// }
958/// }
959///
960/// struct MyService;
961///
962/// impl Service for MyService {
963/// type Interface = MyMessage;
964///
965/// async fn run(self, mut rx: Receiver<Self::Interface>) {
966/// while let Some(message) = rx.recv().await {
967/// // handle the message
968/// }
969/// }
970/// }
971///
972/// let addr = services.start(MyService);
973/// # }
974/// ```
975///
976/// ## Debounce and Caching
977///
978/// Services that cache or debounce their responses can benefit from the [`BroadcastResponse`]
979/// behavior. To use this behavior, implement the message and interface identical to
980/// [`AsyncResponse`] above. This will provide a different sender type that can be converted into a
981/// channel to debounce responses. It is still possible to send values directly via the sender
982/// without a broadcast channel.
983///
984/// ```
985/// use std::collections::btree_map::{BTreeMap, Entry};
986/// use relay_system::{BroadcastChannel, BroadcastSender};
987///
988/// // FromMessage implementation using BroadcastResponse omitted for brevity.
989///
990/// struct MyService {
991/// cache: BTreeMap<u32, String>,
992/// channels: BTreeMap<u32, BroadcastChannel<String>>,
993/// }
994///
995/// impl MyService {
996/// fn handle_message(&mut self, id: u32, sender: BroadcastSender<String>) {
997/// if let Some(cached) = self.cache.get(&id) {
998/// sender.send(cached.clone());
999/// return;
1000/// }
1001///
1002/// match self.channels.entry(id) {
1003/// Entry::Vacant(entry) => {
1004/// entry.insert(sender.into_channel());
1005/// // Start async computation here.
1006/// }
1007/// Entry::Occupied(mut entry) => {
1008/// entry.get_mut().attach(sender);
1009/// }
1010/// }
1011/// }
1012///
1013/// fn finish_compute(&mut self, id: u32, value: String) {
1014/// if let Some(channel) = self.channels.remove(&id) {
1015/// channel.send(value.clone());
1016/// }
1017///
1018/// self.cache.insert(id, value);
1019/// }
1020/// }
1021/// ```
1022///
1023pub trait Service: Sized {
1024 /// The interface of messages this service implements.
1025 ///
1026 /// The interface can be a single message type or an enumeration of all the messages that
1027 /// can be handled by this service.
1028 type Interface: Interface;
1029
1030 /// Defines the main task of this service.
1031 ///
1032 /// `run` typically contains a loop that reads from `rx`, or a `select!` that reads
1033 /// from multiple sources at once.
1034 fn run(self, rx: Receiver<Self::Interface>) -> impl Future<Output = ()> + Send + 'static;
1035
1036 /// Starts the service in the current runtime and returns an address for it.
1037 ///
1038 /// The service runs in a detached tokio task that cannot be joined on. This is mainly useful
1039 /// for tests.
1040 fn start_detached(self) -> Addr<Self::Interface> {
1041 let (addr, rx) = channel(Self::name());
1042 spawn(TaskId::for_service::<Self>(), self.run(rx));
1043 addr
1044 }
1045
1046 /// Returns a unique name for this service implementation.
1047 ///
1048 /// This is used for internal diagnostics and uses the fully qualified type name of the service
1049 /// implementor by default.
1050 fn name() -> &'static str {
1051 std::any::type_name::<Self>()
1052 }
1053}
1054
1055/// The [`ServiceSpawn`] trait allows for starting a [`Service`] on an executor that will run them to completion.
1056///
1057/// Often you want to spawn your service directly via [`ServiceSpawnExt`].
1058pub trait ServiceSpawn {
1059 /// Starts the service on the executor.
1060 fn start_obj(&self, service: ServiceObj);
1061}
1062
1063/// Extension trait for [`ServiceSpawn`], providing more convenient methods to spawn a service.
1064pub trait ServiceSpawnExt {
1065 /// Starts a service and starts tracking its join handle, exposing an [`Addr`] for message passing.
1066 fn start<S: Service>(&self, service: S) -> Addr<S::Interface>;
1067
1068 /// Starts a service and starts tracking its join handle, given a predefined receiver.
1069 fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>);
1070}
1071
1072impl<T: ServiceSpawn + ?Sized> ServiceSpawnExt for T {
1073 fn start<S: Service>(&self, service: S) -> Addr<S::Interface> {
1074 let (addr, rx) = crate::channel(S::name());
1075 self.start_with(service, rx);
1076 addr
1077 }
1078
1079 fn start_with<S: Service>(&self, service: S, rx: Receiver<S::Interface>) {
1080 self.start_obj(ServiceObj::new(service, rx));
1081 }
1082}
1083
1084/// Type erased [`Service`].
1085///
1086/// Used to start a service using [`ServiceSpawn`] and usually not directly interacted with.
1087/// Use [`ServiceSpawnExt`] when possible instead.
1088pub struct ServiceObj {
1089 name: &'static str,
1090 future: BoxFuture<'static, ()>,
1091}
1092
1093impl ServiceObj {
1094 /// Creates a new bundled type erased [`Service`], that can be started using [`ServiceSpawn`].
1095 pub fn new<S: Service>(service: S, rx: Receiver<S::Interface>) -> Self {
1096 Self {
1097 name: S::name(),
1098 future: service.run(rx).boxed(),
1099 }
1100 }
1101
1102 /// Returns the name of the service.
1103 ///
1104 /// The name of the service is inferred from [`Service::name`].
1105 pub fn name(&self) -> &'static str {
1106 self.name
1107 }
1108}
1109
/// A [`ServiceSpawn`] implementation that spawns services onto the current Tokio runtime.
///
/// Only compiled when the `test` feature is enabled.
#[cfg(feature = "test")]
pub struct TokioServiceSpawn;
1113
#[cfg(feature = "test")]
impl ServiceSpawn for TokioServiceSpawn {
    /// Spawns the future of the service through `crate::spawn!`.
    ///
    /// Only the future is used; the name stored in the [`ServiceObj`] is dropped.
    #[track_caller]
    fn start_obj(&self, service: ServiceObj) {
        let ServiceObj { future, .. } = service;
        crate::spawn!(future);
    }
}
1121
#[cfg(test)]
mod tests {
    use super::*;

    /// Message without payload and without response, used to fill the queue.
    struct MockMessage;

    impl Interface for MockMessage {}

    impl FromMessage<Self> for MockMessage {
        type Response = NoResponse;

        fn from_message(message: Self, _: ()) -> Self {
            message
        }
    }

    /// Service that sleeps for two backlog intervals after every message it receives.
    struct MockService;

    impl Service for MockService {
        type Interface = MockMessage;

        async fn run(self, mut rx: Receiver<Self::Interface>) {
            loop {
                if rx.recv().await.is_none() {
                    break;
                }

                tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
            }
        }

        fn name() -> &'static str {
            "mock"
        }
    }

    /// Checks the backlog gauge emitted for a slow service whose queue fills up and drains.
    #[test]
    fn test_backpressure_metrics() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();

        // Enter the runtime so that the service can be spawned and the clock paused from
        // synchronous code.
        let _guard = runtime.enter();
        tokio::time::pause();

        // The mock service needs 2 * BACKLOG_INTERVAL to handle each message.
        let addr = MockService.start_detached();

        // A tiny step forward is enough to trigger the very first metric emission.
        let initial = relay_statsd::with_capturing_test_client(|| {
            runtime.block_on(async {
                tokio::time::sleep(Duration::from_millis(10)).await;
            })
        });

        assert_eq!(initial, ["service.back_pressure:0|g|#service:mock"]);

        // Queue three messages and move on to 0.5 * INTERVAL. Nothing is reported yet.
        let while_busy = relay_statsd::with_capturing_test_client(|| {
            runtime.block_on(async {
                // The first message is pulled immediately, the other two stay in the queue.
                for _ in 0..3 {
                    addr.send(MockMessage);
                }

                tokio::time::sleep(BACKLOG_INTERVAL / 2).await;
            })
        });

        assert!(while_busy.is_empty());

        // Move on to 6.5 * INTERVAL. After pulling the first message right away, the service
        // pulls one more message every 2 INTERVALS. All messages are fully handled after
        // 6 INTERVALS, which cannot be observed since the last message leaves the queue at 4.
        let drained = relay_statsd::with_capturing_test_client(|| {
            runtime.block_on(async {
                tokio::time::sleep(BACKLOG_INTERVAL * 6).await;
            })
        });

        assert_eq!(
            drained,
            [
                "service.back_pressure:2|g|#service:mock", // 2 * INTERVAL
                "service.back_pressure:1|g|#service:mock", // 4 * INTERVAL
                "service.back_pressure:0|g|#service:mock", // 6 * INTERVAL
            ]
        );
    }
}