Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 216 additions & 32 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,41 @@
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration tests for reading Paimon tables provisioned by Spark.

use arrow_array::{Int32Array, StringArray};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use paimon::catalog::Identifier;
use paimon::{Catalog, FileSystemCatalog};
use paimon::{Catalog, FileSystemCatalog, Plan};
use std::collections::HashSet;

/// Get the test warehouse path from environment variable or use default.
fn get_test_warehouse() -> String {
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string())
}

async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) {
let warehouse = get_test_warehouse();
let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");

let identifier = Identifier::new("default", table_name);
let table = catalog
.get_table(&identifier)
.await
.expect("Failed to get table");

let read_builder = table.new_read_builder();
let read = read_builder.new_read().expect("Failed to create read");
let scan = read_builder.new_scan();
let plan = scan.plan().await.expect("Failed to plan scan");

let read = read_builder.new_read().expect("Failed to create read");
let stream = read
.to_arrow(plan.splits())
.expect("Failed to create arrow stream");

let batches: Vec<_> = stream
.try_collect()
.await
Expand All @@ -55,57 +53,243 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
!batches.is_empty(),
"Expected at least one batch from table {table_name}"
);
(plan, batches)
}

let mut actual_rows: Vec<(i32, String)> = Vec::new();

for batch in &batches {
let id_array = batch
fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
let mut rows = Vec::new();
for batch in batches {
let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("Expected Int32Array for id column");
let name_array = batch
.expect("Expected Int32Array for id");
let name = batch
.column_by_name("name")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("Expected StringArray for name column");

.expect("Expected StringArray for name");
for i in 0..batch.num_rows() {
actual_rows.push((id_array.value(i), name_array.value(i).to_string()));
rows.push((id.value(i), name.value(i).to_string()));
}
}

actual_rows.sort_by_key(|(id, _)| *id);
actual_rows
rows.sort_by_key(|(id, _)| *id);
rows
}

#[tokio::test]
async fn test_read_log_table() {
let actual_rows = read_rows("simple_log_table").await;
let expected_rows = vec![
let (plan, batches) = scan_and_read("simple_log_table").await;

// Non-partitioned table: partition should be a valid arity=0 BinaryRow
// deserialized from manifest bytes, not a stub without backing data.
for split in plan.splits() {
let partition = split.partition();
assert_eq!(partition.arity(), 0);
assert!(
!partition.is_empty(),
"Non-partitioned split should have backing data from manifest deserialization"
);
}

let actual = extract_id_name(&batches);
let expected = vec![
(1, "alice".to_string()),
(2, "bob".to_string()),
(3, "carol".to_string()),
];

assert_eq!(
actual_rows, expected_rows,
"Rows should match expected values"
);
assert_eq!(actual, expected, "Rows should match expected values");
}

#[tokio::test]
async fn test_read_dv_primary_key_table() {
let actual_rows = read_rows("simple_dv_pk_table").await;
let expected_rows = vec![
let (_, batches) = scan_and_read("simple_dv_pk_table").await;
let actual = extract_id_name(&batches);
let expected = vec![
(1, "alice-v2".to_string()),
(2, "bob-v2".to_string()),
(3, "carol-v2".to_string()),
(4, "dave-v2".to_string()),
(5, "eve-v2".to_string()),
(6, "frank-v1".to_string()),
];

assert_eq!(
actual_rows, expected_rows,
actual, expected,
"DV-enabled PK table should only expose the latest row per key"
);
}

/// Reads `partitioned_log_table` (single partition key `dt`) and verifies
/// both the split-level partition metadata and the row contents.
#[tokio::test]
async fn test_read_partitioned_log_table() {
    let (plan, batches) = scan_and_read("partitioned_log_table").await;

    // Each split must carry a one-column partition row whose decoded value
    // matches the Hive-style directory layout `dt=<value>/bucket-<n>`.
    let mut seen_partitions = HashSet::new();
    for split in plan.splits() {
        let partition = split.partition();
        assert_eq!(partition.arity(), 1);
        assert!(!partition.is_empty());
        let dt = partition.get_string(0).expect("Failed to decode dt");
        let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket());
        assert!(
            split.bucket_path().ends_with(&expected_suffix),
            "bucket_path should end with '{expected_suffix}', got: {}",
            split.bucket_path()
        );
        seen_partitions.insert(dt.to_string());
    }
    let expected_partitions: HashSet<String> =
        HashSet::from(["2024-01-01".to_string(), "2024-01-02".to_string()]);
    assert_eq!(seen_partitions, expected_partitions);

    // Flatten all batches into (id, name, dt) tuples, then order by id so the
    // comparison is independent of split/batch ordering.
    let mut rows: Vec<(i32, String, String)> = Vec::new();
    for batch in &batches {
        let ids = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let names = batch
            .column_by_name("name")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("name");
        let dts = batch
            .column_by_name("dt")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("dt");
        for row in 0..batch.num_rows() {
            rows.push((
                ids.value(row),
                names.value(row).to_string(),
                dts.value(row).to_string(),
            ));
        }
    }
    rows.sort_by_key(|row| row.0);

    let expected = vec![
        (1, "alice".to_string(), "2024-01-01".to_string()),
        (2, "bob".to_string(), "2024-01-01".to_string()),
        (3, "carol".to_string(), "2024-01-02".to_string()),
    ];
    assert_eq!(rows, expected);
}

/// Reads `multi_partitioned_log_table` (two partition keys: `dt`, `hr`) and
/// verifies split partition metadata, bucket paths, and row contents.
#[tokio::test]
async fn test_read_multi_partitioned_log_table() {
    let (plan, batches) = scan_and_read("multi_partitioned_log_table").await;

    // Two-column partition rows: a string `dt` followed by an int `hr`,
    // both reflected in the directory layout `dt=<dt>/hr=<hr>/bucket-<n>`.
    let mut seen_partitions = HashSet::new();
    for split in plan.splits() {
        let partition = split.partition();
        assert_eq!(partition.arity(), 2);
        assert!(!partition.is_empty());
        let dt = partition.get_string(0).expect("Failed to decode dt");
        let hr = partition.get_int(1).expect("Failed to decode hr");
        let expected_suffix = format!("dt={dt}/hr={hr}/bucket-{}", split.bucket());
        assert!(
            split.bucket_path().ends_with(&expected_suffix),
            "bucket_path should end with '{expected_suffix}', got: {}",
            split.bucket_path()
        );
        seen_partitions.insert((dt.to_string(), hr));
    }
    let expected_partitions: HashSet<(String, i32)> = HashSet::from([
        ("2024-01-01".to_string(), 10),
        ("2024-01-01".to_string(), 20),
        ("2024-01-02".to_string(), 10),
    ]);
    assert_eq!(seen_partitions, expected_partitions);

    // Flatten into (id, name, dt, hr) tuples and order by id for a
    // batch-order-independent comparison.
    let mut rows: Vec<(i32, String, String, i32)> = Vec::new();
    for batch in &batches {
        let ids = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let names = batch
            .column_by_name("name")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("name");
        let dts = batch
            .column_by_name("dt")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("dt");
        let hrs = batch
            .column_by_name("hr")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("hr");
        for row in 0..batch.num_rows() {
            rows.push((
                ids.value(row),
                names.value(row).to_string(),
                dts.value(row).to_string(),
                hrs.value(row),
            ));
        }
    }
    rows.sort_by_key(|row| row.0);

    let expected = vec![
        (1, "alice".to_string(), "2024-01-01".to_string(), 10),
        (2, "bob".to_string(), "2024-01-01".to_string(), 10),
        (3, "carol".to_string(), "2024-01-01".to_string(), 20),
        (4, "dave".to_string(), "2024-01-02".to_string(), 10),
    ];
    assert_eq!(rows, expected);
}

/// Reads `partitioned_dv_pk_table` — a partitioned primary-key table with
/// deletion vectors — and verifies that only the latest row per (key,
/// partition) is exposed, along with the split partition metadata.
#[tokio::test]
async fn test_read_partitioned_dv_pk_table() {
    let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await;

    // Split-level checks: single-column partition rows matching the
    // `dt=<value>/bucket-<n>` directory layout.
    let mut seen_partitions = HashSet::new();
    for split in plan.splits() {
        let partition = split.partition();
        assert_eq!(partition.arity(), 1);
        assert!(!partition.is_empty());
        let dt = partition.get_string(0).expect("Failed to decode dt");
        let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket());
        assert!(
            split.bucket_path().ends_with(&expected_suffix),
            "bucket_path should end with '{expected_suffix}', got: {}",
            split.bucket_path()
        );
        seen_partitions.insert(dt.to_string());
    }
    let expected_partitions: HashSet<String> =
        HashSet::from(["2024-01-01".to_string(), "2024-01-02".to_string()]);
    assert_eq!(seen_partitions, expected_partitions);

    // Flatten into (id, name, dt) tuples. The same id may appear in two
    // partitions, so order by (id, dt) to make the comparison deterministic.
    let mut rows: Vec<(i32, String, String)> = Vec::new();
    for batch in &batches {
        let ids = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let names = batch
            .column_by_name("name")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("name");
        let dts = batch
            .column_by_name("dt")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("dt");
        for row in 0..batch.num_rows() {
            rows.push((
                ids.value(row),
                names.value(row).to_string(),
                dts.value(row).to_string(),
            ));
        }
    }
    rows.sort_by_key(|row| (row.0, row.2.clone()));

    let expected = vec![
        (1, "alice-v2".to_string(), "2024-01-01".to_string()),
        (1, "alice-v1".to_string(), "2024-01-02".to_string()),
        (2, "bob-v2".to_string(), "2024-01-01".to_string()),
        (3, "carol-v2".to_string(), "2024-01-02".to_string()),
        (4, "dave-v2".to_string(), "2024-01-02".to_string()),
    ];
    assert_eq!(rows, expected);
}
51 changes: 50 additions & 1 deletion crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ use std::collections::HashMap;
const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";

/// Typed accessors for common table options.
///
Expand All @@ -39,7 +42,7 @@ impl<'a> CoreOptions<'a> {
pub fn deletion_vectors_enabled(&self) -> bool {
self.options
.get(DELETION_VECTORS_ENABLED_OPTION)
.map(|value| matches!(value.to_ascii_lowercase().as_str(), "true"))
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}

Expand All @@ -56,6 +59,27 @@ impl<'a> CoreOptions<'a> {
.and_then(|value| parse_memory_size(value))
.unwrap_or(DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST)
}

/// The default partition name for null/blank partition values.
///
/// Falls back to `__DEFAULT_PARTITION__` when the option is unset.
/// Corresponds to Java `CoreOptions.PARTITION_DEFAULT_NAME`.
pub fn partition_default_name(&self) -> &str {
    match self.options.get(PARTITION_DEFAULT_NAME_OPTION) {
        Some(name) => name.as_str(),
        None => DEFAULT_PARTITION_DEFAULT_NAME,
    }
}

/// Whether to use legacy partition name formatting (toString semantics).
///
/// Corresponds to Java `CoreOptions.PARTITION_GENERATE_LEGACY_NAME`.
/// Default: `true` to match Java Paimon; any value other than a
/// case-insensitive "true" disables it.
pub fn legacy_partition_name(&self) -> bool {
    self.options
        .get(PARTITION_LEGACY_NAME_OPTION)
        .map_or(true, |v| v.eq_ignore_ascii_case("true"))
}
}

/// Parse a memory size string to bytes using binary (1024-based) semantics.
Expand Down Expand Up @@ -131,4 +155,29 @@ mod tests {
assert_eq!(parse_memory_size(""), None);
assert_eq!(parse_memory_size("abc"), None);
}

/// With no options set, the partition accessors must return the
/// Java-compatible defaults.
#[test]
fn test_partition_options_defaults() {
    let empty = HashMap::new();
    let core = CoreOptions::new(&empty);
    assert_eq!(core.partition_default_name(), "__DEFAULT_PARTITION__");
    assert!(core.legacy_partition_name());
}

/// Explicitly-set partition options must override both defaults.
#[test]
fn test_partition_options_custom() {
    let mut options = HashMap::new();
    options.insert(
        PARTITION_DEFAULT_NAME_OPTION.to_string(),
        "NULL_PART".to_string(),
    );
    options.insert(
        PARTITION_LEGACY_NAME_OPTION.to_string(),
        "false".to_string(),
    );
    let core = CoreOptions::new(&options);
    assert_eq!(core.partition_default_name(), "NULL_PART");
    assert!(!core.legacy_partition_name());
}
}
Loading
Loading