This repository has been archived on 2021-05-25. You can view files and clone it, but cannot push or open issues or pull requests.
pyRCV2/pyRCV2/constraints.py

286 lines
8.6 KiB
Python

# pyRCV2: Preferential vote counting
# Copyright © 2020–2021 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# No-op stand-in so that calls such as __pragma__('opov') in this module do nothing when run as ordinary Python
# NOTE(review): presumably __pragma__ is a Transcrypt compiler directive – confirm
__pragma__ = lambda x: None
from pyRCV2.exceptions import BaseRCVException
from pyRCV2.model import CandidateState
from pyRCV2.safedict import SafeDict
class ConstraintException(BaseRCVException):
    """Raised when no result conformant with the constraints is possible"""
class ConstraintMatrix:
    """
    Represents a constraint matrix/table, as described by Otten

    Cells are kept in one flat list. A dimension of size d is allotted d + 1 slots:
    slot 0 is the margin (address component -1) and slots 1..d are groups 0..d-1.
    """

    def __init__(self, dimensions):
        """Create a matrix of fresh cells; dimensions gives the number of groups along each axis"""
        self.dimensions = dimensions

        # One extra slot per axis to hold the margin
        # FIXME: This is not the most efficient packing, as we require margins only along one dimension at a time
        total_cells = 1
        for size in dimensions:
            total_cells = total_cells * (size + 1)

        self.matrix = [ConstraintMatrixCell() for _ in range(total_cells)]

    def index_of(self, address):
        """
        Return the position within self.matrix of the cell at address

        Pass -1 as a component of address to get the margin along that axis
        """
        position = 0
        for axis, size in enumerate(self.dimensions):
            # Shift by one so that -1 (the margin) lands in slot 0
            position = position * (size + 1) + (address[axis] + 1)
        return position

    def get(self, address):
        """Return the cell at address (see index_of for the meaning of -1)"""
        flat_index = self.index_of(address)
        return self.matrix[flat_index]
class ConstraintMatrixCell:
    """A single cell of the constraint matrix: elected/candidate counts and min/max bounds"""

    def __init__(self):
        # Every count and bound starts at zero
        self.elected = 0  # Number counted as elected in this cell
        self.min = 0  # Lower bound for this cell
        self.max = 0  # Upper bound for this cell
        self.cands = 0  # Number of candidates counted in this cell

    def __repr__(self):
        template = '<ConstraintMatrixCell: E={}, Min={}, Max={}, C={}>'
        return template.format(self.elected, self.min, self.max, self.cands)
def ndrange(dimensions):
    """
    n-dimensional range function

    Yield every address [i0, i1, ...] with 0 <= ik < dimensions[k], each as a list,
    with the last index varying fastest. An empty dimensions yields one empty address.
    """
    num_axes = len(dimensions)

    if num_axes == 0:
        yield []
    elif num_axes == 1:
        for n in range(dimensions[0]):
            yield [n]
    else:
        # NOTE(review): the pragma pair presumably enables operator overloading for the list concatenation – confirm
        __pragma__('opov')
        for n in range(dimensions[0]):
            for tail in ndrange(dimensions[1:]):
                yield [n] + tail
        __pragma__('noopov')
def init_matrix(counter):
    """
    Initialise matrix with initial constraints

    Creates counter._constraint_matrix with one axis per constraint category, fills in
    the current counts via update_matrix, then copies each group's min/max into its margin cell.
    """
    constraints = counter.election.constraints

    # One axis per category, sized by the number of groups in that category
    group_counts = [len(groups) for _category_name, groups in constraints.items()]
    matrix = ConstraintMatrix(group_counts)
    counter._constraint_matrix = matrix

    update_matrix(counter)

    # Add min/max to margins
    num_axes = len(matrix.dimensions)
    for axis, category_item in enumerate(constraints.items()):
        _category_name, groups = category_item
        for group_index, group_item in enumerate(groups.items()):
            _group_name, constraint = group_item

            # Margin address: -1 on every axis except this category's own
            margin_address = [-1 for _ in range(num_axes)]
            margin_address[axis] = group_index

            margin_cell = matrix.get(margin_address)
            margin_cell.min = constraint.min
            margin_cell.max = constraint.max
def candidate_to_address(constraints, candidate):
    """
    Return the matrix address of candidate

    Categories of constraints are visited in order; for each one, the index of every
    group whose candidates include candidate is added to the address.
    """
    return [
        group_index
        for _category_name, category in constraints.items()
        for group_index, group_item in enumerate(category.items())
        if candidate in group_item[1].candidates
    ]
def update_matrix(counter):
    """
    Update matrix with elected/hopeful numbers

    Recounts the elected and cands figures of counter._constraint_matrix from the
    current candidate states, then sets the grand total cell's max to its cands.
    The min/max bounds of all other cells are left untouched.
    """
    cm = counter._constraint_matrix
    num_dimensions = len(cm.dimensions)

    # Reset elected/cands on every cell, margins and grand total included.
    # Margin cells are incremented below, so unless they are zeroed here as well
    # their counts would accumulate across successive calls.
    for cell in cm.matrix:
        cell.elected = 0
        cell.cands = 0

    cell_gt = cm.get([-1 for _ in range(num_dimensions)])

    # States counted towards cands; the elected states are a subset of these
    counted_states = (CandidateState.HOPEFUL, CandidateState.GUARDED, CandidateState.PROVISIONALLY_ELECTED, CandidateState.DISTRIBUTING_SURPLUS, CandidateState.ELECTED)
    elected_states = (CandidateState.PROVISIONALLY_ELECTED, CandidateState.DISTRIBUTING_SURPLUS, CandidateState.ELECTED)

    # Count elected/cands
    for candidate, count_card in counter.candidates.items():
        if count_card.state not in counted_states:
            continue

        is_elected = count_card.state in elected_states
        address = candidate_to_address(counter.election.constraints, candidate)

        # Every cell this candidate contributes to: its own cell, the grand total and the margins
        cells = [cm.get(address), cell_gt]
        if num_dimensions > 1:
            # If only 1 dimension, the margin is equivalent to cell_gt
            for dimension in range(num_dimensions):
                addr2 = [x for x in address]
                addr2[dimension] = -1
                cells.append(cm.get(addr2))

        for cell in cells:
            cell.cands += 1
            if is_elected:
                cell.elected += 1

    cell_gt.max = cell_gt.cands
def step_matrix(counter):
    """
    Adjust the matrix one step towards stability

    At most one bound (the min or max of a single cell) is tightened per call.

    Return True if stable, or False if an adjustment was required
    Raise ConstraintException if a cell's min exceeds its max
    """
    cm = counter._constraint_matrix
    num_dimensions = len(cm.dimensions)

    # Rule 1: min is at least the number elected, max is at most the number of candidates
    for cell in cm.matrix:
        if cell.elected > cell.min:
            cell.min = cell.elected
            return False
        if cell.cands < cell.max:
            cell.max = cell.cands
            return False
        if cell.min > cell.max:
            raise ConstraintException('No result conformant with the constraints is possible')

    # Rule 2/3: bound each cell using a margin total and the other cells along an axis
    for address in ndrange(cm.dimensions):
        cell = cm.get(address)

        for dimension1 in range(num_dimensions): # Which margin total
            for dimension2 in range(num_dimensions): # Which axis to check along
                if dimension1 == dimension2:
                    continue

                # Sum the bounds of every other cell along dimension2
                tot_min_others = 0
                tot_max_others = 0
                addr2 = [x for x in address]

                for i in range(cm.dimensions[dimension2]):
                    if i == address[dimension2]:
                        continue
                    addr2[dimension2] = i
                    cell2 = cm.get(addr2)
                    tot_min_others += cell2.min
                    tot_max_others += cell2.max

                # Margin cell for this cell's group along dimension1
                addr2 = [-1 for _ in range(num_dimensions)]
                addr2[dimension1] = address[dimension1]
                cell_dimension = cm.get(addr2)
                min_dimension = cell_dimension.min
                max_dimension = cell_dimension.max

                # This many must be elected from this cell at least
                this_cell_min = min_dimension - tot_max_others
                # ... and this many at most
                this_cell_max = max_dimension - tot_min_others

                # Rule 2
                if this_cell_min > cell.min:
                    cell.min = this_cell_min
                    return False

                # Rule 3
                if this_cell_max < cell.max:
                    cell.max = this_cell_max
                    return False

    # Rule 4/5: bound each margin by the totals of the cells it summarises
    for dimension in range(num_dimensions):
        for n in range(cm.dimensions[dimension]):
            tot_min = 0
            tot_max = 0

            # Every cell whose index along this dimension is n
            for addr1 in ndrange(cm.dimensions[:dimension]):
                for addr2 in ndrange(cm.dimensions[dimension+1:]):
                    __pragma__('opov')
                    address = addr1 + [n] + addr2
                    __pragma__('noopov')

                    tot_min += cm.get(address).min
                    tot_max += cm.get(address).max

            address = [-1 for _ in range(num_dimensions)]
            address[dimension] = n
            cell = cm.get(address)

            # Rule 4
            if cell.min < tot_min:
                cell.min = tot_min
                # Report the adjustment so that the earlier rules are re-checked against the new margin
                return False

            # Rule 5
            if cell.max > tot_max:
                cell.max = tot_max
                return False

    return True
def stabilise_matrix(counter):
    """Call step_matrix repeatedly until it reports that the matrix is stable"""
    stable = False
    while not stable:
        stable = (step_matrix(counter) == True)
def guard_or_doom(counter):
    """
    Guard or doom candidates as required

    For each cell of counter._constraint_matrix: if the number elected has reached the
    cell's max, the HOPEFUL candidates of that cell are set to DOOMED; if the number of
    candidates equals the cell's min, the HOPEFUL candidates of that cell are set to GUARDED.

    Return a list of log messages describing the candidates affected
    """
    cm = counter._constraint_matrix
    logs = []

    for address in ndrange(cm.dimensions):
        cell = cm.get(address)

        if cell.elected == cell.max:
            # Doom remaining candidates in this group
            cands_doomed = _hopefuls_in_cell(counter, address)

            for candidate, count_card in cands_doomed.items():
                count_card.state = CandidateState.DOOMED

            if len(cands_doomed) > 0:
                logs.append(counter.pretty_join([c.name for c, cc in cands_doomed.items()]) + ' must be doomed to comply with constraints.')

        # Checked after dooming: candidates doomed above are no longer HOPEFUL
        if cell.cands == cell.min:
            # Guard remaining candidates in this group
            cands_guarded = _hopefuls_in_cell(counter, address)

            for candidate, count_card in cands_guarded.items():
                count_card.state = CandidateState.GUARDED

            if len(cands_guarded) > 0:
                logs.append(counter.pretty_join([c.name for c, cc in cands_guarded.items()]) + ' must be guarded to comply with constraints.')

    return logs


def _hopefuls_in_cell(counter, address):
    """Return a SafeDict of candidate -> count card for the HOPEFUL candidates belonging to the cell at address"""
    hopefuls = SafeDict([(c, cc) for c, cc in counter.candidates.items() if cc.state == CandidateState.HOPEFUL])

    for i, x in enumerate(counter.election.constraints.items()):
        category_name, category = x

        # address[i] is the position of this cell's group within the category
        group = category[list(category.keys())[address[i]]]

        # Iterate over a copy, as entries are removed along the way
        for candidate, count_card in list(hopefuls.items()):
            if candidate not in group.candidates:
                # Explicit __delitem__ call, as `del hopefuls[candidate]` is not yet implemented in Transcrypt
                hopefuls.__delitem__(candidate)

    return hopefuls