#!/usr/bin/env python3
"""Flask application serving train departure data from the PTV timetable API.

Routes:
    /, /index   render ``index.html``
    /latest     JSON describing upcoming departures for the ``stop_id`` and
                ``plat_id`` query parameters
    /stations   render ``stations.html`` with the list of known stations
"""

import flask
app = flask.Flask(__name__)

from . import config

import hashlib
import hmac
import json
import pytz
import traceback
from binascii import hexlify
from datetime import datetime, timedelta
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Maps the unsigned request URL (path + query string) to a tuple of
# (time the response was fetched, decoded JSON body).
request_cache = {}

# Responses younger than this are served from request_cache without a refetch.
CACHE_MAX_AGE = timedelta(seconds=60)


def do_request(endpoint, args=None):
    """Fetch and decode a JSON document from the PTV API, with caching.

    Args:
        endpoint: API path, e.g. ``'/v3/routes'``.
        args: Optional mapping of additional query parameters.

    Returns:
        The decoded JSON body. A response cached within the last 60 seconds is
        returned without contacting the server. If the request cannot be
        opened, the stale cached response is returned when one exists,
        otherwise ``None``.
    """
    url = endpoint + '?devid=' + config.PTV_USER_ID
    if args:
        url += '&' + urlencode(args)

    # Check cache
    timenow = datetime.now(pytz.utc).astimezone(timezone)
    cachexp = timenow - CACHE_MAX_AGE
    cached = request_cache.get(url)
    if cached is not None and cached[0] >= cachexp:
        # Use cached response
        return cached[1]

    # Generate signature: hex-encoded HMAC-SHA1 of the unsigned URL
    signature = hexlify(hmac.digest(config.PTV_API_KEY.encode('ascii'), url.encode('ascii'), 'sha1')).decode('ascii')

    req = Request(
        'https://timetableapi.ptv.vic.gov.au' + url + '&signature=' + signature,
        headers={'User-Agent': 'virtual-metro/0.1'},
    )
    try:
        resp = urlopen(req)
    except Exception:
        # Best effort: fall back to whatever we have, however stale
        print('Unable to refresh cache')
        traceback.print_exc()
        if cached is not None:
            return cached[1]
        return None

    # Close the HTTP response once the body has been read
    with resp:
        data = json.load(resp)

    # Cache the response
    request_cache[url] = (timenow, data)
    return data


def parse_date(dtstring):
    """Parse a ``YYYY-MM-DDTHH:MM:SSZ`` UTC timestamp into the local timezone."""
    return pytz.utc.localize(datetime.strptime(dtstring, '%Y-%m-%dT%H:%M:%SZ')).astimezone(timezone)


ROUTE_TYPE = 0
LOOP_STATIONS = ['Parliament', 'Melbourne Central', 'Flagstaff', 'Southern Cross', 'Flinders Street']
# Stop ID at which stopping patterns are cut off (Flinders Street)
FLINDERS_STREET_STOP_ID = 1071

timezone = pytz.timezone('Australia/Melbourne')


@app.route('/')
@app.route('/index')
def index():
    """Render the main page."""
    return flask.render_template('index.html')


def stop_to_name(stop, route_id):
    """Return the display name for a stop.

    Args:
        stop: Stop dict containing a ``'stop_name'`` entry.
        route_id: ID of the route being displayed, or a falsy value to skip
            the route-specific correction.

    Returns:
        The stop name with a trailing ``' Station'`` removed.
    """
    name = stop['stop_name']
    if name.endswith(' Station'):
        name = name[:-len(' Station')]
    if name == 'Flemington Racecourse' and route_id and route_id != 16 and route_id != 17 and route_id != 1482:
        # PTV bug??
        name = 'South Kensington'
    return name


# Cache lookup: (route_id, direction_id) -> list of stops sorted by stop_sequence
route_stops = {}


def _express_description(express_stops):
    """Describe a stopping pattern given the stations it runs express through."""
    if len(express_stops) == 1:
        return 'All Except {}'.format(express_stops[0])
    if len(express_stops) > 1:
        return 'Limited Express'
    return 'Stops All Stations'


def parse_departure(departure, departures, timenow):
    """Build the display description of a single departure.

    Reads the ``stop_id`` query parameter of the current Flask request to
    locate the requested stop within the run.

    Args:
        departure: One entry of ``departures['departures']``.
        departures: The full departures response (its ``'runs'`` are used).
        timenow: Current timezone-aware time, used to compute minutes to go.

    Returns:
        A dict with keys ``'dest'``, ``'sch'`` (scheduled time as ``H:MM``),
        either ``'now'`` or ``'min'``, ``'stops'`` (stop names, with
        ``' ---'`` standing in for each express stop) and ``'desc'``.
    """
    run_id = departure['run_id']
    route_id = departure['route_id']
    route_key = (route_id, departure['direction_id'])

    result = {}
    # Provisional value; replaced below once the stopping pattern is known
    result['dest'] = departures['runs'][str(run_id)]['destination_name']
    result['sch'] = parse_date(departure['scheduled_departure_utc']).strftime('%I:%M').lstrip('0')
    mins = (parse_date(departure['estimated_departure_utc'] or departure['scheduled_departure_utc']) - timenow).total_seconds() / 60
    if mins < 0.5:
        result['now'] = 'NOW'
    else:
        result['min'] = round(mins)

    # Get stopping pattern as (stop_id, name, is_express) tuples
    pattern = do_request('/v3/pattern/run/{}/route_type/{}'.format(run_id, ROUTE_TYPE), {'expand': 'all'})
    pattern_stops = [
        (x['stop_id'], stop_to_name(pattern['stops'][str(x['stop_id'])], route_id), False)
        for x in pattern['departures']
    ]

    # Get all stops on route
    if route_key not in route_stops:
        stops = do_request('/v3/stops/route/{}/route_type/{}'.format(route_id, ROUTE_TYPE), {'direction_id': departure['direction_id']})
        stops['stops'].sort(key=lambda x: x['stop_sequence'])
        route_stops[route_key] = stops['stops']

    route_stops_dir = []
    for stop in route_stops[route_key]:
        # Cut off at Flinders Street
        route_stops_dir.append(stop)
        if stop_to_name(stop, route_id) == 'Flinders Street':
            break

    # Calculate stopping pattern
    result['stops'] = []

    # Find this run along pattern_stops
    ps_route_start = next(
        (k for k, ps in enumerate(pattern_stops) if ps[0] == int(flask.request.args['stop_id'])),
        0,
    )
    ps_route_end = next(
        (k for k, ps in enumerate(pattern_stops) if ps[0] == pattern['runs'][str(run_id)]['final_stop_id']),
        len(pattern_stops) - 1,
    )
    ps_route_flinders = next(
        (k for k, ps in enumerate(pattern_stops) if ps[0] == FLINDERS_STREET_STOP_ID),
        len(pattern_stops) - 1,
    )
    if ps_route_flinders > ps_route_start:
        # Cut off at Flinders Street
        ps_route_end = min(ps_route_end, ps_route_flinders)

    # Find this run along route_stops_dir
    rsd_route_start = next(
        (k for k, rs in enumerate(route_stops_dir) if rs['stop_id'] == int(flask.request.args['stop_id'])),
        0,
    )
    rsd_route_end = next(
        (k for k, rs in enumerate(route_stops_dir) if rs['stop_id'] == pattern['runs'][str(run_id)]['final_stop_id']),
        len(route_stops_dir) - 1,
    )
    rsd_route_flinders = next(
        (k for k, rs in enumerate(route_stops_dir) if rs['stop_id'] == FLINDERS_STREET_STOP_ID),
        len(route_stops_dir) - 1,
    )
    if rsd_route_flinders > rsd_route_start:
        rsd_route_end = min(rsd_route_end, rsd_route_flinders)

    # Add express stops: route stops between start and end that the run skips
    for k, stop in enumerate(route_stops_dir[rsd_route_start:rsd_route_end + 1]):
        if any(x[0] == stop['stop_id'] for x in pattern_stops):
            continue
        if stop_to_name(stop, route_id) in LOOP_STATIONS:
            continue

        # Express stop
        # Identify location based on previous stop
        if rsd_route_start + k == 0:
            ps_index = 0
        else:
            prev_stop_id = route_stops_dir[rsd_route_start + k - 1]['stop_id']
            ps_index = next(
                (idx + 1 for idx, ps in enumerate(pattern_stops) if ps[0] == prev_stop_id),
                None,
            )
        if ps_index is None:
            # Previous route stop is not in the pattern; nowhere to anchor this one
            continue
        pattern_stops.insert(ps_index, (stop['stop_id'], stop_to_name(stop, route_id), True))
        ps_route_end += 1

    # Convert to string
    express_stops = []
    num_city_loop = 0
    for _, stop_name, is_express in pattern_stops[ps_route_start + 1:ps_route_end + 1]:
        if is_express:
            result['stops'].append(' ---')
            express_stops.append(stop_name)
        else:
            result['stops'].append(stop_name)
        if stop_name in LOOP_STATIONS:
            num_city_loop += 1

    # Impute remainder of journey
    if result['stops'] and result['stops'][-1] == 'Parliament':
        result['stops'].append('Melbourne Central')
        result['stops'].append('Flagstaff')
        result['stops'].append('Southern Cross')
        result['stops'].append('Flinders Street')
        num_city_loop += 4

    result['dest'] = result['stops'][-1] if result['stops'] else None
    if result['dest'] is None:
        # No stops after the requested one
        result['dest'] = 'Arrival'
        result['desc'] = ''
    elif result['dest'] in LOOP_STATIONS:
        # Up train
        # Is this a City Loop train?
        if num_city_loop >= 3:
            result['dest'] = 'City Loop'
        result['desc'] = _express_description(express_stops)
    else:
        # Down train
        # Is this a via City Loop train?
        if num_city_loop >= 3:
            result['desc'] = 'Ltd Express via City Loop' if len(express_stops) > 1 else 'Stops All via City Loop'
        else:
            result['desc'] = _express_description(express_stops)

    return result


@app.route('/latest')
def latest():
    """Return JSON describing the upcoming departures from a platform.

    Query parameters:
        stop_id: ID of the stop to query.
        plat_id: Platform number(s) to filter by.

    The response always contains ``'time_offset'`` (the local UTC offset in
    seconds). When departures are available it also contains ``'stop_name'``,
    the fields of the next departure (see ``parse_departure``) and ``'next'``,
    a list of up to two following departures.
    """
    timenow = datetime.now(pytz.utc).astimezone(timezone)

    result = {}
    result['time_offset'] = timenow.utcoffset().total_seconds()

    departures = do_request(
        '/v3/departures/route_type/{}/stop/{}'.format(ROUTE_TYPE, flask.request.args['stop_id']),
        {'platform_numbers': flask.request.args['plat_id'], 'max_results': '5', 'expand': 'all'},
    )

    if not departures or not departures.get('departures'):
        # API unreachable with nothing cached, invalid stop ID, platform ID,
        # no departures, etc.
        return flask.jsonify(result)

    departures['departures'].sort(key=lambda x: x['scheduled_departure_utc'])

    result['stop_name'] = stop_to_name(departures['stops'][flask.request.args['stop_id']], None)

    # Next train
    for i, departure in enumerate(departures['departures']):
        if parse_date(departure['estimated_departure_utc'] or departure['scheduled_departure_utc']) < timenow:
            continue

        # This is the next train
        result.update(parse_departure(departure, departures, timenow))
        break

    # Next trains (i is the index of the next train, or of the last departure
    # if every departure is already in the past)
    result['next'] = []
    for departure in departures['departures'][i + 1:i + 3]:
        result['next'].append(parse_departure(departure, departures, timenow))

    return flask.jsonify(result)


# Cache list of stations: (stop_id, name) tuples sorted by name
stns = None


def get_station_list():
    """Populate the module-level ``stns`` station list if it is not yet set.

    Collects the stops of every route of type ``ROUTE_TYPE`` and stores them
    in ``stns`` as ``(stop_id, name)`` tuples sorted by name.
    """
    global stns
    if not stns:
        stn_list = {}

        routes = do_request('/v3/routes', {'route_types': ROUTE_TYPE})
        for route in routes['routes']:
            stops = do_request('/v3/stops/route/{}/route_type/{}'.format(route['route_id'], ROUTE_TYPE), {})
            for stop in stops['stops']:
                stn_list[stop['stop_id']] = stop['stop_name'].replace(' Station', '')

        stns = sorted(stn_list.items(), key=lambda x: x[1])


# Warm the station cache at import time unless running in debug mode
if not app.debug:
    get_station_list()


@app.route('/stations')
def stations():
    """Render the station list page."""
    get_station_list()
    return flask.render_template('stations.html', stations=stns)