The native emitter will not release/bounce the GIL when running code, so if it runs tight loops then no other threads get a chance to run (if the GIL is enabled). So for the thread tests, explicitly include a call to `time.sleep(0)` (or equivalent) to bounce the GIL and give other threads a chance to run. For some tests (e.g. `thread_coop.py`) the whole point of the test is to check that the GIL is correctly bounced, so for those cases force the use of the bytecode emitter for the busy functions.

Signed-off-by: Damien George <damien@micropython.org>
50 lines
1.1 KiB
Python
50 lines
1.1 KiB
Python
# test that we can run the garbage collector within threads
#
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
|
|
|
|
import time
|
|
import gc
|
|
import _thread
|
|
|
|
|
|
def thread_entry(n):
    # Worker body, run both by the spawned threads and by the main thread.
    # It fills a 256-byte buffer, rewrites the buffer in place `n` times with
    # a gc.collect() after each pass, then records under `lock` whether the
    # buffer still holds 0..255 and that this worker has finished.
    global n_correct, n_finished

    # allocate the buffer and fill it with the values 0..255
    buf = bytearray(value for value in range(256))

    # touch every byte, then force a collection, n times over
    for _round in range(n):
        for pos in range(len(buf)):
            buf[pos] = buf[pos]
        gc.collect()

    # the counters are shared between all workers, so update them under the lock
    with lock:
        intact = list(buf) == list(range(256))
        n_correct += intact  # bool adds as 0 or 1
        n_finished += 1
|
|
|
|
|
|
# shared state: `lock` guards the counters that thread_entry() updates
lock = _thread.allocate_lock()
n_thread_max = 4
n_thread = n_correct = n_finished = 0

# spawn up to n_thread_max worker threads, counting the ones that started
for _ in range(n_thread_max):
    try:
        _thread.start_new_thread(thread_entry, (10,))
    except OSError:
        # System cannot create a new thread, so stop trying to create them.
        break
    else:
        n_thread += 1

# the main thread does the same work and is counted as a worker too
thread_entry(10)
n_thread += 1

# busy wait until every worker has reported in; sleep(0) lets other threads run
while n_thread > n_finished:
    time.sleep(0)

# prints True only if every finished worker found its data intact
print(n_correct == n_finished)
|