# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2025/2/27
@desc: Online-learning (incremental re-training) trainer for a saved XGBoost PMMLPipeline.
"""
import copy
import os
from os.path import dirname, realpath
from typing import Dict, List

import joblib
import pandas as pd
import scorecardpy as sc
import xgboost as xgb
from pandas.core.dtypes.common import is_numeric_dtype
from pypmml import Model
from sklearn.preprocessing import OneHotEncoder
from sklearn2pmml import PMMLPipeline, sklearn2pmml
from sklearn_pandas import DataFrameMapper
from tqdm import tqdm

from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
    f_display_images_by_side, silent_print, df_print_nolimit
from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
from enums import ResultCodesEnum, ConstantEnum, FileEnum
from init import init
from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi, Xtransformer_fit, \
    Xtransform, fit

init()


class OnlineLearningTrainerXgb:
    """Incrementally re-train a saved XGBoost ``PMMLPipeline`` and report old vs. new results.

    The mode is chosen by ``params_xgb["oltype"]``:

    * ``"tree_refresh"``: the existing trees are refreshed
      (``updater="refresh"``, ``process_type="update"``, ``refresh_leaf=True``).
    * anything else: training continues from the original booster (new trees are
      added), optionally with extra input columns from ``params_xgb["add_columns"]``.
    """

    def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
        # Override PMMLPipeline methods with the project's implementations.
        PMMLPipeline.Xtransformer_fit = Xtransformer_fit
        PMMLPipeline.Xtransform = Xtransform
        PMMLPipeline.fit = fit
        if ol_config is not None:
            self._ol_config = ol_config
        else:
            self._ol_config = OnlineLearningConfigEntity(*args, **kwargs)
        self._data = data
        self._df_param_optimized = None
        self._model_optimized_list = []
        self._pipeline_original: PMMLPipeline
        self._pipeline_optimized: PMMLPipeline
        self.model_optimized: xgb.XGBClassifier
        # Report template
        self._template_path = os.path.join(dirname(dirname(realpath(__file__))),
                                           "./template/OnlineLearning报告模板_xgb.docx")
        self._init(self._ol_config.path_resources)

    def _init(self, path: str):
        """Load the original pipeline and, if present, a standalone xgb model from ``path``.

        Raises:
            GeneralException: ILLEGAL_PARAMS if ``path`` is not a directory,
                NOT_FOUND if the pipeline file does not exist.
        """
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")

        path_model = os.path.join(path, FileEnum.PIPELINE_XGB.value)
        if not os.path.isfile(path_model):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")
        # Loaded twice so the original and the optimised pipeline are independent objects.
        self._pipeline_original = joblib.load(path_model)
        self._pipeline_optimized = joblib.load(path_model)
        print(f"model load from【{path_model}】success.")

        path_model = os.path.join(path, FileEnum.MODEL_XGB.value)
        if os.path.isfile(path_model):
            # A separately saved xgb model replaces the classifier of the optimised pipeline only.
            model = xgb.XGBClassifier()
            model.load_model(path_model)
            self._pipeline_optimized.steps[-1] = ("classifier", model)
            print(f"model load from【{path_model}】success.")

    def _f_rewrite_pmml(self, path_pmml: str):
        """Rewrite categorical fields declared as ``double`` to ``string`` in the PMML file, in place."""
        with open(path_pmml, mode="r", encoding="utf-8") as f:
            pmml = f.read()
        pmml = pmml.replace('optype="categorical" dataType="double"', 'optype="categorical" dataType="string"')
        with open(path_pmml, mode="w", encoding="utf-8") as f:
            f.write(pmml)

    def _f_check_pmml_consistency(self, path_pmml: str, data: pd.DataFrame):
        """Print the share of rows whose PMML and pipeline probabilities differ by less than 0.001."""
        model_pmml = Model.fromFile(path_pmml)
        prob_pmml = model_pmml.predict(data)["probability(1)"]
        with silent_print():
            prob_pipeline = self._pipeline_optimized.predict_proba(data)[:, 1]
        diff = pd.DataFrame()
        diff["prob_pmml"] = prob_pmml
        diff["prob_pipeline"] = prob_pipeline
        diff["diff"] = diff["prob_pmml"] - diff["prob_pipeline"]
        diff["diff_format"] = diff["diff"].apply(lambda x: 1 if abs(x) < 0.001 else 0)
        print(f"pmml模型结果一致率(误差小于0.001):{(diff['diff_format'].sum() / len(diff)).round(3) * 100}%")

    def _f_get_best_model(self, df_param: pd.DataFrame, ntree: int = None):
        """Re-train the optimised pipeline with the selected tree count; optionally export PMML.

        Args:
            df_param: per-iteration metrics produced by :meth:`train`.
            ntree: tree count to use. ``None`` selects the row with the highest
                ``ks_test`` (ties broken by ``auc_test``).

        Raises:
            GeneralException: NOT_FOUND if there are no optimisation results
                (``train`` not run), ILLEGAL_PARAMS if ``ntree`` was not evaluated.
        """
        if df_param is None or len(df_param) == 0:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型优化结果不存在")

        if ntree is None:
            df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
            best = df_param_sort.iloc[0]
            print(f"选择最佳参数:\n{best.to_dict()}")
            # Access by label: positional Series[int] lookup on a label index is deprecated in pandas.
            self._train(int(best["ntree"]))
        else:
            df_selected = df_param[df_param['ntree'] == ntree]
            if len(df_selected) == 0:
                raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"ntree【{ntree}】不在优化结果中")
            print(f"选择ntree:【{ntree}】的参数:\n{df_selected.iloc[0].to_dict()}")
            self._train(ntree)

        if self._ol_config.save_pmml:
            data = self._data.data
            path_pmml = self._ol_config.f_get_save_path(FileEnum.PMML_XGB.value)
            sklearn2pmml(self._pipeline_optimized, path_pmml, with_repr=True, )
            self._f_rewrite_pmml(path_pmml)
            print(f"model save to【{path_pmml}】success. ")
            # Check that the exported PMML reproduces the in-memory pipeline's predictions.
            self._f_check_pmml_consistency(path_pmml, data)

    def _f_get_pipeline(self, model_type: str):
        """Return the optimised pipeline for ``"新模型"`` and the original pipeline otherwise."""
        return self._pipeline_optimized if model_type == "新模型" else self._pipeline_original

    def _f_get_metric_auc_ks(self, model_type: str):
        """Compute AUC/KS (and save perf plots) on the full, train and test data for one model."""
        model = self._f_get_pipeline(model_type)

        def _get_auc_ks(data, title):
            y = data[self._ol_config.y_column]
            y_prob = self.prob(data, model)
            perf = sc.perf_eva(y, y_prob, title=f"{title}", show_plot=True)
            path = self._ol_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            auc = perf["AUC"]
            ks = perf["KS"]
            f_image_crop_white_borders(path, path)
            return auc, ks, path

        train_data = self._data.train_data
        test_data = self._data.test_data
        data = self._data.data

        img_path_auc_ks = []
        auc, ks, path = _get_auc_ks(data, f"{model_type}-建模数据")
        img_path_auc_ks.append(path)
        train_auc, train_ks, path = _get_auc_ks(train_data, f"{model_type}-训练集")
        img_path_auc_ks.append(path)
        test_auc, test_ks, path = _get_auc_ks(test_data, f"{model_type}-测试集")
        img_path_auc_ks.append(path)

        df_auc_ks = pd.DataFrame()
        df_auc_ks["样本集"] = ["建模数据", "训练集", "测试集"]
        df_auc_ks["AUC"] = [auc, train_auc, test_auc]
        df_auc_ks["KS"] = [ks, train_ks, test_ks]

        return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)

    def _f_get_metric_gain(self, model_type: str):
        """Build the score-bin gain table (and image) on the full data for one model."""
        y_column = self._ol_config.y_column
        data = self._data.data
        model = self._f_get_pipeline(model_type)

        score = self.prob(data, model)
        score_bin, _ = f_get_model_score_bin(data, score)
        gain = f_calcu_model_ks(score_bin, y_column, sort_ascending=False)
        img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
        f_df_to_image(gain, img_path_gain)

        return MetricFucResultEntity(table=gain, image_path=img_path_gain)

    def _f_get_stress_test(self, ):
        """Run the stress test for the optimised pipeline on the full data."""
        stress_sample_times = self._ol_config.stress_sample_times
        stress_bad_rate_list = self._ol_config.stress_bad_rate_list
        y_column = self._ol_config.y_column
        data = self._data.data
        score = self.prob(data, self._pipeline_optimized)
        score_bin, _ = f_get_model_score_bin(data, score)
        df_stress = f_stress_test(score_bin, sample_times=stress_sample_times, bad_rate_list=stress_bad_rate_list,
                                  target_column=y_column, score_column=ConstantEnum.SCORE.value, sort_ascending=False)

        img_path_stress = self._ol_config.f_get_save_path(f"stress.png")
        f_df_to_image(df_stress, img_path_stress)
        return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)

    def prob(self, x: pd.DataFrame, pipeline=None, ntree_limit=None):
        """Return the positive-class probability of ``pipeline`` (default: the optimised one)."""
        if pipeline is None:
            pipeline = self._pipeline_optimized
        with silent_print():
            y_prob = pipeline.predict_proba(x, ntree_limit=ntree_limit)[:, 1]
        return y_prob

    def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None, print_sum=True,
            ntree_limit=None) -> pd.DataFrame:
        """Compute the score PSI table of the optimised pipeline between ``x1`` and ``x2``.

        Bins are derived from ``x1`` (or from ``points``) and reused for ``x2``.
        """
        y1 = self.prob(x1, ntree_limit=ntree_limit)
        y2 = self.prob(x2, ntree_limit=ntree_limit)
        x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
        x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
        model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin, sort_ascending=False)
        if print_sum:
            print(f"模型psi: {model_psi['psi'].sum()}")
        return model_psi

    def _f_merge_add_columns(self, add_columns: List[str] = None):
        """Set the optimised pipeline's mapper to the ORIGINAL mapper plus ``add_columns``.

        Numeric columns pass through unchanged; the remaining columns are one-hot
        encoded together. Starting from a deep copy of the original mapper makes this
        idempotent. Previously the current mapper was extended in place, so every
        ``_train`` call (``train`` and then ``report``) appended the columns again.
        """
        train_data = self._data.train_data
        mapper: DataFrameMapper = copy.deepcopy(self._pipeline_original.steps[0][1])
        if add_columns:
            num_columns = [c for c in add_columns if is_numeric_dtype(train_data[c])]
            str_columns = [c for c in add_columns if c not in num_columns]
            mapper_new = []
            if len(str_columns) > 0:
                mapper_new.append((str_columns, OneHotEncoder()))
            for x_column in num_columns:
                mapper_new.append((x_column, None))
            mapper_new = DataFrameMapper(mapper_new)
            mapper_new.fit(self._data.data)
            mapper.features.extend(mapper_new.features)
            mapper.built_features.extend(mapper_new.built_features)
        self._pipeline_optimized.steps[0] = ("mapper", mapper)

    def _train(self, n_estimators: int = None):
        """Fit the optimised pipeline on the train split, starting from the original booster.

        Args:
            n_estimators: number of trees for the new classifier. ``None`` uses the
                original tree count (tree_refresh) or ``params_xgb["num_boost_round"]``.

        Returns:
            The tree count of the original model.
        """
        y_column = self._ol_config.y_column
        train_data = self._data.train_data
        params_xgb = self._ol_config.params_xgb

        model_original: xgb.XGBClassifier = self._pipeline_original.steps[-1][1]
        # getattr: best_ntree_limit may not exist on every xgboost version/model; fall back to n_estimators.
        best_ntree_limit = getattr(model_original, "best_ntree_limit", None)
        ntree = model_original.n_estimators if best_ntree_limit is None else best_ntree_limit

        if params_xgb.get("oltype") == "tree_refresh":
            self.model_optimized = xgb.XGBClassifier(
                n_estimators=n_estimators if n_estimators else ntree,
                reg_alpha=params_xgb.get("alpha"),
                reg_lambda=params_xgb.get("lambda"),
                importance_type='weight',
                updater="refresh",
                process_type="update",
                refresh_leaf=True,
                **params_xgb,
            )
        else:
            # Handle newly added input columns.
            self._f_merge_add_columns(params_xgb.get("add_columns"))
            self.model_optimized = xgb.XGBClassifier(
                n_estimators=n_estimators if n_estimators else params_xgb.get("num_boost_round"),
                reg_alpha=params_xgb.get("alpha"),
                reg_lambda=params_xgb.get("lambda"),
                importance_type='weight',
                **params_xgb,
            )

        self._pipeline_optimized.steps[-1] = ("classifier", self.model_optimized)

        # Temporarily rename the original booster's features to f0..fN of the transformed
        # matrix so it can seed training (xgb_model); always restore the names, even on failure.
        booster_original = model_original.get_booster()
        feature_names_old = booster_original.feature_names
        data_transform = self._pipeline_optimized.Xtransform(self._data.data)
        booster_original.feature_names = [f"f{i}" for i in range(data_transform.shape[1])]
        try:
            with silent_print():
                self._pipeline_optimized.fit(train_data, train_data[y_column],
                                             classifier__verbose=False,
                                             classifier__xgb_model=booster_original,
                                             )
        finally:
            booster_original.feature_names = feature_names_old
        return ntree

    def train(self, ):
        """Train once, then record train/test AUC, KS and PSI for every tree count.

        The results are stored in ``self._df_param_optimized`` and are used by ``report``.
        """
        y_column = self._ol_config.y_column
        params_xgb = self._ol_config.params_xgb
        train_data = self._data.train_data
        test_data = self._data.test_data

        df_param_columns = ["auc_train", "ks_train", "auc_test", "ks_test", "psi", "ntree"]
        self._df_param_optimized = pd.DataFrame(columns=df_param_columns)
        ntree = self._train()
        print(f"原模型一共有【{ntree}】棵树")
        # Evaluate the effect of each iteration.
        if params_xgb.get("oltype") == "tree_refresh":
            print("更新原模型模式")
            iteration_n = ntree
        else:
            print("原模型基础上新增树模式")
            iteration_n = params_xgb.get("num_boost_round")

        for n in tqdm(range(iteration_n)):
            # Refresh mode: first n+1 refreshed trees; add mode: all original trees + n+1 new ones.
            if params_xgb.get("oltype") == "tree_refresh":
                ntree_limit = n + 1
            else:
                ntree_limit = ntree + n + 1

            with silent_print():
                train_y_prob = self._pipeline_optimized.predict_proba(train_data, ntree_limit=ntree_limit)[:, 1]
                test_y_prob = self._pipeline_optimized.predict_proba(test_data, ntree_limit=ntree_limit)[:, 1]
                train_y = train_data[y_column]
                test_y = test_data[y_column]

                psi = round(self.psi(train_data, test_data, print_sum=False, ntree_limit=ntree_limit)['psi'].sum(), 3)

                perf = sc.perf_eva(train_y, train_y_prob, show_plot=False)
                auc_train = perf["AUC"]
                ks_train = perf["KS"]

                perf = sc.perf_eva(test_y, test_y_prob, show_plot=False)
                auc_test = perf["AUC"]
                ks_test = perf["KS"]

                row = dict(zip(df_param_columns, [auc_train, ks_train, auc_test, ks_test, psi, n + 1]))
                self._df_param_optimized.loc[len(self._df_param_optimized)] = row

    def save(self):
        """Save the config and the optimised pipeline.

        Raises:
            GeneralException: NOT_FOUND if there is no optimised pipeline.
        """
        self._ol_config.config_save()

        if self._pipeline_optimized is None:
            # The exception was previously constructed but never raised.
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型不存在")
        path_model = self._ol_config.f_get_save_path(FileEnum.PIPELINE_XGB.value)
        joblib.dump(self._pipeline_optimized, path_model)
        print(f"model save to【{path_model}】success. ")
        # NOTE: under xgb incremental learning, saving the pipeline directly can fail; the
        # workaround is to save the xgb model separately and restore it (see ``_init``):
        # path_model = self._ol_config.f_get_save_path(FileEnum.MODEL_XGB.value)
        # self.model_optimized.save_model(path_model)
        # print(f"model save to【{path_model}】success. ")

    @staticmethod
    def load(path: str):
        """Create a trainer from the config and resources saved under ``path``."""
        ol_config = OnlineLearningConfigEntity.from_config(path)
        ol_config._path_resources = path
        return OnlineLearningTrainerXgb(ol_config=ol_config)

    def report(self, ntree: int = None):
        """Select/re-train the best model and collect comparison metrics (old vs. new model).

        Args:
            ntree: tree count to use; ``None`` picks the best ``ks_test`` row from ``train``.
        """
        train_data = self._data.train_data
        test_data = self._data.test_data

        self._f_get_best_model(self._df_param_optimized, ntree)

        if self._ol_config.jupyter_print:
            from IPython import display
            f_display_title(display, "模型优化过程")
            with df_print_nolimit():
                display.display(self._df_param_optimized)

        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucResultEntity(table=self._data.get_distribution(self._ol_config.y_column),
                                                          table_font_size=10, table_cell_width=3)

        # Model result comparison
        metric_value_dict[f"模型结果-新模型"] = self._f_get_metric_auc_ks("新模型")
        metric_value_dict[f"模型结果-原模型"] = self._f_get_metric_auc_ks("原模型")

        # Model score PSI
        model_psi = self.psi(train_data, test_data, print_sum=False)
        img_path_psi = self._ol_config.f_get_save_path(f"model_psi.png")
        f_df_to_image(model_psi, img_path_psi)
        metric_value_dict[f"模型稳定性"] = MetricFucResultEntity(table=model_psi,
                                                            value=model_psi["psi"].sum().round(3),
                                                            image_path=img_path_psi)

        # Score binning
        metric_value_dict["分数分箱-建模数据-新模型"] = self._f_get_metric_gain("新模型")
        metric_value_dict["分数分箱-建模数据-原模型"] = self._f_get_metric_gain("原模型")

        # Stress test
        if self._ol_config.stress_test:
            metric_value_dict["压力测试"] = self._f_get_stress_test()

        if self._ol_config.jupyter_print:
            self.jupyter_print(metric_value_dict)

        # Word report generation is currently disabled:
        # save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
        # ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
        # print(f"模型报告文件储存路径:{save_path}")

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity]):
        """Display the metrics collected by ``report`` in a Jupyter notebook."""
        from IPython import display

        f_display_title(display, "样本分布")
        display.display(metric_value_dict["样本分布"].table)

        f_display_title(display, "模型结果")
        print(f"原模型")
        display.display(metric_value_dict["模型结果-原模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
        print(f"新模型")
        display.display(metric_value_dict["模型结果-新模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)

        # Model PSI
        f_display_title(display, "模型psi")
        display.display(metric_value_dict["模型稳定性"].table)
        print(f"模型psi: {metric_value_dict['模型稳定性'].value}")

        f_display_title(display, "分数分箱")
        print(f"建模数据上分数分箱")
        print(f"原模型")
        display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
        print(f"新模型")
        display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)

        if "压力测试" in metric_value_dict.keys():
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)


if __name__ == "__main__":
    pass