perf(reader): replace O(N*M) equality-delete predicate tree with O(N+M) HashSet filter#2343
Draft
t3hw wants to merge 2 commits into apache:main from
Conversation
O(N+M) HashSet-based filter

The existing equality delete implementation builds a predicate AST with one node per delete record and evaluates every node against every data batch. For N data rows and M delete records, this is O(N*M): 25 minutes for 30M rows with ~5000 delete records in our benchmark.

Replace the predicate tree with a `HashSet<EqDeleteKey>` of delete key tuples, matching the approach used by the Java/Spark implementation (`StructLikeSet`). Delete records are collected into a hash set during parsing (O(M)), then each data row is checked with an O(1) lookup.

Key changes:
- `caching_delete_file_loader.rs`: `parse_equality_deletes_record_batch_stream` now returns an `EqDeleteSet` (hash set + field metadata) instead of a `Predicate`. The balanced binary tree construction and `rewrite_not()` calls are eliminated entirely. `Datum` already derives `Hash` and `Eq` (via `OrderedFloat` for floats), so no new trait implementations are needed.
- `delete_filter.rs`: `EqDelState::Loaded` holds `Arc<EqDeleteSet>` instead of `Predicate`. The new `build_equality_delete_sets()` groups delete files by their `equality_ids` field layout before unioning, preventing incorrect merges when different delete files use different equality column sets. Single-file groups return the cached `Arc` directly with no deep clone.
- `reader.rs`: Equality delete filtering is decoupled from the scan predicate `RowFilter`. The scan predicate stays in the Parquet `RowFilter` pipeline (page/row-group pruning preserved). Equality deletes are applied as a lazy post-read `.map()` step on the record batch stream via `apply_eq_delete_filter()`, which reuses a single `EqDeleteKey` allocation across all rows to avoid per-row `Vec` allocations.
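The hash-set approach can be sketched with plain std types. The names here are hypothetical stand-ins, not the PR's actual API: `i64` values in place of `Datum`, `Vec<i64>` in place of `EqDeleteKey`. The reusable key buffer in `filter_rows` mirrors the single-allocation trick described for `apply_eq_delete_filter()`:

```rust
use std::collections::HashSet;

// Hypothetical stand-ins for the PR's `Datum` / `EqDeleteKey` types.
type Value = i64;
type Key = Vec<Value>;

/// Build the delete set once: O(M) for M delete records.
/// `key_cols` are the column positions named by the file's equality IDs.
fn build_delete_set(delete_records: &[Vec<Value>], key_cols: &[usize]) -> HashSet<Key> {
    delete_records
        .iter()
        .map(|rec| key_cols.iter().map(|&c| rec[c]).collect())
        .collect()
}

/// Keep each data row unless its key tuple is in the delete set:
/// one O(1) lookup per row, reusing a single key allocation across
/// rows to avoid a per-row `Vec` allocation.
fn filter_rows(rows: &[Vec<Value>], key_cols: &[usize], deletes: &HashSet<Key>) -> Vec<Vec<Value>> {
    let mut key: Key = Vec::with_capacity(key_cols.len());
    rows.iter()
        .filter(|row| {
            key.clear();
            key.extend(key_cols.iter().map(|&c| row[c]));
            !deletes.contains(&key)
        })
        .cloned()
        .collect()
}
```

With a delete file keyed on column 0 containing the key `2`, `filter_rows` drops exactly the rows whose column 0 equals 2, in one pass over the data.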
When a user scans with `.select(["col_a", "col_b"])` and the table has
merge-on-read equality delete files keyed on a column NOT in the select
list (e.g. `id`), the HashSet-based `apply_eq_delete_filter` fails with:
Equality delete key column 'id' (field_id=1) not found in batch
The fix augments the Parquet projection mask and RecordBatchTransformer
with any equality delete key field IDs that are missing from the user's
projection. After applying equality deletes, the extra columns are
stripped from the output batches so the user sees only their requested
columns.
This matches the behavior of Spark, Flink, and Trino, which
transparently widen the internal projection for delete evaluation.
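The widen-then-strip bookkeeping can be illustrated with a minimal sketch; the helper name and the flat `i32` field-ID representation are hypothetical, not the PR's actual projection-mask API:

```rust
/// Given the user's projected field IDs and the equality-delete key
/// field IDs, return the widened projection plus the positions of the
/// appended columns that must be stripped from output batches after
/// deletes are applied. (Hypothetical helper; the PR does this inside
/// the Parquet projection mask and RecordBatchTransformer setup.)
fn widen_projection(user: &[i32], delete_keys: &[i32]) -> (Vec<i32>, Vec<usize>) {
    let mut widened = user.to_vec();
    let mut strip = Vec::new();
    for &fid in delete_keys {
        if !widened.contains(&fid) {
            strip.push(widened.len()); // appended column's position in the batch
            widened.push(fid);
        }
    }
    (widened, strip)
}
```

For a `.select(["col_a", "col_b"])` projecting field IDs `[2, 3]` against deletes keyed on field ID `1`, this yields the widened projection `[2, 3, 1]` and marks position 2 for stripping after the delete filter runs.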
Force-pushed facf9b9 to 72bbff2.
Which issue does this PR close?
No tracking issue filed yet. The symptom ("equality-delete scans scale
quadratically in deletes × data rows") has been observed on our own
uncompacted MOR workloads. Happy to open a tracking issue first if a
committer prefers.
What changes are included in this PR?
This PR contains two commits that are tightly coupled. The squash
merge collapses them; the split is purely to help reviewers read the
perf change separately from the correctness fix it enables.
Commit 1 — perf(reader): replace O(N*M) equality-delete predicate tree with O(N+M) HashSet-based filter

The existing implementation builds a `BoundPredicate` AST with one leaf per delete record and evaluates it against every data row via the parquet-rs `RowFilter`. For N data rows and M delete records this is O(N * M): data-row count multiplies by delete-record count at evaluation time.

This commit replaces the predicate tree with a `HashSet<EqDeleteKey>` of delete-key tuples, the same approach Java (`StructLikeSet`) takes. Delete records are collected into the hash set at parse time (O(M)); each data row is then checked with an O(1) lookup (O(N + M) total).

Key pieces:
- `caching_delete_file_loader.rs` — `parse_equality_deletes_record_batch_stream` returns an `EqDeleteSet` (`HashSet<EqDeleteKey>` + field metadata) instead of a `Predicate`. The balanced-binary-tree construction and `rewrite_not()` passes are eliminated. `Datum` already derives `Hash + Eq` (via `OrderedFloat` for floats), so no new trait impls are needed.
- `delete_filter.rs` — `EqDelState::Loaded` holds `Arc<EqDeleteSet>` instead of `Predicate`. The new `build_equality_delete_sets()` groups delete files by their `equality_ids` field layout before unioning, preventing incorrect merges when different delete files use different equality column sets. Single-file groups return the cached `Arc` directly with no deep clone.
- `reader.rs` — equality-delete filtering is decoupled from the scan-predicate `RowFilter`. The scan predicate remains in the Parquet `RowFilter` pipeline (so page/row-group pruning is preserved). Equality deletes are applied as a lazy post-read `.map()` step on the record-batch stream via `apply_eq_delete_filter`, which reuses a single `EqDeleteKey` allocation across all rows to avoid per-row `Vec` allocations.

Commit 2 —
fix(reader): auto-include equality delete key columns in projection

Consequence of commit 1: because the new `apply_eq_delete_filter` operates on arrow batches after the Parquet read (rather than through parquet-rs's `RowFilter`, which transparently widens its own `ProjectionMask`), the post-read filter needs the delete-key columns to be materialized. When a caller scans with `.select(["col_a", "col_b"])` against a table whose equality deletes are keyed on a column outside the projection (e.g. `id`), the filter raises:

    Equality delete key column 'id' (field_id=1) not found in batch

This commit widens the Parquet projection mask and `RecordBatchTransformer` to include any equality-delete key field IDs missing from the user's projection, then strips those extra columns from the output batches after deletes are applied, so the user sees only the columns they requested. This matches the behavior of Spark, Flink, and Trino, which all transparently widen the internal projection for delete evaluation.

Note: the previous `RowFilter`-based path did not need this fix because parquet-rs's `ArrowPredicateFn` carries its own `ProjectionMask::leaves(parquet_schema, column_indices)` that reads the filter's columns independently of the user projection. The need for explicit auto-include is a direct consequence of moving deletes to a post-read filter.
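The resulting pipeline ordering (widen the projection before the read, apply deletes on the widened batches, then strip the extra columns) can be sketched over a plain iterator standing in for the record-batch stream. All names here are hypothetical, and `Vec<i64>` rows stand in for arrow batches:

```rust
use std::collections::HashSet;

type Row = Vec<i64>;

/// Drop the strip-positions from each row after equality deletes are
/// applied, so callers see only their requested columns. A stand-in
/// for the PR's post-filter column stripping on arrow RecordBatches.
fn strip_columns(batch: Vec<Row>, strip: &HashSet<usize>) -> Vec<Row> {
    batch
        .into_iter()
        .map(|row| {
            row.into_iter()
                .enumerate()
                .filter(|(i, _)| !strip.contains(i))
                .map(|(_, v)| v)
                .collect()
        })
        .collect()
}

/// Post-read order matters: filter against the widened batch first
/// (so the delete-key column is still present), then strip it.
fn apply_deletes_then_strip(
    batches: impl Iterator<Item = Vec<Row>>,
    key_col: usize,
    deleted_keys: HashSet<i64>,
    strip: HashSet<usize>,
) -> impl Iterator<Item = Vec<Row>> {
    batches.map(move |batch| {
        let kept: Vec<Row> = batch
            .into_iter()
            .filter(|row| !deleted_keys.contains(&row[key_col]))
            .collect();
        strip_columns(kept, &strip)
    })
}
```

Reversing the order (stripping before filtering) would reproduce exactly the "key column not found in batch" failure described above.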
Measured impact

The primary motivation is asymptotic: the old algorithm is O(N * M), so delete-heavy MOR tables (large M) pay a runtime penalty that scales with the data row count. The new algorithm is O(N + M).

We verified that the change is not a regression on low-delete-ratio workloads. On a 36-hour pipeline-level benchmark (Iceberg table, ~71 M rows scanned, M = 120 k equality-delete records, a 0.17 % delete ratio; pipeline including downstream Rayon-parallel aggregation and DataFusion transforms), wall-clock changed from 59.15 s to 59.62 s (+0.8 %, within run-to-run noise). At this delete ratio the HashSet setup cost (~7.5 CPU-seconds of extra userland work) shows up as higher CPU utilization (166 % → 178 % average cores active) rather than added wall-clock.

In the regime the change is actually designed for (large M), the setup cost is amortized by O(1) per-row lookups replacing O(M)-per-row evaluation of the predicate tree. An empirical measurement on a delete-heavy table would put hard numbers on that claim; we haven't run one yet.
Memory trade-off

`HashSet` peak RSS grows with M (the number of delete records, not data rows). On the low-M matrix workload we observed ~0.3 GB RSS growth; a delete-heavy table with 10 M delete records would grow proportionally. Callers on memory-constrained systems with very delete-heavy MOR tables should size their heap accordingly.
Are these changes tested?
Yes.
- New unit tests in `caching_delete_file_loader.rs` (group-by-equality-ids, single-file fast path, mixed layouts) and `delete_filter.rs`.
- Integration tests in `reader.rs` cover: scans that project the delete-key column, scans that do not project the delete-key column (the auto-include fix), and verification that the extra key column is stripped from output batches.
- The `arrow::reader` lib test suite is green (43/43), and the full `cargo test -p iceberg --lib` passes (1229/1229). `cargo clippy -p iceberg --all-features --lib --tests -- -D warnings` and nightly `cargo fmt --check` are clean.

Notes for reviewers
- Happy to land the two commits as separate PRs, but the fix in commit 2 only fires once commit 1 is in (otherwise the bug cannot trigger), so reviewing them together is easier. If a committer prefers them split, happy to open a follow-up PR for the fix.
- The `EqDeleteKey` tuple uses `Datum` equality (via `OrderedFloat` for floats), matching Iceberg's spec for equality deletes. NaN handling is left as the Rust-stdlib/`OrderedFloat` default, same as existing equality-predicate paths.
- `equality_ids` grouping: the pre-union grouping by `equality_ids` is load-bearing; it prevents merging delete files that target different columns. A regression test specifically covers this.
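A minimal sketch of why the grouping matters (the `DeleteFile` struct here is hypothetical, not the crate's actual type): keying a map by each file's `equality_ids` layout guarantees that files keyed on different column sets never union into one set, which is the step `build_equality_delete_sets()` performs before building the hash sets.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a delete file's metadata.
#[derive(Clone)]
struct DeleteFile {
    path: String,
    equality_ids: Vec<i32>,
}

/// Group delete files by their `equality_ids` layout before unioning.
/// Files with identical layouts can safely share one key set; files
/// with different layouts must never be merged.
fn group_by_equality_ids(files: Vec<DeleteFile>) -> HashMap<Vec<i32>, Vec<DeleteFile>> {
    let mut groups: HashMap<Vec<i32>, Vec<DeleteFile>> = HashMap::new();
    for f in files {
        groups.entry(f.equality_ids.clone()).or_default().push(f);
    }
    groups
}
```

Two files keyed on `[1]` land in one group (one shared hash set, and a single-file group could hand back its cached `Arc` directly), while a file keyed on `[1, 2]` stays in its own group.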