Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f206864
v1
carloea2 Feb 26, 2026
299dc0f
Merge branch 'apache:main' into feat/ui-parameter
carloea2 Feb 27, 2026
5b29ad8
v2
carloea2 Feb 28, 2026
6258a99
Merge branch 'feat/ui-parameter' of https://github.com/carloea2/texer…
carloea2 Feb 28, 2026
3550ccd
v2
carloea2 Mar 2, 2026
7a210a7
py2udf
carloea2 Mar 4, 2026
4a56861
v2.1
carloea2 Mar 7, 2026
16f89ac
Merge branch 'feat/ui-parameter' of https://github.com/carloea2/texer…
carloea2 Mar 7, 2026
5aa3f4e
vnt
carloea2 Mar 7, 2026
1d82c9a
v2
carloea2 Mar 7, 2026
b36628f
v2.1
carloea2 Mar 7, 2026
dd46609
Update ui-udf-parameters.component.scss
carloea2 Mar 7, 2026
2c8167b
v2.1
carloea2 Mar 7, 2026
06d7f5a
v3
carloea2 Mar 7, 2026
ac2b265
v3
carloea2 Mar 7, 2026
c8239c2
Merge branch 'main' into feat/ui-parameter
carloea2 Mar 7, 2026
edd3869
Merge branch 'main' into feat/ui-parameter
carloea2 Mar 7, 2026
0c9f0cd
Merge branch 'main' into feat/ui-parameter
chenlica Mar 10, 2026
6ff1e86
v3CopilotVs
carloea2 Mar 16, 2026
ce96e19
Merge branch 'main' into feat/ui-parameter
carloea2 Mar 16, 2026
52845c3
Update PythonUdfUiParameterInjectorSpec.scala
carloea2 Mar 16, 2026
b863937
Merge branch 'feat/ui-parameter' of https://github.com/carloea2/texer…
carloea2 Mar 16, 2026
06ba53b
v4
carloea2 Mar 16, 2026
7d317a0
Merge branch 'main' into feat/ui-parameter
carloea2 Apr 2, 2026
1aee56e
v10
carloea2 Apr 2, 2026
ed6e53c
Merge branch 'feat/ui-parameter' of https://github.com/carloea2/texer…
carloea2 Apr 2, 2026
9425340
Merge branch 'main' into feat/ui-parameter
carloea2 Apr 2, 2026
16a9687
Merge branch 'main' into feat/ui-parameter
chenlica Apr 2, 2026
00c2deb
Handling empty values with defaults
carloea2 Apr 2, 2026
f8f8915
v21
carloea2 Apr 2, 2026
4634adc
Update ui-udf-parameters-sync.service.ts
carloea2 Apr 2, 2026
61e5bb9
Merge branch 'main' into feat/ui-parameter
carloea2 Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ class AttributeType(Enum):
}


def _is_blank_parameter(raw):
    """Return True when ``raw`` is None or a string holding only whitespace."""
    if raw is None:
        return True
    return isinstance(raw, str) and raw.strip() == ""


def _parse_integer_parameter(raw):
    """Convert ``raw`` with ``int``; blank input yields 0."""
    return 0 if _is_blank_parameter(raw) else int(raw)


def _parse_double_parameter(raw):
    """Convert ``raw`` with ``float``; blank input yields 0.0."""
    return 0.0 if _is_blank_parameter(raw) else float(raw)


def _parse_bool_parameter(raw):
    """Return True only for "true", "1" or "yes" (case-insensitive, trimmed).

    Blank input and every other text yield False.
    """
    if _is_blank_parameter(raw):
        return False
    return str(raw).strip().lower() in ("true", "1", "yes")


def _parse_binary_parameter(raw):
    """Return ``raw`` unchanged if it is bytes, otherwise its encoded ``str()``.

    Blank input yields empty bytes.
    """
    if _is_blank_parameter(raw):
        return b""
    if isinstance(raw, bytes):
        return raw
    return str(raw).encode()


def _parse_timestamp_parameter(raw):
    """Parse ``raw`` with ``datetime.fromisoformat``.

    Blank input yields ``datetime.datetime.fromtimestamp(0)``.
    """
    if _is_blank_parameter(raw):
        return datetime.datetime.fromtimestamp(0)
    return datetime.datetime.fromisoformat(raw)


# Maps each AttributeType to the callable that converts a raw value into the
# corresponding Python object.
FROM_STRING_PARSER_MAPPING = {
    AttributeType.STRING: str,
    AttributeType.INT: _parse_integer_parameter,
    AttributeType.LONG: _parse_integer_parameter,
    AttributeType.DOUBLE: _parse_double_parameter,
    AttributeType.BOOL: _parse_bool_parameter,
    AttributeType.BINARY: _parse_binary_parameter,
    AttributeType.TIMESTAMP: _parse_timestamp_parameter,
    # Kept as a lambda so ``largebinary`` is looked up when the parser is called.
    AttributeType.LARGE_BINARY: lambda v: largebinary(v),
}

# Only single-directional mapping.
TO_PYOBJECT_MAPPING = {
AttributeType.STRING: str,
Expand Down
2 changes: 2 additions & 0 deletions amber/src/main/python/pyamber/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SourceOperator,
TupleOperatorV2,
State,
AttributeType,
)

__all__ = [
Expand All @@ -41,4 +42,5 @@
"TupleOperatorV2",
"SourceOperator",
"State",
"AttributeType",
]
6 changes: 5 additions & 1 deletion amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from loguru import logger
from overrides import overrides
from typing import Iterator, Optional, Union
from typing import Iterator, Optional, Union, Dict, Any

from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
Expand All @@ -30,6 +30,7 @@
UDFSourceOperator,
)
from core.models.type.large_binary import largebinary
from core.models.schema.attribute_type import *

__all__ = [
"State",
Expand All @@ -53,4 +54,7 @@
"Iterator",
"Optional",
"Union",
"Dict",
"Any",
"AttributeType",
]
154 changes: 154 additions & 0 deletions amber/src/main/python/pytexera/udf/test_udf_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
from typing import Iterator, Optional

import pytest

from core.models.type.large_binary import largebinary
from pytexera import AttributeType, Tuple, TupleLike, UDFOperatorV2
from pytexera.udf.udf_operator import _UiParameterSupport


class InjectedParametersOperator(UDFOperatorV2):
    """Test operator whose injection hook supplies three raw string values."""

    def _texera_injected_ui_parameters(self):
        # Raw, unparsed strings keyed by parameter name.
        return dict(
            count="7",
            enabled="yes",
            created_at="2024-01-01T00:00:00",
        )

    def open(self):
        # Each declaration uses a different calling convention on purpose:
        # positional type, keyword name plus ``type=``, positional name plus
        # ``type=``.
        self.count_parameter = self.UiParameter("count", AttributeType.INT)
        self.enabled_parameter = self.UiParameter(
            name="enabled",
            type=AttributeType.BOOL,
        )
        self.created_at_parameter = self.UiParameter(
            "created_at",
            type=AttributeType.TIMESTAMP,
        )

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through; tuple processing is irrelevant to these tests.
        yield tuple_


class ConflictingParameterOperator(UDFOperatorV2):
    """Test operator that declares the same parameter name with two types."""

    def _texera_injected_ui_parameters(self):
        return dict(duplicate="1")

    def open(self):
        # The second declaration reuses the name with a different type.
        self.UiParameter("duplicate", AttributeType.INT)
        self.UiParameter("duplicate", AttributeType.STRING)

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through; tuple processing is irrelevant to these tests.
        yield tuple_


class FirstIndependentParameterOperator(UDFOperatorV2):
    """Test operator whose hook injects ``count`` as the raw string "1"."""

    def _texera_injected_ui_parameters(self):
        return dict(count="1")

    def open(self):
        self.count_parameter = self.UiParameter("count", AttributeType.INT)

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through; tuple processing is irrelevant to these tests.
        yield tuple_


class SecondIndependentParameterOperator(UDFOperatorV2):
    """Test operator whose hook injects ``count`` as the raw string "2"."""

    def _texera_injected_ui_parameters(self):
        return dict(count="2")

    def open(self):
        self.count_parameter = self.UiParameter("count", AttributeType.INT)

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through; tuple processing is irrelevant to these tests.
        yield tuple_


class TestUiParameterSupport:
    """Tests for the UiParameter support mixed into the UDF operator classes."""

    def test_injected_values_are_applied_before_open(self):
        """Calling open() exposes the injected values parsed to their types."""
        op = InjectedParametersOperator()
        op.open()

        expected_created_at = datetime.datetime(2024, 1, 1, 0, 0)
        assert op.count_parameter.value == 7
        assert op.enabled_parameter.value is True
        assert op.created_at_parameter.value == expected_created_at

    def test_duplicate_parameter_names_with_conflicting_types_raise(self):
        """Re-declaring a name with another type raises ValueError."""
        op = ConflictingParameterOperator()

        with pytest.raises(ValueError) as raised:
            op.open()

        message = str(raised.value)
        assert "Duplicate UiParameter name 'duplicate'" in message

    @pytest.mark.parametrize(
        ("raw_value", "attr_type", "expected"),
        [
            ("hello", AttributeType.STRING, "hello"),
            ("7", AttributeType.INT, 7),
            ("99", AttributeType.LONG, 99),
            ("3.14", AttributeType.DOUBLE, 3.14),
            ("yes", AttributeType.BOOL, True),
            ("payload", AttributeType.BINARY, b"payload"),
            (
                "2024-01-01T00:00:00",
                AttributeType.TIMESTAMP,
                datetime.datetime(2024, 1, 1, 0, 0),
            ),
            (
                "s3://bucket/path/to/object",
                AttributeType.LARGE_BINARY,
                largebinary("s3://bucket/path/to/object"),
            ),
        ],
    )
    def test_parse_supported_types(self, raw_value, attr_type, expected):
        """Each supported AttributeType parses its raw string as expected."""
        parsed = _UiParameterSupport._parse(raw_value, attr_type)
        assert parsed == expected

    def test_parse_unsupported_type_raises_helpful_error(self):
        """A type with no registered parser is rejected with TypeError."""
        with pytest.raises(TypeError, match="UiParameter.type .* is not supported"):
            _UiParameterSupport._parse("value", object())

    def test_wrapped_open_uses_instance_local_state(self):
        """open() is wrapped, and injected state lives on each instance."""
        wrapped_marker = getattr(
            FirstIndependentParameterOperator.open,
            "__texera_ui_params_wrapped__",
            False,
        )
        assert wrapped_marker is True

        first = FirstIndependentParameterOperator()
        second = SecondIndependentParameterOperator()

        first.open()
        second.open()

        assert first.count_parameter.value == 1
        assert second.count_parameter.value == 2

        first_injected = first._ui_parameter_injected_values
        second_injected = second._ui_parameter_injected_values
        assert first_injected == {"count": "1"}
        assert second_injected == {"count": "2"}
        assert first_injected is not second_injected
119 changes: 114 additions & 5 deletions amber/src/main/python/pytexera/udf/udf_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,121 @@
# under the License.

from abc import abstractmethod
from typing import Iterator, Optional, Union
from dataclasses import dataclass
import functools
from typing import Any, Dict, Iterator, Optional, Union

from pyamber import *
from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING


class UDFOperatorV2(TupleOperatorV2):
@dataclass(frozen=True)
class _UiParameterValue:
    """Immutable record describing one declared UI parameter."""

    # Parameter name as passed to ``UiParameter``.
    name: str
    # Attribute type the parameter was declared with.
    type: AttributeType
    # Parsed value, or None when no value was supplied for ``name``.
    value: Any


class _UiParameterSupport:
    """Mixin that lets an operator declare typed parameters via ``UiParameter``.

    Every subclass has its ``open()`` wrapped so that the values returned by
    ``_texera_injected_ui_parameters()`` are loaded onto the instance before
    the user's ``open()`` body runs. Inside ``open()`` the operator calls
    ``self.UiParameter(name, type)`` to obtain the parsed value.
    """

    # Raw values returned by the injection hook, keyed by parameter name.
    _ui_parameter_injected_values: Dict[str, Any]
    # Type each parameter name was declared with during the current open().
    _ui_parameter_name_types: Dict[str, AttributeType]

    # Reserved hook name. Backend injector will generate this in the user's class.
    def _texera_injected_ui_parameters(self) -> Dict[str, Any]:
        """Return the raw injected values; the default provides none."""
        return {}

    def _ensure_ui_parameter_state(self) -> None:
        """Create the per-instance dictionaries if they do not exist yet."""
        # Checked against the instance ``__dict__`` so state is never shared
        # through a class-level attribute.
        if "_ui_parameter_injected_values" not in self.__dict__:
            self._ui_parameter_injected_values = {}
        if "_ui_parameter_name_types" not in self.__dict__:
            self._ui_parameter_name_types = {}

    def _texera_apply_injected_ui_parameters(self) -> None:
        """Load a fresh copy of the injected values and clear declared types."""
        self._ensure_ui_parameter_state()
        values = self._texera_injected_ui_parameters()
        # Copy so that later changes to the hook's dict do not leak in.
        self._ui_parameter_injected_values = dict(values or {})
        self._ui_parameter_name_types = {}

    def __init_subclass__(cls, **kwargs):
        """Wrap the subclass's effective ``open()`` so injection runs first."""
        super().__init_subclass__(**kwargs)

        # Wrap the effective open() method once per subclass.
        original_open = getattr(cls, "open", None)
        if original_open is None:
            return

        # Avoid double wrapping (e.g. a subclass that inherits a wrapped open()).
        if getattr(original_open, "__texera_ui_params_wrapped__", False):
            return

        @functools.wraps(original_open)
        def wrapped_open(self, *args, **kwargs):
            # Every class in the hierarchy that defines open() gets its own
            # wrapper, so a subclass calling super().open() re-enters here.
            # Only the outermost call may (re)load the injected values;
            # otherwise the nested call would wipe the types already declared
            # by the subclass and defeat the conflicting-type check.
            depth = getattr(self, "_ui_parameter_open_depth", 0)
            if depth == 0:
                self._texera_apply_injected_ui_parameters()
            self._ui_parameter_open_depth = depth + 1
            try:
                return original_open(self, *args, **kwargs)
            finally:
                # Restore even on failure so a later open() starts fresh.
                self._ui_parameter_open_depth = depth

        setattr(wrapped_open, "__texera_ui_params_wrapped__", True)
        cls.open = wrapped_open

    def UiParameter(
        self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any
    ) -> _UiParameterValue:
        """Declare a UI parameter and return its parsed value.

        The type may be given positionally / as ``attr_type`` or through the
        keyword ``type``, but not both.

        :param name: parameter name used to look up the injected value.
        :param attr_type: the AttributeType the raw value is parsed as.
        :param kwargs: only ``type`` is accepted, as an alias of ``attr_type``.
        :return: a ``_UiParameterValue`` holding the name, the type and the
            parsed value (None when no value was injected for ``name``).
        :raises TypeError: if the type is missing, given twice, not an
            AttributeType, or if unknown keyword arguments are passed.
        :raises ValueError: if ``name`` was already declared with a different
            type, or if the injected value cannot be parsed.
        """
        if "type" in kwargs:
            if attr_type is not None:
                raise TypeError("UiParameter.type was provided multiple times.")
            attr_type = kwargs.pop("type")

        if kwargs:
            unexpected_arguments = ", ".join(sorted(kwargs))
            raise TypeError(
                f"UiParameter got unexpected keyword argument(s): "
                f"{unexpected_arguments}."
            )

        if attr_type is None:
            raise TypeError("UiParameter.type is required.")

        if not isinstance(attr_type, AttributeType):
            raise TypeError(
                f"UiParameter.type must be an AttributeType, got {attr_type!r}."
            )

        self._ensure_ui_parameter_state()
        existing_type = self._ui_parameter_name_types.get(name)
        if existing_type is not None and existing_type != attr_type:
            raise ValueError(
                f"Duplicate UiParameter name '{name}' with conflicting types: "
                f"{existing_type.name} vs {attr_type.name}."
            )

        self._ui_parameter_name_types[name] = attr_type
        raw_value = self._ui_parameter_injected_values.get(name)
        return _UiParameterValue(
            name=name,
            type=attr_type,
            value=self._parse(raw_value, attr_type),
        )

    @staticmethod
    def _parse(value: Any, attr_type: AttributeType) -> Any:
        """Convert ``value`` using the parser registered for ``attr_type``.

        :param value: raw injected value; None is returned unchanged.
        :param attr_type: key looked up in ``FROM_STRING_PARSER_MAPPING``.
        :return: the parsed value, or None when ``value`` is None.
        :raises TypeError: if no parser is registered for ``attr_type``.
        :raises ValueError: if the parser raises for ``value``.
        """
        if value is None:
            return None

        parser = FROM_STRING_PARSER_MAPPING.get(attr_type)
        if parser is None:
            raise TypeError(
                f"UiParameter.type {attr_type!r} is not supported for parsing."
            )

        try:
            return parser(value)
        except Exception as e:
            # Parsers may raise arbitrary exception types; report them
            # uniformly while keeping the original as the cause.
            raise ValueError(
                f"Failed to parse UiParameter value {value!r} as {attr_type.name}. "
                f"Please provide a valid {attr_type.name.lower()} value."
            ) from e


class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2):
"""
Base class for tuple-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -65,7 +174,7 @@ def close(self) -> None:
pass


class UDFSourceOperator(SourceOperator):
class UDFSourceOperator(_UiParameterSupport, SourceOperator):
def open(self) -> None:
"""
Open a context of the operator. Usually can be used for loading/initiating some
Expand All @@ -90,7 +199,7 @@ def close(self) -> None:
pass


class UDFTableOperator(TableOperator):
class UDFTableOperator(_UiParameterSupport, TableOperator):
"""
Base class for table-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -123,7 +232,7 @@ def close(self) -> None:
pass


class UDFBatchOperator(BatchOperator):
class UDFBatchOperator(_UiParameterSupport, BatchOperator):
"""
Base class for batch-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.texera.amber.pybuilder.EncodableStringAnnotation;


import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -49,6 +51,7 @@ public Attribute(

/**
 * Returns the name of this attribute.
 *
 * <p>Exposed to Jackson as the required JSON property {@code attributeName} and
 * validated as non-blank.
 * NOTE(review): also marked with {@code EncodableStringAnnotation} from the
 * pybuilder package - presumably so the Python code builder treats the value as
 * an encodable string; confirm against that annotation's documentation.
 *
 * @return the value of the {@code attributeName} field
 */
@JsonProperty(value = "attributeName", required = true)
@NotBlank(message = "Attribute name is required")
@EncodableStringAnnotation
public String getName() {
return attributeName;
}
Expand Down
Loading
Loading