relay_server/endpoints/
project_configs.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::extract::{Query, Request};
5use axum::handler::Handler;
6use axum::http::StatusCode;
7use axum::response::{IntoResponse, Result};
8use axum::{Json, RequestExt};
9use relay_base_schema::project::ProjectKey;
10use relay_dynamic_config::{ErrorBoundary, GlobalConfig};
11use serde::{Deserialize, Serialize};
12
13use crate::endpoints::common::ServiceUnavailable;
14use crate::endpoints::forward;
15use crate::extractors::SignedJson;
16use crate::service::ServiceState;
17use crate::services::global_config::{self, StatusResponse};
18use crate::services::projects::project::{
19 LimitedOutgoingProjectState, OutgoingProjectState, ProjectState, Revision,
20};
21use crate::utils::ApiErrorResponse;
22
23const ENDPOINT_V3: u16 = 3;
30
31#[derive(Debug, Clone, Copy, thiserror::Error)]
32#[error("This API version is no longer supported, upgrade your Relay or Client")]
33struct VersionOutdatedError;
34
35impl IntoResponse for VersionOutdatedError {
36 fn into_response(self) -> axum::response::Response {
37 (StatusCode::BAD_REQUEST, ApiErrorResponse::from_error(&self)).into_response()
38 }
39}
40
41#[derive(Clone, Copy, Debug, Deserialize)]
43struct VersionQuery {
44 #[serde(default)]
45 version: u16,
46}
47
48#[derive(Debug, Clone, Serialize)]
56#[serde(untagged)]
57enum ProjectStateWrapper {
58 Full(OutgoingProjectState),
59 Limited(#[serde(with = "LimitedOutgoingProjectState")] OutgoingProjectState),
60}
61
62impl ProjectStateWrapper {
63 pub fn new(state: OutgoingProjectState, full: bool) -> Self {
65 match full {
66 true => Self::Full(state),
67 false => Self::Limited(state),
68 }
69 }
70}
71
72#[derive(Debug, Serialize)]
82#[serde(rename_all = "camelCase")]
83struct GetProjectStatesResponseWrapper {
84 configs: HashMap<ProjectKey, ProjectStateWrapper>,
85 #[serde(skip_serializing_if = "Vec::is_empty")]
86 pending: Vec<ProjectKey>,
87 #[serde(skip_serializing_if = "Vec::is_empty")]
88 unchanged: Vec<ProjectKey>,
89 #[serde(skip_serializing_if = "Option::is_none")]
90 global: Option<Arc<GlobalConfig>>,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 global_status: Option<StatusResponse>,
93}
94
95#[derive(Debug, Deserialize)]
100#[serde(rename_all = "camelCase")]
101struct GetProjectStatesRequest {
102 public_keys: Vec<ErrorBoundary<ProjectKey>>,
104 revisions: Option<ErrorBoundary<Vec<Revision>>>,
109 #[serde(default)]
110 full_config: bool,
111 #[serde(default)]
112 global: bool,
113}
114
115fn into_valid_keys(
116 public_keys: Vec<ErrorBoundary<ProjectKey>>,
117 revisions: Option<ErrorBoundary<Vec<Revision>>>,
118) -> impl Iterator<Item = (ProjectKey, Revision)> {
119 let mut revisions = revisions.and_then(|e| e.ok()).unwrap_or_default();
120 if !revisions.is_empty() && revisions.len() != public_keys.len() {
121 relay_log::warn!(
125 "downstream sent {} project keys but {} revisions, discarding all revisions",
126 public_keys.len(),
127 revisions.len()
128 );
129 revisions.clear();
130 }
131 let revisions = revisions
132 .into_iter()
133 .chain(std::iter::repeat_with(Revision::default));
134
135 std::iter::zip(public_keys, revisions).filter_map(|(public_key, revision)| {
136 let public_key = public_key.ok()?;
139 Some((public_key, revision))
140 })
141}
142
143async fn inner(
144 state: ServiceState,
145 body: SignedJson<GetProjectStatesRequest>,
146) -> Result<impl IntoResponse, ServiceUnavailable> {
147 let SignedJson { inner, relay } = body;
148
149 let (global, global_status) = if inner.global {
150 match state.global_config().send(global_config::Get).await? {
151 global_config::Status::Ready(config) => (Some(config), Some(StatusResponse::Ready)),
152 global_config::Status::Pending => (
155 Some(GlobalConfig::default().into()),
156 Some(StatusResponse::Pending),
157 ),
158 }
159 } else {
160 (None, None)
161 };
162
163 let keys_len = inner.public_keys.len();
164 let mut pending = Vec::with_capacity(keys_len);
165 let mut unchanged = Vec::with_capacity(keys_len);
166 let mut configs = HashMap::with_capacity(keys_len);
167
168 for (project_key, revision) in into_valid_keys(inner.public_keys, inner.revisions) {
169 let project = state.project_cache_handle().get(project_key);
170
171 let project_info = match project.state() {
172 ProjectState::Enabled(info) => info,
173 ProjectState::Disabled => {
174 continue;
176 }
177 ProjectState::Pending => {
178 pending.push(project_key);
179 continue;
180 }
181 };
182
183 if project_info.rev == revision {
185 unchanged.push(project_key);
186 continue;
187 }
188
189 let has_access = relay.internal
192 || project_info
193 .config
194 .trusted_relays
195 .contains(&relay.public_key);
196
197 if has_access {
198 let full = relay.internal && inner.full_config;
199 let wrapper = ProjectStateWrapper::new(
200 OutgoingProjectState {
201 disabled: false,
202 info: Arc::clone(project_info),
203 upstream: state.config().advertised_upstream().cloned(),
204 },
205 full,
206 );
207 configs.insert(project_key, wrapper);
208 } else {
209 relay_log::debug!(
210 relay = %relay.public_key,
211 project_key = %project_key,
212 "relay does not have access to project key",
213 );
214 };
215 }
216
217 Ok(Json(GetProjectStatesResponseWrapper {
218 configs,
219 pending,
220 unchanged,
221 global,
222 global_status,
223 }))
224}
225
226fn is_outdated(Query(query): Query<VersionQuery>) -> bool {
228 query.version < ENDPOINT_V3
229}
230
231fn is_compatible(Query(query): Query<VersionQuery>) -> bool {
233 query.version == ENDPOINT_V3
234}
235
236pub async fn handle(state: ServiceState, mut req: Request) -> Result<impl IntoResponse> {
248 let data = req.extract_parts().await?;
249 if is_outdated(data) {
250 Err(VersionOutdatedError.into())
251 } else if is_compatible(data) {
252 Ok(inner.call(req, state).await)
253 } else {
254 Ok(forward::forward(state, req).await)
255 }
256}