objectstore_service/backend/changelog.rs
1//! Change lifecycle tracking and durable write-ahead log.
2//!
3//! When a storage mutation spans both the high-volume (HV) and long-term (LT)
4//! backends, several non-atomic steps must happen in sequence: uploading to LT,
5//! committing a tombstone in HV via compare-and-swap, and cleaning up
6//! unreferenced blobs. A crash at any point can leave orphaned LT blobs.
7//!
8//! This module provides two layers of protection:
9//!
10//! 1. **In-process tracking** — [`ChangeGuard`] is an RAII guard that tracks
11//! the current [`ChangePhase`] of an operation. When dropped, it spawns a
12//! background task to clean up whichever blob is unreferenced based on the
13//! phase reached before the drop. This handles normal errors and early
14//! returns within a running process.
15//!
16//! 2. **Durable write-ahead log** — The [`ChangeLog`] trait records a
17//! [`Change`] to durable storage *before* any LT side effects begin. If the
18//! process crashes, a recovery scan reads outstanding entries and cleans up
19//! orphaned blobs. Recovery is garbage collection — it never replays CAS
20//! mutations or finishes incomplete operations.
21
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27use tokio_util::task::TaskTracker;
28use tokio_util::task::task_tracker::TaskTrackerToken;
29
30use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata};
31use crate::error::Result;
32use crate::id::ObjectId;
33
/// Initial delay for exponential backoff retries in background cleanup tasks.
const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
/// Maximum delay for exponential backoff retries in background cleanup tasks.
///
/// Each retry multiplies the previous delay by 1.5 and caps it at this value;
/// see the retry loops in `ChangeState`.
const MAX_BACKOFF: Duration = Duration::from_secs(30);
38
/// Unique identifier for a change log entry.
///
/// Generated per-operation as a UUIDv7. In durable storage, scoped to the
/// owning service instance (e.g., `~oplog/{instance_id}/{change_id}`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeId(uuid::Uuid);

impl ChangeId {
    /// Generates a new unique change ID.
    ///
    /// UUIDv7 is time-ordered, so IDs sort roughly by creation time.
    // NOTE(review): the missing `Default` impl is presumably deliberate — a
    // "default" that yields a fresh random ID each call would be misleading.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self(uuid::Uuid::now_v7())
    }
}
53
54impl fmt::Display for ChangeId {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 self.0.fmt(f)
57 }
58}
59
/// Describes the LT blobs involved in a multi-step storage change.
///
/// Every mutating flow maps to: "I may have written a `new` LT blob and I
/// may be replacing an `old` LT blob." Recovery uses these fields to determine
/// which blobs are orphaned by reading the current HV state.
#[derive(Debug, Clone)]
pub struct Change {
    /// The logical object being mutated.
    ///
    /// Used by cleanup to query HV and determine which blob is currently referenced.
    pub id: ObjectId,
    /// The new LT blob written by this operation.
    ///
    /// Needs cleanup on failure (the CAS did not commit).
    pub new: Option<ObjectId>,
    /// The old LT blob being replaced.
    ///
    /// Needs cleanup on success (the CAS committed and the old blob is unreferenced).
    ///
    /// `None` for pure creations, where there is nothing to replace.
    pub old: Option<ObjectId>,
}
80
/// Manager for multi-step storage changes, including backends and durable log.
///
/// Encapsulates the state and logic for recording changes, advancing their phases,
/// and performing cleanup on drop. The `TieredStorage` backend holds an instance
/// of this manager to use it for its multi-step operations.
#[derive(Debug)]
pub struct ChangeManager {
    /// The backend for small objects (≤ 1 MiB).
    pub(crate) high_volume: Box<dyn HighVolumeBackend>,
    /// The backend for large objects (> 1 MiB).
    pub(crate) long_term: Box<dyn Backend>,
    /// Durable write-ahead log for multi-step changes.
    pub(crate) changelog: Box<dyn ChangeLog>,
    /// Tracks outstanding background cleanup operations for graceful shutdown.
    ///
    /// Every live [`ChangeGuard`] / recovery pass holds a token from this
    /// tracker, so shutdown can wait for in-flight cleanup to finish.
    pub(crate) tracker: TaskTracker,
}
97
98impl ChangeManager {
99 /// Creates a new `ChangeManager` with the given backends and changelog.
100 pub fn new(
101 high_volume: Box<dyn HighVolumeBackend>,
102 long_term: Box<dyn Backend>,
103 changelog: Box<dyn ChangeLog>,
104 ) -> Arc<Self> {
105 Arc::new(Self {
106 high_volume,
107 long_term,
108 changelog,
109 tracker: TaskTracker::new(),
110 })
111 }
112
113 /// Records the change to the log and returns a guard.
114 ///
115 /// Generates a unique [`ChangeId`] and writes a durable log entry before
116 /// returning. The caller may proceed with LT side effects immediately after.
117 ///
118 /// When the [`ChangeGuard`] is dropped, a background process is spawned to
119 /// clean up any unreferenced objects in LT storage.
120 pub async fn record(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
121 let token = self.tracker.token();
122
123 let id = ChangeId::new();
124 self.changelog.record(&id, &change).await?;
125
126 let state = ChangeState {
127 id,
128 change,
129 phase: ChangePhase::Recorded,
130 manager: self.clone(),
131 _token: token,
132 };
133
134 Ok(ChangeGuard { state: Some(state) })
135 }
136
137 /// Scans the changelog for outstanding entries and runs cleanup for each.
138 ///
139 /// Spawn this into a background task at startup to recover from any orphaned objects after a
140 /// crash. During normal operation, this should return an empty list and have no effect.
141 pub async fn recover(self: Arc<Self>) -> Result<()> {
142 // Hold one token for the duration of recovery to prevent premature shutdown.
143 let _token = self.tracker.token();
144
145 let entries =
146 self.changelog.scan().await.inspect_err(|e| {
147 objectstore_log::error!(!!e, "Failed to run changelog recovery")
148 })?;
149
150 // NB: Intentionally clean up sequentially to reduce load on the system.
151 for (id, change) in entries {
152 let state = ChangeState {
153 id,
154 change,
155 phase: ChangePhase::Recovered,
156 manager: self.clone(),
157 _token: self.tracker.token(),
158 };
159
160 state.cleanup().await;
161 }
162
163 Ok(())
164 }
165}
166
/// Durable write-ahead log for multi-step storage changes.
///
/// Records in-progress changes that span both HV and LT backends so that
/// recovery can identify and clean up orphaned LT blobs after crashes.
/// The log is stored independently from the data backend (though it may
/// share infrastructure) and is scoped per service instance.
///
/// Recovery is garbage collection — it reads HV state to determine which
/// blobs are unreferenced and deletes them. It never replays CAS mutations
/// or finishes incomplete operations.
///
/// Implementations handle instance identity, heartbeats, and key prefixing
/// internally — callers interact only with entries.
#[async_trait::async_trait]
pub trait ChangeLog: fmt::Debug + Send + Sync {
    /// Records a change before any side effects begin (write-ahead).
    ///
    /// Must be durable before returning — the caller will proceed with
    /// LT writes immediately after.
    async fn record(&self, id: &ChangeId, change: &Change) -> Result<()>;

    /// Removes a completed change from the log.
    ///
    /// Called after all cleanup (LT blob deletion) is finished. Removing
    /// a nonexistent entry is not an error (idempotent).
    async fn remove(&self, id: &ChangeId) -> Result<()>;

    /// Returns all outstanding changes eligible for recovery.
    ///
    /// During normal operation this returns only the calling instance's
    /// entries. During recovery of a dead instance, the implementation
    /// may return that instance's entries after the caller has claimed
    /// ownership (via heartbeat CAS).
    ///
    /// The returned entries are unordered.
    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>>;
}
204
/// In-memory [`ChangeLog`] for tests and deployments without durable logging.
///
/// Stores entries in a `HashMap`. [`Clone`]-able so tests can hold a handle
/// for direct inspection while the service owns a boxed copy.
///
/// Not durable: entries are lost when the process exits.
#[derive(Debug, Clone, Default)]
pub struct InMemoryChangeLog {
    /// Shared entry map; `Arc` lets clones observe the same state.
    entries: Arc<Mutex<HashMap<ChangeId, Change>>>,
}
213
214#[async_trait::async_trait]
215impl ChangeLog for InMemoryChangeLog {
216 async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> {
217 let mut entries = self.entries.lock().expect("lock poisoned");
218 entries.insert(id.clone(), change.clone());
219 Ok(())
220 }
221
222 async fn remove(&self, id: &ChangeId) -> Result<()> {
223 let mut entries = self.entries.lock().expect("lock poisoned");
224 entries.remove(id);
225 Ok(())
226 }
227
228 async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
229 let entries = self.entries.lock().expect("lock poisoned");
230 let result = entries
231 .iter()
232 .map(|(id, change)| (id.clone(), change.clone()))
233 .collect();
234 Ok(result)
235 }
236}
237
/// [`ChangeLog`] implementation that discards all entries.
///
/// Used as the default when no durable log is configured. Provides no
/// crash-recovery guarantees — orphan cleanup relies entirely on in-process
/// [`ChangeGuard`] drop logic.
#[derive(Debug, Default)]
pub struct NoopChangeLog;
245
246#[async_trait::async_trait]
247impl ChangeLog for NoopChangeLog {
248 async fn record(&self, _id: &ChangeId, _change: &Change) -> Result<()> {
249 Ok(())
250 }
251
252 async fn remove(&self, _id: &ChangeId) -> Result<()> {
253 Ok(())
254 }
255
256 async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
257 Ok(Vec::new())
258 }
259}
260
/// Phase of a multi-step storage change.
///
/// Phases describe how far an operation progressed, which in turn tells
/// cleanup which LT blob is unreferenced. Fieldless and cheap, so the enum
/// derives `Copy` alongside full equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangePhase {
    /// The change was recovered from changelog and the phase is unknown.
    Recovered,
    /// The change is recorded in the log and LT upload has started.
    Recorded,
    /// LT upload has succeeded and the tombstone is being updated.
    Written,
    /// The tombstone update failed due to a conflict.
    Lost,
    /// The tombstone update succeeded.
    Updated,
    /// Cleanup complete.
    Completed,
}

impl ChangePhase {
    /// Returns the phase corresponding to the outcome of a compare-and-write operation.
    ///
    /// `true` (the CAS committed) maps to [`Self::Updated`]; `false` (a
    /// conflicting writer won) maps to [`Self::Lost`].
    pub fn compare_and_write(succeeded: bool) -> Self {
        if succeeded { Self::Updated } else { Self::Lost }
    }
}
284
/// Internal state for a [`ChangeGuard`].
///
/// Logs an error if dropped in any phase other than `Completed`.
#[derive(Debug)]
struct ChangeState {
    /// Identifier of the durable changelog entry backing this operation.
    id: ChangeId,
    /// The blobs involved in the operation; drives the cleanup decision.
    change: Change,
    /// Last phase reached; determines which blob is unreferenced on cleanup.
    phase: ChangePhase,
    /// Shared handle to the backends, changelog, and task tracker.
    manager: Arc<ChangeManager>,
    /// Keeps the manager's `TaskTracker` open while this state is alive, so
    /// graceful shutdown waits for in-flight changes and their cleanup.
    _token: TaskTrackerToken,
}
296
impl ChangeState {
    /// Marks the operation as completed, preventing any cleanup on drop.
    ///
    /// Consumes `self`; setting the phase before the implicit drop means the
    /// `Drop` impl observes `Completed` and stays silent.
    fn mark_completed(mut self) {
        self.phase = ChangePhase::Completed;
    }

    /// Determines tombstone state and runs cleanup for unreferenced objects.
    ///
    /// Resolves which LT blob the HV side currently references — directly
    /// from the recorded phase when the outcome is known, or by reading HV
    /// when it is ambiguous — then deletes whichever of `old`/`new` is not
    /// that referenced blob, removes the durable log entry, and marks the
    /// state completed so its drop does not log an error.
    async fn cleanup(self) {
        let current = match self.phase {
            // For `Recovered`, we must first check the state of the tombstone.
            ChangePhase::Recovered => self.read_tombstone().await,
            // `Recorded`: the CAS never ran, so the old blob is still referenced.
            ChangePhase::Recorded => self.change.old.clone(),
            // For `Written`, the CAS outcome is unknown — read HV to determine it.
            ChangePhase::Written => self.read_tombstone().await,
            // `Lost`: the CAS failed, so the old blob remains referenced.
            ChangePhase::Lost => self.change.old.clone(),
            // `Updated`: the CAS committed, so the new blob is now referenced.
            ChangePhase::Updated => self.change.new.clone(),
            ChangePhase::Completed => return, // unreachable
        };

        // Delete whichever side is not the currently referenced blob. If
        // `current` matches neither side, both are unreferenced and deleted.
        if current != self.change.old
            && let Some(ref old) = self.change.old
        {
            self.cleanup_lt(old).await;
        }

        if current != self.change.new
            && let Some(ref new) = self.change.new
        {
            self.cleanup_lt(new).await;
        }

        // Only once LT cleanup has succeeded is it safe to drop the durable
        // log entry — otherwise a crash here would leave untracked orphans.
        self.cleanup_log().await;
        self.mark_completed();
    }

    /// Reads the tombstone target for `id` from HV, retrying with exponential backoff on error.
    ///
    /// Returns `None` if the entry holds an inline object or is absent.
    ///
    /// NOTE(review): retries indefinitely; this relies on HV eventually
    /// becoming reachable, and the held tracker token keeps shutdown waiting
    /// in the meantime — confirm this is the intended shutdown behavior.
    async fn read_tombstone(&self) -> Option<ObjectId> {
        let mut delay = INITIAL_BACKOFF;
        loop {
            match self
                .manager
                .high_volume
                .get_tiered_metadata(&self.change.id)
                .await
            {
                // A tombstone points at the LT blob currently referenced.
                Ok(TieredMetadata::Tombstone(t)) => return Some(t.target),
                // Inline object or missing entry: no LT blob is referenced.
                Ok(TieredMetadata::Object(_)) => return None,
                Ok(TieredMetadata::NotFound) => return None,
                Err(_) => {
                    // Transient failure: back off (×1.5, capped) and retry.
                    tokio::time::sleep(delay).await;
                    delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
                }
            }
        }
    }

    /// Deletes `target` from `lt`, retrying with exponential backoff until success.
    async fn cleanup_lt(&self, target: &ObjectId) {
        let mut delay = INITIAL_BACKOFF;
        while self.manager.long_term.delete_object(target).await.is_err() {
            tokio::time::sleep(delay).await;
            delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
        }
    }

    /// Removes this change's log entry, retrying with exponential backoff until success.
    async fn cleanup_log(&self) {
        let mut delay = INITIAL_BACKOFF;
        while self.manager.changelog.remove(&self.id).await.is_err() {
            tokio::time::sleep(delay).await;
            delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
        }
    }
}
373
374impl Drop for ChangeState {
375 fn drop(&mut self) {
376 if self.phase != ChangePhase::Completed {
377 objectstore_log::error!(
378 change = ?self.change,
379 phase = ?self.phase,
380 "Operation dropped without completing cleanup"
381 );
382 }
383 }
384}
385
/// RAII guard that tracks cleanup state for a multi-step storage change.
///
/// When dropped in a non-`Completed` phase, determines the LT blob to clean up
/// and spawns a background task to delete it. If no tokio runtime is available
/// (e.g., during shutdown), the drop logs an error instead of panicking.
#[derive(Debug)]
pub struct ChangeGuard {
    /// `Some` until consumed by drop; `None` means drop has nothing to do.
    state: Option<ChangeState>,
}
395
396impl ChangeGuard {
397 /// Advances the operation to the given phase. Zero-cost, no I/O.
398 pub(crate) fn advance(&mut self, phase: ChangePhase) {
399 if let Some(ref mut state) = self.state {
400 state.phase = phase;
401 }
402 }
403}
404
405impl Drop for ChangeGuard {
406 fn drop(&mut self) {
407 if let Some(state) = self.state.take()
408 && state.phase != ChangePhase::Completed
409 && let Ok(handle) = tokio::runtime::Handle::try_current()
410 {
411 handle.spawn(state.cleanup());
412 }
413
414 // NB: Drop of `ChangeState` logs an error if cleanup is not scheduled.
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use objectstore_types::scope::{Scope, Scopes};
421
422 use super::*;
423 use crate::id::ObjectContext;
424
    /// Builds an [`ObjectId`] for tests under a fixed "testing" usecase/scope.
    fn make_id(key: &str) -> ObjectId {
        ObjectId::new(
            ObjectContext {
                usecase: "testing".into(),
                scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
            },
            key.into(),
        )
    }
434
    /// A recorded entry is visible to a subsequent scan.
    #[tokio::test]
    async fn record_then_scan_returns_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = Change {
            id: make_id("object-key"),
            new: Some(make_id("object-key/rev1")),
            old: None,
        };

        log.record(&id, &change).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].0, id);
    }
451
    /// A removed entry no longer appears in scans.
    #[tokio::test]
    async fn remove_then_scan_does_not_return_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = Change {
            id: make_id("object-key"),
            new: None,
            old: Some(make_id("object-key/rev1")),
        };

        log.record(&id, &change).await.unwrap();
        log.remove(&id).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert!(entries.is_empty());
    }
468
    /// Exercises the idempotency contract of [`ChangeLog::remove`].
    #[tokio::test]
    async fn remove_nonexistent_entry_is_not_an_error() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();

        // No prior `record`: removal must still succeed.
        log.remove(&id).await.unwrap();
    }
476
    /// When the tokio runtime is dropped while an operation is in flight, the `ChangeGuard`
    /// drops outside any runtime and cannot schedule cleanup. The log entry must persist
    /// so that a future recovery pass can identify and clean up orphaned blobs.
    ///
    /// NB: deliberately a plain `#[test]` so the runtime's lifetime can be
    /// scoped more narrowly than the guard's.
    #[test]
    fn runtime_drop_while_pending_preserves_log_entry() {
        use crate::backend::in_memory::InMemoryBackend;

        let log = InMemoryChangeLog::default();
        let manager = ChangeManager::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(InMemoryBackend::new("lt")),
            Box::new(log.clone()),
        );

        let guard = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            // Simulate a mid-flight operation that recorded its change but did not complete.
            rt.block_on(manager.record(Change {
                id: make_id("crash-test"),
                new: Some(make_id("crash-test/rev")),
                old: None,
            }))
            .unwrap()
            // Runtime drops here while `guard` is still alive outside it.
        };

        // Guard drops with no runtime active: cleanup cannot be scheduled.
        drop(guard);

        // Log entry must survive so recovery can clean up the orphaned blob.
        let rt = tokio::runtime::Runtime::new().unwrap();
        let entries = rt.block_on(log.scan()).unwrap();
        assert_eq!(entries.len(), 1, "log entry must persist");
    }
511}