# -*- coding:utf-8 -*- """ @author: yq @time: 2023/12/28 @desc: 特征工具类 """ import numpy as np import pandas as pd import scorecardpy as sc import toad as td from sklearn.preprocessing import KBinsDiscretizer from statsmodels.stats.outliers_influence import variance_inflation_factor as vif from entitys import DataSplitEntity from enums import BinsStrategyEnum def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame: # 等频分箱 if strategy == BinsStrategyEnum.QUANTILE.value: kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile') feature_binned = kbin_encoder.fit_transform(data[feat]) return feature_binned.astype(int).astype(str) # 等宽分箱 if strategy == BinsStrategyEnum.WIDTH.value: bin_width = (data.train_data()[feat].max() - data.train_data()[feat].min()) / nbins return pd.cut(data.train_data()[feat], bins=nbins, labels=[f'Bin_{i}' for i in range(1, nbins + 1)]) # 使用toad分箱 ''' c = td.transfrom.Combiner() # method参数需要根据toad指定的几种方法名称选择 c.fit(data, y = 'target', method = strategy, min_samples=None, n_bins = nbins, empty_separate = False) # 返回toad分箱combiner,用于训练集和测试集的分箱 # 可使用c.export()[feature]查看某一特征的分箱临界值 return c ''' # 此函数入参应为scorecardpy进行woebin函数转换后的dataframe def f_get_bins_display(bins_info: pd.DataFrame) -> pd.DataFrame: df_list = [] for col, bin_data in bins_info.items(): tmp_df = pd.DataFrame(bin_data) df_list.append(tmp_df) result_df = pd.concat(df_list, ignore_index=True) total_bad = result_df['bad'].sum() total_cnt = result_df['count'].sum() # 整体的坏样本率 br_overall = total_bad / total_cnt result_df['lift'] = result_df['badprob'] / br_overall result_df = \ result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \ [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']] return result_df.style.format(subset=['count', 'good', 'bad'], precision=0).format( subset=['count_distr', 'bad', 'lift', 'badprob', 'woe', 'bin_iv'], precision=4).bar(subset=['badprob', 'bin_iv', 'lift'], color=['#d65f58', '#5fbb7a']) # 此函数筛除变量分箱不单调或非U型的变量 def f_bins_filter(bins: pd.DataFrame, cols: list) -> list: result_cols = [] # 遍历原始变量列表 for tmp_col in cols: tmp_br = bins[tmp_col]['bad_prob'].values.tolist() tmp_len = len(tmp_br) if tmp_len <= 2: result_cols.append(tmp_col) else: tmp_judge = f_judge_monto(tmp_br) # f_judge_monto 函数返回1表示list单调,0表示非单调 if tmp_judge: result_cols.append(tmp_col) return result_cols # 此函数判断list的单调性,允许至多N次符号变化 def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> int: start_tr = bd_list[1] - bd_list[0] tmp_len = len(bd_list) pos_neg_flag = 0 for i in range(2, tmp_len): tmp_tr = bd_list[i] - bd_list[i - 1] # 后一位bad_rate减前一位bad_rate,保证bad_rate的单调性 # 记录符号变化, 允许 最多一次符号变化,即U型分布 if (tmp_tr >= 0 and start_tr >= 0) or (tmp_tr <= 0 and start_tr <= 0): # 满足趋势保持,查看下一位 continue else: # 记录一次符号变化 start_tr = tmp_tr pos_neg_flag += 1 if pos_neg_flag > pos_neg_cnt: return False # 记录满足趋势要求的变量 if pos_neg_flag <= pos_neg_cnt: return True return False def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> pd.DataFrame: transer = td.transform.WOETransformer() # 根据训练数据来训练woe转换器,并选择目标变量和排除变量 train_woe = transer.fit_transform(c.transform(data.train_data()), data.train_data()['target'], exclude=to_drop + ['target']) test_woe = transer.transform(c.transfrom(data.test_data())) oot_woe = transer.transform(c.transform(data.val_data())) return train_woe, test_woe, oot_woe def f_get_psi(train_data: DataSplitEntity, oot_data: DataSplitEntity) -> pd.DataFrame: # 计算前,先排除掉不需要的cols return td.metrics.PSI(train_data, oot_data) def f_get_corr(data: pd.DataFrame, meth: str = 'spearman') -> pd.DataFrame: return data.corr(method=meth) def f_get_ivf(data: pd.DataFrame) -> pd.DataFrame: vif_v = [vif(data.values, data.columns.get_loc(i)) for i in data.columns] vif_df = pd.DataFrame() vif_df["变量"] = data.columns vif_df['vif'] = vif_v return vif_df def f_calcu_model_ks(data, y_column, sort_ascending): var_ks = data.groupby('MODEL_SCORE_BIN')[y_column].agg([len, np.sum]).sort_index(ascending=sort_ascending) var_ks.columns = ['样本数', '坏样本数'] var_ks['好样本数'] = var_ks['样本数'] - var_ks['坏样本数'] var_ks['坏样本比例'] = (var_ks['坏样本数'] / var_ks['样本数']).round(4) var_ks['样本数比例'] = (var_ks['样本数'] / var_ks['样本数'].sum()).round(4) var_ks['总坏样本数'] = var_ks['坏样本数'].sum() var_ks['总好样本数'] = var_ks['好样本数'].sum() var_ks['平均坏样本率'] = (var_ks['总坏样本数'] / var_ks['样本数'].sum()).round(4) var_ks['累计坏样本数'] = var_ks['坏样本数'].cumsum() var_ks['累计好样本数'] = var_ks['好样本数'].cumsum() var_ks['累计样本数'] = var_ks['样本数'].cumsum() var_ks['累计坏样本比例'] = (var_ks['累计坏样本数'] / var_ks['总坏样本数']).round(4) var_ks['累计好样本比例'] = (var_ks['累计好样本数'] / var_ks['总好样本数']).round(4) var_ks['KS'] = (var_ks['累计坏样本比例'] - var_ks['累计好样本比例']).round(4) var_ks['LIFT'] = ((var_ks['累计坏样本数'] / var_ks['累计样本数']) / var_ks['平均坏样本率']).round(4) return var_ks.reset_index() def f_get_model_score_bin(df, card, bins=None): train_score = sc.scorecard_ply(df, card, print_step=0) df['score'] = train_score if bins is None: _, bins = pd.qcut(df['score'], q=10, retbins=True) bins = list(bins) bins[0] = -np.inf bins[-1] = np.inf score_bins = pd.cut(df['score'], bins=bins) df['MODEL_SCORE_BIN'] = score_bins.astype(str).values return df, bins def f_calcu_model_psi(df_train, df_test): tmp1 = df_train.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].agg(['count']).sort_index(ascending=True) tmp1['样本数比例'] = (tmp1['count'] / tmp1['count'].sum()).round(4) tmp2 = df_test.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].agg(['count']).sort_index(ascending=True) tmp2['样本数比例'] = (tmp2['count'] / tmp2['count'].sum()).round(4) psi = ((tmp1['样本数比例'] - tmp2['样本数比例']) * np.log(tmp1['样本数比例'] / tmp2['样本数比例'])).round(4) psi = psi.reset_index() psi = psi.rename(columns={"样本数比例": "psi"}) psi['训练样本数'] = list(tmp1['count']) psi['测试样本数'] = list(tmp2['count']) psi['训练样本数比例'] = list(tmp1['样本数比例']) psi['测试样本数比例']=list(tmp2['样本数比例']) return psi