#   pyRCV2: Preferential vote counting
#   Copyright © 2020–2021  Lee Yingtong Li (RunasSudo)
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Affero General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU Affero General Public License for more details.
#
#   You should have received a copy of the GNU Affero General Public License
#   along with this program.  If not, see <https://www.gnu.org/licenses/>.

# Under CPython __pragma__ is a no-op, so is_py ends up True. The 'skip' pragma
# is presumably honoured by the JS transpiler (Transcrypt-style), which would
# then never see the reassignment and leave is_py False.
# NOTE: The textual position of every __pragma__ call is significant to the
# transpiler even where the call is unreachable in Python - do not reorder them.
__pragma__ = lambda x: None
is_py = False
__pragma__('skip')
is_py = True
__pragma__('noskip')

from pyRCV2.model import CandidateState, CountCard, CountCompleted, CountStepResult
from pyRCV2.numbers import Num
from pyRCV2.safedict import SafeDict


# Stubs for JS
def groupby(iterable, keyfunc):
    """
    Group consecutive items of iterable which share the same keyfunc result
    Returns a list of lists (the keys themselves are discarded)
    As with itertools.groupby, the input should already be sorted by the key
    """

    if is_py:
        __pragma__('skip')
        import itertools
        return [list(g) for k, g in itertools.groupby(iterable, keyfunc)]
        __pragma__('noskip')
    else:
        groups = []
        group = []
        last_result = None
        for i in iterable:
            this_result = keyfunc(i)
            __pragma__('opov')
            if last_result is not None and this_result != last_result:
                __pragma__('noopov')
                # Key changed: close off the current group
                groups.append(group)
                group = []
            last_result = this_result
            group.append(i)
        if group:
            groups.append(group)
        return groups


class STVException(Exception):
    """
    Raised when the count cannot proceed (invalid option, unresolvable tie, etc.)
    The message is available both as str(exception) and as the .message attribute
    """

    def __init__(self, message):
        # Pass the message up so that str(e) and tracebacks are not blank
        Exception.__init__(self, message)
        self.message = message


class BaseSTVCounter:
    """
    Basic STV counter for various different variations
    Subclasses must implement do_surplus and do_exclusion
    """

    def __init__(self, election, options=None):
        """
        election: the election to count (reads .candidates, .ballots, .withdrawn, .seats)
        options: optional dict of overrides for the default options below
        """

        self.election = election

        # Default options
        self.options = {
            'bulk_elect': True,  # Bulk election?
            'bulk_exclude': False,  # Bulk exclusion?
            'defer_surpluses': False,  # Defer surpluses?
            'quota': 'droop',  # 'droop', 'droop_exact', 'hare' or 'hare_exact'
            'quota_criterion': 'geq',  # 'geq' or 'gt'
            'quota_mode': 'static',  # 'static', 'progressive' or 'ers97'
            'surplus_order': 'size',  # 'size' or 'order'
            'papers': 'both',  # 'both' or 'transferable'
            'exclusion': 'one_round',  # 'one_round', 'parcels_by_order', 'by_value' or 'wright'
            'ties': [],  # List of tie strategies (e.g. TiesRandom)
            'round_quota': None,  # Number of decimal places or None
            'round_votes': None,  # Number of decimal places or None
            'round_tvs': None,  # Number of decimal places or None
            'round_weights': None,  # Number of decimal places or None
        }

        if options is not None:
            self.options.update(options)

        self.candidates = SafeDict([(c, CountCard()) for c in self.election.candidates])
        self.exhausted = CountCard()
        self.loss_fraction = CountCard()

        self.total_orig = sum((b.value for b in self.election.ballots), Num('0'))

        self.num_elected = 0

        # Withdraw candidates
        for candidate in self.election.withdrawn:
            __pragma__('opov')
            self.candidates[candidate].state = CandidateState.WITHDRAWN
            __pragma__('noopov')

    def _make_step_result(self, comment):
        """
        Build a CountStepResult describing the current state of the count
        Does not add the result to self.step_results
        """

        __pragma__('opov')
        result = CountStepResult(
            comment,
            self.candidates,
            self.exhausted,
            self.loss_fraction,
            self.total + self.exhausted.votes + self.loss_fraction.votes,
            self.quota,
            self.vote_required_election,
        )
        __pragma__('noopov')
        return result

    def reset(self):
        """
        Public function:
        Perform the first step (distribute first preferences)
        Does not reset the states of candidates, etc.
        Returns the CountStepResult and restarts self.step_results from it
        """

        self._exclusion = None  # Optimisation to avoid re-collating/re-sorting ballots

        self.distribute_first_preferences()

        self.quota = None
        self.vote_required_election = None  # For ERS97

        self.compute_quota()
        self.elect_meeting_quota()

        result = self._make_step_result('First preferences')

        self.step_results = [result]
        return result

    def distribute_first_preferences(self):
        """
        Distribute first preferences (called as part of the reset() step)
        Each ballot goes to its first HOPEFUL preference, else to exhausted
        """

        for ballot in self.election.ballots:
            __pragma__('opov')
            candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None)

            if candidate is not None:
                self.candidates[candidate].transfers += ballot.value
                # All first preferences for a candidate share a single parcel
                if len(self.candidates[candidate].parcels) == 0:
                    self.candidates[candidate].parcels.append([(ballot, Num(ballot.value))])
                else:
                    self.candidates[candidate].parcels[0].append((ballot, Num(ballot.value)))
            else:
                self.exhausted.transfers += ballot.value
                #self.exhausted.parcels[0].append((ballot, Num(ballot.value)))
            __pragma__('noopov')

    def step(self):
        """
        Public function:
        Perform one step of the STV count
        Returns the result of the first stage which produced one
        Raises STVException if no stage produced a result
        """

        # Step count cards
        self.step_count_cards()

        # Check if done
        result = self.before_surpluses()
        if result:
            return result

        # Distribute surpluses
        result = self.distribute_surpluses()
        if result:
            return result

        # Check if done (2)
        result = self.before_exclusion()
        if result:
            return result

        # Insufficient winners and no surpluses to distribute
        # Exclude the lowest ranked hopeful(s)
        result = self.exclude_candidates()
        if result:
            return result

        raise STVException('Unable to complete step')

    def step_count_cards(self):
        """
        Reset the count cards for the beginning of a new step
        """

        for candidate, count_card in self.candidates.items():
            count_card.step()
        self.exhausted.step()
        self.loss_fraction.step()

    def before_surpluses(self):
        """
        Check if the count can be completed before distributing surpluses
        Returns CountCompleted, a 'Bulk election' CountStepResult, or None
        """

        # Have sufficient candidates been elected?
        if self.num_elected >= self.election.seats:
            return CountCompleted()

        # Are there just enough candidates to fill all the seats?
        if self.options['bulk_elect']:
            if self.num_elected + sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL or cc.state == CandidateState.EXCLUDING) <= self.election.seats:
                # Declare elected all remaining candidates
                for candidate, count_card in self.candidates.items():
                    if count_card.state == CandidateState.HOPEFUL:
                        count_card.state = CandidateState.PROVISIONALLY_ELECTED
                        count_card.order_elected = self.num_elected
                        self.num_elected += 1

                result = self._make_step_result('Bulk election')
                self.step_results.append(result)
                return result

        return None

    def can_defer_surpluses(self, has_surplus):
        """
        Determine if the specified surpluses can be deferred
        has_surplus: List[Tuple[Candidate, CountCard]] of candidates with a surplus
        Returns True if deferral is permitted, otherwise False
        """

        # Do not defer if this could change the last 2 candidates
        __pragma__('opov')
        total_surpluses = sum((cc.votes - self.quota for c, cc in has_surplus), Num(0))
        __pragma__('noopov')

        hopefuls = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL]
        hopefuls.sort(key=lambda x: x[1].votes)

        # NOTE(review): assumes at least 2 hopefuls remain - confirm callers guarantee this
        __pragma__('opov')
        if total_surpluses > hopefuls[1][1].votes - hopefuls[0][1].votes:
            return False
        __pragma__('noopov')

        # Do not defer if this could affect a bulk exclusion
        if self.options['bulk_exclude']:
            to_bulk_exclude = self.candidates_to_bulk_exclude(hopefuls)
            if len(to_bulk_exclude) > 0:
                total_excluded = sum((cc.votes for c, cc in to_bulk_exclude), Num(0))
                # NOTE(review): candidates_to_bulk_exclude compares against the first
                # non-excluded hopeful (index len(to_bulk_exclude)); the + 1 here looks
                # at the one after that - confirm this is intended
                __pragma__('opov')
                if total_surpluses > hopefuls[len(to_bulk_exclude) + 1][1].votes - total_excluded:
                    return False
                # Close the region (this previously re-opened 'opov' by mistake)
                __pragma__('noopov')

        # Can defer surpluses
        return True

    def distribute_surpluses(self):
        """
        Distribute surpluses, if any
        Returns a 'Surplus of ...' CountStepResult, or None if nothing was distributed
        Raises STVException on an invalid surplus_order option
        """

        # Do not interrupt an exclusion
        if any(cc.state == CandidateState.EXCLUDING for c, cc in self.candidates.items()):
            return None

        candidate_surplus, count_card = None, None

        # Are we distributing a surplus?
        has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.DISTRIBUTING_SURPLUS]
        if len(has_surplus) > 0:
            candidate_surplus, count_card = has_surplus[0]
        else:
            # Do surpluses need to be distributed?
            __pragma__('opov')
            has_surplus = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.PROVISIONALLY_ELECTED and cc.votes > self.quota]
            __pragma__('noopov')

            if len(has_surplus) > 0:
                # Distribute surpluses in specified order
                if self.options['surplus_order'] == 'size':
                    has_surplus.sort(key=lambda x: x[1].votes, reverse=True)
                elif self.options['surplus_order'] == 'order':
                    has_surplus.sort(key=lambda x: x[1].order_elected)
                else:
                    raise STVException('Invalid surplus order option')

                # Attempt to defer all remaining surpluses if possible
                if self.options['defer_surpluses']:
                    if self.can_defer_surpluses(has_surplus):
                        has_surplus = []

            if len(has_surplus) > 0:
                # Cannot defer any surpluses
                if self.options['surplus_order'] == 'size':
                    candidate_surplus, count_card = self.choose_highest(has_surplus)  # May need to break ties
                elif self.options['surplus_order'] == 'order':
                    candidate_surplus, count_card = has_surplus[0]  # Ties were already broken when these were assigned

        if candidate_surplus is not None:
            count_card.state = CandidateState.DISTRIBUTING_SURPLUS

            __pragma__('opov')
            surplus = count_card.votes - self.quota
            __pragma__('noopov')

            # Transfer surplus
            self.do_surplus(candidate_surplus, count_card, surplus)

            # Declare elected any candidates meeting the quota as a result of surpluses
            self.compute_quota()
            self.elect_meeting_quota()

            result = self._make_step_result('Surplus of ' + candidate_surplus.name)
            self.step_results.append(result)
            return result

        return None

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Transfer the surplus of the given candidate
        Subclasses must override this function
        """
        raise NotImplementedError('Method not implemented')

    def before_exclusion(self):
        """
        Check before excluding a candidate
        Returns a 'Bulk election' CountStepResult if a candidate was elected, else None
        """

        # If we did not perform bulk election in before_surpluses: Are there just enough candidates to fill all the seats?
        if not self.options['bulk_elect']:
            if self.num_elected + sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL) <= self.election.seats:
                # Declare elected one remaining candidate at a time
                hopefuls = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL]
                hopefuls.sort(key=lambda x: x[1].votes, reverse=True)
                candidate_elected, count_card = self.choose_highest(hopefuls)

                count_card.state = CandidateState.PROVISIONALLY_ELECTED
                count_card.order_elected = self.num_elected
                self.num_elected += 1

                result = self._make_step_result('Bulk election')
                self.step_results.append(result)
                return result

        return None

    def exclude_candidates(self):
        """
        Exclude the lowest ranked hopeful(s)
        Returns the CountStepResult for the exclusion
        """

        candidates_excluded = self.candidates_to_exclude()
        for candidate, count_card in candidates_excluded:
            count_card.state = CandidateState.EXCLUDING

        # Handle Wright STV
        if self.options['exclusion'] == 'wright':
            for candidate, count_card in candidates_excluded:
                count_card.state = CandidateState.EXCLUDED

            # Reset the count
            # Carry over certain candidate states
            new_candidates = SafeDict()
            for candidate, count_card in self.candidates.items():
                new_count_card = CountCard()

                if count_card.state == CandidateState.WITHDRAWN:
                    new_count_card.state = CandidateState.WITHDRAWN
                elif count_card.state == CandidateState.EXCLUDED:
                    new_count_card.state = CandidateState.EXCLUDED

                __pragma__('opov')
                new_candidates[candidate] = new_count_card
                __pragma__('noopov')

            self.candidates = new_candidates
            self.exhausted = CountCard()
            self.loss_fraction = CountCard()
            self.num_elected = 0

            # NOTE(review): unlike every other stage, this result is not appended
            # to step_results - confirm whether that is deliberate
            step_results = self.step_results  # Carry over step results
            result = self.reset()
            self.step_results = step_results

            result.comment = 'Exclusion of ' + ', '.join([c.name for c, cc in candidates_excluded])
            return result

        # Exclude this candidate
        self.do_exclusion(candidates_excluded)

        # Declare any candidates meeting the quota as a result of exclusion
        self.compute_quota()
        self.elect_meeting_quota()

        result = self._make_step_result('Exclusion of ' + ', '.join([c.name for c, cc in candidates_excluded]))
        self.step_results.append(result)
        return result

    def candidates_to_bulk_exclude(self, hopefuls):
        """
        Determine which candidates can be bulk excluded
        hopefuls: List[Tuple[Candidate, CountCard]] sorted in ASCENDING order of votes
        Returns List[Tuple[Candidate, CountCard]] (empty if no bulk exclusion is possible)
        """

        remaining_candidates = self.num_elected + sum(1 for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL)
        __pragma__('opov')
        total_surpluses = sum((cc.votes - self.quota for c, cc in self.candidates.items() if cc.votes > self.quota), Num(0))
        __pragma__('noopov')

        # Attempt to exclude as many candidates as possible
        for i in range(0, len(hopefuls)):
            try_exclude = hopefuls[0:len(hopefuls)-i]

            # Do not exclude if this splits tied candidates
            __pragma__('opov')
            if i != 0 and try_exclude[len(hopefuls)-i-1][1].votes == hopefuls[len(hopefuls)-i][1].votes:
                continue
            __pragma__('noopov')

            # Do not exclude if this leaves insufficient candidates
            if remaining_candidates - len(try_exclude) < self.election.seats:
                continue

            # Do not exclude if this could change the order of exclusion
            total_votes = sum((cc.votes for c, cc in try_exclude), Num(0))
            __pragma__('opov')
            if i != 0 and total_votes + total_surpluses > hopefuls[len(hopefuls)-i][1].votes:
                continue
            __pragma__('noopov')

            # Can bulk exclude
            return try_exclude

        return []

    def candidates_to_exclude(self):
        """
        Determine the candidate(s) to exclude
        Returns List[Tuple[Candidate, CountCard]]
        Raises STVException if bulk_exclude is combined with parcels_by_order
        """

        # Continue current exclusion if applicable
        if self._exclusion is not None:
            __pragma__('opov')
            return self._exclusion[0]
            __pragma__('noopov')

        hopefuls = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL]
        hopefuls.sort(key=lambda x: x[1].votes)

        candidates_excluded = []

        # Bulk exclusion
        if self.options['bulk_exclude']:
            if self.options['exclusion'] == 'parcels_by_order':
                # Ordering of parcels is not defined in this case
                raise STVException('Cannot use bulk_exclude with parcels_by_order')

            candidates_excluded = self.candidates_to_bulk_exclude(hopefuls)

        if len(candidates_excluded) == 0:
            candidates_excluded = [self.choose_lowest(hopefuls)]

        return candidates_excluded

    def do_exclusion(self, candidates_excluded):
        """
        Exclude the given candidate and transfer the votes
        Subclasses must override this function
        """
        raise NotImplementedError('Method not implemented')

    def compute_quota(self):
        """
        Recount total votes and (if applicable) recalculate the quota
        Also tops up loss_fraction with any votes not otherwise accounted for
        Raises STVException on an invalid quota option
        """

        __pragma__('opov')
        self.total = sum((cc.votes for c, cc in self.candidates.items()), Num('0'))
        self.loss_fraction.transfers += (self.total_orig - self.total - self.exhausted.votes) - self.loss_fraction.votes

        # A static quota is computed only once; a progressive quota every time
        if self.quota is None or self.options['quota_mode'] == 'progressive':
            if self.options['quota'] == 'droop' or self.options['quota'] == 'droop_exact':
                self.quota = self.total / Num(self.election.seats + 1)
            elif self.options['quota'] == 'hare' or self.options['quota'] == 'hare_exact':
                self.quota = self.total / Num(self.election.seats)
            else:
                raise STVException('Invalid quota option')

            if self.options['round_quota'] is not None:
                if self.options['quota'] == 'droop' or self.options['quota'] == 'hare':
                    # Increment to next available increment
                    factor = Num(10).__pow__(self.options['round_quota'])
                    __pragma__('opov')
                    self.quota = ((self.quota * factor).__floor__() + Num(1)) / factor
                    __pragma__('noopov')
                else:
                    # Round up (preserving the original quota if exact)
                    self.quota = self.quota.round(self.options['round_quota'], self.quota.ROUND_UP)
        __pragma__('noopov')

        if self.options['quota_mode'] == 'ers97':
            # Calculate the total active vote
            __pragma__('opov')
            total_active_vote = (
                sum((cc.votes for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL), Num('0')) +
                sum((cc.votes - self.quota for c, cc in self.candidates.items() if cc.votes > self.quota), Num('0'))
            )
            self.vote_required_election = total_active_vote / Num(self.election.seats - self.num_elected + 1)
            __pragma__('noopov')

    def meets_quota(self, count_card):
        """
        Determine if the given candidate meets the quota
        In ers97 mode, meeting vote_required_election also suffices
        Raises STVException on an invalid quota_criterion option
        """

        if self.options['quota_criterion'] == 'geq':
            __pragma__('opov')
            return count_card.votes >= self.quota or (self.options['quota_mode'] == 'ers97' and count_card.votes >= self.vote_required_election)
            __pragma__('noopov')
        elif self.options['quota_criterion'] == 'gt':
            __pragma__('opov')
            return count_card.votes > self.quota or (self.options['quota_mode'] == 'ers97' and count_card.votes > self.vote_required_election)
            __pragma__('noopov')
        else:
            raise STVException('Invalid quota criterion')

    def elect_meeting_quota(self):
        """
        Elect all candidates meeting the quota
        Candidates are declared elected in descending order of votes
        """

        # Does a candidate meet the quota?
        meets_quota = [(c, cc) for c, cc in self.candidates.items() if cc.state == CandidateState.HOPEFUL and self.meets_quota(cc)]

        if len(meets_quota) > 0:
            meets_quota.sort(key=lambda x: x[1].votes, reverse=True)

            # Declare elected any candidate who meets the quota
            while len(meets_quota) > 0:
                x = self.choose_highest(meets_quota)
                candidate, count_card = x[0], x[1]

                count_card.state = CandidateState.PROVISIONALLY_ELECTED
                count_card.order_elected = self.num_elected
                self.num_elected += 1

                meets_quota.remove(x)

            # Only recurse when someone was elected, so that this terminates
            if self.options['quota_mode'] == 'ers97':
                self.elect_meeting_quota()  # Repeat as the vote required for election may have changed

    # -----------------
    # UTILITY FUNCTIONS
    # -----------------

    def next_preferences(self, parcels):
        """
        Examine the specified ballots and group ballot papers by next available preference
        parcels: list of parcels, each a list of (ballot, ballot_value) tuples
        Returns (next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes)
        """

        # SafeDict: Candidate -> [List[Ballot], ballots, votes]
        next_preferences = SafeDict([(c, [[], Num('0'), Num('0')]) for c, cc in self.candidates.items()])
        total_ballots = Num('0')
        total_votes = Num('0')

        next_exhausted = []
        exhausted_ballots = Num('0')
        exhausted_votes = Num('0')

        for parcel in parcels:
            for ballot, ballot_value in parcel:
                __pragma__('opov')
                # "ballots" counts ballot.value; "votes" counts the current ballot_value
                total_ballots += ballot.value
                total_votes += ballot_value

                candidate = next((c for c in ballot.preferences if self.candidates[c].state == CandidateState.HOPEFUL), None)

                if candidate is not None:
                    next_preferences[candidate][0].append((ballot, ballot_value))
                    next_preferences[candidate][1] += ballot.value
                    next_preferences[candidate][2] += ballot_value
                else:
                    next_exhausted.append((ballot, ballot_value))
                    exhausted_ballots += ballot.value
                    exhausted_votes += ballot_value
                __pragma__('noopov')

        return next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes

    def choose_lowest(self, l):
        """
        Provided a list of tuples (Candidate, CountCard), sorted in ASCENDING order of votes, choose the tuple with the fewest votes, breaking ties appropriately
        Raises STVException if no configured tie strategy resolves a tie
        """

        if len(l) == 1:
            return l[0]

        __pragma__('opov')
        # Do not use (c, cc) for c, cc in ... as this will break equality in JS
        tied = [x for x in l if x[1].votes == l[0][1].votes]
        __pragma__('noopov')

        if len(tied) == 1:
            return tied[0]

        # A tie exists
        for tie in self.options['ties']:
            result = tie.choose_lowest(tied)
            if result is not None:
                return result

        raise STVException('Unable to resolve tie')

    def choose_highest(self, l):
        """
        Provided a list of tuples (Candidate, CountCard), sorted in DESCENDING order of votes, choose the tuple with the most votes, breaking ties appropriately
        Raises STVException if no configured tie strategy resolves a tie
        """

        if len(l) == 1:
            return l[0]

        __pragma__('opov')
        # Do not use (c, cc) for c, cc in ... as this will break equality in JS
        tied = [x for x in l if x[1].votes == l[0][1].votes]
        __pragma__('noopov')

        if len(tied) == 1:
            return tied[0]

        # A tie exists
        for tie in self.options['ties']:
            result = tie.choose_highest(tied)
            if result is not None:
                return result

        raise STVException('Unable to resolve tie')

    def round_votes(self, num):
        """
        Round num down to round_votes decimal places (unchanged if the option is None)
        """
        if self.options['round_votes'] is None:
            return num
        return num.round(self.options['round_votes'], num.ROUND_DOWN)

    def round_weight(self, num):
        """
        Round num down to round_weights decimal places (unchanged if the option is None)
        """
        if self.options['round_weights'] is None:
            return num
        return num.round(self.options['round_weights'], num.ROUND_DOWN)

    def round_tv(self, num):
        """
        Round num down to round_tvs decimal places (unchanged if the option is None)
        """
        if self.options['round_tvs'] is None:
            return num
        return num.round(self.options['round_tvs'], num.ROUND_DOWN)


class WIGSTVCounter(BaseSTVCounter):
    """
    Basic weighted inclusive Gregory STV counter
    """

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Transfer the surplus by examining all of the candidate's parcels
        Each ballot is transferred in proportion to its current value
        Marks the candidate as ELECTED when done
        """

        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences(count_card.parcels)

        if self.options['papers'] == 'transferable':
            __pragma__('opov')
            transferable_votes = total_votes - exhausted_votes
            __pragma__('noopov')

        for candidate, x in next_preferences.items():
            cand_ballots = x[0]
            num_ballots = x[1]
            num_votes = x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__('opov')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__('noopov')

            # Credit the receiving candidate's vote total
            __pragma__('opov')
            if self.options['papers'] == 'transferable':
                if transferable_votes > surplus:
                    if self.options['round_tvs'] is None:
                        self.candidates[candidate].transfers += self.round_votes((num_votes * surplus) / transferable_votes)
                    else:
                        tv = self.round_tv(surplus / transferable_votes)
                        self.candidates[candidate].transfers += self.round_votes(num_votes * tv)
                else:
                    self.candidates[candidate].transfers += self.round_votes(num_votes)  # Do not allow weight to increase
            else:
                if self.options['round_tvs'] is None:
                    self.candidates[candidate].transfers += self.round_votes((num_votes * surplus) / total_votes)
                else:
                    tv = self.round_tv(surplus / total_votes)
                    self.candidates[candidate].transfers += self.round_votes(num_votes * tv)
            __pragma__('noopov')

            # Re-value each ballot paper and add it to the new parcel
            for ballot, ballot_value in cand_ballots:
                __pragma__('opov')
                if self.options['papers'] == 'transferable':
                    if transferable_votes > surplus:
                        if self.options['round_tvs'] is None:
                            new_value = (ballot_value * surplus) / transferable_votes
                        else:
                            tv = self.round_tv(surplus / transferable_votes)
                            new_value = ballot_value * tv
                    else:
                        new_value = ballot_value
                else:
                    if self.options['round_tvs'] is None:
                        new_value = (ballot_value * surplus) / total_votes
                    else:
                        tv = self.round_tv(surplus / total_votes)
                        new_value = ballot_value * tv

                new_parcel.append((ballot, self.round_weight(new_value)))
                __pragma__('noopov')

        __pragma__('opov')
        if self.options['papers'] == 'transferable':
            if transferable_votes > surplus:
                pass  # No ballots exhaust
            else:
                self.exhausted.transfers += self.round_votes((surplus - transferable_votes))
        else:
            self.exhausted.transfers += self.round_votes((exhausted_votes * surplus) / total_votes)
        __pragma__('noopov')

        __pragma__('opov')
        count_card.transfers -= surplus
        __pragma__('noopov')

        count_card.state = CandidateState.ELECTED

    def do_exclusion(self, candidates_excluded):
        """
        Transfer one stage of the excluded candidates' ballots at their current values
        The stages are computed on the first call and cached in self._exclusion;
        once the last stage has been transferred, the candidates are marked EXCLUDED
        Raises STVException on an invalid exclusion option
        """

        # Optimisation: Pre-sort exclusion ballots if applicable
        # self._exclusion[1] -> list of ballots-per-stage, ballots-per-stage = List[Tuple[Candidate,List[Ballot+Value]]]
        if self._exclusion is None:
            if self.options['exclusion'] == 'one_round':
                self._exclusion = (candidates_excluded, [[(c, [b for p in cc.parcels for b in p]) for c, cc in candidates_excluded]])
            elif self.options['exclusion'] == 'parcels_by_order':
                c, cc = candidates_excluded[0]
                self._exclusion = (candidates_excluded, [[(c, p)] for p in cc.parcels])
            elif self.options['exclusion'] == 'by_value':
                ballots = [(c, b, bv) for c, cc in candidates_excluded for p in cc.parcels for b, bv in p]

                # Sort ballots by value
                __pragma__('opov')
                ballots.sort(key=lambda x: x[2] / x[1].value, reverse=True)
                # Round to 8 decimal places to consider equality
                # FIXME: Work out a better way of doing this
                # Compare against None: round_tvs = 0 is a valid number of decimal places
                if self.options['round_tvs'] is not None:
                    ballots_by_value = groupby(ballots, lambda x: self.round_tv(x[2] / x[1].value))
                else:
                    ballots_by_value = groupby(ballots, lambda x: (x[2] / x[1].value).round(8, x[2].ROUND_DOWN))
                __pragma__('noopov')

                # TODO: Can we combine ballots for each candidate within each stage?
                self._exclusion = (candidates_excluded, [[(c, [(b, bv)]) for c, b, bv in x] for x in ballots_by_value])
            else:
                raise STVException('Invalid exclusion mode')

        #print([[bv / b.value for c, bb in stage for b, bv in bb] for stage in self._exclusion[1]])

        stages = self._exclusion[1]
        if len(stages) > 0:
            this_exclusion = stages[0]
            stages.remove(this_exclusion)
        else:
            # A candidate holding no parcels yields no stages under parcels_by_order
            # or by_value: treat this as an empty stage instead of failing on [0]
            this_exclusion = []

        # Transfer votes

        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences([bb for c, bb in this_exclusion])

        for candidate, x in next_preferences.items():
            cand_ballots, num_ballots, num_votes = x[0], x[1], x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__('opov')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__('noopov')

            __pragma__('opov')
            self.candidates[candidate].transfers += self.round_votes(num_votes)
            __pragma__('noopov')

            for ballot, ballot_value in cand_ballots:
                __pragma__('opov')
                new_parcel.append((ballot, ballot_value))
                __pragma__('noopov')

        # Subtract votes

        __pragma__('opov')
        self.exhausted.transfers += self.round_votes(exhausted_votes)
        __pragma__('noopov')

        for candidate, ballots in this_exclusion:
            total_votes = Num(0)
            for ballot, ballot_value in ballots:
                __pragma__('opov')
                total_votes += ballot_value
                __pragma__('noopov')

            __pragma__('opov')
            self.candidates[candidate].transfers -= total_votes
            __pragma__('noopov')

        if len(self._exclusion[1]) == 0:
            # Final stage: zero out whatever remains and finish the exclusion
            for candidate_excluded, count_card in candidates_excluded:
                __pragma__('opov')
                count_card.transfers -= count_card.votes
                __pragma__('noopov')
                count_card.state = CandidateState.EXCLUDED
            self._exclusion = None


class UIGSTVCounter(WIGSTVCounter):
    """
    Basic unweighted inclusive Gregory STV counter
    """

    def __init__(self, election, options=None):
        # Explicit parameters so that options may also be passed by keyword
        WIGSTVCounter.__init__(self, election, options)

    def _do_unweighted_surplus(self, parcels, count_card, surplus):
        """
        Transfer surplus from count_card by examining only the given parcels
        Each ballot is transferred in proportion to ballot.value rather than
        its current value
        Marks the candidate as ELECTED when done
        """

        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences(parcels)

        if self.options['papers'] == 'transferable':
            __pragma__('opov')
            transferable_ballots = total_ballots - exhausted_ballots
            transferable_votes = total_votes - exhausted_votes
            __pragma__('noopov')

        for candidate, x in next_preferences.items():
            cand_ballots = x[0]
            num_ballots = x[1]
            num_votes = x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__('opov')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__('noopov')

            # Credit the receiving candidate's vote total
            __pragma__('opov')
            if self.options['papers'] == 'transferable':
                if transferable_votes > surplus:
                    if self.options['round_tvs'] is None:
                        self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / transferable_ballots)
                    else:
                        tv = self.round_tv(surplus / transferable_ballots)
                        self.candidates[candidate].transfers += self.round_votes(num_ballots * tv)
                else:
                    self.candidates[candidate].transfers += self.round_votes(num_votes)
            else:
                if self.options['round_tvs'] is None:
                    self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / total_ballots)
                else:
                    tv = self.round_tv(surplus / total_ballots)
                    self.candidates[candidate].transfers += self.round_votes(num_ballots * tv)
            __pragma__('noopov')

            # Re-value each ballot paper and add it to the new parcel
            for ballot, ballot_value in cand_ballots:
                __pragma__('opov')
                if self.options['papers'] == 'transferable':
                    if transferable_votes > surplus:
                        if self.options['round_tvs'] is None:
                            new_value = (ballot.value * surplus) / transferable_ballots
                        else:
                            tv = self.round_tv(surplus / transferable_ballots)
                            new_value = ballot.value * tv
                    else:
                        new_value = ballot_value
                else:
                    if self.options['round_tvs'] is None:
                        new_value = (ballot.value * surplus) / total_ballots
                    else:
                        tv = self.round_tv(surplus / total_ballots)
                        new_value = ballot.value * tv

                new_parcel.append((ballot, self.round_weight(new_value)))
                __pragma__('noopov')

        __pragma__('opov')
        if self.options['papers'] == 'transferable':
            if transferable_votes > surplus:
                pass  # No ballots exhaust
            else:
                self.exhausted.transfers += self.round_votes(surplus - transferable_votes)
        else:
            self.exhausted.transfers += self.round_votes((exhausted_ballots * surplus) / total_ballots)
        __pragma__('noopov')

        __pragma__('opov')
        count_card.transfers -= surplus
        __pragma__('noopov')

        count_card.state = CandidateState.ELECTED

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Overrides WIGSTVCounter.do_surplus
        Transfer the surplus by examining all of the candidate's parcels
        """
        self._do_unweighted_surplus(count_card.parcels, count_card, surplus)


class EGSTVCounter(UIGSTVCounter):
    """
    Exclusive Gregory (last bundle) STV implementation
    """

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Overrides UIGSTVCounter.do_surplus
        Transfer the surplus by examining only the last parcel received
        """

        last_bundle = count_card.parcels[len(count_card.parcels)-1]
        self._do_unweighted_surplus([last_bundle], count_card, surplus)