relay_server/services/
test_store.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::collections::BTreeMap;
use std::sync::Arc;

use relay_config::{Config, RelayMode};
use relay_event_schema::protocol::EventId;
use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender};

use crate::envelope::Envelope;
use crate::services::outcome::Outcome;
use crate::services::processor::Processed;
use crate::utils::TypedEnvelope;

/// Either a captured envelope or an error that occured during processing.
pub type CapturedEnvelope = Result<Box<Envelope>, String>;

/// Inserts an envelope or failure into internal captures.
///
/// Can be retrieved using [`GetCapturedEnvelope`]. Use [`Capture::should_capture`] to check whether
/// the message should even be sent to reduce the overheads.
#[derive(Debug)]
pub struct Capture {
    event_id: Option<EventId>,
    capture: CapturedEnvelope,
}

impl Capture {
    /// Returns `true` if Relay is in capture mode.
    ///
    /// The `Capture` message can still be sent and and will be ignored. This function is purely for
    /// optimization purposes.
    pub fn should_capture(config: &Config) -> bool {
        matches!(config.relay_mode(), RelayMode::Capture)
    }

    /// Captures an accepted envelope.
    pub fn accepted(mut managed: TypedEnvelope<Processed>) -> Self {
        let envelope = managed.take_envelope();
        managed.accept();

        Self {
            event_id: envelope.event_id(),
            capture: Ok(envelope),
        }
    }

    /// Captures the error that lead to envelope rejection.
    pub fn rejected(event_id: Option<EventId>, outcome: &Outcome) -> Self {
        Self {
            event_id,
            capture: Err(outcome.to_string()),
        }
    }
}

/// Resolves a [`CapturedEnvelope`] by the given `event_id`.
#[derive(Debug)]
pub struct GetCapturedEnvelope {
    pub event_id: EventId,
}

/// Stores and retrieves Envelopes for integration testing.
#[derive(Debug)]
pub enum TestStore {
    Capture(Box<Capture>),
    Get(GetCapturedEnvelope, Sender<Option<CapturedEnvelope>>),
}

impl relay_system::Interface for TestStore {}

impl FromMessage<Capture> for TestStore {
    type Response = NoResponse;

    fn from_message(message: Capture, _: ()) -> Self {
        Self::Capture(Box::new(message))
    }
}

impl FromMessage<GetCapturedEnvelope> for TestStore {
    type Response = AsyncResponse<Option<CapturedEnvelope>>;

    fn from_message(
        message: GetCapturedEnvelope,
        sender: Sender<Option<CapturedEnvelope>>,
    ) -> Self {
        Self::Get(message, sender)
    }
}

/// Service implementing the [`TestStore`] interface.
pub struct TestStoreService {
    config: Arc<Config>,
    captures: BTreeMap<EventId, CapturedEnvelope>,
}

impl TestStoreService {
    pub fn new(config: Arc<Config>) -> Self {
        Self {
            config,
            captures: BTreeMap::new(),
        }
    }

    fn capture(&mut self, msg: Capture) {
        if let RelayMode::Capture = self.config.relay_mode() {
            match (msg.event_id, msg.capture) {
                (Some(event_id), Ok(envelope)) => {
                    relay_log::debug!("capturing envelope");
                    self.captures.insert(event_id, Ok(envelope));
                }
                (Some(event_id), Err(message)) => {
                    relay_log::debug!(%event_id, "capturing failed event");
                    self.captures.insert(event_id, Err(message));
                }

                // XXX: does not work with envelopes without event_id
                (None, Ok(_)) => relay_log::debug!("dropping non event envelope"),
                (None, Err(_)) => relay_log::debug!("dropping failed envelope without event"),
            }
        }
    }

    fn get(&self, message: GetCapturedEnvelope) -> Option<CapturedEnvelope> {
        self.captures.get(&message.event_id).cloned()
    }

    fn handle_message(&mut self, message: TestStore) {
        match message {
            TestStore::Capture(message) => self.capture(*message),
            TestStore::Get(message, sender) => sender.send(self.get(message)),
        }
    }
}

impl relay_system::Service for TestStoreService {
    type Interface = TestStore;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            self.handle_message(message);
        }
    }
}