From e628f03c2c807685dfc2a6a6e78c27351d2401e5 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Mon, 28 Apr 2025 01:48:41 -0300 Subject: [PATCH 1/8] =?UTF-8?q?=F0=9F=93=8C=20ISSUE-#56:=20Action=20with?= =?UTF-8?q?=20timeout,=20retry=5Fdelay=20and=20backoff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/nav/reference/action.md | 2 +- dotflow/core/action.py | 59 ++++++++++++++++++++++++------------ 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/docs/nav/reference/action.md b/docs/nav/reference/action.md index 3f2ade9..1203823 100644 --- a/docs/nav/reference/action.md +++ b/docs/nav/reference/action.md @@ -7,7 +7,7 @@ - task - retry - params - - _retry + - _run_action - _set_params - _get_context - _get_task diff --git a/dotflow/core/action.py b/dotflow/core/action.py index 3d36451..b8c7631 100644 --- a/dotflow/core/action.py +++ b/dotflow/core/action.py @@ -1,5 +1,8 @@ """Action module""" +from time import sleep + +from concurrent.futures import ThreadPoolExecutor from typing import Callable, Dict from types import FunctionType @@ -39,14 +42,20 @@ def my_task(): """ def __init__( - self, - func: Callable = None, - task: Callable = None, - retry: int = 1 + self, + func: Callable = None, + task: Callable = None, + retry: int = 1, + timeout: int = 0, + retry_delay: int = 1, + backoff: bool = False, ) -> None: self.func = func self.task = task self.retry = retry + self.timeout = timeout + self.retry_delay = retry_delay + self.backoff = backoff self.params = [] def __call__(self, *args, **kwargs): @@ -59,15 +68,15 @@ def __call__(self, *args, **kwargs): if contexts: return Context( - storage=self._retry(*args, **contexts), + storage=self._run_action(*args, **contexts), task_id=task.task_id, - workflow_id=task.workflow_id + workflow_id=task.workflow_id, ) return Context( - storage=self._retry(*args), + storage=self._run_action(*args), task_id=task.task_id, - workflow_id=task.workflow_id + workflow_id=task.workflow_id, ) # No parameters @@ -80,31 +89,39 @@ def action(*_args, **_kwargs): if contexts: return Context( - storage=self._retry(*_args, **contexts), + storage=self._run_action(*_args, **contexts), task_id=task.task_id, - workflow_id=task.workflow_id + workflow_id=task.workflow_id, ) return Context( - storage=self._retry(*_args), + storage=self._run_action(*_args), task_id=task.task_id, - workflow_id=task.workflow_id + workflow_id=task.workflow_id, ) return action - def _retry(self, *args, **kwargs): - attempt = 0 - exception = Exception() + def _run_action(self, *args, **kwargs): + last_exception = Exception("Unknown") - while self.retry > attempt: + for _ in range(1, self.retry + 1): try: + if self.timeout: + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(self.func, *args, **kwargs) + return future.result(timeout=self.timeout) + return self.func(*args, **kwargs) + except Exception as error: - exception = error - attempt += 1 + last_exception = error + + sleep(self.retry_delay) + if self.backoff: + self.retry_delay *= 2 - raise exception + raise last_exception def _set_params(self): if isinstance(self.func, FunctionType): @@ -113,7 +130,9 @@ def _set_params(self): if type(self.func) is type: if hasattr(self.func, "__init__"): if hasattr(self.func.__init__, "__code__"): - self.params = [param for param in self.func.__init__.__code__.co_varnames] + self.params = [ + param for param in self.func.__init__.__code__.co_varnames + ] def _get_context(self, kwargs: Dict): context = {} From 2540cdd10f99b604641e601780839b41fad0fc74 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Mon, 28 Apr 2025 22:44:27 -0300 Subject: [PATCH 2/8] =?UTF-8?q?=F0=9F=93=8C=20ISSUE-#56:=20Action=20with?= =?UTF-8?q?=20timeout,=20retry=5Fdelay=20and=20backoff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/core/action.py | 23 ++++++++++++++++++----- dotflow/core/exception.py | 5 +++++ dotflow/core/execution.py | 6 +++++- examples | 2 +- tests/test_integration.py | 12 ++++++++++++ 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/dotflow/core/action.py b/dotflow/core/action.py index b8c7631..a91d6be 100644 --- a/dotflow/core/action.py +++ b/dotflow/core/action.py @@ -6,9 +6,20 @@ from typing import Callable, Dict from types import FunctionType +from dotflow.core.exception import ExecutionWithClassError from dotflow.core.context import Context +def is_execution_with_class_internal_error(error: Exception) -> bool: + message = str(error) + patterns = [ + "initial_context", + "previous_context", + "missing 1 required positional argument: 'self'", + ] + return any(pattern in message for pattern in patterns) + + class Action(object): """ Import: @@ -103,9 +114,7 @@ def action(*_args, **_kwargs): return action def _run_action(self, *args, **kwargs): - last_exception = Exception("Unknown") - - for _ in range(1, self.retry + 1): + for attempt in range(1, self.retry + 1): try: if self.timeout: with ThreadPoolExecutor(max_workers=1) as executor: @@ -117,12 +126,16 @@ def _run_action(self, *args, **kwargs): except Exception as error: last_exception = error + if is_execution_with_class_internal_error(error=last_exception): + raise ExecutionWithClassError() + + if attempt == self.retry: + raise last_exception + sleep(self.retry_delay) if self.backoff: self.retry_delay *= 2 - raise last_exception - def _set_params(self): if isinstance(self.func, FunctionType): self.params = [param for param in self.func.__code__.co_varnames] diff --git a/dotflow/core/exception.py b/dotflow/core/exception.py index 2e06982..399aed6 100644 --- a/dotflow/core/exception.py +++ b/dotflow/core/exception.py @@ -63,3 +63,8 @@ def __init__(self, module: str, library: str): library=library ) ) + + +class ExecutionWithClassError(Exception): + def __init__(self): + super(ExecutionWithClassError, self).__init__("Unknown") diff --git a/dotflow/core/execution.py b/dotflow/core/execution.py index d6037c3..f5d52e5 100644 --- a/dotflow/core/execution.py +++ b/dotflow/core/execution.py @@ -10,6 +10,7 @@ except ImportError: NoneType = type(None) +from dotflow.core.exception import ExecutionWithClassError from dotflow.logging import logger from dotflow.core.action import Action from dotflow.core.context import Context @@ -116,7 +117,10 @@ def _execution_with_class(self, class_instance: Callable): new_context.storage.append(subcontext) previous_context = subcontext - except Exception: + except Exception as error: + if not isinstance(error, ExecutionWithClassError): + raise error + subcontext = new_object( class_instance, initial_context=self.task.initial_context, diff --git a/examples b/examples index b66286b..7a9cdc4 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit b66286bc1d8a7a39faf2d1bcd10660ce02816ee4 +Subproject commit 7a9cdc4ea59ba2ba5803f97efdd289068f99f5e3 diff --git a/tests/test_integration.py b/tests/test_integration.py index 1e50ffa..04640aa 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -30,6 +30,9 @@ from examples.workflow_parallel_mode import main as workflow_parallel_mode from examples.workflow_sequential_group_mode import main as workflow_sequential_group_mode from examples.workflow_sequential_mode import main as workflow_sequential_mode +from examples.workflow_with_backoff import main as workflow_with_backoff +from examples.workflow_with_retry_delay import main as workflow_with_retry_delay +from examples.workflow_with_timeout import main as workflow_with_timeout class TestIntegration(unittest.TestCase): @@ -117,3 +120,12 @@ def test_workflow_sequential_group_mode(self): def test_workflow_sequential_mode(sefl): workflow_sequential_mode() + + def test_workflow_with_backoff(self): + workflow_with_backoff() + + def test_workflow_with_retry_delay(self): + workflow_with_retry_delay() + + def test_workflow_with_timeout(self): + workflow_with_timeout() From dcd50ce157dea1347f23dd150168b259e8e668e3 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Thu, 1 May 2025 21:21:06 -0300 Subject: [PATCH 3/8] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20FEATURE:=20Updated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples b/examples index 7a9cdc4..ed59c0d 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 7a9cdc4ea59ba2ba5803f97efdd289068f99f5e3 +Subproject commit ed59c0de135c6df7a984f59a383a61ff2a6cf1eb From 3ddc0ee880003e0c76811faa2e9fea41a6be7663 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Thu, 1 May 2025 22:00:16 -0300 Subject: [PATCH 4/8] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20FEATURE:=20Updated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples b/examples index ed59c0d..9fcf3f8 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit ed59c0de135c6df7a984f59a383a61ff2a6cf1eb +Subproject commit 9fcf3f800dc0fb47aa713a456703719d8adcadda From 01c701a7ea668ea68b9ac5dab79676f1673946d9 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Sat, 3 May 2025 18:39:24 -0300 Subject: [PATCH 5/8] =?UTF-8?q?=F0=9F=93=8C=20ISSUE-#56:=20Updated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/abc/flow.py | 2 +- dotflow/core/workflow.py | 14 +++++++------- tests/core/test_workflow_background.py | 4 ++-- tests/core/test_workflow_parallel.py | 4 ++-- tests/core/test_workflow_sequential.py | 4 ++-- tests/core/test_workflow_sequential_group.py | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/dotflow/abc/flow.py b/dotflow/abc/flow.py index 423a04f..3840dd1 100644 --- a/dotflow/abc/flow.py +++ b/dotflow/abc/flow.py @@ -34,7 +34,7 @@ def get_tasks(self) -> List[Task]: return self.queue @abstractmethod - def _internal_callback(self, task: Task) -> None: + def _flow_callback(self, task: Task) -> None: self.queue.append(task) @abstractmethod diff --git a/dotflow/core/workflow.py b/dotflow/core/workflow.py index be9f48b..4c08274 100644 --- a/dotflow/core/workflow.py +++ b/dotflow/core/workflow.py @@ -161,7 +161,7 @@ def setup_queue(self) -> None: def get_tasks(self) -> List[Task]: return self.queue - def _internal_callback(self, task: Task) -> None: + def _flow_callback(self, task: Task) -> None: self.queue.append(task) def run(self) -> None: @@ -172,7 +172,7 @@ def run(self) -> None: task=task, workflow_id=self.workflow_id, previous_context=previous_context, - _internal_callback=self._internal_callback, + _flow_callback=self._flow_callback, ) previous_context = task.config.storage.get( @@ -204,7 +204,7 @@ def get_tasks(self) -> List[Task]: return self.tasks - def _internal_callback(self, task: Task) -> None: + def _flow_callback(self, task: Task) -> None: current_task = { task.task_id: { "current_context": task.current_context, @@ -249,7 +249,7 @@ def _run_group(self, groups: List[Task]) -> None: task=task, workflow_id=self.workflow_id, previous_context=previous_context, - _internal_callback=self._internal_callback, + _flow_callback=self._flow_callback, ) previous_context = task.config.storage.get( @@ -269,7 +269,7 @@ def setup_queue(self) -> None: def get_tasks(self) -> List[Task]: return self.tasks - def _internal_callback(self, task: Task) -> None: + def _flow_callback(self, task: Task) -> None: pass def run(self) -> None: @@ -306,7 +306,7 @@ def get_tasks(self) -> List[Task]: return self.tasks - def _internal_callback(self, task: Task) -> None: + def _flow_callback(self, task: Task) -> None: current_task = { task.task_id: { "current_context": task.current_context, @@ -324,7 +324,7 @@ def run(self) -> None: for task in self.tasks: process = Process( target=Execution, - args=(task, self.workflow_id, previous_context, self._internal_callback), + args=(task, self.workflow_id, previous_context, self._flow_callback), ) process.start() processes.append(process) diff --git a/tests/core/test_workflow_background.py b/tests/core/test_workflow_background.py index 37aebed..2c6a792 100644 --- a/tests/core/test_workflow_background.py +++ b/tests/core/test_workflow_background.py @@ -88,7 +88,7 @@ def test_instantiating_background_setup_queue(self): self.assertListEqual(execution.queue, []) - def test_instantiating_background_internal_callback(self): + def test_instantiating_background_flow_callback(self): task = Task(task_id=5, step=action_step, callback=simple_callback) groups = grouper(tasks=[task]) @@ -100,7 +100,7 @@ def test_instantiating_background_internal_callback(self): ) execution.setup_queue() - execution._internal_callback(task=task) + execution._flow_callback(task=task) tasks = execution.get_tasks() self.assertEqual(tasks[0].task_id, 5) diff --git a/tests/core/test_workflow_parallel.py b/tests/core/test_workflow_parallel.py index 1642dcf..038ead5 100644 --- a/tests/core/test_workflow_parallel.py +++ b/tests/core/test_workflow_parallel.py @@ -89,7 +89,7 @@ def test_instantiating_parallel_setup_queue(self): self.assertIsInstance(execution.queue, Queue) - def test_instantiating_parallel_internal_callback(self): + def test_instantiating_parallel_flow_callback(self): task = Task(task_id=5, step=action_step, callback=simple_callback) groups = grouper(tasks=[task]) @@ -101,7 +101,7 @@ def test_instantiating_parallel_internal_callback(self): ) execution.setup_queue() - execution._internal_callback(task=task) + execution._flow_callback(task=task) tasks = execution.get_tasks() self.assertEqual(tasks[0].task_id, 5) diff --git a/tests/core/test_workflow_sequential.py b/tests/core/test_workflow_sequential.py index 5c74440..2d0bbba 100644 --- a/tests/core/test_workflow_sequential.py +++ b/tests/core/test_workflow_sequential.py @@ -96,7 +96,7 @@ def test_instantiating_sequential_setup_queue(self): self.assertListEqual(execution.queue, []) - def test_instantiating_sequential_internal_callback(self): + def test_instantiating_sequential_flow_callback(self): task = Task(task_id=5, step=action_step, callback=simple_callback) groups = grouper(tasks=[task]) @@ -108,7 +108,7 @@ def test_instantiating_sequential_internal_callback(self): ) execution.setup_queue() - execution._internal_callback(task=task) + execution._flow_callback(task=task) tasks = execution.get_tasks() self.assertEqual(tasks[0].task_id, 5) diff --git a/tests/core/test_workflow_sequential_group.py b/tests/core/test_workflow_sequential_group.py index 2c5bf70..7bd39b7 100644 --- a/tests/core/test_workflow_sequential_group.py +++ b/tests/core/test_workflow_sequential_group.py @@ -90,7 +90,7 @@ def test_instantiating_sequential_group_setup_queue(self): self.assertIsInstance(execution.queue, Queue) - def test_instantiating_sequential_group_internal_callback(self): + def test_instantiating_sequential_group_flow_callback(self): task = Task(task_id=5, step=action_step, callback=simple_callback) groups = grouper(tasks=[task]) @@ -102,7 +102,7 @@ def test_instantiating_sequential_group_internal_callback(self): ) execution.setup_queue() - execution._internal_callback(task=task) + execution._flow_callback(task=task) tasks = execution.get_tasks() self.assertEqual(tasks[0].task_id, 5) From 44c7d1d6a15a546b4ef56f7c822077fbccf2483b Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Sat, 3 May 2025 18:53:52 -0300 Subject: [PATCH 6/8] =?UTF-8?q?=F0=9F=93=8C=20ISSUE-#56:=20Updated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotflow/core/execution.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dotflow/core/execution.py b/dotflow/core/execution.py index f5d52e5..bbac826 100644 --- a/dotflow/core/execution.py +++ b/dotflow/core/execution.py @@ -47,14 +47,14 @@ def __init__( task: Task, workflow_id: UUID, previous_context: Context = None, - _internal_callback: Callable = basic_callback, + _flow_callback: Callable = basic_callback, ) -> None: self.task = task self.task.status = TaskStatus.IN_PROGRESS self.task.previous_context = previous_context self.task.workflow_id = workflow_id - self._excution(_internal_callback) + self._excution(_flow_callback) def _is_action(self, class_instance: Callable, func: Callable): try: @@ -136,7 +136,7 @@ def _execution_with_class(self, class_instance: Callable): return new_context @time - def _excution(self, _internal_callback): + def _excution(self, _flow_callback): try: current_context = self.task.step( initial_context=self.task.initial_context, @@ -162,6 +162,6 @@ def _excution(self, _internal_callback): finally: self.task.callback(task=self.task) - _internal_callback(task=self.task) + _flow_callback(task=self.task) return self.task From 07630998ab9856321b30d11023b920527b61a6c0 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Sat, 3 May 2025 19:04:01 -0300 Subject: [PATCH 7/8] =?UTF-8?q?=F0=9F=93=98=20DOCS:=20Updated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/nav/development/release-notes.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/nav/development/release-notes.md b/docs/nav/development/release-notes.md index 8d2076d..a723bc6 100644 --- a/docs/nav/development/release-notes.md +++ b/docs/nav/development/release-notes.md @@ -1,5 +1,10 @@ # Release Notes +## v0.13.0 + +- [📦 PyPI - Build 0.12.0](https://github.com/dotflow-io/dotflow/releases/tag/v0.13.0) +- [📌 Action with timeout, retry_delay and backoff](https://github.com/dotflow-io/dotflow/pull/56) + ## v0.12.0 - [📦 PyPI - Build 0.12.0](https://github.com/dotflow-io/dotflow/releases/tag/v0.12.0) From a51edbce37da109108f85e2758337cc67517cbd2 Mon Sep 17 00:00:00 2001 From: FernandoCelmer Date: Sat, 3 May 2025 19:04:29 -0300 Subject: [PATCH 8/8] =?UTF-8?q?=F0=9F=93=98=20DOCS:=20Updated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/nav/development/release-notes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/nav/development/release-notes.md b/docs/nav/development/release-notes.md index a723bc6..a259d89 100644 --- a/docs/nav/development/release-notes.md +++ b/docs/nav/development/release-notes.md @@ -2,7 +2,7 @@ ## v0.13.0 -- [📦 PyPI - Build 0.12.0](https://github.com/dotflow-io/dotflow/releases/tag/v0.13.0) +- [📦 PyPI - Build 0.13.0](https://github.com/dotflow-io/dotflow/releases/tag/v0.13.0) - [📌 Action with timeout, retry_delay and backoff](https://github.com/dotflow-io/dotflow/pull/56) ## v0.12.0