entity.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2025/2/14
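  5. @desc: entity classes for binning results - BinInfo (one binning scheme of a variable) and HomologousBinInfo (all binning schemes of the same variable)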
  6. """
  7. from typing import Union, List
  8. import pandas as pd
  9. from enums import ContextEnum
  10. from init import context
  11. class BinInfo():
  12. def __init__(self,
  13. x_column: str = None,
  14. bin_num: int = None,
  15. points: list = None,
  16. is_auto_bins: int = None,
  17. train_iv: float = None,
  18. test_iv: float = None,
  19. iv: float = None,
  20. is_qualified_iv_train: int = None,
  21. monto_shift_nsv: int = None,
  22. is_qualified_monto_train_nsv: int = None,
  23. trend_shift_nsv: int = None,
  24. is_qualified_trend_nsv: int = None,
  25. psi: float = None,
  26. is_qualified_psi: int = None,
  27. vif: float = None,
  28. ):
  29. self.x_column = x_column
  30. self.bin_num = bin_num
  31. self.points = points
  32. self.is_auto_bins = is_auto_bins
  33. self.train_iv = train_iv
  34. self.test_iv = test_iv
  35. self.iv = iv
  36. self.is_qualified_iv_train = is_qualified_iv_train
  37. self.monto_shift_nsv = monto_shift_nsv
  38. self.is_qualified_monto_train_nsv = is_qualified_monto_train_nsv
  39. self.trend_shift_nsv = trend_shift_nsv
  40. self.is_qualified_trend_nsv = is_qualified_trend_nsv
  41. self.psi = psi
  42. self.is_qualified_psi = is_qualified_psi
  43. self.vif = vif
  44. def to_dict(self):
  45. return self.__dict__
  46. @staticmethod
  47. def ivTopN(data: dict, top_n: int):
  48. candidate = list(data.values())
  49. candidate.sort(key=lambda x: x.iv, reverse=True)
  50. filter_ivtop_overview = ""
  51. filter_ivtop_detail = []
  52. if top_n < len(candidate):
  53. for bin_info in candidate[top_n:]:
  54. filter_ivtop_overview = f"{filter_ivtop_overview}{bin_info.x_column} 因为ivtop【{bin_info.iv}】被剔除\n"
  55. filter_ivtop_detail.append(bin_info.x_column)
  56. candidate = candidate[0:top_n]
  57. context.set_filter_info(ContextEnum.FILTER_IVTOP, filter_ivtop_overview, filter_ivtop_detail)
  58. return {bin_info.x_column: bin_info for bin_info in candidate}
  59. @staticmethod
  60. def ofConvertByDict(data: dict):
  61. bin_info = BinInfo()
  62. for k, v in data.items():
  63. bin_info.__setattr__(k, v)
  64. return bin_info
  65. class HomologousBinInfo():
  66. """
  67. 同一变量不同分箱下的特征信息
  68. """
  69. def __init__(self, x_column: str, is_auto_bins: int = None):
  70. self.x_column = x_column
  71. self.is_auto_bins = is_auto_bins
  72. self.bins_info: List[BinInfo] = []
  73. def add(self, bin_info: BinInfo):
  74. self.bins_info.append(bin_info)
  75. def convert_to_df(self) -> pd.DataFrame:
  76. data = []
  77. for bin_info in self.bins_info:
  78. data.append(bin_info.to_dict())
  79. df_bins_info = pd.DataFrame(data=data)
  80. return df_bins_info
  81. def drop_reason(self, ) -> str:
  82. df_bins_info = self.convert_to_df()
  83. df_bins_info_filter1 = df_bins_info[df_bins_info["is_qualified_iv_train"] == 1]
  84. if len(df_bins_info_filter1) == 0:
  85. return f"因为train_iv最大值【{df_bins_info['train_iv'].max()}】小于阈值被剔除"
  86. df_bins_info_filter2 = df_bins_info[
  87. (df_bins_info["is_qualified_iv_train"] == 1)
  88. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  89. ]
  90. if len(df_bins_info_filter2) == 0:
  91. return f"因为monto单调变化最小次数【{df_bins_info_filter1['monto_shift_nsv'].min()}】大于阈值被剔除"
  92. df_bins_info_filter3 = df_bins_info[
  93. (df_bins_info["is_qualified_iv_train"] == 1)
  94. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  95. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  96. ]
  97. if len(df_bins_info_filter3) == 0:
  98. return f"因为trend变量趋势一致性变化最小次数【{df_bins_info_filter2['trend_shift_nsv'].min()}】大于阈值被剔除"
  99. df_bins_info_filter4 = df_bins_info[
  100. (df_bins_info["is_qualified_iv_train"] == 1)
  101. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  102. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  103. & (df_bins_info["is_qualified_psi"] == 1)
  104. ]
  105. if len(df_bins_info_filter4) == 0:
  106. return f"因为psi【{df_bins_info_filter3['psi'].min()}】大于阈值被剔除"
  107. print(df_bins_info_filter4)
  108. return f"因为【未知原因】被剔除"
  109. def filter(self) -> Union[BinInfo, None]:
  110. # iv psi 变量单调性 变量趋势一致性 筛选
  111. df_bins_info = self.convert_to_df()
  112. # 人工指定切分点的直接返回
  113. if not self.is_auto_bins:
  114. return BinInfo.ofConvertByDict(df_bins_info.iloc[0].to_dict())
  115. df_bins_info_filter = df_bins_info[
  116. (df_bins_info["is_qualified_iv_train"] == 1)
  117. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  118. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  119. & (df_bins_info["is_qualified_psi"] == 1)
  120. ]
  121. # 选取单调性变化最少,iv最大,psi 最小的分箱
  122. df_bins_info_filter.sort_values(by=["monto_shift_nsv", "trend_shift_nsv", "iv", "psi"],
  123. ascending=[True, True, False, True], inplace=True)
  124. if len(df_bins_info_filter) != 0:
  125. return BinInfo.ofConvertByDict(df_bins_info_filter.iloc[0].to_dict())
  126. return None
  127. def get_best_bins(self) -> List[BinInfo]:
  128. df_bins_info = self.convert_to_df()
  129. bin_num_list = df_bins_info["bin_num"].unique().tolist()
  130. bin_num_list.sort()
  131. bins_info = []
  132. for bin_num in bin_num_list:
  133. df_bins_info_filter = df_bins_info[df_bins_info["bin_num"] == bin_num]
  134. df_bins_info_filter.sort_values(by=["monto_shift_nsv", "trend_shift_nsv", "iv", "psi"],
  135. ascending=[True, True, False, True], inplace=True)
  136. bin_info_dict1 = df_bins_info_filter.iloc[0].to_dict()
  137. bins_info.append(BinInfo.ofConvertByDict(bin_info_dict1))
  138. # 获取没单调性排序的,考虑到age这种变量允许有转折的
  139. df_bins_info_filter.sort_values(by=["trend_shift_nsv", "iv", "psi"],
  140. ascending=[True, False, True], inplace=True)
  141. bin_info_dict2 = df_bins_info_filter.iloc[0].to_dict()
  142. if bin_info_dict1["monto_shift_nsv"] != bin_info_dict2["monto_shift_nsv"]:
  143. bins_info.append(BinInfo.ofConvertByDict(bin_info_dict2))
  144. return bins_info