linuxcnc/lib/python/qtvcp/core.py

325 lines
9.6 KiB
Python

#!/usr/bin/env python3
# vim: sts=4 sw=4 et
import linuxcnc
from gi.repository import GObject
import inspect
import _hal
import hal
import traceback
from PyQt5.QtCore import QObject, QTimer, pyqtSignal
from hal_glib import GStat
from qtvcp.qt_istat import _IStat as IStatParent
# Set up logging
from . import logger
log = logger.getLogger(__name__)
# log.setLevel(logger.INFO) # One of DEBUG, INFO, WARNING, ERROR, CRITICAL, VERBOSE
################################################################
# IStat class
################################################################
class Info(IStatParent):
_instance = None
_instanceNum = 0
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = IStatParent.__new__(cls, *args, **kwargs)
return cls._instance
# Now that the class is defined create a reference to it for the other classes
INI = Info()
class QPin(hal.Pin, QObject):
value_changed = pyqtSignal('PyQt_PyObject')
REGISTRY = []
UPDATE = False
def __init__(self, *a, **kw):
super(QPin, self).__init__(*a, **kw)
QObject.__init__(self, None)
self._item_wrap(self._item)
self._prev = None
self.REGISTRY.append(self)
self.update_start()
def update(self):
tmp = self.get()
if tmp != self._prev:
self.value_changed.emit(tmp)
self._prev = tmp
def text(self):
return self.get_name()
# always returns False because
# there was no errpr when making pin
# see class DUMMY
def error(self):
return False
@classmethod
def update_all(cls):
if not cls.UPDATE:
return
kill = []
for p in cls.REGISTRY:
try:
p.update()
except Exception as e:
kill.append(p)
log.error("Error updating pin {}; Removing".format(p))
log.exception(e)
for p in kill:
cls.REGISTRY.remove(p)
return cls.UPDATE
@classmethod
def update_start(cls):
if QPin.UPDATE:
return
QPin.UPDATE = True
cls.timer = QTimer()
cls.timer.timeout.connect(cls.update_all)
cls.timer.start(INI.HALPIN_CYCLE_TIME)
@classmethod
def update_stop(cls):
QPin.UPDATE = False
# so errors when making QPins aren't fatal
class DummyPin(QObject):
value_changed = pyqtSignal('PyQt_PyObject')
def __init__(self, *a, **kw):
super(DummyPin, self).__init__(None)
self._a = a
self._kw = kw
# always returns True because
# there was an errpr when making HAL pin
# see class QPin
def error(self):
return True
def getError(self):
print('{}'.format(self._kw.get('ERROR')))
def get(self):
return 0
def set(self, *a, **kw):
pass
def get_name(self):
return self._a[0]
class _QHal(object):
HAL_BIT = hal.HAL_BIT
HAL_FLOAT = hal.HAL_FLOAT
HAL_S32 = hal.HAL_S32
HAL_U32 = hal.HAL_U32
HAL_IN = hal.HAL_IN
HAL_OUT = hal.HAL_OUT
HAL_IO = hal.HAL_IO
HAL_RO = hal.HAL_RO
HAL_RW = hal.HAL_RW
def __new__(cls, *a, **kw):
instance = super(_QHal, cls).__new__(cls)
instance.__init__(*a, **kw)
return instance
def __init__(self, comp=None, hal=None):
# only initialize once for all instances
if not self.__class__._instance is None:
return
if isinstance(comp, _QHal):
comp = comp.comp
self.comp = comp
self.hal = hal
def newpin(self, *a, **kw):
try:
p = QPin(_hal.component.newpin(self.comp, *a, **kw))
except ValueError as e:
# if pin is already made, find a new name
if 'Duplicate pin name' in '{}'.format(e):
try:
# tuples are immutable, convert to list
y = list(a)
y[0] = self.makeUniqueName(y[0])
a = tuple(y)
# this late in the game, component is probably already 'ready'
if self.hal.component_is_ready(self.comp.getprefix()):
self.comp.unready()
p = QPin(_hal.component.newpin(self.comp, *a, **kw))
self.comp.ready()
else:
p = QPin(_hal.component.newpin(self.comp, *a, **kw))
except Exception as e:
raise
except Exception as e:
if log.getEffectiveLevel() == logger.VERBOSE:
raise
t = inspect.getframeinfo(inspect.currentframe().f_back)
log.error("Qhal: Error making new HAL pin: {}\n {}\n Line {}\n Function: {}".
format(e, t[0], t[1], t[2]))
log.error("Qhal: {}".format(traceback.format_exc()))
p = DummyPin(*a, ERROR=e)
return p
def getpin(self, *a, **kw): return QPin(_hal.component.getpin(self.comp, *a, **kw))
def getvalue(self, name):
try:
return hal.get_value(name)
except Exception as e:
log.error("Qhal: Error getting value of {}\n {}".format(name, e))
def setp(self,name, value):
try:
return hal.set_p(name,value)
except Exception as e:
log.error("Qhal: Error setting value of {} to {}\n {}".format(name,value, e))
def exit(self, *a, **kw): return self.comp.exit(*a, **kw)
# find a unique HAL pin name by adding '-x' to the base name
# x being an ever increasing number till name is unique
def makeUniqueName(self, name):
num = 2
base = self.comp.getprefix()
while True:
trial = ('{}.{}-{}').format(base,name,num)
for i in hal.get_info_pins():
if i['NAME'] == trial:
num +=1
trial = ('{}.{}-{}').format(base,name,num)
break
else:
break
return ('{}-{}').format(name,num)
def __getitem__(self, k): return self.comp[k]
def __setitem__(self, k, v): self.comp[k] = v
class Qhal(_QHal):
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = _QHal.__new__(cls, *args, **kwargs)
return cls._instance
################################################################
# GStat class
################################################################
# use the same Gstat as gladeVCP uses
# by subclassing it
class Status(GStat):
_instance = None
_instanceNum = 0
__gsignals__ = {
'toolfile-stale': (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (GObject.TYPE_PYOBJECT,)),
}
# only make one instance of the class - pass it to all other
# requested instances
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = GStat.__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
# only initialize once for all instances
if self.__class__._instanceNum >= 1:
return
GObject.Object.__init__(self)
self.__class__._instanceNum += 1
super(GStat, self).__init__()
# set the default jog speeds before the forced update
self.current_jog_rate = INI.DEFAULT_LINEAR_JOG_VEL
self.current_angular_jog_rate = INI.DEFAULT_ANGULAR_JOG_VEL
# can only have ONE error channel instance in qtvcp
self.ERROR = linuxcnc.error_channel()
self._block_polling = False
# we override this function from hal_glib
def set_timer(self):
GObject.threads_init()
GObject.timeout_add(int(INI.CYCLE_TIME), self.update)
# error polling is usually set up by screen_option widget
# to call this function
# but when using MDI subprograms, the subprogram must be the only
# polling instance.
# this is done by blocking the main screen polling until the
# subprogram is done.
def poll_error(self):
if self._block_polling: return None
return self.ERROR.poll()
def block_error_polling(self, name=''):
if name: print(name,'block')
self._block_polling = True
def unblock_error_polling(self,name=''):
if name: print(name,'unblock')
self._block_polling = False
################################################################
# Lcnc_Action class
################################################################
from qtvcp.qt_action import _Lcnc_Action as _ActionParent
class Action(_ActionParent):
_instance = None
_instanceNum = 0
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = _ActionParent.__new__(cls, *args, **kwargs)
return cls._instance
################################################################
# TStat class
################################################################
from qtvcp.qt_tstat import _TStat as _TStatParent
class Tool(_TStatParent):
_instance = None
_instanceNum = 0
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = _TStatParent.__new__(cls, *args, **kwargs)
return cls._instance
################################################################
# PStat class
################################################################
from qtvcp.qt_pstat import _PStat as _PStatParent
class Path(_PStatParent):
_instance = None
_instanceNum = 0
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = _PStatParent.__new__(cls, *args, **kwargs)
return cls._instance