Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 231 additions & 8 deletions tests/test_cpp_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5485,7 +5485,7 @@ def create_World(cpp=True):

#################################
# DUE
solver_DUE = DTAsolvers.SolverDUE(create_World)
solver_DUE = DTAsolvers.SolverDUE(create_World, cpp=True)
solver_DUE.solve(max_iter=20) # max_iter should be larger (e.g., 100). this is just for demonstration
W_DUE = solver_DUE.W_sol
W_DUE.analyzer.print_simple_stats(force_print=True)
Expand All @@ -5509,7 +5509,7 @@ def create_World(cpp=True):

#################################
# DSO by day-to-day: this is recommended
solver_DSO_D2D= DTAsolvers.SolverDSO_D2D(create_World)
solver_DSO_D2D= DTAsolvers.SolverDSO_D2D(create_World, cpp=True)
solver_DSO_D2D.solve(max_iter=20) # max_iter should be larger (e.g., 100). this is just for demonstration
W_DSO_D2D = solver_DSO_D2D.W_sol
W_DSO_D2D.analyzer.print_simple_stats(force_print=True)
Expand Down Expand Up @@ -5598,7 +5598,7 @@ def create_World(cpp=True):

#################################
# DUE
solver_DUE = SolverDUE(create_World)
solver_DUE = SolverDUE(create_World, cpp=True)
solver_DUE.solve(max_iter=20) # max_iter should be larger (e.g., 100). this is just for demonstration
W_DUE = solver_DUE.W_sol
W_DUE.analyzer.print_simple_stats(force_print=True)
Expand Down Expand Up @@ -5706,7 +5706,7 @@ def create_World(cpp=True):
]
}

solver_DUE = DTAsolvers.SolverDUE(create_World)
solver_DUE = DTAsolvers.SolverDUE(create_World, cpp=True)
solver_DUE.solve(max_iter=20, route_sets=route_sets)
W_DUE = solver_DUE.W_sol
W_DUE.analyzer.print_simple_stats(force_print=True)
Expand All @@ -5717,7 +5717,7 @@ def create_World(cpp=True):
assert df_DUE_link["traffic_volume"][df_DUE_link["link"]=="offramp"].item() == 0


solver_DSO_D2D= DTAsolvers.SolverDSO_D2D(create_World)
solver_DSO_D2D= DTAsolvers.SolverDSO_D2D(create_World, cpp=True)
solver_DSO_D2D.solve(max_iter=20, route_sets=route_sets)
W_DSO_D2D = solver_DSO_D2D.W_sol
W_DSO_D2D.analyzer.print_simple_stats(force_print=True)
Expand Down Expand Up @@ -5775,7 +5775,7 @@ def toll(t):
return W

# DUE
solver_DUE = SolverDUE(create_World)
solver_DUE = SolverDUE(create_World, cpp=True)
solver_DUE.solve(max_iter=40, print_progress=False)
W_DUE = solver_DUE.W_sol
W_DUE.analyzer.print_simple_stats(force_print=False)
Expand Down Expand Up @@ -7038,7 +7038,7 @@ def create_World():
W.adddemand("A", "D", 0, 2000, 0.4)
return W

solver = SolverDUE(create_World)
solver = SolverDUE(create_World, cpp=True)
solver.solve(max_iter=3, print_progress=False)
W_sol = solver.W_sol
W_sol.analyzer.print_simple_stats(force_print=False)
Expand Down Expand Up @@ -7102,7 +7102,7 @@ def toll(t):
W.adddemand("1", "3", 0, 3000, 0.3)
return W

solver = SolverDUE(create_World)
solver = SolverDUE(create_World, cpp=True)
solver.solve(max_iter=3, print_progress=False)
W_sol = solver.W_sol
assert W_sol.analyzer.total_travel_time > 0
Expand Down Expand Up @@ -7932,3 +7932,226 @@ def test_coverage_print_mode_enabled():
W.adddemand("A", "B", 0, 100, 0.3)
W.exec_simulation()
assert len(W.VEHICLES) > 0


# ======================================================================
# DTA solver tests for C++ mode (SolverDUE, SolverDSO_D2D)
# Based on 9x9 grid scenario from devlog/benchtestdta.py
# ======================================================================

def _create_dta_grid_world(seed, cpp):
"""Helper: build a 9x9 grid DTA scenario (from benchtestdta.py)."""
W = uxsim.World(
name="", deltan=10, tmax=4800,
duo_update_time=300,
print_mode=0, save_mode=0, show_mode=0,
random_seed=seed, cpp=cpp,
)
imax, jmax = 9, 9
id_center = 4
nodes = {}
for i in range(imax):
for j in range(jmax):
nodes[i, j] = W.addNode(f"n{(i, j)}", i, j, flow_capacity=1.6)
for i in range(imax):
for j in range(jmax):
free_flow_speed = 10
if i != imax - 1:
if j == id_center:
free_flow_speed = 20
W.addLink(f"l{(i, j, i+1, j)}", nodes[i, j], nodes[i+1, j],
length=1000, free_flow_speed=free_flow_speed)
free_flow_speed = 10
if i != 0:
if j == id_center:
free_flow_speed = 20
W.addLink(f"l{(i, j, i-1, j)}", nodes[i, j], nodes[i-1, j],
length=1000, free_flow_speed=free_flow_speed)
free_flow_speed = 10
if j != jmax - 1:
if i == id_center:
free_flow_speed = 20
W.addLink(f"l{(i, j, i, j+1)}", nodes[i, j], nodes[i, j+1],
length=1000, free_flow_speed=free_flow_speed)
free_flow_speed = 10
if j != 0:
if i == id_center:
free_flow_speed = 20
W.addLink(f"l{(i, j, i, j-1)}", nodes[i, j], nodes[i, j-1],
length=1000, free_flow_speed=free_flow_speed)

demand_flow = 0.08
demand_duration = 2400
outer_ids = 3
for n1 in [(0, j) for j in range(outer_ids, jmax - outer_ids)]:
for n2 in [(imax - 1, j) for j in range(outer_ids, jmax - outer_ids)]:
W.adddemand(nodes[n1], nodes[n2], 0, demand_duration, demand_flow)
for n2 in [(i, jmax - 1) for i in range(outer_ids, imax - outer_ids)]:
W.adddemand(nodes[n1], nodes[n2], 0, demand_duration, demand_flow)
for n1 in [(i, 0) for i in range(outer_ids, imax - outer_ids)]:
for n2 in [(i, jmax - 1) for i in range(outer_ids, imax - outer_ids)]:
W.adddemand(nodes[n1], nodes[n2], 0, demand_duration, demand_flow)
for n2 in [(imax - 1, j) for j in range(outer_ids, jmax - outer_ids)]:
W.adddemand(nodes[n1], nodes[n2], 0, demand_duration, demand_flow)

return W


def test_dta_solver_due_grid_cpp():
"""SolverDUE runs on a 9x9 grid with cpp=True and returns positive TTT."""
from uxsim.DTAsolvers import SolverDUE

def create_World():
return _create_dta_grid_world(seed=42, cpp=True)

solver = SolverDUE(create_World, cpp=True)
solver.solve(max_iter=1, print_progress=False)
W_sol = solver.W_sol
W_sol.analyzer.print_simple_stats(force_print=False)
assert W_sol.analyzer.total_travel_time > 0
assert W_sol.analyzer.trip_completed > 0


def test_dta_solver_dso_d2d_grid_cpp():
"""SolverDSO_D2D runs on a 9x9 grid with cpp=True and returns positive TTT."""
from uxsim.DTAsolvers import SolverDSO_D2D

def create_World():
return _create_dta_grid_world(seed=42, cpp=True)

solver = SolverDSO_D2D(create_World, cpp=True)
solver.solve(max_iter=1, print_progress=False)
W_sol = solver.W_sol
W_sol.analyzer.print_simple_stats(force_print=False)
assert W_sol.analyzer.total_travel_time > 0
assert W_sol.analyzer.trip_completed > 0


@pytest.mark.flaky(reruns=5)
def test_dta_solver_due_grid_cpp_vs_python():
"""SolverDUE on 9x9 grid: C++ and Python produce similar TTT."""
from uxsim.DTAsolvers import SolverDUE

solver_cpp = SolverDUE(lambda: _create_dta_grid_world(seed=42, cpp=True), cpp=True)
solver_cpp.solve(max_iter=1, print_progress=False)
ttt_cpp = solver_cpp.W_sol.analyzer.total_travel_time
trips_cpp = solver_cpp.W_sol.analyzer.trip_completed

solver_py = SolverDUE(lambda: _create_dta_grid_world(seed=42, cpp=False))
solver_py.solve(max_iter=1, print_progress=False)
ttt_py = solver_py.W_sol.analyzer.total_travel_time
trips_py = solver_py.W_sol.analyzer.trip_completed

assert trips_cpp == trips_py
assert ttt_cpp == pytest.approx(ttt_py, rel=0.2)


@pytest.mark.flaky(reruns=5)
def test_dta_solver_dso_d2d_grid_cpp_vs_python():
"""SolverDSO_D2D on 9x9 grid: C++ and Python produce similar TTT."""
from uxsim.DTAsolvers import SolverDSO_D2D

solver_cpp = SolverDSO_D2D(lambda: _create_dta_grid_world(seed=42, cpp=True), cpp=True)
solver_cpp.solve(max_iter=1, print_progress=False)
ttt_cpp = solver_cpp.W_sol.analyzer.total_travel_time
trips_cpp = solver_cpp.W_sol.analyzer.trip_completed

solver_py = SolverDSO_D2D(lambda: _create_dta_grid_world(seed=42, cpp=False))
solver_py.solve(max_iter=1, print_progress=False)
ttt_py = solver_py.W_sol.analyzer.total_travel_time
trips_py = solver_py.W_sol.analyzer.trip_completed

assert trips_cpp == trips_py
assert ttt_cpp == pytest.approx(ttt_py, rel=0.2)


def test_dta_solver_due_grid_convergence_metrics_cpp():
"""SolverDUE on 9x9 grid: tracks convergence metrics (ttts, n_swaps) with cpp=True."""
from uxsim.DTAsolvers import SolverDUE

n_iter = 3
solver = SolverDUE(lambda: _create_dta_grid_world(seed=42, cpp=True), cpp=True)
solver.solve(max_iter=n_iter, print_progress=False)

assert len(solver.ttts) == n_iter
assert len(solver.n_swaps) == n_iter
assert all(t > 0 for t in solver.ttts)
assert solver.W_sol is not None


def test_dta_solver_dso_d2d_grid_convergence_metrics_cpp():
"""SolverDSO_D2D on 9x9 grid: tracks convergence metrics (ttts, n_swaps) with cpp=True."""
from uxsim.DTAsolvers import SolverDSO_D2D

n_iter = 3
solver = SolverDSO_D2D(lambda: _create_dta_grid_world(seed=42, cpp=True), cpp=True)
solver.solve(max_iter=n_iter, print_progress=False)

assert len(solver.ttts) == n_iter
assert len(solver.n_swaps) == n_iter
assert all(t > 0 for t in solver.ttts)
assert solver.W_sol is not None


def test_dta_solver_due_grid_duo_baseline_cpp():
"""DUO on 9x9 grid with cpp=True produces same TTT as Python DUO."""
W_cpp = _create_dta_grid_world(seed=42, cpp=True)
W_cpp.exec_simulation()
W_cpp.analyzer.print_simple_stats(force_print=False)
ttt_cpp = W_cpp.analyzer.total_travel_time

W_py = _create_dta_grid_world(seed=42, cpp=False)
W_py.exec_simulation()
W_py.analyzer.print_simple_stats(force_print=False)
ttt_py = W_py.analyzer.total_travel_time

assert ttt_cpp > 0
assert ttt_cpp == pytest.approx(ttt_py, rel=0.1)


def test_dta_solver_due_grid_benchmark_cpp():
"""Benchmark: SolverDUE on 9x9 grid with cpp=True, prints wall-clock time."""
import time
from uxsim.DTAsolvers import SolverDUE

t0 = time.perf_counter()
solver = SolverDUE(lambda: _create_dta_grid_world(seed=42, cpp=True), cpp=True)
solver.solve(max_iter=1, print_progress=False)
elapsed_cpp = time.perf_counter() - t0

t0 = time.perf_counter()
solver_py = SolverDUE(lambda: _create_dta_grid_world(seed=42, cpp=False))
solver_py.solve(max_iter=1, print_progress=False)
elapsed_py = time.perf_counter() - t0

speedup = elapsed_py / elapsed_cpp if elapsed_cpp > 0 else float("inf")
print(f"\n[DTA Benchmark] SolverDUE 9x9 grid (max_iter=1):")
print(f" C++: {elapsed_cpp:.3f}s")
print(f" Python: {elapsed_py:.3f}s")
print(f" Speedup: {speedup:.2f}x")

assert solver.W_sol.analyzer.total_travel_time > 0


def test_dta_solver_dso_d2d_grid_benchmark_cpp():
"""Benchmark: SolverDSO_D2D on 9x9 grid with cpp=True, prints wall-clock time."""
import time
from uxsim.DTAsolvers import SolverDSO_D2D

t0 = time.perf_counter()
solver = SolverDSO_D2D(lambda: _create_dta_grid_world(seed=42, cpp=True), cpp=True)
solver.solve(max_iter=1, print_progress=False)
elapsed_cpp = time.perf_counter() - t0

t0 = time.perf_counter()
solver_py = SolverDSO_D2D(lambda: _create_dta_grid_world(seed=42, cpp=False))
solver_py.solve(max_iter=1, print_progress=False)
elapsed_py = time.perf_counter() - t0

speedup = elapsed_py / elapsed_cpp if elapsed_cpp > 0 else float("inf")
print(f"\n[DTA Benchmark] SolverDSO_D2D 9x9 grid (max_iter=1):")
print(f" C++: {elapsed_cpp:.3f}s")
print(f" Python: {elapsed_py:.3f}s")
print(f" Speedup: {speedup:.2f}x")

assert solver.W_sol.analyzer.total_travel_time > 0
Loading
Loading