Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1440,9 +1440,8 @@ Status TableMetadataBuilder::Impl::RemoveSnapshots(
if (ids_to_remove.contains(snapshot_id)) {
snapshots_by_id_.erase(snapshot_id);
snapshot_ids_to_remove.push_back(snapshot_id);
// FIXME: implement statistics removal and uncomment below
// ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
// ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
} else {
retained_snapshots.push_back(std::move(snapshot));
}
Expand Down
508 changes: 508 additions & 0 deletions src/iceberg/test/expire_snapshots_test.cc

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,13 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
auto commit_result =
ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates);

Result<const TableMetadata*> finalize_result =
commit_result.has_value()
? Result<const TableMetadata*>(commit_result.value()->metadata().get())
: std::unexpected(commit_result.error());

for (const auto& update : pending_updates_) {
std::ignore = update->Finalize(commit_result.has_value()
? std::nullopt
: std::make_optional(commit_result.error()));
std::ignore = update->Finalize(finalize_result);
}

ICEBERG_RETURN_UNEXPECTED(commit_result);
Expand Down
332 changes: 332 additions & 0 deletions src/iceberg/update/expire_snapshots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transaction.h"
Expand All @@ -37,6 +43,296 @@

namespace iceberg {

namespace {

/// \brief Build a manifest reader for one manifest file, resolving the table's
/// current schema and the partition spec the manifest was written with.
Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
    const TableMetadata& metadata) {
  ICEBERG_ASSIGN_OR_RAISE(auto table_schema, metadata.Schema());
  ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
                          metadata.PartitionSpecById(manifest.partition_spec_id));
  return ManifestReader::Make(manifest, file_io, std::move(table_schema),
                              std::move(partition_spec));
}

/// \brief Abstract strategy for cleaning up files after snapshot expiration.
/// \brief Abstract strategy for cleaning up files after snapshot expiration.
class FileCleanupStrategy {
 public:
  /// \brief Create a strategy that deletes through \p delete_func when provided,
  /// otherwise directly through \p file_io.
  FileCleanupStrategy(std::shared_ptr<FileIO> file_io,
                      std::function<void(const std::string&)> delete_func)
      : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}

  virtual ~FileCleanupStrategy() = default;

  /// \brief Clean up files that are only reachable by expired snapshots.
  ///
  /// \param metadata_before_expiration Table metadata before expiration.
  /// \param metadata_after_expiration Table metadata after expiration.
  /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation.
  /// \param level Controls which types of files are eligible for deletion.
  virtual Status CleanFiles(const TableMetadata& metadata_before_expiration,
                            const TableMetadata& metadata_after_expiration,
                            const std::unordered_set<int64_t>& expired_snapshot_ids,
                            CleanupLevel level) = 0;

 protected:
  /// \brief Delete a single file; deletion is best-effort, so any exception is
  /// swallowed.
  void DeleteFile(const std::string& path) {
    try {
      if (!delete_func_) {
        std::ignore = file_io_->DeleteFile(path);
      } else {
        delete_func_(path);
      }
    } catch (...) {
      /// TODO(shangxinli): add retry
    }
  }

  /// \brief Delete each path in turn.
  ///
  /// TODO(shangxinli): Add bulk deletion
  void DeleteFiles(const std::unordered_set<std::string>& paths) {
    for (const auto& path : paths) {
      DeleteFile(path);
    }
  }

  /// \brief True when the metadata carries table or partition statistics files.
  bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
    return !(metadata.statistics.empty() && metadata.partition_statistics.empty());
  }

  /// \brief Statistics file paths referenced before expiration but no longer
  /// referenced after it — i.e. the statistics files safe to delete.
  std::unordered_set<std::string> StatisticsFilesToDelete(
      const TableMetadata& metadata_before_expiration,
      const TableMetadata& metadata_after_expiration) const {
    // Paths still referenced by the post-expiration metadata stay untouched.
    std::unordered_set<std::string> live_paths;
    auto collect_live = [&live_paths](const auto& stats_files) {
      for (const auto& stats_file : stats_files) {
        if (stats_file) {
          live_paths.insert(stats_file->path);
        }
      }
    };
    collect_live(metadata_after_expiration.statistics);
    collect_live(metadata_after_expiration.partition_statistics);

    // Everything referenced before expiration and not live anymore is orphaned.
    std::unordered_set<std::string> orphaned_paths;
    auto collect_orphaned = [&live_paths, &orphaned_paths](const auto& stats_files) {
      for (const auto& stats_file : stats_files) {
        if (stats_file && !live_paths.contains(stats_file->path)) {
          orphaned_paths.insert(stats_file->path);
        }
      }
    };
    collect_orphaned(metadata_before_expiration.statistics);
    collect_orphaned(metadata_before_expiration.partition_statistics);

    return orphaned_paths;
  }

  std::shared_ptr<FileIO> file_io_;                      // fallback deleter
  std::function<void(const std::string&)> delete_func_;  // preferred deleter, may be empty
};

/// \brief File cleanup strategy that determines safe deletions via full reachability.
///
/// Collects manifests from all expired and retained snapshots, prunes candidates
/// still referenced by retained snapshots, then deletes orphaned manifests, data
/// files, and manifest lists.
///
/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
/// \brief File cleanup strategy that determines safe deletions via full reachability.
///
/// Collects manifests from all expired and retained snapshots, prunes candidates
/// still referenced by retained snapshots, then deletes orphaned manifests, data
/// files, and manifest lists.
///
/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
class ReachableFileCleanup : public FileCleanupStrategy {
 public:
  using FileCleanupStrategy::FileCleanupStrategy;

  Status CleanFiles(const TableMetadata& metadata_before_expiration,
                    const TableMetadata& metadata_after_expiration,
                    const std::unordered_set<int64_t>& expired_snapshot_ids,
                    CleanupLevel level) override {
    // Snapshot IDs that survive expiration; their manifests must never be deleted.
    std::unordered_set<int64_t> retained_snapshot_ids;
    for (const auto& snapshot : metadata_after_expiration.snapshots) {
      if (snapshot) {
        retained_snapshot_ids.insert(snapshot->snapshot_id);
      }
    }

    // Each expired snapshot's manifest list becomes unreferenced once the
    // snapshot is gone, so it is always eligible for deletion.
    std::unordered_set<std::string> manifest_lists_to_delete;
    for (int64_t snapshot_id : expired_snapshot_ids) {
      ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
                              metadata_before_expiration.SnapshotById(snapshot_id));
      if (snapshot && !snapshot->manifest_list.empty()) {
        manifest_lists_to_delete.insert(snapshot->manifest_list);
      }
    }

    // All manifests reachable from expired snapshots are deletion candidates
    // until proven reachable from a retained snapshot.
    ICEBERG_ASSIGN_OR_RAISE(
        auto deletion_candidates,
        ReadManifests(metadata_before_expiration, expired_snapshot_ids));

    if (!deletion_candidates.empty()) {
      // Side output of the pruning pass: manifests referenced by retained
      // snapshots, reused below to protect still-live data files.
      std::unordered_set<ManifestFile> current_manifests;
      ICEBERG_ASSIGN_OR_RAISE(
          auto manifests_to_delete,
          PruneReferencedManifests(metadata_after_expiration, retained_snapshot_ids,
                                   std::move(deletion_candidates), current_manifests));

      if (!manifests_to_delete.empty()) {
        if (level == CleanupLevel::kAll) {
          // Deleting data files
          auto data_files_to_delete = FindDataFilesToDelete(
              metadata_after_expiration, manifests_to_delete, current_manifests);
          DeleteFiles(data_files_to_delete);
        }

        // Deleting manifest files
        DeleteFiles(ManifestPaths(manifests_to_delete));
      }
    }

    // Deleting manifest-list files
    DeleteFiles(manifest_lists_to_delete);

    // Deleting statistics files
    if (HasAnyStatisticsFiles(metadata_before_expiration) ||
        HasAnyStatisticsFiles(metadata_after_expiration)) {
      DeleteFiles(
          StatisticsFilesToDelete(metadata_before_expiration, metadata_after_expiration));
    }

    return {};
  }

 private:
  /// \brief Collect manifests for a snapshot into manifests.
  Result<std::unordered_set<ManifestFile>> ReadManifestsForSnapshot(
      const TableMetadata& metadata, int64_t snapshot_id) {
    ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata.SnapshotById(snapshot_id));

    SnapshotCache snapshot_cache(snapshot.get());
    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, snapshot_cache.Manifests(file_io_));

    std::unordered_set<ManifestFile> manifests;
    for (const auto& manifest : snapshot_manifests) {
      manifests.insert(manifest);
    }
    return manifests;
  }

  /// \brief Collect manifests for a set of snapshots.
  ///
  /// Returns the union of manifests across all given snapshot IDs; duplicates
  /// collapse in the set.
  Result<std::unordered_set<ManifestFile>> ReadManifests(
      const TableMetadata& metadata, const std::unordered_set<int64_t>& snapshot_ids) {
    std::unordered_set<ManifestFile> manifests;
    for (int64_t snapshot_id : snapshot_ids) {
      ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests,
                              ReadManifestsForSnapshot(metadata, snapshot_id));
      manifests.insert(snapshot_manifests.begin(), snapshot_manifests.end());
    }
    return manifests;
  }

  /// \brief Remove manifests still referenced by retained snapshots.
  ///
  /// \param manifests_to_delete Candidate set, consumed by value and returned
  ///        with referenced manifests erased.
  /// \param current_manifests Out-parameter accumulating manifests referenced
  ///        by retained snapshots. NOTE: on the early return below this set is
  ///        left incomplete; the caller only consults it when the returned
  ///        deletion set is non-empty, which cannot happen on that path.
  Result<std::unordered_set<ManifestFile>> PruneReferencedManifests(
      const TableMetadata& metadata,
      const std::unordered_set<int64_t>& retained_snapshot_ids,
      std::unordered_set<ManifestFile> manifests_to_delete,
      std::unordered_set<ManifestFile>& current_manifests) {
    for (int64_t snapshot_id : retained_snapshot_ids) {
      ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests,
                              ReadManifestsForSnapshot(metadata, snapshot_id));

      for (const auto& manifest : snapshot_manifests) {
        manifests_to_delete.erase(manifest);

        // Nothing left to delete — stop scanning retained snapshots early.
        if (manifests_to_delete.empty()) {
          return manifests_to_delete;
        }

        current_manifests.insert(manifest);
      }
    }

    return manifests_to_delete;
  }

  /// \brief Read the live data-file paths recorded in a data manifest.
  ///
  /// Prechecks that the manifest holds data content; delete manifests cannot
  /// be read through this path.
  Result<std::unordered_set<std::string>> ReadLiveDataFilePaths(
      const TableMetadata& metadata, const ManifestFile& manifest) {
    ICEBERG_PRECHECK(manifest.content == ManifestContent::kData,
                     "Cannot read data file paths from a delete manifest: {}",
                     manifest.manifest_path);

    /// TODO(shangxinli): optimize by only reading file paths
    ICEBERG_ASSIGN_OR_RAISE(auto reader,
                            MakeManifestReader(manifest, file_io_, metadata));
    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());

    std::unordered_set<std::string> data_file_paths;
    for (const auto& entry : entries) {
      if (entry.data_file) {
        data_file_paths.insert(entry.data_file->file_path);
      }
    }

    return data_file_paths;
  }

  /// \brief Project manifests to manifest paths for deletion.
  std::unordered_set<std::string> ManifestPaths(
      const std::unordered_set<ManifestFile>& manifests) const {
    std::unordered_set<std::string> manifest_paths;
    manifest_paths.reserve(manifests.size());
    for (const auto& manifest : manifests) {
      manifest_paths.insert(manifest.manifest_path);
    }
    return manifest_paths;
  }

  /// \brief Find data files to delete from manifests being removed.
  ///
  /// A data file is deletable only if it appears in a manifest being removed
  /// and in no retained (current) manifest.
  ///
  /// NOTE(review): ReadLiveDataFilePaths prechecks content == kData, so a
  /// delete-content manifest in current_manifests trips the fail-closed branch
  /// below and suppresses ALL data-file deletion for this run — confirm the
  /// manifest sets are filtered to data manifests upstream, or that this
  /// conservative outcome is intended.
  std::unordered_set<std::string> FindDataFilesToDelete(
      const TableMetadata& metadata,
      const std::unordered_set<ManifestFile>& manifests_to_delete,
      const std::unordered_set<ManifestFile>& current_manifests) {
    std::unordered_set<std::string> data_files_to_delete;

    // Collect live file paths from manifests being deleted.
    for (const auto& manifest : manifests_to_delete) {
      auto live_data_files = ReadLiveDataFilePaths(metadata, manifest);
      // Ignore expired-manifest read failures and keep scanning candidates.
      if (!live_data_files.has_value()) {
        continue;
      }

      data_files_to_delete.insert(live_data_files->begin(), live_data_files->end());
    }

    if (data_files_to_delete.empty()) {
      return data_files_to_delete;
    }

    // Remove files still referenced by current manifests.
    for (const auto& manifest : current_manifests) {
      if (data_files_to_delete.empty()) {
        return data_files_to_delete;
      }

      auto live_data_files = ReadLiveDataFilePaths(metadata, manifest);
      // Fail closed if any retained manifest cannot be read safely.
      if (!live_data_files.has_value()) {
        return std::unordered_set<std::string>{};
      }

      for (const auto& file_path : live_data_files.value()) {
        data_files_to_delete.erase(file_path);
      }
    }

    return data_files_to_delete;
  }
};

} // namespace

Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
std::shared_ptr<TransactionContext> ctx) {
ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ExpireSnapshots without a context");
Expand Down Expand Up @@ -241,6 +537,7 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
unreferenced_snapshot_ids.end());

ApplyResult result;
result.metadata_before_expiration = std::make_shared<TableMetadata>(base);

std::ranges::for_each(base.refs, [&retained_refs, &result](const auto& key_to_ref) {
if (!retained_refs.contains(key_to_ref.first)) {
Expand Down Expand Up @@ -285,7 +582,42 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
});
}

// Cache the result for use during Finalize()
apply_result_ = result;

return result;
}

Status ExpireSnapshots::Finalize(Result<const TableMetadata*> commit_result) {
  // Skip cleanup entirely when the commit failed, cleanup is disabled, or this
  // operation did not actually remove any snapshots.
  const bool nothing_to_clean =
      !commit_result.has_value() || cleanup_level_ == CleanupLevel::kNone ||
      !apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty();
  if (nothing_to_clean) {
    return {};
  }

  ICEBERG_PRECHECK(apply_result_->metadata_before_expiration != nullptr,
                   "Missing pre-expiration table metadata for cleanup");
  ICEBERG_PRECHECK(commit_result.value() != nullptr,
                   "Missing committed table metadata for cleanup");

  // Copy the shared_ptr so the pre-expiration metadata outlives the
  // apply_result_.reset() below.
  auto before_metadata = apply_result_->metadata_before_expiration;
  const TableMetadata& after_metadata = *commit_result.value();
  std::unordered_set<int64_t> expired_snapshot_ids(
      apply_result_->snapshot_ids_to_remove.begin(),
      apply_result_->snapshot_ids_to_remove.end());
  apply_result_.reset();

  // File cleanup is best-effort: log and continue on individual file deletion failures
  ReachableFileCleanup cleanup(ctx_->table->io(), delete_func_);
  return cleanup.CleanFiles(*before_metadata, after_metadata, expired_snapshot_ids,
                            cleanup_level_);
}

// TODO(shangxinli): add IncrementalFileCleanup strategy for linear ancestry optimization.

} // namespace iceberg
Loading
Loading