From 01712528b4f21e8db82aeae4337b81972cf79785 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 23 Mar 2026 21:36:39 +0800 Subject: [PATCH 1/6] feat: introduce consumer_manager --- crates/paimon/Cargo.toml | 1 + crates/paimon/src/lib.rs | 5 +- crates/paimon/src/spec/consumer.rs | 127 ++++++ crates/paimon/src/spec/mod.rs | 2 + crates/paimon/src/table/consumer_manager.rs | 377 ++++++++++++++++++ crates/paimon/src/table/consumer_manager_1.rs | 377 ++++++++++++++++++ crates/paimon/src/table/mod.rs | 2 + 7 files changed, 889 insertions(+), 2 deletions(-) create mode 100644 crates/paimon/src/spec/consumer.rs create mode 100644 crates/paimon/src/table/consumer_manager.rs create mode 100644 crates/paimon/src/table/consumer_manager_1.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index d2768b4..69dd921 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -49,6 +49,7 @@ serde_with = "3.9.0" serde_repr = "0.1" snafu = "0.8.3" typed-builder = "^0.19" +regex = "1.10" opendal = { version = "0.49", features = ["services-fs"] } pretty_assertions = "1" apache-avro = { version = "0.17", features = ["snappy", "zstandard"] } diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 8279aac..9b816a4 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -36,7 +36,8 @@ pub mod table; pub use catalog::Catalog; pub use catalog::FileSystemCatalog; +pub use spec::Consumer; pub use table::{ - DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, SnapshotManager, - Table, TableRead, TableScan, + ConsumerManager, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, + SnapshotManager, Table, TableRead, TableScan, }; diff --git a/crates/paimon/src/spec/consumer.rs b/crates/paimon/src/spec/consumer.rs new file mode 100644 index 0000000..8db13c6 --- /dev/null +++ b/crates/paimon/src/spec/consumer.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::io::FileIO; +use serde::{Deserialize, Serialize}; + +/// Consumer which contains next snapshot. +/// +/// Reference: +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Consumer { + #[serde(rename = "nextSnapshot")] + next_snapshot: i64, +} + +impl Consumer { + /// Create a new consumer with the given next snapshot ID. + pub fn new(next_snapshot: i64) -> Self { + Self { next_snapshot } + } + + /// Get the next snapshot ID. + pub fn next_snapshot(&self) -> i64 { + self.next_snapshot + } + + /// Serialize consumer to JSON string. + pub fn to_json(&self) -> Result { + serde_json::to_string(self).map_err(|e| crate::error::Error::DataInvalid { + message: format!("Failed to serialize consumer: {}", e), + source: Some(Box::new(e) as Box), + }) + } + + /// Deserialize consumer from JSON string. + pub fn from_json(json: &str) -> Result { + serde_json::from_str(json).map_err(|e| crate::error::Error::DataInvalid { + message: format!("Failed to deserialize consumer: {}", e), + source: Some(Box::new(e) as Box), + }) + } + + /// Read consumer from a file path with retry mechanism. + /// + /// This implements a retry mechanism similar to the Java version to handle + /// concurrent write scenarios. + pub async fn from_path(file_io: &FileIO, path: &str) -> Result> { + let input_file = file_io.new_input(path)?; + + let mut last_error = None; + for _ in 0..10 { + match input_file.read().await { + Ok(bytes) => { + let json = String::from_utf8(bytes.to_vec()).map_err(|e| { + crate::error::Error::DataInvalid { + message: format!("Consumer file is not valid UTF-8: {}", e), + source: Some(Box::new(e) as Box), + } + })?; + return Ok(Some(Self::from_json(&json)?)); + } + Err(e) => { + // Check if file doesn't exist + if e.to_string().contains("not found") + || e.to_string().contains("No such file") + || e.to_string().contains("is not found") + { + return Ok(None); + } + last_error = Some(e); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + } + } + + Err( + last_error.unwrap_or_else(|| crate::error::Error::DataInvalid { + message: format!("Failed to read consumer from {} after 10 attempts", path), + source: Some(Box::new(std::io::Error::other("All retry attempts failed")) + as Box), + }), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_consumer_serialization() { + let consumer = Consumer::new(5); + let json = consumer.to_json().unwrap(); + assert!(json.contains("nextSnapshot")); + assert!(json.contains("5")); + + let deserialized = Consumer::from_json(&json).unwrap(); + assert_eq!(consumer, deserialized); + } + + #[test] + fn test_consumer_next_snapshot() { + let consumer = Consumer::new(10); + assert_eq!(consumer.next_snapshot(), 10); + } + + #[test] + fn test_consumer_invalid_json() { + let result = Consumer::from_json("invalid json"); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index f1ef422..0ac89c4 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -53,4 +53,6 @@ pub use objects_file::from_avro_bytes; pub(crate) mod stats; mod types; pub use types::*; +mod consumer; mod partition_utils; +pub use consumer::Consumer; diff --git a/crates/paimon/src/table/consumer_manager.rs b/crates/paimon/src/table/consumer_manager.rs new file mode 100644 index 0000000..809fb25 --- /dev/null +++ b/crates/paimon/src/table/consumer_manager.rs @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::io::FileIO; +use crate::spec::Consumer; +use chrono::{DateTime, Utc}; +use regex::Regex; +use std::collections::HashMap; + +/// Manage consumer groups. +/// +/// Reference: +#[derive(Debug, Clone)] +pub struct ConsumerManager { + file_io: FileIO, + table_path: String, + branch: String, +} + +const CONSUMER_PREFIX: &str = "consumer-"; +const DEFAULT_MAIN_BRANCH: &str = "main"; + +impl ConsumerManager { + /// Create a new ConsumerManager with default "main" branch. + pub fn new(file_io: FileIO, table_path: String) -> Self { + Self { + file_io, + table_path, + branch: DEFAULT_MAIN_BRANCH.to_string(), + } + } + + /// Create a new ConsumerManager with custom branch name. + /// + /// If branch name is null or whitespace, it will default to "main". + pub fn with_branch(file_io: FileIO, table_path: String, branch_name: Option) -> Self { + let branch = if let Some(name) = branch_name { + if name.trim().is_empty() { + DEFAULT_MAIN_BRANCH.to_string() + } else { + name + } + } else { + DEFAULT_MAIN_BRANCH.to_string() + }; + + Self { + file_io, + table_path, + branch, + } + } + + /// Get consumer by ID. + pub async fn consumer(&self, consumer_id: &str) -> Result> { + let path = self.consumer_path(consumer_id); + Consumer::from_path(&self.file_io, &path).await + } + + /// Reset (update) consumer with given ID and consumer data. + pub async fn reset_consumer(&self, consumer_id: &str, consumer: &Consumer) -> Result<()> { + let path = self.consumer_path(consumer_id); + let output_file = self.file_io.new_output(&path)?; + let json = consumer.to_json()?; + output_file.write(bytes::Bytes::from(json)).await + } + + /// Delete consumer with given ID. + pub async fn delete_consumer(&self, consumer_id: &str) -> Result<()> { + let path = self.consumer_path(consumer_id); + self.file_io.delete_file(&path).await + } + + /// Find the minimum next snapshot ID among all consumers. + /// + /// Returns None if there are no consumers. + pub async fn min_next_snapshot(&self) -> Result> { + let consumer_ids = self.list_all_ids().await?; + let mut min_snapshot: Option = None; + + for consumer_id in &consumer_ids { + if let Some(consumer) = self.consumer(consumer_id).await? { + let snapshot_id = consumer.next_snapshot(); + match min_snapshot { + None => min_snapshot = Some(snapshot_id), + Some(current_min) if snapshot_id < current_min => { + min_snapshot = Some(snapshot_id) + } + _ => {} + } + } + } + + Ok(min_snapshot) + } + + /// Expire consumers whose modification time is before given datetime. + pub async fn expire(&self, expire_datetime: DateTime) -> Result<()> { + let consumer_ids = self.list_all_ids().await?; + + for consumer_id in &consumer_ids { + let path = self.consumer_path(consumer_id); + + // Get file modification time + if let Some(modification_time) = self.get_file_modification_time(&path).await? { + if expire_datetime > modification_time { + // Delete the consumer file + self.file_io.delete_file(&path).await?; + } + } + } + + Ok(()) + } + + /// Clear consumers matching the given patterns. + /// + /// # Arguments + /// * `including_pattern` - Regex pattern for consumers to include (if None, match all) + /// * `excluding_pattern` - Regex pattern for consumers to exclude (if None, exclude none) + pub async fn clear_consumers( + &self, + including_pattern: Option<&Regex>, + excluding_pattern: Option<&Regex>, + ) -> Result<()> { + let consumer_ids = self.list_all_ids().await?; + + for consumer_id in &consumer_ids { + let mut should_clear = match including_pattern { + Some(pattern) => pattern.is_match(consumer_id), + None => true, + }; + + if should_clear { + should_clear = match excluding_pattern { + Some(pattern) => !pattern.is_match(consumer_id), + None => true, + }; + } + + if should_clear { + let path = self.consumer_path(consumer_id); + self.file_io.delete_file(&path).await?; + } + } + + Ok(()) + } + + /// Get all consumers as a map of consumer ID to next snapshot ID. + pub async fn consumers(&self) -> Result> { + let consumer_ids = self.list_all_ids().await?; + let mut consumers_map = HashMap::new(); + + for consumer_id in &consumer_ids { + if let Some(consumer) = self.consumer(consumer_id).await? { + consumers_map.insert(consumer_id.clone(), consumer.next_snapshot()); + } + } + + Ok(consumers_map) + } + + /// List all consumer IDs. + pub async fn list_all_ids(&self) -> Result> { + let consumer_dir = self.consumer_directory(); + let mut consumer_ids = Vec::new(); + + // Try to list the consumer directory + match self.file_io.list_status(&consumer_dir).await { + Ok(statuses) => { + for status in &statuses { + if !status.is_dir { + let filename = status.path.rsplit('/').next().unwrap_or(&status.path); + + if let Some(consumer_id) = self.extract_consumer_id_from_filename(filename) + { + consumer_ids.push(consumer_id); + } + } + } + } + Err(e) => { + // If directory doesn't exist, return empty list + if e.to_string().contains("not found") + || e.to_string().contains("No such file") + || e.to_string().contains("is not a directory") + { + return Ok(Vec::new()); + } + return Err(e); + } + } + + Ok(consumer_ids) + } + + // Helper methods + + fn consumer_directory(&self) -> String { + format!("{}/{}/consumer", self.table_path, self.branch) + } + + fn consumer_path(&self, consumer_id: &str) -> String { + format!( + "{}/{}{}", + self.consumer_directory(), + CONSUMER_PREFIX, + consumer_id + ) + } + + fn extract_consumer_id_from_filename(&self, filename: &str) -> Option { + filename + .strip_prefix(CONSUMER_PREFIX) + .map(|s| s.to_string()) + } + + async fn get_file_modification_time(&self, path: &str) -> Result>> { + match self.file_io.get_status(path).await { + Ok(status) => Ok(status.last_modified), + Err(e) => { + if e.to_string().contains("not found") || e.to_string().contains("No such file") { + Ok(None) + } else { + Err(e) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + + async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { + let temp_dir = tempfile::tempdir().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let manager = ConsumerManager::new(file_io, temp_dir.path().to_string_lossy().to_string()); + (manager, temp_dir) + } + + #[tokio::test] + async fn test_reset_and_get_consumer() { + let (manager, _temp) = create_test_manager().await; + let consumer = Consumer::new(5); + + manager.reset_consumer("test-id", &consumer).await.unwrap(); + let retrieved = manager.consumer("test-id").await.unwrap(); + + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().next_snapshot(), 5); + } + + #[tokio::test] + async fn test_nonexistent_consumer() { + let (manager, _temp) = create_test_manager().await; + let result = manager.consumer("nonexistent").await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_delete_consumer() { + let (manager, _temp) = create_test_manager().await; + let consumer = Consumer::new(5); + + manager.reset_consumer("test-id", &consumer).await.unwrap(); + manager.delete_consumer("test-id").await.unwrap(); + let result = manager.consumer("test-id").await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_min_next_snapshot() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("id1", &Consumer::new(10)) + .await + .unwrap(); + manager + .reset_consumer("id2", &Consumer::new(5)) + .await + .unwrap(); + manager + .reset_consumer("id3", &Consumer::new(15)) + .await + .unwrap(); + + let min_snapshot = manager.min_next_snapshot().await.unwrap(); + assert_eq!(min_snapshot, Some(5)); + } + + #[tokio::test] + async fn test_empty_min_next_snapshot() { + let (manager, _temp) = create_test_manager().await; + let min_snapshot = manager.min_next_snapshot().await.unwrap(); + + assert_eq!(min_snapshot, None); + } + + #[tokio::test] + async fn test_list_all_ids() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("id1", &Consumer::new(1)) + .await + .unwrap(); + manager + .reset_consumer("id2", &Consumer::new(2)) + .await + .unwrap(); + manager + .reset_consumer("id3", &Consumer::new(3)) + .await + .unwrap(); + + let ids = manager.list_all_ids().await.unwrap(); + assert_eq!(ids.len(), 3); + assert!(ids.contains(&"id1".to_string())); + assert!(ids.contains(&"id2".to_string())); + assert!(ids.contains(&"id3".to_string())); + } + + #[tokio::test] + async fn test_clear_consumers_with_patterns() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("test-1", &Consumer::new(1)) + .await + .unwrap(); + manager + .reset_consumer("test-2", &Consumer::new(2)) + .await + .unwrap(); + manager + .reset_consumer("prod-1", &Consumer::new(3)) + .await + .unwrap(); + manager + .reset_consumer("prod-2", &Consumer::new(4)) + .await + .unwrap(); + + let including = Regex::new(r"^test-").unwrap(); + manager + .clear_consumers(Some(&including), None) + .await + .unwrap(); + + let ids = manager.list_all_ids().await.unwrap(); + assert_eq!(ids.len(), 2); + assert!(ids.contains(&"prod-1".to_string())); + assert!(ids.contains(&"prod-2".to_string())); + } +} diff --git a/crates/paimon/src/table/consumer_manager_1.rs b/crates/paimon/src/table/consumer_manager_1.rs new file mode 100644 index 0000000..809fb25 --- /dev/null +++ b/crates/paimon/src/table/consumer_manager_1.rs @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::io::FileIO; +use crate::spec::Consumer; +use chrono::{DateTime, Utc}; +use regex::Regex; +use std::collections::HashMap; + +/// Manage consumer groups. +/// +/// Reference: +#[derive(Debug, Clone)] +pub struct ConsumerManager { + file_io: FileIO, + table_path: String, + branch: String, +} + +const CONSUMER_PREFIX: &str = "consumer-"; +const DEFAULT_MAIN_BRANCH: &str = "main"; + +impl ConsumerManager { + /// Create a new ConsumerManager with default "main" branch. + pub fn new(file_io: FileIO, table_path: String) -> Self { + Self { + file_io, + table_path, + branch: DEFAULT_MAIN_BRANCH.to_string(), + } + } + + /// Create a new ConsumerManager with custom branch name. + /// + /// If branch name is null or whitespace, it will default to "main". + pub fn with_branch(file_io: FileIO, table_path: String, branch_name: Option) -> Self { + let branch = if let Some(name) = branch_name { + if name.trim().is_empty() { + DEFAULT_MAIN_BRANCH.to_string() + } else { + name + } + } else { + DEFAULT_MAIN_BRANCH.to_string() + }; + + Self { + file_io, + table_path, + branch, + } + } + + /// Get consumer by ID. + pub async fn consumer(&self, consumer_id: &str) -> Result> { + let path = self.consumer_path(consumer_id); + Consumer::from_path(&self.file_io, &path).await + } + + /// Reset (update) consumer with given ID and consumer data. + pub async fn reset_consumer(&self, consumer_id: &str, consumer: &Consumer) -> Result<()> { + let path = self.consumer_path(consumer_id); + let output_file = self.file_io.new_output(&path)?; + let json = consumer.to_json()?; + output_file.write(bytes::Bytes::from(json)).await + } + + /// Delete consumer with given ID. + pub async fn delete_consumer(&self, consumer_id: &str) -> Result<()> { + let path = self.consumer_path(consumer_id); + self.file_io.delete_file(&path).await + } + + /// Find the minimum next snapshot ID among all consumers. + /// + /// Returns None if there are no consumers. + pub async fn min_next_snapshot(&self) -> Result> { + let consumer_ids = self.list_all_ids().await?; + let mut min_snapshot: Option = None; + + for consumer_id in &consumer_ids { + if let Some(consumer) = self.consumer(consumer_id).await? { + let snapshot_id = consumer.next_snapshot(); + match min_snapshot { + None => min_snapshot = Some(snapshot_id), + Some(current_min) if snapshot_id < current_min => { + min_snapshot = Some(snapshot_id) + } + _ => {} + } + } + } + + Ok(min_snapshot) + } + + /// Expire consumers whose modification time is before given datetime. + pub async fn expire(&self, expire_datetime: DateTime) -> Result<()> { + let consumer_ids = self.list_all_ids().await?; + + for consumer_id in &consumer_ids { + let path = self.consumer_path(consumer_id); + + // Get file modification time + if let Some(modification_time) = self.get_file_modification_time(&path).await? { + if expire_datetime > modification_time { + // Delete the consumer file + self.file_io.delete_file(&path).await?; + } + } + } + + Ok(()) + } + + /// Clear consumers matching the given patterns. + /// + /// # Arguments + /// * `including_pattern` - Regex pattern for consumers to include (if None, match all) + /// * `excluding_pattern` - Regex pattern for consumers to exclude (if None, exclude none) + pub async fn clear_consumers( + &self, + including_pattern: Option<&Regex>, + excluding_pattern: Option<&Regex>, + ) -> Result<()> { + let consumer_ids = self.list_all_ids().await?; + + for consumer_id in &consumer_ids { + let mut should_clear = match including_pattern { + Some(pattern) => pattern.is_match(consumer_id), + None => true, + }; + + if should_clear { + should_clear = match excluding_pattern { + Some(pattern) => !pattern.is_match(consumer_id), + None => true, + }; + } + + if should_clear { + let path = self.consumer_path(consumer_id); + self.file_io.delete_file(&path).await?; + } + } + + Ok(()) + } + + /// Get all consumers as a map of consumer ID to next snapshot ID. + pub async fn consumers(&self) -> Result> { + let consumer_ids = self.list_all_ids().await?; + let mut consumers_map = HashMap::new(); + + for consumer_id in &consumer_ids { + if let Some(consumer) = self.consumer(consumer_id).await? { + consumers_map.insert(consumer_id.clone(), consumer.next_snapshot()); + } + } + + Ok(consumers_map) + } + + /// List all consumer IDs. + pub async fn list_all_ids(&self) -> Result> { + let consumer_dir = self.consumer_directory(); + let mut consumer_ids = Vec::new(); + + // Try to list the consumer directory + match self.file_io.list_status(&consumer_dir).await { + Ok(statuses) => { + for status in &statuses { + if !status.is_dir { + let filename = status.path.rsplit('/').next().unwrap_or(&status.path); + + if let Some(consumer_id) = self.extract_consumer_id_from_filename(filename) + { + consumer_ids.push(consumer_id); + } + } + } + } + Err(e) => { + // If directory doesn't exist, return empty list + if e.to_string().contains("not found") + || e.to_string().contains("No such file") + || e.to_string().contains("is not a directory") + { + return Ok(Vec::new()); + } + return Err(e); + } + } + + Ok(consumer_ids) + } + + // Helper methods + + fn consumer_directory(&self) -> String { + format!("{}/{}/consumer", self.table_path, self.branch) + } + + fn consumer_path(&self, consumer_id: &str) -> String { + format!( + "{}/{}{}", + self.consumer_directory(), + CONSUMER_PREFIX, + consumer_id + ) + } + + fn extract_consumer_id_from_filename(&self, filename: &str) -> Option { + filename + .strip_prefix(CONSUMER_PREFIX) + .map(|s| s.to_string()) + } + + async fn get_file_modification_time(&self, path: &str) -> Result>> { + match self.file_io.get_status(path).await { + Ok(status) => Ok(status.last_modified), + Err(e) => { + if e.to_string().contains("not found") || e.to_string().contains("No such file") { + Ok(None) + } else { + Err(e) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + + async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { + let temp_dir = tempfile::tempdir().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let manager = ConsumerManager::new(file_io, temp_dir.path().to_string_lossy().to_string()); + (manager, temp_dir) + } + + #[tokio::test] + async fn test_reset_and_get_consumer() { + let (manager, _temp) = create_test_manager().await; + let consumer = Consumer::new(5); + + manager.reset_consumer("test-id", &consumer).await.unwrap(); + let retrieved = manager.consumer("test-id").await.unwrap(); + + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().next_snapshot(), 5); + } + + #[tokio::test] + async fn test_nonexistent_consumer() { + let (manager, _temp) = create_test_manager().await; + let result = manager.consumer("nonexistent").await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_delete_consumer() { + let (manager, _temp) = create_test_manager().await; + let consumer = Consumer::new(5); + + manager.reset_consumer("test-id", &consumer).await.unwrap(); + manager.delete_consumer("test-id").await.unwrap(); + let result = manager.consumer("test-id").await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_min_next_snapshot() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("id1", &Consumer::new(10)) + .await + .unwrap(); + manager + .reset_consumer("id2", &Consumer::new(5)) + .await + .unwrap(); + manager + .reset_consumer("id3", &Consumer::new(15)) + .await + .unwrap(); + + let min_snapshot = manager.min_next_snapshot().await.unwrap(); + assert_eq!(min_snapshot, Some(5)); + } + + #[tokio::test] + async fn test_empty_min_next_snapshot() { + let (manager, _temp) = create_test_manager().await; + let min_snapshot = manager.min_next_snapshot().await.unwrap(); + + assert_eq!(min_snapshot, None); + } + + #[tokio::test] + async fn test_list_all_ids() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("id1", &Consumer::new(1)) + .await + .unwrap(); + manager + .reset_consumer("id2", &Consumer::new(2)) + .await + .unwrap(); + manager + .reset_consumer("id3", &Consumer::new(3)) + .await + .unwrap(); + + let ids = manager.list_all_ids().await.unwrap(); + assert_eq!(ids.len(), 3); + assert!(ids.contains(&"id1".to_string())); + assert!(ids.contains(&"id2".to_string())); + assert!(ids.contains(&"id3".to_string())); + } + + #[tokio::test] + async fn test_clear_consumers_with_patterns() { + let (manager, _temp) = create_test_manager().await; + + manager + .reset_consumer("test-1", &Consumer::new(1)) + .await + .unwrap(); + manager + .reset_consumer("test-2", &Consumer::new(2)) + .await + .unwrap(); + manager + .reset_consumer("prod-1", &Consumer::new(3)) + .await + .unwrap(); + manager + .reset_consumer("prod-2", &Consumer::new(4)) + .await + .unwrap(); + + let including = Regex::new(r"^test-").unwrap(); + manager + .clear_consumers(Some(&including), None) + .await + .unwrap(); + + let ids = manager.list_all_ids().await.unwrap(); + assert_eq!(ids.len(), 2); + assert!(ids.contains(&"prod-1".to_string())); + assert!(ids.contains(&"prod-2".to_string())); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 7c3f9a5..e7f57f9 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -18,6 +18,7 @@ //! Table API for Apache Paimon pub(crate) mod bin_pack; +mod consumer_manager; mod read_builder; mod snapshot_manager; mod source; @@ -25,6 +26,7 @@ mod table_scan; use crate::Result; use arrow_array::RecordBatch; +pub use consumer_manager::ConsumerManager; use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use snapshot_manager::SnapshotManager; From 9dc32cb27066cfa899d931929183ca1feb6376e5 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 23 Mar 2026 21:38:47 +0800 Subject: [PATCH 2/6] fix --- crates/paimon/src/table/consumer_manager_1.rs | 377 ------------------ 1 file changed, 377 deletions(-) delete mode 100644 crates/paimon/src/table/consumer_manager_1.rs diff --git a/crates/paimon/src/table/consumer_manager_1.rs b/crates/paimon/src/table/consumer_manager_1.rs deleted file mode 100644 index 809fb25..0000000 --- a/crates/paimon/src/table/consumer_manager_1.rs +++ /dev/null @@ -1,377 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::error::Result; -use crate::io::FileIO; -use crate::spec::Consumer; -use chrono::{DateTime, Utc}; -use regex::Regex; -use std::collections::HashMap; - -/// Manage consumer groups. -/// -/// Reference: -#[derive(Debug, Clone)] -pub struct ConsumerManager { - file_io: FileIO, - table_path: String, - branch: String, -} - -const CONSUMER_PREFIX: &str = "consumer-"; -const DEFAULT_MAIN_BRANCH: &str = "main"; - -impl ConsumerManager { - /// Create a new ConsumerManager with default "main" branch. - pub fn new(file_io: FileIO, table_path: String) -> Self { - Self { - file_io, - table_path, - branch: DEFAULT_MAIN_BRANCH.to_string(), - } - } - - /// Create a new ConsumerManager with custom branch name. - /// - /// If branch name is null or whitespace, it will default to "main". - pub fn with_branch(file_io: FileIO, table_path: String, branch_name: Option) -> Self { - let branch = if let Some(name) = branch_name { - if name.trim().is_empty() { - DEFAULT_MAIN_BRANCH.to_string() - } else { - name - } - } else { - DEFAULT_MAIN_BRANCH.to_string() - }; - - Self { - file_io, - table_path, - branch, - } - } - - /// Get consumer by ID. - pub async fn consumer(&self, consumer_id: &str) -> Result> { - let path = self.consumer_path(consumer_id); - Consumer::from_path(&self.file_io, &path).await - } - - /// Reset (update) consumer with given ID and consumer data. - pub async fn reset_consumer(&self, consumer_id: &str, consumer: &Consumer) -> Result<()> { - let path = self.consumer_path(consumer_id); - let output_file = self.file_io.new_output(&path)?; - let json = consumer.to_json()?; - output_file.write(bytes::Bytes::from(json)).await - } - - /// Delete consumer with given ID. - pub async fn delete_consumer(&self, consumer_id: &str) -> Result<()> { - let path = self.consumer_path(consumer_id); - self.file_io.delete_file(&path).await - } - - /// Find the minimum next snapshot ID among all consumers. - /// - /// Returns None if there are no consumers. - pub async fn min_next_snapshot(&self) -> Result> { - let consumer_ids = self.list_all_ids().await?; - let mut min_snapshot: Option = None; - - for consumer_id in &consumer_ids { - if let Some(consumer) = self.consumer(consumer_id).await? { - let snapshot_id = consumer.next_snapshot(); - match min_snapshot { - None => min_snapshot = Some(snapshot_id), - Some(current_min) if snapshot_id < current_min => { - min_snapshot = Some(snapshot_id) - } - _ => {} - } - } - } - - Ok(min_snapshot) - } - - /// Expire consumers whose modification time is before given datetime. - pub async fn expire(&self, expire_datetime: DateTime) -> Result<()> { - let consumer_ids = self.list_all_ids().await?; - - for consumer_id in &consumer_ids { - let path = self.consumer_path(consumer_id); - - // Get file modification time - if let Some(modification_time) = self.get_file_modification_time(&path).await? { - if expire_datetime > modification_time { - // Delete the consumer file - self.file_io.delete_file(&path).await?; - } - } - } - - Ok(()) - } - - /// Clear consumers matching the given patterns. - /// - /// # Arguments - /// * `including_pattern` - Regex pattern for consumers to include (if None, match all) - /// * `excluding_pattern` - Regex pattern for consumers to exclude (if None, exclude none) - pub async fn clear_consumers( - &self, - including_pattern: Option<&Regex>, - excluding_pattern: Option<&Regex>, - ) -> Result<()> { - let consumer_ids = self.list_all_ids().await?; - - for consumer_id in &consumer_ids { - let mut should_clear = match including_pattern { - Some(pattern) => pattern.is_match(consumer_id), - None => true, - }; - - if should_clear { - should_clear = match excluding_pattern { - Some(pattern) => !pattern.is_match(consumer_id), - None => true, - }; - } - - if should_clear { - let path = self.consumer_path(consumer_id); - self.file_io.delete_file(&path).await?; - } - } - - Ok(()) - } - - /// Get all consumers as a map of consumer ID to next snapshot ID. - pub async fn consumers(&self) -> Result> { - let consumer_ids = self.list_all_ids().await?; - let mut consumers_map = HashMap::new(); - - for consumer_id in &consumer_ids { - if let Some(consumer) = self.consumer(consumer_id).await? { - consumers_map.insert(consumer_id.clone(), consumer.next_snapshot()); - } - } - - Ok(consumers_map) - } - - /// List all consumer IDs. - pub async fn list_all_ids(&self) -> Result> { - let consumer_dir = self.consumer_directory(); - let mut consumer_ids = Vec::new(); - - // Try to list the consumer directory - match self.file_io.list_status(&consumer_dir).await { - Ok(statuses) => { - for status in &statuses { - if !status.is_dir { - let filename = status.path.rsplit('/').next().unwrap_or(&status.path); - - if let Some(consumer_id) = self.extract_consumer_id_from_filename(filename) - { - consumer_ids.push(consumer_id); - } - } - } - } - Err(e) => { - // If directory doesn't exist, return empty list - if e.to_string().contains("not found") - || e.to_string().contains("No such file") - || e.to_string().contains("is not a directory") - { - return Ok(Vec::new()); - } - return Err(e); - } - } - - Ok(consumer_ids) - } - - // Helper methods - - fn consumer_directory(&self) -> String { - format!("{}/{}/consumer", self.table_path, self.branch) - } - - fn consumer_path(&self, consumer_id: &str) -> String { - format!( - "{}/{}{}", - self.consumer_directory(), - CONSUMER_PREFIX, - consumer_id - ) - } - - fn extract_consumer_id_from_filename(&self, filename: &str) -> Option { - filename - .strip_prefix(CONSUMER_PREFIX) - .map(|s| s.to_string()) - } - - async fn get_file_modification_time(&self, path: &str) -> Result>> { - match self.file_io.get_status(path).await { - Ok(status) => Ok(status.last_modified), - Err(e) => { - if e.to_string().contains("not found") || e.to_string().contains("No such file") { - Ok(None) - } else { - Err(e) - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::io::FileIOBuilder; - - async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { - let temp_dir = tempfile::tempdir().unwrap(); - let file_io = FileIOBuilder::new("memory").build().unwrap(); - let manager = ConsumerManager::new(file_io, temp_dir.path().to_string_lossy().to_string()); - (manager, temp_dir) - } - - #[tokio::test] - async fn test_reset_and_get_consumer() { - let (manager, _temp) = create_test_manager().await; - let consumer = Consumer::new(5); - - manager.reset_consumer("test-id", &consumer).await.unwrap(); - let retrieved = manager.consumer("test-id").await.unwrap(); - - assert!(retrieved.is_some()); - assert_eq!(retrieved.unwrap().next_snapshot(), 5); - } - - #[tokio::test] - async fn test_nonexistent_consumer() { - let (manager, _temp) = create_test_manager().await; - let result = manager.consumer("nonexistent").await.unwrap(); - - assert!(result.is_none()); - } - - #[tokio::test] - async fn test_delete_consumer() { - let (manager, _temp) = create_test_manager().await; - let consumer = Consumer::new(5); - - manager.reset_consumer("test-id", &consumer).await.unwrap(); - manager.delete_consumer("test-id").await.unwrap(); - let result = manager.consumer("test-id").await.unwrap(); - - assert!(result.is_none()); - } - - #[tokio::test] - async fn test_min_next_snapshot() { - let (manager, _temp) = create_test_manager().await; - - manager - .reset_consumer("id1", &Consumer::new(10)) - .await - .unwrap(); - manager - .reset_consumer("id2", &Consumer::new(5)) - .await - .unwrap(); - manager - .reset_consumer("id3", &Consumer::new(15)) - .await - .unwrap(); - - let min_snapshot = manager.min_next_snapshot().await.unwrap(); - assert_eq!(min_snapshot, Some(5)); - } - - #[tokio::test] - async fn test_empty_min_next_snapshot() { - let (manager, _temp) = create_test_manager().await; - let min_snapshot = manager.min_next_snapshot().await.unwrap(); - - assert_eq!(min_snapshot, None); - } - - #[tokio::test] - async fn test_list_all_ids() { - let (manager, _temp) = create_test_manager().await; - - manager - .reset_consumer("id1", &Consumer::new(1)) - .await - .unwrap(); - manager - .reset_consumer("id2", &Consumer::new(2)) - .await - .unwrap(); - manager - .reset_consumer("id3", &Consumer::new(3)) - .await - .unwrap(); - - let ids = manager.list_all_ids().await.unwrap(); - assert_eq!(ids.len(), 3); - assert!(ids.contains(&"id1".to_string())); - assert!(ids.contains(&"id2".to_string())); - assert!(ids.contains(&"id3".to_string())); - } - - #[tokio::test] - async fn test_clear_consumers_with_patterns() { - let (manager, _temp) = create_test_manager().await; - - manager - .reset_consumer("test-1", &Consumer::new(1)) - .await - .unwrap(); - manager - .reset_consumer("test-2", &Consumer::new(2)) - .await - .unwrap(); - manager - .reset_consumer("prod-1", &Consumer::new(3)) - .await - .unwrap(); - manager - .reset_consumer("prod-2", &Consumer::new(4)) - .await - .unwrap(); - - let including = Regex::new(r"^test-").unwrap(); - manager - .clear_consumers(Some(&including), None) - .await - .unwrap(); - - let ids = manager.list_all_ids().await.unwrap(); - assert_eq!(ids.len(), 2); - assert!(ids.contains(&"prod-1".to_string())); - assert!(ids.contains(&"prod-2".to_string())); - } -} From d88ede0d36d1acea5168eed35447c64d1c3e94c3 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 23 Mar 2026 21:41:51 +0800 Subject: [PATCH 3/6] fix comments --- crates/paimon/src/spec/consumer.rs | 2 +- crates/paimon/src/table/consumer_manager.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/paimon/src/spec/consumer.rs b/crates/paimon/src/spec/consumer.rs index 8db13c6..d31be58 100644 --- a/crates/paimon/src/spec/consumer.rs +++ b/crates/paimon/src/spec/consumer.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; /// Consumer which contains next snapshot. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Consumer { #[serde(rename = "nextSnapshot")] diff --git a/crates/paimon/src/table/consumer_manager.rs b/crates/paimon/src/table/consumer_manager.rs index 809fb25..b255b7e 100644 --- a/crates/paimon/src/table/consumer_manager.rs +++ b/crates/paimon/src/table/consumer_manager.rs @@ -24,7 +24,7 @@ use std::collections::HashMap; /// Manage consumer groups. /// -/// Reference: +/// Reference: #[derive(Debug, Clone)] pub struct ConsumerManager { file_io: FileIO, From 073bbaf77471e7e44b163e2ad0609aa58b56a04d Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 23 Mar 2026 22:05:24 +0800 Subject: [PATCH 4/6] fix test --- crates/paimon/src/table/consumer_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/paimon/src/table/consumer_manager.rs b/crates/paimon/src/table/consumer_manager.rs index b255b7e..c439ddc 100644 --- a/crates/paimon/src/table/consumer_manager.rs +++ b/crates/paimon/src/table/consumer_manager.rs @@ -252,7 +252,7 @@ mod tests { async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { let temp_dir = tempfile::tempdir().unwrap(); - let file_io = FileIOBuilder::new("memory").build().unwrap(); + let file_io = FileIOBuilder::new("file").build().unwrap(); let manager = ConsumerManager::new(file_io, temp_dir.path().to_string_lossy().to_string()); (manager, temp_dir) } From 5fe015e5f8f0b926eeb35ab65f21655d024cc243 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 23 Mar 2026 22:16:39 +0800 Subject: [PATCH 5/6] fix windows test --- crates/paimon/src/spec/consumer.rs | 12 +++++++++--- crates/paimon/src/table/consumer_manager.rs | 3 ++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/paimon/src/spec/consumer.rs b/crates/paimon/src/spec/consumer.rs index d31be58..88e166d 100644 --- a/crates/paimon/src/spec/consumer.rs +++ b/crates/paimon/src/spec/consumer.rs @@ -76,9 +76,15 @@ impl Consumer { } Err(e) => { // Check if file doesn't exist - if e.to_string().contains("not found") - || e.to_string().contains("No such file") - || e.to_string().contains("is not found") + // Handle different error messages across platforms (Windows/Linux/macOS) + let error_str = e.to_string().to_lowercase(); + if error_str.contains("not found") + || error_str.contains("no such file") + || error_str.contains("is not found") + || error_str.contains("notexist") + || error_str.contains("does not exist") + || error_str.contains("invalid") + // Windows may return "invalid filename" { return Ok(None); } diff --git a/crates/paimon/src/table/consumer_manager.rs b/crates/paimon/src/table/consumer_manager.rs index c439ddc..b91cd74 100644 --- a/crates/paimon/src/table/consumer_manager.rs +++ b/crates/paimon/src/table/consumer_manager.rs @@ -253,7 +253,8 @@ mod tests { async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { let temp_dir = tempfile::tempdir().unwrap(); let file_io = FileIOBuilder::new("file").build().unwrap(); - let manager = ConsumerManager::new(file_io, temp_dir.path().to_string_lossy().to_string()); + let table_path = format!("file:{}", temp_dir.path().to_string_lossy()); + let manager = ConsumerManager::new(file_io, table_path); (manager, temp_dir) } From 8407cc0be5942493080c86c2473c34058eaeb6b2 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 23 Mar 2026 22:32:47 +0800 Subject: [PATCH 6/6] fix windows test --- crates/paimon/src/table/consumer_manager.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/table/consumer_manager.rs b/crates/paimon/src/table/consumer_manager.rs index b91cd74..17ba04d 100644 --- a/crates/paimon/src/table/consumer_manager.rs +++ b/crates/paimon/src/table/consumer_manager.rs @@ -248,12 +248,14 @@ impl ConsumerManager { #[cfg(test)] mod tests { use super::*; - use crate::io::FileIOBuilder; + use crate::io::FileIO; + use url::Url; async fn create_test_manager() -> (ConsumerManager, tempfile::TempDir) { let temp_dir = tempfile::tempdir().unwrap(); - let file_io = FileIOBuilder::new("file").build().unwrap(); - let table_path = format!("file:{}", temp_dir.path().to_string_lossy()); + let temp_path = temp_dir.path().to_string_lossy().to_string(); + let file_io = FileIO::from_path(&temp_path).unwrap().build().unwrap(); + let table_path = Url::from_file_path(&temp_path).unwrap().to_string(); let manager = ConsumerManager::new(file_io, table_path); (manager, temp_dir) }