Implement enums

This commit is contained in:
Yingtong Li 2018-01-11 19:06:21 +08:00
parent df0025624d
commit 65aa81844b
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
12 changed files with 124 additions and 63 deletions

View File

@ -1,5 +1,5 @@
# Eos - Verifiable elections
# Copyright © 2017 RunasSudo (Yingtong Li)
# Copyright © 2017-18 RunasSudo (Yingtong Li)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -26,13 +26,13 @@ class ElectionTestCase(EosTestCase):
cls.db_connect_and_reset()
def do_task_assert(self, election, task, next_task):
self.assertEqual(election.workflow.get_task(task).status, WorkflowTask.Status.READY)
self.assertEqual(election.workflow.get_task(task).status, WorkflowTaskStatus.READY)
if next_task is not None:
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTask.Status.NOT_READY)
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTaskStatus.NOT_READY)
election.workflow.get_task(task).enter()
self.assertEqual(election.workflow.get_task(task).status, WorkflowTask.Status.EXITED)
self.assertEqual(election.workflow.get_task(task).status, WorkflowTaskStatus.EXITED)
if next_task is not None:
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTask.Status.READY)
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTaskStatus.READY)
@py_only
def test_run_election(self):
@ -44,7 +44,7 @@ class ElectionTestCase(EosTestCase):
self.assertEqual(election.workflow._instance, (election, 'workflow'))
# Check workflow behaviour
self.assertEqual(election.workflow.get_task('eos.base.workflow.TaskConfigureElection').status, WorkflowTask.Status.READY)
self.assertEqual(election.workflow.get_task('eos.base.workflow.TaskConfigureElection').status, WorkflowTaskStatus.READY)
self.assertEqual(election.workflow.get_task('does.not.exist'), None)
# Set election details

View File

@ -1,5 +1,5 @@
# Eos - Verifiable elections
# Copyright © 2017 RunasSudo (Yingtong Li)
# Copyright © 2017-18 RunasSudo (Yingtong Li)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -17,19 +17,19 @@
from eos.core.objects import *
from eos.core.tasks import *
class WorkflowTaskStatus(EosEnum):
UNKNOWN = 0
NOT_READY = 10
READY = 20
ENTERED = 30
#COMPLETE = 40
EXITED = 50
class WorkflowTask(EmbeddedObject):
class Status:
UNKNOWN = 0
NOT_READY = 10
READY = 20
ENTERED = 30
#COMPLETE = 40
EXITED = 50
depends_on = []
provides = []
status = IntField(default=0, is_hashed=False)
status = EnumField(WorkflowTaskStatus, is_hashed=False)
exited_at = DateTimeField(is_hashed=False)
def __init__(self, *args, **kwargs):
@ -40,8 +40,8 @@ class WorkflowTask(EmbeddedObject):
self.workflow = self.recurse_parents(Workflow)
if self.status == WorkflowTask.Status.UNKNOWN:
self.status = WorkflowTask.Status.READY if self.are_dependencies_met() else WorkflowTask.Status.NOT_READY
if self.status == WorkflowTaskStatus.UNKNOWN:
self.status = WorkflowTaskStatus.READY if self.are_dependencies_met() else WorkflowTaskStatus.NOT_READY
self.listeners = {
'enter': [],
@ -51,7 +51,7 @@ class WorkflowTask(EmbeddedObject):
# Helpers
def on_dependency_exit():
self.status = WorkflowTask.Status.READY if self.are_dependencies_met() else WorkflowTask.Status.NOT_READY
self.status = WorkflowTaskStatus.READY if self.are_dependencies_met() else WorkflowTaskStatus.NOT_READY
for depends_on_desc in self.depends_on:
for depends_on_task in self.workflow.get_tasks(depends_on_desc):
depends_on_task.listeners['exit'].append(on_dependency_exit)
@ -59,7 +59,7 @@ class WorkflowTask(EmbeddedObject):
def are_dependencies_met(self):
for depends_on_desc in self.depends_on:
for depends_on_task in self.workflow.get_tasks(depends_on_desc):
if depends_on_task.status is not WorkflowTask.Status.EXITED:
if depends_on_task.status is not WorkflowTaskStatus.EXITED:
return False
return True
@ -71,10 +71,10 @@ class WorkflowTask(EmbeddedObject):
self.exit()
def enter(self):
if self.status is not WorkflowTask.Status.READY:
if self.status is not WorkflowTaskStatus.READY:
raise Exception('Attempted to enter a task when not ready')
self.status = WorkflowTask.Status.ENTERED
self.status = WorkflowTaskStatus.ENTERED
self.fire_event('enter')
self.on_enter()
@ -86,10 +86,10 @@ class WorkflowTask(EmbeddedObject):
self.exited_at = DateTimeField.now()
def exit(self):
if self.status is not WorkflowTask.Status.ENTERED:
if self.status is not WorkflowTaskStatus.ENTERED:
raise Exception('Attempted to exit a task when not entered')
self.status = WorkflowTask.Status.EXITED
self.status = WorkflowTaskStatus.EXITED
self.fire_event('exit')
self.on_exit()
@ -146,7 +146,7 @@ class TaskConfigureElection(WorkflowTask):
label = 'Freeze the election'
#def on_enter(self):
# self.status = WorkflowTask.Status.COMPLETE
# self.status = WorkflowTaskStatus.COMPLETE
class TaskOpenVoting(WorkflowTask):
label = 'Open voting'

View File

@ -503,3 +503,61 @@ class TopLevelObject(DocumentObject, metaclass=TopLevelObjectType):
class EmbeddedObject(DocumentObject):
pass
# Enums
# =====
class EosEnumType(EosObjectType):
def __new__(meta, name, bases, attrs):
cls = EosObjectType.__new__(meta, name, bases, attrs)
cls._values = {}
for attr in list(dir(cls)):
val = getattr(cls, attr);
if isinstance(val, int):
instance = cls(attr, val)
setattr(cls, attr, instance)
cls._values[val] = instance
return cls
class EosEnum(EosObject, metaclass=EosEnumType):
def __init__(self, name, value):
super().__init__()
self.name = name
self.value = value
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.value == other.value
def __ne__(self, other):
if not isinstance(other, self.__class__):
return True
return self.value != other.value
def __gt__(self, other):
if not isinstance(other, self.__class__):
raise TypeError
return self.value > other.value
def __lt__(self, other):
if not isinstance(other, self.__class__):
raise TypeError
return self.value < other.value
def __ge__(self, other):
if not isinstance(other, self.__class__):
raise TypeError
return self.value >= other.value
def __le__(self, other):
if not isinstance(other, self.__class__):
raise TypeError
return self.value <= other.value
def serialise(self, options=SerialiseOptions.DEFAULT):
return self.value
@classmethod
def deserialise(cls, value):
return cls._values[value]
EnumField = EmbeddedObjectField

View File

@ -16,17 +16,20 @@
from eos.core.objects import *
class Task(TopLevelObject):
class Status:
UNKNOWN = 0
READY = 20
PROCESSING = 30
COMPLETE = 50
FAILED = -10
TIMEOUT = -20
class TaskStatus(EosEnum):
UNKNOWN = 0
READY = 20
PROCESSING = 30
COMPLETE = 50
FAILED = -10
TIMEOUT = -20
def is_error(self):
return self.value < 0
class Task(TopLevelObject):
label = 'Unknown task'
_id = UUIDField()
@ -37,7 +40,7 @@ class Task(TopLevelObject):
started_at = DateTimeField()
completed_at = DateTimeField()
status = IntField(default=0)
status = EnumField(TaskStatus)
messages = ListField(StringField())
def run(self):
@ -74,7 +77,7 @@ class TaskScheduler:
tasks = Task.get_all()
for task in tasks:
if task.status == Task.Status.READY:
if task.status == TaskStatus.READY:
pending_tasks.append(task)
# Sort them to ensure we iterate over them in the correct order
@ -88,7 +91,7 @@ class TaskScheduler:
tasks = Task.get_all()
for task in tasks:
if task.status == Task.Status.PROCESSING:
if task.status == TaskStatus.PROCESSING:
active_tasks.append(task)
return active_tasks
@ -99,7 +102,7 @@ class TaskScheduler:
tasks = Task.get_all()
for task in tasks:
if task.status == Task.Status.COMPLETE or task.status < 0:
if task.status == TaskStatus.COMPLETE or task.status.is_error():
completed_tasks.append(task)
if limit:

View File

@ -19,19 +19,19 @@ from eos.core.objects import *
class DirectRunStrategy(RunStrategy):
def run(self, task):
task.status = Task.Status.PROCESSING
task.status = TaskStatus.PROCESSING
task.started_at = DateTimeField.now()
task.save()
try:
task._run()
task.status = Task.Status.COMPLETE
task.status = TaskStatus.COMPLETE
task.completed_at = DateTimeField.now()
task.save()
task.complete()
except Exception as e:
task.status = Task.Status.FAILED
task.status = TaskStatus.FAILED
task.completed_at = DateTimeField.now()
if is_python:
#__pragma__('skip')

View File

@ -1,5 +1,5 @@
# Eos - Verifiable elections
# Copyright © 2017 RunasSudo (Yingtong Li)
# Copyright © 2017-18 RunasSudo (Yingtong Li)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -144,7 +144,7 @@ class TaskTestCase(EosTestCase):
task.save()
task.run()
self.assertEqual(task.status, Task.Status.COMPLETE)
self.assertEqual(task.status, TaskStatus.COMPLETE)
self.assertEqual(len(task.messages), 1)
self.assertEqual(task.messages[0], 'Hello World')
self.assertEqual(task.result, 'Success')
@ -158,6 +158,6 @@ class TaskTestCase(EosTestCase):
task.save()
task.run()
self.assertEqual(task.status, Task.Status.FAILED)
self.assertEqual(task.status, TaskStatus.FAILED)
self.assertEqual(len(task.messages), 1)
self.assertTrue('Test exception' in task.messages[0])

View File

@ -1,5 +1,5 @@
# Eos - Verifiable elections
# Copyright © 2017 RunasSudo (Yingtong Li)
# Copyright © 2017-18 RunasSudo (Yingtong Li)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -224,13 +224,13 @@ class ElectionTestCase(EosTestCase):
cls.db_connect_and_reset()
def do_task_assert(self, election, task, next_task):
self.assertEqual(election.workflow.get_task(task).status, WorkflowTask.Status.READY)
self.assertEqual(election.workflow.get_task(task).status, WorkflowTaskStatus.READY)
if next_task is not None:
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTask.Status.NOT_READY)
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTaskStatus.NOT_READY)
election.workflow.get_task(task).enter()
self.assertEqual(election.workflow.get_task(task).status, WorkflowTask.Status.EXITED)
self.assertEqual(election.workflow.get_task(task).status, WorkflowTaskStatus.EXITED)
if next_task is not None:
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTask.Status.READY)
self.assertEqual(election.workflow.get_task(next_task).status, WorkflowTaskStatus.READY)
@py_only
def test_run_election(self):

View File

@ -172,7 +172,7 @@ def tally_stv_election(electionid, qnum, randfile):
q_num=qnum,
random=dat,
num_seats=7,
status=Task.Status.READY,
status=TaskStatus.READY,
run_strategy=EosObject.lookup(app.config['TASK_RUN_STRATEGY'])()
)
task.save()
@ -260,13 +260,13 @@ def election_admin_summary(election):
@election_admin
def election_admin_enter_task(election):
workflow_task = election.workflow.get_task(flask.request.args['task_name'])
if workflow_task.status != WorkflowTask.Status.READY:
if workflow_task.status != WorkflowTaskStatus.READY:
return flask.Response('Task is not yet ready or has already exited', 409)
task = WorkflowTaskEntryWebTask(
election_id=election._id,
workflow_task=workflow_task._name,
status=Task.Status.READY,
status=TaskStatus.READY,
run_strategy=EosObject.lookup(app.config['TASK_RUN_STRATEGY'])()
)
task.run()
@ -283,7 +283,7 @@ def election_admin_schedule_task(election):
election_id=election._id,
workflow_task=workflow_task._name,
run_at=DateTimeField().deserialise(flask.request.form['datetime']),
status=Task.Status.READY,
status=TaskStatus.READY,
run_strategy=EosObject.lookup(app.config['TASK_RUN_STRATEGY'])()
)
task.save()
@ -293,7 +293,7 @@ def election_admin_schedule_task(election):
@app.route('/election/<election_id>/cast_ballot', methods=['POST'])
@using_election
def election_api_cast_vote(election):
if election.workflow.get_task('eos.base.workflow.TaskOpenVoting').status < WorkflowTask.Status.EXITED or election.workflow.get_task('eos.base.workflow.TaskCloseVoting').status > WorkflowTask.Status.READY:
if election.workflow.get_task('eos.base.workflow.TaskOpenVoting').status < WorkflowTaskStatus.EXITED or election.workflow.get_task('eos.base.workflow.TaskCloseVoting').status > WorkflowTaskStatus.READY:
# Voting is not yet open or has closed
return flask.Response('Voting is not yet open or has closed', 409)

View File

@ -2,7 +2,7 @@
{#
Eos - Verifiable elections
Copyright © 2017 RunasSudo (Yingtong Li)
Copyright © 2017-18 RunasSudo (Yingtong Li)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
@ -29,7 +29,7 @@
<a href="https://github.com/RunasSudo/Eos" class="item">Source Code</a>
{% if session.user %}
{% if session.user.is_admin() %}
{% include 'active_tasks_menu.html' %}
{% include 'task/active_tasks_menu.html' %}
{% endif %}
<div class="ui simple dropdown item right">
<i class="{% if session.user.is_admin() %}legal{% else %}user circle{% endif %} icon"></i> {{ session.user.name }} <i class="dropdown icon"></i>

View File

@ -2,7 +2,7 @@
{#
Eos - Verifiable elections
Copyright © 2017 RunasSudo (Yingtong Li)
Copyright © 2017-18 RunasSudo (Yingtong Li)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
@ -23,7 +23,7 @@
<ul>
{% for task in election.workflow.tasks %}
{% if task.status == eos.base.workflow.WorkflowTask.Status.READY %}
{% if task.status == eos.base.workflow.WorkflowTaskStatus.READY %}
<li><a href="{{ url_for('election_admin_enter_task', election_id=election._id, task_name=task._name) }}" onclick="return window.confirm('Are you sure you want to execute the task \'{{ task.label }}\'? This action is irreversible.');">{{ task.label }}</a></li>
{% endif %}
{% endfor %}

View File

@ -2,7 +2,7 @@
{#
Eos - Verifiable elections
Copyright © 2017 RunasSudo (Yingtong Li)
Copyright © 2017-18 RunasSudo (Yingtong Li)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% set Status = eos.base.workflow.WorkflowTask.Status %}
{% set Status = eos.base.workflow.WorkflowTaskStatus %}
{% block electioncontent %}
{% if election.workflow.get_task('eos.base.workflow.TaskConfigureElection').status == Status.EXITED %}

View File

@ -1,6 +1,6 @@
{#
Eos - Verifiable elections
Copyright © 2017 RunasSudo (Yingtong Li)
Copyright © 2017-18 RunasSudo (Yingtong Li)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
@ -43,7 +43,7 @@
<div class="header">Recently completed tasks</div>
{% for task in eos.core.tasks.TaskScheduler.completed_tasks(3) %}
<div class="item">
{% if task.status < 0 %}<i class="warning sign icon"></i> {% endif %}{{ task.label }}
{% if task.status.is_error() %}<i class="warning sign icon"></i> {% endif %}{{ task.label }}
<br><small><i class="wait icon"></i> completed {{ task.completed_at|pretty_date }}</small>
</div>
{% endfor %}