# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2025/2/27
@desc: Online-learning (incremental refresh) trainer for an XGBoost pipeline.
"""
import os
from os.path import dirname, realpath
from typing import Dict, List

import joblib
import pandas as pd
import scorecardpy as sc
import xgboost as xgb
from sklearn2pmml import PMMLPipeline
from tqdm import tqdm

from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
    f_display_images_by_side
from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
from enums import ResultCodesEnum, ConstantEnum, FileEnum
from init import init
from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi, Xtransformer_fit, \
    Xtransform, fit

init()


class OnlineLearningTrainerXgb:

    def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
        # Override PMMLPipeline hooks with the project's custom fit/transform implementations
        PMMLPipeline.Xtransformer_fit = Xtransformer_fit
        PMMLPipeline.Xtransform = Xtransform
        PMMLPipeline.fit = fit
        if ol_config is not None:
            self._ol_config = ol_config
        else:
            self._ol_config = OnlineLearningConfigEntity(*args, **kwargs)
        self._data = data
        self._df_param_optimized = None
        self._model_optimized_list = []
        self._pipeline_original: PMMLPipeline
        self._pipeline_optimized: PMMLPipeline
        # Word report template
        self._template_path = os.path.join(dirname(dirname(realpath(__file__))),
                                           "./template/OnlineLearning报告模板_xgb.docx")
        self._init(self._ol_config.path_resources)

    def _init(self, path: str):
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")
        path_model = os.path.join(path, FileEnum.MODEL.value)
        if not os.path.isfile(path_model):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")
        # Load the saved pipeline twice: one copy stays untouched as the baseline,
        # the other is refreshed in place during training.
        self._pipeline_original = joblib.load(path_model)
        self._pipeline_optimized = joblib.load(path_model)
        print(f"model load from【{path_model}】success.")

    def _f_get_best_model(self, df_param: pd.DataFrame, ntree: int = None):
        # Pick the tree count with the best test KS (AUC as tie-breaker), or retrain with the given ntree.
        if ntree is None:
            df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
            print(f"选择最佳参数:\n{df_param_sort.iloc[0].to_dict()}")
            self._train(df_param_sort.iloc[0]["ntree"])
        else:
            print(f"选择ntree:【{ntree}】的参数:\n{df_param[df_param['ntree'] == ntree].iloc[0].to_dict()}")
            self._train(ntree)

    def _f_get_metric_auc_ks(self, model_type: str):
        def _get_auc_ks(data, title):
            y = data[self._ol_config.y_column]
            y_prob = self.prob(data, model)
            perf = sc.perf_eva(y, y_prob, title=title, show_plot=True)
            path = self._ol_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            auc = perf["AUC"]
            ks = perf["KS"]
            f_image_crop_white_borders(path, path)
            return auc, ks, path

        train_data = self._data.train_data
        test_data = self._data.test_data
        data = self._data.data
        model = self._pipeline_optimized
        if model_type != "新模型":
            model = self._pipeline_original

        img_path_auc_ks = []
        auc, ks, path = _get_auc_ks(data, f"{model_type}-建模数据")
        img_path_auc_ks.append(path)
        train_auc, train_ks, path = _get_auc_ks(train_data, f"{model_type}-训练集")
        img_path_auc_ks.append(path)
        test_auc, test_ks, path = _get_auc_ks(test_data, f"{model_type}-测试集")
        img_path_auc_ks.append(path)

        df_auc_ks = pd.DataFrame()
        df_auc_ks["样本集"] = ["建模数据", "训练集", "测试集"]
        df_auc_ks["AUC"] = [auc, train_auc, test_auc]
        df_auc_ks["KS"] = [ks, train_ks, test_ks]

        return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)

    def _f_get_metric_gain(self, model_type: str):
        y_column = self._ol_config.y_column
        data = self._data.data

        model = self._pipeline_optimized
        if model_type != "新模型":
            model = self._pipeline_original

        score = self.prob(data, model)
        score_bin, _ = f_get_model_score_bin(data, score)
        gain = f_calcu_model_ks(score_bin, y_column, sort_ascending=False)
        img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
        f_df_to_image(gain, img_path_gain)

        return MetricFucResultEntity(table=gain, image_path=img_path_gain)

    def _f_get_stress_test(self):
        stress_sample_times = self._ol_config.stress_sample_times
        stress_bad_rate_list = self._ol_config.stress_bad_rate_list
        y_column = self._ol_config.y_column
        data = self._data.data
        score = self.prob(data, self._pipeline_optimized)
        score_bin, _ = f_get_model_score_bin(data, score)
        df_stress = f_stress_test(score_bin, sample_times=stress_sample_times, bad_rate_list=stress_bad_rate_list,
                                  target_column=y_column, score_column=ConstantEnum.SCORE.value, sort_ascending=False)
        img_path_stress = self._ol_config.f_get_save_path("stress.png")
        f_df_to_image(df_stress, img_path_stress)
        return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)

    def prob(self, x: pd.DataFrame, pipeline=None):
        if pipeline is None:
            pipeline = self._pipeline_optimized
        y_prob = pipeline.predict_proba(x)[:, 1]
        return y_prob

    def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None) -> pd.DataFrame:
        # Population stability index between two datasets, computed on model-score bins.
        y1 = self.prob(x1)
        y2 = self.prob(x2)
        x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
        x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
        model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin, sort_ascending=False)
        print(f"模型psi: {model_psi['psi'].sum()}")
        return model_psi

    def _train(self, n_estimators: int = None):
        y_column = self._ol_config.y_column
        train_data = self._data.train_data
        model_original: xgb.XGBClassifier = self._pipeline_original.steps[-1][1]
        ntree = model_original.n_estimators if model_original.best_ntree_limit is None \
            else model_original.best_ntree_limit
        # Rebuild the classifier step so that the original trees are refreshed on the new
        # training data (process_type="update") rather than growing a new model from scratch.
        model_optimized = xgb.XGBClassifier(
            n_estimators=n_estimators if n_estimators else ntree,
            updater="refresh",
            process_type="update",
            refresh_leaf=True,
            learning_rate=self._ol_config.lr,
            random_state=self._ol_config.random_state,
        )
        self._pipeline_optimized.steps[-1] = ("classifier", model_optimized)
        self._pipeline_optimized.fit(train_data, train_data[y_column],
                                     classifier__verbose=False,
                                     classifier__xgb_model=model_original.get_booster(),
                                     )
        return ntree

    def train(self):
        # Refresh all trees once, then record test AUC/KS for every tree count so that
        # report() can later pick the best-performing number of trees.
        y_column = self._ol_config.y_column
        test_data = self._data.test_data
        df_param_columns = ["auc_test", "ks_test", "ntree"]
        self._df_param_optimized = pd.DataFrame(columns=df_param_columns)
        ntree = self._train()
        print(f"原模型一共有【{ntree}】棵树")
        for n in tqdm(range(1, ntree + 1)):
            test_y_prob = self._pipeline_optimized.predict_proba(test_data, ntree_limit=n)[:, 1]
            test_y = test_data[y_column]
            perf = sc.perf_eva(test_y, test_y_prob, show_plot=False)
            auc_test = perf["AUC"]
            ks_test = perf["KS"]
            row = dict(zip(df_param_columns, [auc_test, ks_test, n]))
            self._df_param_optimized.loc[len(self._df_param_optimized)] = row

    def save(self):
        self._ol_config.config_save()
        if self._pipeline_optimized is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型不存在")
        path_model = self._ol_config.f_get_save_path(FileEnum.MODEL.value)
        joblib.dump(self._pipeline_optimized, path_model)
        print(f"model save to【{path_model}】success.")

    @staticmethod
    def load(path: str):
        ol_config = OnlineLearningConfigEntity.from_config(path)
        ol_config._path_resources = path
        return OnlineLearningTrainerXgb(ol_config=ol_config)

    def report(self, ntree: int = None):
        self._f_get_best_model(self._df_param_optimized, ntree)
        if self._ol_config.jupyter_print:
            from IPython import display
            f_display_title(display, "模型优化过程")
            display.display(self._df_param_optimized)

        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucResultEntity(
            table=self._data.get_distribution(self._ol_config.y_column),
            table_font_size=10, table_cell_width=3)
        # New model vs. original model metrics
        metric_value_dict["模型结果-新模型"] = self._f_get_metric_auc_ks("新模型")
        metric_value_dict["模型结果-原模型"] = self._f_get_metric_auc_ks("原模型")
        # Score binning on the modeling data
        metric_value_dict["分数分箱-建模数据-新模型"] = self._f_get_metric_gain("新模型")
        metric_value_dict["分数分箱-建模数据-原模型"] = self._f_get_metric_gain("原模型")
        # Stress test
        if self._ol_config.stress_test:
            metric_value_dict["压力测试"] = self._f_get_stress_test()

        if self._ol_config.jupyter_print:
            self.jupyter_print(metric_value_dict)

        # save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
        # ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
        # print(f"模型报告文件储存路径:{save_path}")

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity]):
        from IPython import display

        f_display_title(display, "样本分布")
        display.display(metric_value_dict["样本分布"].table)

        f_display_title(display, "模型结果")
        print("原模型")
        display.display(metric_value_dict["模型结果-原模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
        print("新模型")
        display.display(metric_value_dict["模型结果-新模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)

        f_display_title(display, "分数分箱")
        print("建模数据上分数分箱")
        print("原模型")
        display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
        print("新模型")
        display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)

        if "压力测试" in metric_value_dict.keys():
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)


if __name__ == "__main__":
    pass