208 lines
7.3 KiB
Python
208 lines
7.3 KiB
Python
# pyRCV2: Preferential vote counting
|
|
# Copyright © 2020–2021 Lee Yingtong Li (RunasSudo)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
__pragma__ = lambda x: None
|
|
|
|
from pyRCV2.method.base_stv import BaseSTVCounter, STVException
|
|
from pyRCV2.model import CandidateState, CountCard, CountStepResult
|
|
from pyRCV2.numbers import Num
|
|
from pyRCV2.safedict import SafeDict
|
|
|
|
class MeekCountCard(CountCard):
|
|
def __init__(self, *args):
|
|
CountCard.__init__(self, *args)
|
|
self.keep_value = Num(1) # Not read by the count algorithm, but can be used for auditing
|
|
|
|
def clone(self):
|
|
"""Overrides CountCard.clone"""
|
|
result = MeekCountCard()
|
|
result.orig_votes = self.orig_votes
|
|
result.transfers = self.transfers
|
|
result.parcels = [[(b[0].clone(), b[1]) for b in p] for p in self.parcels]
|
|
result.state = self.state
|
|
result.order_elected = self.order_elected
|
|
result.keep_value = keep_value
|
|
return result
|
|
|
|
class MeekSTVCounter(BaseSTVCounter):
|
|
def __init__(self, *args):
|
|
BaseSTVCounter.__init__(self, *args)
|
|
self.candidates = SafeDict([(c, MeekCountCard()) for c in self.election.candidates])
|
|
|
|
self._quota_tolerance_ub = Num('1.0001')
|
|
self._quota_tolerance_lb = Num('0.9999')
|
|
|
|
def reset(self):
|
|
if self.options['quota_mode'] != 'progressive':
|
|
raise STVException('Meek method requires --quota-mode progressive')
|
|
if self.options['bulk_exclude']:
|
|
raise STVException('Meek method is incompatible with --bulk_exclude')
|
|
if self.options['defer_surpluses']:
|
|
raise STVException('Meek method is incompatible with --defer-surpluses')
|
|
if self.options['papers'] != 'both':
|
|
raise STVException('Meek method is incompatible with --transferable-only')
|
|
if self.options['exclusion'] != 'one_round':
|
|
raise STVException('Meek method requires --exclusion one_round')
|
|
if self.options['round_tvs'] is not None:
|
|
raise STVException('Meek method is incompatible with --round-tvs')
|
|
return BaseSTVCounter.reset(self)
|
|
|
|
def distribute_first_preferences(self):
|
|
"""
|
|
Overrides BaseSTVCounter.distribute_first_preferences
|
|
Unlike in other STV methods, this is called not only as part of reset() but also at other stages
|
|
"""
|
|
|
|
# Reset the count
|
|
# Carry over candidate states, keep values, etc.
|
|
new_candidates = SafeDict()
|
|
for candidate, count_card in self.candidates.items():
|
|
new_count_card = MeekCountCard()
|
|
new_count_card.state = count_card.state
|
|
new_count_card.keep_value = count_card.keep_value
|
|
new_count_card.order_elected = count_card.order_elected
|
|
|
|
__pragma__('opov')
|
|
new_candidates[candidate] = new_count_card
|
|
__pragma__('noopov')
|
|
|
|
self.candidates = new_candidates
|
|
self.exhausted = CountCard()
|
|
self.loss_fraction = CountCard()
|
|
|
|
# Distribute votes
|
|
for ballot in self.election.ballots:
|
|
remaining_value = Num(ballot.value) # Clone the value so we don't update ballot.value
|
|
for candidate in ballot.preferences:
|
|
__pragma__('opov')
|
|
count_card = self.candidates[candidate]
|
|
__pragma__('noopov')
|
|
|
|
if count_card.state == CandidateState.HOPEFUL:
|
|
# Hopeful candidate has keep value 1, so transfer entire remaining value
|
|
__pragma__('opov')
|
|
count_card.transfers += remaining_value
|
|
__pragma__('noopov')
|
|
remaining_value = Num(0)
|
|
break
|
|
elif count_card.state == CandidateState.EXCLUDED:
|
|
# Excluded candidate has keep value 0, so skip over this candidate
|
|
pass
|
|
elif count_card.state == CandidateState.ELECTED:
|
|
# Transfer according to elected candidate's keep value
|
|
__pragma__('opov')
|
|
count_card.transfers += remaining_value * count_card.keep_value
|
|
remaining_value *= (Num(1) - count_card.keep_value)
|
|
__pragma__('noopov')
|
|
else:
|
|
raise STVException('Unexpected candidate state')
|
|
|
|
# Credit exhausted votes
|
|
__pragma__('opov')
|
|
self.exhausted.transfers += remaining_value
|
|
__pragma__('noopov')
|
|
|
|
# Recompute transfers
|
|
if len(self.step_results) > 0:
|
|
last_result = self.step_results[len(self.step_results)-1]
|
|
__pragma__('opov')
|
|
for candidate, count_card in self.candidates.items():
|
|
count_card.continue_from(last_result.candidates[candidate])
|
|
self.exhausted.continue_from(last_result.exhausted)
|
|
self.loss_fraction.continue_from(last_result.loss_fraction)
|
|
__pragma__('noopov')
|
|
|
|
def distribute_surpluses(self):
|
|
"""
|
|
Overrides BaseSTVCounter.distribute_surpluses
|
|
Surpluses are distributed in Meek STV by recomputing the keep values, and redistributing all votes
|
|
"""
|
|
|
|
# Do surpluses need to be distributed?
|
|
__pragma__('opov')
|
|
has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.ELECTED and cc.votes / self.quota > self._quota_tolerance_ub]
|
|
__pragma__('noopov')
|
|
|
|
if len(has_surplus) > 0:
|
|
while len(has_surplus) > 0:
|
|
# Recompute keep values
|
|
for candidate, count_card in has_surplus:
|
|
__pragma__('opov')
|
|
count_card.keep_value *= self.quota / count_card.votes
|
|
__pragma__('noopov')
|
|
|
|
# Redistribute votes
|
|
self.distribute_first_preferences()
|
|
|
|
# Recompute quota if more ballots have become exhausted
|
|
self.compute_quota()
|
|
|
|
__pragma__('opov')
|
|
has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.ELECTED and cc.votes / self.quota > self._quota_tolerance_ub]
|
|
__pragma__('noopov')
|
|
|
|
# Declare elected any candidates meeting the quota as a result of surpluses
|
|
# NB: We could do this earlier, but this shows the flow of the election more clearly in the count sheet
|
|
self.elect_meeting_quota()
|
|
|
|
__pragma__('opov')
|
|
result = CountStepResult(
|
|
'Surpluses distributed',
|
|
self.candidates,
|
|
self.exhausted,
|
|
self.loss_fraction,
|
|
self.total + self.exhausted.votes + self.loss_fraction.votes,
|
|
self.quota,
|
|
self.vote_required_election,
|
|
)
|
|
__pragma__('noopov')
|
|
|
|
self.step_results.append(result)
|
|
return result
|
|
|
|
def do_exclusion(self, candidates_excluded):
|
|
"""
|
|
Overrides BaseSTVCounter.do_exclusion
|
|
"""
|
|
for candidate, count_card in candidates_excluded:
|
|
count_card.state = CandidateState.EXCLUDED
|
|
|
|
# Redistribute votes
|
|
self.distribute_first_preferences()
|
|
|
|
def elect_meeting_quota(self):
|
|
"""
|
|
Overrides BaseSTVCounter.elect_meeting_quota
|
|
Skip directly to CandidateState.ELECTED
|
|
"""
|
|
|
|
# Does a candidate meet the quota?
|
|
meets_quota = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL and self.meets_quota(cc)]
|
|
|
|
if len(meets_quota) > 0:
|
|
meets_quota.sort(key=lambda x: x[1].votes, reverse=True)
|
|
|
|
# Declare elected any candidate who meets the quota
|
|
while len(meets_quota) > 0:
|
|
x = self.choose_highest(meets_quota)
|
|
candidate, count_card = x[0], x[1]
|
|
|
|
count_card.state = CandidateState.ELECTED
|
|
self.num_elected += 1
|
|
count_card.order_elected = self.num_elected
|
|
|
|
meets_quota.remove(x)
|