feature_utils.py
  1. # -*- coding:utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2023/12/28
  5. @desc: 特征工具类
  6. """
  7. import numpy as np
  8. import pandas as pd
  9. import scorecardpy as sc
  10. from statsmodels.stats.outliers_influence import variance_inflation_factor as vif
  11. # 此函数判断list的单调性,允许至多N次符号变化
  12. def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> int:
  13. start_tr = bd_list[1] - bd_list[0]
  14. tmp_len = len(bd_list)
  15. pos_neg_flag = 0
  16. for i in range(2, tmp_len):
  17. tmp_tr = bd_list[i] - bd_list[i - 1]
  18. # 后一位bad_rate减前一位bad_rate,保证bad_rate的单调性
  19. # 记录符号变化, 允许 最多一次符号变化,即U型分布
  20. if (tmp_tr >= 0 and start_tr >= 0) or (tmp_tr <= 0 and start_tr <= 0):
  21. # 满足趋势保持,查看下一位
  22. continue
  23. else:
  24. # 记录一次符号变化
  25. start_tr = tmp_tr
  26. pos_neg_flag += 1
  27. if pos_neg_flag > pos_neg_cnt:
  28. return False
  29. # 记录满足趋势要求的变量
  30. if pos_neg_flag <= pos_neg_cnt:
  31. return True
  32. return False
  33. def f_get_corr(data: pd.DataFrame, meth: str = 'spearman') -> pd.DataFrame:
  34. return data.corr(method=meth)
  35. def f_get_ivf(data: pd.DataFrame) -> pd.DataFrame:
  36. if len(data.columns.to_list()) <= 1:
  37. return None
  38. vif_v = [vif(data.values, data.columns.get_loc(i)) for i in data.columns]
  39. vif_df = pd.DataFrame()
  40. vif_df["变量"] = data.columns
  41. vif_df['vif'] = vif_v
  42. return vif_df
  43. def f_calcu_model_ks(data, y_column, sort_ascending):
  44. var_ks = data.groupby('MODEL_SCORE_BIN')[y_column].agg([len, np.sum]).sort_index(ascending=sort_ascending)
  45. var_ks.columns = ['样本数', '坏样本数']
  46. var_ks['好样本数'] = var_ks['样本数'] - var_ks['坏样本数']
  47. var_ks['坏样本比例'] = (var_ks['坏样本数'] / var_ks['样本数']).round(4)
  48. var_ks['样本数比例'] = (var_ks['样本数'] / var_ks['样本数'].sum()).round(4)
  49. var_ks['总坏样本数'] = var_ks['坏样本数'].sum()
  50. var_ks['总好样本数'] = var_ks['好样本数'].sum()
  51. var_ks['平均坏样本率'] = (var_ks['总坏样本数'] / var_ks['样本数'].sum()).round(4)
  52. var_ks['累计坏样本数'] = var_ks['坏样本数'].cumsum()
  53. var_ks['累计好样本数'] = var_ks['好样本数'].cumsum()
  54. var_ks['累计样本数'] = var_ks['样本数'].cumsum()
  55. var_ks['累计坏样本比例'] = (var_ks['累计坏样本数'] / var_ks['总坏样本数']).round(4)
  56. var_ks['累计好样本比例'] = (var_ks['累计好样本数'] / var_ks['总好样本数']).round(4)
  57. var_ks['KS'] = (var_ks['累计坏样本比例'] - var_ks['累计好样本比例']).round(4)
  58. var_ks['LIFT'] = ((var_ks['累计坏样本数'] / var_ks['累计样本数']) / var_ks['平均坏样本率']).round(4)
  59. return var_ks.reset_index()
  60. def f_get_model_score_bin(df, card, bins=None):
  61. train_score = sc.scorecard_ply(df, card, print_step=0)
  62. df['score'] = train_score
  63. if bins is None:
  64. _, bins = pd.qcut(df['score'], q=10, retbins=True, duplicates="drop")
  65. bins = list(bins)
  66. bins[0] = -np.inf
  67. bins[-1] = np.inf
  68. score_bins = pd.cut(df['score'], bins=bins)
  69. df['MODEL_SCORE_BIN'] = score_bins.astype(str).values
  70. return df, bins
  71. def f_calcu_model_psi(df_train, df_test):
  72. tmp1 = df_train.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].agg(['count']).sort_index(ascending=True)
  73. tmp1['样本数比例'] = (tmp1['count'] / tmp1['count'].sum()).round(4)
  74. tmp2 = df_test.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].agg(['count']).sort_index(ascending=True)
  75. tmp2['样本数比例'] = (tmp2['count'] / tmp2['count'].sum()).round(4)
  76. psi = ((tmp1['样本数比例'] - tmp2['样本数比例']) * np.log(tmp1['样本数比例'] / tmp2['样本数比例'])).round(4)
  77. psi = psi.reset_index()
  78. psi = psi.rename(columns={"样本数比例": "psi"})
  79. psi['训练样本数'] = list(tmp1['count'])
  80. psi['测试样本数'] = list(tmp2['count'])
  81. psi['训练样本数比例'] = list(tmp1['样本数比例'])
  82. psi['测试样本数比例'] = list(tmp2['样本数比例'])
  83. return psi