[Improvement]: Refactor process API and table runtime extension interfaces#4097
# Context
Split from upstream PR apache#4081 to make review easier. This MR introduces the new process extension APIs in common module, without touching most AMS internals yet.

# Changes
- Promote `ProcessFactory` to a richer abstraction in `amoro-common`, preparing for plugin-based table process extension.
- Adjust `Action`/`IcebergActions` and `TableRuntime` to align with the new process model.
- Move `ActionCoordinator` into `amoro-common` so it can be shared as a public abstraction.
- Introduce `ProcessTriggerStrategy` to describe trigger policies for processes.
- Clean up legacy process state classes which will be replaced by the new abstractions in follow-up MRs.

# Notes
- This commit only updates the common module and the shared `ActionCoordinator` API; AMS-side wiring and runtime refactors will be done in separate branches/MRs as discussed.

Co-Authored-By: Aime <aime@bytedance.com>
Change-Id: If84ada8fcae1cfb11577d56d3866db7ce0949102
majin1102
left a comment
Thanks for working on this.
I left some comments.
| * @return target process which has not been submitted yet. | ||
| */ | ||
| AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state); | ||
| TableProcess recover(TableRuntime tableRuntime, TableProcessStore store); |
I think we should also use Optional here, for cases where recovery goes wrong or there are incompatibility issues.
Maybe in this case you should throw an exception?
How about defining an UnableRecoverProcess Exception?
These approaches aren't mutually exclusive. We can throw exceptions or return null to skip this process when needed.
When the recover method is called, there must be a record of Process information in a managed state within the system database. From a system‑level perspective, if recovery is not possible at this point, it should be considered an abnormal situation, meaning the system needs to clean up resources and mark the Process as failed or abnormal. Therefore, I suggest explicitly indicating through an exception that this Process can no longer be recovered.
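To make the exception-based proposal concrete, here is a minimal sketch of how a caller could react when recovery fails. Everything in it is hypothetical: `UnableRecoverProcessException` follows the name suggested above, while `StoredProcess` and `RecoverSketch` are simplified stand-ins and not the real `TableProcessStore` or `ProcessFactory` types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical exception, named after the suggestion in this thread.
class UnableRecoverProcessException extends RuntimeException {
  UnableRecoverProcessException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for a persisted process record (not the real TableProcessStore).
class StoredProcess {
  final long id;
  final boolean recoverable;
  String status = "RUNNING";

  StoredProcess(long id, boolean recoverable) {
    this.id = id;
    this.recoverable = recoverable;
  }
}

class RecoverSketch {
  // Stand-in for a recover method: it throws rather than returning null or an empty Optional.
  static String recover(StoredProcess store) {
    if (!store.recoverable) {
      throw new UnableRecoverProcessException("cannot recover process " + store.id);
    }
    return "process-" + store.id;
  }

  // Caller side: a failed recovery marks the stored record as FAILED so it can be cleaned up.
  static List<String> recoverAll(List<StoredProcess> stores) {
    List<String> recovered = new ArrayList<>();
    for (StoredProcess store : stores) {
      try {
        recovered.add(recover(store));
      } catch (UnableRecoverProcessException e) {
        store.status = "FAILED";
      }
    }
    return recovered;
  }

  public static void main(String[] args) {
    List<StoredProcess> stores =
        Arrays.asList(new StoredProcess(1, true), new StoredProcess(2, false));
    System.out.println(recoverAll(stores));
    System.out.println(stores.get(1).status);
  }
}
```

With an exception, the "cannot recover" case is impossible to ignore at the call site, which matches the argument that an unrecoverable record is an abnormal state that needs cleanup.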
| import java.time.Duration; | ||
|
|
||
| /** Process trigger strategy. */ | ||
| public final class ProcessTriggerStrategy { |
Why do we need this?
I originally thought we should always trigger a process when there are new snapshots, when the config changes, and on a time basis. To me, this should not be extensible.
At least, the factory should tell the scheduler the trigger interval.
I can provide a default implementation.
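A rough sketch of what "the factory tells the scheduler the trigger interval, with a default" could look like. The fields of the real `ProcessTriggerStrategy` are not shown in this thread, so `TriggerStrategySketch`, `ProcessFactorySketch`, the 30-second default, and `nextTriggerMillis` are all assumptions made for illustration.

```java
import java.time.Duration;

// Hypothetical shape; the real ProcessTriggerStrategy may carry different fields.
final class TriggerStrategySketch {
  // Assumed default interval, chosen only for this example.
  static final TriggerStrategySketch DEFAULT = new TriggerStrategySketch(Duration.ofSeconds(30));

  private final Duration triggerInterval;

  TriggerStrategySketch(Duration triggerInterval) {
    if (triggerInterval.isZero() || triggerInterval.isNegative()) {
      throw new IllegalArgumentException("trigger interval must be positive");
    }
    this.triggerInterval = triggerInterval;
  }

  Duration getTriggerInterval() {
    return triggerInterval;
  }
}

// Hypothetical factory interface: plugins inherit the default unless they override it.
interface ProcessFactorySketch {
  default TriggerStrategySketch triggerStrategy() {
    return TriggerStrategySketch.DEFAULT;
  }
}

class TriggerDemo {
  // The scheduler only asks the factory how long to wait before the next trigger check.
  static long nextTriggerMillis(ProcessFactorySketch factory, long lastTriggerMillis) {
    return lastTriggerMillis + factory.triggerStrategy().getTriggerInterval().toMillis();
  }

  public static void main(String[] args) {
    ProcessFactorySketch defaults = new ProcessFactorySketch() {};
    System.out.println(nextTriggerMillis(defaults, 0L));
  }
}
```

Because the interface method has a default body, a plugin author who is happy with the standard interval does not have to touch the strategy at all.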
| /** Table runtime factory. */ | ||
| public interface TableRuntimeFactory extends ActivePlugin { | ||
|
|
||
| List<ActionCoordinator> supportedCoordinators(); |
So this extension should be aware of the implementation of Coordinator?
That feels a little weird for the Lance scenario. Can we provide an abstract TableRuntime if the coordinator is really necessary here? (Seriously, I think this is a legacy issue?)
You don't need to implement this plugin.
The default TableRuntimeFactory will create all coordinators automatically; the only plugin you need to provide is ProcessFactory.
You mean I don't need to provide a LanceTableRuntime?
How can I return a LanceTableConfig? Could you provide a simple demo?
You can call TableRuntime.getTableConfig(): Map<String, String>
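As a possible demo of that approach, a format-specific config object can be parsed from the generic property map. This is only a sketch: `LanceTableConfigSketch`, both property keys, and the default values are invented for illustration, and the map passed in stands in for the result of `TableRuntime.getTableConfig()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed view over the generic table properties.
final class LanceTableConfigSketch {
  // Property keys and defaults below are assumptions, not real Amoro or Lance settings.
  static final String COMPACTION_ENABLED = "lance.compaction.enabled";
  static final String TARGET_ROWS = "lance.compaction.target-rows-per-fragment";

  final boolean compactionEnabled;
  final long targetRowsPerFragment;

  private LanceTableConfigSketch(boolean compactionEnabled, long targetRowsPerFragment) {
    this.compactionEnabled = compactionEnabled;
    this.targetRowsPerFragment = targetRowsPerFragment;
  }

  // Build the typed config from a Map<String, String>, falling back to defaults.
  static LanceTableConfigSketch fromProperties(Map<String, String> properties) {
    boolean enabled = Boolean.parseBoolean(properties.getOrDefault(COMPACTION_ENABLED, "true"));
    long rows = Long.parseLong(properties.getOrDefault(TARGET_ROWS, "1048576"));
    return new LanceTableConfigSketch(enabled, rows);
  }
}

class LanceConfigDemo {
  public static void main(String[] args) {
    // In a ProcessFactory this map would come from the table runtime instead.
    Map<String, String> properties = new HashMap<>();
    properties.put(LanceTableConfigSketch.COMPACTION_ENABLED, "false");
    LanceTableConfigSketch config = LanceTableConfigSketch.fromProperties(properties);
    System.out.println(config.compactionEnabled + " " + config.targetRowsPerFragment);
  }
}
```

This keeps the format-specific parsing inside the plugin, so no custom TableRuntime subclass is needed just to expose typed settings.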
| */ | ||
| List<? extends TableProcessStore> getProcessStates(Action action); | ||
|
|
||
| void registerProcess(TableProcessStore processStore); |
I think we should always use the table runtime to trigger a process instead of registering a process into the table runtime.
If not, I don't see the point of putting factories into runtimes.
This method is used by the process scheduler.
You don't need to care about this method. Also, you no longer need to provide an implementation of TableRuntime.
getProcessStates -> getProcessStore or getProcess
| * the weight number of this action, the bigger the weight number, the higher positions of | ||
| * schedulers or front pages | ||
| */ | ||
| private final int weight; |
The weight is not used so far.
But it was designed to decide which action should be prioritized when multiple actions need to be scheduled in one resource group.
I am considering using a priority here and adding a default value. WDYT?
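One way the priority idea could work, sketched under assumptions: `ActionSketch`, `DEFAULT_PRIORITY = 0`, and `scheduleOrder` are hypothetical names and values, not part of the current `Action` class. Higher priority is scheduled first, and actions with equal priority keep their submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical action with an optional priority; not the real Action class.
final class ActionSketch {
  // Assumed default used when a plugin does not specify a priority.
  static final int DEFAULT_PRIORITY = 0;

  final String name;
  final int priority;

  ActionSketch(String name) {
    this(name, DEFAULT_PRIORITY);
  }

  ActionSketch(String name, int priority) {
    this.name = name;
    this.priority = priority;
  }
}

class PrioritySketch {
  // Order pending actions within one resource group: highest priority first.
  // List.sort is stable, so equal priorities keep their original order.
  static List<String> scheduleOrder(List<ActionSketch> pending) {
    List<ActionSketch> sorted = new ArrayList<>(pending);
    sorted.sort(Comparator.comparingInt((ActionSketch a) -> a.priority).reversed());
    List<String> names = new ArrayList<>();
    for (ActionSketch action : sorted) {
      names.add(action.name);
    }
    return names;
  }

  public static void main(String[] args) {
    List<ActionSketch> pending = Arrays.asList(
        new ActionSketch("expire-snapshots"),
        new ActionSketch("optimize", 10),
        new ActionSketch("clean-orphan-files"));
    System.out.println(scheduleOrder(pending));
  }
}
```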
Why are the changes needed?
This MR introduces the new process extension APIs in common module, without touching most AMS internals yet.
Brief change log
- Promote `ProcessFactory` to a richer abstraction in `amoro-common`, preparing for plugin-based table process extension.
- Adjust `Action`/`IcebergActions` and `TableRuntime` to align with the new process model.
- Move `ActionCoordinator` into `amoro-common` so it can be shared as a public abstraction.
- Introduce `ProcessTriggerStrategy` to describe trigger policies for processes.
Note
- This commit only updates the common module and the shared `ActionCoordinator` API; AMS-side wiring and runtime refactors will be done in separate branches/MRs as discussed.
How was this patch tested?
Add some test cases that check the changes thoroughly including negative and positive cases if possible
Add screenshots for manual tests if appropriate
Run test locally before making a pull request
Documentation