|
@@ -7,72 +7,8 @@
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
import scorecardpy as sc
|
|
|
-import toad as td
|
|
|
-from sklearn.preprocessing import KBinsDiscretizer
|
|
|
from statsmodels.stats.outliers_influence import variance_inflation_factor as vif
|
|
|
|
|
|
-from entitys import DataSplitEntity
|
|
|
-from enums import BinsStrategyEnum
|
|
|
-
|
|
|
-
|
|
|
-def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame:
|
|
|
- # 等频分箱
|
|
|
- if strategy == BinsStrategyEnum.QUANTILE.value:
|
|
|
- kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile')
|
|
|
- feature_binned = kbin_encoder.fit_transform(data[feat])
|
|
|
- return feature_binned.astype(int).astype(str)
|
|
|
- # 等宽分箱
|
|
|
- if strategy == BinsStrategyEnum.WIDTH.value:
|
|
|
- bin_width = (data.train_data()[feat].max() - data.train_data()[feat].min()) / nbins
|
|
|
- return pd.cut(data.train_data()[feat], bins=nbins, labels=[f'Bin_{i}' for i in range(1, nbins + 1)])
|
|
|
- # 使用toad分箱
|
|
|
- '''
|
|
|
- c = td.transfrom.Combiner()
|
|
|
- # method参数需要根据toad指定的几种方法名称选择
|
|
|
- c.fit(data, y = 'target', method = strategy, min_samples=None, n_bins = nbins, empty_separate = False)
|
|
|
- # 返回toad分箱combiner,用于训练集和测试集的分箱
|
|
|
- # 可使用c.export()[feature]查看某一特征的分箱临界值
|
|
|
- return c
|
|
|
- '''
|
|
|
-
|
|
|
-
|
|
|
-# 此函数入参应为scorecardpy进行woebin函数转换后的dataframe
|
|
|
-def f_get_bins_display(bins_info: pd.DataFrame) -> pd.DataFrame:
|
|
|
- df_list = []
|
|
|
- for col, bin_data in bins_info.items():
|
|
|
- tmp_df = pd.DataFrame(bin_data)
|
|
|
- df_list.append(tmp_df)
|
|
|
- result_df = pd.concat(df_list, ignore_index=True)
|
|
|
- total_bad = result_df['bad'].sum()
|
|
|
- total_cnt = result_df['count'].sum()
|
|
|
- # 整体的坏样本率
|
|
|
- br_overall = total_bad / total_cnt
|
|
|
- result_df['lift'] = result_df['badprob'] / br_overall
|
|
|
- result_df = \
|
|
|
- result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \
|
|
|
- [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']]
|
|
|
- return result_df.style.format(subset=['count', 'good', 'bad'], precision=0).format(
|
|
|
- subset=['count_distr', 'bad', 'lift',
|
|
|
- 'badprob', 'woe', 'bin_iv'], precision=4).bar(subset=['badprob', 'bin_iv', 'lift'],
|
|
|
- color=['#d65f58', '#5fbb7a'])
|
|
|
-
|
|
|
-
|
|
|
-# 此函数筛除变量分箱不单调或非U型的变量
|
|
|
-def f_bins_filter(bins: pd.DataFrame, cols: list) -> list:
|
|
|
- result_cols = []
|
|
|
- # 遍历原始变量列表
|
|
|
- for tmp_col in cols:
|
|
|
- tmp_br = bins[tmp_col]['bad_prob'].values.tolist()
|
|
|
- tmp_len = len(tmp_br)
|
|
|
- if tmp_len <= 2:
|
|
|
- result_cols.append(tmp_col)
|
|
|
- else:
|
|
|
- tmp_judge = f_judge_monto(tmp_br)
|
|
|
- # f_judge_monto 函数返回1表示list单调,0表示非单调
|
|
|
- if tmp_judge:
|
|
|
- result_cols.append(tmp_col)
|
|
|
- return result_cols
|
|
|
-
|
|
|
|
|
|
# 此函数判断list的单调性,允许至多N次符号变化
|
|
|
def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> int:
|
|
@@ -98,21 +34,6 @@ def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> int:
|
|
|
return False
|
|
|
|
|
|
|
|
|
-def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> pd.DataFrame:
|
|
|
- transer = td.transform.WOETransformer()
|
|
|
- # 根据训练数据来训练woe转换器,并选择目标变量和排除变量
|
|
|
- train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'],
|
|
|
- exclude=to_drop + ['target'])
|
|
|
- test_woe = transer.transform(c.transfrom(data.test_data()))
|
|
|
- oot_woe = transer.transform(c.transform(data.val_data()))
|
|
|
- return train_woe, test_woe, oot_woe
|
|
|
-
|
|
|
-
|
|
|
-def f_get_psi(train_data: DataSplitEntity, oot_data: DataSplitEntity) -> pd.DataFrame:
|
|
|
- # 计算前,先排除掉不需要的cols
|
|
|
- return td.metrics.PSI(train_data, oot_data)
|
|
|
-
|
|
|
-
|
|
|
def f_get_corr(data: pd.DataFrame, meth: str = 'spearman') -> pd.DataFrame:
|
|
|
return data.corr(method=meth)
|
|
|
|
|
@@ -158,6 +79,7 @@ def f_get_model_score_bin(df, card, bins=None):
|
|
|
df['MODEL_SCORE_BIN'] = score_bins.astype(str).values
|
|
|
return df, bins
|
|
|
|
|
|
+
|
|
|
def f_calcu_model_psi(df_train, df_test):
|
|
|
tmp1 = df_train.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].agg(['count']).sort_index(ascending=True)
|
|
|
tmp1['样本数比例'] = (tmp1['count'] / tmp1['count'].sum()).round(4)
|