1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::collections::hash_map::Entry;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::future;
9use itertools::Itertools;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use relay_dynamic_config::ErrorBoundary;
13use relay_statsd::metric;
14use relay_system::{
15 Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service,
16};
17use serde::{Deserialize, Serialize};
18use tokio::sync::mpsc;
19use tokio::time::Instant;
20
21use crate::services::projects::project::Revision;
22use crate::services::projects::project::{ParsedProjectState, ProjectState};
23use crate::services::projects::source::{FetchProjectState, SourceProjectState};
24use crate::services::upstream::{
25 Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
26};
27use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
28use crate::utils::{RetryBackoff, SleepHandle};
29
/// Request body sent to the upstream's project-config endpoint.
///
/// Serialized as camelCase JSON (see the [`UpstreamQuery`] impl for method,
/// path, and retry behavior).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStates {
    /// Project keys to fetch configs for.
    public_keys: Vec<ProjectKey>,
    /// Locally cached revisions, built from the same channel map as
    /// `public_keys`, letting the upstream reply "unchanged" instead of
    /// sending a full config.
    revisions: Vec<Revision>,
    /// Requests the full (unredacted) config; set when processing is enabled
    /// or the full config is explicitly requested in the `Config`.
    full_config: bool,
    /// Asks the upstream to bypass its caches for this request.
    no_cache: bool,
}
51
/// Response body of the upstream's project-config endpoint.
///
/// All fields default to empty so partial responses still deserialize.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetProjectStatesResponse {
    /// Fetched configs by project key. `ErrorBoundary` isolates individual
    /// configs that fail to deserialize; `None` marks a disabled project.
    #[serde(default)]
    configs: HashMap<ProjectKey, ErrorBoundary<Option<ParsedProjectState>>>,
    /// Keys the upstream has not computed yet; these are re-queued and
    /// fetched again on the next round.
    #[serde(default)]
    pending: HashSet<ProjectKey>,
    /// Keys whose config matches the submitted revision; no config payload is
    /// included for them.
    #[serde(default)]
    unchanged: HashSet<ProjectKey>,
}
72
impl UpstreamQuery for GetProjectStates {
    type Response = GetProjectStatesResponse;

    fn method(&self) -> Method {
        Method::POST
    }

    fn path(&self) -> Cow<'static, str> {
        // Version 3 of the endpoint supports the pending/unchanged protocol
        // used by `handle_responses`.
        Cow::Borrowed("/api/0/relays/projectconfigs/?version=3")
    }

    fn priority() -> RequestPriority {
        RequestPriority::High
    }

    fn retry() -> bool {
        // Retrying is handled by this service's own backoff loop, not by the
        // upstream request machinery.
        false
    }

    fn route(&self) -> &'static str {
        "project_configs"
    }
}
96
/// Bookkeeping for one in-flight project-state fetch.
///
/// Aggregates all waiters for the same project key and tracks retry state
/// across fetch rounds.
#[derive(Debug)]
struct ProjectStateChannel {
    /// Broadcast channel that resolves every attached waiter.
    channel: BroadcastChannel<SourceProjectState>,
    /// Channels absorbed via `merge`; they receive a clone of the final state.
    merged: Vec<BroadcastChannel<SourceProjectState>>,
    /// Revision to send upstream; reset to default when waiters disagree.
    revision: Revision,
    /// Instant after which the fetch is abandoned with a timeout.
    deadline: Instant,
    /// Whether any waiter requested a cache bypass.
    no_cache: bool,
    /// Number of upstream requests that included this channel.
    attempts: u64,
    /// Number of failed upstream requests this channel was part of.
    errors: usize,
    /// Number of times the upstream reported this project as pending.
    pending: usize,
}
113
114impl ProjectStateChannel {
115 pub fn new(
116 sender: BroadcastSender<SourceProjectState>,
117 revision: Revision,
118 timeout: Duration,
119 no_cache: bool,
120 ) -> Self {
121 let now = Instant::now();
122 Self {
123 no_cache,
124 channel: sender.into_channel(),
125 merged: Vec::new(),
126 revision,
127 deadline: now + timeout,
128 attempts: 0,
129 errors: 0,
130 pending: 0,
131 }
132 }
133
134 pub fn no_cache(&mut self) {
135 self.no_cache = true;
136 }
137
138 pub fn attach(&mut self, sender: BroadcastSender<SourceProjectState>, revision: Revision) {
145 self.channel.attach(sender);
146 if self.revision != revision {
147 self.revision = Revision::default();
148 }
149 }
150
151 pub fn send(self, state: SourceProjectState) {
152 for channel in self.merged {
153 channel.send(state.clone());
154 }
155 self.channel.send(state)
156 }
157
158 pub fn expired(&self) -> bool {
159 Instant::now() > self.deadline
160 }
161
162 pub fn merge(&mut self, channel: ProjectStateChannel) {
163 let ProjectStateChannel {
164 channel,
165 merged,
166 revision,
167 deadline,
168 no_cache,
169 attempts,
170 errors,
171 pending,
172 } = channel;
173
174 self.merged.push(channel);
175 self.merged.extend(merged);
176 if self.revision != revision {
177 self.revision = Revision::default();
178 }
179 self.deadline = self.deadline.max(deadline);
180 self.no_cache |= no_cache;
181 self.attempts += attempts;
182 self.errors += errors;
183 self.pending += pending;
184 }
185}
186
/// In-flight fetch channels keyed by project key.
type ProjectStateChannels = HashMap<ProjectKey, ProjectStateChannel>;
189
/// Service interface message: a fetch request paired with the broadcast
/// sender that will receive the resulting project state.
#[derive(Debug)]
pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<SourceProjectState>);

impl Interface for UpstreamProjectSource {}
199
impl FromMessage<FetchProjectState> for UpstreamProjectSource {
    type Response = BroadcastResponse<SourceProjectState>;

    /// Pairs the fetch request with its broadcast sender so multiple callers
    /// for the same project can share one upstream fetch.
    fn from_message(
        message: FetchProjectState,
        sender: BroadcastSender<SourceProjectState>,
    ) -> Self {
        Self(message, sender)
    }
}
210
/// Channels selected for one fetch round, split by cache behavior.
///
/// No-cache channels are requested separately so the `no_cache` flag does not
/// spill over onto regular requests (see `fetch_states`).
struct ChannelsBatch {
    /// Channels that require a cache bypass upstream.
    nocache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
    /// Channels that may be served from the upstream cache.
    cache_channels: Vec<(ProjectKey, ProjectStateChannel)>,
}
216
/// Result of a single upstream batch request, together with the channels the
/// request was made for, so they can be resolved or re-queued.
struct UpstreamResponse {
    /// The channels included in this batch.
    channels_batch: ProjectStateChannels,
    /// The upstream's response, or the request error.
    response: Result<GetProjectStatesResponse, UpstreamRequestError>,
}
222
/// Service that batches project-state fetches and queries the upstream.
#[derive(Debug)]
pub struct UpstreamProjectSourceService {
    /// Exponential backoff between fetch rounds after failures.
    backoff: RetryBackoff,
    config: Arc<Config>,
    /// Address of the upstream relay service used to send queries.
    upstream_relay: Addr<UpstreamRelay>,
    /// Pending fetch channels, keyed by project key.
    state_channels: ProjectStateChannels,
    /// Sender half used by spawned fetch tasks to return their responses.
    inner_tx: mpsc::UnboundedSender<Vec<Option<UpstreamResponse>>>,
    /// Receiver half polled in the service loop for fetch results.
    inner_rx: mpsc::UnboundedReceiver<Vec<Option<UpstreamResponse>>>,
    /// Timer that triggers the next fetch round when it elapses.
    fetch_handle: SleepHandle,
    /// Start of the current failure streak, if any; cleared on success.
    last_failed_fetch: Option<Instant>,
    /// Minimum failure-streak duration before an error is logged.
    failure_interval: Duration,
}
244
impl UpstreamProjectSourceService {
    /// Creates a new service that fetches project configs via `upstream_relay`.
    pub fn new(config: Arc<Config>, upstream_relay: Addr<UpstreamRelay>) -> Self {
        // Internal channel over which spawned fetch tasks hand their responses
        // back to the service loop.
        let (inner_tx, inner_rx) = mpsc::unbounded_channel();

        Self {
            backoff: RetryBackoff::new(config.http_max_retry_interval()),
            state_channels: HashMap::new(),
            fetch_handle: SleepHandle::idle(),
            upstream_relay,
            inner_tx,
            inner_rx,
            last_failed_fetch: None,
            failure_interval: config.http_project_failure_interval(),
            config,
        }
    }

    /// Returns the delay until the next fetch round.
    ///
    /// The batch interval is always added on top of the backoff so that
    /// incoming requests have a chance to accumulate into batches.
    fn next_backoff(&mut self) -> Duration {
        self.config.query_batch_interval() + self.backoff.next_backoff()
    }

    /// Drains up to `batch_size * max_concurrent_queries` channels from the
    /// queue, drops expired ones (reporting a timeout), and partitions the
    /// remainder by their `no_cache` flag.
    fn prepare_batches(&mut self) -> ChannelsBatch {
        let batch_size = self.config.query_batch_size();
        let num_batches = self.config.max_concurrent_queries();

        // Only take as many projects as fit the concurrent request budget;
        // anything left in `state_channels` is handled on the next round.
        let projects: Vec<_> = (self.state_channels.keys().copied())
            .take(batch_size * num_batches)
            .collect();

        let fresh_channels = (projects.iter())
            .filter_map(|id| Some((*id, self.state_channels.remove(id)?)))
            .filter(|(id, channel)| {
                if channel.expired() {
                    // Deadline exceeded: report the timeout; the `!expired()`
                    // below then drops the channel from the batch, which also
                    // drops all of its waiters.
                    metric!(
                        histogram(RelayHistograms::ProjectStateAttempts) = channel.attempts,
                        result = "timeout",
                    );
                    metric!(
                        counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                        result = "timeout",
                    );
                    relay_log::error!(
                        errors = channel.errors,
                        pending = channel.pending,
                        tags.did_error = channel.errors > 0,
                        tags.was_pending = channel.pending > 0,
                        tags.project_key = id.to_string(),
                        "error fetching project state {id}: deadline exceeded",
                    );
                }
                !channel.expired()
            });

        // No-cache channels are requested in separate batches so the flag does
        // not force a cache bypass for unrelated projects.
        let (nocache_channels, cache_channels): (Vec<_>, Vec<_>) =
            fresh_channels.partition(|(_id, channel)| channel.no_cache);

        let total_count = cache_channels.len() + nocache_channels.len();

        metric!(histogram(RelayHistograms::ProjectStatePending) = self.state_channels.len() as u64);

        relay_log::debug!(
            "updating project states for {}/{} projects (attempt {})",
            total_count,
            total_count + self.state_channels.len(),
            self.backoff.attempt(),
        );

        ChannelsBatch {
            nocache_channels,
            cache_channels,
        }
    }

    /// Inserts a channel into the queue, merging it into any existing channel
    /// for the same project instead of overwriting it.
    fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
        match self.state_channels.entry(key) {
            Entry::Vacant(e) => {
                e.insert(channel);
            }
            Entry::Occupied(mut e) => {
                e.get_mut().merge(channel);
            }
        }
    }

    /// Sends one upstream query per chunk of `batch_size` channels and awaits
    /// all of them concurrently.
    ///
    /// Runs on a spawned task; results are pushed back to the service loop via
    /// `inner_tx`. A `None` entry means that request could not be sent at all.
    async fn fetch_states(
        config: Arc<Config>,
        upstream_relay: Addr<UpstreamRelay>,
        channels: ChannelsBatch,
    ) -> Vec<Option<UpstreamResponse>> {
        let request_start = Instant::now();
        let batch_size = config.query_batch_size();
        let cache_batches = channels.cache_channels.into_iter().chunks(batch_size);
        let nocache_batches = channels.nocache_channels.into_iter().chunks(batch_size);

        let mut requests = vec![];
        #[allow(clippy::useless_conversion)]
        for channels_batch in cache_batches.into_iter().chain(nocache_batches.into_iter()) {
            let mut channels_batch: ProjectStateChannels = channels_batch.collect();
            // Every channel in this request counts one more attempt.
            for channel in channels_batch.values_mut() {
                channel.attempts += 1;
            }
            relay_log::debug!("sending request of size {}", channels_batch.len());
            metric!(
                histogram(RelayHistograms::ProjectStateRequestBatchSize) =
                    channels_batch.len() as u64
            );

            let query = GetProjectStates {
                // `public_keys` and `revisions` iterate the same map, so their
                // entries line up pairwise.
                public_keys: channels_batch.keys().copied().collect(),
                revisions: channels_batch
                    .values()
                    .map(|c| c.revision.clone())
                    .collect(),
                full_config: config.processing_enabled() || config.request_full_project_config(),
                no_cache: channels_batch.values().any(|c| c.no_cache),
            };

            metric!(counter(RelayCounters::ProjectStateRequest) += 1);

            let upstream_relay = upstream_relay.clone();
            requests.push(async move {
                match upstream_relay.send(SendQuery(query)).await {
                    Ok(response) => Some(UpstreamResponse {
                        channels_batch,
                        response,
                    }),
                    // NOTE(review): on a send error the batch's channels are
                    // dropped here without being re-queued — verify waiters
                    // handle their broadcast channel being dropped.
                    Err(_err) => {
                        relay_log::error!("failed to send the request to upstream: channel full");
                        None
                    }
                }
            });
        }

        // Await all batch requests concurrently and time the whole round.
        let responses = future::join_all(requests).await;
        metric!(timer(RelayTimers::ProjectStateRequestDuration) = request_start.elapsed());
        responses
    }

    /// Arms the fetch timer unless a fetch is already scheduled.
    fn schedule_fetch(&mut self) {
        if self.fetch_handle.is_idle() {
            let wait = self.next_backoff();
            self.fetch_handle.set(wait);
        }
    }

    /// Processes the results of one fetch round.
    ///
    /// Resolved configs are broadcast to their waiters; pending projects and
    /// failed batches are re-queued, and a follow-up fetch is scheduled if any
    /// channels remain.
    fn handle_responses(&mut self, responses: Vec<Option<UpstreamResponse>>) {
        // `None` entries (requests that were never sent) carry no channels.
        for response in responses.into_iter().flatten() {
            let UpstreamResponse {
                channels_batch,
                response,
            } = response;

            match response {
                Ok(mut response) => {
                    // A successful response ends the failure streak.
                    self.backoff.reset();
                    self.last_failed_fetch = None;

                    metric!(
                        histogram(RelayHistograms::ProjectStateReceived) =
                            response.configs.len() as u64
                    );
                    for (key, mut channel) in channels_batch {
                        if response.pending.contains(&key) {
                            // Upstream has not computed this config yet:
                            // re-queue the channel for the next round.
                            channel.pending += 1;
                            self.merge_channel(key, channel);
                            continue;
                        }

                        let mut result = "ok";
                        let state = if response.unchanged.contains(&key) {
                            result = "ok_unchanged";
                            SourceProjectState::NotModified
                        } else {
                            // A key missing from `configs` is treated the same
                            // as an explicit `None`: the project is disabled.
                            let state = response
                                .configs
                                .remove(&key)
                                .unwrap_or(ErrorBoundary::Ok(None));

                            let state = match state {
                                ErrorBoundary::Err(error) => {
                                    // The config failed to deserialize; report
                                    // it and fall back to a pending state.
                                    result = "invalid";
                                    let error = &error as &dyn std::error::Error;
                                    relay_log::error!(error, "error fetching project state {key}");
                                    ProjectState::Pending
                                }
                                ErrorBoundary::Ok(None) => ProjectState::Disabled,
                                ErrorBoundary::Ok(Some(state)) => state.into(),
                            };

                            SourceProjectState::New(state)
                        };

                        metric!(
                            histogram(RelayHistograms::ProjectStateAttempts) = channel.attempts,
                            result = result,
                        );
                        metric!(
                            counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                            result = result,
                        );

                        // Resolves every waiter attached to this channel.
                        channel.send(state);
                    }
                }
                Err(err) => {
                    self.track_failed_response();

                    // Only log after the first retry to avoid noise from
                    // one-off failures.
                    let attempts = channels_batch
                        .values()
                        .map(|b| b.attempts)
                        .max()
                        .unwrap_or(0);
                    if attempts >= 2 {
                        relay_log::error!(
                            error = &err as &dyn std::error::Error,
                            attempts = attempts,
                            "error fetching project states",
                        );
                    }

                    metric!(
                        histogram(RelayHistograms::ProjectStatePending) =
                            self.state_channels.len() as u64
                    );
                    // Re-queue the whole batch for the next round.
                    // NOTE(review): `extend` overwrites an existing entry for
                    // the same key; if a new channel was registered while this
                    // request was in flight, its waiters are dropped —
                    // consider `merge_channel` here. TODO confirm.
                    self.state_channels.extend(channels_batch.into_iter().map(
                        |(key, mut channel)| {
                            channel.errors += 1;
                            (key, channel)
                        },
                    ))
                }
            }
        }

        if !self.state_channels.is_empty() {
            self.schedule_fetch()
        } else {
            // Nothing left to fetch: clear the backoff so the next incoming
            // request is fetched without an extra delay penalty.
            self.backoff.reset();
        }
    }

    /// Records a failed upstream fetch.
    ///
    /// Remembers when the current failure streak started and logs an error
    /// once the streak lasts at least `failure_interval`.
    fn track_failed_response(&mut self) {
        match self.last_failed_fetch {
            None => self.last_failed_fetch = Some(Instant::now()),
            Some(last_failed) => {
                let failure_duration = last_failed.elapsed();
                if failure_duration >= self.failure_interval {
                    relay_log::error!(
                        failure_duration = format!("{} seconds", failure_duration.as_secs()),
                        backoff_attempts = self.backoff.attempt(),
                        "can't fetch project states"
                    );
                }
            }
        }
        metric!(counter(RelayCounters::ProjectUpstreamFailed) += 1);
    }

    /// Starts a fetch round: prepares the batches and spawns a task that
    /// queries the upstream, forwarding results back through `inner_tx`.
    fn do_fetch(&mut self) {
        self.fetch_handle.reset();

        if self.state_channels.is_empty() {
            // Fetches are only scheduled while channels exist, so this
            // indicates a scheduling bug.
            relay_log::error!("project state schedule fetch request without projects");
            return;
        }

        let config = self.config.clone();
        let inner_tx = self.inner_tx.clone();
        let channels = self.prepare_batches();
        let upstream_relay = self.upstream_relay.clone();

        relay_system::spawn!(async move {
            let responses = Self::fetch_states(config, upstream_relay, channels).await;
            // Sending fails only if the receiving service loop is gone.
            if inner_tx.send(responses).is_err() {
                relay_log::error!("unable to forward the requests to further processing");
            }
        });
    }

    /// Handles an incoming fetch request: registers a new channel or attaches
    /// the waiter to an existing one, then schedules a fetch if none is due.
    fn handle_message(&mut self, message: UpstreamProjectSource) {
        let UpstreamProjectSource(
            FetchProjectState {
                project_key,
                current_revision,
                no_cache,
            },
            sender,
        ) = message;

        let query_timeout = self.config.query_timeout();

        match self.state_channels.entry(project_key) {
            Entry::Vacant(entry) => {
                entry.insert(ProjectStateChannel::new(
                    sender,
                    current_revision,
                    query_timeout,
                    no_cache,
                ));
            }
            Entry::Occupied(mut entry) => {
                let channel = entry.get_mut();
                channel.attach(sender, current_revision);
                // A no-cache request upgrades the existing channel; it is
                // never downgraded back to cached.
                if no_cache {
                    channel.no_cache();
                }
            }
        }

        // If no fetch cycle is running, start one from a clean backoff.
        if !self.backoff.started() {
            self.backoff.reset();
            self.schedule_fetch();
        }
    }
}
634
impl Service for UpstreamProjectSourceService {
    type Interface = UpstreamProjectSource;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        relay_log::info!("project upstream cache started");
        loop {
            tokio::select! {
                // `biased` polls branches in order: finish in-flight work
                // (fetch timer, then responses) before accepting new requests.
                biased;

                () = &mut self.fetch_handle => self.do_fetch(),
                Some(responses) = self.inner_rx.recv() => self.handle_responses(responses),
                Some(message) = rx.recv() => self.handle_message(message),

                else => break,
            }
        }
        relay_log::info!("project upstream cache stopped");
    }
}
654
#[cfg(test)]
mod tests {
    use crate::http::Response;
    use futures::future::poll_immediate;

    use super::*;

    /// Wraps a JSON-serializable body into an HTTP 200 upstream response.
    fn to_response(body: &impl serde::Serialize) -> Response {
        let body = serde_json::to_vec(body).unwrap();
        let response = http::response::Response::builder()
            .status(http::StatusCode::OK)
            .header(http::header::CONTENT_LENGTH, body.len())
            .body(body)
            .unwrap();

        Response(response.into())
    }

    /// A request arriving while another fetch for the same project is in
    /// flight must be merged into it, and a `pending` reply must re-queue the
    /// project so both waiters are resolved by the retried request.
    #[tokio::test]
    async fn test_schedule_merge_channels() {
        let (upstream_addr, mut upstream_rx) = Addr::custom();
        let config = Arc::new(Config::from_json_value(serde_json::json!({})).unwrap());
        let project_key = ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap();

        // Pulls the next `SendRequest` off the mocked upstream and configures it.
        macro_rules! next_send_request {
            () => {{
                let UpstreamRelay::SendRequest(mut req) = upstream_rx.recv().await.unwrap() else {
                    panic!()
                };
                req.configure(&config);
                req
            }};
        }

        let service =
            UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start_detached();

        let mut response1 = service.send(FetchProjectState {
            project_key,
            current_revision: "123".into(),
            no_cache: false,
        });

        let request1 = next_send_request!();

        // Second fetch for the same project while the first request is still
        // in flight.
        let mut response2 = service.send(FetchProjectState {
            project_key,
            current_revision: Revision::default(),
            no_cache: false,
        });

        // The upstream reports the project as pending; the service should
        // re-queue it and merge both waiters into one channel.
        request1
            .respond(Ok(to_response(&serde_json::json!({
                "pending": [project_key],
            }))))
            .await;

        // Neither waiter may be resolved yet.
        assert!(poll_immediate(&mut response1).await.is_none());
        assert!(poll_immediate(&mut response2).await.is_none());

        // The retried request resolves both waiters at once.
        next_send_request!()
            .respond(Ok(to_response(&serde_json::json!({
                "unchanged": [project_key],
            }))))
            .await;

        let (response1, response2) = futures::future::join(response1, response2).await;
        assert!(matches!(response1, Ok(SourceProjectState::NotModified)));
        assert!(matches!(response2, Ok(SourceProjectState::NotModified)));

        // No further upstream request may have been issued.
        assert!(upstream_rx.try_recv().is_err());
    }
}