Refactor Brant test API as method of RegressionResult

commit 4d59e1a521 (parent fc8303678f)
RunasSudo, 2022-12-02 20:46:00 +11:00
Signed by: RunasSudo (GPG Key ID: 7234E476BF21C61A)

2 changed files with 128 additions and 110 deletions
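The practical effect of the refactor: the Brant test no longer takes the data, formula and fit options as its own arguments, but reads them back from the fitted RegressionResult. A minimal before/after sketch, assuming the usual yli argument order of model class, data, dependent variable, formula (the DataFrame and column names are illustrative only):

    import pandas as pd
    import yli

    # Hypothetical ordinal outcome (3 levels) with a single predictor
    df = pd.DataFrame({
        'Severity': [1, 1, 2, 2, 3, 3, 1, 2, 3, 2],
        'Dose':     [0, 1, 1, 2, 2, 3, 0, 2, 3, 1],
    })

    # Before this commit: standalone function, repeating data/formula/options
    #result = yli.brant(df, 'Severity', 'Dose', nan_policy='warn')

    # After this commit: the result carries df (as a weakref), dep, formula
    # and the regress kwargs, so the test becomes a method call
    result = yli.regress(yli.OrdinalLogit, df, 'Severity', 'Dose')
    result.brant()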

yli/__init__.py

@@ -19,7 +19,7 @@ from .config import config
 from .descriptives import auto_descriptives
 from .distributions import beta_oddsratio, beta_ratio, hdi, transformed_dist
 from .io import pickle_read_compressed, pickle_read_encrypted, pickle_write_compressed, pickle_write_encrypted
-from .regress import OrdinalLogit, PenalisedLogit, brant, logit_then_regress, regress, regress_bootstrap, vif
+from .regress import OrdinalLogit, PenalisedLogit, logit_then_regress, regress, regress_bootstrap, vif
 from .sig_tests import anova_oneway, auto_univariable, chi2, mannwhitney, pearsonr, ttest_ind
 
 def reload_me():

yli/regress.py

@@ -28,6 +28,7 @@ from tqdm import tqdm
 from datetime import datetime
 import itertools
 import warnings
+import weakref
 
 from .bayes_factors import BayesFactor, bayesfactor_afbf
 from .config import config
@@ -124,15 +125,28 @@ class RegressionResult:
     """
     
     def __init__(self,
+        model_class, df, dep, formula, regress_kwargs,
         raw_result,
         full_name, model_name, fit_method,
-        dep, nobs, dof_model, fitted_dt, cov_type,
+        nobs, dof_model, fitted_dt, cov_type,
         terms,
         llf, llnull,
         dof_resid, rsquared, f_statistic,
         comments,
         exp
     ):
+        # Data about how model fitted
+        
+        #: statsmodels model class used for fitting the model
+        self.model_class = model_class
+        #: Data fitted (*weakref* to *DataFrame*)
+        self.df = df
+        #: Name of the dependent variable (*str*)
+        self.dep = dep
+        #: Formula for the model (*str*)
+        self.formula = formula
+        #: Other kwargs passed to :func:`yli.regress` (*dict*)
+        self.regress_kwargs = regress_kwargs
+        
         #: Raw result from statsmodels *model.fit*
         self.raw_result = raw_result

@@ -145,8 +159,6 @@ class RegressionResult:
         self.fit_method = fit_method
         
         # Basic fitted model information
-        #: Name of the dependent variable (*str*)
-        self.dep = dep
         #: Number of observations (*int*)
         self.nobs = nobs
         #: Degrees of freedom for the model (*int*)
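Note that self.df now holds only a weak reference, so a retained RegressionResult does not keep the caller's DataFrame alive; brant() below dereferences it and raises if the data has since been dropped. A standalone sketch of the pattern (the class and method names are illustrative, not library code):

    import weakref

    import pandas as pd

    class ResultSketch:
        def __init__(self, df):
            # Store a weak reference only: the result does not prolong the DataFrame's lifetime
            self.df = weakref.ref(df)

        def row_count(self):
            df = self.df()  # Dereference; returns None once the DataFrame has been garbage-collected
            if df is None:
                raise Exception('Referenced DataFrame has been dropped')
            return len(df)

    data = pd.DataFrame({'x': [1, 2, 3]})
    result = ResultSketch(data)
    print(result.row_count())  # 3

    del data  # On CPython, the DataFrame is collected once the last strong reference goes
    try:
        result.row_count()
    except Exception as e:
        print(e)  # Referenced DataFrame has been dropped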
@@ -239,6 +251,101 @@ class RegressionResult:
         else:
             return bf01.invert()
     
+    def brant(self):
+        # TODO: Documentation
+        
+        df = self.df()
+        if df is None:
+            raise Exception('Referenced DataFrame has been dropped')
+        
+        dep = self.dep
+        
+        # Check for/clean NaNs
+        # Following this, we pass nan_policy='raise' to assert no NaNs remaining
+        df = df[[dep] + cols_for_formula(self.formula, df)]
+        df = check_nan(df, self.regress_kwargs['nan_policy'])
+        
+        # Ensure numeric type for dependent variable
+        if df[dep].dtype != 'float64':
+            df[dep] = df[dep].astype('float64')
+        
+        # Convert pandas nullable types for independent variables as this breaks statsmodels
+        df = convert_pandas_nullable(df)
+        
+        # Precompute design matrix for RHS
+        # This is also X+ in Brant paper
+        dmatrix_right = patsy.dmatrix(self.formula, df, return_type='dataframe')
+        dmatrix_right.reset_index(drop=True, inplace=True)  # Otherwise this confuses matrix multiplication
+        
+        # Fit individual logistic regressions
+        logit_models = []
+        for upper_limit in sorted(df[dep].unique())[:-1]:  # FIXME: Sort order
+            dep_dichotomous = (df[dep] <= upper_limit).astype(int).reset_index(drop=True)
+            logit_result = sm.Logit(dep_dichotomous, dmatrix_right).fit(disp=False, **self.regress_kwargs.get('fit_kwargs', {}))
+            
+            if not logit_result.mle_retvals['converged']:
+                raise Exception('Maximum likelihood estimation failed to converge for {} <= {}. Check raw_result.mle_retvals.'.format(dep, upper_limit))
+            
+            if pd.isna(logit_result.bse).any():
+                raise Exception('Regression returned NaN standard errors for {} <= {}.'.format(dep, upper_limit))
+            
+            logit_models.append(logit_result)
+        
+        logit_betas = np.array([model._results.params for model in logit_models]).T
+        logit_pihat = np.array([expit(-model.fittedvalues) for model in logit_models]).T  # Predicted probabilities
+        
+        # vcov is the variance-covariance matrix of all individually fitted betas across all terms
+        #                  | model 1         | model 2         | model 3         | ...
+        #                  | term 1 | term 2 | term 1 | term 2 | term 1 | term 2 | ...
+        # model 1 | term 1 |
+        #         | term 2 |
+        # model 2 | term 1 |
+        #         | term 2 |
+        # ...
+        n_terms = len(dmatrix_right.columns) - 1  # Number of beta terms (excluding intercept)
+        n_betas = len(logit_models) * n_terms
+        vcov = np.zeros((n_betas, n_betas))
+        
+        # Populate the variance-covariance matrix for comparisons between individually fitted models
+        for j in range(0, len(logit_models) - 1):
+            for l in range(j + 1, len(logit_models)):
+                Wjj = np.diag(logit_pihat[:,j] - logit_pihat[:,j] * logit_pihat[:,j])
+                Wjl = np.diag(logit_pihat[:,l] - logit_pihat[:,j] * logit_pihat[:,l])
+                Wll = np.diag(logit_pihat[:,l] - logit_pihat[:,l] * logit_pihat[:,l])
+                
+                matrix_result = np.linalg.inv(dmatrix_right.T @ Wjj @ dmatrix_right) @ dmatrix_right.T @ Wjl @ dmatrix_right @ np.linalg.inv(dmatrix_right.T @ Wll @ dmatrix_right)
+                j_vs_l_vcov = np.asarray(matrix_result)[1:,1:]  # Asymptotic covariance for j,l
+                
+                vcov[j*n_terms:(j+1)*n_terms, l*n_terms:(l+1)*n_terms] = j_vs_l_vcov
+                vcov[l*n_terms:(l+1)*n_terms, j*n_terms:(j+1)*n_terms] = j_vs_l_vcov
+        
+        # Populate the variance-covariance matrix within each individually fitted model
+        for i in range(len(logit_models)):
+            vcov[i*n_terms:(i+1)*n_terms, i*n_terms:(i+1)*n_terms] = logit_models[i]._results.cov_params()[1:,1:]
+        
+        # ------------------
+        # Perform Wald tests
+        
+        beta_names = ['{}_{}'.format(raw_name, i) for i in range(len(logit_models)) for raw_name in dmatrix_right.columns[1:]]
+        
+        wald_results = {}
+        
+        # Omnibus test
+        constraints = [' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models))) for raw_name in dmatrix_right.columns[1:]]
+        constraint = ', '.join(constraints)
+        df = (len(logit_models) - 1) * (len(dmatrix_right.columns) - 1)  # df = (number of levels minus 2) * (number of terms excluding intercept)
+        wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, df)
+        wald_results['Omnibus'] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
+        
+        # Individual terms
+        for raw_name in dmatrix_right.columns[1:]:
+            constraint = ' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models)))
+            df = len(logit_models) - 1  # df = (number of levels minus 2)
+            wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, df)
+            wald_results[raw_name] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
+        
+        return BrantResult(wald_results)
+    
     def _header_table(self, html):
         """Return the entries for the header table"""
@@ -534,6 +641,9 @@ def regress(
     if start_params is not None:
         fit_kwargs['start_params'] = start_params
     
+    # Collect kwargs to store in result later
+    regress_kwargs = dict(nan_policy=nan_policy, fit_kwargs=fit_kwargs)
+    
     # Autodetect whether to exponentiate
     if exp is None:
         if model_class in (sm.Logit, sm.Poisson, PenalisedLogit):
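These stored kwargs are what RegressionResult.brant() later reads back: the NaN policy via self.regress_kwargs['nan_policy'], and the fit options via self.regress_kwargs.get('fit_kwargs', {}) when refitting the binary logits. A small sketch of the round trip (values illustrative):

    # In regress(): common fit kwargs (method, maxiter, start_params) are folded
    # into fit_kwargs, then bundled for storage on the result
    fit_kwargs = {'maxiter': 100}
    regress_kwargs = dict(nan_policy='warn', fit_kwargs=fit_kwargs)

    # In RegressionResult.brant(): the same options are recovered from the result
    assert regress_kwargs['nan_policy'] == 'warn'
    assert regress_kwargs.get('fit_kwargs', {}) == {'maxiter': 100}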
@@ -543,6 +653,8 @@ def regress(
     else:
         exp = False
     
+    df_ref = weakref.ref(df)
+    
     if _dmatrices is None:
         # Check for/clean NaNs
         df = df[[dep] + cols_for_formula(formula, df)]
@@ -572,6 +684,11 @@ def regress(
     
     if isinstance(result, RegressionResult):
         # Already processed!
+        result.model_class = model_class
+        result.df = df_ref
+        result.dep = dep
+        result.formula = formula
+        result.regress_kwargs = regress_kwargs
         result.exp = exp
         return result
     
@@ -653,9 +770,10 @@ def regress(
         full_name = 'Robust {}'.format(full_name)
     
     return RegressionResult(
+        model_class, df_ref, dep, formula, regress_kwargs,
         result,
         full_name, model_class.__name__, method_name,
-        dep, result.nobs, result.df_model, datetime.now(), getattr(result, 'cov_type', 'nonrobust'),
+        result.nobs, result.df_model, datetime.now(), getattr(result, 'cov_type', 'nonrobust'),
         terms,
         result.llf, llnull,
         getattr(result, 'df_resid', None), getattr(result, 'rsquared', None), getattr(result, 'fvalue', None),
@@ -692,6 +810,8 @@ def regress_bootstrap(
     if fit_kwargs is None:
         fit_kwargs = {}
     
+    df_ref = weakref.ref(df)
+    
     # Check for/clean NaNs
     # Following this, we pass nan_policy='raise' to assert no NaNs remaining
     df = df[[dep] + cols_for_formula(formula, df)]
@@ -750,6 +870,7 @@ def regress_bootstrap(
             terms[term_name] = CategoricalTerm(categories, term.ref_category)
     
     return RegressionResult(
+        model_class, df_ref, dep, formula, full_model.regress_kwargs,
         None,
         full_model.full_name, full_model.model_name, full_model.fit_method,
         dep, full_model.nobs, full_model.dof_model, datetime.now(), 'Bootstrap',
@@ -868,9 +989,10 @@ class PenalisedLogit(statsmodels.discrete.discrete_model.BinaryModel):
         terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])}
         
         return RegressionResult(
+            None, None, None, None, None,  # Set in regress()
             model,
             'Penalised Logistic Regression', 'Logit', 'Penalised ML',
-            self.endog_names, model['n'][0], model['df'][0], datetime.now(), 'nonrobust',
+            model['n'][0], model['df'][0], datetime.now(), 'nonrobust',
             terms,
             model['loglik'][0], model['loglik'][1],
             None, None, None,
@@ -948,107 +1070,3 @@ class BrantResult:
         # FIXME
         return 'FIXME'
-
-def brant(
-    df, dep, formula, *,
-    nan_policy='warn',
-    fit_kwargs=None,
-    method=None, maxiter=None, start_params=None,  # common fit_kwargs
-):
-    # TODO: Documentation
-    
-    if fit_kwargs is None:
-        fit_kwargs = {}
-    if method is not None:
-        fit_kwargs['method'] = method
-    if maxiter is not None:
-        fit_kwargs['maxiter'] = maxiter
-    if start_params is not None:
-        fit_kwargs['start_params'] = start_params
-    
-    # Check for/clean NaNs
-    # Following this, we pass nan_policy='raise' to assert no NaNs remaining
-    df = df[[dep] + cols_for_formula(formula, df)]
-    df = check_nan(df, nan_policy)
-    
-    # Ensure numeric type for dependent variable
-    if df[dep].dtype != 'float64':
-        df[dep] = df[dep].astype('float64')
-    
-    # Convert pandas nullable types for independent variables as this breaks statsmodels
-    df = convert_pandas_nullable(df)
-    
-    # Precompute design matrix for RHS
-    # This is also X+ in Brant paper
-    dmatrix_right = patsy.dmatrix(formula, df, return_type='dataframe')
-    dmatrix_right.reset_index(drop=True, inplace=True)  # Otherwise this confuses matrix multiplication
-    
-    # Fit individual logistic regressions
-    logit_models = []
-    for upper_limit in sorted(df[dep].unique())[:-1]:  # FIXME: Sort order
-        dep_dichotomous = (df[dep] <= upper_limit).astype(int).reset_index(drop=True)
-        logit_result = sm.Logit(dep_dichotomous, dmatrix_right).fit(disp=False, **fit_kwargs)
-        
-        if not logit_result.mle_retvals['converged']:
-            raise Exception('Maximum likelihood estimation failed to converge for {} <= {}. Check raw_result.mle_retvals.'.format(dep, upper_limit))
-        
-        if pd.isna(logit_result.bse).any():
-            raise Exception('Regression returned NaN standard errors for {} <= {}.'.format(dep, upper_limit))
-        
-        logit_models.append(logit_result)
-    
-    logit_betas = np.array([model._results.params for model in logit_models]).T
-    logit_pihat = np.array([expit(-model.fittedvalues) for model in logit_models]).T  # Predicted probabilities
-    
-    # vcov is the variance-covariance matrix of all individually fitted betas across all terms
-    #                  | model 1         | model 2         | model 3         | ...
-    #                  | term 1 | term 2 | term 1 | term 2 | term 1 | term 2 | ...
-    # model 1 | term 1 |
-    #         | term 2 |
-    # model 2 | term 1 |
-    #         | term 2 |
-    # ...
-    n_terms = len(dmatrix_right.columns) - 1  # Number of beta terms (excluding intercept)
-    n_betas = len(logit_models) * n_terms
-    vcov = np.zeros((n_betas, n_betas))
-    
-    # Populate the variance-covariance matrix for comparisons between individually fitted models
-    for j in range(0, len(logit_models) - 1):
-        for l in range(j + 1, len(logit_models)):
-            Wjj = np.diag(logit_pihat[:,j] - logit_pihat[:,j] * logit_pihat[:,j])
-            Wjl = np.diag(logit_pihat[:,l] - logit_pihat[:,j] * logit_pihat[:,l])
-            Wll = np.diag(logit_pihat[:,l] - logit_pihat[:,l] * logit_pihat[:,l])
-            
-            matrix_result = np.linalg.inv(dmatrix_right.T @ Wjj @ dmatrix_right) @ dmatrix_right.T @ Wjl @ dmatrix_right @ np.linalg.inv(dmatrix_right.T @ Wll @ dmatrix_right)
-            j_vs_l_vcov = np.asarray(matrix_result)[1:,1:]  # Asymptotic covariance for j,l
-            
-            vcov[j*n_terms:(j+1)*n_terms, l*n_terms:(l+1)*n_terms] = j_vs_l_vcov
-            vcov[l*n_terms:(l+1)*n_terms, j*n_terms:(j+1)*n_terms] = j_vs_l_vcov
-    
-    # Populate the variance-covariance matrix within each individually fitted model
-    for i in range(len(logit_models)):
-        vcov[i*n_terms:(i+1)*n_terms, i*n_terms:(i+1)*n_terms] = logit_models[i]._results.cov_params()[1:,1:]
-    
-    # ------------------
-    # Perform Wald tests
-    
-    beta_names = ['{}_{}'.format(raw_name, i) for i in range(len(logit_models)) for raw_name in dmatrix_right.columns[1:]]
-    
-    wald_results = {}
-    
-    # Omnibus test
-    constraints = [' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models))) for raw_name in dmatrix_right.columns[1:]]
-    constraint = ', '.join(constraints)
-    df = (len(logit_models) - 1) * (len(dmatrix_right.columns) - 1)  # df = (number of levels minus 2) * (number of terms excluding intercept)
-    wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, df)
-    wald_results['Omnibus'] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
-    
-    # Individual terms
-    for raw_name in dmatrix_right.columns[1:]:
-        constraint = ' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models)))
-        df = len(logit_models) - 1  # df = (number of levels minus 2)
-        wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, df)
-        wald_results[raw_name] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
-    
-    return BrantResult(wald_results)