Implement verification of election results

This commit is contained in:
RunasSudo 2017-11-29 18:29:42 +11:00
parent 70fea02100
commit 30cb0d674e
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
9 changed files with 130 additions and 9 deletions

View File

@ -27,7 +27,7 @@ class NullEncryptedAnswer(EncryptedAnswer):
answer = EmbeddedObjectField()
def decrypt(self):
return self.answer
return None, self.answer
class Ballot(EmbeddedObject):
#_id = UUIDField()
@ -123,6 +123,9 @@ class PreferentialAnswer(Answer):
choices = ListField(IntField())
class RawResult(Result):
_ver = StringField(default='0.2')
plaintexts = ListField(EmbeddedObjectListField())
answers = EmbeddedObjectListField()
def count(self):
@ -143,3 +146,16 @@ class Election(TopLevelObject):
voters = EmbeddedObjectListField(is_hashed=False)
questions = EmbeddedObjectListField()
results = EmbeddedObjectListField(is_hashed=False)
def verify(self):
#__pragma__('skip')
from eos.core.hashing import SHA256
#__pragma__('noskip')
election_hash = SHA256().update_obj(self).hash_as_b64()
for voter in self.voters:
for vote in voter.votes:
if vote.ballot.election_id != self._id:
raise Exception('Invalid election ID on ballot')
if vote.ballot.election_hash != election_hash:
raise Exception('Invalid election hash on ballot')

View File

@ -142,7 +142,8 @@ class TaskDecryptVotes(WorkflowTask):
vote = voter.votes[-1]
ballot = vote.ballot
for i in range(len(ballot.encrypted_answers)):
answer = ballot.encrypted_answers[i].decrypt()
plaintexts, answer = ballot.encrypted_answers[i].decrypt()
election.results[i].plaintexts.append(plaintexts)
election.results[i].answers.append(answer)
self.exit()

View File

@ -111,9 +111,13 @@ class EmbeddedObjectListField(Field):
def serialise(self, value, for_hash=False, should_protect=False):
# TNYI: Doesn't know how to deal with iterators like EosList
if value is None:
return None
return [EosObject.serialise_and_wrap(x, self.object_type, for_hash, should_protect) for x in (value.impl if isinstance(value, EosList) else value)]
def deserialise(self, value):
if value is None:
return None
return EosList([EosObject.deserialise_and_unwrap(x, self.object_type) for x in value])
if is_python:

View File

@ -137,8 +137,8 @@ class BitStream(EosObject):
@classmethod
def unmap(cls, value, func, block_size):
bs = cls()
for x in value:
bs.write(func(x), block_size)
for i in range(len(value)):
bs.write(func(value[i]), block_size)
bs.seek(0)
return bs

View File

@ -132,6 +132,24 @@ class EGPrivateKey(EmbeddedObject):
# Undo the encryption mapping
return self.public_key.m0_to_message(pt)
def decrypt_and_prove(self, ciphertext):
result = EGProvedPlaintext()
result.ciphertext = ciphertext
result.message = self.decrypt(ciphertext)
# Adida 2008
w = BigInt.crypto_random(ZERO, self.public_key.group.q - ONE) # random element in Z_q
result.commitmentA = pow(self.public_key.group.g, w, self.public_key.group.p)
result.commitmentB = pow(ciphertext.gamma, w, self.public_key.group.p)
result.challenge = SHA256().update_obj(ciphertext).update_obj(result.commitmentA).update_obj(result.commitmentB).hash_as_bigint()
result.response = w + self.x * result.challenge
return result
class EGCiphertext(EmbeddedObject):
public_key = EmbeddedObjectField(EGPublicKey)
gamma = EmbeddedObjectField(BigInt) # G^k
@ -156,6 +174,31 @@ class EGCiphertext(EmbeddedObject):
ct = self.public_key._encrypt(self.m0, self.randomness)
return ct.gamma == self.gamma and ct.delta == self.delta
class EGProvedPlaintext(EmbeddedObject):
message = EmbeddedObjectField(BigInt)
ciphertext = EmbeddedObjectField()
commitmentA = EmbeddedObjectField(BigInt)
commitmentB = EmbeddedObjectField(BigInt)
challenge = EmbeddedObjectField(BigInt)
response = EmbeddedObjectField(BigInt)
def is_proof_valid(self):
gt = pow(self.ciphertext.public_key.group.g, self.response, self.ciphertext.public_key.group.p)
Ayc = (self.commitmentA * pow(self.ciphertext.public_key.X, self.challenge, self.ciphertext.public_key.group.p)) % self.ciphertext.public_key.group.p
if gt != Ayc:
return False
at = pow(self.ciphertext.gamma, self.response, self.ciphertext.public_key.group.p)
m0 = self.ciphertext.public_key.message_to_m0(self.message)
m_inv = pow(m0, self.ciphertext.public_key.group.p - TWO, self.ciphertext.public_key.group.p)
Bbmc = (self.commitmentB * pow(self.ciphertext.delta * m_inv, self.challenge, self.ciphertext.public_key.group.p)) % self.ciphertext.public_key.group.p
if at != Bbmc:
return False
return True
# Signed ElGamal per Schnorr & Jakobssen
class SEGPublicKey(EGPublicKey):
def _encrypt(self, message, randomness=None):

View File

@ -40,11 +40,13 @@ class BlockEncryptedAnswer(EncryptedAnswer):
if sk is None:
sk = self.recurse_parents(PSRElection).sk
bs = BitStream.unmap(self.blocks, sk.decrypt, sk.public_key.nbits())
plaintexts = EosList([sk.decrypt_and_prove(self.blocks[i]) for i in range(len(self.blocks))])
bs = BitStream.unmap(plaintexts, lambda plaintext: plaintext.message, sk.public_key.nbits())
m = bs.read_string()
obj = EosObject.deserialise_and_unwrap(EosObject.from_json(m))
return obj
return plaintexts, obj
def deaudit(self):
blocks_deaudit = EosList()
@ -227,3 +229,39 @@ class PSRElection(Election):
public_key = EmbeddedObjectField(SEGPublicKey)
mixing_trustees = EmbeddedObjectListField()
def verify(self):
# Verify ballots
super().verify()
# Verify mixes
for i in range(len(self.questions)):
for j in range(len(self.mixing_trustees)):
self.mixing_trustees[j].verify(i)
# Verify decryption proofs
for q_num in range(len(self.questions)):
raw_result = self.results[q_num]
for answer_num in range(len(raw_result.plaintexts)):
# Input and output blocks:
plaintexts = raw_result.plaintexts[answer_num]
ciphertexts = self.mixing_trustees[-1].mixed_questions[q_num][answer_num].blocks
# Verify ciphertexts
if len(plaintexts) != len(ciphertexts):
raise Exception('Different number of plaintexts and ciphertexts')
for i in range(len(ciphertexts)):
if ciphertexts[i] != plaintexts[i].ciphertext:
raise Exception('Ciphertext does not match mixnet output')
# Verify decryption
for plaintext in plaintexts:
if not plaintext.is_proof_valid():
raise Exception('Proof of decryption is not valid')
# Verify block combination
bs = BitStream.unmap(plaintexts, lambda plaintext: plaintext.message, self.public_key.nbits())
m = bs.read_string()
answer = EosObject.deserialise_and_unwrap(EosObject.from_json(m))
if answer != raw_result.answers[answer_num]:
raise Exception('Result does not match claimed decryption')

View File

@ -69,9 +69,13 @@ class EGTestCase(EosTestCase):
pt = DEFAULT_GROUP.random_Zq_element()
sk = EGPrivateKey.generate()
ct = sk.public_key.encrypt(pt)
m = sk.decrypt(ct)
proved_pt = sk.decrypt_and_prove(ct)
m = proved_pt.message
self.assertEqualJSON(pt, m)
self.assertTrue(proved_pt.is_proof_valid())
class SEGTestCase(EosTestCase):
def test_eg(self):
pt = DEFAULT_GROUP.random_Zq_element()
@ -148,7 +152,7 @@ class BlockEGTestCase(EosTestCase):
obj = self.Person(name='John Smith')
ct = BlockEncryptedAnswer.encrypt(self.sk.public_key, obj)
m = ct.decrypt(self.sk)
_, m = ct.decrypt(self.sk)
self.assertEqualJSON(obj, m)
@ -310,6 +314,9 @@ class ElectionTestCase(EosTestCase):
# Check the hash hasn't changed during that
self.assertEqual(SHA256().update_obj(election).hash_as_b64(), election_hash)
# Check the election verifies
election.verify()
class PVSSTestCase(EosTestCase):
@py_only
def test_basic(self):

View File

@ -68,7 +68,8 @@ class TaskDecryptVotes(eos.base.workflow.TaskDecryptVotes):
for i in range(len(election.mixing_trustees[-1].mixed_questions)):
for encrypted_answer in election.mixing_trustees[-1].mixed_questions[i]:
answer = encrypted_answer.decrypt()
plaintexts, answer = encrypted_answer.decrypt()
election.results[i].plaintexts.append(plaintexts)
election.results[i].answers.append(answer)
self.exit()

View File

@ -166,6 +166,17 @@ def count_election(electionid):
election.save()
@app.cli.command('verify_election')
@click.option('--electionid', default=None)
def verify_election(electionid):
if electionid is None:
election = Election.get_all()[0]
else:
election = Election.get_by_id(electionid)
election.verify()
print('The election has passed validation')
@app.context_processor
def inject_globals():
return {'eos': eos, 'eosweb': eosweb, 'SHA256': eos.core.hashing.SHA256}