objectstore_service/backend/
changelog.rs

//! Change lifecycle tracking and durable write-ahead log.
//!
//! When a storage mutation spans both the high-volume (HV) and long-term (LT)
//! backends, several non-atomic steps must happen in sequence: uploading to LT,
//! committing a tombstone in HV via compare-and-swap, and cleaning up
//! unreferenced blobs. A crash at any point can leave orphaned LT blobs.
//!
//! This module provides two layers of protection:
//!
//! 1. **In-process tracking** — [`ChangeGuard`] is an RAII guard that tracks
//!    the current [`ChangePhase`] of an operation. When dropped, it spawns a
//!    background task to clean up whichever blob is unreferenced based on the
//!    phase reached before the drop. This handles normal errors and early
//!    returns within a running process.
//!
//! 2. **Durable write-ahead log** — The [`ChangeLog`] trait records a
//!    [`Change`] to durable storage *before* any LT side effects begin. If the
//!    process crashes, a recovery scan reads outstanding entries and cleans up
//!    orphaned blobs. Recovery is garbage collection — it never replays CAS
//!    mutations or finishes incomplete operations.

22use std::collections::HashMap;
23use std::fmt;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27use tokio_util::task::TaskTracker;
28use tokio_util::task::task_tracker::TaskTrackerToken;
29
30use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata};
31use crate::error::Result;
32use crate::id::ObjectId;
33
/// Initial delay for exponential backoff retries in background cleanup tasks.
const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
/// Maximum delay for exponential backoff retries in background cleanup tasks.
///
/// The per-retry delay grows by a factor of 1.5 and is clamped to this value.
const MAX_BACKOFF: Duration = Duration::from_secs(30);
38
/// Unique identifier for a change log entry.
///
/// Generated per-operation as a UUIDv7. In durable storage, scoped to the
/// owning service instance (e.g., `~oplog/{instance_id}/{change_id}`).
///
/// Derives `Eq` + `Hash` so it can serve as a map key (see
/// [`InMemoryChangeLog`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeId(uuid::Uuid);
45
46impl ChangeId {
47    /// Generates a new unique change ID.
48    #[allow(clippy::new_without_default)]
49    pub fn new() -> Self {
50        Self(uuid::Uuid::now_v7())
51    }
52}
53
54impl fmt::Display for ChangeId {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        self.0.fmt(f)
57    }
58}
59
/// Describes the LT blobs involved in a multi-step storage change.
///
/// Every mutating flow maps to: "I may have written a `new` LT blob and I
/// may be replacing an `old` LT blob." Recovery uses these fields to determine
/// which blobs are orphaned by reading the current HV state.
///
/// If both `new` and `old` are `None`, cleanup only removes the log entry.
#[derive(Debug, Clone)]
pub struct Change {
    /// The logical object being mutated.
    ///
    /// Used by cleanup to query HV and determine which blob is currently referenced.
    pub id: ObjectId,
    /// The new LT blob written by this operation.
    ///
    /// Needs cleanup on failure (the CAS did not commit).
    pub new: Option<ObjectId>,
    /// The old LT blob being replaced.
    ///
    /// Needs cleanup on success (the CAS committed and the old blob is unreferenced).
    pub old: Option<ObjectId>,
}
80
/// Manager for multi-step storage changes, including backends and durable log.
///
/// Encapsulates the state and logic for recording changes, advancing their phases,
/// and performing cleanup on drop. The `TieredStorage` backend holds an instance
/// of this manager to use it for its multi-step operations.
///
/// Always used behind an [`Arc`] (see [`ChangeManager::new`]) so that guards
/// and background cleanup tasks can hold shared references.
#[derive(Debug)]
pub struct ChangeManager {
    /// The backend for small objects (≤ 1 MiB).
    pub(crate) high_volume: Box<dyn HighVolumeBackend>,
    /// The backend for large objects (> 1 MiB).
    pub(crate) long_term: Box<dyn Backend>,
    /// Durable write-ahead log for multi-step changes.
    pub(crate) changelog: Box<dyn ChangeLog>,
    /// Tracks outstanding background cleanup operations for graceful shutdown.
    ///
    /// Each in-flight change and each recovery pass holds a token from this
    /// tracker while it runs.
    pub(crate) tracker: TaskTracker,
}
97
98impl ChangeManager {
99    /// Creates a new `ChangeManager` with the given backends and changelog.
100    pub fn new(
101        high_volume: Box<dyn HighVolumeBackend>,
102        long_term: Box<dyn Backend>,
103        changelog: Box<dyn ChangeLog>,
104    ) -> Arc<Self> {
105        Arc::new(Self {
106            high_volume,
107            long_term,
108            changelog,
109            tracker: TaskTracker::new(),
110        })
111    }
112
113    /// Records the change to the log and returns a guard.
114    ///
115    /// Generates a unique [`ChangeId`] and writes a durable log entry before
116    /// returning. The caller may proceed with LT side effects immediately after.
117    ///
118    /// When the [`ChangeGuard`] is dropped, a background process is spawned to
119    /// clean up any unreferenced objects in LT storage.
120    pub async fn record(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
121        let token = self.tracker.token();
122
123        let id = ChangeId::new();
124        self.changelog.record(&id, &change).await?;
125
126        let state = ChangeState {
127            id,
128            change,
129            phase: ChangePhase::Recorded,
130            manager: self.clone(),
131            _token: token,
132        };
133
134        Ok(ChangeGuard { state: Some(state) })
135    }
136
137    /// Scans the changelog for outstanding entries and runs cleanup for each.
138    ///
139    /// Spawn this into a background task at startup to recover from any orphaned objects after a
140    /// crash. During normal operation, this should return an empty list and have no effect.
141    pub async fn recover(self: Arc<Self>) -> Result<()> {
142        // Hold one token for the duration of recovery to prevent premature shutdown.
143        let _token = self.tracker.token();
144
145        let entries =
146            self.changelog.scan().await.inspect_err(|e| {
147                objectstore_log::error!(!!e, "Failed to run changelog recovery")
148            })?;
149
150        // NB: Intentionally clean up sequentially to reduce load on the system.
151        for (id, change) in entries {
152            let state = ChangeState {
153                id,
154                change,
155                phase: ChangePhase::Recovered,
156                manager: self.clone(),
157                _token: self.tracker.token(),
158            };
159
160            state.cleanup().await;
161        }
162
163        Ok(())
164    }
165}
166
/// Durable write-ahead log for multi-step storage changes.
///
/// Records in-progress changes that span both HV and LT backends so that
/// recovery can identify and clean up orphaned LT blobs after crashes.
/// The log is stored independently from the data backend (though it may
/// share infrastructure) and is scoped per service instance.
///
/// Recovery is garbage collection — it reads HV state to determine which
/// blobs are unreferenced and deletes them. It never replays CAS mutations
/// or finishes incomplete operations.
///
/// Implementations handle instance identity, heartbeats, and key prefixing
/// internally — callers interact only with entries.
#[async_trait::async_trait]
pub trait ChangeLog: fmt::Debug + Send + Sync {
    /// Records a change before any side effects begin (write-ahead).
    ///
    /// Must be durable before returning — the caller will proceed with
    /// LT writes immediately after.
    async fn record(&self, id: &ChangeId, change: &Change) -> Result<()>;

    /// Removes a completed change from the log.
    ///
    /// Called after all cleanup (LT blob deletion) is finished. Removing
    /// a nonexistent entry is not an error (idempotent), so retries of this
    /// call are safe.
    async fn remove(&self, id: &ChangeId) -> Result<()>;

    /// Returns all outstanding changes eligible for recovery.
    ///
    /// During normal operation this returns only the calling instance's
    /// entries. During recovery of a dead instance, the implementation
    /// may return that instance's entries after the caller has claimed
    /// ownership (via heartbeat CAS).
    ///
    /// The returned entries are unordered.
    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>>;
}
204
/// In-memory [`ChangeLog`] for tests and deployments without durable logging.
///
/// Stores entries in a `HashMap`. [`Clone`]-able so tests can hold a handle
/// for direct inspection while the service owns a boxed copy (clones share
/// the same underlying map via the `Arc`).
#[derive(Debug, Clone, Default)]
pub struct InMemoryChangeLog {
    // Shared, mutex-guarded entry map; a poisoned lock panics (acceptable
    // for the test-oriented usage of this type).
    entries: Arc<Mutex<HashMap<ChangeId, Change>>>,
}
213
214#[async_trait::async_trait]
215impl ChangeLog for InMemoryChangeLog {
216    async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> {
217        let mut entries = self.entries.lock().expect("lock poisoned");
218        entries.insert(id.clone(), change.clone());
219        Ok(())
220    }
221
222    async fn remove(&self, id: &ChangeId) -> Result<()> {
223        let mut entries = self.entries.lock().expect("lock poisoned");
224        entries.remove(id);
225        Ok(())
226    }
227
228    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
229        let entries = self.entries.lock().expect("lock poisoned");
230        let result = entries
231            .iter()
232            .map(|(id, change)| (id.clone(), change.clone()))
233            .collect();
234        Ok(result)
235    }
236}
237
/// [`ChangeLog`] implementation that discards all entries.
///
/// Used as the default when no durable log is configured. Provides no
/// crash-recovery guarantees — orphan cleanup relies entirely on in-process
/// [`ChangeGuard`] drop logic.
#[derive(Debug, Default)]
pub struct NoopChangeLog;
245
246#[async_trait::async_trait]
247impl ChangeLog for NoopChangeLog {
248    async fn record(&self, _id: &ChangeId, _change: &Change) -> Result<()> {
249        Ok(())
250    }
251
252    async fn remove(&self, _id: &ChangeId) -> Result<()> {
253        Ok(())
254    }
255
256    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
257        Ok(Vec::new())
258    }
259}
260
/// Phase of a multi-step storage change.
///
/// The phase reached before a [`ChangeGuard`] is dropped determines which LT
/// blob(s) cleanup treats as unreferenced.
//
// NOTE: This is a pure discriminant, so it derives `Copy`/`Clone` and `Eq` in
// addition to `PartialEq` (fixes clippy `derive_partial_eq_without_eq` and
// lets callers pass phases by value without moves).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangePhase {
    /// The change was recovered from changelog and the phase is unknown.
    Recovered,
    /// The change is recorded in the log and LT upload has started.
    Recorded,
    /// LT upload has succeeded and the tombstone is being updated.
    Written,
    /// The tombstone update failed due to a conflict.
    Lost,
    /// The tombstone update succeeded.
    Updated,
    /// Cleanup complete.
    Completed,
}

impl ChangePhase {
    /// Returns the phase corresponding to the outcome of a compare-and-write operation.
    pub fn compare_and_write(succeeded: bool) -> Self {
        if succeeded { Self::Updated } else { Self::Lost }
    }
}
284
/// Internal state for a [`ChangeGuard`].
///
/// Logs an error if dropped in any phase other than `Completed`.
#[derive(Debug)]
struct ChangeState {
    // Durable log entry identifier for this change.
    id: ChangeId,
    // The LT blobs involved in the change.
    change: Change,
    // Last phase the operation reached; drives cleanup decisions.
    phase: ChangePhase,
    // Shared access to the backends and changelog needed for cleanup.
    manager: Arc<ChangeManager>,
    // Keeps the manager's `TaskTracker` open until this state is dropped,
    // so graceful shutdown waits for in-flight cleanup.
    _token: TaskTrackerToken,
}
296
297impl ChangeState {
298    /// Marks the operation as completed, preventing any cleanup on drop.
299    fn mark_completed(mut self) {
300        self.phase = ChangePhase::Completed;
301    }
302
303    /// Determines tombstone state and runs cleanup for unreferenced objects.
304    async fn cleanup(self) {
305        let current = match self.phase {
306            // For `Recovered`, we must first check the state of the tombstone.
307            ChangePhase::Recovered => self.read_tombstone().await,
308            ChangePhase::Recorded => self.change.old.clone(),
309            // For `Written`, the CAS outcome is unknown — read HV to determine it.
310            ChangePhase::Written => self.read_tombstone().await,
311            ChangePhase::Lost => self.change.old.clone(),
312            ChangePhase::Updated => self.change.new.clone(),
313            ChangePhase::Completed => return, // unreachable
314        };
315
316        if current != self.change.old
317            && let Some(ref old) = self.change.old
318        {
319            self.cleanup_lt(old).await;
320        }
321
322        if current != self.change.new
323            && let Some(ref new) = self.change.new
324        {
325            self.cleanup_lt(new).await;
326        }
327
328        self.cleanup_log().await;
329        self.mark_completed();
330    }
331
332    /// Reads the tombstone target for `id` from HV, retrying with exponential backoff on error.
333    ///
334    /// Returns `None` if the entry holds an inline object or is absent.
335    async fn read_tombstone(&self) -> Option<ObjectId> {
336        let mut delay = INITIAL_BACKOFF;
337        loop {
338            match self
339                .manager
340                .high_volume
341                .get_tiered_metadata(&self.change.id)
342                .await
343            {
344                Ok(TieredMetadata::Tombstone(t)) => return Some(t.target),
345                Ok(TieredMetadata::Object(_)) => return None,
346                Ok(TieredMetadata::NotFound) => return None,
347                Err(_) => {
348                    tokio::time::sleep(delay).await;
349                    delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
350                }
351            }
352        }
353    }
354
355    /// Deletes `target` from `lt`, retrying with exponential backoff until success.
356    async fn cleanup_lt(&self, target: &ObjectId) {
357        let mut delay = INITIAL_BACKOFF;
358        while self.manager.long_term.delete_object(target).await.is_err() {
359            tokio::time::sleep(delay).await;
360            delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
361        }
362    }
363
364    /// Removes this change's log entry, retrying with exponential backoff until success.
365    async fn cleanup_log(&self) {
366        let mut delay = INITIAL_BACKOFF;
367        while self.manager.changelog.remove(&self.id).await.is_err() {
368            tokio::time::sleep(delay).await;
369            delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
370        }
371    }
372}
373
374impl Drop for ChangeState {
375    fn drop(&mut self) {
376        if self.phase != ChangePhase::Completed {
377            objectstore_log::error!(
378                change = ?self.change,
379                phase = ?self.phase,
380                "Operation dropped without completing cleanup"
381            );
382        }
383    }
384}
385
/// RAII guard that tracks cleanup state for a multi-step storage change.
///
/// When dropped in a non-`Completed` phase, determines the LT blob to clean up
/// and spawns a background task to delete it. If no tokio runtime is available
/// (e.g., during shutdown), the drop logs an error instead of panicking.
#[derive(Debug)]
pub struct ChangeGuard {
    // `Some` until the drop handler takes ownership of the state. The inner
    // state carries the change, its current phase, and the cleanup resources.
    state: Option<ChangeState>,
}
395
396impl ChangeGuard {
397    /// Advances the operation to the given phase. Zero-cost, no I/O.
398    pub(crate) fn advance(&mut self, phase: ChangePhase) {
399        if let Some(ref mut state) = self.state {
400            state.phase = phase;
401        }
402    }
403}
404
405impl Drop for ChangeGuard {
406    fn drop(&mut self) {
407        if let Some(state) = self.state.take()
408            && state.phase != ChangePhase::Completed
409            && let Ok(handle) = tokio::runtime::Handle::try_current()
410        {
411            handle.spawn(state.cleanup());
412        }
413
414        // NB: Drop of `ChangeState` logs an error if cleanup is not scheduled.
415    }
416}
417
#[cfg(test)]
mod tests {
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;

    // Builds an `ObjectId` under a fixed test usecase/scope for the given key.
    fn make_id(key: &str) -> ObjectId {
        ObjectId::new(
            ObjectContext {
                usecase: "testing".into(),
                scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]),
            },
            key.into(),
        )
    }

    // `record` followed by `scan` must surface the recorded entry.
    #[tokio::test]
    async fn record_then_scan_returns_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = Change {
            id: make_id("object-key"),
            new: Some(make_id("object-key/rev1")),
            old: None,
        };

        log.record(&id, &change).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].0, id);
    }

    // `remove` must make the entry invisible to subsequent `scan`s.
    #[tokio::test]
    async fn remove_then_scan_does_not_return_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = Change {
            id: make_id("object-key"),
            new: None,
            old: Some(make_id("object-key/rev1")),
        };

        log.record(&id, &change).await.unwrap();
        log.remove(&id).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert!(entries.is_empty());
    }

    // `remove` is documented as idempotent: unknown IDs succeed silently.
    #[tokio::test]
    async fn remove_nonexistent_entry_is_not_an_error() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();

        log.remove(&id).await.unwrap();
    }

    /// When the tokio runtime is dropped while an operation is in flight, the `ChangeGuard`
    /// drops outside any runtime and cannot schedule cleanup. The log entry must persist
    /// so that a future recovery pass can identify and clean up orphaned blobs.
    //
    // NOTE: this test is intentionally NOT `#[tokio::test]` — the scoping of
    // the runtime relative to `guard` is the whole point.
    #[test]
    fn runtime_drop_while_pending_preserves_log_entry() {
        use crate::backend::in_memory::InMemoryBackend;

        let log = InMemoryChangeLog::default();
        let manager = ChangeManager::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(InMemoryBackend::new("lt")),
            Box::new(log.clone()),
        );

        let guard = {
            let rt = tokio::runtime::Runtime::new().unwrap();
            // Simulate a mid-flight operation that recorded its change but did not complete.
            rt.block_on(manager.record(Change {
                id: make_id("crash-test"),
                new: Some(make_id("crash-test/rev")),
                old: None,
            }))
            .unwrap()
            // Runtime drops here while `guard` is still alive outside it.
        };

        // Guard drops with no runtime active: cleanup cannot be scheduled.
        drop(guard);

        // Log entry must survive so recovery can clean up the orphaned blob.
        let rt = tokio::runtime::Runtime::new().unwrap();
        let entries = rt.block_on(log.scan()).unwrap();
        assert_eq!(entries.len(), 1, "log entry must persist");
    }
}
511}