
perf(reader): replace O(N*M) equality-delete predicate tree with O(N+M) HashSet filter #2343

Draft

t3hw wants to merge 2 commits into apache:main from t3hw:pr/hashset-equality-delete-filter

Conversation

@t3hw t3hw commented Apr 18, 2026

Which issue does this PR close?

No tracking issue filed yet. The symptom ("equality-delete scans scale
quadratically in deletes × data rows") has been observed on our own
uncompacted MOR workloads. Happy to open a tracking issue first if a
committer prefers.

What changes are included in this PR?

This PR contains two commits that are tightly coupled. The squash
merge collapses them; the split is purely to help reviewers read the
perf change separately from the correctness fix it enables.

Commit 1 — perf(reader): replace O(N*M) equality-delete predicate tree with O(N+M) HashSet-based filter

The existing implementation builds a BoundPredicate AST with one
leaf per delete record and evaluates it against every data row via
parquet-rs RowFilter. For N data rows and M delete records this
is O(N · M): every data row is evaluated against a predicate tree with
M leaves.

This commit replaces the predicate tree with a
HashSet<EqDeleteKey> of delete-key tuples — the same approach Java
(StructLikeSet) takes. Delete records are collected into the hash set
at parse time (O(M)); each data row is then checked with an O(1)
lookup (O(N + M) total).
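
For illustration, a minimal sketch of the shape of the change; the key
type below is a simplified stand-in for the PR's actual
EqDeleteKey/Datum definitions:

```rust
use std::collections::HashSet;

// Simplified stand-in: one i64 per equality column instead of Datum values.
#[derive(Hash, PartialEq, Eq)]
struct EqDeleteKey(Vec<i64>);

// O(M): one hash-set insert per delete record, done once at parse time.
fn build_delete_set(deletes: impl IntoIterator<Item = EqDeleteKey>) -> HashSet<EqDeleteKey> {
    deletes.into_iter().collect()
}

// O(1) per data row, so O(N) across the scan and O(N + M) overall.
fn row_survives(set: &HashSet<EqDeleteKey>, row_key: &EqDeleteKey) -> bool {
    !set.contains(row_key)
}
```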

Key pieces:

  • caching_delete_file_loader.rs —
    parse_equality_deletes_record_batch_stream returns an EqDeleteSet
    (HashSet<EqDeleteKey> + field metadata) instead of a Predicate.
    The balanced-binary-tree construction and rewrite_not() passes are
    eliminated. Datum already derives Hash + Eq (via OrderedFloat
    for floats), so no new trait impls are needed.
  • delete_filter.rs — EqDelState::Loaded holds
    Arc<EqDeleteSet> instead of Predicate. The new
    build_equality_delete_sets() groups delete files by their
    equality_ids field layout before unioning, preventing incorrect
    merges when different delete files use different equality column
    sets. Single-file groups return the cached Arc directly with no
    deep clone.
  • reader.rs — equality-delete filtering is decoupled from the
    scan predicate RowFilter. The scan predicate remains in the Parquet
    RowFilter pipeline (so page/row-group pruning is preserved).
    Equality deletes are applied as a lazy post-read .map() step on
    the record-batch stream via apply_eq_delete_filter, which reuses a
    single EqDeleteKey allocation across all rows to avoid per-row
    Vec allocations (see the sketch after this list).
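
A hedged sketch of that post-read step, assuming a single Int64 key
column; the real apply_eq_delete_filter is generic over the table's
equality-delete key layout, and these names are illustrative:

```rust
use std::collections::HashSet;

use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch};
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;

fn apply_eq_delete_filter(
    batch: RecordBatch,
    key_col: usize,              // index of the delete-key column in the batch
    deletes: &HashSet<Vec<i64>>, // stand-in for HashSet<EqDeleteKey>
) -> Result<RecordBatch, ArrowError> {
    let keys = batch
        .column(key_col)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("sketch assumes an Int64 key column");

    // One buffer reused for every row, mirroring the single EqDeleteKey
    // allocation described above; only its contents change per row.
    let mut key_buf: Vec<i64> = Vec::with_capacity(1);
    let mask: BooleanArray = (0..batch.num_rows())
        .map(|i| {
            key_buf.clear();
            key_buf.push(keys.value(i));
            Some(!deletes.contains(key_buf.as_slice()))
        })
        .collect();

    // Keep only rows whose key is absent from the delete set.
    filter_record_batch(&batch, &mask)
}
```

Mapping this function over the record-batch stream keeps the filtering
lazy, as described above.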

Commit 2 — fix(reader): auto-include equality delete key columns in projection

Consequence of commit 1: because the new apply_eq_delete_filter
operates on arrow batches after the Parquet read (rather than
through parquet-rs's RowFilter, which transparently widens its own
ProjectionMask), the post-read filter needs the delete-key columns to
be materialized. When a caller scans with
.select(["col_a", "col_b"]) against a table whose equality deletes
are keyed on a column outside the projection (e.g. id), the filter
fails with:

Equality delete key column 'id' (field_id=1) not found in batch

This commit widens the Parquet projection mask and
RecordBatchTransformer to include any equality-delete key field IDs
missing from the user's projection, then strips those extra columns
from the output batches after deletes are applied — so the user sees
only the columns they requested. This matches the behavior of Spark,
Flink, and Trino, which all transparently widen the internal
projection for delete evaluation.
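
A sketch of the widen-then-strip flow under simplifying assumptions:
plain column indices stand in for field IDs, and the helper names are
illustrative rather than the PR's API:

```rust
use arrow::array::RecordBatch;
use arrow::error::ArrowError;

// Union the user's projection with the delete-key columns, remembering
// how many columns the user actually asked for.
fn widen_projection(user_cols: &[usize], delete_key_cols: &[usize]) -> (Vec<usize>, usize) {
    let mut widened = user_cols.to_vec();
    for &c in delete_key_cols {
        if !widened.contains(&c) {
            widened.push(c); // auto-included, appended after user columns
        }
    }
    (widened, user_cols.len())
}

// After deletes are applied, strip the auto-included columns so the
// caller sees only the projection they requested.
fn strip_extra_columns(batch: RecordBatch, user_len: usize) -> Result<RecordBatch, ArrowError> {
    let keep: Vec<usize> = (0..user_len).collect();
    batch.project(&keep)
}
```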

Note: the previous RowFilter-based path did not need this fix because
parquet-rs's ArrowPredicateFn carries its own
ProjectionMask::leaves(parquet_schema, column_indices) that reads the
filter's columns independently of the user projection. The need for
explicit auto-include is a direct consequence of moving deletes to a
post-read filter.
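
For contrast, a minimal sketch of that mechanism using the parquet
crate's arrow_reader API; the leaf index and the always-true predicate
body are placeholders:

```rust
use arrow::array::BooleanArray;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::schema::types::SchemaDescriptor;

// The predicate carries its own mask: leaf column 0 (say, `id`) is
// decoded for the filter even when the user's projection excludes it.
fn delete_row_filter(parquet_schema: &SchemaDescriptor) -> RowFilter {
    let mask = ProjectionMask::leaves(parquet_schema, [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        // Placeholder: a real predicate would evaluate delete keys here.
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    RowFilter::new(vec![Box::new(predicate)])
}
```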

Measured impact

The primary motivation is asymptotic: the old algorithm is O(N · M),
so delete-heavy MOR tables (large M) pay a runtime penalty that
scales with the data row count. The new algorithm is O(N + M).

We verified that the change is not a regression on low-delete-ratio
workloads. On a 36-hour pipeline-level benchmark (Iceberg table,
~71 M rows scanned, M = 120 k equality-delete records — a 0.17 %
delete ratio, pipeline including downstream Rayon-parallel aggregation
and DataFusion transforms), wall-clock changed from 59.15 s to 59.62 s
(+0.8 %, within run-to-run noise). At this delete ratio the HashSet
setup cost (~7.5 CPU-seconds of extra userland work) shows up as
higher CPU utilization (166 % → 178 % average cores active) rather
than added wall-clock.

In the regime the change is actually designed for — large M — the
setup cost is amortized by O(1) per-row lookups replacing
O(M)-per-row evaluation of the predicate tree. An empirical
measurement on a delete-heavy table would put hard numbers on that
claim; we haven't run one yet.

Memory trade-off

HashSet peak-RSS grows with M (the number of delete records, not
data rows). On the low-M matrix workload we observed ~0.3 GB RSS
growth; a delete-heavy table with 10 M delete records would grow
proportionally. Callers on memory-constrained systems with very
delete-heavy MOR tables should size their heap accordingly.

Are these changes tested?

Yes.

  • Unit tests for the parser and filter live in
    caching_delete_file_loader.rs (group-by-equality-ids, single-file
    fast path, mixed layouts) and delete_filter.rs.
  • Integration-style tests in reader.rs cover:
    • Equality deletes where the user projection does include the
      delete-key column.
    • Equality deletes where the user projection does not include
      the delete-key column (the auto-include fix).
    • Mixed projections with other non-key columns to confirm the
      extra key column is stripped from output batches.
  • The arrow::reader lib test suite is green (43/43), and the full
    cargo test -p iceberg --lib passes (1229/1229).
  • cargo clippy -p iceberg --all-features --lib --tests -- -D warnings
    and nightly cargo fmt --check are clean.

Notes for reviewers

  • Diff size (~920 lines). This is above the 300–500 line guideline,
    but the fix in commit 2 only fires once commit 1 is in (otherwise
    the bug cannot trigger), so reviewing the two commits together is
    easier. If a committer prefers them split, I am happy to land the
    perf change first and open a follow-up PR for the fix.
  • Semantics. The EqDeleteKey tuple uses Datum equality (via
    OrderedFloat for floats), matching Iceberg's spec for equality
    deletes. NaN handling is left as the Rust-stdlib/OrderedFloat
    default, same as existing equality-predicate paths.
  • equality_ids grouping. The pre-union grouping by
    equality_ids is load-bearing: it prevents merging delete files
    that target different columns. A regression test specifically
    covers this; a sketch of the grouping follows this list.
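
To make the invariant concrete, here is a hedged sketch of the grouping
step; the types are illustrative stand-ins, not the crate's definitions,
and normalizing the layout by sorting is an assumption of the sketch:

```rust
use std::collections::{HashMap, HashSet};

// Stand-in for an equality-delete file's parsed contents.
struct EqDeleteFile {
    equality_ids: Vec<i32>,  // field ids the delete file is keyed on
    keys: HashSet<Vec<i64>>, // simplified delete-key tuples
}

// Bucket files by equality-column layout before unioning, so files keyed
// on different column sets never share a delete set.
fn build_equality_delete_sets(
    files: Vec<EqDeleteFile>,
) -> HashMap<Vec<i32>, HashSet<Vec<i64>>> {
    let mut groups: HashMap<Vec<i32>, HashSet<Vec<i64>>> = HashMap::new();
    for file in files {
        let mut layout = file.equality_ids.clone();
        layout.sort_unstable(); // normalize: same column set, same group
        groups.entry(layout).or_default().extend(file.keys);
    }
    groups
}
```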

t3hw added 2 commits April 19, 2026 00:11
perf(reader): replace O(N*M) equality-delete predicate tree with
O(N+M) HashSet-based filter

The existing equality delete implementation builds a predicate AST with
one node per delete record and evaluates every node against every data
batch. For N data rows and M delete records, this is O(N*M) — 25
minutes for 30M rows with ~5000 delete records in our benchmark.

Replace the predicate tree with a `HashSet<EqDeleteKey>` of delete key
tuples, matching the approach used by the Java/Spark implementation
(`StructLikeSet`). Delete records are collected into a hash set during
parsing (O(M)), then each data row is checked with an O(1) lookup.

Key changes:

- `caching_delete_file_loader.rs`: `parse_equality_deletes_record_batch_stream`
  now returns an `EqDeleteSet` (hash set + field metadata) instead of a
  `Predicate`. The balanced binary tree construction and `rewrite_not()`
  calls are eliminated entirely. `Datum` already derives `Hash` and `Eq`
  (via `OrderedFloat` for floats), so no new trait implementations are
  needed.

- `delete_filter.rs`: `EqDelState::Loaded` holds `Arc<EqDeleteSet>`
  instead of `Predicate`. The new `build_equality_delete_sets()` groups
  delete files by their `equality_ids` field layout before unioning,
  preventing incorrect merges when different delete files use different
  equality column sets. Single-file groups return the cached `Arc`
  directly with no deep clone.

- `reader.rs`: Equality delete filtering is decoupled from the scan
  predicate `RowFilter`. The scan predicate stays in the Parquet
  `RowFilter` pipeline (page/row-group pruning preserved). Equality
  deletes are applied as a lazy post-read `.map()` step on the record
  batch stream via `apply_eq_delete_filter()`, which reuses a single
  `EqDeleteKey` allocation across all rows to avoid per-row `Vec`
  allocations.
fix(reader): auto-include equality delete key columns in projection

When a user scans with `.select(["col_a", "col_b"])` and the table has
merge-on-read equality delete files keyed on a column NOT in the select
list (e.g. `id`), the HashSet-based `apply_eq_delete_filter` fails with:

    Equality delete key column 'id' (field_id=1) not found in batch

The fix augments the Parquet projection mask and RecordBatchTransformer
with any equality delete key field IDs that are missing from the user's
projection. After applying equality deletes, the extra columns are
stripped from the output batches so the user sees only their requested
columns.

This matches the behavior of Spark, Flink, and Trino, which
transparently widen the internal projection for delete evaluation.
t3hw force-pushed the pr/hashset-equality-delete-filter branch from facf9b9 to 72bbff2 on April 18, 2026 at 21:50
