feature_utils.py

# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2023/12/28
@desc: feature engineering utilities
"""
from itertools import combinations_with_replacement

import numpy as np
import pandas as pd
import scorecardpy as sc
import toad as td
from sklearn.preprocessing import KBinsDiscretizer
from tqdm import tqdm

from entitys import DataSplitEntity
from enums import BinsStrategyEnum


def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame:
    # Equal-frequency binning
    if strategy == BinsStrategyEnum.QUANTILE.value:
        kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile')
        # KBinsDiscretizer expects a 2-D input, so select the column as a DataFrame
        feature_binned = kbin_encoder.fit_transform(data.train_data()[[feat]])
        return feature_binned.astype(int).astype(str)
    # Equal-width binning
    if strategy == BinsStrategyEnum.WIDTH.value:
        # pd.cut with an integer `bins` already yields equal-width bins
        return pd.cut(data.train_data()[feat], bins=nbins, labels=[f'Bin_{i}' for i in range(1, nbins + 1)])
    # Binning with toad
    '''
    c = td.transform.Combiner()
    # `method` must be one of the method names supported by toad
    c.fit(data, y='target', method=strategy, min_samples=None, n_bins=nbins, empty_separate=False)
    # Return the fitted toad Combiner, used to bin both the train and test sets
    # Use c.export()[feature] to inspect the bin edges of a given feature
    return c
    '''
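
# A minimal usage sketch of the commented-out toad branch above (assumptions:
# a DataSplitEntity `data` with a `train_data()` accessor as used elsewhere in
# this module, and a target column named 'target'):
# c = td.transform.Combiner()
# c.fit(data.train_data(), y='target', method='chi', n_bins=10)
# binned_train = c.transform(data.train_data(), labels=True)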


# Expects the dict returned by scorecardpy's woebin(), i.e. a mapping from
# variable name to its binning DataFrame
def f_get_bins_display(bins_info: dict) -> "pd.io.formats.style.Styler":
    df_list = []
    for col, bin_data in bins_info.items():
        tmp_df = pd.DataFrame(bin_data)
        df_list.append(tmp_df)
    result_df = pd.concat(df_list, ignore_index=True)
    total_bad = result_df['bad'].sum()
    total_cnt = result_df['count'].sum()
    # Overall bad rate
    br_overall = total_bad / total_cnt
    result_df['lift'] = result_df['badprob'] / br_overall
    result_df = \
        result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \
            [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']]
    return result_df.style.format(subset=['count', 'good', 'bad'], precision=0) \
        .format(subset=['count_distr', 'badprob', 'lift', 'woe', 'bin_iv'], precision=4) \
        .bar(subset=['badprob', 'bin_iv', 'lift'], color=['#d65f58', '#5fbb7a'])
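
# A minimal usage sketch (sc.woebin is scorecardpy's documented binning entry
# point; the dataset and target name below mirror scorecardpy's own example):
# bins_info = sc.woebin(sc.germancredit(), y='creditability')
# f_get_bins_display(bins_info)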


# Drop variables whose bin bad rates are neither monotonic nor U-shaped
def f_bins_filter(bins: dict, cols: list) -> list:
    result_cols = []
    # Iterate over the original variable list
    for tmp_col in cols:
        # scorecardpy names the bad-rate column 'badprob'
        tmp_br = bins[tmp_col]['badprob'].values.tolist()
        tmp_len = len(tmp_br)
        if tmp_len <= 2:
            result_cols.append(tmp_col)
        else:
            tmp_judge = f_judge_monto(tmp_br)
            # f_judge_monto returns True when the trend is acceptable, False otherwise
            if tmp_judge:
                result_cols.append(tmp_col)
    return result_cols


# Judge the monotonicity of a list, allowing at most `pos_neg_cnt` sign changes
def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> bool:
    start_tr = bd_list[1] - bd_list[0]
    tmp_len = len(bd_list)
    pos_neg_flag = 0
    for i in range(2, tmp_len):
        tmp_tr = bd_list[i] - bd_list[i - 1]
        # Compare the sign of each consecutive bad-rate difference with the
        # current trend; at most one sign change is allowed, i.e. a U-shaped curve
        if (tmp_tr >= 0 and start_tr >= 0) or (tmp_tr <= 0 and start_tr <= 0):
            # Trend unchanged, move on to the next element
            continue
        else:
            # Record one sign change
            start_tr = tmp_tr
            pos_neg_flag += 1
            if pos_neg_flag > pos_neg_cnt:
                return False
    # The trend requirement is satisfied
    return True
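
# Illustrative examples (hypothetical bad-rate lists, shown for clarity):
# f_judge_monto([0.1, 0.2, 0.3])            -> True   (monotonically increasing)
# f_judge_monto([0.3, 0.2, 0.1, 0.2, 0.3])  -> True   (U-shaped, one sign change)
# f_judge_monto([0.1, 0.3, 0.2, 0.4])       -> False  (two sign changes)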


def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> tuple:
    transer = td.transform.WOETransformer()
    # Fit the WOE transformer on the training data, specifying the target
    # column and the columns to exclude
    train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'],
                                      exclude=to_drop + ['target'])
    test_woe = transer.transform(c.transform(data.test_data()))
    oot_woe = transer.transform(c.transform(data.val_data()))
    return train_woe, test_woe, oot_woe
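
# A minimal usage sketch (assumes a Combiner `c` fitted as in f_get_bins; the
# 'id' entry in `to_drop` is a hypothetical identifier column, for illustration):
# train_woe, test_woe, oot_woe = f_get_woe(data, c, to_drop=['id'])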


def f_get_iv(data: DataSplitEntity) -> pd.DataFrame:
    # Columns that should not enter the IV calculation ought to be dropped beforehand
    return td.quality(data.train_data(), 'target', iv_only=True)


def f_get_psi(train_data: pd.DataFrame, oot_data: pd.DataFrame) -> pd.DataFrame:
    # Columns that are not needed ought to be dropped beforehand
    return td.metrics.PSI(train_data, oot_data)


def f_get_corr(data: DataSplitEntity, meth: str = 'spearman') -> pd.DataFrame:
    return data.train_data().corr(method=meth)


def f_get_ivf(data: DataSplitEntity) -> pd.DataFrame:
    pass


def _f_distribute_balls(balls, boxes):
    # Enumerate every way of splitting `balls` balls across `boxes` boxes with
    # at least one ball per box (stars and bars); candidate divider placements
    # are over-generated here and the invalid ones are filtered out below
    total_ways = combinations_with_replacement(range(balls + boxes - 1), boxes - 1)
    distribute_list = []
    # Iterate over all candidate divider placements
    for combo in total_ways:
        # Allocate balls according to the divider positions
        distribution = [0] * boxes
        start = 0
        for i, divider in enumerate(combo):
            distribution[i] = divider - start + 1
            start = divider + 1
        distribution[-1] = balls - start  # balls in the last box
        # Keep only distributions where every box holds at least one ball
        if all(x > 0 for x in distribution):
            distribute_list.append(distribution)
    return distribute_list
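
# Illustrative example:
# _f_distribute_balls(4, 2) -> [[1, 3], [2, 2], [3, 1]]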


def f_get_best_bins(data: DataSplitEntity, x_column: str, special_values: list = []):
    # Greedily search for the monotonic binning that maximizes the summed IV
    # of the train set and the test set
    interval = 0.05

    def _n0(x):
        return sum(x == 0)

    def _n1(x):
        return sum(x == 1)

    def _get_sv_bins(df, x_column, y_column, special_values):
        # Build one bin per special value
        sv_bin_list = []
        for special in special_values:
            dtm = df[df[x_column] == special].copy()
            if len(dtm) != 0:
                dtm['bin'] = [str(special)] * len(dtm)
                binning = dtm.groupby(['bin'], group_keys=False)[y_column].agg(
                    [_n0, _n1]).reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
                binning['is_special_values'] = [True] * len(binning)
                sv_bin_list.append(binning)
        return sv_bin_list

    def _get_bins(df, x_column, y_column, breaks_list):
        dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
        bstbrks = [-np.inf] + breaks_list + [np.inf]
        labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
        dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
        dtm['bin'] = dtm['bin'].astype(str)
        bins = dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1]) \
            .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
        bins['is_special_values'] = [False] * len(bins)
        return bins

    def _calculation_iv(bins):
        bins['count'] = bins['good'] + bins['bad']
        bins['badprob'] = bins['bad'] / bins['count']
        # Monotonicity check on the regular bins (special-value bins excluded)
        bad_prob = bins[bins['is_special_values'] == False]['badprob'].values.tolist()
        if not f_judge_monto(bad_prob):
            return -1
        # Compute IV; zero counts are replaced with 0.9 to avoid log(0) and
        # division by zero
        infovalue = pd.DataFrame({'good': bins['good'], 'bad': bins['bad']}) \
            .replace(0, 0.9) \
            .assign(
                DistrBad=lambda x: x.bad / sum(x.bad),
                DistrGood=lambda x: x.good / sum(x.good)
            ) \
            .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood)) \
            .iv
        bins['bin_iv'] = infovalue
        bins['total_iv'] = bins['bin_iv'].sum()
        iv = bins['total_iv'].values[0]
        return iv

    train_data = data.train_data()
    train_data_filter = train_data[~train_data[x_column].isin(special_values)]
    train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
    train_data_x = train_data_filter[x_column]

    test_data = data.test_data()
    test_data_filter = None
    if test_data is not None and len(test_data) != 0:
        test_data_filter = test_data[~test_data[x_column].isin(special_values)]
        test_data_filter = test_data_filter.sort_values(by=x_column, ascending=True)

    # Build the candidate split points, considering 2 to 5 bins
    distribute_list = []
    points_list = []
    for bin_num in list(range(2, 6)):
        distribute_list.extend(_f_distribute_balls(int(1 / interval), bin_num))
    for distribute in distribute_list:
        point_list_cache = []
        point_percentile_list = [sum(distribute[0:idx + 1]) * interval for idx, _ in enumerate(distribute[0:-1])]
        for point_percentile in point_percentile_list:
            point = train_data_x.iloc[int(len(train_data_x) * point_percentile)]
            if point not in point_list_cache:
                point_list_cache.append(point)
        if point_list_cache not in points_list:
            points_list.append(point_list_cache)

    # Filter the candidates by IV and monotonicity
    iv_max = 0
    breaks_list = []
    train_sv_bin_list = _get_sv_bins(train_data, x_column, data.y_column, special_values)
    test_sv_bin_list = None
    if test_data_filter is not None:
        test_sv_bin_list = _get_sv_bins(test_data, x_column, data.y_column, special_values)
    for point_list in tqdm(points_list):
        train_bins = _get_bins(train_data_filter, x_column, data.y_column, point_list)
        # Merge in the special-value bins before computing IV
        for sv_bin in train_sv_bin_list:
            train_bins = pd.concat((train_bins, sv_bin))
        train_iv = _calculation_iv(train_bins)
        # Monotonicity and the IV floor are enforced on the train set only
        if train_iv < 0.03:
            continue
        test_iv = 0
        if test_data_filter is not None:
            test_bins = _get_bins(test_data_filter, x_column, data.y_column, point_list)
            for sv_bin in test_sv_bin_list:
                test_bins = pd.concat((test_bins, sv_bin))
            test_iv = _calculation_iv(test_bins)
        iv = train_iv + test_iv
        if iv > iv_max:
            iv_max = iv
            breaks_list = point_list
    return iv_max, breaks_list


if __name__ == "__main__":
    dat = sc.germancredit()
    dat["creditability"] = dat["creditability"].apply(lambda x: 1 if x == "bad" else 0)
    data = DataSplitEntity(dat[:700], None, dat[700:], "creditability")
    iv_max, breaks_list = f_get_best_bins(data, "duration_in_month", special_values=[24, 12])
    print(iv_max, breaks_list)