entity.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2025/2/14
  5. @desc:
  6. """
  7. from typing import Union, List
  8. import pandas as pd
  9. from enums import ContextEnum
  10. from init import context
  11. class BinInfo():
  12. def __init__(self,
  13. x_column: str = None,
  14. bin_num: int = None,
  15. points: list = None,
  16. is_auto_bins: int = None,
  17. train_iv: float = None,
  18. test_iv: float = None,
  19. iv: float = None,
  20. is_qualified_iv_train: int = None,
  21. monto_shift_nsv: int = None,
  22. is_qualified_monto_train_nsv: int = None,
  23. trend_shift_nsv: int = None,
  24. is_qualified_trend_nsv: int = None,
  25. psi: float = None,
  26. is_qualified_psi: int = None,
  27. vif: float = None,
  28. ):
  29. self.x_column = x_column
  30. self.bin_num = bin_num
  31. self.points = points
  32. self.is_auto_bins = is_auto_bins
  33. self.train_iv = train_iv
  34. self.test_iv = test_iv
  35. self.iv = iv
  36. self.is_qualified_iv_train = is_qualified_iv_train
  37. self.monto_shift_nsv = monto_shift_nsv
  38. self.is_qualified_monto_train_nsv = is_qualified_monto_train_nsv
  39. self.trend_shift_nsv = trend_shift_nsv
  40. self.is_qualified_trend_nsv = is_qualified_trend_nsv
  41. self.psi = psi
  42. self.is_qualified_psi = is_qualified_psi
  43. self.vif = vif
  44. def to_dict(self):
  45. return self.__dict__
  46. @staticmethod
  47. def ivTopN(data: dict, top_n: int):
  48. candidate = list(data.values())
  49. candidate.sort(key=lambda x: x.iv, reverse=True)
  50. filter_ivtop_overview = ""
  51. filter_ivtop_detail = []
  52. if top_n < len(candidate):
  53. for bin_info in candidate[top_n:]:
  54. filter_ivtop_overview = f"{filter_ivtop_overview}{bin_info.x_column} 因为ivtop【{bin_info.iv}】被剔除\n"
  55. filter_ivtop_detail.append(bin_info.x_column)
  56. candidate = candidate[0:top_n]
  57. context.set_filter_info(ContextEnum.FILTER_IVTOP, filter_ivtop_overview, filter_ivtop_detail)
  58. return {bin_info.x_column: bin_info for bin_info in candidate}
  59. @staticmethod
  60. def ofConvertByDict(data: dict):
  61. bin_info = BinInfo()
  62. for k, v in data.items():
  63. bin_info.__setattr__(k, v)
  64. return bin_info
  65. class HomologousBinInfo():
  66. """
  67. 同一变量不同分箱下的特征信息
  68. """
  69. def __init__(self, x_column: str, is_auto_bins: int = None, is_include: bool = False):
  70. self.x_column = x_column
  71. self.is_auto_bins = is_auto_bins
  72. self.is_include = is_include
  73. self.bins_info: List[BinInfo] = []
  74. def add(self, bin_info: BinInfo):
  75. self.bins_info.append(bin_info)
  76. def convert_to_df(self) -> pd.DataFrame:
  77. data = []
  78. for bin_info in self.bins_info:
  79. data.append(bin_info.to_dict())
  80. df_bins_info = pd.DataFrame(data=data)
  81. return df_bins_info
  82. def drop_reason(self, ) -> str:
  83. df_bins_info = self.convert_to_df()
  84. df_bins_info_filter1 = df_bins_info[df_bins_info["is_qualified_iv_train"] == 1]
  85. if len(df_bins_info_filter1) == 0:
  86. return f"因为train_iv最大值【{df_bins_info['train_iv'].max()}】小于阈值被剔除"
  87. df_bins_info_filter2 = df_bins_info[
  88. (df_bins_info["is_qualified_iv_train"] == 1)
  89. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  90. ]
  91. if len(df_bins_info_filter2) == 0:
  92. return f"因为monto单调变化最小次数【{df_bins_info_filter1['monto_shift_nsv'].min()}】大于阈值被剔除"
  93. df_bins_info_filter3 = df_bins_info[
  94. (df_bins_info["is_qualified_iv_train"] == 1)
  95. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  96. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  97. ]
  98. if len(df_bins_info_filter3) == 0:
  99. return f"因为trend变量趋势一致性变化最小次数【{df_bins_info_filter2['trend_shift_nsv'].min()}】大于阈值被剔除"
  100. df_bins_info_filter4 = df_bins_info[
  101. (df_bins_info["is_qualified_iv_train"] == 1)
  102. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  103. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  104. & (df_bins_info["is_qualified_psi"] == 1)
  105. ]
  106. if len(df_bins_info_filter4) == 0:
  107. return f"因为psi【{df_bins_info_filter3['psi'].min()}】大于阈值被剔除"
  108. print(df_bins_info_filter4)
  109. return f"因为【未知原因】被剔除"
  110. def filter(self) -> Union[BinInfo, None]:
  111. # iv psi 变量单调性 变量趋势一致性 筛选
  112. df_bins_info = self.convert_to_df()
  113. # 人工指定切分点的直接返回
  114. if not self.is_auto_bins:
  115. return BinInfo.ofConvertByDict(df_bins_info.iloc[0].to_dict())
  116. if self.is_include:
  117. df_bins_info_filter = df_bins_info
  118. else:
  119. df_bins_info_filter = df_bins_info[
  120. (df_bins_info["is_qualified_iv_train"] == 1)
  121. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  122. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  123. & (df_bins_info["is_qualified_psi"] == 1)
  124. ]
  125. # 选取单调性变化最少,iv最大,psi 最小的分箱
  126. df_bins_info_filter.sort_values(by=["monto_shift_nsv", "trend_shift_nsv", "iv", "psi"],
  127. ascending=[True, True, False, True], inplace=True)
  128. if len(df_bins_info_filter) != 0:
  129. return BinInfo.ofConvertByDict(df_bins_info_filter.iloc[0].to_dict())
  130. return None
  131. def get_best_bins(self) -> List[BinInfo]:
  132. df_bins_info = self.convert_to_df()
  133. bin_num_list = df_bins_info["bin_num"].unique().tolist()
  134. bin_num_list.sort()
  135. bins_info = []
  136. for bin_num in bin_num_list:
  137. df_bins_info_filter = df_bins_info[df_bins_info["bin_num"] == bin_num]
  138. df_bins_info_filter.sort_values(by=["monto_shift_nsv", "trend_shift_nsv", "iv", "psi"],
  139. ascending=[True, True, False, True], inplace=True)
  140. bin_info_dict1 = df_bins_info_filter.iloc[0].to_dict()
  141. bins_info.append(BinInfo.ofConvertByDict(bin_info_dict1))
  142. # 获取没单调性排序的,考虑到age这种变量允许有转折的
  143. df_bins_info_filter.sort_values(by=["trend_shift_nsv", "iv", "psi"],
  144. ascending=[True, False, True], inplace=True)
  145. bin_info_dict2 = df_bins_info_filter.iloc[0].to_dict()
  146. if bin_info_dict1["monto_shift_nsv"] != bin_info_dict2["monto_shift_nsv"]:
  147. bins_info.append(BinInfo.ofConvertByDict(bin_info_dict2))
  148. return bins_info