# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2025/2/27
@desc: Online-learning trainer that refreshes a saved XGBoost PMML pipeline on new data
       and reports how the refreshed model compares with the original one.
"""
- import os
- from os.path import dirname, realpath
- from typing import Dict, List
- import joblib
- import pandas as pd
- import scorecardpy as sc
- import xgboost as xgb
- from sklearn2pmml import PMMLPipeline
- from tqdm import tqdm
- from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
- f_display_images_by_side
- from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
- from enums import ResultCodesEnum, ConstantEnum, FileEnum
- from init import init
- from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi, Xtransformer_fit, \
- Xtransform, fit
- init()
class OnlineLearningTrainerXgb:
    """Refresh an existing XGBoost pipeline on new data and report the result.

    The trainer loads a persisted ``PMMLPipeline`` twice: one copy is kept as
    the untouched "original" model, the other has its final step replaced by a
    refreshed ``XGBClassifier``. ``train`` scores every tree count on the test
    set, ``report`` picks a tree count and builds the comparison metrics, and
    ``save`` persists the refreshed pipeline together with the configuration.
    """

    def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
        """Create a trainer and load the original model from ``path_resources``.

        Args:
            data: Split data set; its ``data``, ``train_data`` and ``test_data``
                attributes are read by the training and reporting methods.
            ol_config: Ready-made configuration. When omitted, a new
                ``OnlineLearningConfigEntity`` is built from ``*args`` / ``**kwargs``.

        Raises:
            GeneralException: Propagated from ``_init`` when the resource
                directory or the model file is missing.
        """
        # Override PMMLPipeline methods with the project implementations.
        # NOTE: this patches the class itself, so it affects every PMMLPipeline
        # in the process, not only the ones owned by this trainer.
        PMMLPipeline.Xtransformer_fit = Xtransformer_fit
        PMMLPipeline.Xtransform = Xtransform
        PMMLPipeline.fit = fit

        if ol_config is not None:
            self._ol_config = ol_config
        else:
            self._ol_config = OnlineLearningConfigEntity(*args, **kwargs)
        self._data = data
        # Filled by train(): one row per evaluated tree count.
        self._df_param_optimized = None
        self._model_optimized_list = []
        # Initialised to None so that a failed load leaves well-defined attributes behind.
        self._pipeline_original: PMMLPipeline = None
        self._pipeline_optimized: PMMLPipeline = None
        # Report template
        self._template_path = os.path.join(dirname(dirname(realpath(__file__))),
                                           "./template/OnlineLearning报告模板_xgb.docx")
        self._init(self._ol_config.path_resources)

    def _init(self, path: str):
        """Load the persisted pipeline from directory ``path``.

        The file is deserialised twice so that the original and the
        to-be-refreshed pipeline are independent objects.

        Raises:
            GeneralException: If ``path`` is not a directory or the model file
                is not found inside it.
        """
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")
        path_model = os.path.join(path, FileEnum.MODEL.value)
        if not os.path.isfile(path_model):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")

        # NOTE(review): joblib.load unpickles the file; only load model files from trusted locations.
        self._pipeline_original = joblib.load(path_model)
        self._pipeline_optimized = joblib.load(path_model)
        print(f"model load from【{path_model}】success.")

    def _f_get_pipeline(self, model_type: str):
        """Return the refreshed pipeline for the label "新模型", otherwise the original one."""
        if model_type != "新模型":
            return self._pipeline_original
        return self._pipeline_optimized

    def _f_get_best_model(self, df_param: pd.DataFrame, ntree: int = None):
        """Re-train the refreshed model with the chosen number of trees.

        Args:
            df_param: Evaluation table produced by ``train`` with the columns
                ``auc_test``, ``ks_test`` and ``ntree``.
            ntree: Explicit tree count to use. When ``None`` the row with the
                highest ``ks_test`` (ties broken by ``auc_test``) is selected.

        Raises:
            GeneralException: If no evaluation table is available yet, or if
                ``ntree`` is not one of the evaluated tree counts.
        """
        if df_param is None or df_param.empty:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型优化结果不存在,请先执行train")

        if ntree is None:
            df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
            best = df_param_sort.iloc[0]
            print(f"选择最佳参数:\n{best.to_dict()}")
            # Look the value up by label and cast it: a row taken from a numeric
            # frame is upcast to float, while the estimator needs an integer.
            self._train(int(best["ntree"]))
            return

        df_match = df_param[df_param["ntree"] == ntree]
        if df_match.empty:
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"ntree【{ntree}】不在模型优化结果中")
        print(f"选择ntree:【{ntree}】的参数:\n{df_match.iloc[0].to_dict()}")
        self._train(ntree)

    def _f_get_metric_auc_ks(self, model_type: str):
        """Compute AUC/KS on the full, train and test samples and save the plots.

        Args:
            model_type: Report label; "新模型" selects the refreshed pipeline,
                any other value selects the original pipeline.

        Returns:
            MetricFucResultEntity holding a three-row AUC/KS table and the
            paths of the three saved performance images.
        """
        model = self._f_get_pipeline(model_type)
        y_column = self._ol_config.y_column

        def _get_auc_ks(sample, title):
            y_prob = self.prob(sample, model)
            perf = sc.perf_eva(sample[y_column], y_prob, title=f"{title}", show_plot=True)
            path = self._ol_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            # Crop the saved image in place.
            f_image_crop_white_borders(path, path)
            return perf["AUC"], perf["KS"], path

        samples = [
            ("建模数据", self._data.data),
            ("训练集", self._data.train_data),
            ("测试集", self._data.test_data),
        ]
        names, aucs, kss, img_path_auc_ks = [], [], [], []
        for name, sample in samples:
            auc, ks, path = _get_auc_ks(sample, f"{model_type}-{name}")
            names.append(name)
            aucs.append(auc)
            kss.append(ks)
            img_path_auc_ks.append(path)

        df_auc_ks = pd.DataFrame({"样本集": names, "AUC": aucs, "KS": kss})
        return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)

    def _f_get_metric_gain(self, model_type: str):
        """Build the score-bin (gain) table on the full sample for one model.

        Args:
            model_type: Report label; "新模型" selects the refreshed pipeline,
                any other value selects the original pipeline.

        Returns:
            MetricFucResultEntity with the gain table and the path of its
            rendered image.
        """
        data = self._data.data
        score = self.prob(data, self._f_get_pipeline(model_type))
        score_bin, _ = f_get_model_score_bin(data, score)
        gain = f_calcu_model_ks(score_bin, self._ol_config.y_column, sort_ascending=False)
        img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
        f_df_to_image(gain, img_path_gain)
        return MetricFucResultEntity(table=gain, image_path=img_path_gain)

    def _f_get_stress_test(self, ):
        """Run the configured stress test on the refreshed model's scores.

        Returns:
            MetricFucResultEntity with the stress-test table and the path of
            its rendered image.
        """
        data = self._data.data
        score = self.prob(data, self._pipeline_optimized)
        score_bin, _ = f_get_model_score_bin(data, score)
        df_stress = f_stress_test(score_bin,
                                  sample_times=self._ol_config.stress_sample_times,
                                  bad_rate_list=self._ol_config.stress_bad_rate_list,
                                  target_column=self._ol_config.y_column,
                                  score_column=ConstantEnum.SCORE.value,
                                  sort_ascending=False)
        img_path_stress = self._ol_config.f_get_save_path("stress.png")
        f_df_to_image(df_stress, img_path_stress)
        return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)

    def prob(self, x: pd.DataFrame, pipeline=None):
        """Return the positive-class probability (column 1 of ``predict_proba``).

        Args:
            x: Samples to score.
            pipeline: Pipeline to score with; defaults to the refreshed pipeline.
        """
        if pipeline is None:
            pipeline = self._pipeline_optimized
        return pipeline.predict_proba(x)[:, 1]

    def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None) -> pd.DataFrame:
        """Compute the score PSI of the refreshed model between two samples.

        Args:
            x1: Reference sample; its score bins are reused for ``x2``.
            x2: Comparison sample.
            points: Optional cut points forwarded to the binning helper for ``x1``.

        Returns:
            The table returned by ``f_calcu_model_psi``; the sum of its ``psi``
            column is printed.
        """
        y1 = self.prob(x1)
        y2 = self.prob(x2)
        x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
        # Bin the second sample with the bins derived from the first one.
        x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
        model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin, sort_ascending=False)
        print(f"模型psi: {model_psi['psi'].sum()}")
        return model_psi

    def _train(self, n_estimators: int = None):
        """Fit the refreshed classifier on the training data, starting from the original booster.

        Args:
            n_estimators: Number of trees for the refreshed classifier; when
                falsy, the original model's tree count is used.

        Returns:
            The tree count of the original model (``best_ntree_limit`` when it
            is available, otherwise ``n_estimators``).
        """
        y_column = self._ol_config.y_column
        train_data = self._data.train_data
        model_original: xgb.XGBClassifier = self._pipeline_original.steps[-1][1]

        # Read defensively: depending on the xgboost version this attribute may
        # be missing or raise AttributeError when early stopping was not used.
        best_ntree_limit = getattr(model_original, "best_ntree_limit", None)
        ntree = model_original.n_estimators if best_ntree_limit is None else best_ntree_limit

        # process_type="update" with the "refresh" updater re-fits the existing
        # trees on the new data instead of growing new ones (see XGBoost docs).
        model_optimized = xgb.XGBClassifier(
            n_estimators=n_estimators if n_estimators else ntree,
            updater="refresh",
            process_type="update",
            refresh_leaf=True,
            learning_rate=self._ol_config.lr,
            random_state=self._ol_config.random_state,
        )
        self._pipeline_optimized.steps[-1] = ("classifier", model_optimized)
        self._pipeline_optimized.fit(train_data, train_data[y_column],
                                     classifier__verbose=False,
                                     classifier__xgb_model=model_original.get_booster(),
                                     )
        return ntree

    def train(self, ):
        """Refresh the model and evaluate every tree count on the test set.

        The result is stored in ``self._df_param_optimized`` with the columns
        ``auc_test``, ``ks_test`` and ``ntree`` (one row per tree count from 1
        to the original model's tree count).
        """
        df_param_columns = ["auc_test", "ks_test", "ntree"]
        test_data = self._data.test_data
        # Loop-invariant: the labels do not depend on the tree count.
        test_y = test_data[self._ol_config.y_column]

        ntree = self._train()
        print(f"原模型一共有【{ntree}】棵树")

        rows = []
        for n in tqdm(range(1, ntree + 1)):
            # NOTE(review): `ntree_limit` is the legacy xgboost prediction argument;
            # newer releases use `iteration_range` instead - confirm the pinned version.
            test_y_prob = self._pipeline_optimized.predict_proba(test_data, ntree_limit=n)[:, 1]
            perf = sc.perf_eva(test_y, test_y_prob, show_plot=False)
            rows.append([perf["AUC"], perf["KS"], n])
        # Build the frame once instead of growing it row by row.
        self._df_param_optimized = pd.DataFrame(rows, columns=df_param_columns)

    def save(self):
        """Persist the configuration and the refreshed pipeline.

        Raises:
            GeneralException: If there is no refreshed pipeline to save.
        """
        # Validate before writing anything so that a failure leaves no partial output.
        if getattr(self, "_pipeline_optimized", None) is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型不存在")
        self._ol_config.config_save()
        path_model = self._ol_config.f_get_save_path(FileEnum.MODEL.value)
        joblib.dump(self._pipeline_optimized, path_model)
        print(f"model save to【{path_model}】success. ")

    @staticmethod
    def load(path: str):
        """Rebuild a trainer from a directory previously written by ``save``.

        Args:
            path: Directory containing the saved configuration and model; it is
                also used as the resource directory of the new trainer.
        """
        ol_config = OnlineLearningConfigEntity.from_config(path)
        ol_config._path_resources = path
        return OnlineLearningTrainerXgb(ol_config=ol_config)

    def report(self, ntree: int = None):
        """Select the final model and assemble the comparison metrics.

        Args:
            ntree: Tree count to use for the final model; when ``None`` the
                best row of the evaluation table is used.

        Raises:
            GeneralException: Propagated from ``_f_get_best_model`` when
                ``train`` has not been run or ``ntree`` was not evaluated.
        """
        self._f_get_best_model(self._df_param_optimized, ntree)

        if self._ol_config.jupyter_print:
            from IPython import display
            f_display_title(display, "模型优化过程")
            display.display(self._df_param_optimized)

        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucResultEntity(
            table=self._data.get_distribution(self._ol_config.y_column),
            table_font_size=10, table_cell_width=3)

        # Model result comparison
        metric_value_dict["模型结果-新模型"] = self._f_get_metric_auc_ks("新模型")
        metric_value_dict["模型结果-原模型"] = self._f_get_metric_auc_ks("原模型")

        # Score bins
        metric_value_dict["分数分箱-建模数据-新模型"] = self._f_get_metric_gain("新模型")
        metric_value_dict["分数分箱-建模数据-原模型"] = self._f_get_metric_gain("原模型")

        # Stress test
        if self._ol_config.stress_test:
            metric_value_dict["压力测试"] = self._f_get_stress_test()

        if self._ol_config.jupyter_print:
            self.jupyter_print(metric_value_dict)

        # NOTE: Word report generation is currently disabled; kept for reference.
        # save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
        # ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
        # print(f"模型报告文件储存路径:{save_path}")

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity]):
        """Render the metrics collected by ``report`` in a Jupyter notebook.

        Args:
            metric_value_dict: Mapping from section key to its metric result,
                using the keys written by ``report``; the stress-test section
                is shown only when its key is present.
        """
        from IPython import display

        f_display_title(display, "样本分布")
        display.display(metric_value_dict["样本分布"].table)

        f_display_title(display, "模型结果")
        print("原模型")
        display.display(metric_value_dict["模型结果-原模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
        print("新模型")
        display.display(metric_value_dict["模型结果-新模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)

        f_display_title(display, "分数分箱")
        print("建模数据上分数分箱")
        print("原模型")
        display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
        print("新模型")
        display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)

        if "压力测试" in metric_value_dict:
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)
- if __name__ == "__main__":
- pass
|