# pyRCV2: Preferential vote counting # Copyright © 2020–2021 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . __pragma__ = lambda x: None is_py = False __pragma__('skip') is_py = True __pragma__('noskip') from pyRCV2.method.base_stv import BaseSTVCounter, STVException from pyRCV2.model import CandidateState from pyRCV2.numbers import Num # Stubs for JS def groupby(iterable, keyfunc): if is_py: __pragma__('skip') import itertools return [list(g) for k, g in itertools.groupby(iterable, keyfunc)] __pragma__('noskip') else: groups = [] group = [] last_result = None for i in iterable: this_result = keyfunc(i) __pragma__('opov') if last_result is not None and this_result != last_result: __pragma__('noopov') groups.append(group) group = [] last_result = this_result group.append(i) if group: groups.append(group) return groups class WIGSTVCounter(BaseSTVCounter): """ Basic weighted inclusive Gregory STV counter """ def do_surplus(self, candidate_surplus, count_card, surplus): next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences(count_card.parcels) if self.options['papers'] == 'transferable': __pragma__('opov') transferable_votes = total_votes - exhausted_votes __pragma__('noopov') for candidate, x in next_preferences.items(): cand_ballots = x[0] num_ballots = x[1] num_votes = x[2] new_parcel = [] if len(cand_ballots) > 0: __pragma__('opov') self.candidates[candidate].parcels.append(new_parcel) __pragma__('noopov') __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: if self.options['round_tvs'] is None: self.candidates[candidate].transfers += self.round_votes((num_votes * surplus) / transferable_votes) else: tv = self.round_tv(surplus / transferable_votes) self.candidates[candidate].transfers += self.round_votes(num_votes * tv) else: self.candidates[candidate].transfers += self.round_votes(num_votes) # Do not allow weight to increase else: if self.options['round_tvs'] is None: self.candidates[candidate].transfers += self.round_votes((num_votes * surplus) / total_votes) else: tv = self.round_tv(surplus / total_votes) self.candidates[candidate].transfers += self.round_votes(num_votes * tv) __pragma__('noopov') for ballot, ballot_value in cand_ballots: __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: if self.options['round_tvs'] is None: new_value = (ballot_value * surplus) / transferable_votes else: tv = self.round_tv(surplus / transferable_votes) new_value = ballot_value * tv else: new_value = ballot_value else: if self.options['round_tvs'] is None: new_value = (ballot_value * surplus) / total_votes else: tv = self.round_tv(surplus / total_votes) new_value = ballot_value * tv new_parcel.append((ballot, self.round_weight(new_value))) __pragma__('noopov') __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: pass # No ballots exhaust else: self.exhausted.transfers += self.round_votes((surplus - transferable_votes)) else: self.exhausted.transfers += self.round_votes((exhausted_votes * surplus) / total_votes) __pragma__('noopov') __pragma__('opov') count_card.transfers -= surplus __pragma__('noopov') count_card.state = CandidateState.ELECTED def do_exclusion(self, candidates_excluded): # Optimisation: Pre-sort exclusion ballots if applicable # self._exclusion[1] -> list of ballots-per-stage, ballots-per-stage = List[Tuple[Candidate,List[Ballot+Value]]] if self._exclusion is None: if self.options['exclusion'] == 'one_round': self._exclusion = (candidates_excluded, [[(c, [b for p in cc.parcels for b in p]) for c, cc in candidates_excluded]]) elif self.options['exclusion'] == 'parcels_by_order': c, cc = candidates_excluded[0] self._exclusion = (candidates_excluded, [[(c, p)] for p in cc.parcels]) elif self.options['exclusion'] == 'by_value': ballots = [(c, b, bv) for c, cc in candidates_excluded for p in cc.parcels for b, bv in p] # Sort ballots by value __pragma__('opov') ballots.sort(key=lambda x: x[2] / x[1].value, reverse=True) # Round to 8 decimal places to consider equality # FIXME: Work out a better way of doing this if self.options['round_tvs']: ballots_by_value = groupby(ballots, lambda x: self.round_tv(x[2] / x[1].value)) else: ballots_by_value = groupby(ballots, lambda x: (x[2] / x[1].value).round(8, x[2].ROUND_DOWN)) __pragma__('noopov') # TODO: Can we combine ballots for each candidate within each stage? self._exclusion = (candidates_excluded, [[(c, [(b, bv)]) for c, b, bv in x] for x in ballots_by_value]) else: raise STVException('Invalid exclusion mode') #print([[bv / b.value for c, bb in stage for b, bv in bb] for stage in self._exclusion[1]]) this_exclusion = self._exclusion[1][0] self._exclusion[1].remove(this_exclusion) # Transfer votes next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences([bb for c, bb in this_exclusion]) for candidate, x in next_preferences.items(): cand_ballots, num_ballots, num_votes = x[0], x[1], x[2] new_parcel = [] if len(cand_ballots) > 0: __pragma__('opov') self.candidates[candidate].parcels.append(new_parcel) __pragma__('noopov') __pragma__('opov') self.candidates[candidate].transfers += self.round_votes(num_votes) __pragma__('noopov') for ballot, ballot_value in cand_ballots: __pragma__('opov') new_parcel.append((ballot, ballot_value)) __pragma__('noopov') # Subtract votes __pragma__('opov') self.exhausted.transfers += self.round_votes(exhausted_votes) __pragma__('noopov') for candidate, ballots in this_exclusion: total_votes = Num(0) for ballot, ballot_value in ballots: __pragma__('opov') total_votes += ballot_value __pragma__('noopov') __pragma__('opov') self.candidates[candidate].transfers -= total_votes __pragma__('noopov') if len(self._exclusion[1]) == 0: for candidate_excluded, count_card in candidates_excluded: __pragma__('opov') count_card.transfers -= count_card.votes __pragma__('noopov') count_card.state = CandidateState.EXCLUDED self._exclusion = None class UIGSTVCounter(WIGSTVCounter): """ Basic unweighted inclusive Gregory STV counter """ def __init__(self, *args): WIGSTVCounter.__init__(self, *args) def do_surplus(self, candidate_surplus, count_card, surplus): next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences(count_card.parcels) if self.options['papers'] == 'transferable': __pragma__('opov') transferable_ballots = total_ballots - exhausted_ballots transferable_votes = total_votes - exhausted_votes __pragma__('noopov') for candidate, x in next_preferences.items(): cand_ballots = x[0] num_ballots = x[1] num_votes = x[2] new_parcel = [] if len(cand_ballots) > 0: __pragma__('opov') self.candidates[candidate].parcels.append(new_parcel) __pragma__('noopov') __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: if self.options['round_tvs'] is None: self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / transferable_ballots) else: tv = self.round_tv(surplus / transferable_ballots) self.candidates[candidate].transfers += self.round_votes(num_ballots * tv) else: self.candidates[candidate].transfers += self.round_votes(num_votes) else: if self.options['round_tvs'] is None: self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / total_ballots) else: tv = self.round_tv(surplus / total_ballots) self.candidates[candidate].transfers += self.round_votes(num_ballots * tv) __pragma__('noopov') for ballot, ballot_value in cand_ballots: __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: if self.options['round_tvs'] is None: new_value = (ballot.value * surplus) / transferable_ballots else: tv = self.round_tv(surplus / transferable_ballots) new_value = ballot.value * tv else: new_value = ballot_value else: if self.options['round_tvs'] is None: new_value = (ballot.value * surplus) / total_ballots else: tv = self.round_tv(surplus / total_ballots) new_value = ballot.value * tv new_parcel.append((ballot, self.round_weight(new_value))) __pragma__('noopov') __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: pass # No ballots exhaust else: self.exhausted.transfers += self.round_votes(surplus - transferable_votes) else: self.exhausted.transfers += self.round_votes((exhausted_ballots * surplus) / total_ballots) __pragma__('noopov') __pragma__('opov') count_card.transfers -= surplus __pragma__('noopov') count_card.state = CandidateState.ELECTED class EGSTVCounter(UIGSTVCounter): """ Exclusive Gregory (last bundle) STV implementation """ def do_surplus(self, candidate_surplus, count_card, surplus): """Overrides UIGSTVCounter.do_surplus""" last_bundle = count_card.parcels[len(count_card.parcels)-1] next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences([last_bundle]) if self.options['papers'] == 'transferable': __pragma__('opov') transferable_ballots = total_ballots - exhausted_ballots transferable_votes = total_votes - exhausted_votes __pragma__('noopov') for candidate, x in next_preferences.items(): cand_ballots = x[0] num_ballots = x[1] num_votes = x[2] new_parcel = [] if len(cand_ballots) > 0: __pragma__('opov') self.candidates[candidate].parcels.append(new_parcel) __pragma__('noopov') __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: if self.options['round_tvs'] is None: self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / transferable_ballots) else: tv = self.round_tv(surplus / transferable_ballots) self.candidates[candidate].transfers += self.round_votes(num_ballots * tv) else: self.candidates[candidate].transfers += self.round_votes(num_votes) else: if self.options['round_tvs'] is None: self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / total_ballots) else: tv = self.round_tv(surplus / total_ballots) self.candidates[candidate].transfers += self.round_votes(num_ballots * tv) __pragma__('noopov') for ballot, ballot_value in cand_ballots: __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: if self.options['round_tvs'] is None: new_value = (ballot.value * surplus) / transferable_ballots else: tv = self.round_tv(surplus / transferable_ballots) new_value = ballot.value * tv else: new_value = ballot_value else: if self.options['round_tvs'] is None: new_value = (ballot.value * surplus) / total_ballots else: tv = self.round_tv(surplus / total_ballots) new_value = ballot.value * tv new_parcel.append((ballot, self.round_weight(new_value))) __pragma__('noopov') __pragma__('opov') if self.options['papers'] == 'transferable': if transferable_votes > surplus: pass # No ballots exhaust else: self.exhausted.transfers += self.round_votes((surplus - transferable_votes)) else: self.exhausted.transfers += self.round_votes((exhausted_ballots * surplus) / total_ballots) __pragma__('noopov') __pragma__('opov') count_card.transfers -= surplus __pragma__('noopov') count_card.state = CandidateState.ELECTED