# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2023/12/28
@desc: Feature utility helpers (bin formatting, monotonicity, PSI, VIF, mapping export)
"""
import json
import os
from typing import Union

import numpy as np
import pandas as pd
from statsmodels.stats.outliers_influence import variance_inflation_factor as vif

from commom import GeneralException, f_is_number
from enums import ResultCodesEnum, FileEnum

# Standardised cut-point tables used by f_format_bin, keyed by table name.
FORMAT_DICT = {
    # ratio-like values, -1 to 1
    "bin_rate1": np.arange(-1, 1 + 0.1, 0.1).tolist(),

    # count-like values (1), 0 to 10
    "bin_cnt1": np.arange(0.0, 11.0, 1.0).tolist(),
    # count-like values (2), 0 to 20
    "bin_cnt2": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 12.0, 15.0, 17.0, 20.0],
    # count-like values (3), 0 to 50
    "bin_cnt3": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0,
                 50.0],
    # count-like values (4), 0 to 100
    "bin_cnt4": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0, 80.0, 100.0],

    # amount-like values (1), 0 to 10k
    "bin_amt1": np.arange(0, 1.1e4, 1e3).tolist(),
    # amount-like values (2), 0 to 50k
    "bin_amt2": np.arange(0, 5.5e4, 5e3).tolist(),
    # amount-like values (3), 0 to 100k
    "bin_amt3": np.arange(0, 11e4, 1e4).tolist(),
    # amount-like values (4), 0 to 200k
    "bin_amt4": [0.0, 1e4, 2e4, 3e4, 4e4, 5e4, 8e4, 10e4, 15e4, 20e4],
    # amount-like values (5), 0 to 1000k
    "bin_amt5": [0.0, 5e4, 10e4, 15e4, 20e4, 25e4, 30e4, 40e4, 50e4, 100e4],

    # age-like values
    "bin_age": [20.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 65.0],
}


# Coarse binning
def f_format_bin(data_describe: pd.Series, raw_v):
    """Snap ``raw_v`` to the nearest node of the best-fitting table in FORMAT_DICT.

    A table "fits" when the 10% and 90% quantiles of the data both lie inside
    its [min, max] range; among fitting tables the one with the smallest
    maximum is used.

    :param data_describe: mapping that provides the keys ``"10%"`` and ``"90%"``
    :param raw_v: the raw cut value to standardise
    :return: the nearest table node (the left node on a tie), clamped to the
        table's first/last node when ``raw_v`` is outside the table; ``raw_v``
        unchanged when no table fits
    """
    percent10 = data_describe["10%"]
    percent90 = data_describe["90%"]

    # Pick the fitting table with the smallest upper bound.
    best_key = None
    best_max = None
    for key, nodes in FORMAT_DICT.items():
        if percent10 >= min(nodes) and percent90 <= max(nodes):
            node_max = max(nodes)
            if best_key is None or best_max > node_max:
                best_key, best_max = key, node_max

    if best_key is None:
        return raw_v

    nodes = FORMAT_DICT[best_key]
    format_v = raw_v
    # Nearest-node rule inside the interval that contains raw_v.
    for v_left, v_right in zip(nodes, nodes[1:]):
        if v_left <= raw_v <= v_right:
            format_v = v_right if (raw_v - v_left) - (v_right - raw_v) > 0 else v_left
            # A value sitting exactly on a shared node snaps to that same node
            # in the next interval as well, so stopping here is equivalent.
            break

    # raw_v fell outside the table: clamp it to the nearest end node.
    if format_v not in nodes:
        if format_v > nodes[-1]:
            format_v = nodes[-1]
        if format_v < nodes[0]:
            format_v = nodes[0]

    return format_v


# Number of monotonicity changes
def f_monto_shift(badprobs: list) -> int:
    """Count how many times the step direction of ``badprobs`` reverses.

    Flat steps (difference of 0) are compatible with either direction and never
    count as a reversal.

    :param badprobs: sequence of bad rates, one per bin
    :return: number of direction reversals; 0 when there are two values or fewer
    """
    if len(badprobs) <= 2:
        return 0

    before = badprobs[1] - badprobs[0]
    change_cnt = 0
    for i in range(2, len(badprobs)):
        step = badprobs[i] - badprobs[i - 1]
        if (step >= 0 and before >= 0) or (step <= 0 and before <= 0):
            # Trend kept. If the sequence started flat there is no reference
            # direction yet; adopt the first non-zero step, otherwise a later
            # reversal would compare against 0 and never be detected.
            if before == 0 and step != 0:
                before = step
            continue
        # Sign flipped: record it and use the new direction as the reference.
        before = step
        change_cnt += 1
    return change_cnt


# Number of positions where train and test trends disagree
def f_trend_shift(train_badprobs: list, test_badprobs: list) -> int:
    """Count the steps whose direction differs between train and test.

    Each consecutive difference is mapped to +1 when it is >= 0 and to -1
    otherwise, and the two resulting sign sequences are compared element-wise.

    :param train_badprobs: bad rates per bin on the training set
    :param test_badprobs: bad rates per bin on the test set
    :return: number of differing steps; 0 when the lengths differ or there are
        fewer than two values
    """
    if len(train_badprobs) != len(test_badprobs) or len(train_badprobs) < 2 or len(test_badprobs) < 2:
        return 0

    train_diff = np.array(train_badprobs[1:]) - np.array(train_badprobs[0:-1])
    train_sign = np.where(train_diff >= 0, 1, -1)
    test_diff = np.array(test_badprobs[1:]) - np.array(test_badprobs[0:-1])
    test_sign = np.where(test_diff >= 0, 1, -1)

    contrast = train_sign - test_sign
    return len(contrast[contrast != 0])


def f_get_psi(train_bins, test_bins):
    """Compute the population stability index between two binning tables.

    Side effect: the columns ``count`` and ``proportion`` are written into both
    input frames.

    :param train_bins: frame with ``good`` and ``bad`` columns for the training set
    :param test_bins: frame with ``good`` and ``bad`` columns for the test set
    :return: sum over bins of (p_train - p_test) * ln(p_train / p_test),
        rounded to 3 decimals
    """
    train_bins['count'] = train_bins['good'] + train_bins['bad']
    train_bins['proportion'] = train_bins['count'] / train_bins['count'].sum()
    test_bins['count'] = test_bins['good'] + test_bins['bad']
    test_bins['proportion'] = test_bins['count'] / test_bins['count'].sum()

    # NOTE(review): a bin with zero proportion on either side produces inf/nan
    # here; confirm that callers never pass empty bins.
    psi = (train_bins['proportion'] - test_bins['proportion']) * np.log(
        train_bins['proportion'] / test_bins['proportion'])
    psi = psi.reset_index()
    psi = psi.rename(columns={"proportion": "psi"})
    return psi["psi"].sum().round(3)


def f_get_corr(data: pd.DataFrame, meth: str = 'spearman') -> pd.DataFrame:
    """Return the pairwise correlation matrix of ``data`` using method ``meth``."""
    return data.corr(method=meth)


def f_get_vif(data: pd.DataFrame) -> Union[pd.DataFrame, None]:
    """Compute the variance inflation factor of every column of ``data``.

    :param data: feature frame
    :return: frame with the columns ``变量`` (column name with ``_woe`` removed)
        and ``vif`` (rounded to 3 decimals), or ``None`` when ``data`` has one
        column or fewer
    """
    if len(data.columns.to_list()) <= 1:
        return None

    vif_v = [round(vif(data.values, data.columns.get_loc(i)), 3) for i in data.columns]
    df_vif = pd.DataFrame()
    df_vif["变量"] = [column.replace("_woe", "") for column in data.columns]
    df_vif['vif'] = vif_v
    return df_vif


def f_woebin_load(path: str):
    """Load a binning CSV and split it into one frame per variable.

    :param path: the CSV file itself, or a directory that contains a file named
        ``FileEnum.FEATURE.value``
    :return: dict mapping each distinct value of the ``variable`` column to the
        rows of that variable
    :raises GeneralException: with ``ResultCodesEnum.NOT_FOUND`` when the
        resolved path is not a file or does not contain ``FileEnum.FEATURE.value``
    """
    if os.path.isdir(path):
        path = os.path.join(path, FileEnum.FEATURE.value)

    if not os.path.isfile(path) or FileEnum.FEATURE.value not in path:
        raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"特征信息【{FileEnum.FEATURE.value}】不存在")

    df_woebin = pd.read_csv(path)
    sc_woebin = {
        variable: df_woebin[df_woebin["variable"] == variable]
        for variable in df_woebin["variable"].unique().tolist()
    }
    print(f"feature load from【{path}】success.")
    return sc_woebin


def f_get_var_mapping(df_bins, df_card, model_name="", model_desc="",
                      columns_anns: Union[dict, None] = None) -> pd.DataFrame:
    """Build the variable-mapping table: one row per bin plus an INTERCEPT row.

    :param df_bins: binning frame; the columns ``variable``, ``bin``,
        ``total_iv`` and ``count_distr`` are read
    :param df_card: scorecard frame; the columns ``variable``, ``bin`` and
        ``points`` are read
    :param model_name: value written to ``MODEL_NAME``
    :param model_desc: value written to ``MODEL_DESC``
    :param columns_anns: optional mapping variable name -> description used for
        ``VAR_DESC``; missing entries become ``""``
    :return: frame with one row per row of ``df_bins`` followed by a final
        ``INTERCEPT`` row
    :raises IndexError: when a (variable, bin) pair of ``df_bins`` has no
        matching row in ``df_card``
    """
    # A mutable default would be shared between calls; create a fresh dict instead.
    columns_anns = {} if columns_anns is None else columns_anns

    def _get_bin_opt(bin_label):
        """Split a bin label into left/right operator and value, plus a numeric flag."""
        is_num = 0
        bin_label = str(bin_label)
        rst = {
            "LEFT_OP": "",
            "LEFT_VALUE": "",
            "RIGHT_OP": "",
            "RIGHT_VALUE": "",
        }
        # Numeric interval such as "[1.0,5.0)"
        if "," in bin_label and any(ch in bin_label for ch in "[]()"):
            is_num = 1
            parts = bin_label.split(",")
            left = parts[0]
            if "-inf" not in left:
                # Drop the leading bracket; "[" means the bound is inclusive.
                rst["LEFT_VALUE"] = left[1:]
                rst["LEFT_OP"] = ">=" if "[" in left else ">"

            right = parts[1]
            if "inf" not in right:
                # Drop the trailing bracket; "]" means the bound is inclusive.
                # The inclusive operator belongs to RIGHT_OP: writing it to
                # LEFT_OP would overwrite the left operator and leave the
                # right bound exclusive.
                rst["RIGHT_VALUE"] = right[:-1]
                rst["RIGHT_OP"] = "<=" if "]" in right else "<"
        else:
            # Categorical bin; several categories are joined with "%,%"
            categories = bin_label.split("%,%")
            if len(categories) == 1:
                rst["LEFT_VALUE"] = categories[0]
                if f_is_number(categories[0]):
                    is_num = 1
            else:
                rst["LEFT_VALUE"] = json.dumps(categories, ensure_ascii=False)

        return rst, is_num

    rows = []
    # Next BINNING_ID to hand out for each variable; numbering starts at 1.
    binning_id_dict = {}
    for _, row_bin in df_bins.iterrows():
        variable = row_bin["variable"]
        binning_id = binning_id_dict.get(variable, 1)
        bin_opt, is_num = _get_bin_opt(row_bin["bin"])
        var_info = {
            "MODEL_NAME": model_name,
            "MODEL_DESC": model_desc,
            "VERSION": 1,
            "VAR_NAME": variable,
            "VAR_DESC": columns_anns.get(variable, ""),
            "BINNING_ID": binning_id,
            "IS_NUM": is_num,
            # First matching scorecard row for this (variable, bin) pair.
            "VAR_WOE": df_card[(df_card["variable"] == variable) & (df_card["bin"] == row_bin["bin"])][
                'points'].values[0],
            "VAR_WEIGHT": 1,
            "VAR_IV": round(row_bin["total_iv"], 3),
            "BINNING_PARTION": round(row_bin["count_distr"], 3),
        }
        var_info.update(bin_opt)
        rows.append(var_info)
        binning_id_dict[variable] = binning_id + 1

    # Trailing placeholder row for the model intercept.
    rows.append({
        "MODEL_NAME": model_name,
        "MODEL_DESC": model_desc,
        "VERSION": 1,
        "VAR_NAME": "INTERCEPT",
        "VAR_DESC": "截距",
        "BINNING_ID": 0,
        "IS_NUM": 1,
        "LEFT_OP": "",
        "LEFT_VALUE": "",
        "RIGHT_OP": "",
        "RIGHT_VALUE": "",
        "VAR_WOE": "",
        "VAR_WEIGHT": 0,
        "VAR_IV": "",
        "BINNING_PARTION": "",
    })

    df_var_mapping = pd.DataFrame(
        columns=["MODEL_NAME", "MODEL_DESC", "VERSION", "VAR_NAME", "VAR_DESC", "BINNING_ID", "IS_NUM",
                 "LEFT_OP", "LEFT_VALUE", "RIGHT_OP", "RIGHT_VALUE", "VAR_WOE", "VAR_WEIGHT", "VAR_IV",
                 "BINNING_PARTION"],
        data=rows
    )
    return df_var_mapping