36 lines
963 B
Python
Executable file
36 lines
963 B
Python
Executable file
#!/usr/bin/env python3
|
|
import os
|
|
os.system("halcmd unload all")
|
|
import hal
|
|
c = hal.component("stream_test")
|
|
writer = hal.stream(c, hal.streamer_base, 10, "bfsu")
|
|
c.ready()
|
|
|
|
for i in range(9):
|
|
assert writer.writable
|
|
writer.write((i % 2, i, i, i))
|
|
assert not writer.writable
|
|
assert writer.num_overruns == 0
|
|
try:
|
|
writer.write((1,1,1,1))
|
|
except:
|
|
pass
|
|
else:
|
|
assert False, "failed to get exception on full stream"
|
|
assert writer.num_overruns == 1
|
|
|
|
# In rtai realtime, it's only permitted to map a shared memory region more
|
|
# once in a free-running component, so the writer and reader of a stream can't
|
|
# exist at the same time in the same process
|
|
del writer
|
|
|
|
reader = hal.stream(c, hal.streamer_base, "bfsu")
|
|
for i in range(9):
|
|
assert reader.readable
|
|
assert reader.read() == ((i % 2, i, i, i))
|
|
assert reader.num_underruns == 0
|
|
assert reader.sampleno == i+1
|
|
assert reader.read() is None
|
|
assert reader.num_underruns == 1
|
|
|
|
print("pass")
|