utils.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding:utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2023/12/28
  5. @desc: 特征工具类
  6. """
  7. import os
  8. from typing import Union
  9. import numpy as np
  10. import pandas as pd
  11. from statsmodels.stats.outliers_influence import variance_inflation_factor as vif
  12. from commom import GeneralException
  13. from enums import ResultCodesEnum
  14. FORMAT_DICT = {
  15. # 比例类 -1 - 1
  16. "bin_rate1": np.arange(-1, 1 + 0.1, 0.1),
  17. # 次数类1 0 -10
  18. "bin_cnt1": np.arange(0, 11, 1),
  19. # 次数类2 0 - 20
  20. "bin_cnt2": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 17, 20],
  21. # 次数类3 0 - 50
  22. "bin_cnt3": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 35, 40, 45, 50],
  23. # 次数类4 0 - 100
  24. "bin_cnt4": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30, 40, 50, 80, 100],
  25. # 金额类1 0 - 1w
  26. "bin_amt1": np.arange(0, 1.1e4, 1e3),
  27. # 金额类2 0 - 5w
  28. "bin_amt2": np.arange(0, 5.5e4, 5e3),
  29. # 金额类3 0 - 10w
  30. "bin_amt3": np.arange(0, 11e4, 1e4),
  31. # 金额类4 0 - 20w
  32. "bin_amt4": [0, 1e4, 2e4, 3e4, 4e4, 5e4, 8e4, 10e4, 15e4, 20e4],
  33. # 金额类5 0 - 100w
  34. "bin_amt5": [0, 5e4, 10e4, 15e4, 20e4, 25e4, 30e4, 40e4, 50e4, 100e4],
  35. # 年龄类
  36. "bin_age": [20, 25, 30, 35, 40, 45, 50, 55, 60, 65],
  37. }
  38. # 粗分箱
  39. def f_format_bin(data_describe: pd.Series, raw_v):
  40. percent10 = data_describe["10%"]
  41. percent90 = data_describe["90%"]
  42. format_v = raw_v
  43. # 筛选最合适的标准化分箱节点
  44. bin = None
  45. for k, v_list in FORMAT_DICT.items():
  46. bin_min = min(v_list)
  47. bin_max = max(v_list)
  48. if percent10 >= bin_min and percent90 <= bin_max:
  49. if bin is None:
  50. bin = (k, bin_max)
  51. elif bin[1] > bin_max:
  52. bin = (k, bin_max)
  53. if bin is None:
  54. return format_v
  55. # 选择分箱内适合的切分点
  56. v_list = FORMAT_DICT[bin[0]]
  57. for idx in range(1, len(v_list)):
  58. v_left = v_list[idx - 1]
  59. v_right = v_list[idx]
  60. # 就近原则
  61. if v_left <= raw_v <= v_right:
  62. format_v = v_right if (raw_v - v_left) - (v_right - raw_v) > 0 else v_left
  63. if format_v not in v_list:
  64. if format_v > v_list[-1]:
  65. format_v = v_list[-1]
  66. if format_v < v_list[0]:
  67. format_v = v_list[0]
  68. return format_v
  69. # 单调性变化次数
  70. def f_monto_shift(badprobs: list) -> int:
  71. if len(badprobs) <= 2:
  72. return 0
  73. before = badprobs[1] - badprobs[0]
  74. change_cnt = 0
  75. for i in range(2, len(badprobs)):
  76. next = badprobs[i] - badprobs[i - 1]
  77. # 后一位bad_rate减前一位bad_rate,保证bad_rate的单调性
  78. if (next >= 0 and before >= 0) or (next <= 0 and before <= 0):
  79. # 满足趋势保持,查看下一位
  80. continue
  81. else:
  82. # 记录一次符号变化
  83. before = next
  84. change_cnt += 1
  85. return change_cnt
  86. # 变量趋势一致变化次数
  87. def f_trend_shift(train_badprobs: list, test_badprobs: list) -> int:
  88. if len(train_badprobs) != len(test_badprobs) or len(train_badprobs) < 2 or len(test_badprobs) < 2:
  89. return 0
  90. train_monto = np.array(train_badprobs[1:]) - np.array(train_badprobs[0:-1])
  91. train_monto = np.where(train_monto >= 0, 1, -1)
  92. test_monto = np.array(test_badprobs[1:]) - np.array(test_badprobs[0:-1])
  93. test_monto = np.where(test_monto >= 0, 1, -1)
  94. contrast = train_monto - test_monto
  95. return len(contrast[contrast != 0])
  96. def f_get_psi(train_bins, test_bins):
  97. train_bins['count'] = train_bins['good'] + train_bins['bad']
  98. train_bins['proportion'] = train_bins['count'] / train_bins['count'].sum()
  99. test_bins['count'] = test_bins['good'] + test_bins['bad']
  100. test_bins['proportion'] = test_bins['count'] / test_bins['count'].sum()
  101. psi = (train_bins['proportion'] - test_bins['proportion']) * np.log(
  102. train_bins['proportion'] / test_bins['proportion'])
  103. psi = psi.reset_index()
  104. psi = psi.rename(columns={"proportion": "psi"})
  105. return psi["psi"].sum().round(3)
  106. def f_get_corr(data: pd.DataFrame, meth: str = 'spearman') -> pd.DataFrame:
  107. return data.corr(method=meth)
  108. def f_get_vif(data: pd.DataFrame) -> Union[pd.DataFrame, None]:
  109. if len(data.columns.to_list()) <= 1:
  110. return None
  111. vif_v = [round(vif(data.values, data.columns.get_loc(i)), 3) for i in data.columns]
  112. df_vif = pd.DataFrame()
  113. df_vif["变量"] = [column.replace("_woe", "") for column in data.columns]
  114. df_vif['vif'] = vif_v
  115. return df_vif
  116. def f_woebin_load(path: str):
  117. if os.path.isdir(path):
  118. path = os.path.join(path, "feature.csv")
  119. if not os.path.isfile(path) or "feature.csv" not in path:
  120. raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"特征信息【feature.csv】不存在")
  121. df_woebin = pd.read_csv(path)
  122. variables = df_woebin["variable"].unique().tolist()
  123. sc_woebin = {}
  124. for variable in variables:
  125. sc_woebin[variable] = df_woebin[df_woebin["variable"] == variable]
  126. print(f"feature load from【{path}】success.")
  127. return sc_woebin