Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/nav/development/release-notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release Notes

## v0.13.0

- [📦 PyPI - Build 0.13.0](https://github.com/dotflow-io/dotflow/releases/tag/v0.13.0)
- [📌 Action with timeout, retry_delay and backoff](https://github.com/dotflow-io/dotflow/pull/56)

## v0.12.0

- [📦 PyPI - Build 0.12.0](https://github.com/dotflow-io/dotflow/releases/tag/v0.12.0)
Expand Down
2 changes: 1 addition & 1 deletion docs/nav/reference/action.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- task
- retry
- params
- _retry
- _run_action
- _set_params
- _get_context
- _get_task
2 changes: 1 addition & 1 deletion dotflow/abc/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def get_tasks(self) -> List[Task]:
return self.queue

@abstractmethod
def _internal_callback(self, task: Task) -> None:
def _flow_callback(self, task: Task) -> None:
self.queue.append(task)

@abstractmethod
Expand Down
74 changes: 53 additions & 21 deletions dotflow/core/action.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
"""Action module"""

from time import sleep

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict
from types import FunctionType

from dotflow.core.exception import ExecutionWithClassError
from dotflow.core.context import Context


def is_execution_with_class_internal_error(error: Exception) -> bool:
message = str(error)
patterns = [
"initial_context",
"previous_context",
"missing 1 required positional argument: 'self'",
]
return any(pattern in message for pattern in patterns)


class Action(object):
"""
Import:
Expand Down Expand Up @@ -39,14 +53,20 @@ def my_task():
"""

def __init__(
self,
func: Callable = None,
task: Callable = None,
retry: int = 1
self,
func: Callable = None,
task: Callable = None,
retry: int = 1,
timeout: int = 0,
retry_delay: int = 1,
backoff: bool = False,
) -> None:
self.func = func
self.task = task
self.retry = retry
self.timeout = timeout
self.retry_delay = retry_delay
self.backoff = backoff
self.params = []

def __call__(self, *args, **kwargs):
Expand All @@ -59,15 +79,15 @@ def __call__(self, *args, **kwargs):

if contexts:
return Context(
storage=self._retry(*args, **contexts),
storage=self._run_action(*args, **contexts),
task_id=task.task_id,
workflow_id=task.workflow_id
workflow_id=task.workflow_id,
)

return Context(
storage=self._retry(*args),
storage=self._run_action(*args),
task_id=task.task_id,
workflow_id=task.workflow_id
workflow_id=task.workflow_id,
)

# No parameters
Expand All @@ -80,31 +100,41 @@ def action(*_args, **_kwargs):

if contexts:
return Context(
storage=self._retry(*_args, **contexts),
storage=self._run_action(*_args, **contexts),
task_id=task.task_id,
workflow_id=task.workflow_id
workflow_id=task.workflow_id,
)

return Context(
storage=self._retry(*_args),
storage=self._run_action(*_args),
task_id=task.task_id,
workflow_id=task.workflow_id
workflow_id=task.workflow_id,
)

return action

def _retry(self, *args, **kwargs):
attempt = 0
exception = Exception()

while self.retry > attempt:
def _run_action(self, *args, **kwargs):
for attempt in range(1, self.retry + 1):
try:
if self.timeout:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(self.func, *args, **kwargs)
return future.result(timeout=self.timeout)

return self.func(*args, **kwargs)

except Exception as error:
exception = error
attempt += 1
last_exception = error

if is_execution_with_class_internal_error(error=last_exception):
raise ExecutionWithClassError()

if attempt == self.retry:
raise last_exception

raise exception
sleep(self.retry_delay)
if self.backoff:
self.retry_delay *= 2

def _set_params(self):
if isinstance(self.func, FunctionType):
Expand All @@ -113,7 +143,9 @@ def _set_params(self):
if type(self.func) is type:
if hasattr(self.func, "__init__"):
if hasattr(self.func.__init__, "__code__"):
self.params = [param for param in self.func.__init__.__code__.co_varnames]
self.params = [
param for param in self.func.__init__.__code__.co_varnames
]

def _get_context(self, kwargs: Dict):
context = {}
Expand Down
5 changes: 5 additions & 0 deletions dotflow/core/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,8 @@ def __init__(self, module: str, library: str):
library=library
)
)


class ExecutionWithClassError(Exception):
def __init__(self):
super(ExecutionWithClassError, self).__init__("Unknown")
14 changes: 9 additions & 5 deletions dotflow/core/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
except ImportError:
NoneType = type(None)

from dotflow.core.exception import ExecutionWithClassError
from dotflow.logging import logger
from dotflow.core.action import Action
from dotflow.core.context import Context
Expand Down Expand Up @@ -46,14 +47,14 @@ def __init__(
task: Task,
workflow_id: UUID,
previous_context: Context = None,
_internal_callback: Callable = basic_callback,
_flow_callback: Callable = basic_callback,
) -> None:
self.task = task
self.task.status = TaskStatus.IN_PROGRESS
self.task.previous_context = previous_context
self.task.workflow_id = workflow_id

self._excution(_internal_callback)
self._excution(_flow_callback)

def _is_action(self, class_instance: Callable, func: Callable):
try:
Expand Down Expand Up @@ -116,7 +117,10 @@ def _execution_with_class(self, class_instance: Callable):
new_context.storage.append(subcontext)
previous_context = subcontext

except Exception:
except Exception as error:
if not isinstance(error, ExecutionWithClassError):
raise error

subcontext = new_object(
class_instance,
initial_context=self.task.initial_context,
Expand All @@ -132,7 +136,7 @@ def _execution_with_class(self, class_instance: Callable):
return new_context

@time
def _excution(self, _internal_callback):
def _excution(self, _flow_callback):
try:
current_context = self.task.step(
initial_context=self.task.initial_context,
Expand All @@ -158,6 +162,6 @@ def _excution(self, _internal_callback):

finally:
self.task.callback(task=self.task)
_internal_callback(task=self.task)
_flow_callback(task=self.task)

return self.task
14 changes: 7 additions & 7 deletions dotflow/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def setup_queue(self) -> None:
def get_tasks(self) -> List[Task]:
return self.queue

def _internal_callback(self, task: Task) -> None:
def _flow_callback(self, task: Task) -> None:
self.queue.append(task)

def run(self) -> None:
Expand All @@ -172,7 +172,7 @@ def run(self) -> None:
task=task,
workflow_id=self.workflow_id,
previous_context=previous_context,
_internal_callback=self._internal_callback,
_flow_callback=self._flow_callback,
)

previous_context = task.config.storage.get(
Expand Down Expand Up @@ -204,7 +204,7 @@ def get_tasks(self) -> List[Task]:

return self.tasks

def _internal_callback(self, task: Task) -> None:
def _flow_callback(self, task: Task) -> None:
current_task = {
task.task_id: {
"current_context": task.current_context,
Expand Down Expand Up @@ -249,7 +249,7 @@ def _run_group(self, groups: List[Task]) -> None:
task=task,
workflow_id=self.workflow_id,
previous_context=previous_context,
_internal_callback=self._internal_callback,
_flow_callback=self._flow_callback,
)

previous_context = task.config.storage.get(
Expand All @@ -269,7 +269,7 @@ def setup_queue(self) -> None:
def get_tasks(self) -> List[Task]:
return self.tasks

def _internal_callback(self, task: Task) -> None:
def _flow_callback(self, task: Task) -> None:
pass

def run(self) -> None:
Expand Down Expand Up @@ -306,7 +306,7 @@ def get_tasks(self) -> List[Task]:

return self.tasks

def _internal_callback(self, task: Task) -> None:
def _flow_callback(self, task: Task) -> None:
current_task = {
task.task_id: {
"current_context": task.current_context,
Expand All @@ -324,7 +324,7 @@ def run(self) -> None:
for task in self.tasks:
process = Process(
target=Execution,
args=(task, self.workflow_id, previous_context, self._internal_callback),
args=(task, self.workflow_id, previous_context, self._flow_callback),
)
process.start()
processes.append(process)
Expand Down
2 changes: 1 addition & 1 deletion examples
4 changes: 2 additions & 2 deletions tests/core/test_workflow_background.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_instantiating_background_setup_queue(self):

self.assertListEqual(execution.queue, [])

def test_instantiating_background_internal_callback(self):
def test_instantiating_background_flow_callback(self):
task = Task(task_id=5, step=action_step, callback=simple_callback)
groups = grouper(tasks=[task])

Expand All @@ -100,7 +100,7 @@ def test_instantiating_background_internal_callback(self):
)

execution.setup_queue()
execution._internal_callback(task=task)
execution._flow_callback(task=task)

tasks = execution.get_tasks()
self.assertEqual(tasks[0].task_id, 5)
4 changes: 2 additions & 2 deletions tests/core/test_workflow_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_instantiating_parallel_setup_queue(self):

self.assertIsInstance(execution.queue, Queue)

def test_instantiating_parallel_internal_callback(self):
def test_instantiating_parallel_flow_callback(self):
task = Task(task_id=5, step=action_step, callback=simple_callback)
groups = grouper(tasks=[task])

Expand All @@ -101,7 +101,7 @@ def test_instantiating_parallel_internal_callback(self):
)

execution.setup_queue()
execution._internal_callback(task=task)
execution._flow_callback(task=task)

tasks = execution.get_tasks()
self.assertEqual(tasks[0].task_id, 5)
4 changes: 2 additions & 2 deletions tests/core/test_workflow_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def test_instantiating_sequential_setup_queue(self):

self.assertListEqual(execution.queue, [])

def test_instantiating_sequential_internal_callback(self):
def test_instantiating_sequential_flow_callback(self):
task = Task(task_id=5, step=action_step, callback=simple_callback)
groups = grouper(tasks=[task])

Expand All @@ -108,7 +108,7 @@ def test_instantiating_sequential_internal_callback(self):
)

execution.setup_queue()
execution._internal_callback(task=task)
execution._flow_callback(task=task)

tasks = execution.get_tasks()
self.assertEqual(tasks[0].task_id, 5)
4 changes: 2 additions & 2 deletions tests/core/test_workflow_sequential_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_instantiating_sequential_group_setup_queue(self):

self.assertIsInstance(execution.queue, Queue)

def test_instantiating_sequential_group_internal_callback(self):
def test_instantiating_sequential_group_flow_callback(self):
task = Task(task_id=5, step=action_step, callback=simple_callback)
groups = grouper(tasks=[task])

Expand All @@ -102,7 +102,7 @@ def test_instantiating_sequential_group_internal_callback(self):
)

execution.setup_queue()
execution._internal_callback(task=task)
execution._flow_callback(task=task)

tasks = execution.get_tasks()
self.assertEqual(tasks[0].task_id, 5)
12 changes: 12 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
from examples.workflow_parallel_mode import main as workflow_parallel_mode
from examples.workflow_sequential_group_mode import main as workflow_sequential_group_mode
from examples.workflow_sequential_mode import main as workflow_sequential_mode
from examples.workflow_with_backoff import main as workflow_with_backoff
from examples.workflow_with_retry_delay import main as workflow_with_retry_delay
from examples.workflow_with_timeout import main as workflow_with_timeout


class TestIntegration(unittest.TestCase):
Expand Down Expand Up @@ -117,3 +120,12 @@ def test_workflow_sequential_group_mode(self):

def test_workflow_sequential_mode(sefl):
workflow_sequential_mode()

def test_workflow_with_backoff(self):
workflow_with_backoff()

def test_workflow_with_retry_delay(self):
workflow_with_retry_delay()

def test_workflow_with_timeout(self):
workflow_with_timeout()