entity.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2025/2/14
  5. @desc:
  6. """
  7. from typing import Union, List
  8. import pandas as pd
  9. from enums import ContextEnum
  10. from init import context
  11. class BinInfo():
  12. def __init__(self,
  13. x_column: str = None,
  14. bin_num: int = None,
  15. points: list = None,
  16. is_auto_bins: int = None,
  17. train_iv: float = None,
  18. test_iv: float = None,
  19. iv: float = None,
  20. is_qualified_iv_train: int = None,
  21. monto_shift_nsv: int = None,
  22. is_qualified_monto_train_nsv: int = None,
  23. trend_shift_nsv: int = None,
  24. is_qualified_trend_nsv: int = None,
  25. psi: float = None,
  26. is_qualified_psi: int = None,
  27. ):
  28. self.x_column = x_column
  29. self.bin_num = bin_num
  30. self.points = points
  31. self.is_auto_bins = is_auto_bins
  32. self.train_iv = train_iv
  33. self.test_iv = test_iv
  34. self.iv = iv
  35. self.is_qualified_iv_train = is_qualified_iv_train
  36. self.monto_shift_nsv = monto_shift_nsv
  37. self.is_qualified_monto_train_nsv = is_qualified_monto_train_nsv
  38. self.trend_shift_nsv = trend_shift_nsv
  39. self.is_qualified_trend_nsv = is_qualified_trend_nsv
  40. self.psi = psi
  41. self.is_qualified_psi = is_qualified_psi
  42. def to_dict(self):
  43. return self.__dict__
  44. @staticmethod
  45. def ivTopN(data: dict, top_n: int):
  46. candidate = list(data.values())
  47. candidate.sort(key=lambda x: x.iv, reverse=True)
  48. filter_ivtop_overview = ""
  49. filter_ivtop_detail = []
  50. if top_n < len(candidate):
  51. for bin_info in candidate[top_n:]:
  52. filter_ivtop_overview = f"{filter_ivtop_overview}{bin_info.x_column} 因为ivtop【{bin_info.iv}】被剔除\n"
  53. filter_ivtop_detail.append(bin_info.x_column)
  54. candidate = candidate[0:top_n]
  55. context.set_filter_info(ContextEnum.FILTER_IVTOP, filter_ivtop_overview, filter_ivtop_detail)
  56. return {bin_info.x_column: bin_info for bin_info in candidate}
  57. @staticmethod
  58. def ofConvertByDict(data: dict):
  59. bin_info = BinInfo()
  60. for k, v in data.items():
  61. bin_info.__setattr__(k, v)
  62. return bin_info
  63. class HomologousBinInfo():
  64. """
  65. 同一变量不同分箱下的特征信息
  66. """
  67. def __init__(self, x_column: str, is_auto_bins: int = None, is_include: bool = False):
  68. self.x_column = x_column
  69. self.is_auto_bins = is_auto_bins
  70. self.is_include = is_include
  71. self.bins_info: List[BinInfo] = []
  72. def add(self, bin_info: BinInfo):
  73. self.bins_info.append(bin_info)
  74. def convert_to_df(self) -> pd.DataFrame:
  75. data = []
  76. for bin_info in self.bins_info:
  77. data.append(bin_info.to_dict())
  78. df_bins_info = pd.DataFrame(data=data)
  79. return df_bins_info
  80. def drop_reason(self, ) -> str:
  81. df_bins_info = self.convert_to_df()
  82. df_bins_info_filter1 = df_bins_info[df_bins_info["is_qualified_iv_train"] == 1]
  83. if len(df_bins_info_filter1) == 0:
  84. return f"因为train_iv最大值【{df_bins_info['train_iv'].max()}】小于阈值被剔除"
  85. df_bins_info_filter2 = df_bins_info[
  86. (df_bins_info["is_qualified_iv_train"] == 1)
  87. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  88. ]
  89. if len(df_bins_info_filter2) == 0:
  90. return f"因为monto单调变化最小次数【{df_bins_info_filter1['monto_shift_nsv'].min()}】大于阈值被剔除"
  91. df_bins_info_filter3 = df_bins_info[
  92. (df_bins_info["is_qualified_iv_train"] == 1)
  93. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  94. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  95. ]
  96. if len(df_bins_info_filter3) == 0:
  97. return f"因为trend变量趋势一致性变化最小次数【{df_bins_info_filter2['trend_shift_nsv'].min()}】大于阈值被剔除"
  98. df_bins_info_filter4 = df_bins_info[
  99. (df_bins_info["is_qualified_iv_train"] == 1)
  100. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  101. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  102. & (df_bins_info["is_qualified_psi"] == 1)
  103. ]
  104. if len(df_bins_info_filter4) == 0:
  105. return f"因为psi【{df_bins_info_filter3['psi'].min()}】大于阈值被剔除"
  106. print(df_bins_info_filter4)
  107. return f"因为【未知原因】被剔除"
  108. def filter(self) -> Union[BinInfo, None]:
  109. # iv psi 变量单调性 变量趋势一致性 筛选
  110. df_bins_info = self.convert_to_df()
  111. # 人工指定切分点的直接返回
  112. if not self.is_auto_bins:
  113. return BinInfo.ofConvertByDict(df_bins_info.iloc[0].to_dict())
  114. if self.is_include:
  115. df_bins_info_filter = df_bins_info
  116. else:
  117. df_bins_info_filter = df_bins_info[
  118. (df_bins_info["is_qualified_iv_train"] == 1)
  119. & (df_bins_info["is_qualified_monto_train_nsv"] == 1)
  120. & (df_bins_info["is_qualified_trend_nsv"] == 1)
  121. & (df_bins_info["is_qualified_psi"] == 1)
  122. ]
  123. # 选取单调性变化最少,iv最大,psi 最小的分箱
  124. df_bins_info_filter.sort_values(by=["monto_shift_nsv", "trend_shift_nsv", "iv", "psi"],
  125. ascending=[True, True, False, True], inplace=True)
  126. if len(df_bins_info_filter) != 0:
  127. return BinInfo.ofConvertByDict(df_bins_info_filter.iloc[0].to_dict())
  128. return None
  129. def get_best_bins(self) -> List[BinInfo]:
  130. df_bins_info = self.convert_to_df()
  131. bin_num_list = df_bins_info["bin_num"].unique().tolist()
  132. bin_num_list.sort()
  133. bins_info = []
  134. for bin_num in bin_num_list:
  135. df_bins_info_filter = df_bins_info[df_bins_info["bin_num"] == bin_num]
  136. df_bins_info_filter.sort_values(by=["monto_shift_nsv", "trend_shift_nsv", "iv", "psi"],
  137. ascending=[True, True, False, True], inplace=True)
  138. bin_info_dict1 = df_bins_info_filter.iloc[0].to_dict()
  139. bins_info.append(BinInfo.ofConvertByDict(bin_info_dict1))
  140. # 获取没单调性排序的,考虑到age这种变量允许有转折的
  141. df_bins_info_filter.sort_values(by=["trend_shift_nsv", "iv", "psi"],
  142. ascending=[True, False, True], inplace=True)
  143. bin_info_dict2 = df_bins_info_filter.iloc[0].to_dict()
  144. if bin_info_dict1["monto_shift_nsv"] != bin_info_dict2["monto_shift_nsv"]:
  145. bins_info.append(BinInfo.ofConvertByDict(bin_info_dict2))
  146. return bins_info