Replace nest_asyncio with greenlet for nested process execution
#331
base: master
Conversation
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master     #331      +/-   ##
==========================================
- Coverage   90.83%   90.21%   -0.62%
==========================================
  Files          22       23       +1
  Lines        3009     3083      +74
==========================================
+ Hits         2733     2781      +48
- Misses        276      302      +26
```

☔ View full report in Codecov by Sentry.
| "set_event_loop_policy()\n", | ||
| "assert isinstance(asyncio.get_event_loop_policy(), PlumpyEventLoopPolicy)\n", | ||
| "assert hasattr(asyncio.get_event_loop(), '_nest_patched')" | ||
| "assert asyncio.get_event_loop() is asyncio.get_event_loop()" |
This assertion checks that asyncio.get_event_loop() returns the same result every time.
Hey @khsrali, thank you for the work on this. I was considering using greenback, and then making it a custom loop policy as we do with nest_asyncio now (see …). Note that we also needed to monkey patch the … What do you think? I think using greenback is a much simpler solution than implementing our own bridge.
Hi @agoscinski, thanks for your comment. I didn't know about greenback. I agree it's a simpler solution; however, please note this was mainly an effort to get rid of monkey patching, which was the main source of difficulties in debugging and back tracing, and also introduced hard-to-understand edge-case racing bugs (see these two for example: …).

Before this PR, even after educating myself with Claude and iterating back and forth for hours, I had no idea what was going on with all the monkey-patched event loops in aiida-core. Monkey patching allows all sorts of nasty calls in the middle of nowhere that resulted in many overheads. Moreover, it was not at all clear which event loop we are in. If you look into …

I plan to run some performance tests on both versions of aiida-core, with nest_asyncio and with greenlet.
```python
if loop.is_running():
    if in_worker_greenlet():
        return sync_await(awaitable)
    else:
```
Could this be replaced with a greenlet worker?
No, because if a loop is running, you cannot start a new loop in another worker.
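For readers following the thread: below is a minimal sketch of how such a worker-greenlet bridge can be put together. Only the names in_worker_greenlet() and sync_await() come from the diff above; run_in_worker() and all implementation details are illustrative assumptions, not the PR's actual code.

```python
import greenlet


class _WorkerGreenlet(greenlet.greenlet):
    """Marker type so in_worker_greenlet() can recognise worker greenlets (hypothetical)."""


def in_worker_greenlet() -> bool:
    """Are we currently executing sync code inside a worker greenlet?"""
    return isinstance(greenlet.getcurrent(), _WorkerGreenlet)


def sync_await(awaitable):
    """Hand the awaitable to the parent (async) greenlet and suspend this greenlet
    until the parent switches back with the result. Only valid inside a worker."""
    return greenlet.getcurrent().parent.switch(awaitable)


async def run_in_worker(func, *args):
    """Run a sync function in a worker greenlet on the already-running loop,
    servicing each sync_await() request with a real await (hypothetical helper)."""
    done = object()  # sentinel marking normal completion of the sync function
    worker = _WorkerGreenlet(lambda: (done, func(*args)))
    request = worker.switch()  # start the sync function
    while not (isinstance(request, tuple) and request and request[0] is done):
        try:
            result = await request            # the real await, on the running loop
        except BaseException as exc:          # propagate failures into the sync code
            request = worker.throw(exc)
        else:
            request = worker.switch(result)   # resume the sync code with the result
    return request[1]
```

Under this scheme the answer above follows directly: the worker never starts a second loop; it always hands the awaitable back to the one loop that is already running.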
@giovannipizzi asked to check if this also fixes the recursion limit problem as mentioned here. I took the code from this comment aiidateam/aiida-core#4876 (comment):

```python
import nest_asyncio
nest_asyncio.apply()

import asyncio
import sys

sys.setrecursionlimit(220)

# Running already 20 not-too-deep recursions crashes badly
# Note: we are calling a recursive co-routine that calls a routine that internally calls again the initial coroutine
number_of_tasks = 20
recursion_depth = 5

async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    intermediate_function(task_name, counter)

def intermediate_function(task_name, counter):
    asyncio.get_event_loop().run_until_complete(recursive_task(task_name, counter - 1))

async def main():
    tasks = [
        asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())
```

It will produce this error (horribly nested traceback) because when the recursion limit is hit, it seems to leave the task in a corrupted state so it cannot error out properly. I don't fully understand it, but increasing the recursion limit fixes this error.

A greenlet solution does not produce this error:

```python
import asyncio
import sys
import greenback

sys.setrecursionlimit(70)

# Same test as nest_asyncio_recursion.py
number_of_tasks = 20
recursion_depth = 5

async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    intermediate_function(task_name, counter)

def intermediate_function(task_name, counter):
    greenback.await_(recursive_task(task_name, counter - 1))

async def main():
    await greenback.ensure_portal()
    tasks = [
        asyncio.create_task(task_with_portal(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    await asyncio.gather(*tasks)

async def task_with_portal(task_name, depth):
    """Wrapper to ensure each task has its own portal."""
    await greenback.ensure_portal()
    await recursive_task(task_name, depth)

asyncio.run(main())
```

I tried parameters … This means that we can revert the change that dynamically increases the recursion limit, which was a patch for this problem: aiidateam/aiida-core#6052
A bit of history
`nest_asyncio` monkey-patches asyncio internals to allow re-entrant `run_until_complete()` calls. Although convenient, this is inherently fragile and made debugging in some cases almost impossible. Moreover, a lot of packages that historically used `nest_asyncio` moved to `greenlet` after its author passed away [which is deeply sad], for maintainability reasons.

This PR gives synchronous code a clean, explicit way to call async operations.
Why did we need `nest_asyncio` in the first place?

The problem that `nest_asyncio` was solving for us is to allow async calls to occur in a sync function.

Note that plumpy's Process model is fundamentally synchronous; that covers process steps, calcfunctions, and workfunctions. But the aiida engine that drives it (the state machine, the daemon, the runner) is async. Whenever synchronous process code needs to do something async (like running a nested process, cancelling a scheduler job, or performing a transport operation), it historically called `loop.run_until_complete()`. That doesn't work when the loop is already running (which it is inside the daemon, inside Jupyter, etc.), so `nest_asyncio` was used to make the loop re-entrant.
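For illustration only (this snippet is not from the PR): the smallest version of the problem is a sync helper calling `run_until_complete()` while the loop started by `asyncio.run()` is still running, which asyncio refuses to do.

```python
import asyncio

async def fetch_answer():
    return 42

def sync_step():
    # Synchronous "process" code that needs an async result.
    loop = asyncio.get_event_loop()  # returns the already-running loop
    # Without nest_asyncio this raises:
    #   RuntimeError: This event loop is already running
    return loop.run_until_complete(fetch_answer())

async def main():
    print(sync_step())

asyncio.run(main())
```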
The replacement

The idea is that instead of letting sync code call `run_until_complete()` on a running loop (which requires patching), we run the sync code inside a special "worker greenlet". When that code needs an async result, it cooperatively switches to a parent greenlet that lives in an async context and hands it the awaitable; the parent does the real `await`, then switches back with the result. From the sync code's perspective, the call looks blocking. From the event loop's perspective, nothing unusual happened: it just awaited a coroutine.
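The same calling pattern can be demonstrated with the greenback library mentioned earlier in the thread, which implements an equivalent greenlet portal; this is an illustrative example, not code from this PR.

```python
import asyncio
import greenback

def sync_process_step():
    # From the sync code's perspective this call simply blocks until the result is ready.
    return greenback.await_(asyncio.sleep(0.1, result="transport done"))

async def engine():
    await greenback.ensure_portal()  # set up the greenlet portal for this task
    # The event loop keeps running while the sync step "blocks": the awaitable is
    # handed to the async side, awaited for real, and the sync code then resumes.
    print(sync_process_step())       # prints "transport done"

asyncio.run(engine())
```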