diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index f7f7b57..96e4f40 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -20,7 +20,7 @@
 use arrow_array::{Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use paimon::catalog::Identifier;
-use paimon::{Catalog, FileSystemCatalog, Plan};
+use paimon::{Catalog, Error, FileSystemCatalog, Plan};
 use std::collections::HashSet;
 
 fn get_test_warehouse() -> String {
@@ -293,3 +293,172 @@ async fn test_read_partitioned_dv_pk_table() {
         ]
     );
 }
+
+async fn get_test_table(table_name: &str) -> paimon::Table {
+    let warehouse = get_test_warehouse();
+    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");
+    let identifier = Identifier::new("default", table_name);
+    catalog
+        .get_table(&identifier)
+        .await
+        .expect("Failed to get table")
+}
+
+#[tokio::test]
+async fn test_read_with_column_projection() {
+    let table = get_test_table("partitioned_log_table").await;
+
+    let plan = table
+        .new_read_builder()
+        .new_scan()
+        .plan()
+        .await
+        .expect("Failed to plan scan");
+
+    // Input order ["name", "id"] — output must preserve this order (not schema order).
+    let read = table
+        .new_read_builder()
+        .with_projection(&["name", "id"])
+        .new_read()
+        .expect("Failed to create projected read");
+
+    let field_names: Vec<&str> = read.read_type().iter().map(|f| f.name()).collect();
+    assert_eq!(
+        field_names,
+        vec!["name", "id"],
+        "read_type should preserve caller-specified order"
+    );
+
+    let stream = read
+        .to_arrow(plan.splits())
+        .expect("Failed to create arrow stream");
+    let batches: Vec<RecordBatch> = stream
+        .try_collect()
+        .await
+        .expect("Failed to collect batches");
+    assert!(!batches.is_empty());
+
+    for batch in &batches {
+        let schema = batch.schema();
+        let batch_field_names: Vec<&str> =
+            schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            batch_field_names,
+            vec!["name", "id"],
+            "RecordBatch schema should preserve caller-specified order"
+        );
+        assert!(
+            batch.column_by_name("dt").is_none(),
+            "Non-projected column 'dt' should be absent"
+        );
+    }
+
+    let actual = extract_id_name(&batches);
+    let expected = vec![
+        (1, "alice".to_string()),
+        (2, "bob".to_string()),
+        (3, "carol".to_string()),
+    ];
+    assert_eq!(actual, expected);
+}
+
+#[tokio::test]
+async fn test_read_projection_empty() {
+    let table = get_test_table("simple_log_table").await;
+
+    let read = table
+        .new_read_builder()
+        .with_projection(&[])
+        .new_read()
+        .expect("Empty projection should succeed");
+
+    assert_eq!(
+        read.read_type().len(),
+        0,
+        "Empty projection should produce empty read_type"
+    );
+
+    let plan = table
+        .new_read_builder()
+        .new_scan()
+        .plan()
+        .await
+        .expect("Failed to plan scan");
+
+    let stream = read
+        .to_arrow(plan.splits())
+        .expect("Failed to create arrow stream");
+    let batches: Vec<RecordBatch> = stream
+        .try_collect()
+        .await
+        .expect("Failed to collect batches");
+    assert!(!batches.is_empty());
+
+    for batch in &batches {
+        assert_eq!(
+            batch.num_columns(),
+            0,
+            "Empty projection should produce 0-column batches"
+        );
+    }
+}
+
+#[tokio::test]
+async fn test_read_projection_unknown_column() {
+    let table = get_test_table("simple_log_table").await;
+
+    let err = table
+        .new_read_builder()
+        .with_projection(&["id", "nonexistent_column"])
+        .new_read()
+        .expect_err("Unknown columns should fail");
+
+    assert!(
+        matches!(
+            &err,
+            Error::ColumnNotExist {
+                full_name,
+                column,
+            } if full_name == "default.simple_log_table" && column == "nonexistent_column"
+        ),
+        "Expected ColumnNotExist for nonexistent_column, got: {err:?}"
+    );
+}
+
+#[tokio::test]
+async fn test_read_projection_all_invalid() {
+    let table = get_test_table("simple_log_table").await;
+
+    let err = table
+        .new_read_builder()
+        .with_projection(&["nonexistent_a", "nonexistent_b"])
+        .new_read()
+        .expect_err("All-invalid projection should fail");
+
+    assert!(
+        matches!(
+            &err,
+            Error::ColumnNotExist {
+                full_name,
+                column,
+            } if full_name == "default.simple_log_table" && column == "nonexistent_a"
+        ),
+        "Expected ColumnNotExist for nonexistent_a, got: {err:?}"
+    );
+}
+
+#[tokio::test]
+async fn test_read_projection_duplicate_column() {
+    let table = get_test_table("simple_log_table").await;
+
+    let err = table
+        .new_read_builder()
+        .with_projection(&["id", "id"])
+        .new_read()
+        .expect_err("Duplicate projection should fail");
+
+    assert!(
+        matches!(&err, Error::ConfigInvalid { message } if message.contains("Duplicate projection column 'id'")),
+        "Expected ConfigInvalid for duplicate projection, got: {err:?}"
+    );
+}
diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs
index 7690e92..0ec11a4 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -85,12 +85,10 @@ impl ArrowReader {
         // Owned list of splits so the stream does not hold references.
         let splits: Vec<DataSplit> = data_splits.to_vec();
         let read_type = self.read_type;
-        let projected_column_names = (!read_type.is_empty()).then(|| {
-            read_type
-                .iter()
-                .map(|field| field.name().to_string())
-                .collect::<Vec<_>>()
-        });
+        let projected_column_names: Vec<String> = read_type
+            .iter()
+            .map(|field| field.name().to_string())
+            .collect();
         Ok(try_stream! {
             for split in splits {
                 // Create DV factory for this split only (like Java createReader(partition, bucket, files, deletionFiles)).
@@ -134,15 +132,13 @@
                 let mut batch_stream_builder =
                     ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
                         .await?;
-                // Clip to read type columns; file-only columns are dropped.
-                if let Some(projected_column_names) = projected_column_names.as_ref() {
-                    let parquet_schema = batch_stream_builder.parquet_schema();
-                    let mask = ProjectionMask::columns(
-                        parquet_schema,
-                        projected_column_names.iter().map(String::as_str),
-                    );
-                    batch_stream_builder = batch_stream_builder.with_projection(mask);
-                }
+                // ProjectionMask preserves parquet-schema order; read_type order is restored below.
+                let parquet_schema = batch_stream_builder.parquet_schema().clone();
+                let mask = ProjectionMask::columns(
+                    &parquet_schema,
+                    projected_column_names.iter().map(String::as_str),
+                );
+                batch_stream_builder = batch_stream_builder.with_projection(mask);
 
                 if let Some(dv) = dv {
                     if !dv.is_empty() {
@@ -157,7 +153,30 @@
                 let mut batch_stream = batch_stream_builder.build()?;
 
                 while let Some(batch) = batch_stream.next().await {
-                    yield batch?
+                    let batch = batch?;
+                    // Reorder columns from parquet-schema order to read_type order.
+                    // Every projected column must exist in the batch; a missing
+                    // column indicates schema mismatch and must not be silenced.
+                    let reorder_indices: Vec<usize> = projected_column_names
+                        .iter()
+                        .map(|name| {
+                            batch.schema().index_of(name).map_err(|_| {
+                                Error::UnexpectedError {
+                                    message: format!(
+                                        "Projected column '{}' not found in Parquet batch schema",
+                                        name
+                                    ),
+                                    source: None,
+                                }
+                            })
+                        })
+                        .collect::<Result<Vec<usize>>>()?;
+                    yield batch.project(&reorder_indices).map_err(|e| {
+                        Error::UnexpectedError {
+                            message: "Failed to reorder projected columns".to_string(),
+                            source: Some(Box::new(e)),
+                        }
+                    })?;
                 }
             }
         }
diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs
index 1f640cd..b52d9a1 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -17,26 +17,40 @@
 //! ReadBuilder and TableRead for table read API.
 //!
-//! Reference: [pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py)
-//! and [pypaimon.table.file_store_table.FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
+//! Reference: [Java ReadBuilder.withProjection](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java)
+//! and [TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java).
 
 use super::{ArrowRecordBatchStream, Table, TableScan};
 use crate::arrow::ArrowReaderBuilder;
 use crate::spec::{CoreOptions, DataField};
 use crate::Result;
 use crate::{DataSplit, Error};
+use std::collections::{HashMap, HashSet};
 
 /// Builder for table scan and table read (new_scan, new_read).
 ///
-/// Reference: [pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py)
+/// Rust keeps a names-based projection API for ergonomics, while aligning the
+/// resulting read semantics with Java Paimon's order-preserving projection.
 #[derive(Debug, Clone)]
 pub struct ReadBuilder<'a> {
     table: &'a Table,
+    projected_fields: Option<Vec<String>>,
 }
 
 impl<'a> ReadBuilder<'a> {
     pub(crate) fn new(table: &'a Table) -> Self {
-        Self { table }
+        Self {
+            table,
+            projected_fields: None,
+        }
+    }
+
+    /// Set column projection by name. Output order follows the caller-specified order.
+    /// Unknown or duplicate names cause `new_read()` to fail; an empty list is a valid
+    /// zero-column projection.
+    pub fn with_projection(mut self, columns: &[&str]) -> Self {
+        self.projected_fields = Some(columns.iter().map(|c| (*c).to_string()).collect());
+        self
     }
 
     /// Create a table scan. Call [TableScan::plan] to get splits.
@@ -46,12 +60,55 @@ impl<'a> ReadBuilder<'a> {
     /// Create a table read for consuming splits (e.g. from a scan plan).
     pub fn new_read(&self) -> Result<TableRead<'a>> {
-        let read_type = self.table.schema.fields();
+        let read_type = match &self.projected_fields {
+            None => self.table.schema.fields().to_vec(),
+            Some(projected) => self.resolve_projected_fields(projected)?,
+        };
+
         Ok(TableRead {
             table: self.table,
             read_type,
         })
     }
+
+    fn resolve_projected_fields(&self, projected_fields: &[String]) -> Result<Vec<DataField>> {
+        if projected_fields.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let full_name = self.table.identifier().full_name();
+        let field_map: HashMap<&str, &DataField> = self
+            .table
+            .schema
+            .fields()
+            .iter()
+            .map(|field| (field.name(), field))
+            .collect();
+
+        let mut seen = HashSet::with_capacity(projected_fields.len());
+        let mut resolved = Vec::with_capacity(projected_fields.len());
+
+        for name in projected_fields {
+            if !seen.insert(name.as_str()) {
+                return Err(Error::ConfigInvalid {
+                    message: format!(
+                        "Duplicate projection column '{}' for table {}",
+                        name, full_name
+                    ),
+                });
+            }
+
+            let field = field_map
+                .get(name.as_str())
+                .ok_or_else(|| Error::ColumnNotExist {
+                    full_name: full_name.clone(),
+                    column: name.clone(),
+                })?;
+            resolved.push((*field).clone());
+        }
+
+        Ok(resolved)
+    }
 }
 
 /// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
@@ -60,13 +117,13 @@ impl<'a> ReadBuilder<'a> {
 #[derive(Debug, Clone)]
 pub struct TableRead<'a> {
     table: &'a Table,
-    read_type: &'a [DataField],
+    read_type: Vec<DataField>,
 }
 
 impl<'a> TableRead<'a> {
     /// Schema (fields) that this read will produce.
    pub fn read_type(&self) -> &[DataField] {
-        self.read_type
+        &self.read_type
    }
 
     /// Table for this read.