relay_server/services/
test_store.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use relay_config::{Config, RelayMode};
5use relay_event_schema::protocol::EventId;
6use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender};
7
8use crate::envelope::Envelope;
9use crate::services::outcome::Outcome;
10use crate::services::processor::Processed;
11use crate::utils::TypedEnvelope;
12
13pub type CapturedEnvelope = Result<Box<Envelope>, String>;
15
16#[derive(Debug)]
21pub struct Capture {
22 event_id: Option<EventId>,
23 capture: CapturedEnvelope,
24}
25
26impl Capture {
27 pub fn should_capture(config: &Config) -> bool {
32 matches!(config.relay_mode(), RelayMode::Capture)
33 }
34
35 pub fn accepted(mut managed: TypedEnvelope<Processed>) -> Self {
37 let envelope = managed.take_envelope();
38 managed.accept();
39
40 Self {
41 event_id: envelope.event_id(),
42 capture: Ok(envelope),
43 }
44 }
45
46 pub fn rejected(event_id: Option<EventId>, outcome: &Outcome) -> Self {
48 Self {
49 event_id,
50 capture: Err(outcome.to_string()),
51 }
52 }
53}
54
55#[derive(Debug)]
57pub struct GetCapturedEnvelope {
58 pub event_id: EventId,
59}
60
61#[derive(Debug)]
63pub enum TestStore {
64 Capture(Box<Capture>),
65 Get(GetCapturedEnvelope, Sender<Option<CapturedEnvelope>>),
66}
67
68impl relay_system::Interface for TestStore {}
69
70impl FromMessage<Capture> for TestStore {
71 type Response = NoResponse;
72
73 fn from_message(message: Capture, _: ()) -> Self {
74 Self::Capture(Box::new(message))
75 }
76}
77
78impl FromMessage<GetCapturedEnvelope> for TestStore {
79 type Response = AsyncResponse<Option<CapturedEnvelope>>;
80
81 fn from_message(
82 message: GetCapturedEnvelope,
83 sender: Sender<Option<CapturedEnvelope>>,
84 ) -> Self {
85 Self::Get(message, sender)
86 }
87}
88
89pub struct TestStoreService {
91 config: Arc<Config>,
92 captures: BTreeMap<EventId, CapturedEnvelope>,
93}
94
95impl TestStoreService {
96 pub fn new(config: Arc<Config>) -> Self {
97 Self {
98 config,
99 captures: BTreeMap::new(),
100 }
101 }
102
103 fn capture(&mut self, msg: Capture) {
104 if let RelayMode::Capture = self.config.relay_mode() {
105 match (msg.event_id, msg.capture) {
106 (Some(event_id), Ok(envelope)) => {
107 relay_log::debug!("capturing envelope");
108 self.captures.insert(event_id, Ok(envelope));
109 }
110 (Some(event_id), Err(message)) => {
111 relay_log::debug!(%event_id, "capturing failed event");
112 self.captures.insert(event_id, Err(message));
113 }
114
115 (None, Ok(_)) => relay_log::debug!("dropping non event envelope"),
117 (None, Err(_)) => relay_log::debug!("dropping failed envelope without event"),
118 }
119 }
120 }
121
122 fn get(&self, message: GetCapturedEnvelope) -> Option<CapturedEnvelope> {
123 self.captures.get(&message.event_id).cloned()
124 }
125
126 fn handle_message(&mut self, message: TestStore) {
127 match message {
128 TestStore::Capture(message) => self.capture(*message),
129 TestStore::Get(message, sender) => sender.send(self.get(message)),
130 }
131 }
132}
133
134impl relay_system::Service for TestStoreService {
135 type Interface = TestStore;
136
137 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
138 while let Some(message) = rx.recv().await {
139 self.handle_message(message);
140 }
141 }
142}