circuitpython/tests/extmod/asyncio_new_event_loop.py
Angus Gratton eb80b04944 tests/extmod: Workaround CPython warning in asyncio_new_event_loop test.
This started failing in CI on the mingw build, after CPython updated to
3.12.7. The test prints two warnings during interpreter shutdown of
"Task was destroyed but it is pending!".

This didn't happen on other CPython builds, and I think that's because of
finalizer order in CPython interpreter shutdown but not certain (the loop
finalizer calls loop.close() if not already closed).

Adding explicit calls to loop.close() causes the warning to be printed on
every run with CPython 3.12.7 on Linux.

Next, added the workaround exception handler to swallow this exception
as MicroPython doesn't produce an equivalent.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-11-28 23:12:34 +11:00

52 lines
1.4 KiB
Python

# Test Loop.new_event_loop()
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
# CPython 3.12 deprecated calling get_event_loop() when there is no current event
# loop, so to make this test run on CPython requires setting the event loop.
if hasattr(asyncio, "set_event_loop"):
asyncio.set_event_loop(asyncio.new_event_loop())
def exception_handler(loop, context):
# This is a workaround for a difference between CPython and MicroPython: if
# a CPython event loop is closed while there are tasks pending (i.e. not finished)
# on it, then the task will log an error. MicroPython does not log this error.
if context.get("message", "") == "Task was destroyed but it is pending!":
pass
else:
loop.default_exception_handler(context)
async def task():
for i in range(4):
print("task", i)
await asyncio.sleep(0)
await asyncio.sleep(0)
async def main():
print("start")
loop.create_task(task())
await asyncio.sleep(0) # yields, meaning new task will run once
print("stop")
loop.stop()
# Use default event loop to run some tasks
loop = asyncio.get_event_loop()
loop.set_exception_handler(exception_handler)
loop.create_task(main())
loop.run_forever()
loop.close()
# Create new event loop, old one should not keep running
loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)
loop.create_task(main())
loop.run_forever()
loop.close()