# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2024/1/2
@desc: IV and monotonicity based feature filtering strategy
"""
import json
from itertools import combinations_with_replacement
from typing import List, Dict, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scorecardpy as sc
import seaborn as sns
from pandas.core.dtypes.common import is_numeric_dtype
from tqdm import tqdm

from commom import f_display_images_by_side, NumpyEncoder
from entitys import DataSplitEntity, CandidateFeatureEntity, DataPreparedEntity, DataFeatureEntity, MetricFucEntity
from .feature_utils import f_judge_monto, f_get_corr, f_get_ivf, f_format_bin, f_monto_contrast
from .filter_strategy_base import FilterStrategyBase


class StrategyIv(FilterStrategyBase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _f_get_iv_by_bins(self, bins) -> pd.DataFrame:
        iv = {key_: [round(value_['total_iv'].max(), 4)] for key_, value_ in bins.items()}
        iv = pd.DataFrame.from_dict(iv, orient='index', columns=['IV']).reset_index()
        iv = iv.sort_values('IV', ascending=False).reset_index(drop=True)
        iv.columns = ['变量', 'IV']
        return iv

    def _f_get_var_corr_image(self, train_woe):
        if len(train_woe.columns.to_list()) <= 1:
            return None
        train_corr = f_get_corr(train_woe)
        plt.figure(figsize=(12, 12))
        sns.heatmap(train_corr, vmax=1, square=True, cmap='RdBu', annot=True)
        plt.title('Variables Correlation', fontsize=15)
        plt.yticks(rotation=0)
        plt.xticks(rotation=90)
        path = self.data_process_config.f_get_save_path("var_corr.png")
        plt.savefig(path)
        return path

    def _f_save_var_trend(self, bins, x_columns_candidate, prefix):
        image_path_list = []
        for k in x_columns_candidate:
            bin_df = bins[k]
            # bin_df["bin"] = bin_df["bin"].apply(lambda x: re.sub(r"(\d+\.\d+)",
            #                                                      lambda m: "{:.2f}".format(float(m.group(0))), x))
            sc.woebin_plot(bin_df)
            path = self.data_process_config.f_get_save_path(f"{prefix}_{k}.png")
            plt.savefig(path)
            image_path_list.append(path)
        return image_path_list

    def _f_get_bins_by_breaks(self, data: pd.DataFrame, candidate_dict: Dict[str, CandidateFeatureEntity],
                              y_column=None):
        y_column = self.data_process_config.y_column if y_column is None else y_column
        special_values = self.data_process_config.special_values
        x_columns_candidate = list(candidate_dict.keys())
        breaks_list = {}
        for column, candidate in candidate_dict.items():
            breaks_list[column] = candidate.breaks_list
        bins = sc.woebin(data[x_columns_candidate + [y_column]], y=y_column, breaks_list=breaks_list,
                         special_values=special_values, print_info=False)
        return bins

    def _f_corr_filter(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity]) -> List[str]:
        # Drop highly correlated variables, keeping the one with the higher IV
        corr_threshold = self.data_process_config.corr_threshold
        breaks_list = self.data_process_config.breaks_list
        train_data = data.train_data
        x_columns_candidate = list(candidate_dict.keys())

        bins = self._f_get_bins_by_breaks(train_data, candidate_dict)
        train_woe = sc.woebin_ply(train_data[x_columns_candidate], bins, print_info=False)
        corr_df = f_get_corr(train_woe)
        corr_dict = corr_df.to_dict()
        for column, corr in corr_dict.items():
            column = column.replace("_woe", "")
            if column not in x_columns_candidate:
                continue
            for challenger_column, challenger_corr in corr.items():
                challenger_column = challenger_column.replace("_woe", "")
                if challenger_corr < corr_threshold or column == challenger_column \
                        or challenger_column not in x_columns_candidate:
                    continue
                iv_max = candidate_dict[column].iv_max
                challenger_iv_max = candidate_dict[challenger_column].iv_max
                if iv_max > challenger_iv_max:
                    if challenger_column not in breaks_list.keys():
                        x_columns_candidate.remove(challenger_column)
                else:
                    if column not in breaks_list.keys():
                        x_columns_candidate.remove(column)
                        break
        return x_columns_candidate

    def _f_wide_filter(self, data: DataSplitEntity) -> Dict:
        # Coarse screening of candidate variables by IV
        train_data = data.train_data
        test_data = data.test_data
        special_values = self.data_process_config.special_values
        breaks_list = self.data_process_config.breaks_list.copy()
        y_column = self.data_process_config.y_column
        iv_threshold_wide = self.data_process_config.iv_threshold_wide
        x_columns_candidate = self.data_process_config.x_columns_candidate
        if x_columns_candidate is None or len(x_columns_candidate) == 0:
            x_columns_candidate = train_data.columns.tolist()
        if y_column in x_columns_candidate:
            x_columns_candidate.remove(y_column)

        bins_train = sc.woebin(train_data[x_columns_candidate + [y_column]], y=y_column, bin_num_limit=5,
                               special_values=special_values, breaks_list=breaks_list, print_info=False)
        for column, bin_df in bins_train.items():
            breaks_list[column] = list(bin_df['breaks'])
        bins_test = None
        if test_data is not None and len(test_data) != 0:
            bins_test = sc.woebin(test_data[x_columns_candidate + [y_column]], y=y_column,
                                  special_values=special_values, breaks_list=breaks_list, print_info=False)
        bins_iv_dict = {}
        for column, bin_train in bins_train.items():
            train_iv = bin_train['total_iv'][0]
            test_iv = 0
            if bins_test is not None:
                bin_test = bins_test[column]
                test_iv = bin_test['total_iv'][0]
            iv_max = train_iv + test_iv
            if train_iv < iv_threshold_wide:
                continue
            bins_iv_dict[column] = {"iv_max": iv_max, "breaks_list": breaks_list[column]}
        return bins_iv_dict

    def _f_get_best_bins_numeric(self, data: DataSplitEntity, x_column: str):
        # Greedy search for the monotonic binning with the highest combined IV on the train and test sets
        interval = self.data_process_config.bin_search_interval
        iv_threshold = self.data_process_config.iv_threshold
        special_values = self.data_process_config.get_special_values(x_column)
        breaks_list = self.data_process_config.get_breaks_list(x_column)
        y_column = self.data_process_config.y_column
        sample_rate = self.data_process_config.sample_rate
        format_bin = self.data_process_config.format_bin
        pos_neg_cnt = self.data_process_config.pos_neg_cnt
        monto_contrast_change_cnt = self.data_process_config.monto_contrast_change_cnt

        def _n0(x):
            return sum(x == 0)

        def _n1(x):
            return sum(x == 1)

        def _f_distribute_balls(balls, boxes):
            # Stars-and-bars style enumeration: place boxes - 1 dividers to split `balls` into `boxes` groups
            total_ways = combinations_with_replacement(range(balls + boxes - 1), boxes - 1)
            distribute_list = []
            # Iterate over all possible divider positions
            for combo in total_ways:
                # Allocate balls according to the divider positions
                distribution = [0] * boxes
                start = 0
                for i, divider in enumerate(combo):
                    distribution[i] = divider - start + 1
                    start = divider + 1
                distribution[-1] = balls - start  # balls in the last box
                # Keep only distributions where every box gets at least one ball
                if all(x > 0 for x in distribution):
                    distribute_list.append(distribution)
            return distribute_list

        def _get_sv_bins(df, x_column, y_column, special_values):
            # Build one bin per special value
            sv_bin_list = []
            for special in special_values:
                dtm = df[df[x_column] == special].copy()
                if len(dtm) != 0:
                    dtm['bin'] = [str(special)] * len(dtm)
                    binning = dtm.groupby(['bin'], group_keys=False)[y_column].agg(
                        [_n0, _n1]).reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
                    binning['is_special_values'] = [True] * len(binning)
                    sv_bin_list.append(binning)
            return sv_bin_list

        def _get_bins(df, x_column, y_column, breaks_list):
            dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
            bstbrks = [-np.inf] + breaks_list + [np.inf]
            labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
            dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
            dtm['bin'] = dtm['bin'].astype(str)
            bins = dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1]) \
                .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
            bins['is_special_values'] = [False] * len(bins)
            return bins

        def _get_badprob(bins):
            bins['count'] = bins['good'] + bins['bad']
            bins['badprob'] = bins['bad'] / bins['count']
            bad_prob = bins[bins['is_special_values'] == False]['badprob'].values.tolist()
            return bad_prob

        def _calculation_iv(bins, judge_monto=True, pos_neg_cnt=1):
            # Monotonicity check (special-value bins are excluded)
            bad_prob = _get_badprob(bins)
            if judge_monto and not f_judge_monto(bad_prob, pos_neg_cnt):
                return -1
            # Compute IV
            infovalue = pd.DataFrame({'good': bins['good'], 'bad': bins['bad']}) \
                .replace(0, 0.9) \
                .assign(
                    DistrBad=lambda x: x.bad / sum(x.bad),
                    DistrGood=lambda x: x.good / sum(x.good)
                ) \
                .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood)) \
                .iv
            bins['bin_iv'] = infovalue
            bins['total_iv'] = bins['bin_iv'].sum()
            iv = bins['total_iv'].values[0]
            return iv

        def _f_sampling(distribute_list: list, sample_rate: float):
            # Subsample the candidates; an exhaustive greedy search takes too long
            sampled_list = distribute_list[::int(1 / sample_rate)]
            return sampled_list

        train_data = data.train_data
        train_data_filter = train_data[~train_data[x_column].isin(special_values)]
        train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
        train_data_x = train_data_filter[x_column]
        train_data_x_describe = train_data_x.describe(percentiles=[0.1, 0.9])

        test_data = data.test_data
        test_data_filter = None
        if test_data is not None and len(test_data) != 0:
            test_data_filter = test_data[~test_data[x_column].isin(special_values)]
            test_data_filter = test_data_filter.sort_values(by=x_column, ascending=True)

        # Build candidate split points for 2 to 5 bins
        distribute_list = []
        points_list = []
        for bin_num in list(range(2, 6)):
            distribute_list_cache = _f_distribute_balls(int(1 / interval), bin_num)
            # Subsample for 4 bins and above, otherwise the search takes too long
            sample_num = 1000 * sample_rate
            if sample_rate <= 0.15:
                sample_num *= 2
            if bin_num == 4 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            sample_num = 4000 * sample_rate
            if bin_num == 5 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            distribute_list.extend(distribute_list_cache)
        for distribute in distribute_list:
            point_list_cache = []
            point_percentile_list = [sum(distribute[0:idx + 1]) * interval for idx, _ in enumerate(distribute[0:-1])]
            for point_percentile in point_percentile_list:
                point = train_data_x.iloc[int(len(train_data_x) * point_percentile)]
                if format_bin:
                    point = f_format_bin(train_data_x_describe, point)
                if point not in point_list_cache:
                    point_list_cache.append(point)
            if point_list_cache not in points_list:
                points_list.append(point_list_cache)

        # Filter by IV and monotonicity: find the best binning among the 2-5 bin candidates
        bins_enum = {}
        iv_max = 0
        breaks_list_target = None
        judge_monto = True
        if len(breaks_list) != 0:
            points_list = [breaks_list]
            judge_monto = False
        train_sv_bin_list = _get_sv_bins(train_data, x_column, y_column, special_values)
        test_sv_bin_list = None
        if test_data_filter is not None:
            test_sv_bin_list = _get_sv_bins(test_data, x_column, y_column, special_values)
        for point_list in points_list:
            is_discard = 0
            discard_reason = ""
            is_monto = 1
            is_monto_contrast = 1

            train_bins = _get_bins(train_data_filter, x_column, y_column, point_list)
            # Merge with the special-value bins before computing IV
            for sv_bin in train_sv_bin_list:
                train_bins = pd.concat((train_bins, sv_bin))
            # _calculation_iv includes the monotonicity check and excludes special values
            train_iv = _calculation_iv(train_bins, judge_monto, pos_neg_cnt)
            # Only the training set is constrained on monotonicity and minimum IV
            if train_iv < iv_threshold:
                discard_reason = f"训练集iv小于阈值{iv_threshold}"
                is_discard = 1
                is_monto = 0

            test_iv = 0
            if test_data_filter is not None:
                test_bins = _get_bins(test_data_filter, x_column, y_column, point_list)
                for sv_bin in test_sv_bin_list:
                    test_bins = pd.concat((test_bins, sv_bin))
                test_iv = _calculation_iv(test_bins, judge_monto, pos_neg_cnt)
                # Trend consistency check between train and test
                train_bad_prob = _get_badprob(train_bins)
                test_bad_prob = _get_badprob(test_bins)
                if not f_monto_contrast(train_bad_prob, test_bad_prob, monto_contrast_change_cnt) \
                        and len(breaks_list) == 0:
                    discard_reason = "变量趋势一致性不够"
                    is_discard = 1
                    is_monto_contrast = 0

            iv = train_iv + test_iv
            if len(breaks_list) == 0:
                bin_num = len(point_list) + 1
                if bin_num not in bins_enum.keys():
                    bins_enum[bin_num] = []
                bins_enum[bin_num].append({
                    "is_discard": is_discard,
                    "is_monto": is_monto,
                    "is_monto_contrast": is_monto_contrast,
                    "discard_reason": discard_reason,
                    "point_list": point_list,
                    "iv": iv,
                })
            if iv > iv_max and not is_discard:
                iv_max = iv
                breaks_list_target = point_list

        # Best split points for each bin count
        bins_enum_best_point = []
        for k, v in bins_enum.items():
            df_bin_enum = pd.DataFrame(data=v)
            df_bin_enum.sort_values(by=["is_discard", "is_monto", "is_monto_contrast", "iv"],
                                    ascending=[True, False, False, False], inplace=True)
            bins_enum_best_point.append(df_bin_enum.iloc[0]["point_list"])

        return iv_max, breaks_list_target, bins_enum_best_point

    def filter(self, data: DataSplitEntity, *args, **kwargs) -> Tuple[
            Dict[str, CandidateFeatureEntity], Dict[str, List[CandidateFeatureEntity]]]:
        # Coarse screening
        bins_iv_dict = self._f_wide_filter(data)
        x_columns_candidate = list(bins_iv_dict.keys())
        candidate_num = self.data_process_config.candidate_num
        candidate_dict: Dict[str, CandidateFeatureEntity] = {}
        numeric_candidate_dict_all: Dict[str, List[CandidateFeatureEntity]] = {}
        for x_column in tqdm(x_columns_candidate):
            if is_numeric_dtype(data.train_data[x_column]):
                iv_max, breaks_list, bins_enum_best_point = self._f_get_best_bins_numeric(data, x_column)
                if len(bins_enum_best_point) != 0:
                    numeric_candidate_dict_all[x_column] = []
                    for point in bins_enum_best_point:
                        numeric_candidate_dict_all[x_column].append(CandidateFeatureEntity(x_column, point, 0))
                if breaks_list is None:
                    continue
                candidate_dict[x_column] = CandidateFeatureEntity(x_column, breaks_list, iv_max)
            else:
                # Categorical variables are handled by scorecardpy for now
                candidate_dict[x_column] = CandidateFeatureEntity(x_column, bins_iv_dict[x_column]["breaks_list"],
                                                                  bins_iv_dict[x_column]["iv_max"])
        # Further drop variables by correlation
        x_columns_candidate = self._f_corr_filter(data, candidate_dict)
        candidate_list: List[CandidateFeatureEntity] = []
        for x_column, v in candidate_dict.items():
            if x_column in x_columns_candidate:
                candidate_list.append(v)
        candidate_list.sort(key=lambda x: x.iv_max, reverse=True)
        candidate_list = candidate_list[0:candidate_num]
        candidate_dict = {}
        for candidate in candidate_list:
            candidate_dict[candidate.x_column] = candidate
        return candidate_dict, numeric_candidate_dict_all

    def feature_generate(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity], *args,
                         **kwargs) -> DataPreparedEntity:
        train_data = data.train_data
        val_data = data.val_data
        test_data = data.test_data
        y_column = self.data_process_config.y_column
        x_columns_candidate = list(candidate_dict.keys())
        bins = self._f_get_bins_by_breaks(train_data, candidate_dict)

        train_woe = sc.woebin_ply(train_data[x_columns_candidate], bins, print_info=False)
        train_data_feature = DataFeatureEntity(pd.concat((train_woe, train_data[y_column]), axis=1),
                                               train_woe.columns.tolist(), y_column)
        val_data_feature = None
        if val_data is not None and len(val_data) != 0:
            val_woe = sc.woebin_ply(val_data[x_columns_candidate], bins, print_info=False)
            val_data_feature = DataFeatureEntity(pd.concat((val_woe, val_data[y_column]), axis=1),
                                                 train_woe.columns.tolist(), y_column)
        test_data_feature = None
        if test_data is not None and len(test_data) != 0:
            test_woe = sc.woebin_ply(test_data[x_columns_candidate], bins, print_info=False)
            test_data_feature = DataFeatureEntity(pd.concat((test_woe, test_data[y_column]), axis=1),
                                                  train_woe.columns.tolist(), y_column)
        return DataPreparedEntity(train_data_feature, val_data_feature, test_data_feature, bins=bins,
                                  data_split_original=data)

    def feature_report(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity],
                       numeric_candidate_dict_all: Dict[str, List[CandidateFeatureEntity]],
                       *args, **kwargs) -> Dict[str, MetricFucEntity]:
        y_column = self.data_process_config.y_column
        jupyter = self.data_process_config.jupyter
        x_columns_candidate = list(candidate_dict.keys())
        train_data = data.train_data
        test_data = data.test_data

        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucEntity(table=data.get_distribution(y_column), table_font_size=10,
                                                     table_cell_width=3)
        # Variable IV and PSI
        train_bins = self._f_get_bins_by_breaks(train_data, candidate_dict)
        train_iv = self._f_get_iv_by_bins(train_bins)
        if test_data is not None and len(test_data) != 0:
            # For PSI it is enough to relabel y so it marks train vs. test membership
            psi_df = pd.concat((train_data, test_data))
            psi_df["#target#"] = [1] * len(train_data) + [0] * len(test_data)
            psi = self._f_get_bins_by_breaks(psi_df, candidate_dict, y_column="#target#")
            psi = self._f_get_iv_by_bins(psi)
            psi.columns = ['变量', 'psi']
            train_iv = pd.merge(train_iv, psi, on="变量", how="left")

            # Variable trend - test set
            test_bins = self._f_get_bins_by_breaks(test_data, candidate_dict)
            image_path_list = self._f_save_var_trend(test_bins, x_columns_candidate, "test")
            metric_value_dict["变量趋势-测试集"] = MetricFucEntity(image_path=image_path_list, image_size=4)

        metric_value_dict["变量iv"] = MetricFucEntity(table=train_iv, table_font_size=10, table_cell_width=3)

        # Variable trend - train set
        image_path_list = self._f_save_var_trend(train_bins, x_columns_candidate, "train")
        metric_value_dict["变量趋势-训练集"] = MetricFucEntity(image_path=image_path_list, image_size=4)

        # Variable validity: correlation heatmap and VIF
        train_woe = sc.woebin_ply(train_data[x_columns_candidate], train_bins, print_info=False)
        var_corr_image_path = self._f_get_var_corr_image(train_woe)
        vif_df = f_get_ivf(train_woe)
        metric_value_dict["变量有效性"] = MetricFucEntity(image_path=var_corr_image_path, table=vif_df)

        if jupyter:
            from IPython import display
            # Print the sample distribution, variable IV and VIF tables
            display.display(metric_value_dict["样本分布"].table)
            display.display(metric_value_dict["变量iv"].table)
            display.display(metric_value_dict["变量有效性"].table)
            # Show the variable correlation heatmap
            f_display_images_by_side(metric_value_dict["变量有效性"].image_path, display, width=800)
            # Show variable trends
            var_trend_train = metric_value_dict["变量趋势-训练集"].image_path
            var_trend_test = None
            metric_test = metric_value_dict.get("变量趋势-测试集")
            if metric_test is not None:
                var_trend_test = metric_test.image_path
            f_display_images_by_side(var_trend_train, display, title="变量趋势训练集",
                                     image_path_list2=var_trend_test, title2="变量趋势测试集")

            # Print breaks_list
            breaks_list = {}
            for x_column, feature in candidate_dict.items():
                breaks_list[x_column] = feature.breaks_list
            print("变量切分点:")
            print(json.dumps(breaks_list, ensure_ascii=False, indent=2, cls=NumpyEncoder))
            # Print the recommended split points for every variable under each bin count
            print("-----不同分箱数下变量的推荐切分点-----")
            for x_column, features in numeric_candidate_dict_all.items():
                print(f"-----【{x_column}】-----")
                var_trend_images_train = []
                var_trend_images_test = []
                for feature in features:
                    var_breaks_list = [str(i) for i in feature.breaks_list]
                    var_trend_bins_train = self._f_get_bins_by_breaks(train_data, {x_column: feature})
                    image_path = self._f_save_var_trend(var_trend_bins_train, [x_column],
                                                        f"train_{x_column}_{'_'.join(var_breaks_list)}")
                    var_trend_images_train.append(image_path[0])
                    if metric_test is not None:
                        var_trend_bins_test = self._f_get_bins_by_breaks(test_data, {x_column: feature})
                        image_path = self._f_save_var_trend(var_trend_bins_test, [x_column],
                                                            f"test_{x_column}_{'_'.join(var_breaks_list)}")
                        var_trend_images_test.append(image_path[0])
                f_display_images_by_side(var_trend_images_train, display, title="训练集",
                                         image_path_list2=var_trend_images_test, title2="测试集")

        return metric_value_dict
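

# ---------------------------------------------------------------------------
# Illustrative sketch only (not used by the pipeline above): a minimal,
# self-contained demo of the idea behind _f_get_best_bins_numeric -- enumerate
# candidate split points by distributing percentile "balls" into bins
# (stars and bars), compute IV for each candidate, and keep the best one that
# is monotonic. It runs on synthetic data and uses a simplified strict
# monotonicity check instead of f_judge_monto, so the numbers are for
# illustration, not a reference result of the strategy.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n = 5000
    x = rng.normal(size=n)
    # Higher x -> higher bad rate, so a monotonic binning should exist
    y = (rng.random(n) < 1 / (1 + np.exp(-x))).astype(int)
    df_demo = pd.DataFrame({"x": x, "y": y})

    interval_demo = 0.1  # search split points on a 10% percentile grid
    slots = int(1 / interval_demo)

    def _demo_distributions(balls, boxes):
        # All ways to split `balls` percentile slots into `boxes` non-empty groups
        result = []
        for combo in combinations_with_replacement(range(balls + boxes - 1), boxes - 1):
            dist, start = [0] * boxes, 0
            for i, divider in enumerate(combo):
                dist[i] = divider - start + 1
                start = divider + 1
            dist[-1] = balls - start
            if all(v > 0 for v in dist):
                result.append(dist)
        return result

    def _demo_iv(counts, bads):
        # Same IV formula as _calculation_iv, with the same zero-count guard
        goods = np.where(counts - bads == 0, 0.9, counts - bads)
        bads = np.where(bads == 0, 0.9, bads)
        distr_bad = bads / bads.sum()
        distr_good = goods / goods.sum()
        return float(np.sum((distr_bad - distr_good) * np.log(distr_bad / distr_good)))

    best_iv, best_breaks = 0.0, None
    for dist in _demo_distributions(slots, 3):  # 3-bin candidates only
        percentiles = np.cumsum(dist)[:-1] * interval_demo
        breaks = sorted(set(np.quantile(df_demo["x"], percentiles)))
        if len(breaks) != len(percentiles):
            continue  # skip duplicate split points
        binned = pd.cut(df_demo["x"], [-np.inf] + breaks + [np.inf], right=False)
        grouped = df_demo.groupby(binned, observed=True)["y"].agg(["count", "sum"])
        bad_rate = (grouped["sum"] / grouped["count"]).to_numpy()
        # Simplified check: keep only strictly monotonic bad rates
        if not (np.all(np.diff(bad_rate) > 0) or np.all(np.diff(bad_rate) < 0)):
            continue
        iv = _demo_iv(grouped["count"].to_numpy(), grouped["sum"].to_numpy())
        if iv > best_iv:
            best_iv, best_breaks = iv, breaks
    print("demo best 3-bin breaks:", best_breaks, "iv:", round(best_iv, 4))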