diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index d2768b4..69dd921 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -49,6 +49,7 @@ serde_with = "3.9.0" serde_repr = "0.1" snafu = "0.8.3" typed-builder = "^0.19" +regex = "1.10" opendal = { version = "0.49", features = ["services-fs"] } pretty_assertions = "1" apache-avro = { version = "0.17", features = ["snappy", "zstandard"] } diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 8279aac..9b816a4 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -36,7 +36,8 @@ pub mod table; pub use catalog::Catalog; pub use catalog::FileSystemCatalog; +pub use spec::Consumer; pub use table::{ - DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, SnapshotManager, - Table, TableRead, TableScan, + ConsumerManager, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, + SnapshotManager, Table, TableRead, TableScan, }; diff --git a/crates/paimon/src/spec/consumer.rs b/crates/paimon/src/spec/consumer.rs new file mode 100644 index 0000000..88e166d --- /dev/null +++ b/crates/paimon/src/spec/consumer.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::io::FileIO; +use serde::{Deserialize, Serialize}; + +/// Consumer which contains next snapshot. +/// +/// Reference: +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Consumer { + #[serde(rename = "nextSnapshot")] + next_snapshot: i64, +} + +impl Consumer { + /// Create a new consumer with the given next snapshot ID. + pub fn new(next_snapshot: i64) -> Self { + Self { next_snapshot } + } + + /// Get the next snapshot ID. + pub fn next_snapshot(&self) -> i64 { + self.next_snapshot + } + + /// Serialize consumer to JSON string. + pub fn to_json(&self) -> Result { + serde_json::to_string(self).map_err(|e| crate::error::Error::DataInvalid { + message: format!("Failed to serialize consumer: {}", e), + source: Some(Box::new(e) as Box), + }) + } + + /// Deserialize consumer from JSON string. + pub fn from_json(json: &str) -> Result { + serde_json::from_str(json).map_err(|e| crate::error::Error::DataInvalid { + message: format!("Failed to deserialize consumer: {}", e), + source: Some(Box::new(e) as Box), + }) + } + + /// Read consumer from a file path with retry mechanism. + /// + /// This implements a retry mechanism similar to the Java version to handle + /// concurrent write scenarios. + pub async fn from_path(file_io: &FileIO, path: &str) -> Result> { + let input_file = file_io.new_input(path)?; + + let mut last_error = None; + for _ in 0..10 { + match input_file.read().await { + Ok(bytes) => { + let json = String::from_utf8(bytes.to_vec()).map_err(|e| { + crate::error::Error::DataInvalid { + message: format!("Consumer file is not valid UTF-8: {}", e), + source: Some(Box::new(e) as Box), + } + })?; + return Ok(Some(Self::from_json(&json)?)); + } + Err(e) => { + // Check if file doesn't exist + // Handle different error messages across platforms (Windows/Linux/macOS) + let error_str = e.to_string().to_lowercase(); + if error_str.contains("not found") + || error_str.contains("no such file") + || error_str.contains("is not found") + || error_str.contains("notexist") + || error_str.contains("does not exist") + || error_str.contains("invalid") + // Windows may return "invalid filename" + { + return Ok(None); + } + last_error = Some(e); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + } + } + + Err( + last_error.unwrap_or_else(|| crate::error::Error::DataInvalid { + message: format!("Failed to read consumer from {} after 10 attempts", path), + source: Some(Box::new(std::io::Error::other("All retry attempts failed")) + as Box), + }), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_consumer_serialization() { + let consumer = Consumer::new(5); + let json = consumer.to_json().unwrap(); + assert!(json.contains("nextSnapshot")); + assert!(json.contains("5")); + + let deserialized = Consumer::from_json(&json).unwrap(); + assert_eq!(consumer, deserialized); + } + + #[test] + fn test_consumer_next_snapshot() { + let consumer = Consumer::new(10); + assert_eq!(consumer.next_snapshot(), 10); + } + + #[test] + fn test_consumer_invalid_json() { + let result = Consumer::from_json("invalid json"); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index f1ef422..0ac89c4 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -53,4 +53,6 @@ pub use objects_file::from_avro_bytes; pub(crate) mod stats; mod types; pub use types::*; +mod consumer; mod partition_utils; +pub use consumer::Consumer; diff --git a/crates/paimon/src/table/consumer_manager.rs b/crates/paimon/src/table/consumer_manager.rs new file mode 100644 index 0000000..17ba04d --- /dev/null +++ b/crates/paimon/src/table/consumer_manager.rs @@ -0,0 +1,380 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::io::FileIO; +use crate::spec::Consumer; +use chrono::{DateTime, Utc}; +use regex::Regex; +use std::collections::HashMap; + +/// Manage consumer groups. +/// +/// Reference: +#[derive(Debug, Clone)] +pub struct ConsumerManager { + file_io: FileIO, + table_path: String, + branch: String, +} + +const CONSUMER_PREFIX: &str = "consumer-"; +const DEFAULT_MAIN_BRANCH: &str = "main"; + +impl ConsumerManager { + /// Create a new ConsumerManager with default "main" branch. + pub fn new(file_io: FileIO, table_path: String) -> Self { + Self { + file_io, + table_path, + branch: DEFAULT_MAIN_BRANCH.to_string(), + } + } + + /// Create a new ConsumerManager with custom branch name. + /// + /// If branch name is null or whitespace, it will default to "main". + pub fn with_branch(file_io: FileIO, table_path: String, branch_name: Option) -> Self { + let branch = if let Some(name) = branch_name { + if name.trim().is_empty() { + DEFAULT_MAIN_BRANCH.to_string() + } else { + name + } + } else { + DEFAULT_MAIN_BRANCH.to_string() + }; + + Self { + file_io, + table_path, + branch, + } + } + + /// Get consumer by ID. + pub async fn consumer(&self, consumer_id: &str) -> Result> { + let path = self.consumer_path(consumer_id); + Consumer::from_path(&self.file_io, &path).await + } + + /// Reset (update) consumer with given ID and consumer data. + pub async fn reset_consumer(&self, consumer_id: &str, consumer: &Consumer) -> Result<()> { + let path = self.consumer_path(consumer_id); + let output_file = self.file_io.new_output(&path)?; + let json = consumer.to_json()?; + output_file.write(bytes::Bytes::from(json)).await + } + + /// Delete consumer with given ID. + pub async fn delete_consumer(&self, consumer_id: &str) -> Result<()> { + let path = self.consumer_path(consumer_id); + self.file_io.delete_file(&path).await + } + + /// Find the minimum next snapshot ID among all consumers. + /// + /// Returns None if there are no consumers. + pub async fn min_next_snapshot(&self) -> Result> { + let consumer_ids = self.list_all_ids().await?; + let mut min_snapshot: Option = None; + + for consumer_id in &consumer_ids { + if let Some(consumer) = self.consumer(consumer_id).await? { + let snapshot_id = consumer.next_snapshot(); + match min_snapshot { + None => min_snapshot = Some(snapshot_id), + Some(current_min) if snapshot_id < current_min => { + min_snapshot = Some(snapshot_id) + } + _ => {} + } + } + } + + Ok(min_snapshot) + } + + /// Expire consumers whose modification time is before given datetime. + pub async fn expire(&self, expire_datetime: DateTime) -> Result<()> { + let consumer_ids = self.list_all_ids().await?; + + for consumer_id in &consumer_ids { + let path = self.consumer_path(consumer_id); + + // Get file modification time + if let Some(modification_time) = self.get_file_modification_time(&path).await? { + if expire_datetime > modification_time { + // Delete the consumer file + self.file_io.delete_file(&path).await?; + } + } + } + + Ok(()) + } + + /// Clear consumers matching the given patterns. + /// + /// # Arguments + /// * `including_pattern` - Regex pattern for consumers to include (if None, match all) + /// * `excluding_pattern` - Regex pattern for consumers to exclude (if None, exclude none) + pub async fn clear_consumers( + &self, + including_pattern: Option<&Regex>, + excluding_pattern: Option<&Regex>, + ) -> Result<()> { + let consumer_ids = self.list_all_ids().await?; + + for consumer_id in &consumer_ids { + let mut should_clear = match including_pattern { + Some(pattern) => pattern.is_match(consumer_id), + None => true, + }; + + if should_clear { + should_clear = match excluding_pattern { + Some(pattern) => !pattern.is_match(consumer_id), + None => true, + }; + } + + if should_clear { + let path = self.consumer_path(consumer_id); + self.file_io.delete_file(&path).await?; + } + } + + Ok(()) + } + + /// Get all consumers as a map of consumer ID to next snapshot ID. + pub async fn consumers(&self) -> Result> { + let consumer_ids = self.list_all_ids().await?; + let mut consumers_map = HashMap::new(); + + for consumer_id in &consumer_ids { + if let Some(consumer) = self.consumer(consumer_id).await? { + consumers_map.insert(consumer_id.clone(), consumer.next_snapshot()); + } + } + + Ok(consumers_map) + } + + /// List all consumer IDs. + pub async fn list_all_ids(&self) -> Result> { + let consumer_dir = self.consumer_directory(); + let mut consumer_ids = Vec::new(); + + // Try to list the consumer directory + match self.file_io.list_status(&consumer_dir).await { + Ok(statuses) => { + for status in &statuses { + if !status.is_dir { + let filename = status.path.rsplit('/').next().unwrap_or(&status.path); + + if let Some(consumer_id) = self.extract_consumer_id_from_filename(filename) + { + consumer_ids.push(consumer_id); + } + } + } + } + Err(e) => { + // If directory doesn't exist, return empty list + if e.to_string().contains("not found") + || e.to_string().contains("No such file") + || e.to_string().contains("is not a directory") + { + return Ok(Vec::new()); + } + return Err(e); + } + } + + Ok(consumer_ids) + } + + // Helper methods + + fn consumer_directory(&self) -> String { + format!("{}/{}/consumer", self.table_path, self.branch) + } + + fn consumer_path(&self, consumer_id: &str) -> String { + format!( + "{}/{}{}", + self.consumer_directory(), + CONSUMER_PREFIX, + consumer_id + ) + } + + fn extract_consumer_id_from_filename(&self, filename: &str) -> Option { + filename + .strip_prefix(CONSUMER_PREFIX) + .map(|s| s.to_string()) + } + + async fn get_file_modification_time(&self, path: &str) -> Result>> { + match self.file_io.get_status(path).await { + Ok(status) => Ok(status.last_modified), + Err(e) => { + if e.to_string().contains("not found") || e.to_string().contains("No such file") { + Ok(None) + } else { + Err(e) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIO; + use url::Url; + + async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { + let temp_dir = tempfile::tempdir().unwrap(); + let temp_path = temp_dir.path().to_string_lossy().to_string(); + let file_io = FileIO::from_path(&temp_path).unwrap().build().unwrap(); + let table_path = Url::from_file_path(&temp_path).unwrap().to_string(); + let manager = ConsumerManager::new(file_io, table_path); + (manager, temp_dir) + } + + #[tokio::test] + async fn test_reset_and_get_consumer() { + let (manager, _temp) = create_test_manager().await; + let consumer = Consumer::new(5); + + manager.reset_consumer("test-id", &consumer).await.unwrap(); + let retrieved = manager.consumer("test-id").await.unwrap(); + + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().next_snapshot(), 5); + } + + #[tokio::test] + async fn test_nonexistent_consumer() { + let (manager, _temp) = create_test_manager().await; + let result = manager.consumer("nonexistent").await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_delete_consumer() { + let (manager, _temp) = create_test_manager().await; + let consumer = Consumer::new(5); + + manager.reset_consumer("test-id", &consumer).await.unwrap(); + manager.delete_consumer("test-id").await.unwrap(); + let result = manager.consumer("test-id").await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_min_next_snapshot() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("id1", &Consumer::new(10)) + .await + .unwrap(); + manager + .reset_consumer("id2", &Consumer::new(5)) + .await + .unwrap(); + manager + .reset_consumer("id3", &Consumer::new(15)) + .await + .unwrap(); + + let min_snapshot = manager.min_next_snapshot().await.unwrap(); + assert_eq!(min_snapshot, Some(5)); + } + + #[tokio::test] + async fn test_empty_min_next_snapshot() { + let (manager, _temp) = create_test_manager().await; + let min_snapshot = manager.min_next_snapshot().await.unwrap(); + + assert_eq!(min_snapshot, None); + } + + #[tokio::test] + async fn test_list_all_ids() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("id1", &Consumer::new(1)) + .await + .unwrap(); + manager + .reset_consumer("id2", &Consumer::new(2)) + .await + .unwrap(); + manager + .reset_consumer("id3", &Consumer::new(3)) + .await + .unwrap(); + + let ids = manager.list_all_ids().await.unwrap(); + assert_eq!(ids.len(), 3); + assert!(ids.contains(&"id1".to_string())); + assert!(ids.contains(&"id2".to_string())); + assert!(ids.contains(&"id3".to_string())); + } + + #[tokio::test] + async fn test_clear_consumers_with_patterns() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("test-1", &Consumer::new(1)) + .await + .unwrap(); + manager + .reset_consumer("test-2", &Consumer::new(2)) + .await + .unwrap(); + manager + .reset_consumer("prod-1", &Consumer::new(3)) + .await + .unwrap(); + manager + .reset_consumer("prod-2", &Consumer::new(4)) + .await + .unwrap(); + + let including = Regex::new(r"^test-").unwrap(); + manager + .clear_consumers(Some(&including), None) + .await + .unwrap(); + + let ids = manager.list_all_ids().await.unwrap(); + assert_eq!(ids.len(), 2); + assert!(ids.contains(&"prod-1".to_string())); + assert!(ids.contains(&"prod-2".to_string())); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 7c3f9a5..e7f57f9 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -18,6 +18,7 @@ //! Table API for Apache Paimon pub(crate) mod bin_pack; +mod consumer_manager; mod read_builder; mod snapshot_manager; mod source; @@ -25,6 +26,7 @@ mod table_scan; use crate::Result; use arrow_array::RecordBatch; +pub use consumer_manager::ConsumerManager; use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use snapshot_manager::SnapshotManager;