# pyRCV2: Preferential vote counting # Copyright © 2020 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . __pragma__ = lambda x: None from pyRCV2.model import CandidateState, CountCard, CountCompleted, CountStepResult from pyRCV2.numbers import Num, Rational from pyRCV2.safedict import SafeDict from pyRCV2.ties import TiesBackwards, TiesPrompt, TiesRandom class STVException(Exception): pass class BaseSTVCounter: """ Basic STV counter for various different variations """ def __init__(self, election, options=None): self.election = election self.cls_ballot_value = Num # Need to use Rational in unweighted inclusive Gregory # Default options self.options = { 'prog_quota': False, # Progressively reducing quota? 'quota': 'droop', # 'droop', 'droop_exact', 'hare' or 'hare_exact' 'quota_criterion': 'geq', # 'geq' or 'gt' 'surplus_order': 'size', # 'size' or 'order' 'ties': [TiesBackwards(), TiesRandom()] } if options is not None: self.options.update(options) self.candidates = SafeDict([(c, CountCard()) for c in self.election.candidates]) self.exhausted = CountCard() self.loss_fraction = CountCard() self.total_orig = sum((b.value for b in self.election.ballots), Num('0')) self.num_elected = 0 # Withdraw candidates for candidate in self.election.withdrawn: __pragma__('opov') self.candidates[candidate].state = CandidateState.WITHDRAWN __pragma__('noopov') def reset(self): """ Public function: Reset the count and perform the first step """ # Distribute first preferences for ballot in self.election.ballots: __pragma__('opov') candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) if candidate is not None: self.candidates[candidate].transfers += ballot.value self.candidates[candidate].ballots.append((ballot, self.cls_ballot_value(ballot.value))) else: self.exhausted.transfers += ballot.value self.exhausted.ballots.append((ballot, self.cls_ballot_value(ballot.value))) __pragma__('noopov') self.quota = None self.compute_quota() self.elect_meeting_quota() __pragma__('opov') return CountStepResult( 'First preferences', self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') def step(self): """ Public function: Perform one step of the STV count """ # Step count cards self.step_count_cards() # Check if done result = self.check_if_done() if result: return result # Distribute surpluses result = self.distribute_surpluses() if result: return result # Insufficient winners and no surpluses to distribute # Exclude the lowest ranked hopeful result = self.exclude_candidate() if result: return result raise STVException('Unable to complete step') def step_count_cards(self): """ Reset the count cards for the beginning of a new step """ for candidate, count_card in self.candidates.items(): count_card.step() self.exhausted.step() self.loss_fraction.step() def check_if_done(self): """ Check if the count can be completed """ # Have sufficient candidates been elected? if self.num_elected >= self.election.seats: return CountCompleted() # Are there just enough candidates to fill all the seats if self.num_elected + sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL) <= self.election.seats: # TODO: Sort by size # Declare elected all remaining candidates for candidate, count_card in self.candidates.items(): if count_card.state == CandidateState.HOPEFUL: count_card.state = CandidateState.PROVISIONALLY_ELECTED count_card.order_elected = self.num_elected self.num_elected += 1 __pragma__('opov') return CountStepResult( 'Bulk election', self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') def distribute_surpluses(self): """ Distribute surpluses, if any """ # Do surpluses need to be distributed? __pragma__('opov') has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED and cc.votes > self.quota] __pragma__('noopov') if len(has_surplus) > 0: # Distribute surpluses in specified order if self.options['surplus_order'] == 'size': has_surplus.sort(key=lambda x: x[1].votes, reverse=True) candidate_surplus, count_card = self.choose_highest(has_surplus) # May need to break ties elif self.options['surplus_order'] == 'order': has_surplus.sort(key=lambda x: x[1].order_elected) candidate_surplus, count_card = has_surplus[0] # Ties were already broken when these were assigned else: raise STVException('Invalid surplus order option') count_card.state = CandidateState.ELECTED __pragma__('opov') surplus = count_card.votes - self.quota __pragma__('noopov') # Transfer surplus self.do_surplus(candidate_surplus, count_card, surplus) __pragma__('opov') count_card.transfers -= surplus __pragma__('noopov') # Declare elected any candidates meeting the quota as a result of surpluses self.compute_quota() self.elect_meeting_quota() __pragma__('opov') return CountStepResult( 'Surplus of ' + candidate_surplus.name, self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') def do_surplus(self, candidate_surplus, count_card, surplus): """ Transfer the surplus of the given candidate Subclasses must override this function """ raise NotImplementedError('Method not implemented') def exclude_candidate(self): """ Exclude the lowest ranked hopeful """ candidate_excluded, count_card = self.candidate_to_exclude() count_card.state = CandidateState.EXCLUDED # Exclude this candidate self.do_exclusion(candidate_excluded, count_card) __pragma__('opov') count_card.transfers -= count_card.votes __pragma__('noopov') # Declare any candidates meeting the quota as a result of exclusion self.compute_quota() self.elect_meeting_quota() __pragma__('opov') return CountStepResult( 'Exclusion of ' + candidate_excluded.name, self.candidates, self.exhausted, self.loss_fraction, self.total + self.exhausted.votes + self.loss_fraction.votes, self.quota ) __pragma__('noopov') def candidate_to_exclude(self): """ Determine the candidate to exclude """ hopefuls = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL] hopefuls.sort(key=lambda x: x[1].votes) candidate_excluded, count_card = self.choose_lowest(hopefuls) return candidate_excluded, count_card def do_exclusion(self, candidate_excluded, count_card): """ Exclude the given candidate and transfer the votes Subclasses must override this function """ raise NotImplementedError('Method not implemented') def compute_quota(self): """ Recount total votes and (if applicable) recalculate the quota """ __pragma__('opov') self.total = sum((cc.votes for c, cc in self.candidates.items()), Num('0')) self.loss_fraction.transfers += (self.total_orig - self.total - self.exhausted.votes) - self.loss_fraction.votes if self.quota is None or self.options['prog_quota']: if self.options['quota'] == 'droop': self.quota = (self.total / Num(self.election.seats + 1)).__floor__() + Num('1') elif self.options['quota'] == 'droop_exact': self.quota = self.total / Num(self.election.seats + 1) elif self.options['quota'] == 'hare': self.quota = (self.total / Num(self.election.seats)).__floor__() + Num('1') elif self.options['quota'] == 'hare_exact': self.quota = self.total / Num(self.election.seats) else: raise STVException('Invalid quota option') __pragma__('noopov') def meets_quota(self, count_card): """ Determine if the given candidate meets the quota """ if self.options['quota_criterion'] == 'geq': __pragma__('opov') return count_card.votes >= self.quota __pragma__('noopov') elif self.options['quota_criterion'] == 'gt': __pragma__('opov') return count_card.votes > self.quota __pragma__('noopov') else: raise STVException('Invalid quota criterion') def elect_meeting_quota(self): """ Elect all candidates meeting the quota """ # Does a candidate meet the quota? meets_quota = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL and self.meets_quota(cc)] if len(meets_quota) > 0: meets_quota.sort(key=lambda x: x[1].votes, reverse=True) # Declare elected any candidate who meets the quota while len(meets_quota) > 0: x = self.choose_highest(meets_quota) candidate, count_card = x[0], x[1] count_card.state = CandidateState.PROVISIONALLY_ELECTED count_card.order_elected = self.num_elected self.num_elected += 1 meets_quota.remove(x) def choose_lowest(self, l): """ Provided a list of tuples (Candidate, CountCard), sorted in ASCENDING order of votes, choose the tuple with the fewest votes, breaking ties appropriately """ if len(l) == 1: return l[0] tied = [(c, cc) for c, cc in l if cc.votes == l[0][1].votes] if len(tied) == 1: return tied[0] # A tie exists for tie in self.options['ties']: result = tie.choose_lowest(tied) if result is not None: return result raise Exception('Unable to resolve tie') def choose_highest(self, l): """ Provided a list of tuples (Candidate, CountCard), sorted in DESCENDING order of votes, choose the tuple with the most votes, breaking ties appropriately """ if len(l) == 1: return l[0] tied = [(c, cc) for c, cc in l if cc.votes == l[0][1].votes] if len(tied) == 1: return tied[0] # A tie exists for tie in self.options['ties']: result = tie.choose_highest(tied) if result is not None: return result raise Exception('Unable to resolve tie') class BaseWIGSTVCounter(BaseSTVCounter): """ Basic weighted inclusive Gregory STV counter """ def do_surplus(self, candidate_surplus, count_card, surplus): for ballot, ballot_value in count_card.ballots: __pragma__('opov') new_value = (ballot_value * surplus) / count_card.votes # Multiply first to avoid rounding errors candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) if candidate is not None: self.candidates[candidate].transfers += new_value self.candidates[candidate].ballots.append((ballot, new_value)) else: self.exhausted.transfers += new_value self.exhausted.ballots.append((ballot, new_value)) __pragma__('noopov') def do_exclusion(self, candidate_excluded, count_card): for ballot, ballot_value in count_card.ballots: __pragma__('opov') candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) if candidate is not None: self.candidates[candidate].transfers += ballot_value self.candidates[candidate].ballots.append((ballot, ballot_value)) else: self.exhausted.transfers += ballot_value self.exhausted.ballots.append((ballot, ballot_value)) __pragma__('noopov') class BaseUIGSTVCounter(BaseSTVCounter): """ Basic unweighted inclusive Gregory STV counter """ def __init__(self, *args, **kwargs): BaseSTVCounter.__init__(self, *args, **kwargs) # Need to use Rational for ballot value internally, as Num may be set to integers only self.cls_ballot_value = Rational def do_surplus(self, candidate_surplus, count_card, surplus): # FIXME: Is it okay to use native int's here? next_preferences = SafeDict([(c, []) for c, cc in self.candidates.items()]) next_exhausted = [] total_ballots = Num('0') # Count next preferences for ballot, ballot_value in count_card.ballots: __pragma__('opov') total_ballots += ballot.value candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) __pragma__('noopov') if candidate is not None: __pragma__('opov') next_preferences[candidate].append(ballot) __pragma__('noopov') else: next_exhausted.append(ballot) # Make transfers for candidate, cand_ballots in next_preferences.items(): num_ballots = sum((b.value for b in cand_ballots), Num('0')) __pragma__('opov') self.candidates[candidate].transfers += (num_ballots * surplus) / total_ballots # Multiply first to avoid rounding errors __pragma__('noopov') for ballot in cand_ballots: __pragma__('opov') new_value = (ballot.value.to_rational() * surplus.to_rational()) / total_ballots.to_rational() self.candidates[candidate].ballots.append((ballot, new_value)) __pragma__('noopov') num_exhausted = sum((b.value for b in next_exhausted), Num('0')) __pragma__('opov') self.exhausted.transfers += (num_exhausted * surplus) / total_ballots __pragma__('noopov') for ballot in next_exhausted: __pragma__('opov') new_value = (ballot.value.to_rational() * surplus.to_rational()) / total_ballots.to_rational() __pragma__('noopov') self.exhausted.ballots.append((ballot, new_value)) def do_exclusion(self, candidate_excluded, count_card): next_preferences = SafeDict([(c, Rational('0')) for c, cc in self.candidates.items()]) next_exhausted = Rational('0') total_votes = Rational('0') # Count and transfer next preferences for ballot, ballot_value in count_card.ballots: __pragma__('opov') total_votes += ballot_value candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None) __pragma__('noopov') if candidate is not None: __pragma__('opov') next_preferences[candidate] += ballot_value self.candidates[candidate].ballots.append((ballot, ballot_value)) __pragma__('noopov') else: __pragma__('opov') next_exhausted += ballot_value __pragma__('noopov') self.exhausted.ballots.append((ballot, ballot_value)) # Credit votes for candidate, cand_votes in next_preferences.items(): __pragma__('opov') self.candidates[candidate].transfers += cand_votes.to_num() __pragma__('noopov') __pragma__('opov') self.exhausted.transfers += next_exhausted.to_num() __pragma__('noopov')