# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2024/1/2
@desc: Feature filtering strategy based on IV value and monotonicity
"""
import json
from itertools import combinations_with_replacement
from typing import Dict, Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scorecardpy as sc
import seaborn as sns
from pandas.core.dtypes.common import is_numeric_dtype
from tqdm import tqdm

from commom import f_display_images_by_side, NumpyEncoder, GeneralException, f_df_to_image, f_display_title, \
    f_image_crop_white_borders
from data import DataExplore
from entitys import DataSplitEntity, MetricFucResultEntity
from enums import ContextEnum, ResultCodesEnum
from feature.feature_strategy_base import FeatureStrategyBase
from init import context
from .entity import BinInfo, HomologousBinInfo
from .utils import f_monto_shift, f_get_corr, f_get_vif, f_format_bin, f_trend_shift, f_get_psi, f_woebin_load


class StrategyWoe(FeatureStrategyBase):
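    """
    WOE-based feature selection strategy.

    Pipeline, as implemented below: fast IV screening via scorecardpy, a greedy
    per-variable search over candidate binnings for numeric variables (IV, PSI,
    monotonicity and train/test trend checks), then correlation, VIF and IV
    top-N filtering before the final woebin tables are cached in self.sc_woebin.
    """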

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Binning info required for WOE encoding; reuses the scorecardpy format
        self.sc_woebin = None

    def _f_get_img_corr(self, train_woe) -> Union[str, None]:
        if len(train_woe.columns.to_list()) <= 1:
            return None
        train_corr = f_get_corr(train_woe)
        plt.figure(figsize=(12, 12))
        sns.heatmap(train_corr, vmax=1, square=True, cmap='RdBu', annot=True)
        plt.title('Variables Correlation', fontsize=15)
        plt.yticks(rotation=0)
        plt.xticks(rotation=90)
        img_path = self.ml_config.f_get_save_path("corr.png")
        plt.savefig(img_path)
        f_image_crop_white_borders(img_path, img_path)
        return img_path

    def _f_get_img_trend(self, sc_woebin, x_columns, prefix):
        imgs_path = []
        for k in x_columns:
            df_bin = sc_woebin[k]
            # df_bin["bin"] = df_bin["bin"].apply(lambda x: re.sub(r"(\d+\.\d+)",
            #                                     lambda m: "{:.2f}".format(float(m.group(0))), x))
            sc.woebin_plot(df_bin)
            path = self.ml_config.f_get_save_path(f"{prefix}_{k}.png")
            plt.savefig(path)
            imgs_path.append(path)
        return imgs_path

    def _f_best_bins_print(self, display, data: DataSplitEntity, column: str, homo_bin_info: HomologousBinInfo):
        print(f"----- Recommended split points for [{column}] under different bin counts -----")
        imgs_path_trend_train = []
        imgs_path_trend_test = []
        bins_info = homo_bin_info.get_best_bins()
        for bin_info in bins_info:
            print(json.dumps(bin_info.points, ensure_ascii=False, cls=NumpyEncoder))
            breaks_list = [str(i) for i in bin_info.points]
            sc_woebin_train = self._f_get_sc_woebin(data.train_data, {column: bin_info})
            image_path = self._f_get_img_trend(sc_woebin_train, [column],
                                               f"train_{column}_{'_'.join(breaks_list)}")
            imgs_path_trend_train.append(image_path[0])
            sc_woebin_test = self._f_get_sc_woebin(data.test_data, {column: bin_info})
            image_path = self._f_get_img_trend(sc_woebin_test, [column],
                                               f"test_{column}_{'_'.join(breaks_list)}")
            imgs_path_trend_test.append(image_path[0])
        f_display_images_by_side(display, imgs_path_trend_train, title="Train set",
                                 image_path_list2=imgs_path_trend_test, title2="Test set")

    def _f_get_sc_woebin(self, data: pd.DataFrame, bin_info_dict: Dict[str, BinInfo]) -> Dict[str, pd.DataFrame]:
        y_column = self.ml_config.y_column
        special_values = self.ml_config.special_values
        x_columns = list(bin_info_dict.keys())
        breaks_list = {column: bin_info.points for column, bin_info in bin_info_dict.items()}
        sc_woebin = sc.woebin(data[x_columns + [y_column]], y=y_column, breaks_list=breaks_list,
                              special_values=special_values, print_info=False)
        return sc_woebin

    def _handle_numeric(self, data: DataSplitEntity, x_column: str) -> HomologousBinInfo:
        # Greedy search for the monotonic binning with the highest combined IV
        # across the train set and the test set
        def _n0(x):
            return sum(x == 0)

        def _n1(x):
            return sum(x == 1)

        def _get_bins_sv(df, x_column):
            y_column = self.ml_config.y_column
            special_values = self.ml_config.get_special_values(x_column)
            # special-values bins
            bins_sv = pd.DataFrame()
            for special in special_values:
                # .copy() keeps the assignment below off a view of df
                dtm = df[df[x_column] == special].copy()
                if len(dtm) != 0:
                    dtm['bin'] = [str(special)] * len(dtm)
                    bin_sv = dtm.groupby(['bin'], group_keys=False)[y_column].agg([_n0, _n1]) \
                        .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
                    bin_sv['is_special_values'] = [True] * len(bin_sv)
                    bins_sv = pd.concat((bins_sv, bin_sv))
            return bins_sv
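
        # Note: _n0 counts y == 0 and _n1 counts y == 1; they are renamed to
        # "good" and "bad" above to match scorecardpy's column convention.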

        def _get_bins_nsv(df, x_column, breaks_list):
            # non-special-values bins
            def _left_value(bin_str: str):
                if "," not in bin_str:
                    return float(bin_str)
                left = bin_str.split(",")[0]
                return float(left[1:])

            y_column = self.ml_config.y_column
            dtm = pd.DataFrame({'y': df[y_column], 'value': df[x_column]})
            bstbrks = [-np.inf] + breaks_list + [np.inf]
            labels = ['[{},{})'.format(bstbrks[i], bstbrks[i + 1]) for i in range(len(bstbrks) - 1)]
            dtm.loc[:, 'bin'] = pd.cut(dtm['value'], bstbrks, right=False, labels=labels)
            dtm['bin'] = dtm['bin'].astype(str)
            bins = dtm.groupby(['bin'], group_keys=False)['y'].agg([_n0, _n1]) \
                .reset_index().rename(columns={'_n0': 'good', '_n1': 'bad'})
            bins['is_special_values'] = [False] * len(bins)
            bins["ordered"] = bins['bin'].apply(_left_value)
            # Sort by the left bin edge so the monotonicity check sees the bins
            # in value order rather than in string order
            bins = bins.sort_values(by=["ordered"], ascending=[True])
            return bins
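
        # Example: breaks_list [0.5, 2.0] produces the labels '[-inf,0.5)',
        # '[0.5,2.0)' and '[2.0,inf)'; _left_value extracts the left edge
        # (-inf, 0.5, 2.0) as the numeric sort key.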

        def _get_badprobs(bins):
            bins['count'] = bins['good'] + bins['bad']
            bins['badprob'] = bins['bad'] / bins['count']
            return bins['badprob'].values.tolist()

        def _get_iv(bins):
            infovalue = pd.DataFrame({'good': bins['good'], 'bad': bins['bad']}) \
                .replace(0, 0.9) \
                .assign(DistrBad=lambda x: x.bad / sum(x.bad), DistrGood=lambda x: x.good / sum(x.good)) \
                .assign(iv=lambda x: (x.DistrBad - x.DistrGood) * np.log(x.DistrBad / x.DistrGood)) \
                .iv
            bins['bin_iv'] = infovalue
            bins['total_iv'] = bins['bin_iv'].sum()
            iv = bins['total_iv'].values[0]
            return iv.round(3)
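
        # Per-bin IV: (DistrBad_i - DistrGood_i) * ln(DistrBad_i / DistrGood_i),
        # summed over all bins; zero counts are replaced with 0.9 first so the
        # ratio and the log stay finite.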

        def _get_points(data_ascending, column):
            def _sampling(raw_list: list, num: int):
                # Sample at a fixed stride
                return raw_list[::int(len(raw_list) / num)]

            def _distribute(interval, bin_num):
                parts = int(1 / interval)
                # Enumerate all distributions via stars and bars
                total_ways = combinations_with_replacement(range(parts + bin_num - 1), bin_num - 1)
                distributions = []
                # Iterate over every possible divider placement
                for combo in total_ways:
                    # Allocate the balls according to the divider positions
                    distribution = [0] * bin_num
                    start = 0
                    for i, divider in enumerate(combo):
                        distribution[i] = divider - start + 1
                        start = divider + 1
                    distribution[-1] = parts - start  # ball count of the last bin
                    # Keep only distributions where every bin gets at least one ball
                    if all(x > 0 for x in distribution):
                        distributions.append(distribution)
                return distributions
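
            # Worked example: interval=0.25 gives parts=4, so for bin_num=2 the
            # surviving distributions are [1, 3], [2, 2] and [3, 1], i.e. one
            # split point at the 25th, 50th or 75th percentile respectively.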

            interval = self.ml_config.bin_search_interval
            bin_sample_rate = self.ml_config.bin_sample_rate
            format_bin = self.ml_config.format_bin
            data_x = data_ascending[column]
            data_x_describe = data_x.describe(percentiles=[0.1, 0.9])
            data_x_max = data_x.max()
            # Evaluate binnings with 2 to 5 bins
            distributions_list = []
            for bin_num in list(range(2, 6)):
                distributions = _distribute(interval, bin_num)
                # 4 bins and above must be sampled, otherwise it takes far too long
                sample_num = 1000 * bin_sample_rate
                if bin_sample_rate <= 0.15:
                    sample_num *= 2
                if bin_num == 5:
                    sample_num = 4000 * bin_sample_rate
                if bin_num in (4, 5) and len(distributions) >= sample_num:
                    distributions = _sampling(distributions, sample_num)
                distributions_list.extend(distributions)
            points_list = []
            for distributions in distributions_list:
                points = []
                point_percentile = [sum(distributions[0:idx + 1]) * interval for idx, _ in
                                    enumerate(distributions[0:-1])]
                for percentile in point_percentile:
                    point = data_x.iloc[int(len(data_x) * percentile)]
                    point = float(point)
                    if format_bin:
                        point = f_format_bin(data_x_describe, point)
                    point = round(point, 2)
                    if point == 0:
                        continue
                    # Skip points that fall out of range after coarse binning
                    if point not in points and point < data_x_max:
                        points.append(point)
                if points not in points_list and len(points) != 0:
                    points_list.append(points)
            return points_list
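
        # _get_points thus returns de-duplicated candidate break sets, each
        # point taken at the percentile position implied by one distribution.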

        special_values = self.ml_config.get_special_values(x_column)
        breaks_list = self.ml_config.get_breaks_list(x_column)
        iv_threshold = self.ml_config.iv_threshold
        psi_threshold = self.ml_config.psi_threshold
        monto_shift_threshold = self.ml_config.monto_shift_threshold
        trend_shift_threshold = self.ml_config.trend_shift_threshold

        train_data = data.train_data
        test_data = data.test_data
        train_data_ascending_nsv = train_data[~train_data[x_column].isin(special_values)] \
            .sort_values(by=x_column, ascending=True)
        test_data_ascending_nsv = test_data[~test_data[x_column].isin(special_values)] \
            .sort_values(by=x_column, ascending=True)
        train_bins_sv = _get_bins_sv(train_data, x_column)
        test_bins_sv = _get_bins_sv(test_data, x_column)

        # Collect the metrics of every candidate binning.
        # Build the candidate split points first
        is_auto_bins = 1
        if len(breaks_list) != 0:
            points_list_nsv = [breaks_list]
            is_auto_bins = 0
        else:
            points_list_nsv = _get_points(train_data_ascending_nsv, x_column)
        homo_bin_info = HomologousBinInfo(x_column, is_auto_bins, self.ml_config.is_include(x_column))
        # Compute IV, PSI, monotonicity shift, trend shift, etc.
        for points in points_list_nsv:
            bin_info = BinInfo()
            bin_info.x_column = x_column
            bin_info.bin_num = len(points) + 1
            bin_info.points = points
            bin_info.is_auto_bins = is_auto_bins
            # Variable IV, computed with the special-value bins merged in
            train_bins_nsv = _get_bins_nsv(train_data_ascending_nsv, x_column, points)
            train_bins = pd.concat((train_bins_nsv, train_bins_sv))
            train_iv = _get_iv(train_bins)
            test_bins_nsv = _get_bins_nsv(test_data_ascending_nsv, x_column, points)
            test_bins = pd.concat((test_bins_nsv, test_bins_sv))
            test_iv = _get_iv(test_bins)
            bin_info.train_iv = train_iv
            bin_info.test_iv = test_iv
            bin_info.iv = train_iv + test_iv
            bin_info.is_qualified_iv_train = 1 if train_iv > iv_threshold else 0
            # Number of monotonicity direction changes of the bad rate
            train_badprobs_nsv = _get_badprobs(train_bins_nsv)
            monto_shift_train_nsv = f_monto_shift(train_badprobs_nsv)
            bin_info.monto_shift_nsv = monto_shift_train_nsv
            bin_info.is_qualified_monto_train_nsv = 0 if monto_shift_train_nsv > monto_shift_threshold else 1
            # Train/test trend consistency
            test_badprobs_nsv = _get_badprobs(test_bins_nsv)
            trend_shift_nsv = f_trend_shift(train_badprobs_nsv, test_badprobs_nsv)
            bin_info.trend_shift_nsv = trend_shift_nsv
            bin_info.is_qualified_trend_nsv = 0 if trend_shift_nsv > trend_shift_threshold else 1
            # Variable PSI
            psi = f_get_psi(train_bins, test_bins)
            bin_info.psi = psi
            bin_info.is_qualified_psi = 1 if psi < psi_threshold else 0
            homo_bin_info.add(bin_info)
        return homo_bin_info
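
    # f_get_psi comes from .utils and is assumed to follow the usual PSI
    # definition, sum((p1_i - p2_i) * ln(p1_i / p2_i)) over matching train/test
    # bins; a binning qualifies when psi < psi_threshold.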

    def _f_fast_filter(self, data: DataSplitEntity) -> Dict[str, BinInfo]:
        # Coarse screening of variables by IV value
        train_data = data.train_data
        test_data = data.test_data
        y_column = self.ml_config.y_column
        x_columns = self.ml_config.x_columns
        columns_exclude = self.ml_config.columns_exclude
        special_values = self.ml_config.special_values
        breaks_list = self.ml_config.breaks_list.copy()
        iv_threshold = self.ml_config.iv_threshold
        psi_threshold = self.ml_config.psi_threshold

        if len(x_columns) == 0:
            x_columns = train_data.columns.tolist()
        if y_column in x_columns:
            x_columns.remove(y_column)
        for column in columns_exclude:
            if column in x_columns:
                x_columns.remove(column)
        check_msg = DataExplore.check_type(train_data[x_columns])
        if check_msg != "":
            print(f"Data type analysis:\n{check_msg}\nPlease keep each variable's data type consistent")
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message="Illegal data type.")
        bins_train = sc.woebin(train_data[x_columns + [y_column]], y=y_column, bin_num_limit=5,
                               special_values=special_values, breaks_list=breaks_list, print_info=False)
        for column, bin_df in bins_train.items():
            breaks_list[column] = list(bin_df[bin_df["is_special_values"] == False]['breaks'])
        bins_test = sc.woebin(test_data[x_columns + [y_column]], y=y_column,
                              special_values=special_values, breaks_list=breaks_list, print_info=False)
        bin_info_fast: Dict[str, BinInfo] = {}
        filter_fast_overview = ""
        for column, bin_train in bins_train.items():
            train_iv = bin_train['total_iv'][0].round(3)
            if train_iv <= iv_threshold and not self.ml_config.is_include(column):
                filter_fast_overview = f"{filter_fast_overview}{column} dropped: train_iv [{train_iv}] below threshold\n"
                continue
            bin_test = bins_test[column]
            test_iv = bin_test['total_iv'][0].round(3)
            iv = round(train_iv + test_iv, 3)
            psi = f_get_psi(bin_train, bin_test)
            # if psi >= psi_threshold and not self.ml_config.is_include(column):
            #     filter_fast_overview = f"{filter_fast_overview}{column} dropped: psi [{psi}] above threshold\n"
            #     continue
            bin_info_fast[column] = BinInfo.ofConvertByDict(
                {"x_column": column, "train_iv": train_iv, "iv": iv, "psi": psi, "points": breaks_list[column]}
            )
        context.set_filter_info(ContextEnum.FILTER_FAST,
                                f"Variable count before filtering: {len(x_columns)}\n{x_columns}\n"
                                f"Variables dropped by the fast filter: {len(x_columns) - len(bin_info_fast)}\n{filter_fast_overview}")
        return bin_info_fast

    def _f_corr_filter(self, data: DataSplitEntity, bin_info_dict: Dict[str, BinInfo]) -> Dict[str, BinInfo]:
        # Drop variables by pairwise correlation
        corr_threshold = self.ml_config.corr_threshold
        train_data = data.train_data
        x_columns = list(bin_info_dict.keys())

        sc_woebin = self._f_get_sc_woebin(train_data, bin_info_dict)
        train_woe = sc.woebin_ply(train_data[x_columns], sc_woebin, print_info=False)
        corr_df = f_get_corr(train_woe)
        corr_dict = corr_df.to_dict()
        filter_corr_overview = ""
        filter_corr_detail = {}
        # Check each variable's correlation with every other variable in turn
        for column, corr in corr_dict.items():
            column = column.replace("_woe", "")
            column_remove = []
            overview = f"{column}: "
            if column not in x_columns:
                continue
            for challenger_column, challenger_corr in corr.items():
                challenger_corr = round(challenger_corr, 3)
                challenger_column = challenger_column.replace("_woe", "")
                if challenger_corr < corr_threshold or column == challenger_column \
                        or challenger_column not in x_columns:
                    continue
                # Correlation above threshold: keep the variable with the higher IV
                iv = bin_info_dict[column].iv
                challenger_iv = bin_info_dict[challenger_column].iv
                if iv > challenger_iv:
                    if not self.ml_config.is_include(challenger_column):
                        column_remove.append(challenger_column)
                        overview = f"{overview}[{challenger_column}_iv{challenger_iv}_corr{challenger_corr}] "
                else:
                    # This variable itself is dropped, so stop recording its removals
                    column_remove = []
                    overview = ""
                    break
            # Drop the variables correlated with this one
            for c in column_remove:
                if c in x_columns:
                    x_columns.remove(c)
            if len(column_remove) != 0:
                filter_corr_overview = f"{filter_corr_overview}{overview}\n"
                filter_corr_detail[column] = column_remove
        for column in list(bin_info_dict.keys()):
            if column not in x_columns:
                bin_info_dict.pop(column)
        context.set_filter_info(ContextEnum.FILTER_CORR, filter_corr_overview, filter_corr_detail)
        return bin_info_dict

    def _f_vif_filter(self, data: DataSplitEntity, bin_info_dict: Dict[str, BinInfo]) -> Dict[str, BinInfo]:
        # Drop variables whose VIF exceeds the threshold
        vif_threshold = self.ml_config.vif_threshold
        train_data = data.train_data
        x_columns = list(bin_info_dict.keys())

        sc_woebin = self._f_get_sc_woebin(train_data, bin_info_dict)
        train_woe = sc.woebin_ply(train_data[x_columns], sc_woebin, print_info=False)
        df_vif = f_get_vif(train_woe)
        if df_vif is None:
            return bin_info_dict
        filter_vif_overview = ""
        filter_vif_detail = []
        for _, row in df_vif.iterrows():
            # "变量" ("variable") is the column name emitted by f_get_vif
            column = row["变量"]
            vif = row["vif"]
            if vif < vif_threshold or self.ml_config.is_include(column):
                continue
            filter_vif_overview = f"{filter_vif_overview}{column} dropped: vif [{vif}] above threshold\n"
            filter_vif_detail.append(column)
            bin_info_dict.pop(column)
        context.set_filter_info(ContextEnum.FILTER_VIF, filter_vif_overview, filter_vif_detail)
        return bin_info_dict

    def post_filter(self, data: DataSplitEntity, bin_info_dict: Dict[str, BinInfo]):
        # Filters that compare variables against each other
        max_feature_num = self.ml_config.max_feature_num
        bin_info_filtered = self._f_corr_filter(data, bin_info_dict)
        bin_info_filtered = self._f_vif_filter(data, bin_info_filtered)
        bin_info_filtered = BinInfo.ivTopN(bin_info_filtered, max_feature_num)
        self.sc_woebin = self._f_get_sc_woebin(data.train_data, bin_info_filtered)
        context.set(ContextEnum.BIN_INFO_FILTERED, bin_info_filtered)
        context.set(ContextEnum.WOEBIN, self.sc_woebin)

    def feature_search(self, data: DataSplitEntity, *args, **kwargs):
        # Coarse screening
        bin_info_fast = self._f_fast_filter(data)
        x_columns = list(bin_info_fast.keys())

        bin_info_filtered: Dict[str, BinInfo] = {}
        # Intermediate results of the candidate binnings of numeric variables
        homo_bin_info_numeric_set: Dict[str, HomologousBinInfo] = {}
        filter_numeric_overview = ""
        filter_numeric_detail = []
        for x_column in tqdm(x_columns):
            if is_numeric_dtype(data.train_data[x_column]):
                # Numeric variable screening
                homo_bin_info_numeric: HomologousBinInfo = self._handle_numeric(data, x_column)
                if homo_bin_info_numeric.is_auto_bins:
                    homo_bin_info_numeric_set[x_column] = homo_bin_info_numeric
                # Filter on IV, PSI, monotonicity and trend consistency
                bin_info: Optional[BinInfo] = homo_bin_info_numeric.filter()
                if bin_info is not None:
                    bin_info_filtered[x_column] = bin_info
                else:
                    # Dropped for failing the requirements
                    filter_numeric_overview = f"{filter_numeric_overview}{x_column} {homo_bin_info_numeric.drop_reason()}\n"
                    filter_numeric_detail.append(x_column)
            else:
                # Categorical variables are handled by scorecardpy for now
                bin_info_filtered[x_column] = bin_info_fast[x_column]
        self.post_filter(data, bin_info_filtered)
        context.set(ContextEnum.HOMO_BIN_INFO_NUMERIC_SET, homo_bin_info_numeric_set)
        context.set_filter_info(ContextEnum.FILTER_NUMERIC, filter_numeric_overview, filter_numeric_detail)

    def variable_analyse(self, data: DataSplitEntity, column: str, format_bin=None, *args, **kwargs):
        from IPython import display
        if is_numeric_dtype(data.train_data[column]):
            # Temporarily override format_bin, then restore the configured value
            format_bin_mlcfg = self.ml_config.format_bin
            if format_bin is not None:
                self.ml_config._format_bin = format_bin
            homo_bin_info_numeric: HomologousBinInfo = self._handle_numeric(data, column)
            self._f_best_bins_print(display, data, column, homo_bin_info_numeric)
            self.ml_config._format_bin = format_bin_mlcfg
        else:
            print("Only numeric variables can be analysed.")

    def feature_save(self, *args, **kwargs):
        if self.sc_woebin is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="feature does not exist")
        df_woebin = pd.concat(self.sc_woebin.values())
        path = self.ml_config.f_get_save_path("feature.csv")
        df_woebin.to_csv(path)
        print(f"feature save to [{path}] success.")

    def feature_load(self, path: str, *args, **kwargs):
        self.sc_woebin = f_woebin_load(path)

    def feature_generate(self, data: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        x_columns = list(self.sc_woebin.keys())
        # Sort to guard against possible ordering-related bugs
        x_columns.sort()
        data_woe = sc.woebin_ply(data[x_columns], self.sc_woebin, print_info=False)
        return data_woe

    def feature_report(self, data: DataSplitEntity, *args, **kwargs) -> Dict[str, MetricFucResultEntity]:
        y_column = self.ml_config.y_column
        columns_anns = self.ml_config.columns_anns
        x_columns = list(self.sc_woebin.keys())
        train_data = data.train_data
        test_data = data.test_data
        # Intermediate results are shared across modules, so fetch them from the context
        bin_info_filtered: Dict[str, BinInfo] = context.get(ContextEnum.BIN_INFO_FILTERED)

        # The Chinese dict keys are report-section identifiers shared with other
        # modules and are therefore kept as-is: "样本分布" = sample distribution,
        # "变量相关性" = variable correlation, "变量iv" = variable IV,
        # "变量趋势-训练集/测试集" = variable trend on the train/test set.
        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucResultEntity(table=data.get_distribution(y_column),
                                                          table_font_size=10, table_cell_width=3)
        # Variable correlation
        sc_woebin_train = self._f_get_sc_woebin(train_data, bin_info_filtered)
        train_woe = sc.woebin_ply(train_data[x_columns], sc_woebin_train, print_info=False)
        img_path_corr = self._f_get_img_corr(train_woe)
        metric_value_dict["变量相关性"] = MetricFucResultEntity(image_path=img_path_corr)
        # Variable IV, PSI and VIF
        df_iv_psi_vif = pd.DataFrame()
        train_iv = [bin_info_filtered[column].train_iv for column in x_columns]
        psi = [bin_info_filtered[column].psi for column in x_columns]
        anns = [columns_anns.get(column, "-") for column in x_columns]
        df_iv_psi_vif["变量"] = x_columns
        df_iv_psi_vif["iv"] = train_iv
        df_iv_psi_vif["psi"] = psi
        df_vif = f_get_vif(train_woe)
        if df_vif is not None:
            df_iv_psi_vif = pd.merge(df_iv_psi_vif, df_vif, on="变量", how="left")
        df_iv_psi_vif["释义"] = anns
        df_iv_psi_vif.sort_values(by=["iv"], ascending=[False], inplace=True)
        img_path_iv = self.ml_config.f_get_save_path("iv.png")
        f_df_to_image(df_iv_psi_vif, img_path_iv)
        metric_value_dict["变量iv"] = MetricFucResultEntity(table=df_iv_psi_vif, image_path=img_path_iv)
        # Variable trend on the train set
        imgs_path_trend_train = self._f_get_img_trend(sc_woebin_train, x_columns, "train")
        metric_value_dict["变量趋势-训练集"] = MetricFucResultEntity(image_path=imgs_path_trend_train, image_size=4)
        # Variable trend on the test set
        sc_woebin_test = self._f_get_sc_woebin(test_data, bin_info_filtered)
        imgs_path_trend_test = self._f_get_img_trend(sc_woebin_test, x_columns, "test")
        metric_value_dict["变量趋势-测试集"] = MetricFucResultEntity(image_path=imgs_path_trend_test, image_size=4)
        # context.set(ContextEnum.METRIC_FEATURE.value, metric_value_dict)

        if self.ml_config.jupyter_print:
            self.jupyter_print(data, metric_value_dict)
        return metric_value_dict

    def jupyter_print(self, data: DataSplitEntity, metric_value_dict: Dict[str, MetricFucResultEntity]):
        from IPython import display

        def detail_print(detail):
            if isinstance(detail, str):
                detail = [detail]
            if isinstance(detail, list):
                for column in detail:
                    homo_bin_info_numeric = homo_bin_info_numeric_set.get(column)
                    if homo_bin_info_numeric is None:
                        continue
                    self._f_best_bins_print(display, data, column, homo_bin_info_numeric)
            if isinstance(detail, dict):
                for column, challenger_columns in detail.items():
                    print(f"----- [{column}] kept by the correlation filter -----")
                    detail_print(column)
                    detail_print(challenger_columns)

        def filter_print(filter_info, title, notes=""):
            f_display_title(display, title)
            print(notes)
            print(filter_info.get("overview"))
            detail = filter_info.get("detail")
            if detail is not None and self.ml_config.bin_detail_print:
                detail_print(detail)

        bin_info_filtered: Dict[str, BinInfo] = context.get(ContextEnum.BIN_INFO_FILTERED)
        homo_bin_info_numeric_set: Dict[str, HomologousBinInfo] = context.get(ContextEnum.HOMO_BIN_INFO_NUMERIC_SET)
        filter_fast = context.get(ContextEnum.FILTER_FAST)
        filter_numeric = context.get(ContextEnum.FILTER_NUMERIC)
        filter_corr = context.get(ContextEnum.FILTER_CORR)
        filter_vif = context.get(ContextEnum.FILTER_VIF)
        filter_ivtop = context.get(ContextEnum.FILTER_IVTOP)

        f_display_title(display, "Sample distribution")
        display.display(metric_value_dict["样本分布"].table)
        # Print variable IV
        f_display_title(display, "Variable IV")
        display.display(metric_value_dict["变量iv"].table)
        # Print variable correlation
        f_display_images_by_side(display, metric_value_dict["变量相关性"].image_path, width=800)
        # Print variable trends
        f_display_title(display, "Variable trend")
        imgs_path_trend_train = metric_value_dict["变量趋势-训练集"].image_path
        imgs_path_trend_test = metric_value_dict.get("变量趋势-测试集").image_path
        f_display_images_by_side(display, imgs_path_trend_train, title="Train set",
                                 image_path_list2=imgs_path_trend_test, title2="Test set")
        # Print breaks_list
        breaks_list = {column: bin_info.points for column, bin_info in bin_info_filtered.items()}
        print("Variable split points:")
        print(json.dumps(breaks_list, ensure_ascii=False, indent=2, cls=NumpyEncoder))
        print("Recommended split points of the selected variables under different bin counts:")
        detail_print(list(bin_info_filtered.keys()))
        # Print the filtering process of each stage
        filter_print(filter_fast, "Fast filter", "drops variables whose train_iv is below the threshold")
        filter_print(filter_numeric, "Numeric variable filter")
        filter_print(filter_corr, "Correlation filter")
        filter_print(filter_vif, "VIF filter")
        filter_print(filter_ivtop, "IV top-N filter", "iv = train_iv + test_iv")
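

# Minimal usage sketch (hypothetical wiring: the ml_config object and the
# DataSplitEntity instance come from the surrounding project):
#
#   strategy = StrategyWoe(ml_config=my_ml_config)
#   strategy.feature_search(data_split)           # search bins, run all filters
#   report = strategy.feature_report(data_split)  # IV/PSI/VIF tables, trend plots
#   woe_df = strategy.feature_generate(data_split.train_data)
#   strategy.feature_save()                       # persists sc_woebin as feature.csv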