Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::collections::{HashMap, HashSet};
use std::ops::Not;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -152,6 +153,7 @@ impl CachingDeleteFileLoader {
&self,
delete_file_entries: &[FileScanTaskDeleteFile],
schema: SchemaRef,
bytes_read: &Arc<AtomicU64>,
) -> Receiver<Result<DeleteFilter>> {
let (tx, rx) = channel();

Expand All @@ -171,6 +173,7 @@ impl CachingDeleteFileLoader {
let del_filter = self.delete_filter.clone();
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let basic_delete_file_loader = self.basic_delete_file_loader.clone();
let bytes_read = Arc::clone(bytes_read);
crate::runtime::spawn(async move {
let result = async move {
let mut del_filter = del_filter;
Expand All @@ -179,12 +182,14 @@ impl CachingDeleteFileLoader {
let mut results_stream = task_stream
.map(move |(task, file_io, del_filter, schema)| {
let basic_delete_file_loader = basic_delete_file_loader.clone();
let bytes_read = Arc::clone(&bytes_read);
async move {
Self::load_file_for_task(
&task,
basic_delete_file_loader.clone(),
del_filter,
schema,
&bytes_read,
)
.await
}
Expand Down Expand Up @@ -220,6 +225,7 @@ impl CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader,
del_filter: DeleteFilter,
schema: SchemaRef,
bytes_read: &Arc<AtomicU64>,
) -> Result<DeleteFileContext> {
match task.file_type {
DataContentType::PositionDeletes => {
Expand All @@ -235,7 +241,11 @@ impl CachingDeleteFileLoader {
PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
file_path: task.file_path.clone(),
stream: basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
.parquet_to_batch_stream(
&task.file_path,
task.file_size_in_bytes,
bytes_read,
)
.await?,
}),
}
Expand All @@ -254,7 +264,11 @@ impl CachingDeleteFileLoader {
let equality_ids_vec = task.equality_ids.clone().unwrap();
let evolved_stream = BasicDeleteFileLoader::evolve_schema(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
.parquet_to_batch_stream(
&task.file_path,
task.file_size_in_bytes,
bytes_read,
)
.await?,
schema,
&equality_ids_vec,
Expand Down Expand Up @@ -613,10 +627,12 @@ mod tests {
let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);

let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let bytes_read = Arc::new(AtomicU64::new(0));
let record_batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(
&eq_delete_file_path,
std::fs::metadata(&eq_delete_file_path).unwrap().len(),
&bytes_read,
)
.await
.expect("could not get batch stream");
Expand Down Expand Up @@ -727,11 +743,16 @@ mod tests {
let file_io = FileIO::new_with_fs();

let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
let bytes_read = Arc::new(AtomicU64::new(0));

let file_scan_tasks = setup(table_location);

let delete_filter = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.load_deletes(
&file_scan_tasks[0].deletes,
file_scan_tasks[0].schema_ref(),
&bytes_read,
)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -809,11 +830,13 @@ mod tests {

let file_io = FileIO::new_with_fs();
let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let bytes_read = Arc::new(AtomicU64::new(0));

let batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(
&delete_file_path,
std::fs::metadata(&delete_file_path).unwrap().len(),
&bytes_read,
)
.await
.unwrap();
Expand Down Expand Up @@ -947,8 +970,13 @@ mod tests {

// Load the deletes - should handle both types without error
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
let bytes_read = Arc::new(AtomicU64::new(0));
let delete_filter = delete_file_loader
.load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
.load_deletes(
&file_scan_task.deletes,
file_scan_task.schema_ref(),
&bytes_read,
)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -995,8 +1023,9 @@ mod tests {
writer.close().unwrap();

let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let bytes_read = Arc::new(AtomicU64::new(0));
let record_batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len())
.parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len(), &bytes_read)
.await
.expect("could not get batch stream");

Expand All @@ -1018,19 +1047,28 @@ mod tests {
let file_io = FileIO::new_with_fs();

let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
let bytes_read = Arc::new(AtomicU64::new(0));

let file_scan_tasks = setup(table_location);

// Load deletes for the first time
let delete_filter_1 = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.load_deletes(
&file_scan_tasks[0].deletes,
file_scan_tasks[0].schema_ref(),
&bytes_read,
)
.await
.unwrap()
.unwrap();

// Load deletes for the second time (same task/files)
let delete_filter_2 = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.load_deletes(
&file_scan_tasks[0].deletes,
file_scan_tasks[0].schema_ref(),
&bytes_read,
)
.await
.unwrap()
.unwrap();
Expand Down
9 changes: 8 additions & 1 deletion crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
Expand All @@ -39,6 +40,7 @@ pub trait DeleteFileLoader {
&self,
task: &FileScanTaskDeleteFile,
schema: SchemaRef,
bytes_read: &Arc<AtomicU64>,
) -> Result<ArrowRecordBatchStream>;
}

Expand All @@ -57,6 +59,7 @@ impl BasicDeleteFileLoader {
&self,
data_file_path: &str,
file_size_in_bytes: u64,
bytes_read: &Arc<AtomicU64>,
) -> Result<ArrowRecordBatchStream> {
/*
Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
Expand All @@ -69,6 +72,7 @@ impl BasicDeleteFileLoader {
&self.file_io,
file_size_in_bytes,
parquet_read_options,
bytes_read,
)
.await?;

Expand Down Expand Up @@ -108,9 +112,10 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
&self,
task: &FileScanTaskDeleteFile,
schema: SchemaRef,
bytes_read: &Arc<AtomicU64>,
) -> Result<ArrowRecordBatchStream> {
let raw_batch_stream = self
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes, bytes_read)
.await?;

// For equality deletes, only evolve the equality_ids columns.
Expand Down Expand Up @@ -138,13 +143,15 @@ mod tests {
let file_io = FileIO::new_with_fs();

let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let bytes_read = Arc::new(AtomicU64::new(0));

let file_scan_tasks = setup(table_location);

let result = delete_file_loader
.read_delete_file(
&file_scan_tasks[0].deletes[0],
file_scan_tasks[0].schema_ref(),
&bytes_read,
)
.await
.unwrap();
Expand Down
14 changes: 12 additions & 2 deletions crates/iceberg/src/arrow/delete_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ pub(crate) mod tests {
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::Schema as ArrowSchema;
Expand All @@ -293,11 +294,16 @@ pub(crate) mod tests {
let file_io = FileIO::new_with_fs();

let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
let bytes_read = Arc::new(AtomicU64::new(0));

let file_scan_tasks = setup(table_location);

let delete_filter = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.load_deletes(
&file_scan_tasks[0].deletes,
file_scan_tasks[0].schema_ref(),
&bytes_read,
)
.await
.unwrap()
.unwrap();
Expand All @@ -308,7 +314,11 @@ pub(crate) mod tests {
assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2

let delete_filter = delete_file_loader
.load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref())
.load_deletes(
&file_scan_tasks[1].deletes,
file_scan_tasks[1].schema_ref(),
&bytes_read,
)
.await
.unwrap()
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ mod reader;
/// RecordBatch projection utilities
pub mod record_batch_projector;
pub(crate) mod record_batch_transformer;
mod scan_metrics;
mod value;

pub use reader::*;
pub use scan_metrics::{ScanMetrics, ScanResult};
pub use value::*;
/// Partition value calculator for computing partition values
pub mod partition_value_calculator;
Expand Down
Loading
Loading