# pyRCV2: Preferential vote counting
# Copyright © 2020–2021 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Runtime shim: under CPython `__pragma__` is nothing more than a no-op
# callable, so every `__pragma__(...)` call in this module does nothing here.
# NOTE(review): presumably the JS build is produced by Transcrypt, where
# `__pragma__(...)` is a compile-time directive — confirm against the build.
# NOTE(review): the pragma names below carry padding spaces inside the quotes;
# that looks like an extraction artifact — verify against the upstream file.
__pragma__ = lambda x : None
# `is_py` tells the rest of the module which runtime it is executing on.
# CPython runs both assignments, so it ends up True.
is_py = False
# NOTE(review): the skip/noskip pair is expected to hide the next assignment
# from the transpiler, leaving `is_py` False in the JS build — confirm.
__pragma__ ( ' skip ' )
is_py = True
__pragma__ ( ' noskip ' )
from pyRCV2 . method . base_stv import BaseSTVCounter , STVException
from pyRCV2 . model import CandidateState
from pyRCV2 . numbers import Num
# Stubs for JS
def groupby(iterable, keyfunc):
    """
    Split *iterable* into lists of consecutive items that share a key.

    iterable -- items to group; only adjacent items are ever merged, so the
                caller must pre-sort if global grouping is wanted
    keyfunc  -- callable applied to each item to obtain its grouping key

    Returns a list of lists, in input order. An empty input yields [].

    When `is_py` is true this defers to itertools.groupby; otherwise a
    hand-written loop with the same contract is used.
    """
    # NOTE(review): the pragma names carry padding spaces inside the quotes;
    # that looks like an extraction artifact — verify against upstream.
    if is_py:
        __pragma__(' skip ')
        import itertools
        return [list(g) for _, g in itertools.groupby(iterable, keyfunc)]
        # Unreachable at runtime; kept so the skip region is closed lexically.
        __pragma__(' noskip ')
    else:  # pragma: no cover
        groups = []
        group = []
        last_result = None
        for i in iterable:
            this_result = keyfunc(i)
            __pragma__(' opov ')
            # A non-empty `group` is the "we have a previous key" signal.
            # Testing `last_result is not None` instead would wrongly glue a
            # run of None-keyed items onto the run that follows it.
            if len(group) > 0 and this_result != last_result:
                __pragma__(' noopov ')
                groups.append(group)
                group = []
            last_result = this_result
            group.append(i)
        # Explicit length test, matching the rest of this file.
        # NOTE(review): presumably a bare `if group:` is always true for an
        # array once transpiled to JS — confirm against the transpiler docs.
        if len(group) > 0:
            groups.append(group)
        return groups
class WIGSTVCounter(BaseSTVCounter):
    """
    Basic weighted inclusive Gregory STV counter

    Surplus transfers look at every parcel held by the elected candidate and
    scale each ballot's *current* value by surplus / votes.
    """

    # NOTE(review): option keys, pragma names and log strings below carry
    # padding spaces inside the quotes exactly as found in this file; that
    # looks like an extraction artifact — verify against upstream.

    def describe_options(self):
        """
        Return the option description produced by BaseSTVCounter, unchanged.
        """
        # WIG is the default
        return BaseSTVCounter.describe_options(self)

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Distribute the surplus of an elected candidate.

        candidate_surplus -- the candidate whose surplus is distributed (only
                             its `name` is read, for the log line)
        count_card        -- that candidate's count card; its parcels are
                             examined, `surplus` is subtracted from its
                             transfers and its state becomes ELECTED
        surplus           -- number of votes to distribute

        Side effects: appends one entry to self.logs, appends a new parcel to
        each receiving candidate with at least one ballot, and updates the
        `transfers` of receiving candidates and of self.exhausted.
        """
        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences(count_card.parcels)

        papers_transferable = self.options[' papers '] == ' transferable '
        unrounded_tv = self.options[' round_tvs '] is None

        # Decide once what the surplus is divided by, and whether ballot
        # values are reduced at all. With 'transferable' papers and not more
        # transferable votes than surplus, ballots move at the value received.
        __pragma__(' opov ')
        if papers_transferable:
            divisor = total_votes - exhausted_votes
            reduce_value = divisor > surplus
        else:
            divisor = total_votes
            reduce_value = True
        __pragma__(' noopov ')

        # Rounded transfer value: computed once here and reused below.
        # NOTE(review): assumes self.round_tv has no side effects, so calling
        # it once instead of once per ballot is equivalent — confirm.
        tv = None

        __pragma__(' opov ')
        if not reduce_value:
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value received. ')
        elif unrounded_tv:
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value ' + (surplus / divisor).pp(2) + ' . ')
        else:
            tv = self.round_tv(surplus / divisor)
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value ' + tv.pp(2) + ' . ')
        __pragma__(' noopov ')

        for candidate, x in next_preferences.items():
            cand_ballots = x[0]
            num_votes = x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__(' opov ')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__(' noopov ')

            # Votes credited to the receiving candidate. Multiply before
            # dividing, as the original did, so rounding is unchanged.
            __pragma__(' opov ')
            if not reduce_value:
                self.candidates[candidate].transfers += self.round_votes(num_votes)  # Do not allow weight to increase
            elif unrounded_tv:
                self.candidates[candidate].transfers += self.round_votes((num_votes * surplus) / divisor)
            else:
                self.candidates[candidate].transfers += self.round_votes(num_votes * tv)
            __pragma__(' noopov ')

            # Re-weight each individual ballot into the new parcel
            for ballot, ballot_value in cand_ballots:
                __pragma__(' opov ')
                if not reduce_value:
                    new_value = ballot_value
                elif unrounded_tv:
                    new_value = (ballot_value * surplus) / divisor
                else:
                    new_value = ballot_value * tv
                new_parcel.append((ballot, self.round_weight(new_value)))
                __pragma__(' noopov ')

        # Exhausted votes
        __pragma__(' opov ')
        if papers_transferable:
            if reduce_value:
                pass  # No ballots exhaust
            else:
                self.exhausted.transfers += self.round_votes((surplus - divisor))
        else:
            self.exhausted.transfers += self.round_votes((exhausted_votes * surplus) / total_votes)
        __pragma__(' noopov ')

        __pragma__(' opov ')
        count_card.transfers -= surplus
        __pragma__(' noopov ')

        count_card.state = CandidateState.ELECTED

    def do_exclusion(self, candidates_excluded):
        """
        Perform one stage of the exclusion of *candidates_excluded*.

        candidates_excluded -- list of (candidate, count_card) pairs

        On the first call for an exclusion (self._exclusion is None) the
        ballots are split into stages according to options['exclusion'], and
        the plan is stored in self._exclusion. Each call then transfers
        exactly one stage. Once no stages remain, each excluded count card
        has its current `votes` subtracted from its transfers, is marked
        EXCLUDED, and self._exclusion is reset to None.

        Raises STVException for an unrecognised exclusion mode.
        """
        # Optimisation: Pre-sort exclusion ballots if applicable
        # self._exclusion[1] -> list of ballots-per-stage, ballots-per-stage = List[Tuple[Candidate,List[Ballot+Value]]]
        if self._exclusion is None:
            if self.options[' exclusion '] == ' one_round ':
                # Everything in a single stage
                self._exclusion = (candidates_excluded, [[(c, [b for p in cc.parcels for b in p]) for c, cc in candidates_excluded]])
            elif self.options[' exclusion '] == ' parcels_by_order ':
                # One stage per parcel, in the order received.
                # NOTE(review): only the first excluded candidate is read —
                # presumably this mode never excludes several at once; confirm.
                c, cc = candidates_excluded[0]
                self._exclusion = (candidates_excluded, [[(c, p)] for p in cc.parcels])
            elif self.options[' exclusion '] == ' by_value ':
                ballots = [(c, b, bv) for c, cc in candidates_excluded for p in cc.parcels for b, bv in p]

                # Sort ballots by value
                __pragma__(' opov ')
                ballots.sort(key=lambda x: x[2] / x[1].value, reverse=True)
                # Round to 8 decimal places to consider equality
                # FIXME: Work out a better way of doing this
                # Tested with `is not None`, like every other use of this
                # option in the file, so a falsy-but-set value is honoured.
                if self.options[' round_tvs '] is not None:
                    ballots_by_value = groupby(ballots, lambda x: self.round_tv(x[2] / x[1].value))
                else:
                    ballots_by_value = groupby(ballots, lambda x: (x[2] / x[1].value).round(8, x[2].ROUND_DOWN))
                __pragma__(' noopov ')

                # TODO: Can we combine ballots for each candidate within each stage?
                self._exclusion = (candidates_excluded, [[(c, [(b, bv)]) for c, b, bv in x] for x in ballots_by_value])
            else:
                raise STVException(' Invalid exclusion mode ')  # pragma: no cover

        # Take the next pending stage off the front of the plan
        this_exclusion = self._exclusion[1][0]
        self._exclusion[1].remove(this_exclusion)

        # Transfer votes
        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences([bb for c, bb in this_exclusion])

        if self.options[' exclusion '] != ' one_round ':
            # The reported value is taken from the first ballot of the stage
            __pragma__(' opov ')
            self.logs.append(' Transferring ' + total_ballots.pp(0) + ' ballot papers, totalling ' + total_votes.pp(2) + ' votes, received at value ' + (this_exclusion[0][1][0][1] / this_exclusion[0][1][0][0].value).pp(2) + ' . ')
            __pragma__(' noopov ')

        for candidate, x in next_preferences.items():
            cand_ballots, num_ballots, num_votes = x[0], x[1], x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__(' opov ')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__(' noopov ')

            __pragma__(' opov ')
            self.candidates[candidate].transfers += self.round_votes(num_votes)
            __pragma__(' noopov ')

            # Ballots keep the value they were received at
            for ballot, ballot_value in cand_ballots:
                __pragma__(' opov ')
                new_parcel.append((ballot, ballot_value))
                __pragma__(' noopov ')

        # Subtract votes
        __pragma__(' opov ')
        self.exhausted.transfers += self.round_votes(exhausted_votes)
        __pragma__(' noopov ')

        for candidate, ballots in this_exclusion:
            # Separate name so the stage total unpacked above is not clobbered
            candidate_votes = Num(0)
            for ballot, ballot_value in ballots:
                __pragma__(' opov ')
                candidate_votes += ballot_value
                __pragma__(' noopov ')

            __pragma__(' opov ')
            self.candidates[candidate].transfers -= candidate_votes
            __pragma__(' noopov ')

        if len(self._exclusion[1]) == 0:
            if self.options[' exclusion '] != ' one_round ':
                self.logs.append(' Exclusion complete. ')

            for candidate_excluded, count_card in candidates_excluded:
                __pragma__(' opov ')
                count_card.transfers -= count_card.votes
                __pragma__(' noopov ')
                count_card.state = CandidateState.EXCLUDED

            self._exclusion = None
class UIGSTVCounter(WIGSTVCounter):
    """
    Basic unweighted inclusive Gregory STV counter

    Surplus transfers look at every parcel held by the elected candidate and
    give each ballot paper the value ballot.value * surplus / ballots,
    regardless of the value it was received at.
    """

    # NOTE(review): option keys, pragma names and log strings below carry
    # padding spaces inside the quotes exactly as found in this file; that
    # looks like an extraction artifact — verify against upstream.

    def describe_options(self):
        """
        Overrides BaseSTVCounter.describe_options

        Prefixes the base description with the method selector.
        """
        return ' --method uig ' + BaseSTVCounter.describe_options(self)

    def __init__(self, *args):
        """
        Forward all positional arguments to WIGSTVCounter.__init__.
        """
        # NOTE(review): pure pass-through; presumably kept for the benefit of
        # the JS transpiler — confirm before removing.
        WIGSTVCounter.__init__(self, *args)

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Overrides WIGSTVCounter.do_surplus

        candidate_surplus -- the candidate whose surplus is distributed (only
                             its `name` is read, for the log line)
        count_card        -- that candidate's count card; all of its parcels
                             are examined, `surplus` is subtracted from its
                             transfers and its state becomes ELECTED
        surplus           -- number of votes to distribute

        Side effects: appends one entry to self.logs, appends a new parcel to
        each receiving candidate with at least one ballot, and updates the
        `transfers` of receiving candidates and of self.exhausted.
        """
        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences(count_card.parcels)

        papers_transferable = self.options[' papers '] == ' transferable '
        unrounded_tv = self.options[' round_tvs '] is None

        # Decide once what the surplus is divided by (a ballot count), and
        # whether ballot values are recalculated at all. The latter is decided
        # on *votes*: with 'transferable' papers and not more transferable
        # votes than surplus, ballots move at the value received.
        transferable_votes = None
        __pragma__(' opov ')
        if papers_transferable:
            divisor = total_ballots - exhausted_ballots
            transferable_votes = total_votes - exhausted_votes
            reduce_value = transferable_votes > surplus
        else:
            divisor = total_ballots
            reduce_value = True
        __pragma__(' noopov ')

        # Rounded transfer value: computed once here and reused below.
        # NOTE(review): assumes self.round_tv has no side effects, so calling
        # it once instead of once per ballot is equivalent — confirm.
        tv = None

        __pragma__(' opov ')
        if not reduce_value:
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value received. ')
        elif unrounded_tv:
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value ' + (surplus / divisor).pp(2) + ' . ')
        else:
            tv = self.round_tv(surplus / divisor)
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value ' + tv.pp(2) + ' . ')
        __pragma__(' noopov ')

        for candidate, x in next_preferences.items():
            cand_ballots = x[0]
            num_ballots = x[1]
            num_votes = x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__(' opov ')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__(' noopov ')

            # Votes credited to the receiving candidate. Multiply before
            # dividing, as the original did, so rounding is unchanged.
            __pragma__(' opov ')
            if not reduce_value:
                self.candidates[candidate].transfers += self.round_votes(num_votes)
            elif unrounded_tv:
                self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / divisor)
            else:
                self.candidates[candidate].transfers += self.round_votes(num_ballots * tv)
            __pragma__(' noopov ')

            # Re-value each ballot paper from ballot.value, not from the
            # value it was received at
            for ballot, ballot_value in cand_ballots:
                __pragma__(' opov ')
                if not reduce_value:
                    new_value = ballot_value
                elif unrounded_tv:
                    new_value = (ballot.value * surplus) / divisor
                else:
                    new_value = ballot.value * tv
                new_parcel.append((ballot, self.round_weight(new_value)))
                __pragma__(' noopov ')

        # Exhausted votes
        __pragma__(' opov ')
        if papers_transferable:
            if reduce_value:
                pass  # No ballots exhaust
            else:
                self.exhausted.transfers += self.round_votes(surplus - transferable_votes)
        else:
            self.exhausted.transfers += self.round_votes((exhausted_ballots * surplus) / total_ballots)
        __pragma__(' noopov ')

        __pragma__(' opov ')
        count_card.transfers -= surplus
        __pragma__(' noopov ')

        count_card.state = CandidateState.ELECTED
class EGSTVCounter(UIGSTVCounter):
    """
    Exclusive Gregory (last bundle) STV implementation

    Surplus transfers examine only the last parcel on the elected candidate's
    count card; the arithmetic is otherwise the unweighted (per ballot paper)
    calculation.
    """

    # NOTE(review): option keys, pragma names and log strings below carry
    # padding spaces inside the quotes exactly as found in this file; that
    # looks like an extraction artifact — verify against upstream.

    def describe_options(self):
        """
        Overrides BaseSTVCounter.describe_options

        Prefixes the base description with the method selector.
        """
        return ' --method eg ' + BaseSTVCounter.describe_options(self)

    def do_surplus(self, candidate_surplus, count_card, surplus):
        """
        Overrides UIGSTVCounter.do_surplus

        candidate_surplus -- the candidate whose surplus is distributed (only
                             its `name` is read, for the log line)
        count_card        -- that candidate's count card; only its last
                             parcel is examined, `surplus` is subtracted from
                             its transfers and its state becomes ELECTED
        surplus           -- number of votes to distribute

        Side effects: appends one entry to self.logs, appends a new parcel to
        each receiving candidate with at least one ballot, and updates the
        `transfers` of receiving candidates and of self.exhausted.
        """
        # Explicit last index rather than [-1], as in the original
        last_bundle = count_card.parcels[len(count_card.parcels) - 1]
        next_preferences, total_ballots, total_votes, next_exhausted, exhausted_ballots, exhausted_votes = self.next_preferences([last_bundle])

        papers_transferable = self.options[' papers '] == ' transferable '
        unrounded_tv = self.options[' round_tvs '] is None

        # Decide once what the surplus is divided by (a ballot count), and
        # whether ballot values are recalculated at all. The latter is decided
        # on *votes*: with 'transferable' papers and not more transferable
        # votes than surplus, ballots move at the value received.
        transferable_votes = None
        __pragma__(' opov ')
        if papers_transferable:
            divisor = total_ballots - exhausted_ballots
            transferable_votes = total_votes - exhausted_votes
            reduce_value = transferable_votes > surplus
        else:
            divisor = total_ballots
            reduce_value = True
        __pragma__(' noopov ')

        # Rounded transfer value: computed once here and reused below.
        # NOTE(review): assumes self.round_tv has no side effects, so calling
        # it once instead of once per ballot is equivalent — confirm.
        tv = None

        __pragma__(' opov ')
        if not reduce_value:
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value received. ')
        elif unrounded_tv:
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value ' + (surplus / divisor).pp(2) + ' . ')
        else:
            tv = self.round_tv(surplus / divisor)
            self.logs.append(' Surplus of ' + candidate_surplus.name + ' distributed at value ' + tv.pp(2) + ' . ')
        __pragma__(' noopov ')

        for candidate, x in next_preferences.items():
            cand_ballots = x[0]
            num_ballots = x[1]
            num_votes = x[2]

            new_parcel = []
            if len(cand_ballots) > 0:
                __pragma__(' opov ')
                self.candidates[candidate].parcels.append(new_parcel)
                __pragma__(' noopov ')

            # Votes credited to the receiving candidate. Multiply before
            # dividing, as the original did, so rounding is unchanged.
            __pragma__(' opov ')
            if not reduce_value:
                self.candidates[candidate].transfers += self.round_votes(num_votes)
            elif unrounded_tv:
                self.candidates[candidate].transfers += self.round_votes((num_ballots * surplus) / divisor)
            else:
                self.candidates[candidate].transfers += self.round_votes(num_ballots * tv)
            __pragma__(' noopov ')

            # Re-value each ballot paper from ballot.value, not from the
            # value it was received at
            for ballot, ballot_value in cand_ballots:
                __pragma__(' opov ')
                if not reduce_value:
                    new_value = ballot_value
                elif unrounded_tv:
                    new_value = (ballot.value * surplus) / divisor
                else:
                    new_value = ballot.value * tv
                new_parcel.append((ballot, self.round_weight(new_value)))
                __pragma__(' noopov ')

        # Exhausted votes
        __pragma__(' opov ')
        if papers_transferable:
            if reduce_value:
                pass  # No ballots exhaust
            else:
                self.exhausted.transfers += self.round_votes((surplus - transferable_votes))
        else:
            self.exhausted.transfers += self.round_votes((exhausted_ballots * surplus) / total_ballots)
        __pragma__(' noopov ')

        __pragma__(' opov ')
        count_card.transfers -= surplus
        __pragma__(' noopov ')

        count_card.state = CandidateState.ELECTED