relay_server/services/
upstream.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
//! Service to communicate with the upstream.
//!
//! Most importantly, this module declares the [`UpstreamRelay`] service and its main implementation
//! [`UpstreamRelayService`] along with messages to communicate with the service. Please look at
//! service-level docs for more information.

use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use itertools::Itertools;
use relay_auth::{RegisterChallenge, RegisterRequest, RegisterResponse, Registration};
use relay_config::{Config, Credentials, RelayMode};
use relay_quotas::{
    DataCategories, QuotaScope, RateLimit, RateLimitScope, RateLimits, ReasonCode, RetryAfter,
    Scoping,
};
use relay_system::{
    AsyncResponse, FromMessage, Interface, MessageResponse, NoResponse, Sender, Service,
};
use reqwest::header;
pub use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::http::{HttpError, Request, RequestBuilder, Response, StatusCode};
use crate::statsd::{RelayHistograms, RelayTimers};
use crate::utils::{self, ApiErrorResponse, RelayErrorAction, RetryBackoff};

/// Rate limits returned by the upstream.
///
/// Upstream rate limits can come in two forms:
///  - `Retry-After` header with a generic timeout for all categories.
///  - `X-Sentry-Rate-Limits` header with fine-grained information on applied rate limits.
///
/// These limits do not carry scope information. Use `UpstreamRateLimits::scope` to attach scope
/// identifiers and return a fully populated `RateLimits` instance.
#[derive(Debug, Clone)]
pub struct UpstreamRateLimits {
    /// Generic timeout parsed from the `Retry-After` header; defaults to zero seconds.
    retry_after: RetryAfter,
    /// Raw, unparsed value of the `X-Sentry-Rate-Limits` header (empty if absent).
    rate_limits: String,
}

impl UpstreamRateLimits {
    /// Creates an empty `UpstreamRateLimits` instance.
    ///
    /// The retry-after interval starts at zero and the rate limit header is empty.
    fn new() -> Self {
        Self {
            retry_after: RetryAfter::from_secs(0),
            rate_limits: String::new(),
        }
    }

    /// Adds the `Retry-After` header to this rate limits instance.
    ///
    /// A missing or unparseable header value leaves the current interval untouched.
    fn retry_after(mut self, header: Option<&str>) -> Self {
        let parsed = header.and_then(|value| value.parse().ok());
        if let Some(retry_after) = parsed {
            self.retry_after = retry_after;
        }
        self
    }

    /// Adds the `X-Sentry-Rate-Limits` header to this instance.
    ///
    /// If multiple header values are given, this header should be joined. If the header is empty,
    /// an empty string should be passed.
    fn rate_limits(mut self, header: String) -> Self {
        self.rate_limits = header;
        self
    }

    /// Creates a scoped rate limit instance based on the provided `Scoping`.
    pub fn scope(self, scoping: &Scoping) -> RateLimits {
        // Try to parse the `X-Sentry-Rate-Limits` header in the most lenient way possible. If
        // anything goes wrong, skip over the invalid parts.
        let mut limits = utils::parse_rate_limits(scoping, &self.rate_limits);

        // Fine-grained limits from the header take precedence; nothing more to do.
        if limits.is_limited() {
            return limits;
        }

        // Otherwise, fall back to the `Retry-After` header. Create a default rate limit that only
        // applies to the current data category at the most specific scope (Key). One example of
        // such a generic rate limit is the anti-abuse nginx layer used by SaaS.
        limits.add(RateLimit {
            categories: DataCategories::new(),
            scope: RateLimitScope::for_quota(scoping, QuotaScope::Key),
            reason_code: Some(ReasonCode::new("generic")),
            retry_after: self.retry_after,
            namespaces: Default::default(),
        });

        limits
    }
}

/// An error returned from [`SendRequest`] and [`SendQuery`].
#[derive(Debug, thiserror::Error)]
pub enum UpstreamRequestError {
    /// The request requires Relay credentials, but none are configured.
    #[error("attempted to send upstream request without credentials configured")]
    NoCredentials,

    /// As opposed to HTTP variant this contains all network errors.
    #[error("could not send request to upstream")]
    SendFailed(#[from] reqwest::Error),

    /// Likely a bad HTTP status code or unparseable response.
    #[error("could not send request")]
    Http(#[from] HttpError),

    /// The upstream responded with rate limiting headers (special case of a 429 response).
    #[error("upstream requests rate limited")]
    RateLimited(UpstreamRateLimits),

    /// The upstream responded with a non-success status code and an error body.
    #[error("upstream request returned error {0}")]
    ResponseError(StatusCode, #[source] ApiErrorResponse),

    /// The internal response channel was closed before a result was delivered.
    #[error("channel closed")]
    ChannelClosed,

    /// The upstream permanently rejected this Relay's authentication.
    #[error("upstream permanently denied authentication")]
    AuthDenied,
}

impl UpstreamRequestError {
    /// Returns the status code of the HTTP request sent to the upstream.
    ///
    /// If this error is the result of sending a request to the upstream, this method returns `Some`
    /// with the status code. If the request could not be made or the error originates elsewhere,
    /// this returns `None`.
    fn status_code(&self) -> Option<StatusCode> {
        match self {
            UpstreamRequestError::ResponseError(code, _) => Some(*code),
            UpstreamRequestError::Http(HttpError::Reqwest(e)) => e.status(),
            _ => None,
        }
    }

    /// Returns `true` if the error indicates a network downtime.
    fn is_network_error(&self) -> bool {
        match self {
            Self::SendFailed(_) => true,
            Self::ResponseError(code, _) => matches!(code.as_u16(), 502..=504),
            Self::Http(http) => http.is_network_error(),
            _ => false,
        }
    }

    /// Returns `true` if the upstream has permanently rejected this Relay.
    ///
    /// This Relay should cease communication with the upstream and may shut down.
    fn is_permanent_rejection(&self) -> bool {
        match self {
            Self::ResponseError(status_code, response) => {
                *status_code == StatusCode::FORBIDDEN
                    && response.relay_action() == RelayErrorAction::Stop
            }
            _ => false,
        }
    }

    /// Returns `true` if the request was received by the upstream.
    ///
    /// Despite resulting in an error, the server has received and acknowledged the request. This
    /// includes rate limits (status code 429), and bad payloads (4XX), but not network errors
    /// (502-504).
    pub fn is_received(&self) -> bool {
        match self {
            // Rate limits are a special case of `ResponseError(429, _)`.
            Self::RateLimited(_) => true,
            // Everything except network errors indicates the upstream has handled this request.
            Self::ResponseError(_, _) | Self::Http(_) => !self.is_network_error(),
            // Remaining kinds indicate a failure to send the request.
            Self::NoCredentials | Self::SendFailed(_) | Self::ChannelClosed | Self::AuthDenied => {
                false
            }
        }
    }

    /// Returns a categorized description of the error.
    ///
    /// This is used for metrics and logging.
    fn description(&self) -> &'static str {
        match self {
            UpstreamRequestError::NoCredentials => "credentials",
            UpstreamRequestError::SendFailed(_) => "send_failed",
            UpstreamRequestError::Http(HttpError::Io(_)) => "payload_failed",
            UpstreamRequestError::Http(HttpError::Json(_)) => "invalid_json",
            UpstreamRequestError::Http(HttpError::Reqwest(_)) => "reqwest_error",
            UpstreamRequestError::Http(HttpError::Overflow) => "overflow",
            UpstreamRequestError::RateLimited(_) => "rate_limited",
            UpstreamRequestError::ResponseError(_, _) => "response_error",
            UpstreamRequestError::ChannelClosed => "channel_closed",
            UpstreamRequestError::AuthDenied => "auth_denied",
        }
    }
}

/// Checks the authentication state with the upstream.
///
/// In static and proxy mode, Relay does not require authentication and `IsAuthenticated` always
/// returns `true`. Otherwise, this message retrieves the current state of authentication:
///
/// - Initially, Relay is unauthenticated until it has established a connection.
/// - If this Relay is not known by the upstream, it remains unauthenticated indefinitely.
/// - Once Relay has registered, this message reports `true`.
/// - In periodic intervals Relay re-authenticates, which may drop authentication temporarily.
#[derive(Debug)]
pub struct IsAuthenticated;

/// Returns whether Relay is in an outage state.
///
/// On repeated failure to submit requests to the upstream, the upstream service moves into an
/// outage state. During this phase, no requests or retries are performed and all newly submitted
/// [`SendRequest`] and [`SendQuery`] messages are put into the queue. Once the connection is
/// reestablished, requests resume in FIFO order.
///
/// This message resolves to `true` if Relay is in outage mode and `false` if the service is
/// performing regular operation.
#[derive(Debug)]
pub struct IsNetworkOutage;

/// Priority of an upstream request.
///
/// See [`UpstreamRequest::priority`] for more information on how priorities affect queueing and
/// retry order.
#[derive(Clone, Copy, Debug)]
pub enum RequestPriority {
    /// High priority, low volume messages (e.g. ProjectConfig, ProjectStates, Registration messages).
    High,
    /// Low priority, high volume messages (e.g. Events and Outcomes).
    Low,
}

impl RequestPriority {
    /// The name of the priority for logging and metrics.
    fn name(&self) -> &'static str {
        match self {
            RequestPriority::High => "high",
            RequestPriority::Low => "low",
        }
    }
}

impl fmt::Display for RequestPriority {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the static name used for metrics; avoids a format machinery round-trip.
        f.write_str(self.name())
    }
}

/// Represents a generic HTTP request to be sent to the upstream.
pub trait UpstreamRequest: Send + Sync + fmt::Debug {
    /// The HTTP method of the request.
    fn method(&self) -> Method;

    /// The path relative to the upstream.
    fn path(&self) -> Cow<'_, str>;

    /// Whether this request should retry on network errors.
    ///
    /// Defaults to `true` and should be disabled if there is an external retry mechanism. Note that
    /// failures other than network errors will **not** be retried.
    fn retry(&self) -> bool {
        true
    }

    /// The queueing priority of the request.
    ///
    ///  - High priority requests are always sent and retried first.
    ///  - Low priority requests are sent if no high-priority messages are pending in the queue.
    ///    This also applies to retries: A low-priority message is only sent if there are no
    ///    high-priority requests waiting.
    ///
    /// Within the same priority, requests are delivered in FIFO order.
    ///
    /// Defaults to [`Low`](RequestPriority::Low).
    fn priority(&self) -> RequestPriority {
        RequestPriority::Low
    }

    /// Controls whether request errors should be intercepted.
    ///
    /// By default, error codes from responses will be intercepted and returned as
    /// [`UpstreamRequestError`]. This also includes parsing of the request body for diagnostics.
    /// Return `false` to disable this behavior and receive the verbatim response.
    fn intercept_status_errors(&self) -> bool {
        true
    }

    /// Add the `X-Sentry-Relay-Id` header to the outgoing request.
    ///
    /// This header is used for authentication with the upstream and should be enabled only for
    /// endpoints that require it.
    ///
    /// Defaults to `true`.
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Add the `X-Sentry-Relay-Signature` header to the outgoing request.
    ///
    /// When no signature should be added, this method should return `None`. Otherwise, this method
    /// should return the payload to sign. For requests with content encoding, this must be the
    /// **uncompressed** payload.
    ///
    /// This requires configuration of the Relay's credentials. If the credentials are not
    /// configured, the request will fail with [`UpstreamRequestError::NoCredentials`].
    ///
    /// Defaults to `None`.
    fn sign(&mut self) -> Option<Bytes> {
        None
    }

    /// Returns the name of the logical route.
    ///
    /// This is used for internal metrics and logging. Other than the path, this cannot contain
    /// dynamic elements and should be globally unique.
    fn route(&self) -> &'static str;

    /// Callback to apply configuration to the request.
    ///
    /// This hook is called at least once before `build`. It can be used to include additional
    /// properties from Relay's config in the Request before it is sent or handled if during request
    /// creation time the configuration is not available.
    ///
    /// This method is optional and defaults to a no-op.
    fn configure(&mut self, _config: &Config) {}

    /// Callback to build the outgoing web request.
    ///
    /// This callback populates the initialized request with headers and a request body.
    ///
    /// Note that this function can be called repeatedly if [`retry`](UpstreamRequest::retry)
    /// returns `true`. This function should therefore not move out of the request struct, but can
    /// use it to memoize heavy computation.
    fn build(&mut self, _builder: &mut RequestBuilder) -> Result<(), HttpError> {
        Ok(())
    }

    /// Callback to complete an HTTP request.
    ///
    /// This callback receives the response or error. At time of invocation, the response body has
    /// not been consumed. The response body or derived information can then be sent into a channel.
    fn respond(
        self: Box<Self>,
        result: Result<Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
}

/// Sends a [request](UpstreamRequest) to the upstream and resolves the response.
///
/// This message is fire-and-forget. The result of sending the request is passed to the
/// [`UpstreamRequest::respond`] method, which can be used to process it and send it to a dedicated
/// channel.
#[derive(Debug)]
pub struct SendRequest<T: UpstreamRequest>(pub T);

/// Higher-level version of an [`UpstreamRequest`] with JSON request and response payloads.
///
/// The struct that implements the `UpstreamQuery` type has to be serializable and will be used as
/// JSON request body. The response body struct is declared via the associated `Response` type.
pub trait UpstreamQuery: Serialize + Send + Sync + fmt::Debug {
    /// The response type that will be deserialized from successful queries.
    type Response: DeserializeOwned + Send;

    /// The HTTP method of the query.
    fn method(&self) -> Method;

    /// The path relative to the upstream.
    fn path(&self) -> Cow<'static, str>;

    /// Whether this query should retry on network errors.
    ///
    /// This should be disabled if there is an external retry mechanism. Note that failures other
    /// than network errors will **not** be retried.
    fn retry() -> bool;

    /// The queueing priority of the query.
    ///
    ///  - High priority queries are always sent and retried first.
    ///  - Low priority queries are sent if no high-priority messages are pending in the queue.
    ///    This also applies to retries: A low-priority message is only sent if there are no
    ///    high-priority requests waiting.
    ///
    /// Within the same priority, queries are delivered in FIFO order.
    ///
    /// Defaults to [`Low`](RequestPriority::Low).
    fn priority() -> RequestPriority {
        RequestPriority::Low
    }

    /// Returns the name of the logical route.
    ///
    /// This is used for internal metrics and logging. Other than the path, this cannot contain
    /// dynamic elements and should be globally unique.
    fn route(&self) -> &'static str;
}

/// Transmitting end of the return channel for [`UpstreamQuery`].
///
/// Resolves to either the deserialized response body or an [`UpstreamRequestError`].
type QuerySender<T> = Sender<Result<<T as UpstreamQuery>::Response, UpstreamRequestError>>;

/// Memoized implementation of [`UpstreamRequest`] for an [`UpstreamQuery`].
///
/// This can be used to send queries as requests to the upstream. The request wraps an internal
/// channel to send responses to.
#[derive(Debug)]
struct UpstreamQueryRequest<T: UpstreamQuery> {
    /// The wrapped query that is serialized into the request body.
    query: T,
    /// Serialized JSON body, memoized on first use so retries and signing reuse it.
    body: Option<Bytes>,
    /// Maximum allowed response size; populated from config in `configure`.
    max_response_size: usize,
    /// Channel on which the deserialized response (or error) is delivered.
    sender: QuerySender<T>,
}

impl<T> UpstreamQueryRequest<T>
where
    T: UpstreamQuery + 'static,
{
    /// Wraps the given `query` in an [`UpstreamQuery`] implementation.
    pub fn new(query: T, sender: QuerySender<T>) -> Self {
        Self {
            query,
            body: None,
            max_response_size: 0,
            sender,
        }
    }

    /// Memoize the serialized body for retries and signing.
    fn body(&mut self) -> Result<Bytes, HttpError> {
        // `Bytes` clones are cheap reference-count bumps, so handing out a clone of the
        // memoized buffer does not copy the payload.
        match &self.body {
            Some(body) => Ok(body.clone()),
            None => {
                let body: Bytes = serde_json::to_vec(&self.query)?.into();
                self.body = Some(body.clone());
                Ok(body)
            }
        }
    }
}

impl<T> UpstreamRequest for UpstreamQueryRequest<T>
where
    T: UpstreamQuery + 'static,
{
    // Retry behavior is a static property of the query type.
    fn retry(&self) -> bool {
        T::retry()
    }

    // Priority is a static property of the query type.
    fn priority(&self) -> RequestPriority {
        T::priority()
    }

    // Queries always intercept error status codes so they can be surfaced as
    // `UpstreamRequestError::ResponseError`.
    fn intercept_status_errors(&self) -> bool {
        true
    }

    // Queries always authenticate via the `X-Sentry-Relay-Id` header.
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Bytes> {
        // Computing the body is practically infallible since we're serializing standard structures
        // into a string. Even if it fails, `sign` is called after `build` and the error will be
        // reported there.
        self.body().ok()
    }

    fn method(&self) -> Method {
        self.query.method()
    }

    fn path(&self) -> Cow<'_, str> {
        self.query.path()
    }

    fn route(&self) -> &'static str {
        self.query.route()
    }

    fn configure(&mut self, config: &Config) {
        // This config attribute is needed during `respond`, which does not have access to the
        // config. For this reason, we need to store it on the request struct.
        self.max_response_size = config.max_api_payload_size();
    }

    fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> {
        // Serializes on first call and memoizes; retries reuse the same body.
        let body = self.body()?;

        relay_statsd::metric!(
            histogram(RelayHistograms::UpstreamQueryBodySize) = body.len() as u64
        );

        builder
            .header(header::CONTENT_TYPE, b"application/json")
            .body(body.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            // Deserialize the response body up to `max_response_size`; errors are forwarded
            // to the query's sender unchanged.
            let result = match result {
                Ok(response) => response
                    .json(self.max_response_size)
                    .await
                    .map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            self.sender.send(result)
        })
    }
}

/// Sends a [query](UpstreamQuery) to the upstream and resolves the response.
///
/// The result of the query is resolved asynchronously as response to the message. The query will
/// still be performed even if the response is not awaited.
#[derive(Debug)]
pub struct SendQuery<T: UpstreamQuery>(pub T);

/// Communication with the upstream via HTTP.
///
/// This service can send two main types of requests to the upstream, which can in turn be a Relay
/// or the Sentry webserver:
///
///  - [`SendRequest`] sends a plain HTTP request to the upstream that can be configured and handled
///    freely. Request implementations specify their priority and whether they should be retried
///    automatically by the upstream service.
///  - [`SendQuery`] sends a higher-level request with a standardized JSON body and resolves to a
///    JSON response. The upstream service will automatically sign the message with its private key
///    for authentication.
///
/// The upstream is also responsible to maintain the connection with the upstream. There are two
/// main messages to inquire about the connection state:
///
///  - [`IsAuthenticated`]
///  - [`IsNetworkOutage`]
#[derive(Debug)]
pub enum UpstreamRelay {
    /// Checks the authentication state with the upstream.
    IsAuthenticated(IsAuthenticated, Sender<bool>),
    /// Returns whether Relay is in an outage state.
    IsNetworkOutage(IsNetworkOutage, Sender<bool>),
    /// Sends a [request](SendRequest) or [query](SendQuery) to the upstream.
    ///
    /// Queries are wrapped in [`UpstreamQueryRequest`] and boxed as trait objects.
    SendRequest(Box<dyn UpstreamRequest>),
}

// Marks `UpstreamRelay` as a service interface so it can be addressed by the service framework.
impl Interface for UpstreamRelay {}

impl FromMessage<IsAuthenticated> for UpstreamRelay {
    // The authentication check resolves asynchronously to a boolean.
    type Response = AsyncResponse<bool>;

    fn from_message(message: IsAuthenticated, sender: Sender<bool>) -> Self {
        Self::IsAuthenticated(message, sender)
    }
}

impl FromMessage<IsNetworkOutage> for UpstreamRelay {
    // The outage check resolves asynchronously to a boolean.
    type Response = AsyncResponse<bool>;

    fn from_message(message: IsNetworkOutage, sender: Sender<bool>) -> Self {
        Self::IsNetworkOutage(message, sender)
    }
}

impl<T> FromMessage<SendRequest<T>> for UpstreamRelay
where
    T: UpstreamRequest + 'static,
{
    // Fire-and-forget: the result is delivered via `UpstreamRequest::respond`.
    type Response = NoResponse;

    fn from_message(message: SendRequest<T>, _: ()) -> Self {
        // Erase the concrete request type behind a boxed trait object.
        Self::SendRequest(Box::new(message.0))
    }
}

impl<T> FromMessage<SendQuery<T>> for UpstreamRelay
where
    T: UpstreamQuery + 'static,
{
    // Queries resolve asynchronously to the deserialized response or an error.
    type Response = AsyncResponse<Result<T::Response, UpstreamRequestError>>;

    fn from_message(message: SendQuery<T>, sender: QuerySender<T>) -> Self {
        // Wrap the query in a memoizing request that forwards the result to `sender`.
        let request = UpstreamQueryRequest::new(message.0, sender);
        Self::SendRequest(Box::new(request))
    }
}

/// Captures statsd metrics for a completed upstream request.
///
/// Emits the request duration timer and the retry-count histogram, tagged with the categorized
/// result, the HTTP status code (or `-` when none is available), and the logical route.
fn emit_response_metrics(
    send_start: Instant,
    entry: &Entry,
    send_result: &Result<Response, UpstreamRequestError>,
) {
    // Note: `ref` bindings were removed — matching on `&Result` already binds by reference
    // under match ergonomics, so the explicit `ref` was redundant.
    let description = match send_result {
        Ok(_) => "success",
        Err(e) => e.description(),
    };
    let status_code = match send_result {
        Ok(response) => Some(response.status()),
        Err(error) => error.status_code(),
    };
    // `-` marks requests that never produced an HTTP status (e.g. network failures).
    let status_str = status_code.as_ref().map(|c| c.as_str()).unwrap_or("-");

    relay_statsd::metric!(
        timer(RelayTimers::UpstreamRequestsDuration) = send_start.elapsed(),
        result = description,
        status_code = status_str,
        route = entry.request.route(),
        // Bucket retry counts to keep tag cardinality bounded.
        retries = match entry.retries {
            0 => "0",
            1 => "1",
            2 => "2",
            3..=10 => "few",
            _ => "many",
        },
    );

    relay_statsd::metric!(
        histogram(RelayHistograms::UpstreamRetries) = entry.retries as u64,
        result = description,
        status_code = status_str,
        route = entry.request.route(),
    );
}

/// Checks the status of the network connection with the upstream server.
///
/// Sent directly by the connection monitor, bypassing the request queue.
#[derive(Debug)]
struct GetHealthCheck;

impl UpstreamRequest for GetHealthCheck {
    fn method(&self) -> Method {
        Method::GET
    }

    fn path(&self) -> Cow<'_, str> {
        Cow::Borrowed("/api/0/relays/live/")
    }

    // Health checks are scheduled externally; the connection monitor drives retries itself.
    fn retry(&self) -> bool {
        false
    }

    fn priority(&self) -> RequestPriority {
        unreachable!("sent directly to client")
    }

    fn set_relay_id(&self) -> bool {
        true
    }

    fn intercept_status_errors(&self) -> bool {
        true
    }

    fn route(&self) -> &'static str {
        "check_live"
    }

    fn respond(
        self: Box<Self>,
        result: Result<Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            // Only the status matters; drain the body so the connection can be reused.
            // Errors while draining are irrelevant for the health check.
            match result {
                Ok(mut response) => {
                    let _ = response.consume().await;
                }
                Err(_) => (),
            }
        })
    }
}

impl UpstreamQuery for RegisterRequest {
    type Response = RegisterChallenge;

    fn method(&self) -> Method {
        Method::POST
    }

    fn path(&self) -> Cow<'static, str> {
        Cow::Borrowed("/api/0/relays/register/challenge/")
    }

    // Registration is driven by the authentication state machine, not the request queue.
    fn priority() -> RequestPriority {
        unreachable!("sent directly to client")
    }

    // The authentication loop handles retries with its own backoff.
    fn retry() -> bool {
        false
    }

    fn route(&self) -> &'static str {
        "challenge"
    }
}

impl UpstreamQuery for RegisterResponse {
    type Response = Registration;

    fn method(&self) -> Method {
        Method::POST
    }

    fn path(&self) -> Cow<'static, str> {
        Cow::Borrowed("/api/0/relays/register/response/")
    }

    // Registration is driven by the authentication state machine, not the request queue.
    fn priority() -> RequestPriority {
        unreachable!("sent directly to client")
    }

    // The authentication loop handles retries with its own backoff.
    fn retry() -> bool {
        false
    }

    fn route(&self) -> &'static str {
        "response"
    }
}

/// A shared, asynchronous client to build and execute requests.
///
/// The main way to send a request through this client is [`send`](Self::send).
///
/// This instance holds a shared reference internally and can be cloned directly, so it does not
/// have to be placed in an `Arc`.
#[derive(Debug, Clone)]
struct SharedClient {
    /// Relay configuration used for upstream URL, headers, and credentials.
    config: Arc<Config>,
    /// The underlying HTTP client; `reqwest::Client` is internally reference-counted.
    reqwest: reqwest::Client,
}

impl SharedClient {
    /// Creates a new `SharedClient` instance.
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTP client cannot be constructed.
    pub fn build(config: Arc<Config>) -> Self {
        let reqwest = reqwest::ClientBuilder::new()
            .connect_timeout(config.http_connection_timeout())
            .timeout(config.http_timeout())
            // In the forward endpoint, this means that content negotiation is done twice, and the
            // response body is first decompressed by the client, then re-compressed by the server.
            .gzip(true)
            // Enables async resolver through the `hickory-dns` crate, which uses an LRU cache for
            // the resolved entries. This helps to limit the amount of requests made to upstream DNS
            // server (important for K8s infrastructure).
            .hickory_dns(true)
            .build()
            .unwrap();

        Self { config, reqwest }
    }

    /// Builds the request in a non-blocking fashion.
    ///
    /// This creates the request, adds internal headers, and invokes [`UpstreamRequest::build`]. The
    /// build is invoked in a non-blocking fashion internally, so it can be called from an
    /// asynchronous runtime.
    fn build_request(
        &self,
        request: &mut dyn UpstreamRequest,
    ) -> Result<reqwest::Request, UpstreamRequestError> {
        // `block_in_place` lets the potentially expensive `build` callback run on the
        // current worker thread without stalling other tasks on the async runtime.
        tokio::task::block_in_place(|| {
            let url = self
                .config
                .upstream_descriptor()
                .get_url(request.path().as_ref());

            // Prefer an explicitly configured Host header; fall back to the upstream host.
            let host_header = self
                .config
                .http_host_header()
                .unwrap_or_else(|| self.config.upstream_descriptor().host());

            let mut builder = RequestBuilder::reqwest(self.reqwest.request(request.method(), url));
            builder.header("Host", host_header.as_bytes());

            // Identify this Relay to the upstream if the request asks for it and
            // credentials are available.
            if request.set_relay_id() {
                if let Some(credentials) = self.config.credentials() {
                    builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
                }
            }

            request.build(&mut builder)?;

            // Requests that return a payload from `sign` get a signature header. Signing
            // requires credentials; without them the request fails immediately.
            if let Some(payload) = request.sign() {
                let credentials = self
                    .config
                    .credentials()
                    .ok_or(UpstreamRequestError::NoCredentials)?;

                let signature = credentials.secret_key.sign(&payload);
                builder.header("X-Sentry-Relay-Signature", signature.as_bytes());
            }

            match builder.finish() {
                Ok(Request(client_request)) => Ok(client_request),
                Err(e) => Err(e.into()),
            }
        })
    }

    /// Handles an HTTP response returned from the upstream.
    ///
    /// If the response indicates success via 2XX status codes, `Ok(response)` is returned.
    /// Otherwise, the response is consumed and an error is returned. If `intercept_status_errors`
    /// is set to `true` on the request, depending on the status code and details provided in the
    /// payload, one of the following errors is returned:
    ///
    ///  1. `RateLimited` for a `429` status code.
    ///  2. `ResponseError` in all other cases, containing the status and details.
    async fn transform_response(
        &self,
        request: &dyn UpstreamRequest,
        response: Response,
    ) -> Result<Response, UpstreamRequestError> {
        let status = response.status();

        if !request.intercept_status_errors() || status.is_success() {
            return Ok(response);
        }

        // For 429 responses, collect rate limiting headers before the body is consumed below.
        let upstream_limits = if status == StatusCode::TOO_MANY_REQUESTS {
            let retry_after = response
                .get_header(header::RETRY_AFTER)
                .and_then(|v| std::str::from_utf8(v).ok());

            let rate_limits = response
                .get_all_headers(utils::RATE_LIMITS_HEADER)
                .iter()
                .filter_map(|v| std::str::from_utf8(v).ok())
                .join(", ");

            let upstream_limits = UpstreamRateLimits::new()
                .retry_after(retry_after)
                .rate_limits(rate_limits);

            Some(upstream_limits)
        } else {
            None
        };

        // At this point, we consume the Response. This means we need to consume the response
        // payload stream, regardless of the status code. Parsing the JSON body may fail, which is a
        // non-fatal failure as the upstream is not expected to always include a valid JSON
        // response.
        let json_result = response.json(self.config.max_api_payload_size()).await;

        if let Some(upstream_limits) = upstream_limits {
            Err(UpstreamRequestError::RateLimited(upstream_limits))
        } else {
            // Coerce the result into an empty `ApiErrorResponse` if parsing JSON did not succeed.
            let api_response = json_result.unwrap_or_default();
            Err(UpstreamRequestError::ResponseError(status, api_response))
        }
    }

    /// Builds and sends a request to the upstream, returning either a response or the error.
    pub async fn send(
        &self,
        request: &mut dyn UpstreamRequest,
    ) -> Result<Response, UpstreamRequestError> {
        // Give the request access to the configuration before building it.
        request.configure(&self.config);
        let client_request = self.build_request(request)?;
        let response = self.reqwest.execute(client_request).await?;
        // Map non-success statuses to errors if the request opted into interception.
        self.transform_response(request, Response(response)).await
    }

    /// Convenience method to send a query to the upstream and await the result.
    pub async fn send_query<T>(&self, query: T) -> Result<T::Response, UpstreamRequestError>
    where
        T: UpstreamQuery + 'static,
    {
        let (sender, receiver) = AsyncResponse::channel();

        // Wrap the query in a request that forwards its parsed result into `sender`.
        let mut request = Box::new(UpstreamQueryRequest::new(query, sender));
        let result = self.send(request.as_mut()).await;
        request.respond(result).await;

        // `respond` is expected to fulfill the channel; a dropped sender maps to `ChannelClosed`.
        receiver
            .await
            .unwrap_or(Err(UpstreamRequestError::ChannelClosed))
    }
}

/// An upstream request enqueued in the [`UpstreamQueue`].
///
/// This is the primary type with which requests are passed around the service. Entries are
/// created via [`Entry::new`] and carry their retry count alongside the request.
#[derive(Debug)]
struct Entry {
    /// The inner request.
    pub request: Box<dyn UpstreamRequest>,
    /// The number of retries.
    ///
    /// This starts with `0` and is incremented every time a request is placed back into the queue
    /// following a network error.
    pub retries: usize,
}

impl Entry {
    /// Creates a pristine queue `Entry`.
    pub fn new(request: Box<dyn UpstreamRequest>) -> Self {
        Self {
            request,
            retries: 0,
        }
    }
}

/// Queue utility for the [`UpstreamRelayService`].
///
/// Requests are queued and delivered according to their [`UpstreamRequest::priority`]. This queue
/// is synchronous and managed by the [`UpstreamBroker`].
#[derive(Debug)]
struct UpstreamQueue {
    /// High priority queue.
    high: VecDeque<Entry>,
    /// Low priority queue.
    low: VecDeque<Entry>,
    /// High priority retry queue, populated via [`UpstreamQueue::retry`].
    retry_high: VecDeque<Entry>,
    /// Low priority retry queue, populated via [`UpstreamQueue::retry`].
    retry_low: VecDeque<Entry>,
    /// Retries should not be dequeued before this instant.
    ///
    /// This retry increments by the constant `retry_after` instead of backoff,
    /// since it only kicks in for a short period of time before Relay gets into
    /// network outage mode (see [`IsNetworkOutage`]).
    next_retry: Instant,
    /// Time to wait before retrying another request from the retry queue.
    retry_interval: Duration,
}

impl UpstreamQueue {
    /// Creates an empty upstream queue.
    pub fn new(retry_interval: Duration) -> Self {
        Self {
            high: VecDeque::new(),
            low: VecDeque::new(),
            retry_high: VecDeque::new(),
            retry_low: VecDeque::new(),
            next_retry: Instant::now(),
            retry_interval,
        }
    }

    /// Returns the number of entries in the queue.
    pub fn len(&self) -> usize {
        self.high.len() + self.low.len() + self.retry_high.len() + self.retry_low.len()
    }

    /// Places an entry at the back of the queue.
    ///
    /// Since entries are dequeued in FIFO order, this entry will be dequeued
    /// last within its priority class; see
    /// [`dequeue`][`UpstreamQueue::dequeue`] for more details.
    pub fn enqueue(&mut self, entry: Entry) {
        let priority = entry.request.priority();
        match priority {
            RequestPriority::High => self.high.push_back(entry),
            RequestPriority::Low => self.low.push_back(entry),
        }
        relay_statsd::metric!(
            histogram(RelayHistograms::UpstreamMessageQueueSize) = self.len() as u64,
            priority = priority.name(),
            attempt = "first"
        );
    }

    /// Places an entry in the retry queue.
    ///
    /// Entries are dequeued by (1) high/low priority and (2) FIFO order.
    ///
    /// It also schedules the next retry time, based on the retry back off. The
    /// retry queue is not dequeued until the next retry has elapsed.
    pub fn retry(&mut self, entry: Entry) {
        let priority = entry.request.priority();
        match priority {
            RequestPriority::High => self.retry_high.push_back(entry),
            RequestPriority::Low => self.retry_low.push_back(entry),
        };

        self.next_retry = Instant::now() + self.retry_interval;

        relay_statsd::metric!(
            histogram(RelayHistograms::UpstreamMessageQueueSize) = self.len() as u64,
            priority = priority.name(),
            attempt = "retry"
        );
    }

    /// Dequeues the entry with highest priority.
    ///
    /// Highest priority entry is determined by (1) request priority and (2)
    /// retries first.
    pub fn dequeue(&mut self) -> Option<Entry> {
        let should_retry = self.next_retry <= Instant::now();

        if let Some(Some(entry)) = should_retry.then(|| self.retry_high.pop_front()) {
            Some(entry)
        } else if let Some(entry) = self.high.pop_front() {
            Some(entry)
        } else if let Some(Some(entry)) = should_retry.then(|| self.retry_low.pop_front()) {
            Some(entry)
        } else {
            self.low.pop_front()
        }
    }

    /// Starts retrying queued requests.
    pub fn trigger_retries(&mut self) {
        self.next_retry = Instant::now();
    }
}

/// Possible authentication states for Relay.
///
/// State transitions are driven by the [`AuthMonitor`] and mirrored into the broker via
/// [`Action::UpdateAuth`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
enum AuthState {
    /// Relay is not authenticated and authentication has not started.
    Unknown,

    /// Relay is not authenticated and authentication is in progress.
    Registering,

    /// The connection is healthy and authenticated in managed mode.
    ///
    /// This state is also used as default for Relays that do not require authentication based on
    /// their configuration.
    Registered,

    /// Relay is authenticated and renewing the registration lease. During this process, Relay
    /// remains authenticated, unless an error occurs.
    Renewing,

    /// Authentication has been permanently denied by the Upstream. Do not attempt to retry.
    Denied,
}

impl AuthState {
    /// Returns the initial `AuthState` based on configuration.
    ///
    /// Relays in managed mode must authenticate first and start out as
    /// [`AuthState::Unknown`]; all other modes do not require authentication
    /// and are treated as [`AuthState::Registered`] from the start.
    pub fn init(config: &Config) -> Self {
        if config.relay_mode() == RelayMode::Managed {
            AuthState::Unknown
        } else {
            AuthState::Registered
        }
    }

    /// Returns `true` if the state is considered authenticated.
    ///
    /// Both a completed registration and an in-progress renewal count as
    /// authenticated.
    pub fn is_authenticated(self) -> bool {
        match self {
            AuthState::Registered | AuthState::Renewing => true,
            AuthState::Unknown | AuthState::Registering | AuthState::Denied => false,
        }
    }
}

/// Indicates whether a request was sent to the upstream.
#[derive(Clone, Copy, Debug)]
enum RequestOutcome {
    /// The request was dropped due to a network outage.
    Dropped,
    /// The request was received by the upstream.
    ///
    /// This does not automatically mean that the request was successfully accepted. It could also
    /// have been rate limited or rejected as invalid.
    Received,
}

/// Internal message of the upstream's [`UpstreamBroker`].
///
/// These messages are used to serialize state mutations to the broker's internals. They are emitted
/// by the auth monitor, connection monitor, and internal tasks for request handling.
#[derive(Debug)]
enum Action {
    /// A dropped request needs to be retried.
    ///
    /// The entry is placed on the front of the [`UpstreamQueue`].
    Retry(Entry),
    /// Notifies completion of a request with a given outcome.
    ///
    /// Dropped requests that need retries will additionally invoke the [`Retry`](Self::Retry)
    /// action.
    Complete(RequestOutcome),
    /// Previously lost connection has been regained.
    ///
    /// This message is delivered to the [`ConnectionMonitor`] instance.
    Connected,
    /// The auth monitor indicates a change in the authentication state.
    ///
    /// The new auth state is mirrored in an internal field for immediate access.
    UpdateAuth(AuthState),
}

/// Sender half of the broker's internal [`Action`] channel.
type ActionTx = mpsc::UnboundedSender<Action>;

/// Service that establishes and maintains authentication.
///
/// In regular intervals, the service checks for authentication and notifies the upstream if the key
/// is no longer valid. This allows Sentry to reject registered Relays during runtime without
/// restarts.
///
/// The monitor updates subscribers via an `Action` channel of all changes to the authentication
/// states.
#[derive(Debug)]
struct AuthMonitor {
    /// Relay configuration, used for mode, credentials, and retry intervals.
    config: Arc<Config>,
    /// Client used to send registration requests to the upstream.
    client: SharedClient,
    /// The current authentication state, mirrored to subscribers via `tx`.
    state: AuthState,
    /// Channel used to notify subscribers of authentication state changes.
    tx: ActionTx,
}

impl AuthMonitor {
    /// Returns the interval at which this Relay should renew authentication.
    ///
    /// Returns `Some` if authentication should be retried. Returns `None` if authentication is
    /// permanent.
    fn renew_auth_interval(&self) -> Option<std::time::Duration> {
        if self.config.processing_enabled() {
            // processing relays do NOT re-authenticate
            None
        } else {
            // only relays that have a configured auth-interval reauthenticate
            self.config.http_auth_interval()
        }
    }

    /// Updates the monitor's internal state and subscribers.
    ///
    /// Returns `ChannelClosed` if all subscribers have gone away.
    fn send_state(&mut self, state: AuthState) -> Result<(), UpstreamRequestError> {
        self.state = state;
        self.tx
            .send(Action::UpdateAuth(state))
            .map_err(|_| UpstreamRequestError::ChannelClosed)
    }

    /// Performs a single authentication pass.
    ///
    /// Authentication consists of an initial request, a challenge, and a signed response including
    /// the challenge. Throughout this sequence, the auth monitor transitions the authentication
    /// state. If authentication succeeds, the state is set to [`AuthState::Registered`] at the end.
    ///
    /// If any of the requests fail, this method returns an `Err` and leaves the last authentication
    /// state in place.
    async fn authenticate(
        &mut self,
        credentials: &Credentials,
    ) -> Result<(), UpstreamRequestError> {
        relay_log::info!(
            descriptor = %self.config.upstream_descriptor(),
            "registering with upstream"
        );

        // A renewal keeps the Relay authenticated while it is in progress.
        self.send_state(if self.state.is_authenticated() {
            AuthState::Renewing
        } else {
            AuthState::Registering
        })?;

        let request = RegisterRequest::new(&credentials.id, &credentials.public_key);
        let challenge = self.client.send_query(request).await?;
        relay_log::debug!(token = challenge.token(), "got register challenge");

        let response = challenge.into_response();
        relay_log::debug!("sending register challenge response");
        self.client.send_query(response).await?;

        relay_log::info!("relay successfully registered with upstream");
        self.send_state(AuthState::Registered)?;

        Ok(())
    }

    /// Starts the authentication monitor's cycle.
    ///
    /// Authentication starts immediately and then enters a loop of recurring reauthentication until
    /// one of the following conditions is met:
    ///
    ///  - Authentication is not required based on the Relay's mode configuration.
    ///  - The upstream responded with a permanent rejection (auth denied).
    ///  - All subscribers have shut down and the action channel is closed.
    pub async fn run(mut self) {
        if self.config.relay_mode() != RelayMode::Managed {
            return;
        }

        // Clone the config so borrowing `credentials` does not conflict with the
        // mutable borrows of `self` in the loop below.
        let config = self.config.clone();
        let Some(credentials) = config.credentials() else {
            // This is checked during setup by `check_config` and should never happen.
            relay_log::error!("authentication called without credentials");
            return;
        };

        let mut backoff = RetryBackoff::new(self.config.http_max_retry_interval());

        loop {
            match self.authenticate(credentials).await {
                Ok(_) => {
                    backoff.reset();

                    match self.renew_auth_interval() {
                        Some(interval) => tokio::time::sleep(interval).await,
                        None => return,
                    }
                }
                Err(err) => {
                    // The first failed attempt is not logged; only repeated failures are reported.
                    if backoff.attempt() > 1 {
                        relay_log::error!(
                            error = &err as &dyn std::error::Error,
                            tags.attempts = backoff.attempt(),
                            "authentication encountered error",
                        );
                    }

                    // ChannelClosed indicates that there are no more listeners, so we stop
                    // authenticating.
                    if let UpstreamRequestError::ChannelClosed = err {
                        return;
                    }

                    if err.is_permanent_rejection() {
                        self.send_state(AuthState::Denied).ok();
                        return;
                    }

                    // If the authentication request fails due to any reason other than a network
                    // error, go back to `Registering` which indicates that this Relay is not
                    // authenticated, in case the state was `Renewing` before.
                    if !err.is_network_error() {
                        self.send_state(AuthState::Registering).ok();
                    }

                    // Even on network errors, retry authentication independently.
                    let backoff = backoff.next_backoff();
                    relay_log::debug!(
                        "scheduling authentication retry in {} seconds",
                        backoff.as_secs()
                    );
                    tokio::time::sleep(backoff).await;
                }
            };
        }
    }
}

/// Internal state of the [`ConnectionMonitor`].
///
/// Transitions between states are managed exclusively by the monitor.
#[derive(Debug)]
enum ConnectionState {
    /// The connection is healthy.
    Connected,

    /// Network errors have been observed during the grace period.
    ///
    /// The connection is still considered healthy and requests should be made to the upstream.
    /// The instant records the time of the first observed error.
    Interrupted(Instant),

    /// The connection is interrupted and reconnection is in progress.
    ///
    /// If the task has finished, connection should be considered `Connected`.
    Reconnecting(tokio::task::JoinHandle<()>),
}

/// Maintains outage state of the connection to the upstream.
///
/// Use [`notify_error`](Self::notify_error) and [`reset_error`](Self::reset_error) to inform the
/// monitor of successful and failed requests. If errors persist throughout a grace period, the
/// monitor spawns a background task to re-establish connections. During this period,
/// [`is_stable`](Self::is_stable) returns `false` and no other requests should be made to the
/// upstream.
///
/// This state is synchronous and managed by the [`UpstreamBroker`].
#[derive(Debug)]
struct ConnectionMonitor {
    /// The current connection state, lazily updated via `clean_state`.
    state: ConnectionState,
    /// Client used by the background reconnect task for health checks.
    client: SharedClient,
}

impl ConnectionMonitor {
    /// Creates a new `ConnectionMonitor` in connected state.
    pub fn new(client: SharedClient) -> Self {
        Self {
            client,
            state: ConnectionState::Connected,
        }
    }

    /// Collapses a finished `Reconnecting` task back into `Connected` and returns the state.
    fn clean_state(&mut self) -> &ConnectionState {
        let reconnect_done = match &self.state {
            ConnectionState::Reconnecting(task) => task.is_finished(),
            _ => false,
        };

        if reconnect_done {
            self.state = ConnectionState::Connected;
        }

        &self.state
    }

    /// Returns `true` unless the connection is in outage state.
    pub fn is_stable(&mut self) -> bool {
        !matches!(self.clean_state(), ConnectionState::Reconnecting(_))
    }

    /// Returns `true` while the connection is in outage state.
    pub fn is_outage(&mut self) -> bool {
        !self.is_stable()
    }

    /// Repeatedly probes the upstream with backed-off health checks until one gets through.
    async fn connect(client: SharedClient, tx: ActionTx) {
        let mut backoff = RetryBackoff::new(client.config.http_max_retry_interval());

        loop {
            let next_backoff = backoff.next_backoff();
            relay_log::warn!("network outage, scheduling another check in {next_backoff:?}");
            tokio::time::sleep(next_backoff).await;

            // Any outcome other than a network error counts as reaching the upstream.
            let network_error = match client.send(&mut GetHealthCheck).await {
                Err(e) => e.is_network_error(),
                Ok(_) => false,
            };

            if !network_error {
                break;
            }
        }

        tx.send(Action::Connected).ok();
    }

    /// Notifies the monitor of a request that resulted in a network error.
    ///
    /// This starts a grace period if not already started. If a prior grace period has been
    /// exceeded, the monitor spawns a background job to reestablish connection and notifies the
    /// given `return_tx` on success.
    ///
    /// This method does not block.
    pub fn notify_error(&mut self, return_tx: &ActionTx) {
        let now = Instant::now();

        let first_error = match self.clean_state() {
            ConnectionState::Connected => now,
            ConnectionState::Interrupted(first) => *first,
            // A reconnect task is already probing the upstream; nothing to do.
            ConnectionState::Reconnecting(_) => return,
        };

        self.state = ConnectionState::Interrupted(first_error);

        // Spawn the reconnect task once errors have persisted beyond the grace period.
        let grace = self.client.config.http_outage_grace_period();
        if now >= first_error + grace {
            let tx = return_tx.clone();
            let task = relay_system::spawn!(Self::connect(self.client.clone(), tx));
            self.state = ConnectionState::Reconnecting(task);
        }
    }

    /// Notifies the monitor of a request that was delivered to the upstream.
    ///
    /// Resets the outage grace period and aborts connect background tasks.
    pub fn reset_error(&mut self) {
        if let ConnectionState::Reconnecting(task) = &self.state {
            task.abort();
        }

        self.state = ConnectionState::Connected;
    }
}

/// Main broker of the [`UpstreamRelayService`].
///
/// This handles incoming public messages, internal actions, and maintains the upstream queue.
#[derive(Debug)]
struct UpstreamBroker {
    /// Shared HTTP client used to execute requests.
    client: SharedClient,
    /// Priority queue of pending upstream requests.
    queue: UpstreamQueue,
    /// Mirror of the auth monitor's latest state.
    auth_state: AuthState,
    /// Tracks network outages and reconnection attempts.
    conn: ConnectionMonitor,
    /// Number of requests that may still be started concurrently.
    permits: usize,
    /// Sender for internal actions, shared with spawned request tasks.
    action_tx: ActionTx,
}

impl UpstreamBroker {
    /// Returns the next entry from the queue if the upstream is in a healthy state.
    ///
    /// This returns `None` in any of the following conditions:
    ///  - Maximum request concurrency has been reached. A slot will be reclaimed through
    ///    [`Action::Complete`].
    ///  - The connection is in outage state and all outgoing requests are suspended. Outage state
    ///    will be reset through [`Action::Connected`].
    ///  - Relay is not authenticated, including failed renewals. Auth state will be updated through
    ///    [`Action::UpdateAuth`].
    ///  - The request queue is empty. New requests will be added through [`SendRequest`] or
    ///    [`SendQuery`] in the main message loop.
    async fn next_request(&mut self) -> Option<Entry> {
        if self.permits == 0 || self.conn.is_outage() || !self.auth_state.is_authenticated() {
            return None;
        }

        // Claim a concurrency permit only when an entry is actually dequeued.
        let entry = self.queue.dequeue()?;
        self.permits -= 1;
        Some(entry)
    }

    /// Attempts to place a new request into the queue.
    ///
    /// If authentication is permanently denied, the request will be failed immediately. In all
    /// other cases, the request is enqueued and will wait for submission.
    async fn enqueue(&mut self, request: Box<dyn UpstreamRequest>) {
        if let AuthState::Denied = self.auth_state {
            // This respond is near-instant because it should just send the error into the request's
            // response channel. We do not expect that this blocks the broker.
            request.respond(Err(UpstreamRequestError::AuthDenied)).await;
        } else {
            self.queue.enqueue(Entry::new(request));
        }
    }

    /// Handler of the main message loop.
    async fn handle_message(&mut self, message: UpstreamRelay) {
        match message {
            UpstreamRelay::IsAuthenticated(_, sender) => {
                sender.send(self.auth_state.is_authenticated())
            }
            UpstreamRelay::IsNetworkOutage(_, sender) => sender.send(self.conn.is_outage()),
            UpstreamRelay::SendRequest(request) => self.enqueue(request).await,
        }
    }

    /// Spawns a request attempt.
    ///
    /// The request will run concurrently with other spawned requests and notify the action channel
    /// on completion.
    fn execute(&self, mut entry: Entry) {
        let client = self.client.clone();
        let action_tx = self.action_tx.clone();

        relay_system::spawn!(async move {
            let send_start = Instant::now();
            let result = client.send(entry.request.as_mut()).await;
            emit_response_metrics(send_start, &entry, &result);

            // Network errors count as dropped; any other outcome reached the upstream.
            let status = match result {
                Err(ref err) if err.is_network_error() => RequestOutcome::Dropped,
                _ => RequestOutcome::Received,
            };

            match status {
                // Dropped requests that opted into retries go back to the broker's
                // retry queue instead of being responded to.
                RequestOutcome::Dropped if entry.request.retry() => {
                    entry.retries += 1;
                    action_tx.send(Action::Retry(entry)).ok();
                }
                _ => entry.request.respond(result).await,
            }

            // Send an action back to the action channel of the broker, which will invoke
            // `handle_action`. This is to let the broker know in a synchronized fashion that the
            // request has finished and may need to be retried (above).
            action_tx.send(Action::Complete(status)).ok();
        });
    }

    /// Marks completion of a running request and reclaims its slot.
    fn complete(&mut self, status: RequestOutcome) {
        self.permits += 1;

        match status {
            RequestOutcome::Dropped => self.conn.notify_error(&self.action_tx),
            RequestOutcome::Received => {
                // A delivered request proves the connection works: clear outage
                // tracking and release pending retries immediately.
                self.conn.reset_error();
                self.queue.trigger_retries();
            }
        }
    }

    /// Handler of the internal action channel.
    fn handle_action(&mut self, action: Action) {
        match action {
            Action::Retry(request) => self.queue.retry(request),
            Action::Complete(status) => self.complete(status),
            Action::Connected => self.conn.reset_error(),
            Action::UpdateAuth(state) => self.auth_state = state,
        }
    }
}

/// Implementation of the [`UpstreamRelay`] interface.
#[derive(Debug)]
pub struct UpstreamRelayService {
    /// Relay configuration shared with the broker and monitors once the service runs.
    config: Arc<Config>,
}

impl UpstreamRelayService {
    /// Creates a new `UpstreamRelay` instance.
    pub fn new(config: Arc<Config>) -> Self {
        // Broker and other actual components are implemented in the Service's `spawn_handler`.
        Self { config }
    }
}

impl Service for UpstreamRelayService {
    type Interface = UpstreamRelay;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        let Self { config } = self;

        let client = SharedClient::build(config.clone());

        // Channel for serialized communication from the auth monitor, connection monitor, and
        // concurrent requests back to the broker.
        let (action_tx, mut action_rx) = mpsc::unbounded_channel();

        // Spawn a recurring background check for authentication. It terminates automatically if
        // authentication is not required or rejected.
        let auth = AuthMonitor {
            config: config.clone(),
            client: client.clone(),
            state: AuthState::Unknown,
            tx: action_tx.clone(),
        };
        relay_system::spawn!(auth.run());

        // Main broker that serializes public and internal messages, as well as maintains connection
        // and authentication state.
        let mut broker = UpstreamBroker {
            client: client.clone(),
            queue: UpstreamQueue::new(config.http_retry_delay()),
            auth_state: AuthState::init(&config),
            conn: ConnectionMonitor::new(client),
            permits: config.max_concurrent_requests(),
            action_tx,
        };

        loop {
            tokio::select! {
                // `biased` polls branches top-to-bottom: internal state-changing actions
                // are handled before new requests are dequeued or accepted.
                biased;

                Some(action) = action_rx.recv() => broker.handle_action(action),
                Some(request) = broker.next_request() => broker.execute(request),
                Some(message) = rx.recv() => broker.handle_message(message).await,

                // All channels closed: shut the service down.
                else => break,
            }
        }
    }
}