# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2024/11/1
@desc:
"""
import json
import os.path
from os.path import dirname, realpath
from typing import Dict

import joblib
import numpy as np
import pandas as pd
import scorecardpy as sc
import xgboost as xgb
from pypmml import Model
from sklearn.preprocessing import OneHotEncoder
from sklearn2pmml import sklearn2pmml, PMMLPipeline
from sklearn2pmml.preprocessing import CutTransformer
from sklearn_pandas import DataFrameMapper

from commom import f_df_to_image, f_display_images_by_side, GeneralException, f_display_title, \
    f_image_crop_white_borders
from entitys import MetricFucResultEntity, DataSplitEntity, DataFeatureEntity
from enums import ResultCodesEnum, ConstantEnum, FileEnum, ContextEnum
from init import context
from .model_base import ModelBase
from .model_utils import f_stress_test, f_calcu_model_ks, f_get_model_score_bin, f_calcu_model_psi
from .pipeline_xgb_util import fit, Xtransform, Xtransformer_fit


class ModelXgb(ModelBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Override the PMMLPipeline methods with the project-specific implementations
        PMMLPipeline.Xtransformer_fit = Xtransformer_fit
        PMMLPipeline.Xtransform = Xtransform
        PMMLPipeline.fit = fit
        # Report template
        self._template_path = os.path.join(dirname(dirname(realpath(__file__))), "./template/模型开发报告模板_xgb.docx")
        self.pipeline: PMMLPipeline
        self.model = xgb.XGBClassifier
        self._test_case = None

    def _f_rewrite_pmml(self, path_pmml: str):
        with open(path_pmml, mode="r", encoding="utf-8") as f:
            pmml = f.read()
        pmml = pmml.replace('optype="categorical" dataType="double"', 'optype="categorical" dataType="string"')
        with open(path_pmml, mode="w", encoding="utf-8") as f:
            f.write(pmml)
            f.flush()
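
    # Note on _f_rewrite_pmml: sklearn2pmml exports the categorical (binned / one-hot
    # encoded) features with declarations such as
    #   <DataField name="x1" optype="categorical" dataType="double"/>
    # (illustrative field name). Forcing dataType="string" is assumed to be required so
    # that downstream PMML scoring engines treat the category values as strings rather
    # than as numbers.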

    def get_report_template_path(self):
        return self._template_path

    def train(self, train_data: DataFeatureEntity, test_data: DataFeatureEntity, *args, **kwargs):
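        """Fit an XGBClassifier wrapped in a PMMLPipeline.

        The pipeline one-hot encodes the selected string columns, coarse-bins the
        configured numeric columns with CutTransformer, and passes the remaining
        numeric columns through unchanged. Optionally exports the fitted pipeline to
        PMML (with a consistency check against pypmml scoring) and prints the boosted
        trees.
        """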
        print(f"{'-' * 50}开始训练{'-' * 50}")
        params_xgb = self.ml_config.params_xgb
        y_column = self._ml_config.y_column
        # Selected feature columns
        x_columns_selected = context.get(ContextEnum.XGB_COLUMNS_SELECTED)
        # Numeric columns, including ones that were not selected
        num_columns = context.get(ContextEnum.XGB_COLUMNS_NUM)
        points_dict: dict = context.get(ContextEnum.XGB_POINTS)
        data: DataSplitEntity = kwargs["data"]
        train_data_raw = data.train_data
        test_data_raw = data.test_data

        # Training with the native xgb interface (kept for reference)
        # dtrain = xgb.DMatrix(data=train_data.data_x, label=train_data.data_y)
        # dtest = xgb.DMatrix(data=test_data.data_x, label=test_data.data_y)
        # self.model = xgb.train(
        #     params_xgb,
        #     dtrain=dtrain,
        #     evals=[(dtrain, 'train'), (dtest, 'test')],
        #     num_boost_round=params_xgb.get("num_boost_round"),
        #     early_stopping_rounds=params_xgb.get("early_stopping_rounds"),
        #     verbose_eval=params_xgb.get("verbose_eval")
        # )

        # xgb wrapped via the sklearn interface
        self.model = xgb.XGBClassifier(objective=params_xgb.get("objective"),
                                       n_estimators=params_xgb.get("num_boost_round"),
                                       max_depth=params_xgb.get("max_depth"),
                                       learning_rate=params_xgb.get("learning_rate"),
                                       random_state=params_xgb.get("random_state"),
                                       reg_alpha=params_xgb.get("alpha"),
                                       subsample=params_xgb.get("subsample"),
                                       colsample_bytree=params_xgb.get("colsample_bytree"),
                                       importance_type='weight'
                                       )
        # self.model.fit(X=train_data.data_x, y=train_data.data_y,
        #                eval_set=[(train_data.data_x, train_data.data_y), (test_data.data_x, test_data.data_y)],
        #                eval_metric=params_xgb.get("eval_metric"),
        #                early_stopping_rounds=params_xgb.get("early_stopping_rounds"),
        #                verbose=params_xgb.get("verbose_eval"),
        #                )

        str_columns_selected = [i for i in x_columns_selected if i not in num_columns]
        mapper = [(str_columns_selected, OneHotEncoder())]
        for column in x_columns_selected:
            if column in str_columns_selected:
                continue
            # Coarse binning
            if column in points_dict.keys():
                points = [-np.inf] + points_dict[column] + [np.inf]
                labels = [ConstantEnum.XGB_BIN_LOWEST.value] + points_dict[column]
                mapper.append((column, CutTransformer(points, right=False, labels=labels)))
            else:
                mapper.append((column, None))
        mapper = DataFrameMapper(mapper)
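        # The mapper above one-hot encodes the selected string columns, maps the
        # coarse-binned numeric columns to their bin labels via CutTransformer, and
        # passes every other selected numeric column through unchanged (None).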

        self.pipeline = PMMLPipeline([("mapper", mapper), ("classifier", self.model)])
        self.pipeline.Xtransformer_fit(data.data, data.data[y_column])
        self.pipeline.fit(train_data_raw, train_data_raw[y_column],
                          classifier__eval_set=[
                              (self.pipeline.Xtransform(train_data_raw), train_data_raw[y_column]),
                              (self.pipeline.Xtransform(test_data_raw), test_data_raw[y_column])
                          ],
                          classifier__eval_metric=params_xgb.get("eval_metric"),
                          classifier__early_stopping_rounds=params_xgb.get("early_stopping_rounds"),
                          classifier__verbose=params_xgb.get("verbose_eval"),
                          )
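        # The classifier__* keyword arguments follow the sklearn step-name prefix
        # convention; with the overridden PMMLPipeline.fit they are assumed to be
        # forwarded to XGBClassifier.fit (eval_set, eval_metric, early stopping, verbosity).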

        if params_xgb.get("save_pmml"):
            path_pmml = self.ml_config.f_get_save_path(FileEnum.PMML.value)
            # pipeline = make_pmml_pipeline(self.model)
            sklearn2pmml(self.pipeline, path_pmml, with_repr=True)
            self._f_rewrite_pmml(path_pmml)
            print(f"model save to【{path_pmml}】success. ")
            # Consistency check between the PMML model and the in-memory pipeline
            model_pmml = Model.fromFile(path_pmml)
            prob_pmml = model_pmml.predict(data.data)["probability(1)"]
            prob_pipeline = self.pipeline.predict_proba(data.data)[:, 1]
            diff = pd.DataFrame()
            diff["prob_pmml"] = prob_pmml
            diff["prob_pipeline"] = prob_pipeline
            diff["diff"] = diff["prob_pmml"] - diff["prob_pipeline"]
            diff["diff_format"] = diff["diff"].apply(lambda x: 1 if abs(x) < 0.001 else 0)
            consistency_rate = round(diff["diff_format"].sum() / len(diff) * 100, 3)
            print(f"pmml模型结果一致率(误差小于0.001):{consistency_rate}%")

        if params_xgb.get("trees_print"):
            trees = self.model.get_booster().get_dump()
            for i, tree in enumerate(trees):
                if i < self.model.best_ntree_limit:
                    print(f"Tree {i}:")
                    print(tree)

    def prob(self, x: pd.DataFrame, *args, **kwargs) -> np.array:
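        """Return the predicted probability of the positive class (label 1)."""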
        # prob = self.model.predict_proba(x)[:, 1]
        prob = self.pipeline.predict_proba(x)[:, 1]
        return prob

    def score(self, x: pd.DataFrame, *args, **kwargs) -> np.array:
        pass

    def score_rule(self, x: pd.DataFrame, *args, **kwargs) -> np.array:
        pass

    def model_save(self):
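        """Persist the fitted pipeline with joblib and dump the test cases to CSV."""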
        if self.pipeline is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型不存在")
        path_model = self.ml_config.f_get_save_path(FileEnum.MODEL.value)
        # self.model.save_model(path_model)
        joblib.dump(self.pipeline, path_model)
        print(f"model save to【{path_model}】success. ")

        path = self.ml_config.f_get_save_path(FileEnum.TEST_CASE.value)
        self._test_case.to_csv(path, encoding="utf-8")
        print(f"test case save to【{path}】success. ")

    def model_load(self, path: str, *args, **kwargs):
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"【{path}】不是文件夹")
        path_model = os.path.join(path, FileEnum.MODEL.value)
        if not os.path.isfile(path_model):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")

        # self.model = xgb.XGBClassifier()
        # self.model.load_model(path_model)
        self.pipeline = joblib.load(path_model)
        print(f"model load from【{path_model}】success.")

    def train_report(self, data: DataSplitEntity, *args, **kwargs) -> Dict[str, MetricFucResultEntity]:
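        """Build the report metrics: AUC/KS with plots, score-bin gains tables for the
        train and test sets, model-score PSI, and (optionally) stress-test results,
        keyed by the Chinese section names (e.g. "模型结果") that jupyter_print displays."""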
        def _get_auc_ks(data_y, score, title):
            perf = sc.perf_eva(data_y, score, title=title, show_plot=True)
            path = self.ml_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            auc = perf["AUC"]
            ks = perf["KS"]
            f_image_crop_white_borders(path, path)
            return auc, ks, path

        def _get_perf():
            # Model KS / AUC
            img_path_auc_ks = []
            train_score = self.prob(train_data)
            test_score = self.prob(test_data)
            train_auc, train_ks, path = _get_auc_ks(train_data[y_column], train_score, "train")
            img_path_auc_ks.append(path)
            test_auc, test_ks, path = _get_auc_ks(test_data[y_column], test_score, "test")
            img_path_auc_ks.append(path)

            df_auc_ks = pd.DataFrame()
            df_auc_ks["样本集"] = ["训练集", "测试集"]
            df_auc_ks["AUC"] = [train_auc, test_auc]
            df_auc_ks["KS"] = [train_ks, test_ks]
            metric_value_dict["模型结果"] = MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks,
                                                              image_size=5, table_font_size=10)

            # Scorecard-style score bins
            train_score_bin, score_bins = f_get_model_score_bin(train_data, train_score)
            train_data_gain = f_calcu_model_ks(train_score_bin, y_column, sort_ascending=False)
            img_path_train_gain = self.ml_config.f_get_save_path("train_gain.png")
            f_df_to_image(train_data_gain, img_path_train_gain)
            metric_value_dict["训练集分数分箱"] = MetricFucResultEntity(table=train_data_gain,
                                                                 image_path=img_path_train_gain)

            test_score_bin, _ = f_get_model_score_bin(test_data, test_score, score_bins)
            test_data_gain = f_calcu_model_ks(test_score_bin, y_column, sort_ascending=False)
            img_path_test_gain = self.ml_config.f_get_save_path("test_gain.png")
            f_df_to_image(test_data_gain, img_path_test_gain)
            metric_value_dict["测试集分数分箱"] = MetricFucResultEntity(table=test_data_gain,
                                                                 image_path=img_path_test_gain)

            # Model-score PSI
            model_psi = f_calcu_model_psi(train_score_bin, test_score_bin, sort_ascending=False)
            img_path_psi = self.ml_config.f_get_save_path("model_psi.png")
            f_df_to_image(model_psi, img_path_psi)
            metric_value_dict["模型稳定性"] = MetricFucResultEntity(table=model_psi,
                                                               value=model_psi["psi"].sum().round(3),
                                                               image_path=img_path_psi)
            return train_score_bin, test_score_bin

        y_column = self._ml_config.y_column
        stress_test = self.ml_config.stress_test
        stress_sample_times = self.ml_config.stress_sample_times
        stress_bad_rate_list = self.ml_config.stress_bad_rate_list
        train_data = data.train_data
        test_data = data.test_data

        metric_value_dict = {}
        metric_value_dict["模型超参数"] = MetricFucResultEntity(
            value=json.dumps(self.model.get_xgb_params(), ensure_ascii=False, indent=2))
        _, test_score_bin = _get_perf()

        # Stress test
        if stress_test:
            df_stress = f_stress_test(test_score_bin, sample_times=stress_sample_times,
                                      bad_rate_list=stress_bad_rate_list,
                                      target_column=y_column, score_column=ConstantEnum.SCORE.value,
                                      sort_ascending=False)
            img_path_stress = self.ml_config.f_get_save_path("stress_test.png")
            f_df_to_image(df_stress, img_path_stress)
            metric_value_dict["压力测试"] = MetricFucResultEntity(table=df_stress, image_path=img_path_stress)

        if self.ml_config.jupyter_print:
            self.jupyter_print(metric_value_dict)

        # Test cases: keep a scored copy of the test set
        self._test_case = data.test_data.copy()
        test_score = self.prob(test_data)
        self._test_case["score"] = test_score

        return metric_value_dict

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity], *args, **kwargs):
        from IPython import display

        f_display_title(display, "模型结果")
        display.display(metric_value_dict["模型结果"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果"].image_path)

        f_display_title(display, "模型超参数")
        print(metric_value_dict["模型超参数"].value)

        # Model PSI
        f_display_title(display, "模型psi")
        display.display(metric_value_dict["模型稳定性"].table)
        print(f"模型psi: {metric_value_dict['模型稳定性'].value}")

        f_display_title(display, "分数分箱")
        print("训练集-分数分箱")
        display.display(metric_value_dict["训练集分数分箱"].table)
        print("测试集-分数分箱")
        display.display(metric_value_dict["测试集分数分箱"].table)

        if "压力测试" in metric_value_dict.keys():
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)


if __name__ == "__main__":
    pass
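    # Minimal usage sketch (commented out). The constructor/config arguments for
    # ModelXgb and the entity objects below are project-specific and not defined in
    # this module, so the names are illustrative assumptions only.
    #
    # model = ModelXgb(ml_config=ml_config)                        # hypothetical config object
    # model.train(train_feature, test_feature, data=data_split)    # DataFeatureEntity + DataSplitEntity
    # metrics = model.train_report(data_split)
    # model.model_save()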