relay_server/services/projects/source/upstream.rs

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;

use futures::future;
use itertools::Itertools;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_dynamic_config::ErrorBoundary;
use relay_statsd::metric;
use relay_system::{
    Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service,
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::services::projects::project::Revision;
use crate::services::projects::project::{ParsedProjectState, ProjectState};
use crate::services::projects::source::{FetchProjectState, SourceProjectState};
use crate::services::upstream::{
    Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils::{RetryBackoff, SleepHandle};

/// A query to retrieve a batch of project states from upstream.
///
/// This query does not implement `Deserialize`. To parse the query, use a wrapper that skips
/// invalid project keys instead of failing the entire batch.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStates {
    /// List of requested project keys.
    public_keys: Vec<ProjectKey>,
    /// List of revisions for each project key.
    ///
    /// The revisions are mapped by index to the project keys; this is a separate field
    /// to keep the API compatible.
    revisions: Vec<Revision>,
    /// If `true` the upstream should return a full configuration.
    ///
    /// Upstreams will ignore this for non-internal Relays.
    full_config: bool,
    /// If `true` the upstream should not serve from cache.
    no_cache: bool,
}
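
// Illustrative sketch, not part of the original source: with the `camelCase` renaming above, a
// query for a single project key serializes to a JSON body roughly like the following, where
// `revisions[i]` belongs to `publicKeys[i]` and the key/revision values are made up:
//
//     {
//       "publicKeys": ["abd0f232775f45feab79864e580d160b"],
//       "revisions": ["8730f93b2a4542bbae6327b0cfed7f89"],
//       "fullConfig": true,
//       "noCache": false
//     }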

/// The response of the project states request.
///
/// A [`ProjectKey`] is either pending or has a result; it cannot appear in both, and doing
/// so is undefined.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStatesResponse {
    /// Map of [`ProjectKey`] to [`ParsedProjectState`] that was fetched from the upstream.
    #[serde(default)]
    configs: HashMap<ProjectKey, ErrorBoundary<Option<ParsedProjectState>>>,
    /// The [`ProjectKey`]s that couldn't be immediately retrieved from the upstream.
    #[serde(default)]
    pending: HashSet<ProjectKey>,
    /// The [`ProjectKey`]s that the upstream has no updates for.
    ///
    /// The list is only populated when the request contains revision information
    /// for all requested configurations.
    #[serde(default)]
    unchanged: HashSet<ProjectKey>,
}
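
// Hedged example, not from the original source: a response body might look roughly like the
// following, with each requested key appearing in at most one of the three collections:
//
//     {
//       "configs": { "<key with a new state>": { /* parsed project state */ } },
//       "pending": ["<key the upstream has not computed yet>"],
//       "unchanged": ["<key whose revision is still current>"]
//     }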

impl UpstreamQuery for GetProjectStates {
    type Response = GetProjectStatesResponse;

    fn method(&self) -> Method {
        Method::POST
    }

    fn path(&self) -> Cow<'static, str> {
        Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
    }

    fn priority() -> RequestPriority {
        RequestPriority::High
    }

    fn retry() -> bool {
        false
    }

    fn route(&self) -> &'static str {
        "project_configs"
    }
}

/// Error returned to the requester.
#[derive(Copy, Clone, Debug, thiserror::Error)]
pub enum Error {
    /// Fetching the project state exceeded the configured deadline.
    #[error("deadline exceeded while fetching project state")]
    DeadlineExceeded,
}

/// The wrapper struct for incoming external requests which also keeps additional information.
#[derive(Debug)]
struct ProjectStateChannel {
    // Main broadcast channel.
    channel: BroadcastChannel<Message>,
    // Additional broadcast channels tracked from merge operations.
    merged: Vec<BroadcastChannel<Message>>,
    revision: Revision,
    deadline: Instant,
    no_cache: bool,
    attempts: u64,
    /// How often the request failed.
    errors: usize,
    /// How often a "pending" response was received for this project state.
    pending: usize,
}

impl ProjectStateChannel {
    pub fn new(
        sender: BroadcastSender<Message>,
        revision: Revision,
        timeout: Duration,
        no_cache: bool,
    ) -> Self {
        let now = Instant::now();
        Self {
            no_cache,
            channel: sender.into_channel(),
            merged: Vec::new(),
            revision,
            deadline: now + timeout,
            attempts: 0,
            errors: 0,
            pending: 0,
        }
    }

    pub fn no_cache(&mut self) {
        self.no_cache = true;
    }

    /// Attaches a new sender to the same channel.
    ///
    /// Also makes sure the new sender's revision matches the already requested revision.
    /// If the new revision differs from the revision tracked so far, the revision is cleared.
    /// To avoid multiple fetches per batch for the same project, we need to find a common
    /// denominator for requests with different revisions, which is to always fetch the full
    /// project config.
    pub fn attach(&mut self, sender: BroadcastSender<Message>, revision: Revision) {
        self.channel.attach(sender);
        if self.revision != revision {
            self.revision = Revision::default();
        }
    }
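
    // Illustrative sketch (commented out, not part of the original source): attaching a sender
    // with a different revision clears the tracked revision, so the next upstream query requests
    // the full config instead of a revision-specific update. `sender_a`, `sender_b` and `timeout`
    // are hypothetical placeholders:
    //
    //     let mut channel = ProjectStateChannel::new(sender_a, "abc".into(), timeout, false);
    //     channel.attach(sender_b, "def".into());
    //     // channel.revision is now Revision::default() again.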

    pub fn send(self, state: SourceProjectState) {
        self.do_send(Ok(state));
    }

    pub fn error(self, err: Error) {
        self.do_send(Err(err));
    }

    pub fn expired(&self, now: Instant) -> bool {
        now > self.deadline
    }

    pub fn merge(&mut self, channel: ProjectStateChannel) {
        let ProjectStateChannel {
            channel,
            merged,
            revision,
            deadline,
            no_cache,
            attempts,
            errors,
            pending,
        } = channel;

        self.merged.push(channel);
        self.merged.extend(merged);
        if self.revision != revision {
            self.revision = Revision::default();
        }
        self.deadline = self.deadline.max(deadline);
        self.no_cache |= no_cache;
        self.attempts += attempts;
        self.errors += errors;
        self.pending += pending;
    }

    fn do_send(self, message: Message) {
        for channel in self.merged {
            channel.send(message.clone());
        }
        self.channel.send(message)
    }
}

/// The map of project keys with their project state channels.
type ProjectStateChannels = HashMap<ProjectKey, ProjectStateChannel>;

/// The message used to communicate with the requester.
type Message = Result<SourceProjectState, Error>;

/// This is the [`UpstreamProjectSourceService`] interface.
///
/// The service is responsible for fetching the [`ParsedProjectState`] from the upstream.
/// Internally, it maintains a queue of incoming requests scheduled to fetch the state and
/// takes care of the backoff in case there is a problem with the requests.
#[derive(Debug)]
pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<Message>);

impl Interface for UpstreamProjectSource {}

impl FromMessage<FetchProjectState> for UpstreamProjectSource {
    type Response = BroadcastResponse<Message>;

    fn from_message(message: FetchProjectState, sender: BroadcastSender<Message>) -> Self {
        Self(message, sender)
    }
}
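
// Hedged usage sketch, not part of the original source: a caller holding an
// `Addr<UpstreamProjectSource>` (here called `addr`, a hypothetical binding) fetches a project
// state by sending a `FetchProjectState` message, mirroring the test at the bottom of this file:
//
//     let response = addr.send(FetchProjectState {
//         project_key,
//         current_revision: "123".into(),
//         no_cache: false,
//     });
//     // Awaiting `response` yields the channel's `Result<SourceProjectState, Error>` (wrapped
//     // in the messaging layer's own send result), possibly after several "pending" rounds.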

/// The batches of channels used to fetch the project states.
struct ChannelsBatch {
    nocache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
    cache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
}

/// Collected upstream responses, with associated project state channels.
struct UpstreamResponse {
    channels_batch: ProjectStateChannels,
    response: Result<GetProjectStatesResponse, UpstreamRequestError>,
}

/// The service which handles the fetching of the [`ParsedProjectState`] from upstream.
#[derive(Debug)]
pub struct UpstreamProjectSourceService {
    backoff: RetryBackoff,
    config: Arc<Config>,
    upstream_relay: Addr<UpstreamRelay>,
    state_channels: ProjectStateChannels,
    inner_tx: mpsc::UnboundedSender<Vec<Option<UpstreamResponse>>>,
    inner_rx: mpsc::UnboundedReceiver<Vec<Option<UpstreamResponse>>>,
    fetch_handle: SleepHandle,
    /// Instant when the last fetch failed; `None` if there aren't any failures.
    ///
    /// Relay updates this value to the instant when the first fetch fails, and
    /// resets it to `None` on successful responses. The value is not updated
    /// during long periods without requests.
    last_failed_fetch: Option<Instant>,
    /// Duration of continued fetch failures before an error is emitted.
    ///
    /// Relay emits an error if all requests for at least this interval fail.
    failure_interval: Duration,
}

impl UpstreamProjectSourceService {
    /// Creates a new [`UpstreamProjectSourceService`] instance.
    pub fn new(config: Arc<Config>, upstream_relay: Addr<UpstreamRelay>) -> Self {
        let (inner_tx, inner_rx) = mpsc::unbounded_channel();

        Self {
            backoff: RetryBackoff::new(config.http_max_retry_interval()),
            state_channels: HashMap::new(),
            fetch_handle: SleepHandle::idle(),
            upstream_relay,
            inner_tx,
            inner_rx,
            last_failed_fetch: None,
            failure_interval: config.http_project_failure_interval(),
            config,
        }
    }

    /// Returns the backoff timeout for a batched upstream query.
    ///
    /// If previous queries succeeded, this will be the general batch interval. Additionally, an
    /// exponentially increasing backoff is used for retrying the upstream request.
    fn next_backoff(&mut self) -> Duration {
        self.config.query_batch_interval() + self.backoff.next_backoff()
    }

    /// Prepares the batches of the cache and nocache channels which could be used to request the
    /// project states.
    fn prepare_batches(&mut self) -> ChannelsBatch {
        let now = Instant::now();
        let batch_size = self.config.query_batch_size();
        let num_batches = self.config.max_concurrent_queries();

        // Pop N items from state_channels. Intuitively, we would use
        // `state_channels.drain().take(n)`, but that clears the entire hashmap regardless of how
        // much of the iterator is consumed.
        //
        // Instead, we have to collect the keys we want into a separate vector and pop them
        // one-by-one.
        let projects: Vec<_> = (self.state_channels.keys().copied())
            .take(batch_size * num_batches)
            .collect();

        let fresh_channels = (projects.iter())
            .filter_map(|id| Some((*id, self.state_channels.remove(id)?)))
            .filter_map(|(id, channel)| {
                if !channel.expired(now) {
                    return Some((id, channel));
                }

                // Channel is expired, emit telemetry and notify the other end.
                metric!(
                    distribution(RelayDistributions::ProjectStateAttempts) = channel.attempts,
                    result = "timeout",
                );
                metric!(
                    counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                    result = "timeout",
                );
                relay_log::warn!(
                    errors = channel.errors,
                    pending = channel.pending,
                    tags.did_error = channel.errors > 0,
                    tags.was_pending = channel.pending > 0,
                    tags.project_key = %id,
                    "error fetching project state {id}: deadline exceeded",
                );
                channel.error(Error::DeadlineExceeded);

                None
            });

        // Separate regular channels from those with the `nocache` flag. The latter go in separate
        // requests, since the upstream will block the response.
        let (nocache_channels, cache_channels): (Vec<_>, Vec<_>) =
            fresh_channels.partition(|(_id, channel)| channel.no_cache);

        let total_count = cache_channels.len() + nocache_channels.len();

        metric!(
            distribution(RelayDistributions::ProjectStatePending) =
                self.state_channels.len() as u64
        );

        relay_log::debug!(
            "updating project states for {}/{} projects (attempt {})",
            total_count,
            total_count + self.state_channels.len(),
            self.backoff.attempt(),
        );

        ChannelsBatch {
            nocache_channels,
            cache_channels,
        }
    }

    /// Merges a [`ProjectStateChannel`] into the existing list of tracked channels.
    ///
    /// A channel is removed from the list when the upstream is queried for the project.
    /// When the upstream returns "pending" for this project, the channel needs to be returned
    /// to the list of channels. If there is already another outstanding request for the same
    /// project, the two requests must be merged.
    fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
        match self.state_channels.entry(key) {
            Entry::Vacant(e) => {
                e.insert(channel);
            }
            Entry::Occupied(mut e) => {
                e.get_mut().merge(channel);
            }
        }
    }

    /// Executes an upstream request to fetch project configs.
    ///
    /// This assumes that currently no request is running. If the upstream request fails or new
    /// channels are pushed in the meantime, this will reschedule automatically.
    async fn fetch_states(
        config: Arc<Config>,
        upstream_relay: Addr<UpstreamRelay>,
        channels: ChannelsBatch,
    ) -> Vec<Option<UpstreamResponse>> {
        let request_start = Instant::now();
        let batch_size = config.query_batch_size();
        let cache_batches = channels.cache_channels.into_iter().chunks(batch_size);
        let nocache_batches = channels.nocache_channels.into_iter().chunks(batch_size);

        let mut requests = vec![];
        // The `.into_iter()` calls must still be made here, since the compiler otherwise
        // reports that `nocache_batches` is not an iterator:
        // `IntoChunks` is not an iterator itself, it only implements the `IntoIterator` trait.
        #[allow(clippy::useless_conversion)]
        for channels_batch in cache_batches.into_iter().chain(nocache_batches.into_iter()) {
            let mut channels_batch: ProjectStateChannels = channels_batch.collect();
            for channel in channels_batch.values_mut() {
                channel.attempts += 1;
            }
            relay_log::debug!("sending request of size {}", channels_batch.len());
            metric!(
                distribution(RelayDistributions::ProjectStateRequestBatchSize) =
                    channels_batch.len() as u64
            );

            let query = GetProjectStates {
                public_keys: channels_batch.keys().copied().collect(),
                revisions: channels_batch
                    .values()
                    .map(|c| c.revision.clone())
                    .collect(),
                full_config: config.processing_enabled() || config.request_full_project_config(),
                no_cache: channels_batch.values().any(|c| c.no_cache),
            };

            // count number of http requests for project states
            metric!(counter(RelayCounters::ProjectStateRequest) += 1);

            let upstream_relay = upstream_relay.clone();
            requests.push(async move {
                match upstream_relay.send(SendQuery(query)).await {
                    Ok(response) => Some(UpstreamResponse {
                        channels_batch,
                        response,
                    }),
                    // If sending the request to the upstream fails:
                    // - drop the current batch of channels
                    // - report the error, since this case should not occur in a proper
                    //   workflow
                    // - return `None` to signal that we did not get any response from the
                    //   upstream and should ignore this batch.
                    Err(_err) => {
                        relay_log::error!("failed to send the request to upstream: channel full");
                        None
                    }
                }
            });
        }

        // Wait on results of all fanouts, and return the resolved responses.
        let responses = future::join_all(requests).await;
        metric!(timer(RelayTimers::ProjectStateRequestDuration) = request_start.elapsed());
        responses
    }

    /// Schedules the next trigger for fetching the project states.
    ///
    /// The next trigger will be scheduled only if the current handle is idle.
    fn schedule_fetch(&mut self) {
        if self.fetch_handle.is_idle() {
            let wait = self.next_backoff();
            self.fetch_handle.set(wait);
        }
    }

    /// Handles the responses from the upstream.
    fn handle_responses(&mut self, responses: Vec<Option<UpstreamResponse>>) {
        // Iterate only over the returned responses.
        for response in responses.into_iter().flatten() {
            let UpstreamResponse {
                channels_batch,
                response,
            } = response;

            match response {
                Ok(mut response) => {
                    // If a single request succeeded we reset the backoff. We decided to
                    // only backoff if we see that the project config endpoint is
                    // completely down and did not answer a single request successfully.
                    //
                    // Otherwise we might refuse to fetch any project configs because of a
                    // single, reproducible 500 we observed for a particular project.
                    self.backoff.reset();
                    self.last_failed_fetch = None;

                    // Count number of project states returned (via http requests).
                    metric!(
                        distribution(RelayDistributions::ProjectStateReceived) =
                            response.configs.len() as u64
                    );
                    for (key, mut channel) in channels_batch {
                        if response.pending.contains(&key) {
                            channel.pending += 1;
                            self.merge_channel(key, channel);
                            continue;
                        }

                        let mut result = "ok";
                        let state = if response.unchanged.contains(&key) {
                            result = "ok_unchanged";
                            SourceProjectState::NotModified
                        } else {
                            let state = response
                                .configs
                                .remove(&key)
                                .unwrap_or(ErrorBoundary::Ok(None));

                            let state = match state {
                                ErrorBoundary::Err(error) => {
                                    result = "invalid";
                                    let error = &error as &dyn std::error::Error;
                                    relay_log::error!(error, "error fetching project state {key}");
                                    ProjectState::Pending
                                }
                                ErrorBoundary::Ok(None) => ProjectState::Disabled,
                                ErrorBoundary::Ok(Some(state)) => state.into(),
                            };

                            SourceProjectState::New(state)
                        };

                        metric!(
                            distribution(RelayDistributions::ProjectStateAttempts) =
                                channel.attempts,
                            result = result,
                        );
                        metric!(
                            counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                            result = result,
                        );

                        channel.send(state);
                    }
                }
                Err(err) => {
                    self.track_failed_response();

                    let attempts = channels_batch
                        .values()
                        .map(|b| b.attempts)
                        .max()
                        .unwrap_or(0);
                    // Only log an error if the request failed more than once.
                    // We are not interested in single failures. Our retry mechanism is able to
                    // handle those.
                    if attempts >= 2 {
                        relay_log::error!(
                            error = &err as &dyn std::error::Error,
                            attempts = attempts,
                            "error fetching project states",
                        );
                    }

                    metric!(
                        distribution(RelayDistributions::ProjectStatePending) =
                            self.state_channels.len() as u64
                    );
                    // Put the channels back into the queue, we will retry again shortly.
                    self.state_channels.extend(channels_batch.into_iter().map(
                        |(key, mut channel)| {
                            channel.errors += 1;
                            (key, channel)
                        },
                    ))
                }
            }
        }

        if !self.state_channels.is_empty() {
            self.schedule_fetch()
        } else {
            // No open channels left. If this is because we fetched everything, we have
            // already reset the backoff. If, however, this is because we had failures but
            // the channels have been cleaned up because the requests expired, we need to
            // reset the backoff so that the next request is not simply ignored (by
            // `handle_message`) and schedules a fetch again.
            //
            // Note that the backoff member serves two purposes:
            //  - to schedule repeated fetch requests (at increasingly long intervals)
            //  - as a flag to know whether a fetch is already scheduled.
            // Resetting it here signals that we don't have a backoff scheduled (either
            // because everything went fine or because all the requests have expired).
            // The next time a project is requested, a fetch will be scheduled again.
            self.backoff.reset();
        }
    }

    /// Tracks the last failed fetch, and emits an error if it exceeds the failure interval.
    fn track_failed_response(&mut self) {
        match self.last_failed_fetch {
            None => self.last_failed_fetch = Some(Instant::now()),
            Some(last_failed) => {
                let failure_duration = last_failed.elapsed();
                if failure_duration >= self.failure_interval {
                    relay_log::error!(
                        failure_duration = format!("{} seconds", failure_duration.as_secs()),
                        backoff_attempts = self.backoff.attempt(),
                        "can't fetch project states"
                    );
                }
            }
        }
        metric!(counter(RelayCounters::ProjectUpstreamFailed) += 1);
    }

    /// Creates the async task to fetch the project states.
    fn do_fetch(&mut self) {
        self.fetch_handle.reset();

        if self.state_channels.is_empty() {
            relay_log::error!("project state schedule fetch request without projects");
            return;
        }

        let config = self.config.clone();
        let inner_tx = self.inner_tx.clone();
        let channels = self.prepare_batches();
        let upstream_relay = self.upstream_relay.clone();

        relay_system::spawn!(async move {
            let responses = Self::fetch_states(config, upstream_relay, channels).await;
            // Send back all resolved responses and also unused channels.
            // These responses will be handled by the `handle_responses` function.
            if inner_tx.send(responses).is_err() {
                relay_log::error!("unable to forward the requests to further processing");
            }
        });
    }

    /// Handles the incoming external messages.
    fn handle_message(&mut self, message: UpstreamProjectSource) {
        let UpstreamProjectSource(
            FetchProjectState {
                project_key,
                current_revision,
                no_cache,
            },
            sender,
        ) = message;

        let query_timeout = self.config.query_timeout();

        // If there is already a channel for the requested project key, we attach to it;
        // otherwise we create a new one.
        match self.state_channels.entry(project_key) {
            Entry::Vacant(entry) => {
                entry.insert(ProjectStateChannel::new(
                    sender,
                    current_revision,
                    query_timeout,
                    no_cache,
                ));
            }
            Entry::Occupied(mut entry) => {
                let channel = entry.get_mut();
                channel.attach(sender, current_revision);
                // Ensure upstream skips caches if one of the recipients requests an uncached response. This
                // operation is additive across requests.
                if no_cache {
                    channel.no_cache();
                }
            }
        }

        // Schedule the fetch if there is nothing running at this moment.
        if !self.backoff.started() {
            self.backoff.reset();
            self.schedule_fetch();
        }
    }
}

impl Service for UpstreamProjectSourceService {
    type Interface = UpstreamProjectSource;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        relay_log::info!("project upstream cache started");
        loop {
            tokio::select! {
                biased;

                () = &mut self.fetch_handle => self.do_fetch(),
                Some(responses) = self.inner_rx.recv() => self.handle_responses(responses),
                Some(message) = rx.recv() => self.handle_message(message),

                else => break,
            }
        }
        relay_log::info!("project upstream cache stopped");
    }
}

#[cfg(test)]
mod tests {
    use crate::http::Response;
    use futures::future::poll_immediate;

    use super::*;

    fn to_response(body: &impl serde::Serialize) -> Response {
        let body = serde_json::to_vec(body).unwrap();
        let response = http::response::Response::builder()
            .status(http::StatusCode::OK)
            .header(http::header::CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap();

        Response(response.into())
    }

    #[tokio::test]
    async fn test_schedule_merge_channels() {
        let (upstream_addr, mut upstream_rx) = Addr::custom();
        let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap());
        let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

        macro_rules! next_send_request {
            () => {{
                let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else {
                    panic!()
                };
                req.configure(&config);
                req
            }};
        }

        let service =
            UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start_detached();

        let mut response1 = service.send(FetchProjectState {
            project_key,
            current_revision: "123".into(),
            no_cache: false,
        });

        // Wait for the upstream request to make sure we're in the pending state.
        let request1 = next_send_request!();

        // Add another request for the same project, which should be combined into a single
        // request, after responding to the first inflight request.
        let mut response2 = service.send(FetchProjectState {
            project_key,
            current_revision: Revision::default(),
            no_cache: false,
        });

        // Return pending to the service.
        // Now the two requests should be combined.
        request1
            .respond(Ok(to_response(&serde_json::json!({
                "pending": [project_key],
            }))))
            .await;

        // Make sure there is no response yet.
        assert!(poll_immediate(&mut response1).await.is_none());
        assert!(poll_immediate(&mut response2).await.is_none());

        // Send a response to the second request which should successfully resolve both responses.
        next_send_request!()
            .respond(Ok(to_response(&serde_json::json!({
                "unchanged": [project_key],
            }))))
            .await;

        let (response1, response2) = futures::future::join(response1, response2).await;
        assert!(matches!(response1, Ok(Ok(SourceProjectState::NotModified))));
        assert!(matches!(response2, Ok(Ok(SourceProjectState::NotModified))));

        // No more messages to upstream expected.
        assert!(upstream_rx.try_recv().is_err());
    }
761}