Skip to main content

objectstore_service/backend/
changelog.rs

1//! Change lifecycle tracking and durable write-ahead log.
2//!
3//! When a storage mutation spans both the high-volume (HV) and long-term (LT)
4//! backends, several non-atomic steps must happen in sequence: uploading to LT,
5//! committing a tombstone in HV via compare-and-swap, and cleaning up
6//! unreferenced blobs. A crash at any point can leave orphaned LT blobs.
7//!
8//! This module provides two layers of protection:
9//!
10//! 1. **In-process tracking** — [`ChangeGuard`] is an RAII guard that tracks
11//!    the current [`ChangePhase`] of an operation. When dropped, it spawns a
12//!    background task to clean up whichever blob is unreferenced based on the
13//!    phase reached before the drop. This handles normal errors and early
14//!    returns within a running process.
15//!
16//! 2. **Durable write-ahead log** — The [`ChangeLog`] trait records a
17//!    [`Change`] to durable storage *before* any LT side effects begin. If the
18//!    process crashes, a recovery scan reads outstanding entries and cleans up
19//!    orphaned blobs. Recovery is garbage collection — it never replays CAS
20//!    mutations or finishes incomplete operations.
21
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::{Arc, Mutex};
25use std::time::{Duration, SystemTime};
26
27use tokio_util::task::TaskTracker;
28use tokio_util::task::task_tracker::TaskTrackerToken;
29
30use crate::backend::common::{HighVolumeBackend, MultipartUploadBackend, TieredMetadata};
31use crate::error::Result;
32use crate::id::ObjectId;
33
/// Initial delay for exponential backoff retries in background cleanup tasks.
///
/// The retry loops in this module start at this delay and multiply it by 1.5 after every failed
/// attempt.
const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
/// Maximum delay for exponential backoff retries in background cleanup tasks.
///
/// The growing delay is clamped to this value; retries themselves continue until they succeed.
const MAX_BACKOFF: Duration = Duration::from_secs(30);
38
39/// Unique identifier for a change log entry.
40///
41/// Generated per-operation as a UUIDv7. In durable storage, scoped to the
42/// owning service instance (e.g., `~oplog/{instance_id}/{change_id}`).
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct ChangeId(uuid::Uuid);
45
46impl ChangeId {
47    /// Generates a new unique change ID.
48    #[allow(clippy::new_without_default)]
49    pub fn new() -> Self {
50        Self(uuid::Uuid::now_v7())
51    }
52}
53
54impl fmt::Display for ChangeId {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        self.0.fmt(f)
57    }
58}
59
/// Describes the LT blobs involved in a multi-step storage change.
///
/// Every mutating flow maps to: "I may have written a `new` LT blob and I
/// may be replacing an `old` LT blob." Recovery uses these fields to determine
/// which blobs are orphaned by reading the current HV state.
///
/// Cleanup deletes each blob named here that differs from the blob currently referenced, so
/// `new`, `old`, both, or neither may end up being deleted.
#[derive(Debug, Clone)]
pub struct Change {
    /// The logical object being mutated.
    ///
    /// Used by cleanup to query HV and determine which blob is currently referenced.
    pub id: ObjectId,
    /// The new LT blob written by this operation.
    ///
    /// Needs cleanup on failure (the CAS did not commit). `None` if the operation writes no new
    /// LT blob.
    pub new: Option<ObjectId>,
    /// The old LT blob being replaced.
    ///
    /// Needs cleanup on success (the CAS committed and the old blob is unreferenced). `None` if
    /// the operation replaces no existing LT blob.
    pub old: Option<ObjectId>,
    /// Earliest time at which this entry becomes eligible for cleanup.
    ///
    /// [`ChangeLog::scan`] filters out the entry, unless the deadline has passed. In
    /// [`InMemoryChangeLog`], `None` makes the entry eligible immediately.
    pub cleanup_after: Option<SystemTime>,
}
84
/// Manager for multi-step storage changes, including backends and durable log.
///
/// Encapsulates the state and logic for recording changes, advancing their phases,
/// and performing cleanup on drop. The `TieredStorage` backend holds an instance
/// of this manager to use it for its multi-step operations.
///
/// Instances are created behind an [`Arc`] (see [`ChangeManager::new`]); every guard and every
/// recovered change keeps a shared reference to the manager for its cleanup work.
#[derive(Debug)]
pub struct ChangeManager {
    /// The backend for small objects (≤ 1 MiB).
    ///
    /// Cleanup reads tiered metadata from this backend to find the currently referenced blob.
    pub(crate) high_volume: Box<dyn HighVolumeBackend>,
    /// The backend for large objects (> 1 MiB).
    ///
    /// Cleanup deletes unreferenced blobs from this backend.
    pub(crate) long_term: Box<dyn MultipartUploadBackend>,
    /// Durable write-ahead log for multi-step changes.
    pub(crate) changelog: Box<dyn ChangeLog>,
    /// Tracks outstanding background cleanup operations for graceful shutdown.
    ///
    /// Each recorded change and each recovery run takes a token from this tracker.
    pub(crate) tracker: TaskTracker,
}
101
102impl ChangeManager {
103    /// Creates a new `ChangeManager` with the given backends and changelog.
104    pub fn new(
105        high_volume: Box<dyn HighVolumeBackend>,
106        long_term: Box<dyn MultipartUploadBackend>,
107        changelog: Box<dyn ChangeLog>,
108    ) -> Arc<Self> {
109        Arc::new(Self {
110            high_volume,
111            long_term,
112            changelog,
113            tracker: TaskTracker::new(),
114        })
115    }
116
117    /// Records the change to the log and returns a guard.
118    ///
119    /// Generates a unique [`ChangeId`] and writes a durable log entry before
120    /// returning. The caller may proceed with LT side effects immediately after.
121    ///
122    /// When the [`ChangeGuard`] is dropped, a background process is spawned to
123    /// clean up any unreferenced objects in LT storage.
124    pub async fn record(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
125        let token = self.tracker.token();
126
127        let id = ChangeId::new();
128        self.changelog.record(&id, &change).await?;
129
130        let state = ChangeState {
131            id,
132            change,
133            phase: ChangePhase::Recorded,
134            manager: self.clone(),
135            _token: token,
136        };
137
138        Ok(ChangeGuard { state: Some(state) })
139    }
140
141    /// Records the change to the log and returns a guard in the `Assembling` state.
142    ///
143    /// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state.
144    /// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to
145    /// the [`ChangeLog`].
146    pub async fn record_assembling(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
147        let token = self.tracker.token();
148
149        let id = ChangeId::new();
150        self.changelog.record(&id, &change).await?;
151
152        let state = ChangeState {
153            id,
154            change,
155            phase: ChangePhase::Assembling,
156            manager: self.clone(),
157            _token: token,
158        };
159
160        Ok(ChangeGuard { state: Some(state) })
161    }
162
163    /// Scans the changelog for outstanding entries and runs cleanup for each.
164    ///
165    /// Spawn this into a background task at startup to recover from any orphaned objects after a
166    /// crash. During normal operation, this should return an empty list and have no effect.
167    pub async fn recover(self: Arc<Self>) -> Result<()> {
168        // Hold one token for the duration of recovery to prevent premature shutdown.
169        let _token = self.tracker.token();
170
171        let entries =
172            self.changelog.scan().await.inspect_err(|e| {
173                objectstore_log::error!(!!e, "Failed to run changelog recovery")
174            })?;
175
176        // NB: Intentionally clean up sequentially to reduce load on the system.
177        for (id, change) in entries {
178            let state = ChangeState {
179                id,
180                change,
181                phase: ChangePhase::Recovered,
182                manager: self.clone(),
183                _token: self.tracker.token(),
184            };
185
186            state.cleanup().await;
187        }
188
189        Ok(())
190    }
191}
192
/// Durable write-ahead log for multi-step storage changes.
///
/// Records in-progress changes that span both HV and LT backends so that
/// recovery can identify and clean up orphaned LT blobs after crashes.
/// The log is stored independently from the data backend (though it may
/// share infrastructure) and is scoped per service instance.
///
/// Recovery is garbage collection — it reads HV state to determine which
/// blobs are unreferenced and deletes them. It never replays CAS mutations
/// or finishes incomplete operations.
///
/// Implementations handle instance identity, heartbeats, and key prefixing
/// internally — callers interact only with entries.
#[async_trait::async_trait]
pub trait ChangeLog: fmt::Debug + Send + Sync {
    /// Records a change before any side effects begin (write-ahead).
    ///
    /// Must be durable before returning — the caller will proceed with
    /// LT writes immediately after.
    async fn record(&self, id: &ChangeId, change: &Change) -> Result<()>;

    /// Removes a completed change from the log.
    ///
    /// Called after all cleanup (LT blob deletion) is finished. Removing
    /// a nonexistent entry is not an error (idempotent).
    async fn remove(&self, id: &ChangeId) -> Result<()>;

    /// Returns all outstanding changes eligible for recovery.
    ///
    /// During normal operation this returns only the calling instance's
    /// entries. During recovery of a dead instance, the implementation
    /// may return that instance's entries after the caller has claimed
    /// ownership (via heartbeat CAS).
    ///
    /// Entries whose [`Change::cleanup_after`] deadline has not passed yet are not eligible and
    /// are left out of the result.
    ///
    /// The returned entries are unordered.
    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>>;
}
230
231/// In-memory [`ChangeLog`] for tests and deployments without durable logging.
232///
233/// Stores entries in a `HashMap`. [`Clone`]-able so tests can hold a handle
234/// for direct inspection while the service owns a boxed copy.
235#[derive(Debug, Clone, Default)]
236pub struct InMemoryChangeLog {
237    entries: Arc<Mutex<HashMap<ChangeId, Change>>>,
238}
239
240#[async_trait::async_trait]
241impl ChangeLog for InMemoryChangeLog {
242    async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> {
243        let mut entries = self.entries.lock().expect("lock poisoned");
244        entries.insert(id.clone(), change.clone());
245        Ok(())
246    }
247
248    async fn remove(&self, id: &ChangeId) -> Result<()> {
249        let mut entries = self.entries.lock().expect("lock poisoned");
250        entries.remove(id);
251        Ok(())
252    }
253
254    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
255        let now = SystemTime::now();
256        let entries = self.entries.lock().expect("lock poisoned");
257        let result = entries
258            .iter()
259            .filter(|(_, change)| match change.cleanup_after {
260                None => true,
261                Some(deadline) => now >= deadline,
262            })
263            .map(|(id, change)| (id.clone(), change.clone()))
264            .collect();
265        Ok(result)
266    }
267}
268
#[cfg(test)]
impl InMemoryChangeLog {
    /// Moves [`Change::cleanup_after`] of every entry into the past.
    ///
    /// After this call, a subsequent [`ChangeLog::scan`] returns all entries.
    pub fn expire_all(&self) {
        self.entries
            .lock()
            .expect("lock poisoned")
            .values_mut()
            .for_each(|change| change.cleanup_after = Some(SystemTime::UNIX_EPOCH));
    }
}
279
280/// [`ChangeLog`] implementation that discards all entries.
281///
282/// Used as the default when no durable log is configured. Provides no
283/// crash-recovery guarantees — orphan cleanup relies entirely on in-process
284/// [`ChangeGuard`] drop logic.
285#[derive(Debug, Default)]
286pub struct NoopChangeLog;
287
288#[async_trait::async_trait]
289impl ChangeLog for NoopChangeLog {
290    async fn record(&self, _id: &ChangeId, _change: &Change) -> Result<()> {
291        Ok(())
292    }
293
294    async fn remove(&self, _id: &ChangeId) -> Result<()> {
295        Ok(())
296    }
297
298    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
299        Ok(Vec::new())
300    }
301}
302
/// The step a multi-step storage change has reached.
#[derive(Debug, PartialEq)]
pub enum ChangePhase {
    /// The change was read back from the changelog; the phase it had reached is unknown.
    Recovered,
    /// The change is recorded in the log and LT upload has started.
    Recorded,
    /// The LT blob originated from a multipart upload and is being assembled.
    ///
    /// Completing a multipart upload can fail, and the client must be able to retry it without
    /// the change cleanup process racing to delete the LT blob. Cleanup of changes in this phase
    /// is therefore deferred.
    Assembling,
    /// LT upload has succeeded and the tombstone is being updated.
    Written,
    /// The tombstone update failed due to a conflict.
    Lost,
    /// The tombstone update succeeded.
    Updated,
    /// Cleanup complete.
    Completed,
}

impl ChangePhase {
    /// Maps the outcome of a compare-and-write operation to the phase it leads to.
    ///
    /// A successful write yields [`ChangePhase::Updated`], a failed one [`ChangePhase::Lost`].
    pub fn compare_and_write(succeeded: bool) -> Self {
        if !succeeded {
            return Self::Lost;
        }

        Self::Updated
    }
}
332
/// Internal state for a [`ChangeGuard`].
///
/// Dropping the state only logs and never performs cleanup itself: nothing is logged in the
/// `Completed` phase, a warning in `Assembling`, and an error in every other phase.
#[derive(Debug)]
struct ChangeState {
    /// Key of this change's entry in the change log.
    id: ChangeId,
    /// The LT blobs involved in the change.
    change: Change,
    /// Phase the operation has reached; selects how cleanup determines the referenced blob.
    phase: ChangePhase,
    /// Gives cleanup access to the backends and the change log.
    manager: Arc<ChangeManager>,
    /// Token taken from the manager's task tracker; held for the lifetime of the state and
    /// never read.
    _token: TaskTrackerToken,
}
344
345impl ChangeState {
346    /// Marks the operation as completed, preventing any cleanup on drop.
347    fn mark_completed(mut self) {
348        self.phase = ChangePhase::Completed;
349    }
350
351    /// Determines tombstone state and runs cleanup for unreferenced objects.
352    async fn cleanup(self) {
353        let current = match self.phase {
354            // For `Recovered`, we must first check the state of the tombstone.
355            ChangePhase::Recovered => self.read_tombstone().await,
356            ChangePhase::Recorded => self.change.old.clone(),
357            // For `Written`, the CAS outcome is unknown — read HV to determine it.
358            ChangePhase::Written => self.read_tombstone().await,
359            ChangePhase::Lost => self.change.old.clone(),
360            ChangePhase::Updated => self.change.new.clone(),
361            ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable
362        };
363
364        if current != self.change.old
365            && let Some(ref old) = self.change.old
366        {
367            self.cleanup_lt(old).await;
368        }
369
370        if current != self.change.new
371            && let Some(ref new) = self.change.new
372        {
373            self.cleanup_lt(new).await;
374        }
375
376        self.cleanup_log().await;
377        self.mark_completed();
378    }
379
380    /// Reads the tombstone target for `id` from HV, retrying with exponential backoff on error.
381    ///
382    /// Returns `None` if the entry holds an inline object or is absent.
383    async fn read_tombstone(&self) -> Option<ObjectId> {
384        let mut delay = INITIAL_BACKOFF;
385        loop {
386            match self
387                .manager
388                .high_volume
389                .get_tiered_metadata(&self.change.id)
390                .await
391            {
392                Ok(TieredMetadata::Tombstone(t)) => return Some(t.target),
393                Ok(TieredMetadata::Object(_)) => return None,
394                Ok(TieredMetadata::NotFound) => return None,
395                Err(_) => {
396                    tokio::time::sleep(delay).await;
397                    delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
398                }
399            }
400        }
401    }
402
403    /// Deletes `target` from `lt`, retrying with exponential backoff until success.
404    async fn cleanup_lt(&self, target: &ObjectId) {
405        let mut delay = INITIAL_BACKOFF;
406        while self.manager.long_term.delete_object(target).await.is_err() {
407            tokio::time::sleep(delay).await;
408            delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
409        }
410    }
411
412    /// Removes this change's log entry, retrying with exponential backoff until success.
413    async fn cleanup_log(&self) {
414        let mut delay = INITIAL_BACKOFF;
415        while self.manager.changelog.remove(&self.id).await.is_err() {
416            tokio::time::sleep(delay).await;
417            delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
418        }
419    }
420}
421
422impl Drop for ChangeState {
423    fn drop(&mut self) {
424        match self.phase {
425            ChangePhase::Completed => {}
426            ChangePhase::Assembling => {
427                objectstore_log::warn!(
428                    change = ?self.change,
429                    "Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery"
430                );
431            }
432            _ => {
433                objectstore_log::error!(
434                    change = ?self.change,
435                    phase = ?self.phase,
436                    "Operation dropped without completing cleanup"
437                );
438            }
439        }
440    }
441}
442
443/// RAII guard that tracks cleanup state for a multi-step storage change.
444///
445/// When dropped in a non-`Completed` phase, determines the LT blob to clean up
446/// and spawns a background task to delete it. If no tokio runtime is available
447/// (e.g., during shutdown), the drop logs an error instead of panicking.
448#[derive(Debug)]
449pub struct ChangeGuard {
450    state: Option<ChangeState>,
451}
452
453impl ChangeGuard {
454    /// Advances the operation to the given phase. Zero-cost, no I/O.
455    pub(crate) fn advance(&mut self, phase: ChangePhase) {
456        if let Some(ref mut state) = self.state {
457            state.phase = phase;
458        }
459    }
460}
461
462impl Drop for ChangeGuard {
463    fn drop(&mut self) {
464        if let Some(state) = self.state.take()
465            && state.phase != ChangePhase::Assembling
466            && state.phase != ChangePhase::Completed
467            && let Ok(handle) = tokio::runtime::Handle::try_current()
468        {
469            handle.spawn(state.cleanup());
470        }
471
472        // NB: Drop of `ChangeState` logs an error if cleanup is not scheduled.
473    }
474}
475
#[cfg(test)]
mod tests {
    use objectstore_types::scope::{Scope, Scopes};

    use super::*;
    use crate::id::ObjectContext;

    /// Builds an [`ObjectId`] with the given key in a fixed test context.
    fn make_id(key: &str) -> ObjectId {
        let scope = Scope::create("testing", "value").unwrap();
        let context = ObjectContext {
            usecase: "testing".into(),
            scopes: Scopes::from_iter([scope]),
        };

        ObjectId::new(context, key.into())
    }

    /// Builds a [`Change`] that wrote a new LT blob and replaces no old one.
    fn change_with_new(key: &str, new_key: &str, cleanup_after: Option<SystemTime>) -> Change {
        Change {
            id: make_id(key),
            new: Some(make_id(new_key)),
            old: None,
            cleanup_after,
        }
    }

    #[tokio::test]
    async fn record_then_scan_returns_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = change_with_new("object-key", "object-key/rev1", None);

        log.record(&id, &change).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert_eq!(entries.len(), 1);
        let (scanned_id, _) = &entries[0];
        assert_eq!(*scanned_id, id);
    }

    #[tokio::test]
    async fn remove_then_scan_does_not_return_entry() {
        let log = InMemoryChangeLog::default();
        let id = ChangeId::new();
        let change = Change {
            id: make_id("object-key"),
            new: None,
            old: Some(make_id("object-key/rev1")),
            cleanup_after: None,
        };

        log.record(&id, &change).await.unwrap();
        log.remove(&id).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert!(entries.is_empty());
    }

    #[tokio::test]
    async fn remove_nonexistent_entry_is_not_an_error() {
        let log = InMemoryChangeLog::default();
        let unknown_id = ChangeId::new();

        log.remove(&unknown_id).await.unwrap();
    }

    /// A `ChangeGuard` that outlives its tokio runtime is dropped outside of any runtime and
    /// cannot schedule cleanup. The log entry must then stay in place so that a later recovery
    /// pass can find and clean up the orphaned blobs.
    #[test]
    fn runtime_drop_while_pending_preserves_log_entry() {
        use crate::backend::in_memory::InMemoryBackend;

        let log = InMemoryChangeLog::default();
        let manager = ChangeManager::new(
            Box::new(InMemoryBackend::new("hv")),
            Box::new(InMemoryBackend::new("lt")),
            Box::new(log.clone()),
        );

        // Simulate a mid-flight operation that recorded its change but did not complete.
        let pending = change_with_new("crash-test", "crash-test/rev", None);
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let guard = runtime.block_on(manager.record(pending)).unwrap();

        // The runtime goes away while `guard` is still alive outside of it.
        drop(runtime);

        // Guard drops with no runtime active: cleanup cannot be scheduled.
        drop(guard);

        // Log entry must survive so recovery can clean up the orphaned blob.
        let scan_runtime = tokio::runtime::Runtime::new().unwrap();
        let entries = scan_runtime.block_on(log.scan()).unwrap();
        assert_eq!(entries.len(), 1, "log entry must persist");
    }

    #[tokio::test]
    async fn scan_filters_by_cleanup_after() {
        let log = InMemoryChangeLog::default();

        // No deadline: eligible right away.
        let ready_id = ChangeId::new();
        let ready = change_with_new("ready", "ready/rev", None);
        log.record(&ready_id, &ready).await.unwrap();

        // Deadline in the past: eligible.
        let expired_id = ChangeId::new();
        let expired = change_with_new(
            "expired",
            "expired/rev",
            Some(SystemTime::now() - Duration::from_secs(1)),
        );
        log.record(&expired_id, &expired).await.unwrap();

        // Deadline in the future: must be filtered out.
        let deferred_id = ChangeId::new();
        let deferred = change_with_new(
            "deferred",
            "deferred/rev",
            Some(SystemTime::now() + Duration::from_hours(24)),
        );
        log.record(&deferred_id, &deferred).await.unwrap();

        let entries = log.scan().await.unwrap();
        assert_eq!(entries.len(), 2);

        let is_returned = |wanted: &ChangeId| entries.iter().any(|(id, _)| id == wanted);
        assert!(is_returned(&ready_id));
        assert!(is_returned(&expired_id));
    }
}