Eos/eosweb/core/main.py

501 lines
17 KiB
Python

# Eos - Verifiable elections
# Copyright © 2017-18 RunasSudo (Yingtong Li)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import click
import flask
import flask_session
import timeago
from eos.core.objects import *
from eos.core.tasks import *
from eos.base.election import *
from eos.base.tasks import *
from eos.base.workflow import *
from eos.psr.crypto import *
from eos.psr.election import *
from eos.psr.mixnet import *
from eos.psr.workflow import *
from eosweb.core.tasks import *
from . import emails
import eos.core.hashing
import eosweb
from datetime import datetime
import functools
import importlib
import io
import json
import os
import pytz
import subprocess
import uuid
app = flask.Flask(__name__, static_folder=None)

# Load config: packaged defaults first, then the optional override file
# named by the EOSWEB_SETTINGS environment variable
app.config.from_object('eosweb.core.settings')
if 'EOSWEB_SETTINGS' in os.environ:
    app.config.from_envvar('EOSWEB_SETTINGS')

# Load app config: each enabled app contributes its own settings module
for app_name in app.config['APPS']:
    app.config.from_object('{}.settings'.format(app_name))

# Re-apply the override file so it takes precedence over the app settings
if 'EOSWEB_SETTINGS' in os.environ:
    app.config.from_envvar('EOSWEB_SETTINGS')

# Connect to database
db_connect(app.config['DB_NAME'], app.config['DB_URI'], app.config['DB_TYPE'])

# Configure sessions so that they are stored in the same database backend
if app.config['DB_TYPE'] == 'mongodb':
    app.config.update(
        SESSION_TYPE='mongodb',
        SESSION_MONGODB=dbinfo.provider.client,
        SESSION_MONGODB_DB=dbinfo.provider.db_name,
    )
elif app.config['DB_TYPE'] == 'postgresql':
    app.config.update(
        SESSION_TYPE='sqlalchemy',
        SQLALCHEMY_DATABASE_URI=app.config['DB_URI'] + app.config['DB_NAME'],
    )
flask_session.Session(app)

# Set configs
User.admins = app.config['ADMINS']
# Make Flask's serialisation, e.g. for sessions, EosObject aware
class EosObjectJSONEncoder(flask.json.JSONEncoder):
    """Flask JSON encoder that also accepts EosObject instances."""

    def default(self, obj):
        """Return a JSON-serialisable stand-in for obj.

        EosObject instances are passed through EosObject.serialise_and_wrap;
        anything else is delegated to the parent encoder.
        """
        if not isinstance(obj, EosObject):
            return super().default(obj)
        return EosObject.serialise_and_wrap(obj)
class EosObjectJSONDecoder(flask.json.JSONDecoder):
    """Flask JSON decoder that turns wrapped EosObjects back into objects."""

    def __init__(self, *args, **kwargs):
        """Install my_object_hook, remembering any caller-supplied hook."""
        # Keep the caller's hook (if any) so it can still be chained to
        self.super_object_hook = kwargs.get('object_hook', None)
        kwargs.update(object_hook=self.my_object_hook)
        super().__init__(*args, **kwargs)

    def my_object_hook(self, val):
        """Decode one JSON object.

        Objects whose 'type' entry is a key of EosObject.objects are
        deserialised via EosObject.deserialise_and_unwrap. Otherwise the
        caller-supplied hook is applied when there is one, and the value is
        returned unchanged when there is not.
        """
        if 'type' in val and val['type'] in EosObject.objects:
            return EosObject.deserialise_and_unwrap(val)
        if not self.super_object_hook:
            return val
        return self.super_object_hook(val)
# Register the EosObject-aware encoder and decoder with the Flask app
app.json_encoder, app.json_decoder = EosObjectJSONEncoder, EosObjectJSONDecoder
# Patch Flask's static file sending to add no-cache
# Allow "caching", but require revalidation via 304 Not Modified
@app.route('/static/<path:filename>')
def static(filename):
    """Serve a file from the 'static' directory with a no-cache policy.

    The response is built by flask.send_from_directory, after which the
    'public' directive in its Cache-Control header is swapped for 'no-cache'.
    """
    max_age = app.get_send_file_max_age(filename)
    response = flask.send_from_directory('static', filename, cache_timeout=max_age)
    cache_control = response.headers['Cache-Control']
    response.headers['Cache-Control'] = cache_control.replace('public', 'no-cache')
    return response
# CLI command 'test': run the Eos test suite, optionally narrowed by
# --prefix and --lang (both are handed straight to eos.tests.run_tests).
# Documented with comments because click would reuse a docstring as help text.
@app.cli.command('test')
@click.option('--prefix', default=None)
@click.option('--lang', default=None)
def run_tests(prefix, lang):
    # Imported lazily so the test package is only loaded for this command
    from eos import tests as eos_tests
    eos_tests.run_tests(prefix, lang)
# Create the session databases (SQL only)
@app.cli.command('sessdb')
def sessdb():
    # The session interface exposes its database handle as `.db`
    session_db = app.session_interface.db
    session_db.create_all()
# TODO: Will remove this once we have a web UI
# CLI command 'drop_db_and_setup': wipe the database and create a single
# sample election with four voters, two mixing trustees and two questions.
@app.cli.command('drop_db_and_setup')
def setup_test_election():
    # DANGER! Everything currently in the database is discarded
    dbinfo.provider.reset_db()

    # Set up election
    election = PSRElection()
    election.workflow = PSRWorkflow()

    # Set election details
    election.name = 'Test Election'

    from eos.redditauth.election import RedditUser

    # Voters: three email-authenticated users followed by one Reddit user
    email_voters = (
        ('Alice', 'alice@localhost'),
        ('Bob', 'bob@localhost'),
        ('Carol', 'carol@localhost'),
    )
    for voter_name, voter_email in email_voters:
        election.voters.append(UserVoter(user=EmailUser(name=voter_name, email=voter_email)))
    election.voters.append(UserVoter(user=RedditUser(username='RunasSudo')))

    # Only email-authenticated voters are passed to the password mailer
    for voter in election.voters:
        if isinstance(voter, UserVoter) and isinstance(voter.user, EmailUser):
            emails.voter_email_password(election, voter)

    # Two internal mixing trustees, both named 'Eos Voting'
    for _ in range(2):
        election.mixing_trustees.append(InternalMixingTrustee(name='Eos Voting'))

    # Election key pair
    election.sk = EGPrivateKey.generate()
    election.public_key = election.sk.public_key

    # Question 1: preferential, with a ticket and a standalone choice
    acme_ticket = Ticket(name='ACME Party', choices=[
        Choice(name='John Smith'),
        Choice(name='Joe Bloggs', party='Independent ACME'),
    ])
    president = PreferentialQuestion(
        prompt='President',
        choices=[acme_ticket, Choice(name='John Q. Public')],
        min_choices=0,
        max_choices=3,
        randomise_choices=True,
    )
    election.questions.append(president)

    # Question 2: approval, at most one choice
    chairman = ApprovalQuestion(
        prompt='Chairman',
        choices=[Choice(name='John Doe'), Choice(name='Andrew Citizen')],
        min_choices=0,
        max_choices=1,
    )
    election.questions.append(chairman)

    election.save()
# CLI command 'verify_election': run Election.verify() on one election.
# Without --electionid the first election returned by Election.get_all() is used.
@app.cli.command('verify_election')
@click.option('--electionid', default=None)
def verify_election(electionid):
    if electionid is not None:
        election = Election.get_by_id(electionid)
    else:
        election = Election.get_all()[0]

    election.verify()

    # Only reached when verify() returned without raising
    print('The election has passed validation')
# CLI command 'tally_stv': create, save and run a TaskTallySTV for one question.
#   --electionid  ID of the election to tally
#   --qnum        index of the question (default 0)
#   --randfile    path to a JSON file whose parsed content is passed as `random`
#   --seats       number of seats to fill (default 1)
# Documented with comments because click would reuse a docstring as help text.
@app.cli.command('tally_stv')
@click.option('--electionid', default=None)
@click.option('--qnum', default=0)
@click.option('--randfile', default=None)
# Bind --seats explicitly to `numseats`; by default click would pass it as
# `seats`, which this function does not accept (TypeError on every call)
@click.option('--seats', 'numseats', default=1)
def tally_stv_election(electionid, qnum, randfile, numseats):
    # Fail with a usage message instead of a TypeError from open(None)
    if randfile is None:
        raise click.UsageError('--randfile is required')

    election = Election.get_by_id(electionid)

    with open(randfile, 'r') as f:
        dat = json.load(f)

    task = TaskTallySTV(
        election_id=election._id,
        q_num=qnum,
        random=dat,
        num_seats=numseats,
        status=TaskStatus.READY,
        # The strategy class is looked up by its configured name and instantiated
        run_strategy=EosObject.lookup(app.config['TASK_RUN_STRATEGY'])()
    )
    task.save()
    task.run()
# CLI command 'run_task': create, save and run a WorkflowTaskEntryWebTask for
# the workflow task named by --task_name on the election given by --electionid.
# NOTE(review): the function name repeats that of the 'tally_stv' command, so
# this definition rebinds the module-level name; both commands stay registered
# because click keys them by command name.
@app.cli.command('run_task')
@click.option('--electionid', default=None)
@click.option('--task_name', default=None)
def tally_stv_election(electionid, task_name):
    election = Election.get_by_id(electionid)

    # The strategy class is looked up by its configured name and instantiated
    strategy_cls = EosObject.lookup(app.config['TASK_RUN_STRATEGY'])
    entry_task = WorkflowTaskEntryWebTask(
        election_id=election._id,
        workflow_task=task_name,
        status=TaskStatus.READY,
        run_strategy=strategy_cls(),
    )
    entry_task.save()
    entry_task.run()
@app.context_processor
def inject_globals():
    """Expose the eos and eosweb packages and the SHA256 helper to all templates."""
    return dict(eos=eos, eosweb=eosweb, SHA256=eos.core.hashing.SHA256)
@app.template_filter('pretty_date')
def pretty_date(dt):
    """Render dt as an HTML <time> element with a relative-time label.

    The datetime attribute and the tooltip use dt converted to the configured
    TIMEZONE; the visible text is timeago's description of dt relative to
    DateTimeField.now().
    """
    local_tz = pytz.timezone(app.config['TIMEZONE'])
    dt_local = dt.astimezone(local_tz)

    machine_readable = dt_local.strftime('%Y-%m-%dT%H:%M:%S%z')
    tooltip = dt_local.strftime('%Y-%m-%d %H:%M:%S %Z')
    relative = timeago.format(dt, DateTimeField.now())

    html = '<time datetime="{}" title="{}">{}</time>'.format(machine_readable, tooltip, relative)
    return flask.Markup(html)
# Tickle the plumbus every request
@app.before_request
def tick_scheduler():
    """Give the task scheduler a chance to run before each request is handled."""
    # Process pending tasks
    TaskScheduler.tick()
# === Views ===
@app.route('/')
def index():
    """Render the landing page.

    Elections are grouped as:
    - open: the close-voting task is READY;
    - soon: the open-voting task has not exited and has an entry task,
      ordered by that entry task's run_at;
    - closed: the close-voting task has EXITED, most recently exited first,
      limited to five.
    """
    def open_task(e):
        return e.workflow.get_task('eos.base.workflow.TaskOpenVoting')

    def close_task(e):
        return e.workflow.get_task('eos.base.workflow.TaskCloseVoting')

    all_elections = Election.get_all()
    all_elections.sort(key=lambda e: e.name)

    elections_open = [e for e in all_elections if close_task(e).status == WorkflowTaskStatus.READY]

    elections_soon = sorted(
        (e for e in all_elections
         if open_task(e).status != WorkflowTaskStatus.EXITED and open_task(e).get_entry_task()),
        key=lambda e: open_task(e).get_entry_task().run_at,
    )

    elections_closed = sorted(
        (e for e in all_elections if close_task(e).status == WorkflowTaskStatus.EXITED),
        key=lambda e: close_task(e).exited_at,
        reverse=True,
    )[:5]

    return flask.render_template(
        'index.html',
        elections_open=elections_open,
        elections_soon=elections_soon,
        elections_closed=elections_closed,
    )
@app.route('/elections')
def elections():
    """Render the list of all elections, ordered by name."""
    by_name = sorted(Election.get_all(), key=lambda e: e.name)
    return flask.render_template('elections.html', elections=by_name)
def using_election(func):
    """Decorator: replace a view's election_id argument with the Election itself.

    The wrapped view is called with the result of Election.get_by_id as its
    first positional argument; remaining keyword arguments pass through.
    """
    @functools.wraps(func)
    def wrapped(election_id, **kwargs):
        return func(Election.get_by_id(election_id), **kwargs)
    return wrapped
def election_admin(func):
    """Decorator: only let administrators reach the wrapped view.

    The view runs when the session holds a user whose is_admin() is true;
    otherwise a 403 response is returned and the view is not called.
    """
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        # logout() stores None under 'user', so the key being present does not
        # mean somebody is logged in; treat None like a missing key
        user = flask.session.get('user')
        if user is not None and user.is_admin():
            return func(*args, **kwargs)
        return flask.Response('Administrator credentials required', 403)
    return wrapped
@app.route('/election/<election_id>/')
@using_election
def election_api_json(election):
    """Return the election as JSON.

    Serialisation is protected and combines related objects. The for_hash
    option is on unless the 'full' query parameter is present.
    """
    is_full = 'full' in flask.request.args
    options = SerialiseOptions(should_protect=True, for_hash=(not is_full), combine_related=True)
    wrapped_election = EosObject.serialise_and_wrap(election, None, options)
    return flask.Response(EosObject.to_json(wrapped_election), mimetype='application/json')
@app.route('/election/<election_id>/view')
@using_election
def election_view(election):
    """Render the overview page of an election."""
    template = 'election/view/view.html'
    return flask.render_template(template, election=election)
@app.route('/election/<election_id>/booth')
@using_election
def election_booth(election):
    """Render the voting booth for an election.

    The template also receives, as JSON strings, the model-view map keyed by
    each model's _name and the configured AUTH_METHODS.
    """
    # Re-key the map by model name so it can be serialised to JSON
    views_by_model_name = {}
    for model, views in model_view_map.items():
        views_by_model_name[model._name] = views

    return flask.render_template(
        'election/view/booth.html',
        election=election,
        selection_model_view_map=EosObject.to_json(views_by_model_name),
        auth_methods=EosObject.to_json(app.config['AUTH_METHODS']),
    )
@app.route('/election/<election_id>/view/questions')
@using_election
def election_view_questions(election):
    """Render the questions page of an election."""
    template = 'election/view/questions.html'
    return flask.render_template(template, election=election)
@app.route('/election/<election_id>/view/ballots')
@using_election
def election_view_ballots(election):
    """Render the ballots page of an election."""
    template = 'election/view/ballots.html'
    return flask.render_template(template, election=election)
@app.route('/election/<election_id>/voter/<voter_id>')
@using_election
def election_voter_view(election, voter_id):
    """Render the page of one voter of an election.

    voter_id is the voter's UUID as text. A 404 response is returned when it
    is not a valid UUID or when no voter of the election has that ID.
    """
    try:
        wanted_id = uuid.UUID(voter_id)
    except ValueError:
        # Malformed IDs come straight from the URL; do not let them become a 500
        return flask.Response('Voter not found', 404)

    # A default of None avoids an uncaught StopIteration for unknown voters
    voter = next((v for v in election.voters if v._id == wanted_id), None)
    if voter is None:
        return flask.Response('Voter not found', 404)

    return flask.render_template('election/voter/view.html', election=election, voter=voter)
@app.route('/election/<election_id>/view/trustees')
@using_election
def election_view_trustees(election):
    """Render the trustees page of an election."""
    template = 'election/view/trustees.html'
    return flask.render_template(template, election=election)
@app.route('/election/<election_id>/admin')
@using_election
@election_admin
def election_admin_summary(election):
    """Render the administration page of an election (administrators only)."""
    template = 'election/admin/admin.html'
    return flask.render_template(template, election=election)
@app.route('/election/<election_id>/admin/enter_task')
@using_election
@election_admin
def election_admin_enter_task(election):
    """Immediately enter the workflow task named by the 'task_name' query parameter.

    Responds 409 unless the workflow task is READY; otherwise runs a
    WorkflowTaskEntryWebTask for it and redirects to the admin summary.
    """
    requested_name = flask.request.args['task_name']
    workflow_task = election.workflow.get_task(requested_name)

    if workflow_task.status != WorkflowTaskStatus.READY:
        return flask.Response('Task is not yet ready or has already exited', 409)

    # The strategy class is looked up by its configured name and instantiated
    strategy_cls = EosObject.lookup(app.config['TASK_RUN_STRATEGY'])
    entry_task = WorkflowTaskEntryWebTask(
        election_id=election._id,
        workflow_task=workflow_task._name,
        status=TaskStatus.READY,
        run_strategy=strategy_cls(),
    )
    entry_task.run()

    summary_url = flask.url_for('election_admin_summary', election_id=election._id)
    return flask.redirect(summary_url)
@app.route('/election/<election_id>/admin/schedule_task', methods=['POST'])
@using_election
@election_admin
def election_admin_schedule_task(election):
    """Schedule a workflow task to be entered at a given time.

    Reads 'task_name' and 'datetime' from the form, saves a
    WorkflowTaskEntryWebTask with the deserialised run_at, and redirects to
    the admin summary. The task is saved, not run.
    """
    form = flask.request.form
    workflow_task = election.workflow.get_task(form['task_name'])

    # The strategy class is looked up by its configured name and instantiated
    strategy_cls = EosObject.lookup(app.config['TASK_RUN_STRATEGY'])
    scheduled_task = WorkflowTaskEntryWebTask(
        election_id=election._id,
        workflow_task=workflow_task._name,
        run_at=DateTimeField().deserialise(form['datetime']),
        status=TaskStatus.READY,
        run_strategy=strategy_cls(),
    )
    scheduled_task.save()

    summary_url = flask.url_for('election_admin_summary', election_id=election._id)
    return flask.redirect(summary_url)
@app.route('/election/<election_id>/stage_ballot', methods=['POST'])
@using_election
def election_api_stage_ballot(election):
    """Store the JSON request body in the session as the staged ballot."""
    staged = json.loads(flask.request.data)
    flask.session['staged_ballot'] = staged
    return 'OK'
@app.route('/election/<election_id>/cast_ballot', methods=['POST'])
@using_election
def election_api_cast_vote(election):
    """Cast the ballot staged in the session as a vote of the logged-in voter.

    Responses:
    - 409 when voting is not open;
    - 403 when nobody is logged in or the user is not a voter of this election;
    - 400 when no ballot has been staged in the session;
    - otherwise a JSON object with the protected serialisations of the voter
      and of the saved vote.
    """
    if election.workflow.get_task('eos.base.workflow.TaskOpenVoting').status < WorkflowTaskStatus.EXITED or election.workflow.get_task('eos.base.workflow.TaskCloseVoting').status > WorkflowTaskStatus.READY:
        # Voting is not yet open or has closed
        return flask.Response('Voting is not yet open or has closed', 409)

    # logout() leaves 'user' in the session with value None, so test the value
    user = flask.session.get('user')
    if user is None:
        # User is not authenticated
        return flask.Response('Not authenticated', 403)

    voter = next((v for v in election.voters if v.user.matched_by(user)), None)
    if voter is None:
        # Invalid user
        return flask.Response('Invalid credentials', 403)

    # Read the staged ballot only after authentication, and answer a missing
    # one with a client error rather than an uncaught KeyError
    data = flask.session.get('staged_ballot')
    if data is None:
        return flask.Response('No ballot has been staged', 400)

    # Cast the vote
    ballot = EosObject.deserialise_and_unwrap(data['ballot'])
    vote = Vote(voter_id=voter._id, ballot=ballot, cast_at=DateTimeField.now())

    # Store data
    if app.config['CAST_FINGERPRINT']:
        vote.cast_fingerprint = data['fingerprint']
    if app.config['CAST_IP']:
        forwarded_for = flask.request.headers.get('X-Forwarded-For')
        if os.path.exists('/app/.heroku') and forwarded_for:
            # Take the last address of the list; entries are separated by
            # ', ', so strip the surrounding whitespace
            vote.cast_ip = forwarded_for.split(',')[-1].strip()
        else:
            vote.cast_ip = flask.request.remote_addr

    vote.save()

    del flask.session['staged_ballot']

    return flask.Response(json.dumps({
        'voter': EosObject.serialise_and_wrap(voter, None, SerialiseOptions(should_protect=True)),
        'vote': EosObject.serialise_and_wrap(vote, None, SerialiseOptions(should_protect=True))
    }), mimetype='application/json')
@app.route('/election/<election_id>/export/question/<int:q_num>/<format>')
@using_election
def election_api_export_question(election, q_num, format):
    """Send question q_num of the election as a downloadable .blt file.

    The format URL segment is accepted but not consulted; the output always
    comes from eos.base.util.blt.writeBLT. The response is marked as
    non-cacheable.
    """
    from eos.base.util import blt

    blt_text = '\n'.join(blt.writeBLT(election, q_num, 2))
    payload = io.BytesIO(blt_text.encode('utf-8'))

    resp = flask.send_file(
        payload,
        mimetype='text/plain; charset=utf-8',
        attachment_filename='{}.blt'.format(q_num),
        as_attachment=True,
    )
    resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    return resp
@app.route('/task/<task_id>')
@election_admin
def task_view(task_id):
    """Render the detail page of a task (administrators only)."""
    return flask.render_template('task/view.html', task=Task.get_by_id(task_id))
@app.route('/auditor')
def auditor():
    """Render the auditor page."""
    template = 'election/auditor.html'
    return flask.render_template(template)
@app.route('/debug')
def debug():
    """Fail on purpose by raising AssertionError.

    NOTE(review): presumably used to reach the framework's error/debugger
    page; the assertion is skipped when Python runs with -O.
    """
    assert False
@app.route('/auth/login')
def login():
    """Render the login page, remembering the referring page in the session."""
    referrer = flask.request.referrer
    flask.session['login_next'] = referrer
    return flask.render_template('auth/login.html')
@app.route('/auth/stage_next', methods=['POST'])
def auth_stage_next():
    """Store the raw request body in the session as the post-login destination."""
    destination = flask.request.data
    flask.session['login_next'] = destination
    return 'OK'
@app.route('/auth/logout')
def logout():
    """Log the user out and redirect to the referring page, or to '/' without one."""
    # The key stays in the session; only its value is cleared
    flask.session['user'] = None
    return flask.redirect(flask.request.referrer or '/')
@app.route('/auth/login_callback')
def login_callback():
    """Redirect to the destination stored as 'login_next', or to '/' if none is set."""
    destination = flask.session.get('login_next')
    return flask.redirect(destination or '/')
@app.route('/auth/login_complete')
def login_complete():
    """Render the page shown once login has completed."""
    template = 'auth/login_complete.html'
    return flask.render_template(template)
@app.route('/auth/login_cancelled')
def login_cancelled():
    """Render the page shown when login was cancelled."""
    template = 'auth/login_cancelled.html'
    return flask.render_template(template)
@app.route('/auth/email/login')
def email_login():
    """Render the email/password login form."""
    template = 'auth/email/login.html'
    return flask.render_template(template)
def _email_credentials_match(candidate, email, password):
    """Return True if candidate is an EmailUser with this email and password.

    Emails are compared case-insensitively, passwords exactly.
    """
    import hmac

    if not isinstance(candidate, EmailUser):
        return False
    if candidate.email.lower() != email.lower():
        return False

    stored = candidate.password
    if isinstance(stored, str) and isinstance(password, str):
        # Constant-time comparison; encode first because compare_digest only
        # accepts ASCII-only str
        return hmac.compare_digest(stored.encode('utf-8'), password.encode('utf-8'))
    return stored == password


def _iter_login_candidates():
    """Yield the configured admins, then the user of every voter of every election.

    Lazy, so elections are only loaded when no admin has matched.
    """
    for admin in app.config['ADMINS']:
        yield admin
    for election in Election.get_all():
        for voter in election.voters:
            yield voter.user


@app.route('/auth/email/authenticate', methods=['POST'])
def email_authenticate():
    """Log in with the 'email' and 'password' form fields.

    Admins are checked first, then the voters of every election. The first
    match is stored in the session as 'user' and the client is redirected to
    the login-complete page. Without a match the login form is rendered again
    with an error message.
    """
    email = flask.request.form['email']
    password = flask.request.form['password']

    # Stop at the first match instead of scanning the remaining elections
    user = next(
        (c for c in _iter_login_candidates() if _email_credentials_match(c, email, password)),
        None,
    )

    if user is None:
        return flask.render_template('auth/email/login.html', error='The email or password you entered was invalid. Please check your details and try again. If the issue persists, contact the election administrator.')

    flask.session['user'] = user

    return flask.redirect(flask.url_for('login_complete'))
# === Apps ===
# Each enabled app provides a `main` module exposing a Flask `blueprint`
for app_name in app.config['APPS']:
    app_main = importlib.import_module('{}.main'.format(app_name))
    app.register_blueprint(app_main.blueprint)
# === Model-Views ===
# Module-level map of model-views. election_booth() re-keys it by each key's
# `_name` and sends it to the booth template as JSON.
model_view_map = {}
# TODO: Make more modular
# The import is placed after model_view_map is defined.
# NOTE(review): presumably deliberate, to avoid a circular import - confirm
# before moving it to the top of the file.
from . import modelview
model_view_map.update(modelview.model_view_map)