objectstore_service/backend/changelog.rs
1//! Change lifecycle tracking and durable write-ahead log.
2//!
3//! When a storage mutation spans both the high-volume (HV) and long-term (LT)
4//! backends, several non-atomic steps must happen in sequence: uploading to LT,
5//! committing a tombstone in HV via compare-and-swap, and cleaning up
6//! unreferenced blobs. A crash at any point can leave orphaned LT blobs.
7//!
8//! This module provides two layers of protection:
9//!
//! 1. **In-process tracking** — [`ChangeGuard`] is an RAII guard that tracks
//!    the current [`ChangePhase`] of an operation. When dropped, it spawns a
//!    background task to clean up whichever blob is unreferenced based on the
//!    phase reached before the drop. This handles normal errors and early
//!    returns within a running process. Guards dropped in the `Assembling`
//!    phase do not clean up; their entries are left for the durable log below.
15//!
16//! 2. **Durable write-ahead log** — The [`ChangeLog`] trait records a
17//! [`Change`] to durable storage *before* any LT side effects begin. If the
18//! process crashes, a recovery scan reads outstanding entries and cleans up
19//! orphaned blobs. Recovery is garbage collection — it never replays CAS
20//! mutations or finishes incomplete operations.
21
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::{Arc, Mutex};
25use std::time::{Duration, SystemTime};
26
27use tokio_util::task::TaskTracker;
28use tokio_util::task::task_tracker::TaskTrackerToken;
29
30use crate::backend::common::{HighVolumeBackend, MultipartUploadBackend, TieredMetadata};
31use crate::error::Result;
32use crate::id::ObjectId;
33
/// Delay before the first retry of a failed background cleanup step.
///
/// Each subsequent retry waits 1.5 times longer, capped at [`MAX_BACKOFF`].
const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
/// Upper bound for the exponential backoff between background cleanup retries.
const MAX_BACKOFF: Duration = Duration::from_millis(30_000);
38
39/// Unique identifier for a change log entry.
40///
41/// Generated per-operation as a UUIDv7. In durable storage, scoped to the
42/// owning service instance (e.g., `~oplog/{instance_id}/{change_id}`).
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct ChangeId(uuid::Uuid);
45
46impl ChangeId {
47 /// Generates a new unique change ID.
48 #[allow(clippy::new_without_default)]
49 pub fn new() -> Self {
50 Self(uuid::Uuid::now_v7())
51 }
52}
53
54impl fmt::Display for ChangeId {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 self.0.fmt(f)
57 }
58}
59
60/// Describes the LT blobs involved in a multi-step storage change.
61///
62/// Every mutating flow maps to: "I may have written a `new` LT blob and I
63/// may be replacing an `old` LT blob." Recovery uses these fields to determine
64/// which blobs are orphaned by reading the current HV state.
65#[derive(Debug, Clone)]
66pub struct Change {
67 /// The logical object being mutated.
68 ///
69 /// Used by cleanup to query HV and determine which blob is currently referenced.
70 pub id: ObjectId,
71 /// The new LT blob written by this operation.
72 ///
73 /// Needs cleanup on failure (the CAS did not commit).
74 pub new: Option<ObjectId>,
75 /// The old LT blob being replaced.
76 ///
77 /// Needs cleanup on success (the CAS committed and the old blob is unreferenced).
78 pub old: Option<ObjectId>,
79 /// Earliest time at which this entry becomes eligible for cleanup.
80 ///
81 /// [`ChangeLog::scan`] filters out the entry, unless the deadline has passed.
82 pub cleanup_after: Option<SystemTime>,
83}
84
85/// Manager for multi-step storage changes, including backends and durable log.
86///
87/// Encapsulates the state and logic for recording changes, advancing their phases,
88/// and performing cleanup on drop. The `TieredStorage` backend holds an instance
89/// of this manager to use it for its multi-step operations.
90#[derive(Debug)]
91pub struct ChangeManager {
92 /// The backend for small objects (≤ 1 MiB).
93 pub(crate) high_volume: Box<dyn HighVolumeBackend>,
94 /// The backend for large objects (> 1 MiB).
95 pub(crate) long_term: Box<dyn MultipartUploadBackend>,
96 /// Durable write-ahead log for multi-step changes.
97 pub(crate) changelog: Box<dyn ChangeLog>,
98 /// Tracks outstanding background cleanup operations for graceful shutdown.
99 pub(crate) tracker: TaskTracker,
100}
101
102impl ChangeManager {
103 /// Creates a new `ChangeManager` with the given backends and changelog.
104 pub fn new(
105 high_volume: Box<dyn HighVolumeBackend>,
106 long_term: Box<dyn MultipartUploadBackend>,
107 changelog: Box<dyn ChangeLog>,
108 ) -> Arc<Self> {
109 Arc::new(Self {
110 high_volume,
111 long_term,
112 changelog,
113 tracker: TaskTracker::new(),
114 })
115 }
116
117 /// Records the change to the log and returns a guard.
118 ///
119 /// Generates a unique [`ChangeId`] and writes a durable log entry before
120 /// returning. The caller may proceed with LT side effects immediately after.
121 ///
122 /// When the [`ChangeGuard`] is dropped, a background process is spawned to
123 /// clean up any unreferenced objects in LT storage.
124 pub async fn record(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
125 let token = self.tracker.token();
126
127 let id = ChangeId::new();
128 self.changelog.record(&id, &change).await?;
129
130 let state = ChangeState {
131 id,
132 change,
133 phase: ChangePhase::Recorded,
134 manager: self.clone(),
135 _token: token,
136 };
137
138 Ok(ChangeGuard { state: Some(state) })
139 }
140
141 /// Records the change to the log and returns a guard in the `Assembling` state.
142 ///
143 /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state.
144 /// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to
145 /// the [`ChangeLog`].
146 pub async fn record_assembling(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
147 let token = self.tracker.token();
148
149 let id = ChangeId::new();
150 self.changelog.record(&id, &change).await?;
151
152 let state = ChangeState {
153 id,
154 change,
155 phase: ChangePhase::Assembling,
156 manager: self.clone(),
157 _token: token,
158 };
159
160 Ok(ChangeGuard { state: Some(state) })
161 }
162
163 /// Scans the changelog for outstanding entries and runs cleanup for each.
164 ///
165 /// Spawn this into a background task at startup to recover from any orphaned objects after a
166 /// crash. During normal operation, this should return an empty list and have no effect.
167 pub async fn recover(self: Arc<Self>) -> Result<()> {
168 // Hold one token for the duration of recovery to prevent premature shutdown.
169 let _token = self.tracker.token();
170
171 let entries =
172 self.changelog.scan().await.inspect_err(|e| {
173 objectstore_log::error!(!!e, "Failed to run changelog recovery")
174 })?;
175
176 // NB: Intentionally clean up sequentially to reduce load on the system.
177 for (id, change) in entries {
178 let state = ChangeState {
179 id,
180 change,
181 phase: ChangePhase::Recovered,
182 manager: self.clone(),
183 _token: self.tracker.token(),
184 };
185
186 state.cleanup().await;
187 }
188
189 Ok(())
190 }
191}
192
193/// Durable write-ahead log for multi-step storage changes.
194///
195/// Records in-progress changes that span both HV and LT backends so that
196/// recovery can identify and clean up orphaned LT blobs after crashes.
197/// The log is stored independently from the data backend (though it may
198/// share infrastructure) and is scoped per service instance.
199///
200/// Recovery is garbage collection — it reads HV state to determine which
201/// blobs are unreferenced and deletes them. It never replays CAS mutations
202/// or finishes incomplete operations.
203///
204/// Implementations handle instance identity, heartbeats, and key prefixing
205/// internally — callers interact only with entries.
206#[async_trait::async_trait]
207pub trait ChangeLog: fmt::Debug + Send + Sync {
208 /// Records a change before any side effects begin (write-ahead).
209 ///
210 /// Must be durable before returning — the caller will proceed with
211 /// LT writes immediately after.
212 async fn record(&self, id: &ChangeId, change: &Change) -> Result<()>;
213
214 /// Removes a completed change from the log.
215 ///
216 /// Called after all cleanup (LT blob deletion) is finished. Removing
217 /// a nonexistent entry is not an error (idempotent).
218 async fn remove(&self, id: &ChangeId) -> Result<()>;
219
220 /// Returns all outstanding changes eligible for recovery.
221 ///
222 /// During normal operation this returns only the calling instance's
223 /// entries. During recovery of a dead instance, the implementation
224 /// may return that instance's entries after the caller has claimed
225 /// ownership (via heartbeat CAS).
226 ///
227 /// The returned entries are unordered.
228 async fn scan(&self) -> Result<Vec<(ChangeId, Change)>>;
229}
230
231/// In-memory [`ChangeLog`] for tests and deployments without durable logging.
232///
233/// Stores entries in a `HashMap`. [`Clone`]-able so tests can hold a handle
234/// for direct inspection while the service owns a boxed copy.
235#[derive(Debug, Clone, Default)]
236pub struct InMemoryChangeLog {
237 entries: Arc<Mutex<HashMap<ChangeId, Change>>>,
238}
239
240#[async_trait::async_trait]
241impl ChangeLog for InMemoryChangeLog {
242 async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> {
243 let mut entries = self.entries.lock().expect("lock poisoned");
244 entries.insert(id.clone(), change.clone());
245 Ok(())
246 }
247
248 async fn remove(&self, id: &ChangeId) -> Result<()> {
249 let mut entries = self.entries.lock().expect("lock poisoned");
250 entries.remove(id);
251 Ok(())
252 }
253
254 async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
255 let now = SystemTime::now();
256 let entries = self.entries.lock().expect("lock poisoned");
257 let result = entries
258 .iter()
259 .filter(|(_, change)| match change.cleanup_after {
260 None => true,
261 Some(deadline) => now >= deadline,
262 })
263 .map(|(id, change)| (id.clone(), change.clone()))
264 .collect();
265 Ok(result)
266 }
267}
268
#[cfg(test)]
impl InMemoryChangeLog {
    /// Backdates every entry's [`Change::cleanup_after`] to the Unix epoch, so that a
    /// subsequent [`ChangeLog::scan`] returns all of them.
    pub fn expire_all(&self) {
        self.entries
            .lock()
            .expect("lock poisoned")
            .values_mut()
            .for_each(|change| change.cleanup_after = Some(SystemTime::UNIX_EPOCH));
    }
}
278}
279
280/// [`ChangeLog`] implementation that discards all entries.
281///
282/// Used as the default when no durable log is configured. Provides no
283/// crash-recovery guarantees — orphan cleanup relies entirely on in-process
284/// [`ChangeGuard`] drop logic.
285#[derive(Debug, Default)]
286pub struct NoopChangeLog;
287
288#[async_trait::async_trait]
289impl ChangeLog for NoopChangeLog {
290 async fn record(&self, _id: &ChangeId, _change: &Change) -> Result<()> {
291 Ok(())
292 }
293
294 async fn remove(&self, _id: &ChangeId) -> Result<()> {
295 Ok(())
296 }
297
298 async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
299 Ok(Vec::new())
300 }
301}
302
/// Phase of a multi-step storage change.
///
/// The phase decides which LT blob is treated as unreferenced when the change is cleaned up.
#[derive(PartialEq, Debug)]
pub enum ChangePhase {
    /// Loaded from the changelog during recovery; how far the change got is unknown.
    Recovered,
    /// Recorded in the log; the LT upload has started.
    Recorded,
    /// The LT blob comes from a multipart upload that is still being assembled.
    ///
    /// Completing a multipart upload can fail, and the client should be able to retry it
    /// without the cleanup process racing to delete the LT blob. Cleanup of changes in this
    /// phase is therefore deferred.
    Assembling,
    /// The LT upload succeeded; the tombstone update is in progress.
    Written,
    /// The tombstone update failed because of a conflict.
    Lost,
    /// The tombstone update succeeded.
    Updated,
    /// Cleanup has finished.
    Completed,
}

impl ChangePhase {
    /// Maps the outcome of a compare-and-write on the tombstone to the resulting phase.
    ///
    /// A successful write yields [`ChangePhase::Updated`]; a failed one yields
    /// [`ChangePhase::Lost`].
    pub fn compare_and_write(succeeded: bool) -> Self {
        match succeeded {
            true => Self::Updated,
            false => Self::Lost,
        }
    }
}
332
333/// Internal state for a [`ChangeGuard`].
334///
335/// Logs an error if dropped in any phase other than `Completed`.
336#[derive(Debug)]
337struct ChangeState {
338 id: ChangeId,
339 change: Change,
340 phase: ChangePhase,
341 manager: Arc<ChangeManager>,
342 _token: TaskTrackerToken,
343}
344
345impl ChangeState {
346 /// Marks the operation as completed, preventing any cleanup on drop.
347 fn mark_completed(mut self) {
348 self.phase = ChangePhase::Completed;
349 }
350
351 /// Determines tombstone state and runs cleanup for unreferenced objects.
352 async fn cleanup(self) {
353 let current = match self.phase {
354 // For `Recovered`, we must first check the state of the tombstone.
355 ChangePhase::Recovered => self.read_tombstone().await,
356 ChangePhase::Recorded => self.change.old.clone(),
357 // For `Written`, the CAS outcome is unknown — read HV to determine it.
358 ChangePhase::Written => self.read_tombstone().await,
359 ChangePhase::Lost => self.change.old.clone(),
360 ChangePhase::Updated => self.change.new.clone(),
361 ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable
362 };
363
364 if current != self.change.old
365 && let Some(ref old) = self.change.old
366 {
367 self.cleanup_lt(old).await;
368 }
369
370 if current != self.change.new
371 && let Some(ref new) = self.change.new
372 {
373 self.cleanup_lt(new).await;
374 }
375
376 self.cleanup_log().await;
377 self.mark_completed();
378 }
379
380 /// Reads the tombstone target for `id` from HV, retrying with exponential backoff on error.
381 ///
382 /// Returns `None` if the entry holds an inline object or is absent.
383 async fn read_tombstone(&self) -> Option<ObjectId> {
384 let mut delay = INITIAL_BACKOFF;
385 loop {
386 match self
387 .manager
388 .high_volume
389 .get_tiered_metadata(&self.change.id)
390 .await
391 {
392 Ok(TieredMetadata::Tombstone(t)) => return Some(t.target),
393 Ok(TieredMetadata::Object(_)) => return None,
394 Ok(TieredMetadata::NotFound) => return None,
395 Err(_) => {
396 tokio::time::sleep(delay).await;
397 delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
398 }
399 }
400 }
401 }
402
403 /// Deletes `target` from `lt`, retrying with exponential backoff until success.
404 async fn cleanup_lt(&self, target: &ObjectId) {
405 let mut delay = INITIAL_BACKOFF;
406 while self.manager.long_term.delete_object(target).await.is_err() {
407 tokio::time::sleep(delay).await;
408 delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
409 }
410 }
411
412 /// Removes this change's log entry, retrying with exponential backoff until success.
413 async fn cleanup_log(&self) {
414 let mut delay = INITIAL_BACKOFF;
415 while self.manager.changelog.remove(&self.id).await.is_err() {
416 tokio::time::sleep(delay).await;
417 delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
418 }
419 }
420}
421
422impl Drop for ChangeState {
423 fn drop(&mut self) {
424 match self.phase {
425 ChangePhase::Completed => {}
426 ChangePhase::Assembling => {
427 objectstore_log::warn!(
428 change = ?self.change,
429 "Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery"
430 );
431 }
432 _ => {
433 objectstore_log::error!(
434 change = ?self.change,
435 phase = ?self.phase,
436 "Operation dropped without completing cleanup"
437 );
438 }
439 }
440 }
441}
442
443/// RAII guard that tracks cleanup state for a multi-step storage change.
444///
445/// When dropped in a non-`Completed` phase, determines the LT blob to clean up
446/// and spawns a background task to delete it. If no tokio runtime is available
447/// (e.g., during shutdown), the drop logs an error instead of panicking.
448#[derive(Debug)]
449pub struct ChangeGuard {
450 state: Option<ChangeState>,
451}
452
453impl ChangeGuard {
454 /// Advances the operation to the given phase. Zero-cost, no I/O.
455 pub(crate) fn advance(&mut self, phase: ChangePhase) {
456 if let Some(ref mut state) = self.state {
457 state.phase = phase;
458 }
459 }
460}
461
462impl Drop for ChangeGuard {
463 fn drop(&mut self) {
464 if let Some(state) = self.state.take()
465 && state.phase != ChangePhase::Assembling
466 && state.phase != ChangePhase::Completed
467 && let Ok(handle) = tokio::runtime::Handle::try_current()
468 {
469 handle.spawn(state.cleanup());
470 }
471
472 // NB: Drop of `ChangeState` logs an error if cleanup is not scheduled.
473 }
474}
475
#[cfg(test)]
mod tests {
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;

    /// Builds an [`ObjectId`] for `key` in a fixed test usecase and scope.
    fn make_id(key: &str) -> ObjectId {
        let context = ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
        };
        ObjectId::new(context, key.into())
    }

    /// Builds a [`Change`] for object `key` with optional new/old blob keys and deadline.
    fn make_change(
        key: &str,
        new: Option<&str>,
        old: Option<&str>,
        cleanup_after: Option<SystemTime>,
    ) -> Change {
        Change {
            id: make_id(key),
            new: new.map(make_id),
            old: old.map(make_id),
            cleanup_after,
        }
    }

    #[tokio::test]
    async fn record_then_scan_returns_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = make_change("object-key", Some("object-key/rev1"), None, None);

        log.record(&id, &change).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].0, id);
    }

    #[tokio::test]
    async fn remove_then_scan_does_not_return_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = make_change("object-key", None, Some("object-key/rev1"), None);

        log.record(&id, &change).await.unwrap();
        log.remove(&id).await.unwrap();

        assert!(log.scan().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn remove_nonexistent_entry_is_not_an_error() {
        let log = InMemoryChangeLog::default();
        log.remove(&ChangeId::new()).await.unwrap();
    }

    /// If the tokio runtime goes away while an operation is in flight, the `ChangeGuard` is
    /// dropped outside any runtime and cannot schedule cleanup. The log entry must then stay,
    /// so that a later recovery pass can find and clean up the orphaned blobs.
    #[test]
    fn runtime_drop_while_pending_preserves_log_entry() {
        use crate::backend::in_memory::InMemoryBackend;

        let log = InMemoryChangeLog::default();
        let manager = ChangeManager::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(InMemoryBackend::new("lt")),
            Box::new(log.clone()),
        );

        // Record a change inside a short-lived runtime, mimicking an operation that was
        // interrupted before it could complete.
        let guard = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let change = make_change("crash-test", Some("crash-test/rev"), None, None);
            rt.block_on(manager.record(change)).unwrap()
            // `rt` is dropped here, while the guard lives on outside of it.
        };

        // No runtime is active anymore, so dropping the guard cannot schedule cleanup.
        drop(guard);

        // The entry must survive so that recovery can still delete the orphaned blob.
        let rt = tokio::runtime::Runtime::new().unwrap();
        let entries = rt.block_on(log.scan()).unwrap();
        assert_eq!(entries.len(), 1, "log entry must persist");
    }

    #[tokio::test]
    async fn scan_filters_by_cleanup_after() {
        let log = InMemoryChangeLog::default();

        // No deadline: due immediately.
        let ready_id = ChangeId::new();
        let ready = make_change("ready", Some("ready/rev"), None, None);
        log.record(&ready_id, &ready).await.unwrap();

        // Deadline in the past: due.
        let expired_id = ChangeId::new();
        let expired = make_change(
            "expired",
            Some("expired/rev"),
            None,
            Some(SystemTime::now() - Duration::from_secs(1)),
        );
        log.record(&expired_id, &expired).await.unwrap();

        // Deadline a day ahead: must be filtered out.
        let deferred_id = ChangeId::new();
        let deferred = make_change(
            "deferred",
            Some("deferred/rev"),
            None,
            Some(SystemTime::now() + Duration::from_secs(24 * 60 * 60)),
        );
        log.record(&deferred_id, &deferred).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert_eq!(entries.len(), 2);

        let returned = |wanted: &ChangeId| entries.iter().any(|(id, _)| id == wanted);
        assert!(returned(&ready_id));
        assert!(returned(&expired_id));
    }
}
623}