# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2024/1/2
@desc: IV and monotonicity based feature filtering strategy
"""
from itertools import combinations_with_replacement
from typing import List, Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scorecardpy as sc
import seaborn as sns
from pandas.core.dtypes.common import is_numeric_dtype
from tqdm import tqdm

from entitys import DataSplitEntity, CandidateFeatureEntity, DataPreparedEntity, DataFeatureEntity, MetricFucEntity
from init import f_get_save_path
from .feature_utils import f_judge_monto, f_get_corr
from .filter_strategy_base import FilterStrategyBase

plt.rcParams['figure.figsize'] = (8, 8)


class StrategyIv(FilterStrategyBase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _f_save_var_trend(self, bins, x_columns_candidate, prefix):
        # Save a WOE/bad-rate trend plot for every candidate variable and return the image paths
        image_path_list = []
        for k in x_columns_candidate:
            bin_df = bins[k]
            # bin_df["bin"] = bin_df["bin"].apply(lambda x: re.sub(r"(\d+\.\d+)",
            #                                                      lambda m: "{:.2f}".format(float(m.group(0))), x))
            sc.woebin_plot(bin_df)
            path = f_get_save_path(f"{prefix}_{k}.png")
            plt.savefig(path)
            image_path_list.append(path)
        return image_path_list

    def _f_get_bins_by_breaks(self, data: pd.DataFrame, candidate_dict: Dict[str, CandidateFeatureEntity],
                              y_column=None):
        y_column = self.data_process_config.y_column if y_column is None else y_column
        special_values = self.data_process_config.special_values
        x_columns_candidate = list(candidate_dict.keys())
        breaks_list = {}
        for column, candidate in candidate_dict.items():
            breaks_list[column] = candidate.breaks_list
        bins = sc.woebin(data[x_columns_candidate + [y_column]], y=y_column, breaks_list=breaks_list,
                         special_values=special_values)
        return bins

    def _f_corr_filter(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity]) -> List[str]:
        # Drop correlated variables: for every pair whose WOE correlation exceeds the
        # threshold, keep the one with the higher iv_max
        corr_threshold = self.data_process_config.corr_threshold
        train_data = data.train_data
        x_columns_candidate = list(candidate_dict.keys())

        bins = self._f_get_bins_by_breaks(train_data, candidate_dict)
        train_woe = sc.woebin_ply(train_data[x_columns_candidate], bins)
        corr_df = f_get_corr(train_woe)
        corr_dict = corr_df.to_dict()
        for column, corr in corr_dict.items():
            column = column.replace("_woe", "")
            if column not in x_columns_candidate:
                continue
            for challenger_column, challenger_corr in corr.items():
                challenger_column = challenger_column.replace("_woe", "")
                if challenger_corr < corr_threshold or column == challenger_column \
                        or challenger_column not in x_columns_candidate:
                    continue
                iv_max = candidate_dict[column].iv_max
                challenger_iv_max = candidate_dict[challenger_column].iv_max
                if iv_max > challenger_iv_max:
                    x_columns_candidate.remove(challenger_column)
                else:
                    x_columns_candidate.remove(column)
                    break
        return x_columns_candidate
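
    # Illustrative walk-through of the rule above (hypothetical numbers, not produced by
    # this code): with corr_threshold = 0.7, if corr(A_woe, B_woe) = 0.82 while
    # candidate_dict["A"].iv_max = 0.35 and candidate_dict["B"].iv_max = 0.20, the pair
    # exceeds the threshold and the lower-IV variable "B" is removed from
    # x_columns_candidate, so only "A" survives into the later steps.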

    def _f_wide_filter(self, data: DataSplitEntity) -> Dict:
        # Coarse filtering: bin every candidate with scorecardpy and drop variables whose
        # training IV is below iv_threshold_wide
        train_data = data.train_data
        test_data = data.test_data
        special_values = self.data_process_config.special_values
        y_column = self.data_process_config.y_column
        iv_threshold_wide = self.data_process_config.iv_threshold_wide
        x_columns_candidate = self.data_process_config.x_columns_candidate
        if x_columns_candidate is None or len(x_columns_candidate) == 0:
            x_columns_candidate = train_data.columns.tolist()
            x_columns_candidate.remove(y_column)

        bins_train = sc.woebin(train_data[x_columns_candidate + [y_column]], y=y_column,
                               special_values=special_values, bin_num_limit=5)
        breaks_list = {}
        for column, bin_df in bins_train.items():
            breaks_list[column] = list(bin_df['breaks'])
        bins_test = None
        if test_data is not None and len(test_data) != 0:
            bins_test = sc.woebin(test_data[x_columns_candidate + [y_column]], y=y_column,
                                  breaks_list=breaks_list, special_values=special_values)
        bins_iv_dict = {}
        for column, bin_train in bins_train.items():
            train_iv = bin_train['total_iv'][0]
            test_iv = 0
            if bins_test is not None:
                bin_test = bins_test[column]
                test_iv = bin_test['total_iv'][0]
            iv_max = train_iv + test_iv
            if train_iv < iv_threshold_wide:
                continue
            bins_iv_dict[column] = {"iv_max": iv_max, "breaks_list": breaks_list[column]}
        return bins_iv_dict

    def _f_get_best_bins_numeric(self, data: DataSplitEntity, x_column: str):
        # Greedy search for the monotonic binning with the highest combined IV on the
        # training and test sets
        interval = self.data_process_config.bin_search_interval
        iv_threshold = self.data_process_config.iv_threshold
        special_values = self.data_process_config.get_special_values(x_column)
        y_column = self.data_process_config.y_column
        sample_rate = self.data_process_config.sample_rate

        def _n0(x):
            return sum(x == 0)

        def _n1(x):
            return sum(x == 1)

        def _f_distribute_balls(balls, boxes):
            # Enumerate the ways of distributing `balls` identical balls into `boxes` boxes
            # with every box non-empty (stars and bars: C(balls - 1, boxes - 1) results)
            total_ways = combinations_with_replacement(range(balls + boxes - 1), boxes - 1)
            distribute_list = []
            # Iterate over all possible divider positions
            for combo in total_ways:
                # Derive the ball counts from the divider positions
                distribution = [0] * boxes
                start = 0
                for i, divider in enumerate(combo):
                    distribution[i] = divider - start + 1
                    start = divider + 1
                distribution[-1] = balls - start  # balls in the last box
                # Keep only distributions where every box gets at least one ball
                if all(x > 0 for x in distribution):
                    distribute_list.append(distribution)
            return distribute_list
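
        # Worked example of the enumeration above: _f_distribute_balls(5, 2) yields
        # [[1, 4], [2, 3], [3, 2], [4, 1]], i.e. the C(4, 1) = 4 compositions of 5 into
        # 2 positive parts. With bin_search_interval = 0.05 (a hypothetical setting),
        # int(1 / interval) = 20 "balls" are split, so each part corresponds to a
        # multiple of 5% of the sorted training sample.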

        def _get_sv_bins(df, x_column, y_column, special_values):
            # Build one bin per special value so they can be appended to the regular bins
            sv_bin_list = []
            for special in special_values:
                dtm = df[df[x_column] == special].copy()
                if len(dtm) != 0:
                    dtm['bin'] = [str(special)] * len(dtm)
                    binning = (dtm.groupby(['bin'], group_keys=False)[y_column].agg([_n0, _n1])
                               .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'}))
                    binning['is_special_values'] = [True] * len(binning)
                    sv_bin_list.append(binning)
            return sv_bin_list

        def _get_bins(df, x_column, y_column, breaks_list):
            # Cut the variable at the candidate break points and count good/bad per bin
            dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
            bstbrks = [-np.inf] + breaks_list + [np.inf]
            labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
            dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
            dtm['bin'] = dtm['bin'].astype(str)
            bins = (dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1])
                    .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'}))
            bins['is_special_values'] = [False] * len(bins)
            return bins

        def _calculation_iv(bins):
            bins['count'] = bins['good'] + bins['bad']
            bins['badprob'] = bins['bad'] / bins['count']
            # Monotonicity check on the non-special bins; reject the binning if it fails
            bad_prob = bins[bins['is_special_values'] == False]['badprob'].values.tolist()
            if not f_judge_monto(bad_prob):
                return -1
            # IV per bin: (DistrBad - DistrGood) * ln(DistrBad / DistrGood); zero counts are
            # smoothed to 0.9 to avoid division by zero
            infovalue = (
                pd.DataFrame({'good': bins['good'], 'bad': bins['bad']})
                .replace(0, 0.9)
                .assign(DistrBad=lambda x: x.bad / sum(x.bad),
                        DistrGood=lambda x: x.good / sum(x.good))
                .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood))
                .iv
            )
            bins['bin_iv'] = infovalue
            bins['total_iv'] = bins['bin_iv'].sum()
            iv = bins['total_iv'].values[0]
            return iv

        def _f_sampling(distribute_list: list, sample_rate: float):
            # Sub-sample the candidate distributions: a full greedy search takes too long
            sampled_list = distribute_list[::int(1 / sample_rate)]
            return sampled_list

        train_data = data.train_data
        train_data_filter = train_data[~train_data[x_column].isin(special_values)]
        train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
        train_data_x = train_data_filter[x_column]

        test_data = data.test_data
        test_data_filter = None
        if test_data is not None and len(test_data) != 0:
            test_data_filter = test_data[~test_data[x_column].isin(special_values)]
            test_data_filter = test_data_filter.sort_values(by=x_column, ascending=True)

        # Build the candidate split points for 2 to 5 bins
        distribute_list = []
        points_list = []
        for bin_num in list(range(2, 6)):
            distribute_list_cache = _f_distribute_balls(int(1 / interval), bin_num)
            # Sampling is required for 4 bins and above, otherwise the search takes too long
            sample_num = 1000 * sample_rate
            if sample_rate <= 0.15:
                sample_num *= 2
            if bin_num == 4 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            sample_num = 4000 * sample_rate
            if bin_num == 5 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            distribute_list.extend(distribute_list_cache)
        for distribute in distribute_list:
            point_list_cache = []
            point_percentile_list = [sum(distribute[0:idx + 1]) * interval for idx, _ in enumerate(distribute[0:-1])]
            for point_percentile in point_percentile_list:
                point = train_data_x.iloc[int(len(train_data_x) * point_percentile)]
                if point not in point_list_cache:
                    point_list_cache.append(point)
            if point_list_cache not in points_list:
                points_list.append(point_list_cache)

        # Filter by IV and monotonicity
        iv_max = 0
        breaks_list = []
        train_sv_bin_list = _get_sv_bins(train_data, x_column, y_column, special_values)
        test_sv_bin_list = None
        if test_data_filter is not None:
            test_sv_bin_list = _get_sv_bins(test_data, x_column, y_column, special_values)
        for point_list in tqdm(points_list):
            train_bins = _get_bins(train_data_filter, x_column, y_column, point_list)
            # Merge with the special-value bins before computing IV
            for sv_bin in train_sv_bin_list:
                train_bins = pd.concat((train_bins, sv_bin))
            train_iv = _calculation_iv(train_bins)
            # Only the training set is constrained on monotonicity and IV size
            if train_iv < iv_threshold:
                continue

            test_iv = 0
            if test_data_filter is not None:
                test_bins = _get_bins(test_data_filter, x_column, y_column, point_list)
                for sv_bin in test_sv_bin_list:
                    test_bins = pd.concat((test_bins, sv_bin))
                test_iv = _calculation_iv(test_bins)
            iv = train_iv + test_iv
            if iv > iv_max:
                iv_max = iv
                breaks_list = point_list

        return iv_max, breaks_list

    def filter(self, data: DataSplitEntity, *args, **kwargs) -> Dict[str, CandidateFeatureEntity]:
        # Coarse filtering
        bins_iv_dict = self._f_wide_filter(data)
        x_columns_candidate = list(bins_iv_dict.keys())
        candidate_num = self.data_process_config.candidate_num
        candidate_dict: Dict[str, CandidateFeatureEntity] = {}
        for x_column in x_columns_candidate:
            if is_numeric_dtype(data.train_data[x_column]):
                iv_max, breaks_list = self._f_get_best_bins_numeric(data, x_column)
                candidate_dict[x_column] = CandidateFeatureEntity(x_column, breaks_list, iv_max)
            else:
                # Categorical variables are handled by scorecardpy for now
                candidate_dict[x_column] = CandidateFeatureEntity(x_column, bins_iv_dict[x_column]["breaks_list"],
                                                                  bins_iv_dict[x_column]["iv_max"])

        # Further eliminate variables by correlation
        x_columns_candidate = self._f_corr_filter(data, candidate_dict)
        candidate_list: List[CandidateFeatureEntity] = []
        for x_column, v in candidate_dict.items():
            if x_column in x_columns_candidate:
                candidate_list.append(v)

        # Keep the top candidate_num variables ranked by iv_max
        candidate_list.sort(key=lambda x: x.iv_max, reverse=True)
        candidate_list = candidate_list[0:candidate_num]
        candidate_dict = {}
        for candidate in candidate_list:
            candidate_dict[candidate.x_column] = candidate
        return candidate_dict
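
    # Illustrative shape of the dict returned by filter() (hypothetical values):
    #     {"age": CandidateFeatureEntity("age", [25, 35, 50], 0.42), ...}
    # breaks_list holds the split points selected above and iv_max the summed
    # train + test IV used for the final ranking.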

    def feature_generate(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity], *args,
                         **kwargs) -> DataPreparedEntity:
        train_data = data.train_data
        val_data = data.val_data
        test_data = data.test_data
        y_column = self.data_process_config.y_column
        x_columns_candidate = list(candidate_dict.keys())
        bins = self._f_get_bins_by_breaks(train_data, candidate_dict)

        train_woe = sc.woebin_ply(train_data[x_columns_candidate], bins)
        train_data_feature = DataFeatureEntity(pd.concat((train_woe, train_data[y_column]), axis=1),
                                               train_woe.columns.tolist(), y_column)

        val_data_feature = None
        if val_data is not None and len(val_data) != 0:
            val_woe = sc.woebin_ply(val_data[x_columns_candidate], bins)
            val_data_feature = DataFeatureEntity(pd.concat((val_woe, val_data[y_column]), axis=1),
                                                 train_woe.columns.tolist(), y_column)

        test_data_feature = None
        if test_data is not None and len(test_data) != 0:
            test_woe = sc.woebin_ply(test_data[x_columns_candidate], bins)
            test_data_feature = DataFeatureEntity(pd.concat((test_woe, test_data[y_column]), axis=1),
                                                  train_woe.columns.tolist(), y_column)
        return DataPreparedEntity(train_data_feature, val_data_feature, test_data_feature)

    def feature_report(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity], *args,
                       **kwargs) -> Dict[str, MetricFucEntity]:
        y_column = self.data_process_config.y_column
        x_columns_candidate = list(candidate_dict.keys())
        train_data = data.train_data
        test_data = data.test_data

        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucEntity(table=data.get_distribution(y_column), table_font_size=12,
                                                     table_cell_width=3)
        # Variable IV and PSI
        train_bins = self._f_get_bins_by_breaks(train_data, candidate_dict)
        train_iv = {key_: [round(value_['total_iv'].max(), 4)] for key_, value_ in train_bins.items()}
        train_iv = pd.DataFrame.from_dict(train_iv, orient='index', columns=['IV']).reset_index()
        train_iv = train_iv.sort_values('IV', ascending=False).reset_index(drop=True)
        train_iv.columns = ['变量', 'IV']
        if test_data is not None and len(test_data) != 0:
            # PSI is obtained by re-binning with y replaced by a train/test membership flag
            psi_df = pd.concat((train_data, test_data))
            psi_df["#target#"] = [1] * len(train_data) + [0] * len(test_data)
            psi = self._f_get_bins_by_breaks(psi_df, candidate_dict, y_column="#target#")
            psi = {key_: [round(value_['total_iv'].max(), 4)] for key_, value_ in psi.items()}
            psi = pd.DataFrame.from_dict(psi, orient='index', columns=['psi']).reset_index()
            psi.columns = ['变量', 'psi']
            train_iv = pd.merge(train_iv, psi, on="变量", how="left")

            # Variable trend - test set
            test_bins = self._f_get_bins_by_breaks(test_data, candidate_dict)
            image_path_list = self._f_save_var_trend(test_bins, x_columns_candidate, "test")
            metric_value_dict["变量趋势-测试集"] = MetricFucEntity(image_path=image_path_list, image_size=4)

        metric_value_dict["变量iv"] = MetricFucEntity(table=train_iv, table_font_size=12, table_cell_width=3)
        # Variable trend - train set
        image_path_list = self._f_save_var_trend(train_bins, x_columns_candidate, "train")
        metric_value_dict["变量趋势-训练集"] = MetricFucEntity(image_path=image_path_list, image_size=4)

        # Variable validity (WOE correlation heatmap)
        train_woe = sc.woebin_ply(train_data[x_columns_candidate], train_bins)
        train_corr = f_get_corr(train_woe)
        plt.figure(figsize=(12, 12))
        sns.heatmap(train_corr, vmax=1, square=True, cmap='RdBu', annot=True)
        plt.title('Variables Correlation', fontsize=15)
        plt.yticks(rotation=0)
        plt.xticks(rotation=90)
        path = f_get_save_path("var_corr.png")
        plt.savefig(path)
        metric_value_dict["变量有效性"] = MetricFucEntity(image_path=path)

        return metric_value_dict
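
# Minimal usage sketch (hypothetical wiring; the real constructor signatures of the
# strategy config and DataSplitEntity live in `entitys` / `init` and may differ):
#
#     strategy = StrategyIv(data_process_config)
#     candidate_dict = strategy.filter(data_split)
#     data_prepared = strategy.feature_generate(data_split, candidate_dict)
#     report_dict = strategy.feature_report(data_split, candidate_dict)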