Conversation

@khsrali (Contributor) commented Jan 28, 2026

A bit of history

nest_asyncio monkey-patches asyncio internals to allow re-entrant run_until_complete() calls. Although convenient, this is inherently fragile and in some cases made debugging almost impossible. Moreover, many packages that historically used nest_asyncio moved to greenlet for maintainability reasons after the author passed away (which is deeply sad).

This PR gives synchronous code a clean, explicit way to call async operations.

Why did we need nest_asyncio in the first place?

The problem nest_asyncio solved for us was allowing async calls to occur inside a sync function.
Note that plumpy's Process model is fundamentally synchronous; that includes process steps, calcfunctions, and workfunctions. But the aiida engine that drives it (the state machine, the daemon, the runner) is async. Whenever synchronous process code needs to do something async (like running a nested process, cancelling a scheduler job, or performing a transport operation), it historically called loop.run_until_complete(). That doesn't work when the loop is already running (which it is inside the daemon, inside Jupyter, etc.), so nest_asyncio was used to make the loop re-entrant.
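The failure mode described here is easy to reproduce with plain asyncio; a minimal standalone example (not from this PR) showing why run_until_complete() cannot be called from inside a running loop:

```python
import asyncio


async def nested():
    return 42


async def outer() -> str:
    # We are already inside a running event loop here (via asyncio.run),
    # so a blocking re-entrant call is rejected by asyncio.
    loop = asyncio.get_running_loop()
    coro = nested()
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


print(asyncio.run(outer()))  # RuntimeError: This event loop is already running
```

This is exactly the call pattern that nest_asyncio's patching used to permit.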

The replacement

The idea is that, instead of letting sync code call run_until_complete() on a running loop (which requires patching), we run the sync code inside a special "worker greenlet." When that code needs an async result, it cooperatively switches to a parent greenlet that lives in an async context and hands it the awaitable; the parent does the real await, then switches back with the result. From the sync code's perspective, the call looks blocking. From the event loop's perspective, nothing unusual happened: it just awaited a coroutine.
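As a rough illustration of this mechanism, here is a minimal sketch of such a bridge. This is not the actual plumpy implementation; the names sync_await and run_in_worker are hypothetical:

```python
import asyncio

import greenlet


def sync_await(awaitable):
    """Called from sync code running inside a worker greenlet.

    Hands the awaitable to the parent greenlet (which lives in async
    context) and blocks this greenlet until the parent switches back
    with the awaited result.
    """
    return greenlet.getcurrent().parent.switch(awaitable)


async def run_in_worker(func, *args):
    """Run sync `func` in a worker greenlet, performing its awaits for it."""
    worker = greenlet.greenlet(func)
    value = worker.switch(*args)
    while not worker.dead:
        # `value` is an awaitable handed over via sync_await: do the
        # real await here, then resume the worker with the result.
        value = worker.switch(await value)
    return value  # the worker finished: this is func's return value


async def fetch():
    await asyncio.sleep(0)
    return 42


def sync_code():
    # Looks blocking from here, but the event loop keeps running.
    return sync_await(fetch()) + 1


print(asyncio.run(run_in_worker(sync_code)))  # prints 43
```

The key property is that the event loop itself never re-enters: it only ever awaits a coroutine, while the stack of the sync code lives on the worker greenlet.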

@codecov

codecov bot commented Jan 28, 2026

Codecov Report

❌ Patch coverage is 68.53933% with 28 lines in your changes missing coverage. Please review.
✅ Project coverage is 90.21%. Comparing base (051b3fe) to head (c036472).
⚠️ Report is 3 commits behind head on master.

Files with missing lines Patch % Lines
src/plumpy/greenlet_bridge.py 60.00% 26 Missing ⚠️
src/plumpy/processes.py 89.48% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #331      +/-   ##
==========================================
- Coverage   90.83%   90.21%   -0.62%     
==========================================
  Files          22       23       +1     
  Lines        3009     3083      +74     
==========================================
+ Hits         2733     2781      +48     
- Misses        276      302      +26     

☔ View full report in Codecov by Sentry.

set_event_loop_policy()
assert isinstance(asyncio.get_event_loop_policy(), PlumpyEventLoopPolicy)
assert hasattr(asyncio.get_event_loop(), '_nest_patched')
assert asyncio.get_event_loop() is asyncio.get_event_loop()
Contributor Author

This assertion checks that asyncio.get_event_loop() returns the same loop every time.

@khsrali changed the title to Replace nest_asyncio with greenlet for nested process execution Jan 30, 2026
@agoscinski (Contributor) commented Feb 2, 2026

Hey @khsrali, thank you for the work on this. I was considering using greenback (https://greenback.readthedocs.io/en/latest/principle.html) to solve this problem of re-entering the asyncio loop, since it can be used more or less like nest_asyncio. Here is an example of how it works:

import asyncio
import greenback


async def fetch_data(item_id: int) -> str:
    """Simulates an async data fetch."""
    await asyncio.sleep(0.1)
    return f"Data for item {item_id}"


def sync_processor(item_id: int) -> str:
    """A sync function that needs to call async code."""
    # Use greenback.await_() to call async code from sync context
    result = greenback.await_(fetch_data(item_id)) # instead of run_until_complete
    return f"Processed: {result}"


async def main():
    """Main async function that calls the sync function."""
    # Must call ensure_portal() to enable greenback in this async context
    await greenback.ensure_portal() # this is different to nest_asyncio

    print("Starting async context...")

    # Call sync function from async context
    # The sync function uses greenback.await_() to call async code
    result = sync_processor(42)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Then, to make it a custom loop policy as we do with nest_asyncio now (see events.py), we could do:

import asyncio
import greenback

##############
### plumpy ###
##############

async def _with_portal(coro):
    """Wrap coroutine to ensure portal exists before running."""
    await greenback.ensure_portal() # no-op when already done
    return await coro


class GreenbackEventLoop(asyncio.SelectorEventLoop):
    """Event loop with greenback portal support."""

    def run_until_complete(self, coro):
        if self.is_running():
            return greenback.await_(coro)
        return super().run_until_complete(_with_portal(coro))

    def create_task(self, coro, *, name=None, context=None):
        return super().create_task(_with_portal(coro), name=name, context=context)


class GreenbackEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Policy that creates GreenbackEventLoop instances."""

    def new_event_loop(self):
        return GreenbackEventLoop()


##################
### aiida-core ###
##################

# Install the policy
asyncio.set_event_loop_policy(GreenbackEventLoopPolicy())

async def fetch_data(item_id: int) -> str:
    """Simulates an async data fetch."""
    await asyncio.sleep(0.1)
    return f"Data for item {item_id}"


def sync_processor(item_id: int) -> str:
    """A sync function using the standard run_until_complete pattern."""
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(fetch_data(item_id))
    return f"Processed: {result}"


async def main():
    """Main async function - no manual ensure_portal() needed."""
    print("Starting async context...")

    result = sync_processor(42)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Note that we also needed to override the create_task function to add the ensure_portal call.


What do you think? I think using greenback is a much simpler solution than implementing our own bridge.

@khsrali (Contributor, Author) commented Feb 3, 2026

Hi @agoscinski

Thanks for your comment. I didn't know about greenback. I agree it's a simpler solution; however, please note this was mainly an effort to get rid of monkey patching, which was the main source of difficulties in debugging and backtracing, and also introduced hard-to-understand edge-case race bugs. See these two for example:
aiidateam/aiida-core#7144
and
aiidateam/aiida-core#7061
Both of them were direct consequences of using monkey-patched loops, simply because we didn't know which loop we were in!

Before this PR, even after educating myself with Claude and iterating back and forth for hours, I had no idea what was going on with all the monkey-patched event loops in aiida-core. Monkey patching allows all sorts of nasty calls in the middle of nowhere, which resulted in many overheads. Moreover, it was not clear at all which event loop we were in, and I believe that was due to the monkey-patching behaviour itself, not to having nested event loops.

If you look into def execute, it is only after these changes that it is explicitly clear which event loop we are entering, which makes debugging a lot easier.

I plan to run some performance tests on both versions of aiida-core, with nest_asyncio and with greenlet.
You may also want to have a look at aiidateam/aiida-core#7188 in aiida-core in the meantime.

if loop.is_running():
    if in_worker_greenlet():
        return sync_await(awaitable)
    else:
Contributor Author

Could this be replaced with a greenlet worker?

Contributor Author

No, because if a loop is already running, you cannot start a new loop in another worker.

@agoscinski (Contributor) commented Feb 4, 2026

@giovannipizzi asked to check if this also fixes the recursion limit problem as mentioned here
aiidateam/aiida-core#4876

I took the code from this comment aiidateam/aiida-core#4876 (comment)

import nest_asyncio
nest_asyncio.apply()

import asyncio
import sys

sys.setrecursionlimit(220)

# Running already 20 not-too-deep recursions crashes badly
# Note: we are calling a recursive co-routine that calls a routine that internally calls again the initial coroutine
number_of_tasks = 20
recursion_depth = 5


async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    intermediate_function(task_name, counter)

def intermediate_function(task_name, counter):
    asyncio.get_event_loop().run_until_complete(recursive_task(task_name, counter - 1))

async def main():
    tasks = [
        asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())

It will produce this error (with a horribly nested traceback):

AttributeError: 'Task' object has no attribute '_num_cancels_requested'

because when the recursion limit is hit, the task seems to be left in a corrupted state, so it cannot error out properly. I don't fully understand it, but increasing the recursion limit fixes this error.

A greenlet solution does not produce this error

import asyncio
import sys
import greenback

sys.setrecursionlimit(70)

# Same test as nest_asyncio_recursion.py
number_of_tasks = 20
recursion_depth = 5


async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    intermediate_function(task_name, counter)


def intermediate_function(task_name, counter):
    greenback.await_(recursive_task(task_name, counter - 1))


async def main():
    await greenback.ensure_portal()

    tasks = [
        asyncio.create_task(task_with_portal(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    await asyncio.gather(*tasks)


async def task_with_portal(task_name, depth):
    """Wrapper to ensure each task has its own portal."""
    await greenback.ensure_portal()
    await recursive_task(task_name, depth)

asyncio.run(main())

I tried sys.setrecursionlimit(33) while increasing number_of_tasks (up to 100) to check that the error cannot be produced by increasing the number of tasks that are created. This is the important point: with nest_asyncio, the number of concurrent tasks somehow increases the call stack, whereas with greenlet the maximum call stack is independent of the number of tasks. For sys.setrecursionlimit(32) one can hit a recursion limit, but the error is well behaved:

Traceback (most recent call last):
  File "/Users/alexgo/code/greenbackexample/greenback_recursion.py", line 41, in <module>
    asyncio.run(main())
  File "/Users/alexgo/miniconda3/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/alexgo/miniconda3/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[... a lot of greenback internals]
RecursionError: maximum recursion depth exceeded

This means that we can revert the change that dynamically increases the recursion limit, which was a patch for this problem: aiidateam/aiida-core#6052
