334 lines
11 KiB
Python
334 lines
11 KiB
Python
# scipy-yli: Helpful SciPy utilities and recipes
|
|
# Copyright © 2022–2024 Lee Yingtong Li (RunasSudo)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import pandas as pd
|
|
from scipy import stats
|
|
|
|
from .config import config
|
|
from .utils import as_numeric, check_nan
|
|
|
|
def auto_descriptives(df, cols, *, ordinal_range=[]):
|
|
"""
|
|
Automatically compute descriptive summary statistics
|
|
|
|
The statistics computed are:
|
|
|
|
* For a categorical variable – Counts of values
|
|
* For a continuous variable – Mean and standard deviation
|
|
* For an ordinal variable – Median and IQR (default) or range
|
|
|
|
There is no *nan_policy* argument. *nan* values are omitted from summary statistics for each variable, and the count of *nan* values is reported.
|
|
|
|
:param df: Data to summarise
|
|
:type df: DataFrame
|
|
:param cols: Columns in *df* for the variables to summarise
|
|
:type cols: List[str]
|
|
:param ordinal_range: Columns of ordinal variables in *df* to report median and range (rather than IQR)
|
|
:type ordinal_range: List[str]
|
|
|
|
:rtype: :class:`yli.descriptives.AutoDescriptivesResult`
|
|
"""
|
|
|
|
result_data = []
|
|
result_labels = []
|
|
|
|
for col in cols:
|
|
data_cleaned = df[col].dropna()
|
|
|
|
if data_cleaned.dtype == 'category' and data_cleaned.cat.ordered and data_cleaned.cat.categories.dtype in ('float64', 'int64', 'Float64', 'Int64'):
|
|
# Ordinal numeric data
|
|
data_cleaned = data_cleaned.astype('float64')
|
|
|
|
if col in ordinal_range:
|
|
# Report range
|
|
result_labels.append((
|
|
'{}, median (range)'.format(col),
|
|
'{}, median (range)'.format(col),
|
|
))
|
|
result_data.append((
|
|
'{:.2f} ({:.2f}–{:.2f})'.format(data_cleaned.median(), data_cleaned.min(), data_cleaned.max()),
|
|
len(df) - len(data_cleaned)
|
|
))
|
|
else:
|
|
# Report IQR
|
|
result_labels.append((
|
|
'{}, median (IQR)'.format(col),
|
|
'{}, median (IQR)'.format(col),
|
|
))
|
|
result_data.append((
|
|
'{:.2f} ({:.2f}–{:.2f})'.format(data_cleaned.median(), data_cleaned.quantile(0.25), data_cleaned.quantile(0.75)),
|
|
len(df) - len(data_cleaned)
|
|
))
|
|
elif data_cleaned.dtype in ('bool', 'boolean', 'category', 'object'):
|
|
# Categorical data
|
|
# FIXME: Sort order
|
|
values = sorted(data_cleaned.unique())
|
|
|
|
# Value counts
|
|
result_labels.append((
|
|
'{}, {}'.format(col, ':'.join(str(v) for v in values)),
|
|
'{}, {}'.format(col, ':'.join(str(v) for v in values)),
|
|
))
|
|
result_data.append((
|
|
':'.join(str((data_cleaned == v).sum()) for v in values),
|
|
len(df) - len(data_cleaned)
|
|
))
|
|
elif data_cleaned.dtype in ('float64', 'int64', 'Float64', 'Int64'):
|
|
# Continuous data
|
|
result_labels.append((
|
|
'{}, μ (SD)'.format(col),
|
|
'{}, <i>μ</i> (SD)'.format(col),
|
|
))
|
|
result_data.append((
|
|
'{:.2f} ({:.2f})'.format(data_cleaned.mean(), data_cleaned.std()),
|
|
len(df) - len(data_cleaned)
|
|
))
|
|
else:
|
|
raise Exception('Unsupported dtype for auto_descriptives, {}'.format(df[col].dtype))
|
|
|
|
return AutoDescriptivesResult(result_data=result_data, result_labels=result_labels)
|
|
|
|
class AutoDescriptivesResult:
|
|
"""
|
|
Result of automatically computed descriptive summary statistics
|
|
|
|
See :func:`yli.auto_descriptives`.
|
|
|
|
Results data stored within instances of this class is not intended to be directly accessed.
|
|
"""
|
|
|
|
def __init__(self, *, result_data, result_labels):
|
|
# List of tuples (variable summary, missing count)
|
|
self._result_data = result_data
|
|
# List of tuples (plaintext label, HTML label)
|
|
self._result_labels = result_labels
|
|
|
|
def __repr__(self):
|
|
if config.repr_is_summary:
|
|
return self.summary()
|
|
return super().__repr__()
|
|
|
|
def _repr_html_(self):
|
|
result = '<table><thead><tr><th></th><th></th><th>Missing</th></tr></thead><tbody>'
|
|
|
|
for data, label in zip(self._result_data, self._result_labels):
|
|
result += '<tr><th>{}</th><td>{}</td><td>{}</td></tr>'.format(label[1], data[0], data[1])
|
|
|
|
result += '</tbody></table>'
|
|
return result
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the tests of association
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
# Format data for output
|
|
result_labels_fmt = [r[0] for r in self._result_labels]
|
|
table = pd.DataFrame(self._result_data, index=result_labels_fmt, columns=['', 'Missing'])
|
|
return str(table)
|
|
|
|
def auto_correlations(df, cols):
|
|
"""
|
|
Automatically compute pairwise correlation coefficients
|
|
|
|
Categorical variables are factorised and coded according to the natural sort order of the categories.
|
|
Ordinal variables are factorised and coded as ranks.
|
|
Pairwise Pearson correlation coefficients are then calculated on the coded data.
|
|
|
|
The effect of the coding is that, for example:
|
|
|
|
* 2 continuous variables are compared using Pearson's *r*
|
|
* 2 ordinal variables are compared using Spearman's *ρ*
|
|
* 2 dichotomous variables are compared using Yule's *φ*
|
|
* A continuous variable and dichotomous variable are compared using point-biserial correlation
|
|
* An ordinal variable and dichotomous variable are compared using rank-biserial correlation
|
|
|
|
The exception is involving a categorical variable with more than 2 categories.
|
|
Correlation of 2 such variables is by Cramér's *V*.
|
|
Correlation of 1 such variable with another type of variable is by the correlation ratio *η* (on ranks, if applicable).
|
|
|
|
There is no *nan_policy* argument. *nan* values are omitted from summary statistics for each variable, and the count of *nan* values is reported.
|
|
|
|
:param df: Data to compute correlations for
|
|
:type df: DataFrame
|
|
:param cols: Columns in *df* for the variables to compute correlations for
|
|
:type cols: List[str]
|
|
|
|
:rtype: :class:`yli.descriptives.AutoCorrelationsResult`
|
|
"""
|
|
|
|
# Code columns as numeric/ranks/etc. as appropriate
|
|
df_coded = pd.DataFrame(index=df.index)
|
|
|
|
categorical_columns = []
|
|
|
|
for col_name in cols:
|
|
col = df[col_name]
|
|
|
|
if col.dtype == 'category' and col.cat.ordered:
|
|
# Ordinal variable
|
|
# Factorise if required
|
|
col, _ = as_numeric(col)
|
|
|
|
# Code as ranks
|
|
col[col >= 0] = stats.rankdata(col[col >= 0])
|
|
|
|
# Put NaNs back
|
|
col = col.astype('float64')
|
|
col[col < 0] = pd.NA
|
|
|
|
df_coded[col_name] = col
|
|
elif col.dtype in ('bool', 'boolean', 'category', 'object'):
|
|
# Categorical variable
|
|
# Factorise
|
|
cat_values = col.dropna().unique()
|
|
cat_values = sorted(cat_values)
|
|
col = col.replace({v: i for i, v in enumerate(cat_values)})
|
|
df_coded[col_name] = col
|
|
|
|
categorical_columns.append(col_name)
|
|
|
|
#if len(cat_values) == 2:
|
|
# Categorical variable with 2 categories
|
|
# Code as 0/1/NA
|
|
#cat_values = sorted(cat_values)
|
|
#col = col.replace({cat_values[0]: 0, cat_values[1]: 1})
|
|
#df_coded[col_name] = col
|
|
#else:
|
|
# Categorical variable with >2 categories
|
|
# Factorise
|
|
|
|
|
|
# Create dummy variables
|
|
#dummies = pd.get_dummies(col, prefix=col_name)
|
|
#df_coded = df_coded.join(dummies)
|
|
else:
|
|
# Numeric variable, etc.
|
|
df_coded[col_name] = col
|
|
|
|
# Compute pairwise correlation
|
|
df_corr = pd.DataFrame(index=df_coded.columns, columns=df_coded.columns, dtype='float64')
|
|
|
|
for i, col1 in enumerate(df_coded.columns):
|
|
for col2 in df_coded.columns[:i]:
|
|
df_2cols = df_coded[[col1, col2]].dropna()
|
|
|
|
if col1 in categorical_columns and len(df_coded[col1].unique()) > 2:
|
|
if col2 in categorical_columns and len(df_coded[col2].unique()) > 2:
|
|
# Categorical-categorical
|
|
# Compute Cramer's V
|
|
statistic = stats.contingency.association(df_2cols, method='cramer')
|
|
df_corr.loc[col1, col2] = statistic
|
|
df_corr.loc[col2, col1] = statistic
|
|
else:
|
|
# Categorical-nominal, etc.
|
|
# Compute eta
|
|
statistic = _compute_eta(df_2cols, col1, col2)
|
|
df_corr.loc[col1, col2] = statistic
|
|
df_corr.loc[col2, col1] = statistic
|
|
else:
|
|
if col2 in categorical_columns and len(df_coded[col2].unique()) > 2:
|
|
# Categorical-nominal, etc.
|
|
# Compute eta
|
|
statistic = _compute_eta(df_2cols, col2, col1)
|
|
df_corr.loc[col1, col2] = statistic
|
|
df_corr.loc[col2, col1] = statistic
|
|
else:
|
|
# Continuous-continuous, etc.
|
|
# Compute Pearson r (or Spearman rho, point-biserial, etc.)
|
|
statistic = stats.pearsonr(df_2cols[col1], df_2cols[col2]).statistic
|
|
df_corr.loc[col1, col2] = statistic
|
|
df_corr.loc[col2, col1] = statistic
|
|
|
|
# Correlation with itself is always 1
|
|
df_corr.loc[col1, col1] = 1
|
|
|
|
return AutoCorrelationsResult(df_corr)
|
|
|
|
def _compute_eta(df, col_category, col_numeric):
|
|
"""
|
|
Compute the correlation ratio, *η*
|
|
"""
|
|
|
|
ssw = 0
|
|
ssb = 0
|
|
values_mean = df[col_numeric].astype('float64').mean()
|
|
for category in df[col_category].unique():
|
|
subgroup = df[df[col_category] == category][col_numeric].astype('float64')
|
|
ssw += ((subgroup - subgroup.mean())**2).sum()
|
|
ssb += len(subgroup) * (subgroup.mean() - values_mean)**2
|
|
statistic = (ssb / (ssb + ssw))**0.5
|
|
return statistic
|
|
|
|
class AutoCorrelationsResult:
|
|
"""
|
|
Result of automatically computed pairwise correlation coefficients
|
|
|
|
See :func:`yli.auto_correlations`.
|
|
"""
|
|
|
|
def __init__(self, correlations):
|
|
#: Pairwise correlation coefficients (*DataFrame*)
|
|
self.correlations = correlations
|
|
|
|
def __repr__(self):
|
|
if config.repr_is_summary:
|
|
return self.summary()
|
|
return super().__repr__()
|
|
|
|
def _repr_html_(self):
|
|
df_repr = self.correlations._repr_html_()
|
|
|
|
# Insert caption
|
|
idx_endopen = df_repr.index('>', df_repr.index('<table'))
|
|
df_repr = df_repr[:idx_endopen+1] + '<caption>Correlation Matrix</caption>' + df_repr[idx_endopen+1:]
|
|
|
|
return df_repr
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the correlation matrix
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
return 'Correlation Matrix\n\n' + str(self.correlations)
|
|
|
|
def plot(self):
|
|
"""
|
|
Plot a heatmap of the pairwise correlation coefficients
|
|
"""
|
|
|
|
import seaborn as sns
|
|
|
|
# Format values without leading 0
|
|
def fmt_value(v):
|
|
if v >= 0.995:
|
|
return '1.0'
|
|
if v <= -0.995:
|
|
return '-1.0'
|
|
if v < 0:
|
|
if v >= -0.005:
|
|
return '.00'
|
|
return '-' + format(-v, '.2f').lstrip('0')
|
|
return format(v, '.2f').lstrip('0')
|
|
|
|
correlation_values = self.correlations.applymap(fmt_value)
|
|
|
|
sns.heatmap(self.correlations, vmin=-1, vmax=1, cmap='RdBu', annot=correlation_values, fmt='', annot_kws={'fontsize': 'small'})
|