strategy_iv.py

# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2024/1/2
@desc: IV-value and monotonicity based feature filtering strategy
"""
from itertools import combinations_with_replacement
from typing import List, Dict

import numpy as np
import pandas as pd
import scorecardpy as sc
from tqdm import tqdm

from entitys import DataSplitEntity, CandidateFeatureEntity
from .feature_utils import f_judge_monto, f_get_corr
from .filter_strategy_base import FilterStrategyBase

class StrategyIv(FilterStrategyBase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _f_corr_filter(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity]) -> List[str]:
        # Drop one variable of each highly correlated pair, keeping the higher-IV one
        corr_threshold = self.data_process_config.corr_threshold
        train_data = data.train_data
        x_columns_candidate = list(candidate_dict.keys())
        corr_df = f_get_corr(train_data[x_columns_candidate])
        corr_dict = corr_df.to_dict()
        for column, corr in corr_dict.items():
            if column not in x_columns_candidate:
                continue
            for challenger_column, challenger_corr in corr.items():
                if challenger_corr < corr_threshold or column == challenger_column \
                        or challenger_column not in x_columns_candidate:
                    continue
                iv_max = candidate_dict[column].iv_max
                challenger_iv_max = candidate_dict[challenger_column].iv_max
                if iv_max > challenger_iv_max:
                    x_columns_candidate.remove(challenger_column)
                else:
                    x_columns_candidate.remove(column)
                    break
        return x_columns_candidate
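
    # Worked example of the pruning above (illustrative values): suppose
    # corr(A, B) = 0.92 >= corr_threshold, iv_max(A) = 0.35, iv_max(B) = 0.20.
    # Then B is removed; had B held the higher IV, A would be removed instead and
    # the inner loop breaks so later challengers are compared against survivors only.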

    def _f_wide_filter(self, data: DataSplitEntity) -> List[str]:
        # Coarse screening of variables by IV
        train_data = data.train_data
        y_column = self.data_process_config.y_column
        iv_threshold_wide = self.data_process_config.iv_threshold_wide
        x_columns_candidate = self.data_process_config.x_columns_candidate
        if x_columns_candidate is None or len(x_columns_candidate) == 0:
            # list.remove() mutates in place and returns None, so split this into two steps
            x_columns_candidate = train_data.columns.tolist()
            x_columns_candidate.remove(y_column)
        # sc.woebin returns {column: binning DataFrame}; each frame carries a 'total_iv' column
        bins = sc.woebin(train_data[x_columns_candidate + [y_column]], y=y_column)
        bins_iv_list = []
        for column, bin_df in bins.items():
            total_iv = bin_df['total_iv'][0]
            if total_iv < iv_threshold_wide:
                continue
            bins_iv_list.append({column: total_iv})
        # list.sort() also returns None: sort in place first, then unpack the column names
        # so the survivors come back in descending-IV order
        bins_iv_list.sort(key=lambda x: list(x.values())[0], reverse=True)
        columns = [list(d.keys())[0] for d in bins_iv_list]
        return columns

    def _f_get_best_bins(self, data: DataSplitEntity, x_column: str):
        # Greedy search for the monotonic binning that maximizes the combined IV of
        # the train set and the test set
        interval = self.data_process_config.bin_search_interval
        iv_threshold = self.data_process_config.iv_threshold
        special_values = self.data_process_config.get_special_values(x_column)
        y_column = self.data_process_config.y_column
        sample_rate = self.data_process_config.sample_rate

        def _n0(x):
            return sum(x == 0)

        def _n1(x):
            return sum(x == 1)

        def _f_distribute_balls(balls, boxes):
            # Stars and bars: enumerate the ways of placing boxes - 1 dividers so that
            # `balls` balls are split into `boxes` non-empty groups
            total_ways = combinations_with_replacement(range(balls + boxes - 1), boxes - 1)
            distribute_list = []
            # Walk every candidate divider placement
            for combo in total_ways:
                # Translate divider positions into per-box counts
                distribution = [0] * boxes
                start = 0
                for i, divider in enumerate(combo):
                    distribution[i] = divider - start + 1
                    start = divider + 1
                distribution[-1] = balls - start  # whatever is left fills the last box
                # Keep only placements that put at least one ball in every box
                if all(x > 0 for x in distribution):
                    distribute_list.append(distribution)
            return distribute_list
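
        # Worked example: _f_distribute_balls(4, 2) keeps only the all-positive splits
        # [[1, 3], [2, 2], [3, 1]]; in general there are C(balls - 1, boxes - 1) of them.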

        def _get_sv_bins(df, x_column, y_column, special_values):
            # Build one standalone bin per special value
            sv_bin_list = []
            for special in special_values:
                # .copy() avoids pandas' SettingWithCopyWarning on the assignment below
                dtm = df[df[x_column] == special].copy()
                if len(dtm) != 0:
                    dtm['bin'] = [str(special)] * len(dtm)
                    binning = dtm.groupby(['bin'], group_keys=False)[y_column].agg(
                        [_n0, _n1]).reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
                    binning['is_special_values'] = [True] * len(binning)
                    sv_bin_list.append(binning)
            return sv_bin_list

        def _get_bins(df, x_column, y_column, breaks_list):
            # Cut the variable at the candidate break points and count good/bad per bin
            dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
            bstbrks = [-np.inf] + breaks_list + [np.inf]
            labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
            dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
            dtm['bin'] = dtm['bin'].astype(str)
            bins = dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1]) \
                .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
            bins['is_special_values'] = [False] * len(bins)
            return bins
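
        # Example of the labels built above: breaks_list = [0.5, 1.5] yields the
        # left-closed bins '[-inf,0.5)', '[0.5,1.5)' and '[1.5,inf)'.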

        def _calculation_iv(bins):
            bins['count'] = bins['good'] + bins['bad']
            bins['badprob'] = bins['bad'] / bins['count']
            # Monotonicity check on the non-special bins
            bad_prob = bins[bins['is_special_values'] == False]['badprob'].values.tolist()
            if not f_judge_monto(bad_prob):
                return -1
            # Compute the IV; zero counts are replaced with 0.9 to avoid division by
            # zero and log(0)
            infovalue = pd.DataFrame({'good': bins['good'], 'bad': bins['bad']}) \
                .replace(0, 0.9) \
                .assign(
                    DistrBad=lambda x: x.bad / sum(x.bad),
                    DistrGood=lambda x: x.good / sum(x.good)
                ) \
                .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood)) \
                .iv
            bins['bin_iv'] = infovalue
            bins['total_iv'] = bins['bin_iv'].sum()
            iv = bins['total_iv'].values[0]
            return iv
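
        # This is the standard information-value formula:
        #   IV = sum_i (DistrBad_i - DistrGood_i) * ln(DistrBad_i / DistrGood_i)
        # where DistrBad_i and DistrGood_i are bin i's share of all bads and all goods.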

        def _f_sampling(distribute_list: list, sample_rate: float):
            # Subsample the candidates: an exhaustive greedy search takes too long
            sampled_list = distribute_list[::int(1 / sample_rate)]
            return sampled_list
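
        # e.g. sample_rate = 0.25 keeps distribute_list[::4], roughly a quarter of the list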

        train_data = data.train_data
        train_data_filter = train_data[~train_data[x_column].isin(special_values)]
        train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
        train_data_x = train_data_filter[x_column]

        test_data = data.test_data
        test_data_filter = None
        if test_data is not None and len(test_data) != 0:
            test_data_filter = test_data[~test_data[x_column].isin(special_values)]
            test_data_filter = test_data_filter.sort_values(by=x_column, ascending=True)

        # Build the candidate split points, covering 2 to 5 bins
        distribute_list = []
        points_list = []
        for bin_num in list(range(2, 6)):
            distribute_list_cache = _f_distribute_balls(int(1 / interval), bin_num)
            # 4 bins and up must be sampled, otherwise the search takes too long
            sample_num = 1000 * sample_rate
            if sample_rate <= 0.15:
                sample_num *= 2
            if bin_num == 4 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            sample_num = 4000 * sample_rate
            if bin_num == 5 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            distribute_list.extend(distribute_list_cache)
        for distribute in distribute_list:
            point_list_cache = []
            point_percentile_list = [sum(distribute[0:idx + 1]) * interval for idx, _ in enumerate(distribute[0:-1])]
            for point_percentile in point_percentile_list:
                point = train_data_x.iloc[int(len(train_data_x) * point_percentile)]
                if point not in point_list_cache:
                    point_list_cache.append(point)
            if point_list_cache not in points_list:
                points_list.append(point_list_cache)

        # Filter by IV and monotonicity
        iv_max = 0
        breaks_list = []
        train_sv_bin_list = _get_sv_bins(train_data, x_column, y_column, special_values)
        test_sv_bin_list = None
        if test_data_filter is not None:
            test_sv_bin_list = _get_sv_bins(test_data, x_column, y_column, special_values)
        for point_list in tqdm(points_list):
            train_bins = _get_bins(train_data_filter, x_column, y_column, point_list)
            # Merge in the special-value bins before computing IV
            for sv_bin in train_sv_bin_list:
                train_bins = pd.concat((train_bins, sv_bin))
            train_iv = _calculation_iv(train_bins)
            # Monotonicity and the IV threshold are only enforced on the train set
            if train_iv < iv_threshold:
                continue
            test_iv = 0
            if test_data_filter is not None:
                test_bins = _get_bins(test_data_filter, x_column, y_column, point_list)
                for sv_bin in test_sv_bin_list:
                    test_bins = pd.concat((test_bins, sv_bin))
                test_iv = _calculation_iv(test_bins)
            iv = train_iv + test_iv
            if iv > iv_max:
                iv_max = iv
                breaks_list = point_list
        return iv_max, breaks_list
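
    # Search-space size for context, assuming bin_search_interval = 0.05 (i.e. 20 balls):
    # C(19, 1) = 19 two-bin splits, C(19, 2) = 171 three-bin, C(19, 3) = 969 four-bin and
    # C(19, 4) = 3876 five-bin candidates, hence the sampling for 4 and 5 bins.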

    def filter(self, data: DataSplitEntity, *args, **kwargs) -> List[CandidateFeatureEntity]:
        # Coarse screening
        x_columns_candidate = self._f_wide_filter(data)
        candidate_num = self.data_process_config.candidate_num
        candidate_dict: Dict[str, CandidateFeatureEntity] = {}
        for x_column in x_columns_candidate:
            iv_max, breaks_list = self._f_get_best_bins(data, x_column)
            candidate_dict[x_column] = CandidateFeatureEntity(x_column, breaks_list, iv_max)
        # Correlation filter removes further variables
        x_columns_candidate = self._f_corr_filter(data, candidate_dict)
        candidate_list: List[CandidateFeatureEntity] = []
        for x_column, v in candidate_dict.items():
            if x_column in x_columns_candidate:
                candidate_list.append(v)
        candidate_list.sort(key=lambda x: x.iv_max, reverse=True)
        return candidate_list[0:candidate_num]
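
# Usage sketch (hypothetical: the real constructors of DataSplitEntity and the
# data_process_config object may differ from what is assumed here):
#   data = DataSplitEntity(train_data=train_df, test_data=test_df)
#   strategy = StrategyIv(data_process_config=config)
#   candidates = strategy.filter(data)  # List[CandidateFeatureEntity] sorted by iv_max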