# -*- coding: utf-8 -*- """ @author: yq @time: 2024/11/1 @desc: """ import json import os.path import pickle from os.path import dirname, realpath from typing import Dict import numpy as np import pandas as pd import scorecardpy as sc import statsmodels.api as sm from commom import f_df_to_image, f_display_images_by_side, GeneralException, f_display_title, \ f_image_crop_white_borders from entitys import MetricFucResultEntity, DataSplitEntity, DataFeatureEntity from enums import ContextEnum, ResultCodesEnum, ConstantEnum from init import context from .model_base import ModelBase from .model_utils import f_stress_test, f_calcu_model_ks, f_get_model_score_bin, f_calcu_model_psi, f_add_rules class ModelLr(ModelBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 报告模板 self._template_path = os.path.join(dirname(dirname(realpath(__file__))), "./template/模型开发报告模板_lr.docx") self.lr = None self.card = None self.coef = None def get_report_template_path(self): return self._template_path def train(self, train_data: DataFeatureEntity, test_data: DataFeatureEntity, *args, **kwargs): woebin = context.get(ContextEnum.WOEBIN) data_x = train_data.data_x.copy() # scorecardpy高版本区分了sklearn与statsmodels,为了后面生成评分卡需要,但是又不需要截距,所以常数项置0 if sc.__version__ > '0.1.9.2': data_x["const"] = [0] * len(data_x) family = sm.families.Binomial() logit = sm.GLM(train_data.data_y, data_x, family=family) self.lr = logit.fit() # scorecardpy低版本 if sc.__version__ <= '0.1.9.2': self.lr.coef_ = [list(self.lr.summary2().tables[1].loc[:, 'Coef.'])] self.lr.intercept_ = [0] if len(self.lr.coef_[0]) != len(data_x.columns): raise GeneralException(ResultCodesEnum.SYSTEM_ERROR, message=f"lr模型coef系数长度与x_columns长度不一致。") self.card = sc.scorecard(woebin, self.lr, data_x.columns, points0=600, pdo=50, odds0=train_data.get_odds0()) coef_table = self.lr.summary2().tables[1] self.coef = dict(zip(coef_table.index, coef_table['Coef.'])) def prob(self, x: pd.DataFrame, *args, **kwargs) -> np.array: # scorecardpy高版本 if sc.__version__ > '0.1.9.2': x = x.copy() x["const"] = [0] * len(x) return np.array(self.lr.predict(x)) def score(self, x: pd.DataFrame, *args, **kwargs) -> np.array: return np.array(sc.scorecard_ply(x, self.card, print_step=0)["score"]) def score_rule(self, x: pd.DataFrame, *args, **kwargs) -> np.array: x[ConstantEnum.SCORE.value] = self.score(x) x = f_add_rules(x, self.ml_config.rules) return np.array(x[ConstantEnum.SCORE.value]) def model_save(self): if self.lr is None: GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型不存在") if self.card is None: GeneralException(ResultCodesEnum.NOT_FOUND, message=f"card不存在") path = self.ml_config.f_get_save_path(f"model.pkl") self.lr.save(path) print(f"model save to【{path}】success. ") path = self.ml_config.f_get_save_path("coef.dict") with open(path, mode="w", encoding="utf-8") as f: j = json.dumps(self.coef, ensure_ascii=False) f.write(j) print(f"model save to【{path}】success. ") df_card = pd.concat(self.card.values()) path = self.ml_config.f_get_save_path(f"card.csv") df_card.to_csv(path) print(f"model save to【{path}】success. ") def model_load(self, path: str, *args, **kwargs): if not os.path.isdir(path): raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"【{path}】不是文件夹") path_model = os.path.join(path, "model.pkl") if not os.path.isfile(path_model): raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在") path_card = os.path.join(path, "card.csv") if not os.path.isfile(path_card): raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_card}】不存在") with open(path_model, 'rb') as f: self.lr = pickle.load(f) print(f"model load from【{path_model}】success.") df_card = pd.read_csv(path_card) variables = df_card["variable"].unique().tolist() self.card = {} for variable in variables: self.card[variable] = df_card[df_card["variable"] == variable] print(f"model load from【{path_card}】success.") def train_report(self, data: DataSplitEntity, *args, **kwargs) -> Dict[str, MetricFucResultEntity]: def _get_auc_ks(data_y, score, title): perf = sc.perf_eva(data_y, score, title=title, show_plot=True) path = self.ml_config.f_get_save_path(f"perf_{title}.png") perf["pic"].savefig(path) auc = perf["AUC"] ks = perf["KS"] f_image_crop_white_borders(path, path) return auc, ks, path def _get_perf(perf_rule=False): # 模型ks auc img_path_auc_ks = [] suffix = "" if perf_rule: suffix = "-规则" train_score = self.score_rule(train_data) test_score = self.score_rule(test_data) else: train_score = self.score(train_data) test_score = self.score(test_data) train_auc, train_ks, path = _get_auc_ks(train_data[y_column], train_score, f"train{suffix}") img_path_auc_ks.append(path) test_auc, test_ks, path = _get_auc_ks(test_data[y_column], test_score, f"test{suffix}") img_path_auc_ks.append(path) df_auc_ks = pd.DataFrame() df_auc_ks["样本集"] = ["训练集", "测试集"] df_auc_ks["AUC"] = [train_auc, test_auc] df_auc_ks["KS"] = [train_ks, test_ks] metric_value_dict[f"模型结果{suffix}"] = MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10) # 评分卡分箱 train_score_bin, score_bins = f_get_model_score_bin(train_data, train_score) train_data_gain = f_calcu_model_ks(train_score_bin, y_column, sort_ascending=True) img_path_train_gain = self.ml_config.f_get_save_path(f"train_gain{suffix}.png") f_df_to_image(train_data_gain, img_path_train_gain) metric_value_dict[f"训练集分数分箱{suffix}"] = MetricFucResultEntity(table=train_data_gain, image_path=img_path_train_gain) test_score_bin, _ = f_get_model_score_bin(test_data, test_score, score_bins) test_data_gain = f_calcu_model_ks(test_score_bin, y_column, sort_ascending=True) img_path_test_gain = self.ml_config.f_get_save_path(f"test_gain{suffix}.png") f_df_to_image(test_data_gain, img_path_test_gain) metric_value_dict[f"测试集分数分箱{suffix}"] = MetricFucResultEntity(table=test_data_gain, image_path=img_path_test_gain) # 模型分psi model_psi = f_calcu_model_psi(train_score_bin, test_score_bin) img_path_psi = self.ml_config.f_get_save_path(f"model_psi{suffix}.png") f_df_to_image(model_psi, img_path_psi) metric_value_dict[f"模型稳定性{suffix}"] = MetricFucResultEntity(table=model_psi, value=model_psi["psi"].sum().round(3), image_path=img_path_psi) return train_score_bin, test_score_bin y_column = self._ml_config.y_column stress_test = self.ml_config.stress_test stress_sample_times = self.ml_config.stress_sample_times stress_bad_rate_list = self.ml_config.stress_bad_rate_list train_data = data.train_data test_data = data.test_data metric_value_dict = {} # 评分卡 df_card = pd.concat(self.card.values()) img_path_card = self.ml_config.f_get_save_path(f"card.png") f_df_to_image(df_card, img_path_card) metric_value_dict["评分卡"] = MetricFucResultEntity(table=df_card, image_path=img_path_card) # 模型系数 coef_table = self.lr.summary().tables[1] var_name = coef_table.data[0] var_name[0] = "var" df_coef = pd.DataFrame(columns=var_name, data=coef_table.data[1:]) img_path_coef = self.ml_config.f_get_save_path(f"coef.png") f_df_to_image(df_coef, img_path_coef) metric_value_dict["变量系数"] = MetricFucResultEntity(table=df_coef, image_path=img_path_coef) _, test_score_bin = _get_perf() if len(self.ml_config.rules) != 0: _, test_score_bin = _get_perf(perf_rule=True) # 压力测试 if stress_test: df_stress = f_stress_test(test_score_bin, sample_times=stress_sample_times, bad_rate_list=stress_bad_rate_list, target_column=y_column, score_column=ConstantEnum.SCORE.value) img_path_stress = self.ml_config.f_get_save_path(f"stress_test.png") f_df_to_image(df_stress, img_path_stress) metric_value_dict["压力测试"] = MetricFucResultEntity(table=df_stress, image_path=img_path_stress) if self.ml_config.jupyter_print: self.jupyter_print(metric_value_dict) return metric_value_dict def jupyter_print(self, metric_value_dict=Dict[str, MetricFucResultEntity], *args, **kwargs): from IPython import display suffix = "-规则" f_display_title(display, "模型结果") display.display(metric_value_dict["模型结果"].table) f_display_images_by_side(display, metric_value_dict["模型结果"].image_path) if len(self.ml_config.rules) != 0: print("加入规则后:") display.display(metric_value_dict[f"模型结果{suffix}"].table) f_display_images_by_side(display, metric_value_dict[f"模型结果{suffix}"].image_path) f_display_title(display, "模型变量系数") print(self.lr.summary().tables[0]) display.display(metric_value_dict["变量系数"].table) # 模型psi f_display_title(display, "模型psi") display.display(metric_value_dict["模型稳定性"].table) print(f"模型psi: {metric_value_dict['模型稳定性'].value}") if len(self.ml_config.rules) != 0: print("加入规则后:") display.display(metric_value_dict[f"模型稳定性{suffix}"].table) print(f"模型psi: {metric_value_dict[f'模型稳定性{suffix}'].value}") f_display_title(display, "分数分箱") print("训练集-分数分箱") display.display(metric_value_dict["训练集分数分箱"].table) if len(self.ml_config.rules) != 0: print("加入规则后:") print(f"训练集-分数分箱") display.display(metric_value_dict[f"训练集分数分箱{suffix}"].table) print("测试集-分数分箱") display.display(metric_value_dict["测试集分数分箱"].table) if len(self.ml_config.rules) != 0: print("加入规则后:") print(f"测试集-分数分箱") display.display(metric_value_dict[f"测试集分数分箱{suffix}"].table) # 评分卡 f_display_title(display, "评分卡") if len(self.ml_config.rules) != 0: print(f"评分卡不包含规则") display.display(metric_value_dict["评分卡"].table) if "压力测试" in metric_value_dict.keys(): f_display_title(display, "压力测试") display.display(metric_value_dict["压力测试"].table) if __name__ == "__main__": pass