# pyRCV2: Preferential vote counting # Copyright © 2020 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . __pragma__ = lambda x: None from pyRCV2.model import CandidateState, CountCard, CountCompleted, CountStepResult from pyRCV2.numbers import Num from pyRCV2.safedict import SafeDict class STVCCounter: """Count an STV election using pyRCV STV-C rules""" def __init__(self, election): self.election = election def reset(self): self.candidates = SafeDict([(c, CountCard()) for c in self.election.candidates]) self.exhausted = CountCard() self.loss_fraction = CountCard() self.total_orig = sum((b.value for b in self.election.ballots), Num('0')) # Withdraw candidates for candidate in self.election.withdrawn: __pragma__('opov') self.candidates[candidate].state = CandidateState.WITHDRAWN __pragma__('noopov') # Distribute first preferences for ballot in self.election.ballots: __pragma__('opov') candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) if candidate is not None: self.candidates[candidate].transfers += ballot.value self.candidates[candidate].ballots.append((ballot, ballot.value)) else: self.exhausted.transfers += ballot.value self.exhausted.ballots.append((ballot, ballot.value)) __pragma__('noopov') self.compute_quota() self.elect_meeting_quota() __pragma__('opov') return CountStepResult( 'First preferences', self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') def step(self): # Step count cards for candidate, count_card in self.candidates.items(): count_card.step() self.exhausted.step() self.loss_fraction.step() # Have sufficient candidates been elected? if sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED or cc.state == CandidateState.ELECTED) >= self.election.seats: return CountCompleted() # Are there just enough candidates to fill all the seats if sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED or cc.state == CandidateState.ELECTED or cc.state == CandidateState.HOPEFUL) <= self.election.seats: # Declare elected all remaining candidates for candidate, count_card in self.candidates.items(): if count_card.state == CandidateState.HOPEFUL: count_card.state = CandidateState.PROVISIONALLY_ELECTED __pragma__('opov') return CountStepResult( 'Bulk election', self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') # Do surpluses need to be distributed? __pragma__('opov') has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED and cc.votes > self.quota] __pragma__('noopov') if len(has_surplus) > 0: # Distribute surpluses in order of size has_surplus.sort(lambda x: x[1].votes, True) candidate_surplus, count_card = has_surplus[0] count_card.state = CandidateState.ELECTED __pragma__('opov') surplus = count_card.votes - self.quota #transfer_value = surplus / count_card.votes # Do not do this yet to avoid rounding errors __pragma__('noopov') # Transfer surplus for ballot, ballot_value in count_card.ballots: __pragma__('opov') new_value = (ballot_value * surplus) / count_card.votes candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) if candidate is not None: self.candidates[candidate].transfers += new_value self.candidates[candidate].ballots.append((ballot, new_value)) else: self.exhausted.transfers += new_value self.exhausted.ballots.append((ballot, new_value)) __pragma__('noopov') __pragma__('opov') count_card.transfers -= surplus __pragma__('noopov') # Declare any candidates meeting the quota as a result of surpluses self.compute_quota() self.elect_meeting_quota() __pragma__('opov') return CountStepResult( 'Surplus of ' + candidate_surplus.name, self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') # Insufficient winners and no surpluses to distribute # Exclude the lowest ranked hopeful hopefuls = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL] hopefuls.sort(lambda x: x[1].votes) # TODO: Handle ties candidate_excluded, count_card = hopefuls[0] count_card.state = CandidateState.EXCLUDED __pragma__('opov') count_card.transfers -= count_card.votes __pragma__('noopov') # Exclude this candidate for ballot, ballot_value in count_card.ballots: __pragma__('opov') candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) if candidate is not None: self.candidates[candidate].transfers += ballot_value self.candidates[candidate].ballots.append((ballot, ballot_value)) else: self.exhausted.transfers += ballot_value self.exhausted.ballots.append((ballot, ballot_value)) __pragma__('noopov') # Declare any candidates meeting the quota as a result of exclusion self.compute_quota() self.elect_meeting_quota() __pragma__('opov') return CountStepResult( 'Exclusion of ' + candidate_excluded.name, self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') def compute_quota(self): # Compute quota __pragma__('opov') self.total = sum((cc.votes for c, cc in self.candidates.items()), Num('0')) self.loss_fraction.transfers += (self.total_orig - self.total - self.exhausted.votes) - self.loss_fraction.votes self.quota = self.total / Num(self.election.seats + 1) __pragma__('noopov') def elect_meeting_quota(self): # Does a candidate meet the quota? __pragma__('opov') meets_quota = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL and cc.votes > self.quota] __pragma__('noopov') if len(meets_quota) > 0: # Declare elected any candidate who meets the quota for candidate, count_card in meets_quota: count_card.state = CandidateState.PROVISIONALLY_ELECTED