tests: Add 3 more tests for _thread module.

uheapq-ticks
Damien George 2016-04-20 14:23:55 +00:00
parent ed36632c6c
commit 2d5ea38b49
3 changed files with 105 additions and 0 deletions

View File

@ -0,0 +1,46 @@
# test using lock to coordinate access to global mutable objects
#
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
import _thread
def fac(n):
x = 1
for i in range(1, n + 1):
x *= i
return x
def thread_entry():
while True:
with jobs_lock:
try:
f, arg = jobs.pop(0)
except IndexError:
return
ans = f(arg)
with output_lock:
output.append((arg, ans))
# create a list of jobs
jobs = [(fac, i) for i in range(20, 80)]
jobs_lock = _thread.allocate_lock()
n_jobs = len(jobs)
# create a list to store the results
output = []
output_lock = _thread.allocate_lock()
# spawn threads to do the jobs
for i in range(4):
_thread.start_new_thread(thread_entry, ())
# wait for the jobs to complete
while True:
with jobs_lock:
if len(output) == n_jobs:
break
# sort and print the results
output.sort(key=lambda x: x[0])
for arg, ans in output:
print(arg, ans)

View File

@ -0,0 +1,36 @@
# test setting the thread stack size
#
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
import sys
try:
import utime as time
except ImportError:
import time
import _thread
# different implementations have different minimum sizes
if sys.implementation == 'micropython':
sz = 2 * 1024
else:
sz = 32 * 1024
def foo():
pass
def thread_entry():
foo()
# test set/get of stack size
print(_thread.stack_size())
print(_thread.stack_size(sz))
print(_thread.stack_size() == sz)
print(_thread.stack_size())
# set stack size and spawn a few threads
_thread.stack_size(sz)
for i in range(2):
_thread.start_new_thread(thread_entry, ())
time.sleep(0.2)
print('done')

View File

@ -0,0 +1,23 @@
# test hitting the function recursion limit within a thread
#
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
try:
import utime as time
except ImportError:
import time
import _thread
def foo():
foo()
def thread_entry():
try:
foo()
except RuntimeError:
print('RuntimeError')
_thread.start_new_thread(thread_entry, ())
time.sleep(0.2)
print('done')