2022-10-13 15:57:56 +11:00
# scipy-yli: Helpful SciPy utilities and recipes
# Copyright © 2022 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import numpy as np
2022-10-13 16:24:01 +11:00
import pandas as pd
2022-10-13 15:57:56 +11:00
from scipy import stats
2022-10-13 17:23:29 +11:00
import statsmodels
2022-10-13 15:57:56 +11:00
import statsmodels . api as sm
from statsmodels . iolib . table import SimpleTable
2022-10-13 16:24:01 +11:00
from statsmodels . stats . outliers_influence import variance_inflation_factor
2022-10-13 15:57:56 +11:00
from datetime import datetime
import itertools
2022-10-14 14:09:29 +11:00
from . bayes_factors import BayesFactor , bayesfactor_afbf
2022-10-14 20:18:25 +11:00
from . sig_tests import FTestResult
2022-10-15 01:09:40 +11:00
from . utils import Estimate , check_nan , cols_for_formula , fmt_p , formula_factor_ref_category , parse_patsy_term
2022-10-13 15:57:56 +11:00
2022-10-14 14:08:51 +11:00
def vif ( df , formula = None , nan_policy = ' warn ' ) :
"""
Calculate the variance inflation factor for each variable in df
formula : If specified , calculate the VIF only for the variables in the formula
"""
if formula :
# Only consider columns in the formula
2022-10-15 01:09:40 +11:00
df = df [ cols_for_formula ( formula , df ) ]
2022-10-13 16:24:01 +11:00
# Check for/clean NaNs
df = check_nan ( df , nan_policy )
# Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..."
df = pd . get_dummies ( df , drop_first = True ) # Convert categorical dtypes
df = df . astype ( ' float64 ' ) # Convert all other dtypes
# Add intercept column
orig_columns = list ( df . columns )
df [ ' Intercept ' ] = [ 1 ] * len ( df )
vifs = { }
for i , col in enumerate ( orig_columns ) :
vifs [ col ] = variance_inflation_factor ( df , i )
return pd . Series ( vifs )
2022-10-13 15:57:56 +11:00
# ----------
# Regression
class LikelihoodRatioTestResult :
""" Result of a likelihood ratio test for regression """
def __init__ ( self , statistic , dof , pvalue ) :
self . statistic = statistic
self . dof = dof
self . pvalue = pvalue
def _repr_html_ ( self ) :
2022-10-14 14:48:26 +11:00
return ' LR( {} ) = {:.2f} ; <i>p</i> {} ' . format ( self . dof , self . statistic , fmt_p ( self . pvalue , html = True ) )
2022-10-13 15:57:56 +11:00
def summary ( self ) :
2022-10-14 14:48:26 +11:00
return ' LR( {} ) = {:.2f} ; p {} ' . format ( self . dof , self . statistic , fmt_p ( self . pvalue , html = False ) )
2022-10-13 15:57:56 +11:00
class RegressionResult :
"""
Result of a regression
llf / llnull : Log - likelihood of model / null model
"""
def __init__ ( self ,
raw_result ,
full_name , model_name , fit_method ,
dep , nobs , dof_model , fitted_dt ,
2022-10-14 21:57:22 +11:00
terms ,
2022-10-13 15:57:56 +11:00
llf , llnull ,
dof_resid , rsquared , f_statistic ,
exp
) :
# A copy of the raw result so we can access it
self . raw_result = raw_result
# Information for display
self . full_name = full_name
self . model_name = model_name
self . fit_method = fit_method
# Basic fitted model information
self . dep = dep
self . nobs = nobs
self . dof_model = dof_model
self . fitted_dt = fitted_dt
2022-10-14 21:57:22 +11:00
# Regression coefficients/p values
self . terms = terms
2022-10-13 15:57:56 +11:00
# Model log-likelihood
self . llf = llf
self . llnull = llnull
# Extra statistics (not all regression models have these)
self . dof_resid = dof_resid
self . rsquared = rsquared
self . f_statistic = f_statistic
# Config for display style
self . exp = exp
@property
def pseudo_rsquared ( self ) :
""" McFadden ' s pseudo R-squared """
return 1 - self . llf / self . llnull
def lrtest_null ( self ) :
""" Compute the likelihood ratio test comparing the model to the null model """
statistic = - 2 * ( self . llnull - self . llf )
pvalue = 1 - stats . chi2 . cdf ( statistic , self . dof_model )
return LikelihoodRatioTestResult ( statistic , self . dof_model , pvalue )
def ftest ( self ) :
""" Perform the F test that all slopes are 0 """
pvalue = 1 - stats . f ( self . dof_model , self . dof_resid ) . cdf ( self . f_statistic )
return FTestResult ( self . f_statistic , self . dof_model , self . dof_resid , pvalue )
2022-10-14 14:09:29 +11:00
def bayesfactor_beta_zero ( self , term ) :
"""
Compute Bayes factor testing the hypothesis that the given beta is 0
Requires statsmodels regression
"""
# Get parameters required for AFBF
2022-10-14 21:57:22 +11:00
params = pd . Series ( { term . raw_name . replace ( ' [ ' , ' _ ' ) . replace ( ' ] ' , ' _ ' ) : term . beta . point for term in self . terms . values ( ) } )
2022-10-14 14:09:29 +11:00
cov = self . raw_result . cov_params ( )
# Compute BF matrix
bf01 = bayesfactor_afbf ( params , cov , self . nobs , ' {} = 0 ' . format ( term . replace ( ' [ ' , ' _ ' ) . replace ( ' ] ' , ' _ ' ) ) )
bf01 = BayesFactor ( bf01 . factor , ' 0 ' , ' {} = 0 ' . format ( term ) , ' 1 ' , ' {} ≠ 0 ' . format ( term ) )
if bf01 . factor > = 1 :
return bf01
else :
return bf01 . invert ( )
2022-10-13 15:57:56 +11:00
def _header_table ( self , html ) :
""" Return the entries for the header table """
# Left column
left_col = [ ]
left_col . append ( ( ' Dep. Variable: ' , self . dep ) )
left_col . append ( ( ' Model: ' , self . model_name ) )
left_col . append ( ( ' Method: ' , self . fit_method ) )
left_col . append ( ( ' Date: ' , self . fitted_dt . strftime ( ' % Y- % m- %d ' ) ) )
left_col . append ( ( ' Time: ' , self . fitted_dt . strftime ( ' % H: % M: % S ' ) ) )
left_col . append ( ( ' No. Observations: ' , format ( self . nobs , ' .0f ' ) ) )
# Right column
right_col = [ ]
if self . dof_resid :
right_col . append ( ( ' Df. Residuals: ' , format ( self . dof_resid , ' .0f ' ) ) )
right_col . append ( ( ' Df. Model: ' , format ( self . dof_model , ' .0f ' ) ) )
if self . rsquared :
right_col . append ( ( ' <i>R</i><sup>2</sup>: ' if html else ' R²: ' , format ( self . rsquared , ' .2f ' ) ) )
else :
right_col . append ( ( ' Pseudo <i>R</i><sup>2</sup>: ' if html else ' Pseudo R²: ' , format ( self . pseudo_rsquared , ' .2f ' ) ) )
if self . f_statistic :
# Report the F test if available
f_result = self . ftest ( )
if html :
right_col . append ( ( ' <i>F</i>: ' , format ( f_result . statistic , ' .2f ' ) ) )
2022-10-14 14:48:26 +11:00
right_col . append ( ( ' <i>p</i> (<i>F</i>): ' , fmt_p ( f_result . pvalue , html = True , nospace = True ) ) )
2022-10-13 15:57:56 +11:00
else :
right_col . append ( ( ' F: ' , format ( f_result . statistic , ' .2f ' ) ) )
2022-10-14 14:48:26 +11:00
right_col . append ( ( ' p (F): ' , fmt_p ( f_result . pvalue , html = False , nospace = True ) ) )
2022-10-13 15:57:56 +11:00
else :
# Otherwise report likelihood ratio test as overall test
lrtest_result = self . lrtest_null ( )
right_col . append ( ( ' LL-Model: ' , format ( self . llf , ' .2f ' ) ) )
right_col . append ( ( ' LL-Null: ' , format ( self . llnull , ' .2f ' ) ) )
if html :
2022-10-14 14:48:26 +11:00
right_col . append ( ( ' <i>p</i> (LR): ' , fmt_p ( lrtest_result . pvalue , html = True , nospace = True ) ) )
2022-10-13 15:57:56 +11:00
else :
2022-10-14 14:48:26 +11:00
right_col . append ( ( ' p (LR): ' , fmt_p ( lrtest_result . pvalue , html = False , nospace = True ) ) )
2022-10-13 15:57:56 +11:00
return left_col , right_col
def _repr_html_ ( self ) :
# Render header table
left_col , right_col = self . _header_table ( html = True )
out = ' <table><caption> {} Results</caption> ' . format ( self . full_name )
for left_cell , right_cell in itertools . zip_longest ( left_col , right_col ) :
out + = ' <tr><th> {} </th><td> {} </td><th> {} </th><td> {} </td></tr> ' . format (
left_cell [ 0 ] if left_cell else ' ' ,
left_cell [ 1 ] if left_cell else ' ' ,
right_cell [ 0 ] if right_cell else ' ' ,
right_cell [ 1 ] if right_cell else ' '
)
out + = ' </table> '
# Render coefficients table
out + = ' <table><tr><th></th><th style= " text-align:center " > {} </th><th colspan= " 3 " style= " text-align:center " >(95 % CI)</th><th style= " text-align:center " ><i>p</i></th></tr> ' . format ( ' exp(<i>β</i>) ' if self . exp else ' <i>β</i> ' )
2022-10-14 21:57:22 +11:00
for term_name , term in self . terms . items ( ) :
if isinstance ( term , SingleTerm ) :
# Single term
# Exponentiate beta if requested
beta = term . beta
if self . exp :
beta = np . exp ( beta )
out + = ' <tr><th> {} </th><td> {:.2f} </td><td style= " padding-right:0 " >( {:.2f} </td><td>–</td><td style= " padding-left:0 " > {:.2f} )</td><td style= " text-align:left " > {} </td></tr> ' . format ( term_name , beta . point , beta . ci_lower , beta . ci_upper , fmt_p ( term . pvalue , html = True , nospace = True ) )
elif isinstance ( term , CategoricalTerm ) :
# Categorical term
out + = ' <tr><th> {} </th><td></td><td style= " padding-right:0 " ></td><td></td><td style= " padding-left:0 " ></td><td></td></tr> ' . format ( term_name )
# Render reference category
out + = ' <tr><td style= " text-align:right;font-style:italic " > {} </td><td>Ref.</td><td style= " padding-right:0 " ></td><td></td><td style= " padding-left:0 " ></td><td></td></tr> ' . format ( term . ref_category )
# Loop over terms
for sub_term_name , sub_term in term . categories . items ( ) :
# Exponentiate beta if requested
beta = sub_term . beta
if self . exp :
beta = np . exp ( beta )
out + = ' <tr><td style= " text-align:right;font-style:italic " > {} </td><td> {:.2f} </td><td style= " padding-right:0 " >( {:.2f} </td><td>–</td><td style= " padding-left:0 " > {:.2f} )</td><td style= " text-align:left " > {} </td></tr> ' . format ( sub_term_name , beta . point , beta . ci_lower , beta . ci_upper , fmt_p ( sub_term . pvalue , html = True , nospace = True ) )
else :
raise Exception ( ' Attempt to render unknown term type ' )
2022-10-13 15:57:56 +11:00
out + = ' </table> '
# TODO: Have a detailed view which shows SE, t/z, etc.
return out
def summary ( self ) :
# Render header table
left_col , right_col = self . _header_table ( html = False )
# Ensure equal sizes for SimpleTable
if len ( right_col ) > len ( left_col ) :
left_col . extend ( [ ( ' ' , ' ' ) ] * ( len ( right_col ) - len ( left_col ) ) )
elif len ( left_col ) > len ( right_col ) :
right_col . extend ( [ ( ' ' , ' ' ) ] * ( len ( left_col ) - len ( right_col ) ) )
table1 = SimpleTable ( np . concatenate ( [ left_col , right_col ] , axis = 1 ) , title = ' {} Results ' . format ( self . full_name ) )
table1 . insert_stubs ( 2 , [ ' | ' ] * len ( left_col ) )
# Get rid of last line (merge with next table)
table1_lines = table1 . as_text ( ) . splitlines ( keepends = False )
out = ' \n ' . join ( table1_lines [ : - 1 ] ) + ' \n '
# Render coefficients table
table_data = [ ]
2022-10-14 21:57:22 +11:00
for term_name , term in self . terms . items ( ) :
if isinstance ( term , SingleTerm ) :
# Single term
# Exponentiate beta if requested
beta = term . beta
if self . exp :
beta = np . exp ( beta )
# Add some extra padding
table_data . append ( [ term_name + ' ' , format ( beta . point , ' .2f ' ) , ' ( {:.2f} ' . format ( beta . ci_lower ) , ' - ' , ' {:.2f} ) ' . format ( beta . ci_upper ) , ' ' + fmt_p ( term . pvalue , html = False , nospace = True ) ] )
elif isinstance ( term , CategoricalTerm ) :
# Categorical term
table_data . append ( [ term_name + ' ' , ' ' , ' ' , ' ' , ' ' , ' ' ] )
# Render reference category
table_data . append ( [ ' {} ' . format ( term . ref_category ) , ' Ref. ' , ' ' , ' ' , ' ' , ' ' ] )
# Loop over terms
for sub_term_name , sub_term in term . categories . items ( ) :
# Exponentiate beta if requested
beta = sub_term . beta
if self . exp :
beta = np . exp ( beta )
table_data . append ( [ sub_term_name + ' ' , format ( beta . point , ' .2f ' ) , ' ( {:.2f} ' . format ( beta . ci_lower ) , ' - ' , ' {:.2f} ) ' . format ( beta . ci_upper ) , ' ' + fmt_p ( sub_term . pvalue , html = False , nospace = True ) ] )
else :
raise Exception ( ' Attempt to render unknown term type ' )
2022-10-13 15:57:56 +11:00
table2 = SimpleTable ( data = table_data , headers = [ ' ' , ' exp(β) ' if self . exp else ' β ' , ' ' , ' \ue000 ' , ' ' , ' p ' ] ) # U+E000 is in Private Use Area, mark middle of CI column
table2_text = table2 . as_text ( ) . replace ( ' \ue000 ' , ' (95 % CI) ' ) # Render heading in the right spot
table2_lines = table2_text . splitlines ( keepends = False )
# Render divider line between 2 tables
max_table_len = max ( len ( table1_lines [ - 1 ] ) , len ( table2_lines [ - 1 ] ) )
out + = ' = ' * max_table_len + ' \n '
out + = ' \n ' . join ( table2_lines [ 1 : ] )
return out
2022-10-14 21:57:22 +11:00
class SingleTerm :
"""
A term in a RegressionResult which is a single term
raw_name : The raw name of the term ( e . g . in RegressionResult . raw_result data )
beta : An Estimate of the coefficient
pvalue : The p value
"""
def __init__ ( self , raw_name , beta , pvalue ) :
self . raw_name = raw_name
self . beta = beta
self . pvalue = pvalue
class CategoricalTerm :
""" A group of terms in a RegressionResult corresponding to a categorical variable """
def __init__ ( self , categories , ref_category ) :
self . categories = categories
self . ref_category = ref_category
2022-10-13 15:57:56 +11:00
def regress (
model_class , df , dep , formula , * ,
nan_policy = ' warn ' , exp = None
) :
""" Fit a statsmodels regression model """
# Autodetect whether to exponentiate
if exp is None :
2022-10-13 17:23:29 +11:00
if model_class is sm . Logit or model_class is PenalisedLogit :
2022-10-13 15:57:56 +11:00
exp = True
else :
exp = False
# Check for/clean NaNs
2022-10-15 01:09:40 +11:00
df = df [ [ dep ] + cols_for_formula ( formula , df ) ]
2022-10-13 15:57:56 +11:00
df = check_nan ( df , nan_policy )
# Ensure numeric type for dependent variable
if df [ dep ] . dtype != ' float64 ' :
df [ dep ] = df [ dep ] . astype ( ' float64 ' )
# Convert pandas nullable types for independent variables
for col in df . columns :
if df [ col ] . dtype == ' Int64 ' :
df [ col ] = df [ col ] . astype ( ' float64 ' )
# Fit model
model = model_class . from_formula ( formula = dep + ' ~ ' + formula , data = df )
result = model . fit ( )
2022-10-13 17:23:29 +11:00
if isinstance ( result , RegressionResult ) :
# Already processed!
result . exp = exp
return result
2022-10-14 21:57:22 +11:00
# Process terms
terms = { }
2022-10-13 15:57:56 +11:00
confint = result . conf_int ( )
2022-10-14 21:57:22 +11:00
for raw_name , raw_beta in result . params . items ( ) :
beta = Estimate ( raw_beta , confint [ 0 ] [ raw_name ] , confint [ 1 ] [ raw_name ] )
# Rename terms
if raw_name == ' Intercept ' :
# Intercept term (single term)
term = ' (Intercept) '
terms [ term ] = SingleTerm ( raw_name , beta , result . pvalues [ raw_name ] )
else :
2022-10-15 01:09:40 +11:00
# Parse if required
factor , column , contrast = parse_patsy_term ( formula , df , raw_name )
if contrast is not None :
# Categorical term
# Add a new categorical term if not exists
if column not in terms :
ref_category = formula_factor_ref_category ( formula , df , factor )
terms [ column ] = CategoricalTerm ( { } , ref_category )
terms [ column ] . categories [ contrast ] = SingleTerm ( raw_name , beta , result . pvalues [ raw_name ] )
else :
# Single term
term = raw_name
terms [ term ] = SingleTerm ( raw_name , beta , result . pvalues [ raw_name ] )
2022-10-13 15:57:56 +11:00
# Fit null model (for llnull)
if hasattr ( result , ' llnull ' ) :
llnull = result . llnull
else :
result_null = model_class . from_formula ( formula = dep + ' ~ 1 ' , data = df ) . fit ( )
llnull = result_null . llf
# Parse raw regression results (to get fit method)
header_entries = np . vectorize ( str . strip ) ( np . concatenate ( np . split ( np . array ( result . summary ( ) . tables [ 0 ] . data ) , 2 , axis = 1 ) ) )
header_dict = { x [ 0 ] : x [ 1 ] for x in header_entries }
return RegressionResult (
result ,
' Logistic Regression ' if model_class is sm . Logit else ' {} Regression ' . format ( model_class . __name__ ) , model_class . __name__ , header_dict [ ' Method: ' ] ,
dep , result . nobs , result . df_model , datetime . now ( ) ,
2022-10-14 21:57:22 +11:00
terms ,
2022-10-13 15:57:56 +11:00
result . llf , llnull ,
getattr ( result , ' df_resid ' , None ) , getattr ( result , ' rsquared ' , None ) , getattr ( result , ' fvalue ' , None ) ,
exp
)
2022-10-13 17:23:29 +11:00
# -----------------------------
# Penalised logistic regression
class PenalisedLogit ( statsmodels . discrete . discrete_model . BinaryModel ) :
"""
Statsmodel - compatible model for computing Firth penalised logistic regression
Uses R " logistf " library
NB : This class expects to be used in the context of yli . regress ( )
"""
def fit ( self ) :
import rpy2 . robjects as ro
import rpy2 . robjects . packages
import rpy2 . robjects . pandas2ri
# Assume data is already cleaned from regress()
df = self . data . frame . copy ( )
# Convert bool to int otherwise rpy2 chokes
df = df . replace ( { False : 0 , True : 1 } )
# Import logistf
ro . packages . importr ( ' logistf ' )
with ro . conversion . localconverter ( ro . default_converter + ro . pandas2ri . converter ) :
with ro . local_context ( ) as lc :
# Convert DataFrame to R
lc [ ' df ' ] = df
# Transfer other parameters to R
lc [ ' formula_ ' ] = self . formula
# Fit the model
model = ro . r ( ' logistf(formula_, data=df) ' )
2022-10-14 21:57:22 +11:00
# TODO: Handle categorical terms?
terms = { t : SingleTerm ( t , Estimate ( b , ci0 , ci1 ) , p ) for t , b , ci0 , ci1 , p in zip ( model [ ' terms ' ] , model [ ' coefficients ' ] , model [ ' ci.lower ' ] , model [ ' ci.upper ' ] , model [ ' prob ' ] ) }
2022-10-13 17:23:29 +11:00
return RegressionResult (
model ,
' Penalised Logistic Regression ' , ' Logit ' , ' Penalised ML ' ,
self . endog_names , model [ ' n ' ] [ 0 ] , model [ ' df ' ] [ 0 ] , datetime . now ( ) ,
2022-10-14 21:57:22 +11:00
terms ,
2022-10-13 17:23:29 +11:00
model [ ' loglik ' ] [ 0 ] , model [ ' loglik ' ] [ 1 ] ,
None , None , None ,
None # Set exp in regress()
)