|
@@ -12,7 +12,7 @@ import scorecardpy as sc
|
|
|
import toad as td
|
|
|
|
|
|
|
|
|
-def f_get_bins(data: DataSplitEntity, feat: str, strategy: str='quantile', nbins: int=10) -> pd.DataFrame:
|
|
|
+def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame:
|
|
|
# 等频分箱
|
|
|
if strategy == BinsStrategyEnum.QUANTILE.value:
|
|
|
kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile')
|
|
@@ -32,22 +32,27 @@ def f_get_bins(data: DataSplitEntity, feat: str, strategy: str='quantile', nbins
|
|
|
return c
|
|
|
'''
|
|
|
|
|
|
+
|
|
|
# 此函数入参应为scorecardpy进行woebin函数转换后的dataframe
|
|
|
def f_get_bins_display(bins_info: pd.DataFrame) -> pd.DataFrame:
|
|
|
df_list = []
|
|
|
for col, bin_data in bins_info.items():
|
|
|
tmp_df = pd.DataFrame(bin_data)
|
|
|
df_list.append(tmp_df)
|
|
|
- result_df = pd.concat(df_list, ignore_index = True)
|
|
|
+ result_df = pd.concat(df_list, ignore_index=True)
|
|
|
total_bad = result_df['bad'].sum()
|
|
|
total_cnt = result_df['count'].sum()
|
|
|
# 整体的坏样本率
|
|
|
br_overall = total_bad / total_cnt
|
|
|
result_df['lift'] = result_df['badprob'] / br_overall
|
|
|
- result_df = result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable','total_iv','bin'])\
|
|
|
- [['count_distr','count','good','bad','badprob','lift','bin_iv','woe']]
|
|
|
- return result_df.style.format(subset=['count','good','bad'], precision=0).format(subset=['count_distr','bad','lift',
|
|
|
- 'badprob','woe','bin_iv'],precision=4).bar(subset=['badprob','bin_iv','lift'],color=['#d65f58','#5fbb7a'])
|
|
|
+ result_df = \
|
|
|
+ result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \
|
|
|
+ [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']]
|
|
|
+ return result_df.style.format(subset=['count', 'good', 'bad'], precision=0).format(
|
|
|
+ subset=['count_distr', 'bad', 'lift',
|
|
|
+ 'badprob', 'woe', 'bin_iv'], precision=4).bar(subset=['badprob', 'bin_iv', 'lift'],
|
|
|
+ color=['#d65f58', '#5fbb7a'])
|
|
|
+
|
|
|
|
|
|
# 此函数筛除变量分箱不单调或非U型的变量
|
|
|
def f_bins_filter(bins: pd.DataFrame, cols: list) -> list:
|
|
@@ -65,8 +70,9 @@ def f_bins_filter(bins: pd.DataFrame, cols: list) -> list:
|
|
|
result_cols.append(tmp_col)
|
|
|
return result_cols
|
|
|
|
|
|
-# 此函数判断list的单调性,允许至多一次符号变化,即U型分布
|
|
|
-def f_judge_monto(bd_list: list) -> int:
|
|
|
+
|
|
|
+# 此函数判断list的单调性,允许至多N次符号变化
|
|
|
+def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> int:
|
|
|
start_tr = bd_list[1] - bd_list[0]
|
|
|
tmp_len = len(bd_list)
|
|
|
pos_neg_flag = 0
|
|
@@ -81,31 +87,65 @@ def f_judge_monto(bd_list: list) -> int:
|
|
|
# 记录一次符号变化
|
|
|
pos_neg_flag += 1
|
|
|
# 记录满足趋势要求的变量
|
|
|
- if pos_neg_flag <= 1:
|
|
|
- # 1 表示单调
|
|
|
- return 1
|
|
|
- # 0 表示非单调
|
|
|
- return 0
|
|
|
+ if pos_neg_flag <= pos_neg_cnt:
|
|
|
+ return True
|
|
|
+ return False
|
|
|
|
|
|
-def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop:list) -> pd.DataFrame:
|
|
|
+
|
|
|
+def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> pd.DataFrame:
|
|
|
transer = td.transform.WOETransformer()
|
|
|
# 根据训练数据来训练woe转换器,并选择目标变量和排除变量
|
|
|
- train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'],exclude=to_drop+['target'])
|
|
|
+ train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'],
|
|
|
+ exclude=to_drop + ['target'])
|
|
|
test_woe = transer.transform(c.transfrom(data.test_data()))
|
|
|
oot_woe = transer.transform(c.transform(data.val_data()))
|
|
|
return train_woe, test_woe, oot_woe
|
|
|
|
|
|
+
|
|
|
def f_get_iv(data: DataSplitEntity) -> pd.DataFrame:
|
|
|
# 计算前,先排除掉不需要计算IV的cols
|
|
|
- return td.quality(data, 'target',iv_only=True)
|
|
|
+ return td.quality(data, 'target', iv_only=True)
|
|
|
+
|
|
|
|
|
|
def f_get_psi(train_data: DataSplitEntity, oot_data: DataSplitEntity) -> pd.DataFrame:
|
|
|
# 计算前,先排除掉不需要的cols
|
|
|
return td.metrics.PSI(train_data, oot_data)
|
|
|
|
|
|
|
|
|
-def f_get_corr(data: DataSplitEntity, meth: str='spearman') -> pd.DataFrame:
|
|
|
+def f_get_corr(data: DataSplitEntity, meth: str = 'spearman') -> pd.DataFrame:
|
|
|
return data.train_data().corr(method=meth)
|
|
|
|
|
|
+
|
|
|
def f_get_ivf(data: DataSplitEntity) -> pd.DataFrame:
|
|
|
pass
|
|
|
+
|
|
|
+
|
|
|
+def f_get_best_bins(data: DataSplitEntity, x_column: str, special_values: list = []):
|
|
|
+ interval = 0.05
|
|
|
+ # 贪婪搜索训练集及测试集iv值最高的且单调的分箱
|
|
|
+ train_data = data.train_data
|
|
|
+ train_data_filter = train_data[~train_data[x_column].isin(special_values)]
|
|
|
+ train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
|
|
|
+ # 特殊值单独一箱
|
|
|
+ # train_data_special_list = []
|
|
|
+ # for special in special_values:
|
|
|
+ # df_cache = train_data[train_data[x_column] == special]
|
|
|
+ # if len(df_cache) != 0:
|
|
|
+ # train_data_special_list.append(df_cache)
|
|
|
+ x_train_data = train_data_filter[x_column]
|
|
|
+ # 计算 2 - 5 箱的情况
|
|
|
+ bin_num_list = list(range(2, 6))
|
|
|
+ for bin_num in bin_num_list:
|
|
|
+ # 构造数据切分点
|
|
|
+ point_list = []
|
|
|
+ init_point_percentile_list = [interval * i for i in range(1, bin_num)]
|
|
|
+ init_point_percentile_list.append(1 - point_list[-1])
|
|
|
+ for point_percentile in init_point_percentile_list:
|
|
|
+ point = x_train_data.iloc[int(len(x_train_data) * point_percentile)]
|
|
|
+ if point not in point_list:
|
|
|
+ point_list.append(point)
|
|
|
+ # 获取分箱结果
|
|
|
+ bins = sc.woebin(train_data, y=data.y_column, breaks_list=point_list)
|
|
|
+ # 单调性判断
|
|
|
+
|
|
|
+ pass
|