# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2024/1/2
@desc: Feature filtering strategy based on IV and monotonicity.
"""
import json
from itertools import combinations_with_replacement
from typing import List, Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scorecardpy as sc
import seaborn as sns
from pandas.core.dtypes.common import is_numeric_dtype
from tqdm import tqdm

from commom import f_display_images_by_side
from entitys import DataSplitEntity, CandidateFeatureEntity, DataPreparedEntity, DataFeatureEntity, MetricFucEntity

from .feature_utils import f_judge_monto, f_get_corr, f_get_ivf, f_format_bin
from .filter_strategy_base import FilterStrategyBase


class StrategyIv(FilterStrategyBase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _f_get_iv_by_bins(self, bins) -> pd.DataFrame:
        # Collect each variable's total IV from its binning table and rank.
        iv = {key_: [round(value_['total_iv'].max(), 4)] for key_, value_ in bins.items()}
        iv = pd.DataFrame.from_dict(iv, orient='index', columns=['IV']).reset_index()
        iv = iv.sort_values('IV', ascending=False).reset_index(drop=True)
        iv.columns = ['变量', 'IV']
        return iv

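    # Shape note (scorecardpy convention): sc.woebin returns a dict mapping
    # each variable name to a binning DataFrame whose rows are bins and whose
    # columns include 'breaks', 'woe', 'bin_iv' and 'total_iv' (the same
    # total_iv repeated on every row, hence the .max() above).
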
    def _f_get_var_corr_image(self, train_woe):
        if len(train_woe.columns.to_list()) <= 1:
            return None
        train_corr = f_get_corr(train_woe)
        plt.figure(figsize=(12, 12))
        sns.heatmap(train_corr, vmax=1, square=True, cmap='RdBu', annot=True)
        plt.title('Variables Correlation', fontsize=15)
        plt.yticks(rotation=0)
        plt.xticks(rotation=90)
        path = self.data_process_config.f_get_save_path("var_corr.png")
        plt.savefig(path)
        # Close the figure so repeated calls do not leak matplotlib state.
        plt.close()
        return path

    def _f_save_var_trend(self, bins, x_columns_candidate, prefix):
        image_path_list = []
        for k in x_columns_candidate:
            bin_df = bins[k]
            # bin_df["bin"] = bin_df["bin"].apply(lambda x: re.sub(r"(\d+\.\d+)",
            #                                     lambda m: "{:.2f}".format(float(m.group(0))), x))
            sc.woebin_plot(bin_df)
            path = self.data_process_config.f_get_save_path(f"{prefix}_{k}.png")
            plt.savefig(path)
            # Close the current figure before plotting the next variable.
            plt.close()
            image_path_list.append(path)
        return image_path_list

    def _f_get_bins_by_breaks(self, data: pd.DataFrame, candidate_dict: Dict[str, CandidateFeatureEntity],
                              y_column=None):
        y_column = self.data_process_config.y_column if y_column is None else y_column
        special_values = self.data_process_config.special_values
        x_columns_candidate = list(candidate_dict.keys())
        breaks_list = {}
        for column, candidate in candidate_dict.items():
            breaks_list[column] = candidate.breaks_list
        bins = sc.woebin(data[x_columns_candidate + [y_column]], y=y_column, breaks_list=breaks_list,
                         special_values=special_values, print_info=False)
        return bins

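    # breaks_list format (scorecardpy convention), illustrative values only:
    #   numeric:     {"age": [26, 35, 40]}        -> [-inf,26), [26,35), [35,40), [40,inf)
    #   categorical: {"purpose": ["a%,%b", "c"]}  -> categories a and b merged into one bin
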
    def _f_corr_filter(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity]) -> List[str]:
        # Drop highly correlated variables, keeping the one with the higher IV.
        corr_threshold = self.data_process_config.corr_threshold
        train_data = data.train_data
        x_columns_candidate = list(candidate_dict.keys())
        bins = self._f_get_bins_by_breaks(train_data, candidate_dict)
        train_woe = sc.woebin_ply(train_data[x_columns_candidate], bins, print_info=False)
        corr_df = f_get_corr(train_woe)
        corr_dict = corr_df.to_dict()
        for column, corr in corr_dict.items():
            column = column.replace("_woe", "")
            if column not in x_columns_candidate:
                continue
            for challenger_column, challenger_corr in corr.items():
                challenger_column = challenger_column.replace("_woe", "")
                if challenger_corr < corr_threshold or column == challenger_column \
                        or challenger_column not in x_columns_candidate:
                    continue
                iv_max = candidate_dict[column].iv_max
                challenger_iv_max = candidate_dict[challenger_column].iv_max
                if iv_max > challenger_iv_max:
                    x_columns_candidate.remove(challenger_column)
                else:
                    # The current column lost the comparison; once removed it
                    # must not be compared again, so stop its inner scan.
                    x_columns_candidate.remove(column)
                    break
        return x_columns_candidate

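    # Illustration of the greedy pruning above (numbers are made up): with
    # corr_threshold = 0.8, if corr(A_woe, B_woe) = 0.85 and iv_max(A) = 0.30
    # while iv_max(B) = 0.12, then B is removed; had B the higher IV, A would
    # be removed instead and skipped for the rest of the scan.
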
    def _f_wide_filter(self, data: DataSplitEntity) -> Dict:
        # Coarse pre-filter: bin everything once and keep only variables whose
        # training-set IV clears iv_threshold_wide.
        train_data = data.train_data
        test_data = data.test_data
        special_values = self.data_process_config.special_values
        breaks_list = self.data_process_config.breaks_list.copy()
        y_column = self.data_process_config.y_column
        iv_threshold_wide = self.data_process_config.iv_threshold_wide
        x_columns_candidate = self.data_process_config.x_columns_candidate
        if x_columns_candidate is None or len(x_columns_candidate) == 0:
            x_columns_candidate = train_data.columns.tolist()
            if y_column in x_columns_candidate:
                x_columns_candidate.remove(y_column)
        bins_train = sc.woebin(train_data[x_columns_candidate + [y_column]], y=y_column, bin_num_limit=5,
                               special_values=special_values, breaks_list=breaks_list, print_info=False)
        for column, bin_df in bins_train.items():
            breaks_list[column] = list(bin_df['breaks'])
        bins_test = None
        if test_data is not None and len(test_data) != 0:
            bins_test = sc.woebin(test_data[x_columns_candidate + [y_column]], y=y_column,
                                  special_values=special_values, breaks_list=breaks_list, print_info=False)
        bins_iv_dict = {}
        for column, bin_train in bins_train.items():
            train_iv = bin_train['total_iv'][0]
            test_iv = 0
            if bins_test is not None:
                bin_test = bins_test[column]
                test_iv = bin_test['total_iv'][0]
            # iv_max is the combined train + test IV, used later for ranking.
            iv_max = train_iv + test_iv
            if train_iv < iv_threshold_wide:
                continue
            bins_iv_dict[column] = {"iv_max": iv_max, "breaks_list": breaks_list[column]}
        return bins_iv_dict

    def _f_get_best_bins_numeric(self, data: DataSplitEntity, x_column: str):
        # Greedy search for the monotonic binning whose combined IV over the
        # training and test sets is highest.
        interval = self.data_process_config.bin_search_interval
        iv_threshold = self.data_process_config.iv_threshold
        special_values = self.data_process_config.get_special_values(x_column)
        breaks_list = self.data_process_config.get_breaks_list(x_column)
        y_column = self.data_process_config.y_column
        sample_rate = self.data_process_config.sample_rate
        format_bin = self.data_process_config.format_bin
        pos_neg_cnt = self.data_process_config.pos_neg_cnt

        def _n0(x):
            # Count of y == 0 (good) samples in a bin.
            return sum(x == 0)

        def _n1(x):
            # Count of y == 1 (bad) samples in a bin.
            return sum(x == 1)

        def _f_distribute_balls(balls, boxes):
            # Stars and bars: enumerate divider placements over balls + boxes - 1
            # slots, then keep only distributions where every box is non-empty.
            total_ways = combinations_with_replacement(range(balls + boxes - 1), boxes - 1)
            distribute_list = []
            # Walk every candidate divider placement.
            for combo in total_ways:
                # Convert divider positions into per-box ball counts.
                distribution = [0] * boxes
                start = 0
                for i, divider in enumerate(combo):
                    distribution[i] = divider - start + 1
                    start = divider + 1
                distribution[-1] = balls - start  # balls left for the last box
                # Keep it only if every box holds at least one ball.
                if all(x > 0 for x in distribution):
                    distribute_list.append(distribution)
            return distribute_list

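        # Worked example (checkable by hand): _f_distribute_balls(4, 2) yields
        # [[1, 3], [2, 2], [3, 1]], i.e. every way to split 4 quantile slots
        # into 2 non-empty bins. With interval = 0.1 the call further below is
        # _f_distribute_balls(10, bin_num): ten 10%-wide quantile slots split
        # into bin_num bins.
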
        def _get_sv_bins(df, x_column, y_column, special_values):
            # Build one bin per special value so they can be appended to the
            # regular bins before IV is computed.
            sv_bin_list = []
            for special in special_values:
                dtm = df[df[x_column] == special].copy()  # copy to avoid SettingWithCopyWarning
                if len(dtm) != 0:
                    dtm['bin'] = [str(special)] * len(dtm)
                    binning = dtm.groupby(['bin'], group_keys=False)[y_column].agg(
                        [_n0, _n1]).reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
                    binning['is_special_values'] = [True] * len(binning)
                    sv_bin_list.append(binning)
            return sv_bin_list

        def _get_bins(df, x_column, y_column, breaks_list):
            dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
            bstbrks = [-np.inf] + breaks_list + [np.inf]
            labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
            dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
            dtm['bin'] = dtm['bin'].astype(str)
            bins = dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1]) \
                .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
            bins['is_special_values'] = [False] * len(bins)
            return bins

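        # Example of the left-closed labels built above: breaks_list = [0.5, 1.5]
        # produces the bins '[-inf,0.5)', '[0.5,1.5)' and '[1.5,inf)'.
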
        def _calculation_iv(bins, judge_monto=True, pos_neg_cnt=1):
            bins['count'] = bins['good'] + bins['bad']
            bins['badprob'] = bins['bad'] / bins['count']
            # Monotonicity check on the regular (non-special) bins; a return
            # value of -1 marks the candidate binning as rejected.
            bad_prob = bins[bins['is_special_values'] == False]['badprob'].values.tolist()
            if judge_monto and not f_judge_monto(bad_prob, pos_neg_cnt):
                return -1
            # Compute IV; zero counts are replaced with 0.9 to avoid log(0)
            # and division by zero.
            infovalue = pd.DataFrame({'good': bins['good'], 'bad': bins['bad']}) \
                .replace(0, 0.9) \
                .assign(
                    DistrBad=lambda x: x.bad / sum(x.bad),
                    DistrGood=lambda x: x.good / sum(x.good)
                ) \
                .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood)) \
                .iv
            bins['bin_iv'] = infovalue
            bins['total_iv'] = bins['bin_iv'].sum()
            iv = bins['total_iv'].values[0]
            return iv

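        # The formula implemented above is the standard information value:
        #   IV = sum_i (DistrBad_i - DistrGood_i) * ln(DistrBad_i / DistrGood_i)
        # Illustrative numbers: bins with good = [90, 10], bad = [5, 15] give
        # DistrGood = [0.9, 0.1], DistrBad = [0.25, 0.75], hence
        # IV = (0.25-0.9)*ln(0.25/0.9) + (0.75-0.1)*ln(0.75/0.1) ~ 0.83 + 1.31 ~ 2.14.
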
        def _f_sampling(distribute_list: list, sample_rate: float):
            # Subsample candidate distributions; a fully exhaustive greedy
            # search takes too long.
            sampled_list = distribute_list[::int(1 / sample_rate)]
            return sampled_list

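        # Example: _f_sampling(list(range(10)), 0.5) keeps every 2nd element,
        # i.e. [0, 2, 4, 6, 8]. The callers below only pass a rate <= 1, so
        # the step int(1 / sample_rate) is always >= 1.
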
        train_data = data.train_data
        train_data_filter = train_data[~train_data[x_column].isin(special_values)]
        train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)
        train_data_x = train_data_filter[x_column]
        train_data_x_describe = train_data_x.describe(percentiles=[0.1, 0.9])
        test_data = data.test_data
        test_data_filter = None
        if test_data is not None and len(test_data) != 0:
            test_data_filter = test_data[~test_data[x_column].isin(special_values)]
            test_data_filter = test_data_filter.sort_values(by=x_column, ascending=True)

        # Build candidate split points for 2 to 5 bins.
        distribute_list = []
        points_list = []
        for bin_num in list(range(2, 6)):
            distribute_list_cache = _f_distribute_balls(int(1 / interval), bin_num)
            # 4 bins and up must be sampled, otherwise the search takes too long.
            sample_num = 1000 * sample_rate
            if sample_rate <= 0.15:
                sample_num *= 2
            if bin_num == 4 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            sample_num = 4000 * sample_rate
            if bin_num == 5 and len(distribute_list_cache) >= sample_num:
                distribute_list_cache = _f_sampling(distribute_list_cache, sample_num / len(distribute_list_cache))
            distribute_list.extend(distribute_list_cache)
        # Translate each slot distribution into concrete cut points taken from
        # the sorted training values.
        for distribute in distribute_list:
            point_list_cache = []
            point_percentile_list = [sum(distribute[0:idx + 1]) * interval for idx, _ in enumerate(distribute[0:-1])]
            for point_percentile in point_percentile_list:
                point = train_data_x.iloc[int(len(train_data_x) * point_percentile)]
                if format_bin:
                    point = f_format_bin(train_data_x_describe, point)
                if point not in point_list_cache:
                    point_list_cache.append(point)
            if point_list_cache not in points_list:
                points_list.append(point_list_cache)

        # Filter candidates by IV and monotonicity.
        iv_max = 0
        breaks_list_target = None
        judge_monto = True
        if len(breaks_list) != 0:
            # User-specified breaks bypass both the search and the monotonicity check.
            points_list = [breaks_list]
            judge_monto = False
        train_sv_bin_list = _get_sv_bins(train_data, x_column, y_column, special_values)
        test_sv_bin_list = None
        if test_data_filter is not None:
            test_sv_bin_list = _get_sv_bins(test_data, x_column, y_column, special_values)
        for point_list in points_list:
            train_bins = _get_bins(train_data_filter, x_column, y_column, point_list)
            # Merge the special-value bins back in before computing IV.
            for sv_bin in train_sv_bin_list:
                train_bins = pd.concat((train_bins, sv_bin))
            # _calculation_iv performs the monotonicity check and excludes
            # special-value bins from it.
            train_iv = _calculation_iv(train_bins, judge_monto, pos_neg_cnt)
            # Only the training set is held to the monotonicity and IV thresholds.
            if train_iv < iv_threshold:
                continue
            test_iv = 0
            if test_data_filter is not None:
                test_bins = _get_bins(test_data_filter, x_column, y_column, point_list)
                for sv_bin in test_sv_bin_list:
                    test_bins = pd.concat((test_bins, sv_bin))
                test_iv = _calculation_iv(test_bins, judge_monto, pos_neg_cnt)
            iv = train_iv + test_iv
            if iv > iv_max:
                iv_max = iv
                breaks_list_target = point_list
        return iv_max, breaks_list_target

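    # _f_get_best_bins_numeric returns (iv_max, breaks) for the best monotonic
    # binning found, or (0, None) when no candidate clears iv_threshold; the
    # filter() below skips any variable whose breaks come back as None.
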
    def filter(self, data: DataSplitEntity, *args, **kwargs) -> Dict[str, CandidateFeatureEntity]:
        # Coarse pre-filter first.
        bins_iv_dict = self._f_wide_filter(data)
        x_columns_candidate = list(bins_iv_dict.keys())
        candidate_num = self.data_process_config.candidate_num
        candidate_dict: Dict[str, CandidateFeatureEntity] = {}
        for x_column in tqdm(x_columns_candidate):
            if is_numeric_dtype(data.train_data[x_column]):
                iv_max, breaks_list = self._f_get_best_bins_numeric(data, x_column)
                if breaks_list is None:
                    continue
                candidate_dict[x_column] = CandidateFeatureEntity(x_column, breaks_list, iv_max)
            else:
                # Categorical variables are handled by scorecardpy for now.
                candidate_dict[x_column] = CandidateFeatureEntity(x_column, bins_iv_dict[x_column]["breaks_list"],
                                                                  bins_iv_dict[x_column]["iv_max"])
        # Prune further by pairwise correlation.
        x_columns_candidate = self._f_corr_filter(data, candidate_dict)
        candidate_list: List[CandidateFeatureEntity] = []
        for x_column, v in candidate_dict.items():
            if x_column in x_columns_candidate:
                candidate_list.append(v)
        # Keep the candidate_num variables with the highest IV.
        candidate_list.sort(key=lambda x: x.iv_max, reverse=True)
        candidate_list = candidate_list[0:candidate_num]
        candidate_dict = {}
        for candidate in candidate_list:
            candidate_dict[candidate.x_column] = candidate
        return candidate_dict

    def feature_generate(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity], *args,
                         **kwargs) -> DataPreparedEntity:
        train_data = data.train_data
        val_data = data.val_data
        test_data = data.test_data
        y_column = self.data_process_config.y_column
        x_columns_candidate = list(candidate_dict.keys())
        bins = self._f_get_bins_by_breaks(train_data, candidate_dict)

        train_woe = sc.woebin_ply(train_data[x_columns_candidate], bins, print_info=False)
        train_data_feature = DataFeatureEntity(pd.concat((train_woe, train_data[y_column]), axis=1),
                                               train_woe.columns.tolist(), y_column)

        val_data_feature = None
        if val_data is not None and len(val_data) != 0:
            val_woe = sc.woebin_ply(val_data[x_columns_candidate], bins, print_info=False)
            val_data_feature = DataFeatureEntity(pd.concat((val_woe, val_data[y_column]), axis=1),
                                                 val_woe.columns.tolist(), y_column)

        test_data_feature = None
        if test_data is not None and len(test_data) != 0:
            test_woe = sc.woebin_ply(test_data[x_columns_candidate], bins, print_info=False)
            test_data_feature = DataFeatureEntity(pd.concat((test_woe, test_data[y_column]), axis=1),
                                                  test_woe.columns.tolist(), y_column)
        return DataPreparedEntity(train_data_feature, val_data_feature, test_data_feature, bins=bins,
                                  data_split_original=data)

    def feature_report(self, data: DataSplitEntity, candidate_dict: Dict[str, CandidateFeatureEntity],
                       *args, **kwargs) -> Dict[str, MetricFucEntity]:
        y_column = self.data_process_config.y_column
        jupyter = self.data_process_config.jupyter
        x_columns_candidate = list(candidate_dict.keys())
        train_data = data.train_data
        test_data = data.test_data
        metric_value_dict = {}
        # Sample distribution.
        metric_value_dict["样本分布"] = MetricFucEntity(table=data.get_distribution(y_column), table_font_size=10,
                                                        table_cell_width=3)
        # Variable IV and PSI.
        train_bins = self._f_get_bins_by_breaks(train_data, candidate_dict)
        train_iv = self._f_get_iv_by_bins(train_bins)
        if test_data is not None and len(test_data) != 0:
            # PSI falls out of the same IV routine: relabel y to flag whether a
            # row comes from the training or the test set, then compute "IV".
            psi_df = pd.concat((train_data, test_data))
            psi_df["#target#"] = [1] * len(train_data) + [0] * len(test_data)
            psi = self._f_get_bins_by_breaks(psi_df, candidate_dict, y_column="#target#")
            psi = self._f_get_iv_by_bins(psi)
            psi.columns = ['变量', 'psi']
            train_iv = pd.merge(train_iv, psi, on="变量", how="left")
            # Variable trend, test set.
            test_bins = self._f_get_bins_by_breaks(test_data, candidate_dict)
            image_path_list = self._f_save_var_trend(test_bins, x_columns_candidate, "test")
            metric_value_dict["变量趋势-测试集"] = MetricFucEntity(image_path=image_path_list, image_size=4)
        metric_value_dict["变量iv"] = MetricFucEntity(table=train_iv, table_font_size=10, table_cell_width=3)
        # Variable trend, training set.
        image_path_list = self._f_save_var_trend(train_bins, x_columns_candidate, "train")
        metric_value_dict["变量趋势-训练集"] = MetricFucEntity(image_path=image_path_list, image_size=4)
        # Variable validity: correlation heatmap and VIF.
        train_woe = sc.woebin_ply(train_data[x_columns_candidate], train_bins, print_info=False)
        var_corr_image_path = self._f_get_var_corr_image(train_woe)
        vif_df = f_get_ivf(train_woe)
        metric_value_dict["变量有效性"] = MetricFucEntity(image_path=var_corr_image_path, table=vif_df)

        if jupyter:
            from IPython import display
            display.display(metric_value_dict["样本分布"].table)
            # Show variable IV.
            display.display(metric_value_dict["变量iv"].table)
            # Show VIF.
            display.display(metric_value_dict["变量有效性"].table)
            # Show the variable correlation heatmap (absent when there is only
            # one candidate variable).
            if metric_value_dict["变量有效性"].image_path is not None:
                f_display_images_by_side(metric_value_dict["变量有效性"].image_path, display, width=800)
            # Show variable trends.
            var_trend_train = metric_value_dict["变量趋势-训练集"].image_path
            var_trend_test = None
            metric_test = metric_value_dict.get("变量趋势-测试集")
            if metric_test is not None:
                var_trend_test = metric_test.image_path
            f_display_images_by_side(var_trend_train, display, title="变量趋势训练集", image_path_list2=var_trend_test,
                                     title2="变量趋势测试集")
            # Show the final split points.
            breaks_list = {}
            for x_column, feature in candidate_dict.items():
                breaks_list[x_column] = feature.breaks_list
            print("变量切分点:")
            print(json.dumps(breaks_list, ensure_ascii=False, indent=2))
        return metric_value_dict
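

# A minimal, illustrative sketch of the binning/WOE flow this strategy builds
# on, using only public scorecardpy APIs and its bundled german credit data.
# It is NOT how the project wires StrategyIv together (that goes through
# DataSplitEntity and a data_process_config, whose constructors are not shown
# here); it only demonstrates woebin -> breaks_list -> woebin_ply, the same
# primitives used throughout the class above. The column names of the demo
# dataset ('age.in.years', 'credit.amount') are assumptions about scorecardpy's
# germancredit() sample.
if __name__ == "__main__":
    demo = sc.germancredit()
    # Map the two-level label to the 0/1 convention used by _n0/_n1 above.
    demo["creditability"] = demo["creditability"].map({"good": 0, "bad": 1})
    # Bin two variables, capping the bin count as _f_wide_filter does.
    demo_bins = sc.woebin(demo[["age.in.years", "credit.amount", "creditability"]],
                          y="creditability", bin_num_limit=5, print_info=False)
    # Reuse the learned breaks, mirroring _f_get_bins_by_breaks.
    demo_breaks = {col: list(b["breaks"]) for col, b in demo_bins.items()}
    print(json.dumps(demo_breaks, ensure_ascii=False, indent=2))
    # Apply WOE encoding, as filter()/feature_generate() do.
    demo_woe = sc.woebin_ply(demo[["age.in.years", "credit.amount"]], demo_bins, print_info=False)
    print(demo_woe.head())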