linuxcnc/tests/halmodule.1/stream_test.py

36 lines
963 B
Python
Executable file

#!/usr/bin/env python3
import os
os.system("halcmd unload all")
import hal
c = hal.component("stream_test")
writer = hal.stream(c, hal.streamer_base, 10, "bfsu")
c.ready()
for i in range(9):
assert writer.writable
writer.write((i % 2, i, i, i))
assert not writer.writable
assert writer.num_overruns == 0
try:
writer.write((1,1,1,1))
except:
pass
else:
assert False, "failed to get exception on full stream"
assert writer.num_overruns == 1
# In rtai realtime, it's only permitted to map a shared memory region more
# once in a free-running component, so the writer and reader of a stream can't
# exist at the same time in the same process
del writer
reader = hal.stream(c, hal.streamer_base, "bfsu")
for i in range(9):
assert reader.readable
assert reader.read() == ((i % 2, i, i, i))
assert reader.num_underruns == 0
assert reader.sampleno == i+1
assert reader.read() is None
assert reader.num_underruns == 1
print("pass")