pub trait Service: Sized {
    type Interface: Interface;

    // Required method
    fn run(
        self,
        rx: Receiver<Self::Interface>,
    ) -> impl Future<Output = ()> + Send + 'static;

    // Provided methods
    fn start_detached(self) -> Addr<Self::Interface> { ... }
    fn name() -> &'static str { ... }
}
An asynchronous unit responding to messages.
Services receive messages conforming to some Interface through an Addr and handle them one by one. Internally, services are free to process these messages concurrently or not; most probably should.
Individual messages can have a response which will be sent once the message is handled by the service. The sender can asynchronously await the responses of such messages.
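The request and response flow described above can be illustrated without relay_system at all. The following is a minimal sketch using only std threads and channels, not the crate's API: the `Message` enum and the `start_doubler` and `ask_double` helpers are hypothetical names chosen for illustration, and a blocking `recv` stands in for asynchronously awaiting a response.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical interface: one message without a response, one with.
enum Message {
    // Fire-and-forget message; the service sends nothing back.
    Log(String),
    // Message with a response; the service replies through the sender.
    Double(u32, Sender<u32>),
}

// Spawns a thread that plays the role of the service and returns a
// sender that plays the role of its address.
fn start_doubler() -> Sender<Message> {
    let (addr, rx) = channel::<Message>();
    thread::spawn(move || {
        // Handle messages one by one until every address is dropped.
        for message in rx {
            match message {
                Message::Log(_text) => { /* nothing to respond with */ }
                Message::Double(n, reply) => {
                    // Ignore the error if the requester went away.
                    let _ = reply.send(n * 2);
                }
            }
        }
    });
    addr
}

// Sends a request and waits for the response. Returns None if the
// service has shut down.
fn ask_double(addr: &Sender<Message>, n: u32) -> Option<u32> {
    let (reply_tx, reply_rx) = channel();
    addr.send(Message::Double(n, reply_tx)).ok()?;
    reply_rx.recv().ok()
}
```

Carrying the reply channel inside the message keeps each response tied to the request that caused it, which is the same idea as a message with a response in this crate.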
To start a service, create a service runner and call ServiceRunner::start.
§Implementing Services
The standard way to implement services is through the run function. It receives an inbound channel for all messages sent through the service’s address. Because run returns a future, it can be written as an async fn that handles messages in a loop:
use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceRunner};

struct MyMessage;

impl Interface for MyMessage {}

impl FromMessage<Self> for MyMessage {
    type Response = NoResponse;

    fn from_message(message: Self, _: ()) -> Self {
        message
    }
}

struct MyService;

impl Service for MyService {
    type Interface = MyMessage;

    async fn run(self, mut rx: Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            // handle the message
        }
    }
}

let addr = ServiceRunner::new().start(MyService);
§Debounce and Caching
Services that cache or debounce their responses can benefit from the BroadcastResponse behavior. To use this behavior, implement the message and interface in the same way as for AsyncResponse. This provides a different sender type that can be converted into a channel to debounce responses. It is still possible to send values directly via the sender without a broadcast channel.
use std::collections::btree_map::{BTreeMap, Entry};
use relay_system::{BroadcastChannel, BroadcastSender};

// FromMessage implementation using BroadcastResponse omitted for brevity.

struct MyService {
    cache: BTreeMap<u32, String>,
    channels: BTreeMap<u32, BroadcastChannel<String>>,
}

impl MyService {
    fn handle_message(&mut self, id: u32, sender: BroadcastSender<String>) {
        if let Some(cached) = self.cache.get(&id) {
            sender.send(cached.clone());
            return;
        }

        match self.channels.entry(id) {
            Entry::Vacant(entry) => {
                entry.insert(sender.into_channel());
                // Start async computation here.
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().attach(sender);
            }
        }
    }

    fn finish_compute(&mut self, id: u32, value: String) {
        if let Some(channel) = self.channels.remove(&id) {
            channel.send(value.clone());
        }

        self.cache.insert(id, value);
    }
}
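To make the debounce behavior easier to observe in isolation, here is a std-only sketch of the same pattern. It does not use relay_system: `Waiters` is a hypothetical stand-in for a broadcast channel, `Debouncer` and its `computations_started` counter are illustrative names, and each requester gets a plain `std::sync::mpsc` receiver instead of an awaitable response.

```rust
use std::collections::btree_map::{BTreeMap, Entry};
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for a broadcast channel: the list of
// requesters waiting for the same value.
struct Waiters(Vec<Sender<String>>);

impl Waiters {
    // Adds another requester to the pending computation.
    fn attach(&mut self, sender: Sender<String>) {
        self.0.push(sender);
    }

    // Delivers the value to every waiting requester.
    fn send(self, value: String) {
        for sender in self.0 {
            // Ignore requesters that dropped their receiver.
            let _ = sender.send(value.clone());
        }
    }
}

#[derive(Default)]
struct Debouncer {
    cache: BTreeMap<u32, String>,
    pending: BTreeMap<u32, Waiters>,
    // Counts how many computations were actually started.
    computations_started: usize,
}

impl Debouncer {
    // Requests the value for `id` and returns a receiver for it.
    fn request(&mut self, id: u32) -> Receiver<String> {
        let (tx, rx) = channel();

        // Cached values are answered directly.
        if let Some(cached) = self.cache.get(&id) {
            let _ = tx.send(cached.clone());
            return rx;
        }

        match self.pending.entry(id) {
            // First request for this id: a computation would start here.
            Entry::Vacant(entry) => {
                entry.insert(Waiters(vec![tx]));
                self.computations_started += 1;
            }
            // A computation is already running: just wait for it.
            Entry::Occupied(mut entry) => entry.get_mut().attach(tx),
        }

        rx
    }

    // Called when the computation for `id` completes.
    fn finish(&mut self, id: u32, value: String) {
        if let Some(waiters) = self.pending.remove(&id) {
            waiters.send(value.clone());
        }
        self.cache.insert(id, value);
    }
}
```

In this sketch, two calls to `request` for the same id before `finish` start only one computation, and both receivers get the value once `finish` runs. A later request for that id is served from the cache.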
Required Associated Types§
Required Methods§
Provided Methods§
fn start_detached(self) -> Addr<Self::Interface>
Starts the service in the current runtime and returns an address for it.
The service runs in a detached tokio task that cannot be joined on. This is mainly useful for tests.