Adafruit-NextBus/predict.py
2015-02-26 21:22:21 -08:00

70 lines
2.5 KiB
Python

# NextBus prediction class. For each route/stop, NextBus server is polled
# automatically at regular intervals. Front-end app just needs to init
# this with stop data, which can be found using the routefinder.py script.
import threading
import time
import urllib
from xml.dom.minidom import parseString
class predict:
interval = 120 # Default polling interval = 2 minutes
initSleep = 0 # Stagger polling threads to avoid load spikes
# predict object initializer. 1 parameter, a 4-element tuple:
# First element is agengy tag (e.g. 'actransit')
# Second is line tag (e.g. '210')
# Third is stop tag (e.g. '0702630')
# Fourth is direction -- not a tag, this element is human-readable
# and editable (e.g. 'Union Landing') -- for UI purposes you may
# want to keep this short. The other elements MUST be kept
# verbatim as displayed by the routefinder.py script.
# Each predict object spawns its own thread and will perform
# periodic server queries in the background, which can then be
# read via the predictions[] list (est. arrivals, in seconds).
def __init__(self, data):
self.data = data
self.predictions = []
self.lastQueryTime = time.time()
t = threading.Thread(target=self.thread)
t.daemon = True
t.start()
# Periodically get predictions from server ---------------------------
def thread(self):
initSleep = predict.initSleep
predict.initSleep += 5 # Thread staggering may
time.sleep(initSleep) # drift over time, no problem
while True:
dom = predict.req('predictions' +
'&a=' + self.data[0] + # Agency
'&r=' + self.data[1] + # Route
'&s=' + self.data[2]) # Stop
if dom is None: return # Connection error
self.lastQueryTime = time.time()
predictions = dom.getElementsByTagName('prediction')
newList = []
for p in predictions: # Build new prediction list
newList.append(
int(p.getAttribute('seconds')))
self.predictions = newList # Replace current list
time.sleep(predict.interval)
# Open URL, send request, read & parse XML response ------------------
@staticmethod
def req(cmd):
xml = None
try:
connection = urllib.urlopen(
'http://webservices.nextbus.com' +
'/service/publicXMLFeed?command=' + cmd)
raw = connection.read()
connection.close()
xml = parseString(raw)
finally:
return xml
# Set polling interval (seconds) -------------------------------------
@staticmethod
def setInterval(i):
interval = i