relay_server/services/projects/source/upstream.rs

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;

use futures::future;
use itertools::Itertools;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_dynamic_config::ErrorBoundary;
use relay_statsd::metric;
use relay_system::{
    Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service,
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::services::projects::project::Revision;
use crate::services::projects::project::{ParsedProjectState, ProjectState};
use crate::services::projects::source::{FetchProjectState, SourceProjectState};
use crate::services::upstream::{
    Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{RetryBackoff, SleepHandle};
/// A query to retrieve a batch of project states from upstream.
///
/// This query does not implement `Deserialize`. To parse the query, use a wrapper that skips
/// invalid project keys instead of failing the entire batch.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStates {
    /// List of requested project keys.
    public_keys: Vec<ProjectKey>,
    /// List of revisions for each project key.
    ///
    /// The revisions are mapped by index to the project keys; this is kept as a
    /// separate field for API compatibility.
    revisions: Vec<Revision>,
    /// If `true`, the upstream should return a full configuration.
    ///
    /// Upstreams will ignore this for non-internal Relays.
    full_config: bool,
    /// If `true`, the upstream should not serve from cache.
    no_cache: bool,
}
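
// For illustration (not a verbatim payload), a request for two projects serializes to a
// JSON body along these lines. The keys and revisions below are made up, and the exact
// serialization of `Revision` values depends on that type:
//
//     {
//         "publicKeys": ["abd0f232775f45feab79864e580d160b", "1d33a1cbcf6e4ccfb7e4f4f9b34e1c22"],
//         "revisions": ["8d2c4f7a", null],
//         "fullConfig": true,
//         "noCache": false
//     }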

/// The response of the project states request.
///
/// A [`ProjectKey`] is either pending or has a result; it cannot appear in both, and
/// doing so is undefined.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStatesResponse {
    /// Map of [`ProjectKey`] to [`ParsedProjectState`] that was fetched from the upstream.
    #[serde(default)]
    configs: HashMap<ProjectKey, ErrorBoundary<Option<ParsedProjectState>>>,
    /// The [`ProjectKey`]s that couldn't be immediately retrieved from the upstream.
    #[serde(default)]
    pending: HashSet<ProjectKey>,
    /// The [`ProjectKey`]s that the upstream has no updates for.
    ///
    /// The list is only populated when the request contains revision information
    /// for all requested configurations.
    #[serde(default)]
    unchanged: HashSet<ProjectKey>,
}
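
// An illustrative (not verbatim) response body; the project keys are made up and the config
// contents are elided. The `pending` and `unchanged` shapes match what the test at the bottom
// of this file sends:
//
//     {
//         "configs": {
//             "abd0f232775f45feab79864e580d160b": { /* parsed project state */ }
//         },
//         "pending": ["1d33a1cbcf6e4ccfb7e4f4f9b34e1c22"],
//         "unchanged": []
//     }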

impl UpstreamQuery for GetProjectStates {
    type Response = GetProjectStatesResponse;

    fn method(&self) -> Method {
        Method::POST
    }

    fn path(&self) -> Cow<'static, str> {
        Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
    }

    fn priority() -> RequestPriority {
        RequestPriority::High
    }

    fn retry() -> bool {
        false
    }

    fn route(&self) -> &'static str {
        "project_configs"
    }
}

/// A wrapper for an incoming external request which also keeps additional information.
#[derive(Debug)]
struct ProjectStateChannel {
    // Main broadcast channel.
    channel: BroadcastChannel<SourceProjectState>,
    // Additional broadcast channels tracked from merge operations.
    merged: Vec<BroadcastChannel<SourceProjectState>>,
    revision: Revision,
    deadline: Instant,
    no_cache: bool,
    attempts: u64,
    /// How many times the request failed.
    errors: usize,
    /// How many times a "pending" response was received for this project state.
    pending: usize,
}

impl ProjectStateChannel {
    pub fn new(
        sender: BroadcastSender<SourceProjectState>,
        revision: Revision,
        timeout: Duration,
        no_cache: bool,
    ) -> Self {
        let now = Instant::now();
        Self {
            no_cache,
            channel: sender.into_channel(),
            merged: Vec::new(),
            revision,
            deadline: now + timeout,
            attempts: 0,
            errors: 0,
            pending: 0,
        }
    }

    pub fn no_cache(&mut self) {
        self.no_cache = true;
    }

    /// Attaches a new sender to the same channel.
    ///
    /// Also reconciles the new sender's revision with the revision that was already requested:
    /// if the new revision differs from the contained revision, the revision is cleared.
    /// To avoid multiple fetches per revision in the same batch, requests with different
    /// revisions fall back to the common denominator, which is fetching the full project config.
    pub fn attach(&mut self, sender: BroadcastSender<SourceProjectState>, revision: Revision) {
        self.channel.attach(sender);
        if self.revision != revision {
            self.revision = Revision::default();
        }
    }

    pub fn send(self, state: SourceProjectState) {
        for channel in self.merged {
            channel.send(state.clone());
        }
        self.channel.send(state)
    }

    pub fn expired(&self) -> bool {
        Instant::now() > self.deadline
    }

    pub fn merge(&mut self, channel: ProjectStateChannel) {
        let ProjectStateChannel {
            channel,
            merged,
            revision,
            deadline,
            no_cache,
            attempts,
            errors,
            pending,
        } = channel;

        self.merged.push(channel);
        self.merged.extend(merged);
        if self.revision != revision {
            self.revision = Revision::default();
        }
        self.deadline = self.deadline.max(deadline);
        self.no_cache |= no_cache;
        self.attempts += attempts;
        self.errors += errors;
        self.pending += pending;
    }
}
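
// A worked example of the merge semantics above, with made-up values:
//
//   channel A: revision "abc", deadline now+30s, attempts 1, errors 0, no_cache false
//   channel B: revision "def", deadline now+45s, attempts 2, errors 1, no_cache true
//
// After `A.merge(B)`, A's revision is reset to the default (so the next upstream query
// requests the full config), its deadline is now+45s, attempts 3, errors 1, and no_cache
// true; B's broadcast channel moves into `merged`, so waiters of both channels receive
// the eventual state.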

/// The map of project keys to their project state channels.
type ProjectStateChannels = HashMap<ProjectKey, ProjectStateChannel>;

/// This is the interface of the [`UpstreamProjectSourceService`].
///
/// The service is responsible for fetching the [`ParsedProjectState`] from the upstream.
/// Internally it maintains a queue of incoming requests that are scheduled to fetch the
/// state and takes care of the backoff in case there is a problem with the requests.
#[derive(Debug)]
pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<SourceProjectState>);

impl Interface for UpstreamProjectSource {}

impl FromMessage<FetchProjectState> for UpstreamProjectSource {
    type Response = BroadcastResponse<SourceProjectState>;

    fn from_message(
        message: FetchProjectState,
        sender: BroadcastSender<SourceProjectState>,
    ) -> Self {
        Self(message, sender)
    }
}

/// The batches of channels used to fetch the project states.
struct ChannelsBatch {
    nocache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
    cache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
}

/// Collected upstream responses, with their associated project state channels.
struct UpstreamResponse {
    channels_batch: ProjectStateChannels,
    response: Result<GetProjectStatesResponse, UpstreamRequestError>,
}

/// The service which fetches the [`ParsedProjectState`] from the upstream.
#[derive(Debug)]
pub struct UpstreamProjectSourceService {
    backoff: RetryBackoff,
    config: Arc<Config>,
    upstream_relay: Addr<UpstreamRelay>,
    state_channels: ProjectStateChannels,
    inner_tx: mpsc::UnboundedSender<Vec<Option<UpstreamResponse>>>,
    inner_rx: mpsc::UnboundedReceiver<Vec<Option<UpstreamResponse>>>,
    fetch_handle: SleepHandle,
    /// Instant when the current streak of failed fetches started, `None` if there are no failures.
    ///
    /// Relay sets this value to the instant when the first fetch fails, and
    /// resets it to `None` on successful responses. Relay does nothing during
    /// long periods without requests.
    last_failed_fetch: Option<Instant>,
    /// Duration of continued fetch failures before an error is emitted.
    ///
    /// Relay emits an error if all requests have failed for at least this interval.
    failure_interval: Duration,
}

impl UpstreamProjectSourceService {
    /// Creates a new [`UpstreamProjectSourceService`] instance.
    pub fn new(config: Arc<Config>, upstream_relay: Addr<UpstreamRelay>) -> Self {
        let (inner_tx, inner_rx) = mpsc::unbounded_channel();

        Self {
            backoff: RetryBackoff::new(config.http_max_retry_interval()),
            state_channels: HashMap::new(),
            fetch_handle: SleepHandle::idle(),
            upstream_relay,
            inner_tx,
            inner_rx,
            last_failed_fetch: None,
            failure_interval: config.http_project_failure_interval(),
            config,
        }
    }

    /// Returns the backoff timeout for a batched upstream query.
    ///
    /// If previous queries succeeded, this will be the general batch interval. Additionally, an
    /// exponentially increasing backoff is used for retrying the upstream request.
    fn next_backoff(&mut self) -> Duration {
        self.config.query_batch_interval() + self.backoff.next_backoff()
    }

    /// Prepares batches of the cache and no-cache channels which can be used to request the
    /// project states.
    fn prepare_batches(&mut self) -> ChannelsBatch {
        let batch_size = self.config.query_batch_size();
        let num_batches = self.config.max_concurrent_queries();

        // Pop N items from state_channels. Intuitively, we would use
        // `state_channels.drain().take(n)`, but that clears the entire hashmap regardless of how
        // much of the iterator is consumed.
        //
        // Instead, we have to collect the keys we want into a separate vector and pop them
        // one-by-one.
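        //
        // As a small illustration of the pitfall (hypothetical values, not part of the
        // control flow here):
        //
        //     let mut map: HashMap<u32, u32> = (0..4).map(|i| (i, i)).collect();
        //     let taken: Vec<_> = map.drain().take(2).collect();
        //     // `taken` holds two entries, but `map` is now empty: dropping the `Drain`
        //     // iterator removes the remaining elements as well.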
        let projects: Vec<_> = (self.state_channels.keys().copied())
            .take(batch_size * num_batches)
            .collect();

        let fresh_channels = (projects.iter())
            .filter_map(|id| Some((*id, self.state_channels.remove(id)?)))
            .filter(|(id, channel)| {
                if channel.expired() {
                    metric!(
                        histogram(RelayHistograms::ProjectStateAttempts) = channel.attempts,
                        result = "timeout",
                    );
                    metric!(
                        counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                        result = "timeout",
                    );
                    relay_log::error!(
                        errors = channel.errors,
                        pending = channel.pending,
                        tags.did_error = channel.errors > 0,
                        tags.was_pending = channel.pending > 0,
                        tags.project_key = id.to_string(),
                        "error fetching project state {id}: deadline exceeded",
                    );
                }
                !channel.expired()
            });

        // Separate regular channels from those with the `nocache` flag. The latter go in separate
        // requests, since the upstream will block the response.
        let (nocache_channels, cache_channels): (Vec<_>, Vec<_>) =
            fresh_channels.partition(|(_id, channel)| channel.no_cache);

        let total_count = cache_channels.len() + nocache_channels.len();

        metric!(histogram(RelayHistograms::ProjectStatePending) = self.state_channels.len() as u64);

        relay_log::debug!(
            "updating project states for {}/{} projects (attempt {})",
            total_count,
            total_count + self.state_channels.len(),
            self.backoff.attempt(),
        );

        ChannelsBatch {
            nocache_channels,
            cache_channels,
        }
    }

    /// Merges a [`ProjectStateChannel`] into the existing list of tracked channels.
    ///
    /// A channel is removed from the list when the upstream is queried for the project;
    /// when the upstream returns pending for this project, the channel needs to be returned
    /// to the list of channels. If there is already another request for the same project
    /// outstanding, those two requests must be merged.
    fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
        match self.state_channels.entry(key) {
            Entry::Vacant(e) => {
                e.insert(channel);
            }
            Entry::Occupied(mut e) => {
                e.get_mut().merge(channel);
            }
        }
    }

    /// Executes an upstream request to fetch project configs.
    ///
    /// This assumes that currently no request is running. If the upstream request fails or new
    /// channels are pushed in the meantime, this will reschedule automatically.
    async fn fetch_states(
        config: Arc<Config>,
        upstream_relay: Addr<UpstreamRelay>,
        channels: ChannelsBatch,
    ) -> Vec<Option<UpstreamResponse>> {
        let request_start = Instant::now();
        let batch_size = config.query_batch_size();
        let cache_batches = channels.cache_channels.into_iter().chunks(batch_size);
        let nocache_batches = channels.nocache_channels.into_iter().chunks(batch_size);
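
        // As an illustration with assumed numbers: a `query_batch_size` of 2, five cache
        // channels, and one no-cache channel would produce ceil(5/2) + 1 = 4 upstream
        // requests in this cycle, each carrying at most `batch_size` project keys.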

        let mut requests = vec![];
        // The explicit `.into_iter()` calls are still required here: `IntoChunks` is not an
        // iterator itself but only implements the `IntoIterator` trait, so the compiler would
        // otherwise complain that `nocache_batches` is not an iterator.
        #[allow(clippy::useless_conversion)]
        for channels_batch in cache_batches.into_iter().chain(nocache_batches.into_iter()) {
            let mut channels_batch: ProjectStateChannels = channels_batch.collect();
            for channel in channels_batch.values_mut() {
                channel.attempts += 1;
            }
            relay_log::debug!("sending request of size {}", channels_batch.len());
            metric!(
                histogram(RelayHistograms::ProjectStateRequestBatchSize) =
                    channels_batch.len() as u64
            );

            let query = GetProjectStates {
                public_keys: channels_batch.keys().copied().collect(),
                revisions: channels_batch
                    .values()
                    .map(|c| c.revision.clone())
                    .collect(),
                full_config: config.processing_enabled() || config.request_full_project_config(),
                no_cache: channels_batch.values().any(|c| c.no_cache),
            };

            // Count the number of HTTP requests for project states.
            metric!(counter(RelayCounters::ProjectStateRequest) += 1);

            let upstream_relay = upstream_relay.clone();
            requests.push(async move {
                match upstream_relay.send(SendQuery(query)).await {
                    Ok(response) => Some(UpstreamResponse {
                        channels_batch,
                        response,
                    }),
                    // If sending the request to the upstream fails:
                    // - drop the current batch of channels,
                    // - report the error, since this case should not occur in a proper workflow,
                    // - return `None` to signal that we do not have any response from the
                    //   upstream and this batch should be ignored.
                    Err(_err) => {
                        relay_log::error!("failed to send the request to upstream: channel full");
                        None
                    }
                }
            });
        }

        // Wait on the results of all fanouts, and return the resolved responses.
        let responses = future::join_all(requests).await;
        metric!(timer(RelayTimers::ProjectStateRequestDuration) = request_start.elapsed());
        responses
    }

    /// Schedules the next trigger for fetching the project states.
    ///
    /// The next trigger will be scheduled only if the current handle is idle.
    fn schedule_fetch(&mut self) {
        if self.fetch_handle.is_idle() {
            let wait = self.next_backoff();
            self.fetch_handle.set(wait);
        }
    }

    /// Handles the responses from the upstream.
    fn handle_responses(&mut self, responses: Vec<Option<UpstreamResponse>>) {
        // Iterate only over the returned responses.
        for response in responses.into_iter().flatten() {
            let UpstreamResponse {
                channels_batch,
                response,
            } = response;

            match response {
                Ok(mut response) => {
                    // If a single request succeeded, we reset the backoff. We only back off
                    // if we see that the project config endpoint is completely down and did
                    // not answer a single request successfully.
                    //
                    // Otherwise we might refuse to fetch any project configs because of a
                    // single, reproducible 500 we observed for a particular project.
                    self.backoff.reset();
                    self.last_failed_fetch = None;

                    // Count the number of project states returned (via HTTP requests).
                    metric!(
                        histogram(RelayHistograms::ProjectStateReceived) =
                            response.configs.len() as u64
                    );
                    for (key, mut channel) in channels_batch {
                        if response.pending.contains(&key) {
                            channel.pending += 1;
                            self.merge_channel(key, channel);
                            continue;
                        }

                        let mut result = "ok";
                        let state = if response.unchanged.contains(&key) {
                            result = "ok_unchanged";
                            SourceProjectState::NotModified
                        } else {
                            let state = response
                                .configs
                                .remove(&key)
                                .unwrap_or(ErrorBoundary::Ok(None));

                            let state = match state {
                                ErrorBoundary::Err(error) => {
                                    result = "invalid";
                                    let error = &error as &dyn std::error::Error;
                                    relay_log::error!(error, "error fetching project state {key}");
                                    ProjectState::Pending
                                }
                                ErrorBoundary::Ok(None) => ProjectState::Disabled,
                                ErrorBoundary::Ok(Some(state)) => state.into(),
                            };

                            SourceProjectState::New(state)
                        };

                        metric!(
                            histogram(RelayHistograms::ProjectStateAttempts) = channel.attempts,
                            result = result,
                        );
                        metric!(
                            counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                            result = result,
                        );

                        channel.send(state);
                    }
                }
                Err(err) => {
                    self.track_failed_response();

                    let attempts = channels_batch
                        .values()
                        .map(|b| b.attempts)
                        .max()
                        .unwrap_or(0);
                    // Only log an error if the request failed more than once.
                    // We are not interested in single failures. Our retry mechanism is able to
                    // handle those.
                    if attempts >= 2 {
                        relay_log::error!(
                            error = &err as &dyn std::error::Error,
                            attempts = attempts,
                            "error fetching project states",
                        );
                    }

                    metric!(
                        histogram(RelayHistograms::ProjectStatePending) =
                            self.state_channels.len() as u64
                    );
                    // Put the channels back into the queue; we will retry again shortly.
                    self.state_channels.extend(channels_batch.into_iter().map(
                        |(key, mut channel)| {
                            channel.errors += 1;
                            (key, channel)
                        },
                    ))
                }
            }
        }

        if !self.state_channels.is_empty() {
            self.schedule_fetch()
        } else {
            // No open channels left. If this is because we fetched everything, the backoff
            // has already been reset. If, however, this is because we had failures and the
            // channels were cleaned up when the requests expired, we need to reset the
            // backoff so that the next request is not simply ignored (by `handle_message`)
            // and schedules a fetch again.
            //
            // The backoff member serves two purposes:
            //  1. to schedule repeated fetch requests (at less and less frequent intervals),
            //  2. as a flag to know whether a fetch is already scheduled.
            // Resetting it here signals that no fetch is scheduled (either because everything
            // went fine or because all the requests have expired). The next time a project is
            // requested, fetch requests will be scheduled again.
            self.backoff.reset();
        }
    }

    /// Tracks the last failed fetch and emits an error if failures have continued for longer
    /// than the failure interval.
    fn track_failed_response(&mut self) {
        match self.last_failed_fetch {
            None => self.last_failed_fetch = Some(Instant::now()),
            Some(last_failed) => {
                let failure_duration = last_failed.elapsed();
                if failure_duration >= self.failure_interval {
                    relay_log::error!(
                        failure_duration = format!("{} seconds", failure_duration.as_secs()),
                        backoff_attempts = self.backoff.attempt(),
                        "can't fetch project states"
                    );
                }
            }
        }
        metric!(counter(RelayCounters::ProjectUpstreamFailed) += 1);
    }

    /// Spawns the async task to fetch the project states.
    fn do_fetch(&mut self) {
        self.fetch_handle.reset();

        if self.state_channels.is_empty() {
            relay_log::error!("project state schedule fetch request without projects");
            return;
        }

        let config = self.config.clone();
        let inner_tx = self.inner_tx.clone();
        let channels = self.prepare_batches();
        let upstream_relay = self.upstream_relay.clone();

        relay_system::spawn!(async move {
            let responses = Self::fetch_states(config, upstream_relay, channels).await;
            // Send back all resolved responses, including unused channels.
            // These responses will be handled by the `handle_responses` function.
            if inner_tx.send(responses).is_err() {
                relay_log::error!("unable to forward the requests to further processing");
            }
        });
    }

    /// Handles the incoming external messages.
    fn handle_message(&mut self, message: UpstreamProjectSource) {
        let UpstreamProjectSource(
            FetchProjectState {
                project_key,
                current_revision,
                no_cache,
            },
            sender,
        ) = message;

        let query_timeout = self.config.query_timeout();

        // If there is already a channel for the requested project key, we attach to it;
        // otherwise we create a new one.
        match self.state_channels.entry(project_key) {
            Entry::Vacant(entry) => {
                entry.insert(ProjectStateChannel::new(
                    sender,
                    current_revision,
                    query_timeout,
                    no_cache,
                ));
            }
            Entry::Occupied(mut entry) => {
                let channel = entry.get_mut();
                channel.attach(sender, current_revision);
                // Ensure the upstream skips caches if one of the recipients requests an uncached
                // response. This operation is additive across requests.
                if no_cache {
                    channel.no_cache();
                }
            }
        }

        // Schedule the fetch if there is nothing running at this moment.
        if !self.backoff.started() {
            self.backoff.reset();
            self.schedule_fetch();
        }
    }
}

impl Service for UpstreamProjectSourceService {
    type Interface = UpstreamProjectSource;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        relay_log::info!("project upstream cache started");
        loop {
            tokio::select! {
                biased;

                () = &mut self.fetch_handle => self.do_fetch(),
                Some(responses) = self.inner_rx.recv() => self.handle_responses(responses),
                Some(message) = rx.recv() => self.handle_message(message),

                else => break,
            }
        }
        relay_log::info!("project upstream cache stopped");
    }
}

#[cfg(test)]
mod tests {
    use crate::http::Response;
    use futures::future::poll_immediate;

    use super::*;

    fn to_response(body: &impl serde::Serialize) -> Response {
        let body = serde_json::to_vec(body).unwrap();
        let response = http::response::Response::builder()
            .status(http::StatusCode::OK)
            .header(http::header::CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap();

        Response(response.into())
    }

    #[tokio::test]
    async fn test_schedule_merge_channels() {
        let (upstream_addr, mut upstream_rx) = Addr::custom();
        let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap());
        let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

        macro_rules! next_send_request {
            () => {{
                let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else {
                    panic!()
                };
                req.configure(&config);
                req
            }};
        }

        let service =
            UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start_detached();

        let mut response1 = service.send(FetchProjectState {
            project_key,
            current_revision: "123".into(),
            no_cache: false,
        });

        // Wait for the upstream request to make sure we're in the pending state.
        let request1 = next_send_request!();

        // Add another request for the same project, which should be combined into a single
        // request, after responding to the first inflight request.
        let mut response2 = service.send(FetchProjectState {
            project_key,
            current_revision: Revision::default(),
            no_cache: false,
        });

        // Return pending to the service.
        // Now the two requests should be combined.
        request1
            .respond(Ok(to_response(&serde_json::json!({
                "pending": [project_key],
            }))))
            .await;

        // Make sure there is no response yet.
        assert!(poll_immediate(&mut response1).await.is_none());
        assert!(poll_immediate(&mut response2).await.is_none());

        // Send a response to the second request which should successfully resolve both responses.
        next_send_request!()
            .respond(Ok(to_response(&serde_json::json!({
                "unchanged": [project_key],
            }))))
            .await;

        let (response1, response2) = futures::future::join(response1, response2).await;
        assert!(matches!(response1, Ok(SourceProjectState::NotModified)));
        assert!(matches!(response2, Ok(SourceProjectState::NotModified)));

        // No more messages to upstream expected.
        assert!(upstream_rx.try_recv().is_err());
    }
}