relay_server/services/
test_store.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use relay_config::{Config, RelayMode};
5use relay_event_schema::protocol::EventId;
6use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender};
7
8use crate::envelope::Envelope;
9use crate::services::outcome::Outcome;
10use crate::services::processor::Processed;
11use crate::utils::TypedEnvelope;
12
13/// Either a captured envelope or an error that occured during processing.
14pub type CapturedEnvelope = Result<Box<Envelope>, String>;
15
16/// Inserts an envelope or failure into internal captures.
17///
18/// Can be retrieved using [`GetCapturedEnvelope`]. Use [`Capture::should_capture`] to check whether
19/// the message should even be sent to reduce the overheads.
20#[derive(Debug)]
21pub struct Capture {
22    event_id: Option<EventId>,
23    capture: CapturedEnvelope,
24}
25
26impl Capture {
27    /// Returns `true` if Relay is in capture mode.
28    ///
29    /// The `Capture` message can still be sent and and will be ignored. This function is purely for
30    /// optimization purposes.
31    pub fn should_capture(config: &Config) -> bool {
32        matches!(config.relay_mode(), RelayMode::Capture)
33    }
34
35    /// Captures an accepted envelope.
36    pub fn accepted(mut managed: TypedEnvelope<Processed>) -> Self {
37        let envelope = managed.take_envelope();
38        managed.accept();
39
40        Self {
41            event_id: envelope.event_id(),
42            capture: Ok(envelope),
43        }
44    }
45
46    /// Captures the error that lead to envelope rejection.
47    pub fn rejected(event_id: Option<EventId>, outcome: &Outcome) -> Self {
48        Self {
49            event_id,
50            capture: Err(outcome.to_string()),
51        }
52    }
53}
54
55/// Resolves a [`CapturedEnvelope`] by the given `event_id`.
56#[derive(Debug)]
57pub struct GetCapturedEnvelope {
58    pub event_id: EventId,
59}
60
61/// Stores and retrieves Envelopes for integration testing.
62#[derive(Debug)]
63pub enum TestStore {
64    Capture(Box<Capture>),
65    Get(GetCapturedEnvelope, Sender<Option<CapturedEnvelope>>),
66}
67
68impl relay_system::Interface for TestStore {}
69
70impl FromMessage<Capture> for TestStore {
71    type Response = NoResponse;
72
73    fn from_message(message: Capture, _: ()) -> Self {
74        Self::Capture(Box::new(message))
75    }
76}
77
78impl FromMessage<GetCapturedEnvelope> for TestStore {
79    type Response = AsyncResponse<Option<CapturedEnvelope>>;
80
81    fn from_message(
82        message: GetCapturedEnvelope,
83        sender: Sender<Option<CapturedEnvelope>>,
84    ) -> Self {
85        Self::Get(message, sender)
86    }
87}
88
89/// Service implementing the [`TestStore`] interface.
90pub struct TestStoreService {
91    config: Arc<Config>,
92    captures: BTreeMap<EventId, CapturedEnvelope>,
93}
94
95impl TestStoreService {
96    pub fn new(config: Arc<Config>) -> Self {
97        Self {
98            config,
99            captures: BTreeMap::new(),
100        }
101    }
102
103    fn capture(&mut self, msg: Capture) {
104        if let RelayMode::Capture = self.config.relay_mode() {
105            match (msg.event_id, msg.capture) {
106                (Some(event_id), Ok(envelope)) => {
107                    relay_log::debug!("capturing envelope");
108                    self.captures.insert(event_id, Ok(envelope));
109                }
110                (Some(event_id), Err(message)) => {
111                    relay_log::debug!(%event_id, "capturing failed event");
112                    self.captures.insert(event_id, Err(message));
113                }
114
115                // XXX: does not work with envelopes without event_id
116                (None, Ok(_)) => relay_log::debug!("dropping non event envelope"),
117                (None, Err(_)) => relay_log::debug!("dropping failed envelope without event"),
118            }
119        }
120    }
121
122    fn get(&self, message: GetCapturedEnvelope) -> Option<CapturedEnvelope> {
123        self.captures.get(&message.event_id).cloned()
124    }
125
126    fn handle_message(&mut self, message: TestStore) {
127        match message {
128            TestStore::Capture(message) => self.capture(*message),
129            TestStore::Get(message, sender) => sender.send(self.get(message)),
130        }
131    }
132}
133
134impl relay_system::Service for TestStoreService {
135    type Interface = TestStore;
136
137    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
138        while let Some(message) = rx.recv().await {
139            self.handle_message(message);
140        }
141    }
142}