# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2023/12/28
@desc: Feature engineering utilities (binning, WOE, IV, PSI, correlation).
"""
import pandas as pd
import scorecardpy as sc
import toad as td
from sklearn.preprocessing import KBinsDiscretizer

from entitys import DataSplitEntity
from enums import BinsStrategyEnum
  13. def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame:
  14. # 等频分箱
  15. if strategy == BinsStrategyEnum.QUANTILE.value:
  16. kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile')
  17. feature_binned = kbin_encoder.fit_transform(data[feat])
  18. return feature_binned.astype(int).astype(str)
  19. # 等宽分箱
  20. if strategy == BinsStrategyEnum.WIDTH.value:
  21. bin_width = (data.train_data()[feat].max() - data.train_data()[feat].min()) / nbins
  22. return pd.cut(data.train_data()[feat], bins=nbins, labels=[f'Bin_{i}' for i in range(1, nbins + 1)])
  23. # 使用toad分箱
  24. '''
  25. c = td.transfrom.Combiner()
  26. # method参数需要根据toad指定的几种方法名称选择
  27. c.fit(data, y = 'target', method = strategy, min_samples=None, n_bins = nbins, empty_separate = False)
  28. # 返回toad分箱combiner,用于训练集和测试集的分箱
  29. # 可使用c.export()[feature]查看某一特征的分箱临界值
  30. return c
  31. '''
  32. # 此函数入参应为scorecardpy进行woebin函数转换后的dataframe
  33. def f_get_bins_display(bins_info: pd.DataFrame) -> pd.DataFrame:
  34. df_list = []
  35. for col, bin_data in bins_info.items():
  36. tmp_df = pd.DataFrame(bin_data)
  37. df_list.append(tmp_df)
  38. result_df = pd.concat(df_list, ignore_index=True)
  39. total_bad = result_df['bad'].sum()
  40. total_cnt = result_df['count'].sum()
  41. # 整体的坏样本率
  42. br_overall = total_bad / total_cnt
  43. result_df['lift'] = result_df['badprob'] / br_overall
  44. result_df = \
  45. result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \
  46. [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']]
  47. return result_df.style.format(subset=['count', 'good', 'bad'], precision=0).format(
  48. subset=['count_distr', 'bad', 'lift',
  49. 'badprob', 'woe', 'bin_iv'], precision=4).bar(subset=['badprob', 'bin_iv', 'lift'],
  50. color=['#d65f58', '#5fbb7a'])
  51. # 此函数筛除变量分箱不单调或非U型的变量
  52. def f_bins_filter(bins: pd.DataFrame, cols: list) -> list:
  53. result_cols = []
  54. # 遍历原始变量列表
  55. for tmp_col in cols:
  56. tmp_br = bins[tmp_col]['bad_prob'].values.tolist()
  57. tmp_len = len(tmp_br)
  58. if tmp_len <= 2:
  59. result_cols.append(tmp_col)
  60. else:
  61. tmp_judge = f_judge_monto(tmp_br)
  62. # f_judge_monto 函数返回1表示list单调,0表示非单调
  63. if tmp_judge:
  64. result_cols.append(tmp_col)
  65. return result_cols
  66. # 此函数判断list的单调性,允许至多N次符号变化
  67. def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> int:
  68. start_tr = bd_list[1] - bd_list[0]
  69. tmp_len = len(bd_list)
  70. pos_neg_flag = 0
  71. for i in range(2, tmp_len):
  72. tmp_tr = bd_list[i] - bd_list[i - 1]
  73. # 后一位bad_rate减前一位bad_rate,保证bad_rate的单调性
  74. # 记录符号变化, 允许 最多一次符号变化,即U型分布
  75. if (tmp_tr >= 0 and start_tr >= 0) or (tmp_tr <= 0 and start_tr <= 0):
  76. # 满足趋势保持,查看下一位
  77. continue
  78. else:
  79. # 记录一次符号变化
  80. pos_neg_flag += 1
  81. # 记录满足趋势要求的变量
  82. if pos_neg_flag <= pos_neg_cnt:
  83. return True
  84. return False
  85. def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> pd.DataFrame:
  86. transer = td.transform.WOETransformer()
  87. # 根据训练数据来训练woe转换器,并选择目标变量和排除变量
  88. train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'],
  89. exclude=to_drop + ['target'])
  90. test_woe = transer.transform(c.transfrom(data.test_data()))
  91. oot_woe = transer.transform(c.transform(data.val_data()))
  92. return train_woe, test_woe, oot_woe
def f_get_iv(data: DataSplitEntity) -> pd.DataFrame:
    """Per-column information value (IV) via ``toad.quality``."""
    # Callers should exclude columns that do not need an IV before calling.
    # NOTE(review): toad.quality expects a DataFrame, but `data` is annotated
    # as DataSplitEntity — confirm whether data.train_data() should be passed.
    return td.quality(data, 'target', iv_only=True)
def f_get_psi(train_data: DataSplitEntity, oot_data: DataSplitEntity) -> pd.DataFrame:
    """Population stability index (PSI) between train and out-of-time data."""
    # Callers should exclude unneeded columns before calling.
    # NOTE(review): toad's PSI expects DataFrames/Series; the parameters are
    # annotated as DataSplitEntity — confirm whether the raw frames should be passed.
    return td.metrics.PSI(train_data, oot_data)
  99. def f_get_corr(data: DataSplitEntity, meth: str = 'spearman') -> pd.DataFrame:
  100. return data.train_data().corr(method=meth)
def f_get_ivf(data: DataSplitEntity) -> pd.DataFrame:
    """Not implemented yet."""
    # TODO: implement — the name suggests a variance-inflation-factor (VIF)
    # computation per feature, but that intent should be confirmed.
    pass
  103. def f_get_best_bins(data: DataSplitEntity, x_column: str, special_values: list = []):
  104. interval = 0.05
  105. # 贪婪搜索训练集及测试集iv值最高的且单调的分箱
  106. train_data = data.train_data
  107. train_data_filter = train_data[~train_data[x_column].isin(special_values)]
  108. train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
  109. # 特殊值单独一箱
  110. # train_data_special_list = []
  111. # for special in special_values:
  112. # df_cache = train_data[train_data[x_column] == special]
  113. # if len(df_cache) != 0:
  114. # train_data_special_list.append(df_cache)
  115. x_train_data = train_data_filter[x_column]
  116. # 计算 2 - 5 箱的情况
  117. bin_num_list = list(range(2, 6))
  118. for bin_num in bin_num_list:
  119. # 构造数据切分点
  120. point_list = []
  121. init_point_percentile_list = [interval * i for i in range(1, bin_num)]
  122. init_point_percentile_list.append(1 - point_list[-1])
  123. for point_percentile in init_point_percentile_list:
  124. point = x_train_data.iloc[int(len(x_train_data) * point_percentile)]
  125. if point not in point_list:
  126. point_list.append(point)
  127. # 获取分箱结果
  128. bins = sc.woebin(train_data, y=data.y_column, breaks_list=point_list)
  129. # 单调性判断
  130. pass