1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::collections::hash_map::Entry;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::future;
9use itertools::Itertools;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use relay_dynamic_config::ErrorBoundary;
13use relay_statsd::metric;
14use relay_system::{
15 Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service,
16};
17use serde::{Deserialize, Serialize};
18use tokio::sync::mpsc;
19use tokio::time::Instant;
20
21use crate::services::projects::project::Revision;
22use crate::services::projects::project::{ParsedProjectState, ProjectState};
23use crate::services::projects::source::{FetchProjectState, SourceProjectState};
24use crate::services::upstream::{
25 Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
26};
27use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
28use crate::utils::{RetryBackoff, SleepHandle};
29
/// Request body for the upstream project config endpoint.
///
/// Sent as a `POST` to `/api/0/relays/projectconfigs/?version=3` (see the [`UpstreamQuery`]
/// implementation below) and serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStates {
    /// Public keys of the projects whose configs are requested.
    public_keys: Vec<ProjectKey>,
    /// Revision currently known for each project.
    ///
    /// Expected to correspond by index to `public_keys`; both are built from the same batch.
    revisions: Vec<Revision>,
    /// Whether the full project config is requested.
    ///
    /// Set when processing is enabled or `request_full_project_config` is configured.
    full_config: bool,
    /// Set when any channel in the batch asked for `no_cache`.
    no_cache: bool,
}
51
/// Response body of the upstream project config endpoint.
///
/// All fields default to empty collections when missing from the payload.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStatesResponse {
    /// Project configs keyed by public key.
    ///
    /// The service handles these entries in three ways:
    /// - `ErrorBoundary::Err` entries are logged and reported as pending.
    /// - `Ok(None)` and missing keys are treated as disabled projects.
    #[serde(default)]
    configs: HashMap<ProjectKey, ErrorBoundary<Option<ParsedProjectState>>>,
    /// Keys upstream has not resolved yet; their channels are re-queued for another fetch.
    #[serde(default)]
    pending: HashSet<ProjectKey>,
    /// Keys answered with `SourceProjectState::NotModified`.
    ///
    /// Presumably these match the revision sent in the request — TODO confirm upstream contract.
    #[serde(default)]
    unchanged: HashSet<ProjectKey>,
}
72
73impl UpstreamQuery for GetProjectStates {
74 type Response = GetProjectStatesResponse;
75
76 fn method(&self) -> Method {
77 Method::POST
78 }
79
80 fn path(&self) -> Cow<'static, str> {
81 Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
82 }
83
84 fn priority() -> RequestPriority {
85 RequestPriority::High
86 }
87
88 fn retry() -> bool {
89 false
90 }
91
92 fn route(&self) -> &'static str {
93 "project_configs"
94 }
95}
96
/// Errors delivered to the requesters of a project state fetch.
#[derive(Copy, Clone, Debug, thiserror::Error)]
pub enum Error {
    /// The channel's deadline passed before upstream returned a final state for the project.
    #[error("deadline exceeded while fetching project state")]
    DeadlineExceeded,
}
104
/// A queued or in-flight fetch for one project key.
///
/// The eventual result is broadcast to every requester attached to it.
#[derive(Debug)]
struct ProjectStateChannel {
    /// Primary broadcast channel; additional requesters are attached via `attach`.
    channel: BroadcastChannel<Message>,
    /// Channels absorbed from other `ProjectStateChannel`s via `merge`.
    ///
    /// They receive the same result as `channel`.
    merged: Vec<BroadcastChannel<Message>>,
    /// Revision sent upstream; reset to the default once requesters disagree on it.
    revision: Revision,
    /// After this instant the fetch is abandoned with `Error::DeadlineExceeded`.
    deadline: Instant,
    /// Whether the upstream request for this key should set `no_cache`.
    no_cache: bool,
    /// Number of upstream requests that included this key.
    attempts: u64,
    /// Number of failed upstream requests that included this key.
    errors: usize,
    /// Number of times upstream reported this key as pending.
    pending: usize,
}
121
122impl ProjectStateChannel {
123 pub fn new(
124 sender: BroadcastSender<Message>,
125 revision: Revision,
126 timeout: Duration,
127 no_cache: bool,
128 ) -> Self {
129 let now = Instant::now();
130 Self {
131 no_cache,
132 channel: sender.into_channel(),
133 merged: Vec::new(),
134 revision,
135 deadline: now + timeout,
136 attempts: 0,
137 errors: 0,
138 pending: 0,
139 }
140 }
141
142 pub fn no_cache(&mut self) {
143 self.no_cache = true;
144 }
145
146 pub fn attach(&mut self, sender: BroadcastSender<Message>, revision: Revision) {
153 self.channel.attach(sender);
154 if self.revision != revision {
155 self.revision = Revision::default();
156 }
157 }
158
159 pub fn send(self, state: SourceProjectState) {
160 self.do_send(Ok(state));
161 }
162
163 pub fn error(self, err: Error) {
164 self.do_send(Err(err));
165 }
166
167 pub fn expired(&self, now: Instant) -> bool {
168 now > self.deadline
169 }
170
171 pub fn merge(&mut self, channel: ProjectStateChannel) {
172 let ProjectStateChannel {
173 channel,
174 merged,
175 revision,
176 deadline,
177 no_cache,
178 attempts,
179 errors,
180 pending,
181 } = channel;
182
183 self.merged.push(channel);
184 self.merged.extend(merged);
185 if self.revision != revision {
186 self.revision = Revision::default();
187 }
188 self.deadline = self.deadline.max(deadline);
189 self.no_cache |= no_cache;
190 self.attempts += attempts;
191 self.errors += errors;
192 self.pending += pending;
193 }
194
195 fn do_send(self, message: Message) {
196 for channel in self.merged {
197 channel.send(message.clone());
198 }
199 self.channel.send(message)
200 }
201}
202
/// Fetch channels keyed by the project key they fetch.
type ProjectStateChannels = HashMap<ProjectKey, ProjectStateChannel>;
205
/// Result broadcast to every requester of a project state fetch.
type Message = Result<SourceProjectState, Error>;
208
/// Service interface message: a [`FetchProjectState`] request paired with the sender used to
/// deliver its result.
///
/// Requests for the same project key share a single queued fetch channel.
#[derive(Debug)]
pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<Message>);
216
// Marks `UpstreamProjectSource` as the message interface of `UpstreamProjectSourceService`.
impl Interface for UpstreamProjectSource {}
218
219impl FromMessage<FetchProjectState> for UpstreamProjectSource {
220 type Response = BroadcastResponse<Message>;
221
222 fn from_message(message: FetchProjectState, sender: BroadcastSender<Message>) -> Self {
223 Self(message, sender)
224 }
225}
226
/// Channels selected for the next fetch, split by their `no_cache` flag.
///
/// A request sets `no_cache` when any of its channels does. The two groups are therefore
/// chunked into separate requests.
struct ChannelsBatch {
    /// Channels that requested `no_cache`.
    nocache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
    /// Channels without the `no_cache` flag.
    cache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
}
232
/// Outcome of one upstream request, together with the channels it was sent for.
struct UpstreamResponse {
    /// Channels included in the request, keyed by project key.
    channels_batch: ProjectStateChannels,
    /// Parsed response body, or the error reported by the upstream relay.
    response: Result<GetProjectStatesResponse, UpstreamRequestError>,
}
238
/// Service that fetches project states from the upstream in batches.
///
/// How a request flows through the service:
/// - Incoming requests are queued per project key in `state_channels`.
/// - A timer-driven fetch sends the queue upstream in batches.
/// - Results come back through `inner_rx` and are dispatched to the waiting requesters.
#[derive(Debug)]
pub struct UpstreamProjectSourceService {
    /// Backoff added to the batch interval between fetches; reset after a successful response.
    backoff: RetryBackoff,
    config: Arc<Config>,
    upstream_relay: Addr<UpstreamRelay>,
    /// Channels waiting to be included in the next fetch.
    state_channels: ProjectStateChannels,
    /// Sender cloned into spawned fetch tasks to report their responses.
    inner_tx: mpsc::UnboundedSender<Vec<Option<UpstreamResponse>>>,
    /// Receiver polled by the service loop for the responses of fetch tasks.
    inner_rx: mpsc::UnboundedReceiver<Vec<Option<UpstreamResponse>>>,
    /// Timer that triggers the next fetch.
    fetch_handle: SleepHandle,
    /// Time of the first failed fetch in the current run of failures.
    ///
    /// Cleared on the next successful response.
    last_failed_fetch: Option<Instant>,
    /// How long fetches must keep failing before each further failure is logged as an error.
    failure_interval: Duration,
}
260
261impl UpstreamProjectSourceService {
262 pub fn new(config: Arc<Config>, upstream_relay: Addr<UpstreamRelay>) -> Self {
264 let (inner_tx, inner_rx) = mpsc::unbounded_channel();
265
266 Self {
267 backoff: RetryBackoff::new(config.http_max_retry_interval()),
268 state_channels: HashMap::new(),
269 fetch_handle: SleepHandle::idle(),
270 upstream_relay,
271 inner_tx,
272 inner_rx,
273 last_failed_fetch: None,
274 failure_interval: config.http_project_failure_interval(),
275 config,
276 }
277 }
278
279 fn next_backoff(&mut self) -> Duration {
284 self.config.query_batch_interval() + self.backoff.next_backoff()
285 }
286
287 fn prepare_batches(&mut self) -> ChannelsBatch {
290 let now = Instant::now();
291 let batch_size = self.config.query_batch_size();
292 let num_batches = self.config.max_concurrent_queries();
293
294 let projects: Vec<_> = (self.state_channels.keys().copied())
301 .take(batch_size * num_batches)
302 .collect();
303
304 let fresh_channels = (projects.iter())
305 .filter_map(|id| Some((*id, self.state_channels.remove(id)?)))
306 .filter_map(|(id, channel)| {
307 if !channel.expired(now) {
308 return Some((id, channel));
309 }
310
311 metric!(
313 distribution(RelayDistributions::ProjectStateAttempts) = channel.attempts,
314 result = "timeout",
315 );
316 metric!(
317 counter(RelayCounters::ProjectUpstreamCompleted) += 1,
318 result = "timeout",
319 );
320 relay_log::warn!(
321 errors = channel.errors,
322 pending = channel.pending,
323 tags.did_error = channel.errors > 0,
324 tags.was_pending = channel.pending > 0,
325 tags.project_key = %id,
326 "error fetching project state {id}: deadline exceeded",
327 );
328 channel.error(Error::DeadlineExceeded);
329
330 None
331 });
332
333 let (nocache_channels, cache_channels): (Vec<_>, Vec<_>) =
336 fresh_channels.partition(|(_id, channel)| channel.no_cache);
337
338 let total_count = cache_channels.len() + nocache_channels.len();
339
340 metric!(
341 distribution(RelayDistributions::ProjectStatePending) =
342 self.state_channels.len() as u64
343 );
344
345 relay_log::debug!(
346 "updating project states for {}/{} projects (attempt {})",
347 total_count,
348 total_count + self.state_channels.len(),
349 self.backoff.attempt(),
350 );
351
352 ChannelsBatch {
353 nocache_channels,
354 cache_channels,
355 }
356 }
357
358 fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
365 match self.state_channels.entry(key) {
366 Entry::Vacant(e) => {
367 e.insert(channel);
368 }
369 Entry::Occupied(mut e) => {
370 e.get_mut().merge(channel);
371 }
372 }
373 }
374
375 async fn fetch_states(
380 config: Arc<Config>,
381 upstream_relay: Addr<UpstreamRelay>,
382 channels: ChannelsBatch,
383 ) -> Vec<Option<UpstreamResponse>> {
384 let request_start = Instant::now();
385 let batch_size = config.query_batch_size();
386 let cache_batches = channels.cache_channels.into_iter().chunks(batch_size);
387 let nocache_batches = channels.nocache_channels.into_iter().chunks(batch_size);
388
389 let mut requests = vec![];
390 #[allow(clippy::useless_conversion)]
394 for channels_batch in cache_batches.into_iter().chain(nocache_batches.into_iter()) {
395 let mut channels_batch: ProjectStateChannels = channels_batch.collect();
396 for channel in channels_batch.values_mut() {
397 channel.attempts += 1;
398 }
399 relay_log::debug!("sending request of size {}", channels_batch.len());
400 metric!(
401 distribution(RelayDistributions::ProjectStateRequestBatchSize) =
402 channels_batch.len() as u64
403 );
404
405 let query = GetProjectStates {
406 public_keys: channels_batch.keys().copied().collect(),
407 revisions: channels_batch
408 .values()
409 .map(|c| c.revision.clone())
410 .collect(),
411 full_config: config.processing_enabled() || config.request_full_project_config(),
412 no_cache: channels_batch.values().any(|c| c.no_cache),
413 };
414
415 metric!(counter(RelayCounters::ProjectStateRequest) += 1);
417
418 let upstream_relay = upstream_relay.clone();
419 requests.push(async move {
420 match upstream_relay.send(SendQuery(query)).await {
421 Ok(response) => Some(UpstreamResponse {
422 channels_batch,
423 response,
424 }),
425 Err(_err) => {
432 relay_log::error!("failed to send the request to upstream: channel full");
433 None
434 }
435 }
436 });
437 }
438
439 let responses = future::join_all(requests).await;
441 metric!(timer(RelayTimers::ProjectStateRequestDuration) = request_start.elapsed());
442 responses
443 }
444
445 fn schedule_fetch(&mut self) {
449 if self.fetch_handle.is_idle() {
450 let wait = self.next_backoff();
451 self.fetch_handle.set(wait);
452 }
453 }
454
455 fn handle_responses(&mut self, responses: Vec<Option<UpstreamResponse>>) {
457 for response in responses.into_iter().flatten() {
459 let UpstreamResponse {
460 channels_batch,
461 response,
462 } = response;
463
464 match response {
465 Ok(mut response) => {
466 self.backoff.reset();
473 self.last_failed_fetch = None;
474
475 metric!(
477 distribution(RelayDistributions::ProjectStateReceived) =
478 response.configs.len() as u64
479 );
480 for (key, mut channel) in channels_batch {
481 if response.pending.contains(&key) {
482 channel.pending += 1;
483 self.merge_channel(key, channel);
484 continue;
485 }
486
487 let mut result = "ok";
488 let state = if response.unchanged.contains(&key) {
489 result = "ok_unchanged";
490 SourceProjectState::NotModified
491 } else {
492 let state = response
493 .configs
494 .remove(&key)
495 .unwrap_or(ErrorBoundary::Ok(None));
496
497 let state = match state {
498 ErrorBoundary::Err(error) => {
499 result = "invalid";
500 let error = &error as &dyn std::error::Error;
501 relay_log::error!(error, "error fetching project state {key}");
502 ProjectState::Pending
503 }
504 ErrorBoundary::Ok(None) => ProjectState::Disabled,
505 ErrorBoundary::Ok(Some(state)) => state.into(),
506 };
507
508 SourceProjectState::New(state)
509 };
510
511 metric!(
512 distribution(RelayDistributions::ProjectStateAttempts) =
513 channel.attempts,
514 result = result,
515 );
516 metric!(
517 counter(RelayCounters::ProjectUpstreamCompleted) += 1,
518 result = result,
519 );
520
521 channel.send(state);
522 }
523 }
524 Err(err) => {
525 self.track_failed_response();
526
527 let attempts = channels_batch
528 .values()
529 .map(|b| b.attempts)
530 .max()
531 .unwrap_or(0);
532 if attempts >= 2 {
536 relay_log::error!(
537 error = &err as &dyn std::error::Error,
538 attempts = attempts,
539 "error fetching project states",
540 );
541 }
542
543 metric!(
544 distribution(RelayDistributions::ProjectStatePending) =
545 self.state_channels.len() as u64
546 );
547 self.state_channels.extend(channels_batch.into_iter().map(
549 |(key, mut channel)| {
550 channel.errors += 1;
551 (key, channel)
552 },
553 ))
554 }
555 }
556 }
557
558 if !self.state_channels.is_empty() {
559 self.schedule_fetch()
560 } else {
561 self.backoff.reset();
573 }
574 }
575
576 fn track_failed_response(&mut self) {
578 match self.last_failed_fetch {
579 None => self.last_failed_fetch = Some(Instant::now()),
580 Some(last_failed) => {
581 let failure_duration = last_failed.elapsed();
582 if failure_duration >= self.failure_interval {
583 relay_log::error!(
584 failure_duration = format!("{} seconds", failure_duration.as_secs()),
585 backoff_attempts = self.backoff.attempt(),
586 "can't fetch project states"
587 );
588 }
589 }
590 }
591 metric!(counter(RelayCounters::ProjectUpstreamFailed) += 1);
592 }
593
594 fn do_fetch(&mut self) {
596 self.fetch_handle.reset();
597
598 if self.state_channels.is_empty() {
599 relay_log::error!("project state schedule fetch request without projects");
600 return;
601 }
602
603 let config = self.config.clone();
604 let inner_tx = self.inner_tx.clone();
605 let channels = self.prepare_batches();
606 let upstream_relay = self.upstream_relay.clone();
607
608 relay_system::spawn!(async move {
609 let responses = Self::fetch_states(config, upstream_relay, channels).await;
610 if inner_tx.send(responses).is_err() {
613 relay_log::error!("unable to forward the requests to further processing");
614 }
615 });
616 }
617
618 fn handle_message(&mut self, message: UpstreamProjectSource) {
620 let UpstreamProjectSource(
621 FetchProjectState {
622 project_key,
623 current_revision,
624 no_cache,
625 },
626 sender,
627 ) = message;
628
629 let query_timeout = self.config.query_timeout();
630
631 match self.state_channels.entry(project_key) {
634 Entry::Vacant(entry) => {
635 entry.insert(ProjectStateChannel::new(
636 sender,
637 current_revision,
638 query_timeout,
639 no_cache,
640 ));
641 }
642 Entry::Occupied(mut entry) => {
643 let channel = entry.get_mut();
644 channel.attach(sender, current_revision);
645 if no_cache {
648 channel.no_cache();
649 }
650 }
651 }
652
653 if !self.backoff.started() {
655 self.backoff.reset();
656 self.schedule_fetch();
657 }
658 }
659}
660
impl Service for UpstreamProjectSourceService {
    type Interface = UpstreamProjectSource;

    /// Runs the service loop, which dispatches three kinds of events:
    /// fetch timer expirations, finished fetch responses, and incoming requests.
    ///
    /// Because of `biased`, branches are polled in the order written: a due fetch first, then
    /// finished responses, then new requests. The loop exits through the `else` branch, which
    /// `tokio::select!` takes when every branch is disabled.
    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        relay_log::info!("project upstream cache started");
        loop {
            tokio::select! {
                biased;

                () = &mut self.fetch_handle => self.do_fetch(),
                Some(responses) = self.inner_rx.recv() => self.handle_responses(responses),
                Some(message) = rx.recv() => self.handle_message(message),

                else => break,
            }
        }
        relay_log::info!("project upstream cache stopped");
    }
}
680
#[cfg(test)]
mod tests {
    use crate::http::Response;
    use futures::future::poll_immediate;

    use super::*;

    /// Builds a `200 OK` HTTP response whose body is `body` serialized as JSON.
    fn to_response(body: &impl serde::Serialize) -> Response {
        let body = serde_json::to_vec(body).unwrap();
        let response = http::response::Response::builder()
            .status(http::StatusCode::OK)
            .header(http::header::CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap();

        Response(response.into())
    }

    /// Covers a fetch that upstream reports as pending.
    ///
    /// A second request for the same key arrives while the first request is still in flight.
    /// The re-queued channel is merged with it, and both requesters receive the result of the
    /// single follow-up request.
    #[tokio::test]
    async fn test_schedule_merge_channels() {
        let (upstream_addr, mut upstream_rx) = Addr::custom();
        let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap());
        let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

        // Receives the next request the service sent to the mocked upstream relay.
        macro_rules! next_send_request {
            () => {{
                let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else {
                    panic!()
                };
                req.configure(&config);
                req
            }};
        }

        let service =
            UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start_detached();

        let mut response1 = service.send(FetchProjectState {
            project_key,
            current_revision: "123".into(),
            no_cache: false,
        });

        // The first request is now in flight.
        let request1 = next_send_request!();

        // Queue a second request for the same key, with a different (default) revision, while
        // the first request is still unanswered.
        let mut response2 = service.send(FetchProjectState {
            project_key,
            current_revision: Revision::default(),
            no_cache: false,
        });

        // Upstream reports the key as pending, so the first channel is re-queued.
        request1
            .respond(Ok(to_response(&serde_json::json!({
                "pending": [project_key],
            }))))
            .await;

        // Neither requester has a result yet.
        assert!(poll_immediate(&mut response1).await.is_none());
        assert!(poll_immediate(&mut response2).await.is_none());

        // Answer the follow-up request for the merged channel.
        next_send_request!()
            .respond(Ok(to_response(&serde_json::json!({
                "unchanged": [project_key],
            }))))
            .await;

        let (response1, response2) = futures::future::join(response1, response2).await;
        assert!(matches!(response1, Ok(Ok(SourceProjectState::NotModified))));
        assert!(matches!(response2, Ok(Ok(SourceProjectState::NotModified))));

        // No further request was sent upstream.
        assert!(upstream_rx.try_recv().is_err());
    }
}
761}