123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- # -*- coding: utf-8 -*-
- """
- @author: yq
- @time: 2024/11/1
- @desc:
- """
- import os.path
- import pickle
- from os.path import dirname, realpath
- from typing import Dict
- import numpy as np
- import pandas as pd
- import scorecardpy as sc
- import statsmodels.api as sm
- from commom import f_df_to_image, f_display_images_by_side, GeneralException, f_display_title, \
- f_image_crop_white_borders
- from entitys import MetricFucResultEntity, DataSplitEntity, DataFeatureEntity
- from enums import ContextEnum, ResultCodesEnum, ConstantEnum
- from init import context
- from .model_base import ModelBase
- from .model_utils import f_stress_test, f_calcu_model_ks, f_get_model_score_bin, f_calcu_model_psi, f_add_rules
class ModelLr(ModelBase):
    """Logistic-regression scorecard model.

    Fits a statsmodels binomial GLM on WOE-encoded features and builds a
    scorecardpy score card from the fitted coefficients.  Supports both the
    legacy (<= 0.1.9.2) and newer scorecardpy APIs.
    """

    # Last scorecardpy release that uses the legacy (sklearn-attribute) API.
    _SC_LEGACY_VERSION = (0, 1, 9, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Word template used when rendering the model-development report.
        self._template_path = os.path.join(
            dirname(dirname(realpath(__file__))), "./template/模型开发报告模板_lr.docx")
        self.lr = None    # fitted statsmodels GLM result (set by train/model_load)
        self.card = None  # score card: dict of variable name -> bin DataFrame

    @staticmethod
    def _sc_version_tuple() -> tuple:
        """Return scorecardpy's version as an int tuple for safe comparison.

        BUGFIX: the original code compared version *strings*, which is wrong
        for multi-digit components ('0.1.10' < '0.1.9.2' lexicographically).
        Numeric tuples compare correctly.  Parsing stops at the first
        non-numeric component (e.g. '0.1.9rc1' -> (0, 1)).
        """
        parts = []
        for piece in str(sc.__version__).split("."):
            if not piece.isdigit():
                break
            parts.append(int(piece))
        return tuple(parts)

    @classmethod
    def _sc_is_new(cls) -> bool:
        """True when the installed scorecardpy uses the newer (> 0.1.9.2) API."""
        return cls._sc_version_tuple() > cls._SC_LEGACY_VERSION

    def get_report_template_path(self):
        """Path of the Word report template bundled with the package."""
        return self._template_path

    def train(self, train_data: DataFeatureEntity, test_data: DataFeatureEntity, *args, **kwargs):
        """Fit the GLM on WOE-encoded training data and build the score card.

        Args:
            train_data: training features/labels; ``data_x`` is assumed to be
                WOE-encoded already (produced upstream — confirm with caller).
            test_data: unused here; kept for interface compatibility.
        """
        woebin = context.get(ContextEnum.WOEBIN)
        data_x = train_data.data_x.copy()
        is_new_sc = self._sc_is_new()
        # Newer scorecardpy distinguishes sklearn from statsmodels models and
        # needs a constant column to build the card; we do not want an
        # intercept contribution, so the column is pinned to 0.
        if is_new_sc:
            data_x["const"] = [0] * len(data_x)
        family = sm.families.Binomial()
        logit = sm.GLM(train_data.data_y, data_x, family=family)
        self.lr = logit.fit()
        # Legacy scorecardpy expects sklearn-style coef_/intercept_ attributes
        # on the model object, so graft them onto the GLM results.
        if not is_new_sc:
            self.lr.coef_ = [list(self.lr.summary2().tables[1].loc[:, 'Coef.'])]
            self.lr.intercept_ = [0]
            if len(self.lr.coef_[0]) != len(data_x.columns):
                raise GeneralException(ResultCodesEnum.SYSTEM_ERROR,
                                       message=f"lr模型coef系数长度与x_columns长度不一致。")
        self.card = sc.scorecard(woebin, self.lr, data_x.columns,
                                 points0=600, pdo=50, odds0=train_data.get_odds0())

    def prob(self, x: pd.DataFrame, *args, **kwargs) -> np.array:
        """Predicted probability for each row of WOE-encoded ``x``."""
        # The newer-API model was fitted with a zero const column, so
        # prediction input must carry the same column.
        if self._sc_is_new():
            x = x.copy()
            x["const"] = [0] * len(x)
        return np.array(self.lr.predict(x))

    def score(self, x: pd.DataFrame, *args, **kwargs) -> np.array:
        """Score each row of ``x`` via the score card."""
        return np.array(sc.scorecard_ply(x, self.card, print_step=0)["score"])

    def score_rule(self, x: pd.DataFrame, *args, **kwargs) -> np.array:
        """Score ``x`` and then apply the configured rules to the scores.

        BUGFIX: works on a copy so the caller's DataFrame is no longer
        mutated (the original added the score column to ``x`` in place).
        """
        x = x.copy()
        x[ConstantEnum.SCORE.value] = self.score(x)
        x = f_add_rules(x, self.ml_config.rules)
        return np.array(x[ConstantEnum.SCORE.value])

    def model_save(self):
        """Persist the fitted model (pickle) and the score card (csv).

        Raises:
            GeneralException: if the model or card has not been trained/loaded.
        """
        # BUGFIX: the original constructed these exceptions without `raise`,
        # silently discarding them and letting the save proceed on None.
        if self.lr is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型不存在")
        if self.card is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"card不存在")
        path = self.ml_config.f_get_save_path("model.pkl")
        self.lr.save(path)
        print(f"model save to【{path}】success. ")
        df_card = pd.concat(self.card.values())
        path = self.ml_config.f_get_save_path("card.csv")
        df_card.to_csv(path)
        print(f"model save to【{path}】success. ")

    def model_load(self, path: str, *args, **kwargs):
        """Load ``model.pkl`` and ``card.csv`` from directory ``path``.

        Raises:
            GeneralException: when ``path`` is not a directory or a required
                file is missing.
        """
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"【{path}】不是文件夹")
        path_model = os.path.join(path, "model.pkl")
        if not os.path.isfile(path_model):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")
        path_card = os.path.join(path, "card.csv")
        if not os.path.isfile(path_card):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_card}】不存在")
        # NOTE(review): pickle.load is only safe on trusted model files.
        with open(path_model, 'rb') as f:
            self.lr = pickle.load(f)
        df_card = pd.read_csv(path_card)
        # Rebuild the scorecardpy card structure: variable -> its bin rows.
        self.card = {variable: df_card[df_card["variable"] == variable]
                     for variable in df_card["variable"].unique().tolist()}
        print(f"model load from【{path}】success.")

    def train_report(self, data: DataSplitEntity, *args, **kwargs) -> Dict[str, MetricFucResultEntity]:
        """Evaluate the trained model and collect report metrics.

        Produces AUC/KS, score binning, score PSI, the score card, the
        coefficient table and (optionally) stress-test results — each metric
        both without and, when rules are configured, with rules applied.

        Returns:
            Mapping of metric section name -> MetricFucResultEntity.
        """

        def _get_auc_ks(data_y, score, title):
            # Plot, save and crop the perf chart; return AUC, KS and image path.
            perf = sc.perf_eva(data_y, score, title=title, show_plot=True)
            path = self.ml_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            auc = perf["AUC"]
            ks = perf["KS"]
            f_image_crop_white_borders(path, path)
            return auc, ks, path

        def _get_perf(perf_rule=False):
            # Fill metric_value_dict with AUC/KS, bin gains and PSI; when
            # perf_rule is True the rule-adjusted score is evaluated and all
            # section names get the "-规则" suffix.
            img_path_auc_ks = []
            suffix = ""
            if perf_rule:
                suffix = "-规则"
                train_score = self.score_rule(train_data)
                test_score = self.score_rule(test_data)
            else:
                train_score = self.score(train_data)
                test_score = self.score(test_data)
            train_auc, train_ks, path = _get_auc_ks(train_data[y_column], train_score, f"train{suffix}")
            img_path_auc_ks.append(path)
            test_auc, test_ks, path = _get_auc_ks(test_data[y_column], test_score, f"test{suffix}")
            img_path_auc_ks.append(path)
            df_auc_ks = pd.DataFrame()
            df_auc_ks["样本集"] = ["训练集", "测试集"]
            df_auc_ks["AUC"] = [train_auc, test_auc]
            df_auc_ks["KS"] = [train_ks, test_ks]
            metric_value_dict[f"模型结果{suffix}"] = MetricFucResultEntity(
                table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)
            # Score-bin gain tables: train defines the bin edges, test reuses them.
            train_score_bin, score_bins = f_get_model_score_bin(train_data, train_score)
            train_data_gain = f_calcu_model_ks(train_score_bin, y_column, sort_ascending=True)
            img_path_train_gain = self.ml_config.f_get_save_path(f"train_gain{suffix}.png")
            f_df_to_image(train_data_gain, img_path_train_gain)
            metric_value_dict[f"训练集分数分箱{suffix}"] = MetricFucResultEntity(
                table=train_data_gain, image_path=img_path_train_gain)
            test_score_bin, _ = f_get_model_score_bin(test_data, test_score, score_bins)
            test_data_gain = f_calcu_model_ks(test_score_bin, y_column, sort_ascending=True)
            img_path_test_gain = self.ml_config.f_get_save_path(f"test_gain{suffix}.png")
            f_df_to_image(test_data_gain, img_path_test_gain)
            metric_value_dict[f"测试集分数分箱{suffix}"] = MetricFucResultEntity(
                table=test_data_gain, image_path=img_path_test_gain)
            # PSI of the model score between train and test.
            model_psi = f_calcu_model_psi(train_score_bin, test_score_bin)
            img_path_psi = self.ml_config.f_get_save_path(f"model_psi{suffix}.png")
            f_df_to_image(model_psi, img_path_psi)
            metric_value_dict[f"模型稳定性{suffix}"] = MetricFucResultEntity(
                table=model_psi, value=model_psi["psi"].sum().round(3), image_path=img_path_psi)
            return train_score_bin, test_score_bin

        # CONSISTENCY: use the public ml_config accessor throughout (the
        # original mixed self._ml_config and self.ml_config).
        y_column = self.ml_config.y_column
        stress_test = self.ml_config.stress_test
        stress_sample_times = self.ml_config.stress_sample_times
        stress_bad_rate_list = self.ml_config.stress_bad_rate_list
        train_data = data.train_data
        test_data = data.test_data
        metric_value_dict = {}
        # Score card table.
        df_card = pd.concat(self.card.values())
        img_path_card = self.ml_config.f_get_save_path("card.png")
        f_df_to_image(df_card, img_path_card)
        metric_value_dict["评分卡"] = MetricFucResultEntity(table=df_card, image_path=img_path_card)
        # Model coefficients, extracted from the statsmodels summary table.
        coef_table = self.lr.summary().tables[1]
        var_name = coef_table.data[0]
        var_name[0] = "var"
        df_coef = pd.DataFrame(columns=var_name, data=coef_table.data[1:])
        img_path_coef = self.ml_config.f_get_save_path("coef.png")
        f_df_to_image(df_coef, img_path_coef)
        metric_value_dict["变量系数"] = MetricFucResultEntity(table=df_coef, image_path=img_path_coef)
        _, test_score_bin = _get_perf()
        # When rules are configured, downstream (stress test) uses the
        # rule-adjusted test bins.
        if len(self.ml_config.rules) != 0:
            _, test_score_bin = _get_perf(perf_rule=True)
        # Stress test.
        if stress_test:
            df_stress = f_stress_test(test_score_bin, sample_times=stress_sample_times,
                                      bad_rate_list=stress_bad_rate_list,
                                      target_column=y_column, score_column=ConstantEnum.SCORE.value)
            img_path_stress = self.ml_config.f_get_save_path("stress_test.png")
            f_df_to_image(df_stress, img_path_stress)
            metric_value_dict["压力测试"] = MetricFucResultEntity(table=df_stress, image_path=img_path_stress)
        if self.ml_config.jupyter_print:
            self.jupyter_print(metric_value_dict)
        return metric_value_dict

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity] = None, *args, **kwargs):
        """Pretty-print the collected report metrics in a notebook.

        BUGFIX: the original signature was ``metric_value_dict=Dict[...]``,
        i.e. the typing alias used as a *default value*; it is now a proper
        annotation.

        Args:
            metric_value_dict: output of :meth:`train_report`.
        """
        from IPython import display
        suffix = "-规则"
        has_rules = len(self.ml_config.rules) != 0
        f_display_title(display, "模型结果")
        display.display(metric_value_dict["模型结果"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果"].image_path)
        if has_rules:
            print("加入规则后:")
            display.display(metric_value_dict[f"模型结果{suffix}"].table)
            f_display_images_by_side(display, metric_value_dict[f"模型结果{suffix}"].image_path)
        f_display_title(display, "模型变量系数")
        print(self.lr.summary().tables[0])
        display.display(metric_value_dict["变量系数"].table)
        # Model PSI.
        f_display_title(display, "模型psi")
        display.display(metric_value_dict["模型稳定性"].table)
        print(f"模型psi: {metric_value_dict['模型稳定性'].value}")
        if has_rules:
            print("加入规则后:")
            display.display(metric_value_dict[f"模型稳定性{suffix}"].table)
            print(f"模型psi: {metric_value_dict[f'模型稳定性{suffix}'].value}")
        f_display_title(display, "分数分箱")
        print("训练集-分数分箱")
        display.display(metric_value_dict["训练集分数分箱"].table)
        if has_rules:
            print("加入规则后:")
            print(f"训练集-分数分箱")
            display.display(metric_value_dict[f"训练集分数分箱{suffix}"].table)
        print("测试集-分数分箱")
        display.display(metric_value_dict["测试集分数分箱"].table)
        if has_rules:
            print("加入规则后:")
            print(f"测试集-分数分箱")
            display.display(metric_value_dict[f"测试集分数分箱{suffix}"].table)
        # Score card (rules are applied after scoring, so never shown in it).
        f_display_title(display, "评分卡")
        if has_rules:
            print(f"评分卡不包含规则")
        display.display(metric_value_dict["评分卡"].table)
        if "压力测试" in metric_value_dict.keys():
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)
# Script entry point: intentionally a no-op — this module is used as a library.
if __name__ == "__main__":
    pass
|