|
@@ -0,0 +1,267 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+@author: yq
|
|
|
+@time: 2025/2/27
|
|
|
+@desc:
|
|
|
+"""
|
|
|
+import os
|
|
|
+from os.path import dirname, realpath
|
|
|
+from typing import Dict, List
|
|
|
+
|
|
|
+import joblib
|
|
|
+import pandas as pd
|
|
|
+import scorecardpy as sc
|
|
|
+import xgboost as xgb
|
|
|
+from sklearn2pmml import PMMLPipeline
|
|
|
+from tqdm import tqdm
|
|
|
+
|
|
|
+from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
|
|
|
+ f_display_images_by_side
|
|
|
+from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
|
|
|
+from enums import ResultCodesEnum, ConstantEnum, FileEnum
|
|
|
+from init import init
|
|
|
+from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi, Xtransformer_fit, \
|
|
|
+ Xtransform, fit
|
|
|
+
|
|
|
+init()
|
|
|
+
|
|
|
+
|
|
|
+class OnlineLearningTrainerXgb:
|
|
|
+ def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
|
|
|
+ # 覆写方法
|
|
|
+ PMMLPipeline.Xtransformer_fit = Xtransformer_fit
|
|
|
+ PMMLPipeline.Xtransform = Xtransform
|
|
|
+ PMMLPipeline.fit = fit
|
|
|
+
|
|
|
+ if ol_config is not None:
|
|
|
+ self._ol_config = ol_config
|
|
|
+ else:
|
|
|
+ self._ol_config = OnlineLearningConfigEntity(*args, **kwargs)
|
|
|
+
|
|
|
+ self._data = data
|
|
|
+ self._df_param_optimized = None
|
|
|
+ self._model_optimized_list = []
|
|
|
+ self._pipeline_original: PMMLPipeline
|
|
|
+ self._pipeline_optimized: PMMLPipeline
|
|
|
+
|
|
|
+ # 报告模板
|
|
|
+ self._template_path = os.path.join(dirname(dirname(realpath(__file__))),
|
|
|
+ "./template/OnlineLearning报告模板_xgb.docx")
|
|
|
+
|
|
|
+ self._init(self._ol_config.path_resources)
|
|
|
+
|
|
|
+ def _init(self, path: str):
|
|
|
+ if not os.path.isdir(path):
|
|
|
+ raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")
|
|
|
+ path_model = os.path.join(path, FileEnum.MODEL.value)
|
|
|
+ if not os.path.isfile(path_model):
|
|
|
+ raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")
|
|
|
+
|
|
|
+ self._pipeline_original = joblib.load(path_model)
|
|
|
+ self._pipeline_optimized = joblib.load(path_model)
|
|
|
+ print(f"model load from【{path_model}】success.")
|
|
|
+
|
|
|
+ def _f_get_best_model(self, df_param: pd.DataFrame, ntree: int = None):
|
|
|
+ if ntree is None:
|
|
|
+ df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
|
|
|
+ print(f"选择最佳参数:\n{df_param_sort.iloc[0].to_dict()}")
|
|
|
+ self._train(df_param_sort.iloc[0][2])
|
|
|
+ else:
|
|
|
+ print(f"选择ntree:【{ntree}】的参数:\n{df_param[df_param['ntree'] == ntree].iloc[0].to_dict()}")
|
|
|
+ self._train(ntree)
|
|
|
+
|
|
|
+ def _f_get_metric_auc_ks(self, model_type: str):
|
|
|
+ def _get_auc_ks(data, title):
|
|
|
+ y = data[self._ol_config.y_column]
|
|
|
+ y_prob = self.prob(data, model)
|
|
|
+ perf = sc.perf_eva(y, y_prob, title=f"{title}", show_plot=True)
|
|
|
+ path = self._ol_config.f_get_save_path(f"perf_{title}.png")
|
|
|
+ perf["pic"].savefig(path)
|
|
|
+ auc = perf["AUC"]
|
|
|
+ ks = perf["KS"]
|
|
|
+ f_image_crop_white_borders(path, path)
|
|
|
+ return auc, ks, path
|
|
|
+
|
|
|
+ train_data = self._data.train_data
|
|
|
+ test_data = self._data.test_data
|
|
|
+ data = self._data.data
|
|
|
+ model = self._pipeline_optimized
|
|
|
+ if model_type != "新模型":
|
|
|
+ model = self._pipeline_original
|
|
|
+
|
|
|
+ img_path_auc_ks = []
|
|
|
+ auc, ks, path = _get_auc_ks(data, f"{model_type}-建模数据")
|
|
|
+ img_path_auc_ks.append(path)
|
|
|
+ train_auc, train_ks, path = _get_auc_ks(train_data, f"{model_type}-训练集")
|
|
|
+ img_path_auc_ks.append(path)
|
|
|
+ test_auc, test_ks, path = _get_auc_ks(test_data, f"{model_type}-测试集")
|
|
|
+ img_path_auc_ks.append(path)
|
|
|
+
|
|
|
+ df_auc_ks = pd.DataFrame()
|
|
|
+ df_auc_ks["样本集"] = ["建模数据", "训练集", "测试集"]
|
|
|
+ df_auc_ks["AUC"] = [auc, train_auc, test_auc]
|
|
|
+ df_auc_ks["KS"] = [ks, train_ks, test_ks]
|
|
|
+
|
|
|
+ return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)
|
|
|
+
|
|
|
+ def _f_get_metric_gain(self, model_type: str):
|
|
|
+ y_column = self._ol_config.y_column
|
|
|
+ data = self._data.data
|
|
|
+
|
|
|
+ model = self._pipeline_optimized
|
|
|
+ if model_type != "新模型":
|
|
|
+ model = self._pipeline_original
|
|
|
+
|
|
|
+ score = self.prob(data, model)
|
|
|
+ score_bin, _ = f_get_model_score_bin(data, score)
|
|
|
+ gain = f_calcu_model_ks(score_bin, y_column, sort_ascending=False)
|
|
|
+ img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
|
|
|
+ f_df_to_image(gain, img_path_gain)
|
|
|
+
|
|
|
+ return MetricFucResultEntity(table=gain, image_path=img_path_gain)
|
|
|
+
|
|
|
+ def _f_get_stress_test(self, ):
|
|
|
+ stress_sample_times = self._ol_config.stress_sample_times
|
|
|
+ stress_bad_rate_list = self._ol_config.stress_bad_rate_list
|
|
|
+ y_column = self._ol_config.y_column
|
|
|
+ data = self._data.data
|
|
|
+ score = self.prob(data, self._pipeline_optimized)
|
|
|
+ score_bin, _ = f_get_model_score_bin(data, score)
|
|
|
+ df_stress = f_stress_test(score_bin, sample_times=stress_sample_times, bad_rate_list=stress_bad_rate_list,
|
|
|
+ target_column=y_column, score_column=ConstantEnum.SCORE.value, sort_ascending=False)
|
|
|
+
|
|
|
+ img_path_stress = self._ol_config.f_get_save_path(f"stress.png")
|
|
|
+ f_df_to_image(df_stress, img_path_stress)
|
|
|
+ return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)
|
|
|
+
|
|
|
+ def prob(self, x: pd.DataFrame, pipeline=None):
|
|
|
+ if pipeline is None:
|
|
|
+ pipeline = self._pipeline_optimized
|
|
|
+ y_prob = pipeline.predict_proba(x)[:, 1]
|
|
|
+ return y_prob
|
|
|
+
|
|
|
+ def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None) -> pd.DataFrame:
|
|
|
+ y1 = self.prob(x1)
|
|
|
+ y2 = self.prob(x2)
|
|
|
+ x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
|
|
|
+ x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
|
|
|
+ model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin, sort_ascending=False)
|
|
|
+ print(f"模型psi: {model_psi['psi'].sum()}")
|
|
|
+ return model_psi
|
|
|
+
|
|
|
+ def _train(self, n_estimators: int = None):
|
|
|
+ y_column = self._ol_config.y_column
|
|
|
+ train_data = self._data.train_data
|
|
|
+
|
|
|
+ model_original: xgb.XGBClassifier = self._pipeline_original.steps[-1][1]
|
|
|
+ ntree = model_original.n_estimators if model_original.best_ntree_limit is None else model_original.best_ntree_limit
|
|
|
+ model_optimized = xgb.XGBClassifier(
|
|
|
+ n_estimators=n_estimators if n_estimators else ntree,
|
|
|
+ updater="refresh",
|
|
|
+ process_type="update",
|
|
|
+ refresh_leaf=True,
|
|
|
+ learning_rate=self._ol_config.lr,
|
|
|
+ random_state=self._ol_config.random_state,
|
|
|
+ )
|
|
|
+ self._pipeline_optimized.steps[-1] = ("classifier", model_optimized)
|
|
|
+ self._pipeline_optimized.fit(train_data, train_data[y_column],
|
|
|
+ classifier__verbose=False,
|
|
|
+ classifier__xgb_model=model_original.get_booster(),
|
|
|
+ )
|
|
|
+ return ntree
|
|
|
+
|
|
|
+ def train(self, ):
|
|
|
+ y_column = self._ol_config.y_column
|
|
|
+ test_data = self._data.test_data
|
|
|
+
|
|
|
+ df_param_columns = ["auc_test", "ks_test", "ntree"]
|
|
|
+ self._df_param_optimized = pd.DataFrame(columns=df_param_columns)
|
|
|
+ ntree = self._train()
|
|
|
+ print(f"原模型一共有【{ntree}】棵树")
|
|
|
+ for n in tqdm(range(ntree)):
|
|
|
+ n = n + 1
|
|
|
+ test_y_prob = self._pipeline_optimized.predict_proba(test_data, ntree_limit=n)[:, 1]
|
|
|
+ test_y = test_data[y_column]
|
|
|
+ perf = sc.perf_eva(test_y, test_y_prob, show_plot=False)
|
|
|
+ auc_test = perf["AUC"]
|
|
|
+ ks_test = perf["KS"]
|
|
|
+ row = dict(zip(df_param_columns, [auc_test, ks_test, n]))
|
|
|
+ self._df_param_optimized.loc[len(self._df_param_optimized)] = row
|
|
|
+
|
|
|
+ def save(self):
|
|
|
+ self._ol_config.config_save()
|
|
|
+
|
|
|
+ if self._pipeline_optimized is None:
|
|
|
+ GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型不存在")
|
|
|
+
|
|
|
+ path_model = self._ol_config.f_get_save_path(FileEnum.MODEL.value)
|
|
|
+ joblib.dump(self._pipeline_optimized, path_model)
|
|
|
+ print(f"model save to【{path_model}】success. ")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def load(path: str):
|
|
|
+ ol_config = OnlineLearningConfigEntity.from_config(path)
|
|
|
+ ol_config._path_resources = path
|
|
|
+ return OnlineLearningTrainerXgb(ol_config=ol_config)
|
|
|
+
|
|
|
+ def report(self, ntree: int = None):
|
|
|
+ self._f_get_best_model(self._df_param_optimized, ntree)
|
|
|
+
|
|
|
+ if self._ol_config.jupyter_print:
|
|
|
+ from IPython import display
|
|
|
+ f_display_title(display, "模型优化过程")
|
|
|
+ display.display(self._df_param_optimized)
|
|
|
+
|
|
|
+ metric_value_dict = {}
|
|
|
+
|
|
|
+ # 样本分布
|
|
|
+ metric_value_dict["样本分布"] = MetricFucResultEntity(table=self._data.get_distribution(self._ol_config.y_column),
|
|
|
+ table_font_size=10, table_cell_width=3)
|
|
|
+
|
|
|
+ # 模型结果对比
|
|
|
+ metric_value_dict[f"模型结果-新模型"] = self._f_get_metric_auc_ks("新模型")
|
|
|
+ metric_value_dict[f"模型结果-原模型"] = self._f_get_metric_auc_ks("原模型")
|
|
|
+
|
|
|
+ # 分数分箱
|
|
|
+ metric_value_dict["分数分箱-建模数据-新模型"] = self._f_get_metric_gain("新模型")
|
|
|
+ metric_value_dict["分数分箱-建模数据-原模型"] = self._f_get_metric_gain("原模型")
|
|
|
+
|
|
|
+ # 压力测试
|
|
|
+ if self._ol_config.stress_test:
|
|
|
+ metric_value_dict["压力测试"] = self._f_get_stress_test()
|
|
|
+
|
|
|
+ if self._ol_config.jupyter_print:
|
|
|
+ self.jupyter_print(metric_value_dict)
|
|
|
+
|
|
|
+ # save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
|
|
|
+ # ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
|
|
|
+ # print(f"模型报告文件储存路径:{save_path}")
|
|
|
+
|
|
|
+ def jupyter_print(self, metric_value_dict=Dict[str, MetricFucResultEntity]):
|
|
|
+ from IPython import display
|
|
|
+
|
|
|
+ f_display_title(display, "样本分布")
|
|
|
+ display.display(metric_value_dict["样本分布"].table)
|
|
|
+
|
|
|
+ f_display_title(display, "模型结果")
|
|
|
+ print(f"原模型")
|
|
|
+ display.display(metric_value_dict["模型结果-原模型"].table)
|
|
|
+ f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
|
|
|
+ print(f"新模型")
|
|
|
+ display.display(metric_value_dict["模型结果-新模型"].table)
|
|
|
+ f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)
|
|
|
+
|
|
|
+ f_display_title(display, "分数分箱")
|
|
|
+ print(f"建模数据上分数分箱")
|
|
|
+ print(f"原模型")
|
|
|
+ display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
|
|
|
+ print(f"新模型")
|
|
|
+ display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)
|
|
|
+
|
|
|
+ if "压力测试" in metric_value_dict.keys():
|
|
|
+ f_display_title(display, "压力测试")
|
|
|
+ display.display(metric_value_dict["压力测试"].table)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ pass
|