123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- """
- @author: yq
- @time: 2023/12/28
- @desc: 特征工具类
- """
- import pandas as pd
- from sklearn.preprocessing import KBinsDiscretizer
- from entitys import DataSplitEntity
- from enums import BinsStrategyEnum
- import scorecardpy as sc
- import toad as td
def f_get_bins(data: DataSplitEntity, feat: str, strategy: str = 'quantile', nbins: int = 10) -> pd.DataFrame:
    """Bin a single feature of the training split.

    Args:
        data: Data container; the training split is read via ``data.train_data()``.
        feat: Name of the column to bin.
        strategy: Binning strategy, compared against the ``BinsStrategyEnum``
            values ``QUANTILE`` and ``WIDTH``.
        nbins: Number of bins to produce.

    Returns:
        For the quantile strategy, the array returned by
        ``KBinsDiscretizer.fit_transform`` cast to int and then to str.
        For the width strategy, the result of ``pd.cut`` with labels
        ``Bin_1`` .. ``Bin_<nbins>``.

    Raises:
        ValueError: If ``strategy`` is neither of the supported values
            (previously this case fell through and returned ``None``).
    """
    # Both strategies operate on the same training column. The quantile branch
    # used to subscript the entity itself (``data[feat]``), which was
    # inconsistent with the width branch below.
    train_feat = data.train_data()[feat]

    if strategy == BinsStrategyEnum.QUANTILE.value:
        kbin_encoder = KBinsDiscretizer(n_bins=nbins, encode='ordinal', strategy='quantile')
        # scikit-learn transformers expect 2-D input, so wrap the column in a frame.
        feature_binned = kbin_encoder.fit_transform(train_feat.to_frame())
        return feature_binned.astype(int).astype(str)

    if strategy == BinsStrategyEnum.WIDTH.value:
        bin_labels = [f'Bin_{i}' for i in range(1, nbins + 1)]
        return pd.cut(train_feat, bins=nbins, labels=bin_labels)

    # NOTE: a toad-based alternative was sketched here but is disabled:
    #   c = td.transform.Combiner()
    #   # ``method`` must be one of the method names supported by toad
    #   c.fit(data, y='target', method=strategy, min_samples=None,
    #         n_bins=nbins, empty_separate=False)
    #   # return the combiner so it can bin both the train and test sets;
    #   # c.export()[feature] shows the cut points of a given feature
    #   return c
    raise ValueError(f'Unsupported binning strategy: {strategy!r}')
- def f_get_bins_display(bins_info: pd.DataFrame) -> pd.DataFrame:
- df_list = []
- for col, bin_data in bins_info.items():
- tmp_df = pd.DataFrame(bin_data)
- df_list.append(tmp_df)
- result_df = pd.concat(df_list, ignore_index=True)
- total_bad = result_df['bad'].sum()
- total_cnt = result_df['count'].sum()
-
- br_overall = total_bad / total_cnt
- result_df['lift'] = result_df['badprob'] / br_overall
- result_df = \
- result_df.sort_values(['total_iv', 'variable'], ascending=False).set_index(['variable', 'total_iv', 'bin']) \
- [['count_distr', 'count', 'good', 'bad', 'badprob', 'lift', 'bin_iv', 'woe']]
- return result_df.style.format(subset=['count', 'good', 'bad'], precision=0).format(
- subset=['count_distr', 'bad', 'lift',
- 'badprob', 'woe', 'bin_iv'], precision=4).bar(subset=['badprob', 'bin_iv', 'lift'],
- color=['#d65f58', '#5fbb7a'])
def f_bins_filter(bins: pd.DataFrame, cols: list) -> list:
    """Select the columns whose per-bin bad rates pass the trend check.

    Args:
        bins: Object indexed by column name; ``bins[col]['bad_prob']`` must
            expose ``.values.tolist()``.
        cols: Column names to examine, in order.

    Returns:
        The names from ``cols`` that either have at most two bins or for
        which ``f_judge_monto`` returns a truthy value, in input order.
    """
    kept_cols = []
    for col_name in cols:
        # NOTE(review): this reads 'bad_prob' while f_get_bins_display reads
        # 'badprob' -- confirm which key the binning tables actually carry.
        bad_rates = bins[col_name]['bad_prob'].values.tolist()
        # One or two bins need no trend check.
        if len(bad_rates) <= 2 or f_judge_monto(bad_rates):
            kept_cols.append(col_name)
    return kept_cols
def f_judge_monto(bd_list: list, pos_neg_cnt: int = 1) -> bool:
    """Check whether a sequence changes direction at most ``pos_neg_cnt`` times.

    A direction change is a switch between rising and falling among the
    consecutive non-zero differences of ``bd_list``. Flat steps (difference
    of zero) are ignored. With the default ``pos_neg_cnt=1`` a monotone
    sequence or a single-turn (U / inverted-U) sequence is accepted.

    Args:
        bd_list: Ordered numeric values (e.g. per-bin bad rates).
        pos_neg_cnt: Maximum number of direction changes allowed.

    Returns:
        True if the number of direction changes is within the limit,
        otherwise False. Sequences with fewer than two elements are
        accepted, since they cannot change direction.
    """
    direction_changes = 0
    prev_sign = 0  # 0 until the first non-flat step has been seen

    for prev_val, curr_val in zip(bd_list, bd_list[1:]):
        step = curr_val - prev_val
        if step == 0:
            continue
        sign = 1 if step > 0 else -1
        if prev_sign and sign != prev_sign:
            direction_changes += 1
            if direction_changes > pos_neg_cnt:
                return False
        # Track the latest trend so steps after a turn are measured against
        # it rather than against the very first step.
        prev_sign = sign

    return True
def f_get_woe(data: DataSplitEntity, c: td.transform.Combiner, to_drop: list) -> tuple:
    """WOE-encode the train, test and validation splits.

    The splits are first binned with the combiner ``c``. A toad
    ``WOETransformer`` is then fitted on the binned training split and
    applied to the other two.

    Args:
        data: Data container exposing ``train_data()``, ``test_data()`` and
            ``val_data()``.
        c: Combiner used to bin each split before WOE encoding.
        to_drop: Column names excluded from WOE encoding; ``'target'`` is
            always excluded in addition.

    Returns:
        A tuple ``(train_woe, test_woe, oot_woe)``; ``oot_woe`` is computed
        from ``data.val_data()``.
    """
    transer = td.transform.WOETransformer()

    train_df = data.train_data()
    train_woe = transer.fit_transform(
        c.transform(train_df),
        train_df['target'],
        exclude=to_drop + ['target'],
    )
    # The original called ``c.transfrom`` here (typo), which raises
    # AttributeError; the other splits already used ``transform``.
    test_woe = transer.transform(c.transform(data.test_data()))
    oot_woe = transer.transform(c.transform(data.val_data()))
    return train_woe, test_woe, oot_woe
def f_get_iv(data: DataSplitEntity) -> pd.DataFrame:
    """Return the result of toad's ``quality`` report restricted to IV.

    ``data`` is forwarded unchanged to ``td.quality`` with the target column
    ``'target'`` and ``iv_only=True``.
    """
    # NOTE(review): ``data`` is annotated as DataSplitEntity but is handed
    # straight to toad -- confirm callers pass something toad accepts.
    iv_report = td.quality(data, 'target', iv_only=True)
    return iv_report
def f_get_psi(train_data: DataSplitEntity, oot_data: DataSplitEntity) -> pd.DataFrame:
    """Return ``td.metrics.PSI`` computed between the two given data sets.

    Both arguments are forwarded unchanged, ``train_data`` first.
    """
    # NOTE(review): arguments are annotated as DataSplitEntity but are handed
    # straight to toad -- confirm callers pass something toad accepts.
    psi_result = td.metrics.PSI(train_data, oot_data)
    return psi_result
def f_get_corr(data: DataSplitEntity, meth: str = 'spearman') -> pd.DataFrame:
    """Return the correlation matrix of the training split.

    Args:
        data: Data container; the training split is read via ``data.train_data()``.
        meth: Correlation method passed to ``corr`` as ``method``.
    """
    train_df = data.train_data()
    corr_matrix = train_df.corr(method=meth)
    return corr_matrix
def f_get_ivf(data: DataSplitEntity) -> pd.DataFrame:
    """Placeholder: not implemented yet; currently returns ``None``."""
    return None
def f_get_best_bins(data: DataSplitEntity, x_column: str, special_values: list = None):
    """Try candidate break points for ``x_column`` (work in progress).

    For each bin count from 2 to 5, candidate percentiles are built in steps
    of ``interval`` (0.05), converted to de-duplicated values of the sorted
    feature, and passed to ``sc.woebin`` as break points for ``x_column``.

    The function is unfinished: the binning results are not yet compared or
    returned, so it always returns ``None``.

    Args:
        data: Data container providing ``train_data`` and ``y_column``.
        x_column: Name of the feature to bin.
        special_values: Values of ``x_column`` to leave out when deriving
            candidate break points. Defaults to no special values.
    """
    interval = 0.05
    # Avoid a shared mutable default argument.
    special_values = [] if special_values is None else special_values

    # NOTE(review): other helpers in this module call ``data.train_data()``;
    # here it is read as an attribute, as in the original -- confirm which
    # form DataSplitEntity actually exposes.
    train_data = data.train_data
    train_data_filter = train_data[~train_data[x_column].isin(special_values)]
    train_data_filter = train_data_filter.sort_values(by=x_column, ascending=True)

    x_train_data = train_data_filter[x_column]
    n_rows = len(x_train_data)
    if n_rows == 0:
        # Nothing left to derive break points from.
        return None

    for bin_num in range(2, 6):
        init_point_percentile_list = [interval * i for i in range(1, bin_num)]
        # The original read ``point_list[-1]`` here, but ``point_list`` had just
        # been reset to an empty list, so it always raised IndexError.
        # Presumably the mirrored upper percentile was intended -- TODO confirm.
        init_point_percentile_list.append(1 - init_point_percentile_list[-1])

        point_list = []
        for point_percentile in init_point_percentile_list:
            point = x_train_data.iloc[int(n_rows * point_percentile)]
            if point not in point_list:
                point_list.append(point)

        # scorecardpy expects ``breaks_list`` as a dict keyed by variable name.
        # TODO: evaluate ``bins`` and keep the best candidate.
        bins = sc.woebin(train_data, y=data.y_column, breaks_list={x_column: point_list})

    return None
|