2020-10-17 22:20:13 +11:00
|
|
|
# pyRCV2: Preferential vote counting
|
|
|
|
# Copyright © 2020 Lee Yingtong Li (RunasSudo)
|
|
|
|
#
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU Affero General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
__pragma__ = lambda x: None
|
|
|
|
|
|
|
|
from pyRCV2.model import CandidateState, CountCard, CountCompleted, CountStepResult
|
|
|
|
from pyRCV2.numbers import Num
|
|
|
|
from pyRCV2.safedict import SafeDict
|
|
|
|
|
|
|
|
class STVCCounter:
|
|
|
|
"""Count an STV election using pyRCV STV-C rules"""
|
|
|
|
|
|
|
|
def __init__(self, election):
|
|
|
|
self.election = election
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
self.candidates = SafeDict([(c, CountCard()) for c in self.election.candidates])
|
|
|
|
self.exhausted = CountCard()
|
|
|
|
|
|
|
|
# Withdraw candidates
|
|
|
|
for candidate in self.election.withdrawn:
|
|
|
|
__pragma__('opov')
|
|
|
|
self.candidates[candidate].state = CandidateState.WITHDRAWN
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
# Distribute first preferences
|
|
|
|
for ballot in self.election.ballots:
|
|
|
|
__pragma__('opov')
|
|
|
|
candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None)
|
|
|
|
|
|
|
|
if candidate is not None:
|
|
|
|
self.candidates[candidate].transfers += ballot.value
|
|
|
|
self.candidates[candidate].ballots.append((ballot, ballot.value))
|
|
|
|
else:
|
|
|
|
self.exhausted.transfers += ballot.value
|
|
|
|
self.exhausted.ballots.append((ballot, ballot.value))
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
self.compute_quota()
|
|
|
|
self.elect_meeting_quota()
|
|
|
|
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('opov')
|
2020-10-17 22:20:13 +11:00
|
|
|
return CountStepResult(
|
|
|
|
'First preferences',
|
|
|
|
self.candidates,
|
|
|
|
self.exhausted,
|
2020-10-18 02:54:51 +11:00
|
|
|
self.total + self.exhausted.votes,
|
2020-10-17 22:20:13 +11:00
|
|
|
self.quota
|
|
|
|
)
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('noopov')
|
2020-10-17 22:20:13 +11:00
|
|
|
|
|
|
|
def step(self):
|
|
|
|
# Step count cards
|
|
|
|
for candidate, count_card in self.candidates.items():
|
|
|
|
count_card.step()
|
|
|
|
self.exhausted.step()
|
|
|
|
|
|
|
|
# Have sufficient candidates been elected?
|
|
|
|
if sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED or cc.state == CandidateState.ELECTED) >= self.election.seats:
|
|
|
|
return CountCompleted()
|
|
|
|
|
|
|
|
# Are there just enough candidates to fill all the seats
|
|
|
|
if sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED or cc.state == CandidateState.ELECTED or cc.state == CandidateState.HOPEFUL) <= self.election.seats:
|
|
|
|
# Declare elected all remaining candidates
|
|
|
|
for candidate, count_card in self.candidates.items():
|
|
|
|
if count_card.state == CandidateState.HOPEFUL:
|
|
|
|
count_card.state = CandidateState.PROVISIONALLY_ELECTED
|
|
|
|
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('opov')
|
2020-10-17 22:20:13 +11:00
|
|
|
return CountStepResult(
|
|
|
|
'Bulk election',
|
|
|
|
self.candidates,
|
|
|
|
self.exhausted,
|
2020-10-18 02:54:51 +11:00
|
|
|
self.total + self.exhausted.votes,
|
2020-10-17 22:20:13 +11:00
|
|
|
self.quota
|
|
|
|
)
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('noopov')
|
2020-10-17 22:20:13 +11:00
|
|
|
|
|
|
|
# Do surpluses need to be distributed?
|
|
|
|
__pragma__('opov')
|
|
|
|
has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED and cc.votes > self.quota]
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
if len(has_surplus) > 0:
|
|
|
|
# Distribute surpluses in order of size
|
|
|
|
has_surplus.sort(lambda x: x[1].votes, True)
|
2020-10-17 23:09:29 +11:00
|
|
|
candidate_surplus, count_card = has_surplus[0]
|
2020-10-17 22:20:13 +11:00
|
|
|
count_card.state = CandidateState.ELECTED
|
|
|
|
|
|
|
|
__pragma__('opov')
|
|
|
|
surplus = count_card.votes - self.quota
|
2020-10-18 00:38:07 +11:00
|
|
|
transfer_value = surplus / count_card.votes
|
2020-10-17 22:20:13 +11:00
|
|
|
count_card.transfers -= surplus
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
# Transfer surplus
|
|
|
|
for ballot, ballot_value in count_card.ballots:
|
|
|
|
__pragma__('opov')
|
|
|
|
new_value = ballot_value * transfer_value
|
|
|
|
candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None)
|
|
|
|
|
|
|
|
if candidate is not None:
|
|
|
|
self.candidates[candidate].transfers += new_value
|
|
|
|
self.candidates[candidate].ballots.append((ballot, new_value))
|
|
|
|
else:
|
|
|
|
self.exhausted.transfers += new_value
|
|
|
|
self.exhausted.ballots.append((ballot, new_value))
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
# Declare any candidates meeting the quota as a result of surpluses
|
|
|
|
self.compute_quota()
|
|
|
|
self.elect_meeting_quota()
|
|
|
|
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('opov')
|
2020-10-17 22:20:13 +11:00
|
|
|
return CountStepResult(
|
2020-10-17 23:09:29 +11:00
|
|
|
'Surplus of ' + candidate_surplus.name,
|
2020-10-17 22:20:13 +11:00
|
|
|
self.candidates,
|
|
|
|
self.exhausted,
|
2020-10-18 02:54:51 +11:00
|
|
|
self.total + self.exhausted.votes,
|
2020-10-17 22:20:13 +11:00
|
|
|
self.quota
|
|
|
|
)
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('noopov')
|
2020-10-17 22:20:13 +11:00
|
|
|
|
|
|
|
# Insufficient winners and no surpluses to distribute
|
|
|
|
# Exclude the lowest ranked hopeful
|
|
|
|
hopefuls = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL]
|
|
|
|
hopefuls.sort(lambda x: x[1].votes)
|
|
|
|
|
|
|
|
# TODO: Handle ties
|
2020-10-17 23:09:29 +11:00
|
|
|
candidate_excluded, count_card = hopefuls[0]
|
2020-10-17 22:20:13 +11:00
|
|
|
count_card.state = CandidateState.EXCLUDED
|
|
|
|
__pragma__('opov')
|
|
|
|
count_card.transfers -= count_card.votes
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
# Exclude this candidate
|
|
|
|
for ballot, ballot_value in count_card.ballots:
|
|
|
|
__pragma__('opov')
|
|
|
|
candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None)
|
|
|
|
|
|
|
|
if candidate is not None:
|
|
|
|
self.candidates[candidate].transfers += ballot_value
|
|
|
|
self.candidates[candidate].ballots.append((ballot, ballot_value))
|
|
|
|
else:
|
|
|
|
self.exhausted.transfers += ballot_value
|
|
|
|
self.exhausted.ballots.append((ballot, ballot_value))
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
# Declare any candidates meeting the quota as a result of exclusion
|
|
|
|
self.compute_quota()
|
|
|
|
self.elect_meeting_quota()
|
|
|
|
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('opov')
|
2020-10-17 22:20:13 +11:00
|
|
|
return CountStepResult(
|
2020-10-17 23:09:29 +11:00
|
|
|
'Exclusion of ' + candidate_excluded.name,
|
2020-10-17 22:20:13 +11:00
|
|
|
self.candidates,
|
|
|
|
self.exhausted,
|
2020-10-18 02:54:51 +11:00
|
|
|
self.total + self.exhausted.votes,
|
2020-10-17 22:20:13 +11:00
|
|
|
self.quota
|
|
|
|
)
|
2020-10-18 02:54:51 +11:00
|
|
|
__pragma__('noopov')
|
2020-10-17 22:20:13 +11:00
|
|
|
|
|
|
|
def compute_quota(self):
|
|
|
|
# Compute quota
|
|
|
|
__pragma__('opov')
|
2020-10-18 02:54:51 +11:00
|
|
|
self.total = sum((cc.votes for c, cc in self.candidates.items()), Num('0'))
|
|
|
|
self.quota = self.total / Num(self.election.seats + 1)
|
2020-10-17 22:20:13 +11:00
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
def elect_meeting_quota(self):
|
|
|
|
# Does a candidate meet the quota?
|
|
|
|
__pragma__('opov')
|
|
|
|
meets_quota = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL and cc.votes > self.quota]
|
|
|
|
__pragma__('noopov')
|
|
|
|
|
|
|
|
if len(meets_quota) > 0:
|
|
|
|
# Declare elected any candidate who meets the quota
|
|
|
|
for candidate, count_card in meets_quota:
|
|
|
|
count_card.state = CandidateState.PROVISIONALLY_ELECTED
|