# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2023/12/28
@desc: Feature engineering utilities
"""
from itertools import combinations_with_replacement

import numpy as np
import pandas as pd
import scorecardpy as sc
import toad as td
from sklearn.preprocessing import KBinsDiscretizer
from tqdm import tqdm

from entitys import DataSplitEntity
from enums import BinsStrategyEnum


def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame:
    # Equal-frequency binning
    if strategy == BinsStrategyEnum.QUANTILE.value:
        kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile')
        feature_binned = kbin_encoder.fit_transform(data.train_data()[[feat]])
        return feature_binned.astype(int).astype(str)
    # Equal-width binning
    if strategy == BinsStrategyEnum.WIDTH.value:
        return pd.cut(data.train_data()[feat], bins=nbins, labels=[f'Bin_{i}' for i in range(1, nbins + 1)])
    # Binning with toad, kept for reference:
    '''
    c = td.transform.Combiner()
    # `method` must be one of the binning methods supported by toad
    c.fit(data, y='target', method=strategy, min_samples=None, n_bins=nbins, empty_separate=False)
    # Returns the fitted toad Combiner, which bins both the train set and the test set;
    # use c.export()[feature] to inspect the cut points of a given feature
    return c
    '''
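
# Usage sketch for f_get_bins (assumes BinsStrategyEnum.QUANTILE.value == 'quantile'
# and a DataSplitEntity built as in the __main__ block below):
# >>> dat = sc.germancredit()
# >>> dat['creditability'] = dat['creditability'].apply(lambda x: 1 if x == 'bad' else 0)
# >>> data = DataSplitEntity(dat[:700], None, dat[700:], 'creditability')
# >>> f_get_bins(data, 'duration_in_month', strategy='quantile', nbins=5)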


# Expects the per-variable binning tables produced by scorecardpy's woebin()
def f_get_bins_display(bins_info: pd.DataFrame) -> pd.DataFrame:
    df_list = []
    for col, bin_data in bins_info.items():
        tmp_df = pd.DataFrame(bin_data)
        df_list.append(tmp_df)
    result_df = pd.concat(df_list, ignore_index=True)
    total_bad = result_df['bad'].sum()
    total_cnt = result_df['count'].sum()
    # Overall bad-sample rate
    br_overall = total_bad / total_cnt
    result_df['lift'] = result_df['badprob'] / br_overall
    result_df = \
        result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \
            [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']]
    return result_df.style.format(subset=['count', 'good', 'bad'], precision=0) \
        .format(subset=['count_distr', 'badprob', 'lift', 'woe', 'bin_iv'], precision=4) \
        .bar(subset=['badprob', 'bin_iv', 'lift'], color=['#d65f58', '#5fbb7a'])
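
# Usage sketch (scorecardpy's woebin() returns a dict of DataFrames keyed by variable name):
# >>> bins = sc.woebin(dat, y='creditability')
# >>> f_get_bins_display(bins)  # returns a pandas Styler for notebook display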


# Drops variables whose bin bad-rates are neither monotonic nor U-shaped
def f_bins_filter(bins: pd.DataFrame, cols: list) -> list:
    result_cols = []
    # Iterate over the original variable list
    for tmp_col in cols:
        tmp_br = bins[tmp_col]['badprob'].values.tolist()
        tmp_len = len(tmp_br)
        if tmp_len <= 2:
            result_cols.append(tmp_col)
        else:
            # f_judge_monto returns True if the trend is acceptable, False otherwise
            if f_judge_monto(tmp_br):
                result_cols.append(tmp_col)
    return result_cols


# Judges the monotonicity of a list, allowing at most `pos_neg_cnt` changes of trend direction
def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> bool:
    start_tr = bd_list[1] - bd_list[0]
    tmp_len = len(bd_list)
    pos_neg_flag = 0
    for i in range(2, tmp_len):
        tmp_tr = bd_list[i] - bd_list[i - 1]
        # Difference each bad_rate against its predecessor to track the trend direction;
        # by default at most one sign change is allowed, so a U-shaped distribution still passes
        if (tmp_tr >= 0 and start_tr >= 0) or (tmp_tr <= 0 and start_tr <= 0):
            # Trend preserved, check the next element
            continue
        else:
            # Record one sign change
            start_tr = tmp_tr
            pos_neg_flag += 1
            if pos_neg_flag > pos_neg_cnt:
                return False
    # Trend requirement satisfied
    return pos_neg_flag <= pos_neg_cnt
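
# Behaviour sketch with the default pos_neg_cnt=1:
# >>> f_judge_monto([0.1, 0.2, 0.3])        # monotonic -> True
# >>> f_judge_monto([0.3, 0.1, 0.2])        # one sign change (U-shape) -> True
# >>> f_judge_monto([0.1, 0.3, 0.2, 0.4])   # two sign changes -> False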


def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> tuple:
    transer = td.transform.WOETransformer()
    # Fit the WOE transformer on the training data, specifying the target and the excluded columns
    train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'],
                                      exclude=to_drop + ['target'])
    test_woe = transer.transform(c.transform(data.test_data()))
    oot_woe = transer.transform(c.transform(data.val_data()))
    return train_woe, test_woe, oot_woe
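
# Usage sketch (assumes a toad Combiner fitted on the training data, with 'target'
# as the label column and a hypothetical to_drop list of identifier columns):
# >>> c = td.transform.Combiner()
# >>> c.fit(data.train_data(), y='target', method='chi', min_samples=0.05)
# >>> train_woe, test_woe, oot_woe = f_get_woe(data, c, to_drop=['id'])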


def f_get_iv(data: DataSplitEntity) -> pd.DataFrame:
    # Exclude columns that should not enter the IV calculation before calling this
    return td.quality(data.train_data(), 'target', iv_only=True)


def f_get_psi(train_data: pd.DataFrame, oot_data: pd.DataFrame) -> pd.DataFrame:
    # Exclude unneeded columns before calling this
    return td.metrics.PSI(train_data, oot_data)
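
# Usage sketch (both arguments are feature frames with matching columns, e.g. WOE-transformed):
# >>> psi = f_get_psi(train_woe, oot_woe)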


def f_get_corr(data: DataSplitEntity, meth: str = 'spearman') -> pd.DataFrame:
    return data.train_data().corr(method=meth)


def f_get_ivf(data: DataSplitEntity) -> pd.DataFrame:
    # TODO: not yet implemented
    pass


def _f_distribute_balls(balls, boxes):
    # Stars and bars: enumerate placements of boxes - 1 dividers among the balls,
    # i.e. the ways to split `balls` into `boxes` ordered, non-empty parts
    total_ways = combinations_with_replacement(range(balls + boxes - 1), boxes - 1)
    distribute_list = []
    # Iterate over every candidate divider placement
    for combo in total_ways:
        # Allocate the balls according to the divider positions
        distribution = [0] * boxes
        start = 0
        for i, divider in enumerate(combo):
            distribution[i] = divider - start + 1
            start = divider + 1
        distribution[-1] = balls - start  # ball count of the last box
        # Keep only distributions in which every box gets at least one ball
        if all(x > 0 for x in distribution):
            distribute_list.append(distribution)
    return distribute_list
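
# For example, splitting 4 balls into 2 non-empty boxes:
# >>> _f_distribute_balls(4, 2)
# [[1, 3], [2, 2], [3, 1]]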


def f_get_best_bins(data: DataSplitEntity, x_column: str, special_values: list = None):
    # Greedy search for the monotonic binning that maximizes the summed IV of the
    # train set and the test set
    special_values = special_values or []
    interval = 0.05

    def _n0(x):
        return sum(x == 0)

    def _n1(x):
        return sum(x == 1)

    def _get_sv_bins(df, x_column, y_column, special_values):
        # Build one standalone bin per special value
        sv_bin_list = []
        for special in special_values:
            dtm = df[df[x_column] == special].copy()
            if len(dtm) != 0:
                dtm['bin'] = [str(special)] * len(dtm)
                binning = dtm.groupby(['bin'], group_keys=False)[y_column].agg(
                    [_n0, _n1]).reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
                binning['is_special_values'] = [True] * len(binning)
                sv_bin_list.append(binning)
        return sv_bin_list

    def _get_bins(df, x_column, y_column, breaks_list):
        dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
        bstbrks = [-np.inf] + breaks_list + [np.inf]
        labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
        dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
        dtm['bin'] = dtm['bin'].astype(str)
        bins = dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1]) \
            .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
        bins['is_special_values'] = [False] * len(bins)
        return bins

    def _calculation_iv(bins):
        bins['count'] = bins['good'] + bins['bad']
        bins['badprob'] = bins['bad'] / bins['count']
        # Monotonicity check; special-value bins are excluded from it
        bad_prob = bins[bins['is_special_values'] == False]['badprob'].values.tolist()
        if not f_judge_monto(bad_prob):
            return -1
        # IV calculation; zero counts are replaced with 0.9 to avoid log(0) and division by zero
        infovalue = pd.DataFrame({'good': bins['good'], 'bad': bins['bad']}) \
            .replace(0, 0.9) \
            .assign(
                DistrBad=lambda x: x.bad / sum(x.bad),
                DistrGood=lambda x: x.good / sum(x.good)
            ) \
            .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood)) \
            .iv
        bins['bin_iv'] = infovalue
        bins['total_iv'] = bins['bin_iv'].sum()
        iv = bins['total_iv'].values[0]
        return iv

    train_data = data.train_data()
    train_data_filter = train_data[~train_data[x_column].isin(special_values)]
    train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
    train_data_x = train_data_filter[x_column]

    test_data = data.test_data()
    test_data_filter = None
    if test_data is not None and len(test_data) != 0:
        test_data_filter = test_data[~test_data[x_column].isin(special_values)]
        test_data_filter = test_data_filter.sort_values(by=x_column, ascending=True)

    # Build candidate split points from every partition into 2 to 5 bins;
    # with interval = 0.05 this yields C(19, k-1) compositions for k bins,
    # i.e. 19 + 171 + 969 + 3876 = 5035 candidates before deduplication
    distribute_list = []
    points_list = []
    for bin_num in list(range(2, 6)):
        distribute_list.extend(_f_distribute_balls(int(1 / interval), bin_num))
    for distribute in distribute_list:
        point_list_cache = []
        point_percentile_list = [sum(distribute[0:idx + 1]) * interval for idx, _ in enumerate(distribute[0:-1])]
        for point_percentile in point_percentile_list:
            point = train_data_x.iloc[int(len(train_data_x) * point_percentile)]
            if point not in point_list_cache:
                point_list_cache.append(point)
        if point_list_cache not in points_list:
            points_list.append(point_list_cache)

    # Filter the candidates by IV and monotonicity
    iv_max = 0
    breaks_list = []
    train_sv_bin_list = _get_sv_bins(train_data, x_column, data.y_column, special_values)
    test_sv_bin_list = None
    if test_data_filter is not None:
        test_sv_bin_list = _get_sv_bins(test_data, x_column, data.y_column, special_values)
    for point_list in tqdm(points_list):
        train_bins = _get_bins(train_data_filter, x_column, data.y_column, point_list)
        # Merge in the special-value bins before computing IV
        for sv_bin in train_sv_bin_list:
            train_bins = pd.concat((train_bins, sv_bin))
        train_iv = _calculation_iv(train_bins)
        # Only the train set is constrained on monotonicity and minimum IV
        if train_iv < 0.03:
            continue

        test_iv = 0
        if test_data_filter is not None:
            test_bins = _get_bins(test_data_filter, x_column, data.y_column, point_list)
            for sv_bin in test_sv_bin_list:
                test_bins = pd.concat((test_bins, sv_bin))
            test_iv = _calculation_iv(test_bins)
        iv = train_iv + test_iv
        if iv > iv_max:
            iv_max = iv
            breaks_list = point_list

    return iv_max, breaks_list


if __name__ == "__main__":
    dat = sc.germancredit()
    dat["creditability"] = dat["creditability"].apply(lambda x: 1 if x == "bad" else 0)
    data = DataSplitEntity(dat[:700], None, dat[700:], "creditability")
    iv_max, breaks_list = f_get_best_bins(data, "duration_in_month", special_values=[24, 12])
    print(iv_max, breaks_list)