diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index ae97534d83..9daeb8fe7c 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -18,6 +18,7 @@
 use std::collections::{HashMap, HashSet};
 use std::ops::Not;
 use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
 
 use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
 use futures::{StreamExt, TryStreamExt};
@@ -152,6 +153,7 @@ impl CachingDeleteFileLoader {
         &self,
         delete_file_entries: &[FileScanTaskDeleteFile],
         schema: SchemaRef,
+        bytes_read: &Arc<AtomicU64>,
     ) -> Receiver<Result<DeleteFilter>> {
         let (tx, rx) = channel();
@@ -171,6 +173,7 @@ impl CachingDeleteFileLoader {
         let del_filter = self.delete_filter.clone();
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let basic_delete_file_loader = self.basic_delete_file_loader.clone();
+        let bytes_read = Arc::clone(bytes_read);
         crate::runtime::spawn(async move {
             let result = async move {
                 let mut del_filter = del_filter;
@@ -179,12 +182,14 @@ impl CachingDeleteFileLoader {
                 let mut results_stream = task_stream
                     .map(move |(task, file_io, del_filter, schema)| {
                         let basic_delete_file_loader = basic_delete_file_loader.clone();
+                        let bytes_read = Arc::clone(&bytes_read);
                         async move {
                             Self::load_file_for_task(
                                 &task,
                                 basic_delete_file_loader.clone(),
                                 del_filter,
                                 schema,
+                                &bytes_read,
                             )
                             .await
                         }
                     })
@@ -220,6 +225,7 @@ impl CachingDeleteFileLoader {
         basic_delete_file_loader: BasicDeleteFileLoader,
         del_filter: DeleteFilter,
         schema: SchemaRef,
+        bytes_read: &Arc<AtomicU64>,
     ) -> Result<DeleteFileContext> {
         match task.file_type {
             DataContentType::PositionDeletes => {
@@ -235,7 +241,11 @@
                     PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
                         file_path: task.file_path.clone(),
                         stream: basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
+                            .parquet_to_batch_stream(
+                                &task.file_path,
+                                task.file_size_in_bytes,
+                                bytes_read,
+                            )
                             .await?,
                     }),
                 }
@@ -254,7 +264,11 @@
                 let equality_ids_vec = task.equality_ids.clone().unwrap();
                 let evolved_stream = BasicDeleteFileLoader::evolve_schema(
                     basic_delete_file_loader
-                        .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
+                        .parquet_to_batch_stream(
+                            &task.file_path,
+                            task.file_size_in_bytes,
+                            bytes_read,
+                        )
                         .await?,
                     schema,
                     &equality_ids_vec,
@@ -613,10 +627,12 @@ mod tests {
         let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
 
         let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let bytes_read = Arc::new(AtomicU64::new(0));
         let record_batch_stream = basic_delete_file_loader
             .parquet_to_batch_stream(
                 &eq_delete_file_path,
                 std::fs::metadata(&eq_delete_file_path).unwrap().len(),
+                &bytes_read,
             )
             .await
             .expect("could not get batch stream");
@@ -727,11 +743,16 @@ mod tests {
         let file_io = FileIO::new_with_fs();
 
         let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let bytes_read = Arc::new(AtomicU64::new(0));
 
         let file_scan_tasks = setup(table_location);
 
         let delete_filter = delete_file_loader
-            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
+            .load_deletes(
+                &file_scan_tasks[0].deletes,
+                file_scan_tasks[0].schema_ref(),
+                &bytes_read,
+            )
             .await
             .unwrap()
             .unwrap();
@@ -809,11 +830,13 @@ mod tests {
         let file_io = FileIO::new_with_fs();
 
         let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let bytes_read = Arc::new(AtomicU64::new(0));
         let batch_stream = basic_delete_file_loader
             .parquet_to_batch_stream(
                 &delete_file_path,
                 std::fs::metadata(&delete_file_path).unwrap().len(),
+                &bytes_read,
             )
             .await
             .unwrap();
@@ -947,8 +970,13 @@ mod tests {
         // Load the deletes - should handle both types without error
         let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let bytes_read = Arc::new(AtomicU64::new(0));
         let delete_filter = delete_file_loader
-            .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
+            .load_deletes(
+                &file_scan_task.deletes,
+                file_scan_task.schema_ref(),
+                &bytes_read,
+            )
             .await
             .unwrap()
             .unwrap();
@@ -995,8 +1023,9 @@ mod tests {
         writer.close().unwrap();
 
         let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let bytes_read = Arc::new(AtomicU64::new(0));
         let record_batch_stream = basic_delete_file_loader
-            .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len())
+            .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len(), &bytes_read)
             .await
             .expect("could not get batch stream");
@@ -1018,19 +1047,28 @@ mod tests {
         let file_io = FileIO::new_with_fs();
         let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let bytes_read = Arc::new(AtomicU64::new(0));
         let file_scan_tasks = setup(table_location);
 
         // Load deletes for the first time
         let delete_filter_1 = delete_file_loader
-            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
+            .load_deletes(
+                &file_scan_tasks[0].deletes,
+                file_scan_tasks[0].schema_ref(),
+                &bytes_read,
+            )
             .await
             .unwrap()
             .unwrap();
 
         // Load deletes for the second time (same task/files)
         let delete_filter_2 = delete_file_loader
-            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
+            .load_deletes(
+                &file_scan_tasks[0].deletes,
+                file_scan_tasks[0].schema_ref(),
+                &bytes_read,
+            )
             .await
             .unwrap()
             .unwrap();
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index 0be62ad496..29961df67d 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
 
 use futures::{StreamExt, TryStreamExt};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
@@ -39,6 +40,7 @@ pub trait DeleteFileLoader {
         &self,
         task: &FileScanTaskDeleteFile,
         schema: SchemaRef,
+        bytes_read: &Arc<AtomicU64>,
     ) -> Result<ArrowRecordBatchStream>;
 }
@@ -57,6 +59,7 @@ impl BasicDeleteFileLoader {
         &self,
         data_file_path: &str,
         file_size_in_bytes: u64,
+        bytes_read: &Arc<AtomicU64>,
     ) -> Result<ArrowRecordBatchStream> {
         /* Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
@@ -69,6 +72,7 @@ impl BasicDeleteFileLoader {
             &self.file_io,
             file_size_in_bytes,
             parquet_read_options,
+            bytes_read,
         )
         .await?;
@@ -108,9 +112,10 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
         &self,
         task: &FileScanTaskDeleteFile,
         schema: SchemaRef,
+        bytes_read: &Arc<AtomicU64>,
     ) -> Result<ArrowRecordBatchStream> {
         let raw_batch_stream = self
-            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
+            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes, bytes_read)
             .await?;
 
         // For equality deletes, only evolve the equality_ids columns.
@@ -138,6 +143,7 @@ mod tests {
         let file_io = FileIO::new_with_fs();
 
         let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let bytes_read = Arc::new(AtomicU64::new(0));
 
         let file_scan_tasks = setup(table_location);
@@ -145,6 +151,7 @@
             .read_delete_file(
                 &file_scan_tasks[0].deletes[0],
                 file_scan_tasks[0].schema_ref(),
+                &bytes_read,
             )
             .await
             .unwrap();
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 6369938ce2..fffb9dcdce 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -267,6 +267,7 @@ pub(crate) mod tests {
     use std::fs::File;
     use std::path::Path;
     use std::sync::Arc;
+    use std::sync::atomic::AtomicU64;
 
     use arrow_array::{Int64Array, RecordBatch, StringArray};
     use arrow_schema::Schema as ArrowSchema;
@@ -293,11 +294,16 @@ pub(crate) mod tests {
         let file_io = FileIO::new_with_fs();
 
         let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let bytes_read = Arc::new(AtomicU64::new(0));
 
         let file_scan_tasks = setup(table_location);
 
         let delete_filter = delete_file_loader
-            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
+            .load_deletes(
+                &file_scan_tasks[0].deletes,
+                file_scan_tasks[0].schema_ref(),
+                &bytes_read,
+            )
             .await
             .unwrap()
             .unwrap();
@@ -308,7 +314,11 @@ pub(crate) mod tests {
         assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2
 
         let delete_filter = delete_file_loader
-            .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref())
+            .load_deletes(
+                &file_scan_tasks[1].deletes,
+                file_scan_tasks[1].schema_ref(),
+                &bytes_read,
+            )
             .await
             .unwrap()
             .unwrap();
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 7823320452..bf53633cfc 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -32,9 +32,11 @@
 mod reader;
 /// RecordBatch projection utilities
 pub mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
+mod scan_metrics;
 mod value;
 
 pub use reader::*;
+pub use scan_metrics::{ScanMetrics, ScanResult};
 pub use value::*;
 /// Partition value calculator for computing partition values
 pub mod partition_value_calculator;
diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs
index 94059fc62b..64eefa112d 100644
--- a/crates/iceberg/src/arrow/reader/pipeline.rs
+++ b/crates/iceberg/src/arrow/reader/pipeline.rs
@@ -21,6 +21,7 @@
 //! of transformed Arrow `RecordBatch`es.
 
 use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
 
 use futures::{StreamExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
@@ -33,8 +34,9 @@ use super::{
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
 use crate::arrow::int96::coerce_int96_timestamps;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
+use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
 use crate::error::Result;
-use crate::io::{FileIO, FileMetadata};
+use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::Datum;
@@ -42,32 +44,26 @@
 use crate::{Error, ErrorKind};
 
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
-    /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
-        let file_io = self.file_io.clone();
-        let batch_size = self.batch_size;
+    /// Returns a [`ScanResult`] containing the record batch stream and scan metrics.
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
-        let row_group_filtering_enabled = self.row_group_filtering_enabled;
-        let row_selection_enabled = self.row_selection_enabled;
-        let parquet_read_options = self.parquet_read_options;
+        let scan_metrics = ScanMetrics::new();
+
+        let task_reader = FileScanTaskReader {
+            batch_size: self.batch_size,
+            file_io: self.file_io,
+            delete_file_loader: self.delete_file_loader,
+            row_group_filtering_enabled: self.row_group_filtering_enabled,
+            row_selection_enabled: self.row_selection_enabled,
+            parquet_read_options: self.parquet_read_options,
+            scan_metrics: scan_metrics.clone(),
+        };
 
         // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
         let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
             Box::pin(
                 tasks
-                    .and_then(move |task| {
-                        let file_io = file_io.clone();
-
-                        Self::process_file_scan_task(
-                            task,
-                            batch_size,
-                            file_io,
-                            self.delete_file_loader.clone(),
-                            row_group_filtering_enabled,
-                            row_selection_enabled,
-                            parquet_read_options,
-                        )
-                    })
+                    .and_then(move |task| task_reader.clone().process(task))
                     .map_err(|err| {
                         Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                             .with_source(err)
@@ -77,19 +73,7 @@
         } else {
             Box::pin(
                 tasks
-                    .map_ok(move |task| {
-                        let file_io = file_io.clone();
-
-                        Self::process_file_scan_task(
-                            task,
-                            batch_size,
-                            file_io,
-                            self.delete_file_loader.clone(),
-                            row_group_filtering_enabled,
-                            row_selection_enabled,
-                            parquet_read_options,
-                        )
-                    })
+                    .map_ok(move |task| task_reader.clone().process(task))
                     .map_err(|err| {
                         Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                             .with_source(err)
@@ -99,32 +83,43 @@
             )
         };
 
-        Ok(stream)
+        Ok(ScanResult::new(stream, scan_metrics))
     }
+}
 
-    async fn process_file_scan_task(
-        task: FileScanTask,
-        batch_size: Option<usize>,
-        file_io: FileIO,
-        delete_file_loader: CachingDeleteFileLoader,
-        row_group_filtering_enabled: bool,
-        row_selection_enabled: bool,
-        parquet_read_options: ParquetReadOptions,
-    ) -> Result<ArrowRecordBatchStream> {
+/// Per-scan state for processing [`FileScanTask`]s. Created once per
+/// [`ArrowReader::read`] call and cloned per task.
+#[derive(Clone)]
+struct FileScanTaskReader {
+    batch_size: Option<usize>,
+    file_io: FileIO,
+    delete_file_loader: CachingDeleteFileLoader,
+    row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
+    parquet_read_options: ParquetReadOptions,
+    scan_metrics: ScanMetrics,
+}
+
+impl FileScanTaskReader {
+    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
         let should_load_page_index =
-            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
-        let mut parquet_read_options = parquet_read_options;
+            (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+        let mut parquet_read_options = self.parquet_read_options;
         parquet_read_options.preload_page_index = should_load_page_index;
 
-        let delete_filter_rx =
-            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
+        let delete_filter_rx = self.delete_file_loader.load_deletes(
+            &task.deletes,
+            Arc::clone(&task.schema),
+            self.scan_metrics.bytes_read_counter(),
+        );
 
         // Open the Parquet file once, loading its metadata
-        let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file(
+        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
             &task.data_file_path,
-            &file_io,
+            &self.file_io,
             task.file_size_in_bytes,
             parquet_read_options,
+            self.scan_metrics.bytes_read_counter(),
         )
         .await?;
@@ -222,7 +217,7 @@
         // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
         // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
         // - If fallback IDs: position-based projection (missing_field_ids=true)
-        let projection_mask = Self::get_arrow_projection_mask(
+        let projection_mask = ArrowReader::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
@@ -255,7 +250,7 @@
         let mut record_batch_transformer = record_batch_transformer_builder.build();
 
-        if let Some(batch_size) = batch_size {
+        if let Some(batch_size) = self.batch_size {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }
@@ -296,7 +291,7 @@
         // Filter row groups based on byte range from task.start and task.length.
         // If both start and length are 0, read the entire file (backwards compatibility).
         if task.start != 0 || task.length != 0 {
-            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
+            let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
                 record_batch_stream_builder.metadata(),
                 task.start,
                 task.length,
@@ -305,12 +300,12 @@
         }
 
         if let Some(predicate) = final_predicate {
-            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
+            let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
                 &predicate,
             )?;
 
-            let row_filter = Self::get_row_filter(
+            let row_filter = ArrowReader::get_row_filter(
                 &predicate,
                 record_batch_stream_builder.parquet_schema(),
                 &iceberg_field_ids,
@@ -318,8 +313,8 @@
             )?;
             record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
 
-            if row_group_filtering_enabled {
-                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
+            if self.row_group_filtering_enabled {
+                let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
                     &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
@@ -341,8 +336,8 @@
                 };
             }
 
-            if row_selection_enabled {
-                row_selection = Some(Self::get_row_selection_for_filter_predicate(
+            if self.row_selection_enabled {
+                row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
                     &predicate,
                     record_batch_stream_builder.metadata(),
                     &selected_row_group_indices,
@@ -358,7 +353,7 @@
             let delete_row_selection = {
                 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
 
-                Self::build_deletes_row_selection(
+                ArrowReader::build_deletes_row_selection(
                     record_batch_stream_builder.metadata().row_groups(),
                     &selected_row_group_indices,
                     &positional_delete_indexes,
@@ -400,18 +395,34 @@
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
+}
 
-    /// Opens a Parquet file and loads its metadata, returning both the reader and metadata.
-    /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without
-    /// reopening the file.
+impl ArrowReader {
+    /// Opens a Parquet file and loads its metadata, wrapping the reader with
+    /// [`CountingFileRead`] so all I/O is accumulated into `bytes_read`.
     pub(crate) async fn open_parquet_file(
         data_file_path: &str,
         file_io: &FileIO,
         file_size_in_bytes: u64,
         parquet_read_options: ParquetReadOptions,
+        bytes_read: &Arc<AtomicU64>,
     ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
         let parquet_file = file_io.new_input(data_file_path)?;
-        let parquet_reader = parquet_file.reader().await?;
+        let counting_reader =
+            CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
+        Self::build_parquet_reader(
+            Box::new(counting_reader),
+            file_size_in_bytes,
+            parquet_read_options,
+        )
+        .await
+    }
+
+    async fn build_parquet_reader(
+        parquet_reader: Box<dyn FileRead>,
+        file_size_in_bytes: u64,
+        parquet_read_options: ParquetReadOptions,
+    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
         let mut reader = ArrowFileReader::new(
             FileMetadata {
                 size: file_size_in_bytes,
@@ -497,7 +508,7 @@ mod tests {
         };
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
 
-        reader.read(tasks).unwrap().try_collect().await.unwrap()
+        reader.read(tasks).unwrap().stream().try_collect().await.unwrap()
     }
 
     // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly.
@@ -748,6 +759,7 @@ mod tests {
         let result = reader
             .read(tasks_stream)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
diff --git a/crates/iceberg/src/arrow/reader/positional_deletes.rs b/crates/iceberg/src/arrow/reader/positional_deletes.rs
index eea031852b..b2993572c5 100644
--- a/crates/iceberg/src/arrow/reader/positional_deletes.rs
+++ b/crates/iceberg/src/arrow/reader/positional_deletes.rs
@@ -461,6 +461,7 @@ mod tests {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -681,6 +682,7 @@ mod tests {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -895,6 +897,7 @@ mod tests {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs
index d3fa00b84b..deae027e14 100644
--- a/crates/iceberg/src/arrow/reader/projection.rs
+++ b/crates/iceberg/src/arrow/reader/projection.rs
@@ -602,6 +602,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -704,6 +705,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -805,6 +807,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -895,6 +898,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -999,6 +1003,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -1132,6 +1137,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -1232,6 +1238,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -1346,6 +1353,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -1488,6 +1496,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -1699,6 +1708,7 @@ message schema {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs
index 52f7260cc6..80432a0437 100644
--- a/crates/iceberg/src/arrow/reader/row_filter.rs
+++ b/crates/iceberg/src/arrow/reader/row_filter.rs
@@ -241,6 +241,7 @@ mod tests {
         let result = reader
             .read(tasks)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -555,6 +556,7 @@ mod tests {
             .clone()
             .read(tasks1)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
@@ -571,6 +573,7 @@ mod tests {
         let result2 = reader
             .read(tasks2)
             .unwrap()
+            .stream()
             .try_collect::<Vec<_>>()
             .await
             .unwrap();
diff --git a/crates/iceberg/src/arrow/scan_metrics.rs b/crates/iceberg/src/arrow/scan_metrics.rs
new file mode 100644
index 0000000000..435a818b8b
--- /dev/null
+++ b/crates/iceberg/src/arrow/scan_metrics.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Scan metrics and I/O counting for Parquet data file reads.
+
+use std::ops::Range;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use bytes::Bytes;
+
+use crate::error::Result;
+use crate::io::FileRead;
+use crate::scan::ArrowRecordBatchStream;
+
+/// Wraps a [`FileRead`] to count bytes read via a shared atomic counter.
+pub(crate) struct CountingFileRead<F> {
+    inner: F,
+    bytes_read: Arc<AtomicU64>,
+}
+
+impl<F> CountingFileRead<F> {
+    pub(crate) fn new(inner: F, bytes_read: Arc<AtomicU64>) -> Self {
+        Self { inner, bytes_read }
+    }
+}
+
+#[async_trait::async_trait]
+impl<F: FileRead> FileRead for CountingFileRead<F> {
+    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+        debug_assert!(range.end >= range.start);
+        self.bytes_read
+            .fetch_add(range.end - range.start, Ordering::Relaxed);
+        self.inner.read(range).await
+    }
+}
+
+/// Metrics collected during an Iceberg scan.
+#[derive(Clone, Debug)]
+pub struct ScanMetrics {
+    bytes_read: Arc<AtomicU64>,
+}
+
+impl ScanMetrics {
+    pub(crate) fn new() -> Self {
+        Self {
+            bytes_read: Arc::new(AtomicU64::new(0)),
+        }
+    }
+
+    pub(crate) fn bytes_read_counter(&self) -> &Arc<AtomicU64> {
+        &self.bytes_read
+    }
+
+    /// Total bytes read from storage for data files during this scan.
+    pub fn bytes_read(&self) -> u64 {
+        self.bytes_read.load(Ordering::Relaxed)
+    }
+}
+
+/// Result of [`ArrowReader::read`](super::ArrowReader::read), containing the
+/// record batch stream and metrics collected during the scan.
+pub struct ScanResult {
+    stream: ArrowRecordBatchStream,
+    metrics: ScanMetrics,
+}
+
+impl ScanResult {
+    pub(crate) fn new(stream: ArrowRecordBatchStream, metrics: ScanMetrics) -> Self {
+        Self { stream, metrics }
+    }
+
+    /// Consumes the result, returning only the record batch stream.
+    pub fn stream(self) -> ArrowRecordBatchStream {
+        self.stream
+    }
+
+    /// Returns a reference to the scan metrics.
+    pub fn metrics(&self) -> &ScanMetrics {
+        &self.metrics
+    }
+
+    /// Consumes the result into its parts.
+    pub fn into_parts(self) -> (ArrowRecordBatchStream, ScanMetrics) {
+        (self.stream, self.metrics)
+    }
+}
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 594b070e03..ff7523d44d 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -255,6 +255,13 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
     async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
 }
 
+#[async_trait::async_trait]
+impl FileRead for Box<dyn FileRead> {
+    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+        self.as_ref().read(range).await
+    }
+}
+
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 4a1e27bdc1..27f479183a 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -32,6 +32,7 @@
 use futures::{SinkExt, StreamExt, TryStreamExt};
 pub use task::*;
 
 use crate::arrow::ArrowReaderBuilder;
+pub use crate::arrow::{ScanMetrics, ScanResult};
 use crate::delete_file_index::DeleteFileIndex;
 use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -441,7 +442,10 @@ impl TableScan {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder.build().read(self.plan_files().await?)
+        arrow_reader_builder
+            .build()
+            .read(self.plan_files().await?)
+            .map(|result| result.stream())
     }
 
     /// Returns a reference to the column names of the table scan.
@@ -1364,13 +1368,15 @@ pub mod tests {
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
-            .unwrap();
+            .unwrap()
+            .stream();
         let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
-            .unwrap()
+            .unwrap()
+            .stream();
         let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
 
         assert_eq!(batch_1, batch_2);
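
Reviewer note (illustrative, not part of the patch): a minimal sketch of how a caller might consume the ScanResult/ScanMetrics API introduced above. It assumes an already-built ArrowReader and a FileScanTaskStream; the function name scan_with_metrics is made up for this example.

use futures::TryStreamExt;
use iceberg::Result;
use iceberg::arrow::ArrowReader;
use iceberg::scan::FileScanTaskStream;

// Hypothetical consumer: drains the batch stream, then reports bytes read.
async fn scan_with_metrics(reader: ArrowReader, tasks: FileScanTaskStream) -> Result<()> {
    // `read` now returns a ScanResult instead of a bare record batch stream.
    let (stream, metrics) = reader.read(tasks)?.into_parts();

    // The bytes_read counter accumulates as batches are fetched from storage.
    let batches: Vec<_> = stream.try_collect().await?;
    println!(
        "read {} record batches, {} bytes from storage",
        batches.len(),
        metrics.bytes_read()
    );
    Ok(())
}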