Workflow Execution Engine

A concurrent workflow execution engine that supports task dependencies, retry mechanisms, and comprehensive logging.


Quick Start

1. Build the Project

mvn clean package

2. Run the Application

java -jar target/workflow-execution-engine-1.0.0-jar-with-dependencies.jar

How to Use

When you run the application, you'll be prompted to choose between creating a new workflow or resuming from a saved state:

  • Option 1: Create a new workflow by defining tasks interactively
  • Option 2: Resume from a previously saved workflow state

=== Workflow Execution Engine ===
Interactive Mode

Options:
1. Create new workflow
2. Resume from saved state
Choose option (1 or 2): 1

Enter workflow ID: my-workflow
Enter number of tasks: 3

Input Fields

For each task, you'll be asked:

Field          Description                  Example
-----          -----------                  -------
Task ID        Unique task identifier       task1, fetch, process
Duration (ms)  How long the task takes      500, 1000
Should fail?   Simulate failure (yes/no)    no
Max retries    Retry attempts (default 3)   3
Dependencies   Comma-separated task IDs     task1,task2 (or leave blank)
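The fields above map naturally onto a per-task value object. The sketch below is illustrative only: `TaskSpec` and its field names are assumptions, not the engine's actual model classes.

```java
import java.util.List;

// Hypothetical container for the fields collected per task in interactive
// mode (the engine's real Task/model classes live under com.workflow.model
// and may differ).
public class TaskSpec {
    final String id;              // Task ID: unique identifier
    final long durationMs;        // Duration (ms): simulated work time
    final boolean shouldFail;     // Should fail?: simulate failure
    final int maxRetries;         // Max retries (default 3)
    final List<String> dependencies; // Comma-separated task IDs, parsed to a list

    TaskSpec(String id, long durationMs, boolean shouldFail,
             int maxRetries, List<String> dependencies) {
        this.id = id;
        this.durationMs = durationMs;
        this.shouldFail = shouldFail;
        this.maxRetries = maxRetries;
        this.dependencies = dependencies;
    }
}
```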

Example: Simple Linear Workflow

Enter workflow ID: data-pipeline
Enter number of tasks: 3

--- Task 1 ---
Task ID: fetch
Task duration (ms): 500
Should task fail? (yes/no): no
Max retries (default 3): 3
Dependencies:
✓ Task 'fetch' added

--- Task 2 ---
Task ID: process
Task duration (ms): 300
Should task fail? (yes/no): no
Max retries (default 3): 2
Dependencies: fetch
✓ Task 'process' added

--- Task 3 ---
Task ID: store
Task duration (ms): 200
Should task fail? (yes/no): no
Max retries (default 3): 1
Dependencies: process
✓ Task 'store' added

=== Workflow Summary ===
Workflow ID: data-pipeline
Total tasks: 3

Task Dependencies:
  fetch -> []
  process -> [fetch]
  store -> [process]

=== Executing Workflow ===
  [EXECUTING] fetch
  [COMPLETED] fetch
  [EXECUTING] process
  [COMPLETED] process
  [EXECUTING] store
  [COMPLETED] store

=== Execution Complete ===
Status: SUCCESS
Total execution time: 1015ms

Task Results:
  fetch: SUCCESS (attempts: 1)
  process: SUCCESS (attempts: 1)
  store: SUCCESS (attempts: 1)

Example: Parallel Workflow

Create tasks that can run concurrently:

Enter workflow ID: parallel-demo
Enter number of tasks: 4

Task 1:
  ID: init
  Duration: 100
  Fail: no
  Retries: 3
  Dependencies: (leave blank)

Task 2:
  ID: taskA
  Duration: 500
  Fail: no
  Retries: 3
  Dependencies: init

Task 3:
  ID: taskB
  Duration: 500
  Fail: no
  Retries: 3
  Dependencies: init

Task 4:
  ID: merge
  Duration: 200
  Fail: no
  Retries: 3
  Dependencies: taskA,taskB

Execution Flow:

    init
     |
   +-+-+
   |   |
 taskA taskB  (run in parallel)
   |   |
   +-+-+
     |
   merge

Result:

  • init executes first (100ms)
  • taskA and taskB execute in parallel; together they take ~500ms instead of the ~1000ms a sequential run would need
  • merge waits for both taskA and taskB to complete, then runs (200ms)
  • Total execution time: ~800ms (100 + 500 + 200), versus ~1300ms fully sequential
  • Final status: SUCCESS
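The timing above follows from the critical path of the DAG: the two parallel tasks contribute only the longer of their durations. A small sanity-check calculation (a sketch; the engine arrives at this implicitly by scheduling, not by computing it):

```java
// Critical-path timing for the init -> (taskA || taskB) -> merge diamond:
// the parallel stage contributes max(taskA, taskB), not their sum.
public class ParallelTiming {
    public static long criticalPathMs(long init, long a, long b, long merge) {
        return init + Math.max(a, b) + merge;
    }

    public static void main(String[] args) {
        // 100 + max(500, 500) + 200 = 800ms, vs 1300ms fully sequential
        System.out.println(criticalPathMs(100, 500, 500, 200));
    }
}
```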

Example: Workflow with Retry

Test the retry mechanism:

Enter workflow ID: retry-test
Enter number of tasks: 3

Task 1:
  ID: start
  Duration: 100
  Fail: no
  Retries: 3
  Dependencies:

Task 2:
  ID: flaky_task
  Duration: 200
  Fail: yes     ← Simulate failure
  Retries: 5    ← Will retry 5 times
  Dependencies: start

Task 3:
  ID: end
  Duration: 100
  Fail: no
  Retries: 3
  Dependencies: flaky_task

Result:

  • start executes successfully
  • flaky_task fails and retries up to 5 times
  • If flaky_task ultimately fails, end is skipped
  • Task execution logs show retry attempts
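The retry behaviour above can be sketched as a bounded attempt loop. This is an illustration, not the engine's code; in particular, whether "max retries" bounds the total attempts or the retries after the first attempt is an assumption here.

```java
import java.util.concurrent.Callable;

// Sketch of a bounded retry loop; maxRetries is treated as the total
// attempt budget (an assumption -- the engine may instead count retries
// made after the first attempt).
public class RetryRunner {
    public static <T> T runWithRetry(Callable<T> task, int maxRetries) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                System.out.println("  [RETRY] attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        // Retries exhausted: the task ends FAILED and its dependents are SKIPPED.
        throw last;
    }
}
```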

How It Works: When you execute a workflow using the interactive terminal, the engine automatically:

  1. Creates a workflow-states/ directory if it doesn't exist
  2. Saves the workflow state in binary format: workflow-states/{workflowId}.state
  3. Saves the workflow state in JSON format: workflow-states/{workflowId}.json
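The binary save step can be sketched with plain Java serialization. The engine's actual persistence classes are not shown in this README, so `StateSaver` and the use of `ObjectOutputStream` are assumptions; only the `workflow-states/{workflowId}.state` path comes from the source.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the binary save step: create workflow-states/ if
// missing, then serialize the state object to {workflowId}.state.
public class StateSaver {
    public static Path saveBinary(Serializable state, String workflowId) throws IOException {
        Path dir = Files.createDirectories(Path.of("workflow-states"));
        Path file = dir.resolve(workflowId + ".state");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(state);
        }
        return file;
    }
}
```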

Saved Information:

  • Workflow ID
  • Task definitions (ID, dependencies, max retries)
  • Task parameters (duration, failure settings)
  • Task execution results (SUCCESS, FAILED, SKIPPED)
  • Total execution time
  • Timestamp of when the state was saved

Testing Different Scenarios

Test 1: Simple Sequential Flow

Tasks: A → B → C
Dependencies: A (none), B (A), C (B)

Expected Result:

  • Tasks execute in strict sequential order
  • Task B waits for A to complete
  • Task C waits for B to complete
  • Total time = duration(A) + duration(B) + duration(C)
  • Final status: SUCCESS (if all tasks succeed)

Test 2: Parallel Execution

Tasks: init → (A, B, C) → merge
Dependencies: A/B/C depend on init, merge depends on A/B/C

Expected Result:

  • init executes first
  • A, B, and C execute simultaneously after init completes
  • merge waits for all three (A, B, C) to complete
  • Total time = duration(init) + max(duration(A, B, C)) + duration(merge)
  • Demonstrates concurrent execution capability
  • Final status: SUCCESS

Test 3: Failure Handling

Tasks: A → B (fails) → C
Set B to fail: yes

Expected Result:

  • Task A executes successfully
  • Task B fails and retries according to max retries setting
  • After all retries exhausted, B status = FAILED
  • Task C is SKIPPED (dependency failed)
  • Workflow status: FAILED
  • Execution logs show retry attempts for B
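The SKIPPED propagation described above can be sketched as a fixed-point pass over the dependency map: any task with a failed or skipped (transitive) dependency is itself skipped. The data shapes here are assumptions, not the engine's internals.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of skip propagation: given each task's dependencies and the set of
// failed tasks, compute which tasks must be marked SKIPPED.
public class SkipPropagation {
    public static Set<String> skipped(Map<String, List<String>> deps, Set<String> failed) {
        Set<String> skip = new HashSet<>();
        boolean changed = true;
        while (changed) {                       // iterate until no new skips appear
            changed = false;
            for (var e : deps.entrySet()) {
                String task = e.getKey();
                if (skip.contains(task) || failed.contains(task)) continue;
                for (String dep : e.getValue()) {
                    if (failed.contains(dep) || skip.contains(dep)) {
                        changed = skip.add(task);
                        break;
                    }
                }
            }
        }
        return skip;
    }
}
```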

Test 4: Complex DAG

Tasks: A → (B, C) → (D, E) → F
Create a diamond-shaped dependency graph

Input:

  • A: no dependencies
  • B: depends on A
  • C: depends on A
  • D: depends on B, C
  • E: depends on B, C
  • F: depends on D, E

Expected Result:

  • A executes first
  • B and C execute in parallel after A
  • D and E execute in parallel after both B and C complete
  • F executes after both D and E complete
  • Maximum parallelization at each level
  • Final status: SUCCESS
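The level-by-level parallelism above falls out of the DAG structure: a task's level is one more than the deepest of its dependencies, and tasks on the same level can run concurrently. A sketch of that computation (illustrative; the engine schedules on dependency completion rather than precomputing levels):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Computes each task's depth in the DAG: level 0 tasks have no dependencies,
// and tasks sharing a level are eligible to run in parallel.
public class DagLevels {
    public static Map<String, Integer> levels(Map<String, List<String>> deps) {
        Map<String, Integer> level = new HashMap<>();
        for (String task : deps.keySet()) levelOf(task, deps, level);
        return level;
    }

    private static int levelOf(String task, Map<String, List<String>> deps,
                               Map<String, Integer> level) {
        Integer cached = level.get(task);
        if (cached != null) return cached;
        int max = -1;
        for (String dep : deps.get(task)) max = Math.max(max, levelOf(dep, deps, level));
        level.put(task, max + 1);
        return max + 1;
    }
}
```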

Test 5: Retry Success

Tasks: A (fails initially but has retries)
Set fail: yes, Retries: 3

Expected Result:

  • Task A fails on first attempt
  • System retries up to 3 times
  • Each retry attempt is logged
  • Because the failure is simulated on every attempt, the task ultimately fails once all retries are exhausted
  • Demonstrates retry mechanism in action
  • Execution logs show: attempts = (number of retries used)

Test 6: Persistence - Save and Resume Workflow

Concept: Save workflow state to disk after execution. The workflow state is automatically saved in both binary and JSON formats.

Test Workflow:

Enter workflow ID: persistence-test
Enter number of tasks: 3

--- Task 1 ---
Task ID: fetch
Task duration (ms): 500
Should task fail? (yes/no): no
Max retries (default 3): 3
Dependencies:
✓ Task 'fetch' added

--- Task 2 ---
Task ID: process
Task duration (ms): 300
Should task fail? (yes/no): no
Max retries (default 3): 2
Dependencies: fetch
✓ Task 'process' added

--- Task 3 ---
Task ID: store
Task duration (ms): 200
Should task fail? (yes/no): no
Max retries (default 3): 1
Dependencies: process
✓ Task 'store' added

Expected Result:

  • Workflow executes: fetch → process → store (sequential)
  • Total execution time: ~1000ms
  • All tasks succeed (attempts: 1 each)
  • Workflow state is automatically saved to disk after execution
  • Two files are created in the workflow-states/ directory:
    • persistence-test.state (binary format - compact)
    • persistence-test.json (JSON format - human-readable)

Output:

=== Executing Workflow ===
  [EXECUTING] fetch
  [COMPLETED] fetch
  [EXECUTING] process
  [COMPLETED] process
  [EXECUTING] store
  [COMPLETED] store

=== Execution Complete ===
Status: SUCCESS
Total execution time: 1015ms

Task Results:
  fetch: SUCCESS (attempts: 1)
  process: SUCCESS (attempts: 1)
  store: SUCCESS (attempts: 1)

=== Saving Workflow State ===
✓ Workflow state saved to: workflow-states/persistence-test.state
✓ Workflow state saved to: workflow-states/persistence-test.json

Verification Steps:

  1. Check saved files exist:
ls -lh workflow-states/

Expected output:

persistence-test.state    # Binary format (serialized Java object)
persistence-test.json     # JSON format (human-readable)
  2. Inspect JSON file:
cat workflow-states/persistence-test.json

The JSON file contains:

  • Workflow ID
  • Saved timestamp
  • Total execution time
  • Task statuses (SUCCESS/FAILED/SKIPPED)
  • Task definitions with dependencies, retries, and parameters
  3. Verify file contents:
  • Check that all task IDs are present
  • Verify dependencies are correctly recorded
  • Confirm task parameters (duration, shouldFail) are saved
  • Validate execution time matches the workflow result

Use Cases:

  • Audit workflow execution history
  • Inspect workflow structure and task dependencies
  • Debug workflow issues by examining saved state
  • Transfer workflow definitions between systems
  • Demonstrate state persistence capability

Resuming from Saved State

You can resume and re-execute any previously saved workflow.

Steps to Resume:

  1. Run the application:
java -jar target/workflow-execution-engine-1.0.0-jar-with-dependencies.jar
  2. Select option 2 (Resume):
=== Workflow Execution Engine ===
Interactive Mode

Options:
1. Create new workflow
2. Resume from saved state
Choose option (1 or 2): 2
  3. Select workflow to resume:
=== Resume from Saved State ===

Available saved workflows:
1. persistence-test
2. data-pipeline
3. parallel-demo

Select workflow to resume (enter number): 1
  4. View loaded workflow details:
Loading workflow from: workflow-states/persistence-test.state
✓ Workflow loaded: persistence-test
  Saved at: 2026-02-25T11:30:15.123
  Previous execution time: 1015ms

=== Recreating Tasks ===
✓ Task 'fetch' recreated
✓ Task 'process' recreated
✓ Task 'store' recreated

=== Workflow Summary ===
Workflow ID: persistence-test
Total tasks: 3

Task Dependencies:
  fetch -> []
  process -> [fetch]
  store -> [process]
  5. Workflow re-executes:
=== Executing Workflow ===
  [EXECUTING] fetch
  [COMPLETED] fetch
  [EXECUTING] process
  [COMPLETED] process
  [EXECUTING] store
  [COMPLETED] store

=== Execution Complete ===
Status: SUCCESS
Total execution time: 1018ms

Task Results:
  fetch: SUCCESS (attempts: 1)
  process: SUCCESS (attempts: 1)
  store: SUCCESS (attempts: 1)

Key Points:

  • The workflow is recreated with the same structure (tasks, dependencies, parameters)
  • The workflow re-executes from the beginning (not from a paused state)
  • Useful for re-running the same workflow multiple times
  • Each execution saves a new state snapshot
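The load step can be sketched as the counterpart of the binary save: read `workflow-states/{workflowId}.state` back with Java deserialization. As with the save sketch, `StateLoader` and the use of `ObjectInputStream` are assumptions about the mechanism, not the engine's actual persistence code.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of reloading a saved state: deserialize the object
// previously written to workflow-states/{workflowId}.state.
public class StateLoader {
    public static Object loadBinary(String workflowId) throws IOException, ClassNotFoundException {
        Path file = Path.of("workflow-states", workflowId + ".state");
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return in.readObject();
        }
    }
}
```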

Build Commands

# Clean and compile
mvn clean compile

# Run tests (if you add any)
mvn test

# Package as JAR
mvn package

# Clean everything
mvn clean

Project Structure

workflow_execution/
├── pom.xml
├── README.md
├── src/
│   ├── main/
│   │   ├── java/com/workflow/
│   │   │   ├── DynamicWorkflow.java   # Main entry point
│   │   │   ├── controller/            # Workflow controller (API layer)
│   │   │   ├── engine/                # Workflow execution engine
│   │   │   ├── model/                 # Domain models (Task, Workflow, etc.)
│   │   │   ├── service/               # Business services (input, state, tasks)
│   │   │   ├── persistence/           # State persistence (save/load)
│   │   │   └── config/                # Configuration loader
│   │   └── resources/
│   │       └── application.properties
└── target/
    └── workflow-execution-engine-1.0.0-jar-with-dependencies.jar

Requirements

  • Java: JDK 17 or higher
  • Maven: 3.6+
  • OS: Windows, Linux, macOS

About

Code implementing a dynamic workflow engine on top of a DAG, with a minimal feature set.
