# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2024/11/1
@desc: model training pipeline
"""
from typing import List

import pandas as pd

from entitys import DataSplitEntity, MlConfigEntity, DataFeatureEntity
from enums import ConstantEnum
from feature import FeatureStrategyFactory, FeatureStrategyBase
from init import init
from model import ModelBase, ModelFactory, f_add_rules, f_get_model_score_bin, f_calcu_model_psi
from monitor import ReportWord

init()


class Pipeline:

    def __init__(self, ml_config: MlConfigEntity = None, data: DataSplitEntity = None, *args, **kwargs):
        if ml_config is not None:
            self._ml_config = ml_config
        else:
            # Build the config from loose keyword arguments when none is supplied
            self._ml_config = MlConfigEntity(*args, **kwargs)
        # Resolve the feature-selection strategy and model implementation via their factories
        feature_strategy_clazz = FeatureStrategyFactory.get_strategy(self._ml_config.feature_strategy)
        self._feature_strategy: FeatureStrategyBase = feature_strategy_clazz(self._ml_config)
        model_clazz = ModelFactory.get_model(self._ml_config.model_type)
        self._model: ModelBase = model_clazz(self._ml_config)
        self._data = data

    def train(self):
        # Feature selection / search
        self._feature_strategy.feature_search(self._data)
        metric_feature = self._feature_strategy.feature_report(self._data)
        # Generate the training and test feature matrices from the selected features
        train_data = self._feature_strategy.feature_generate(self._data.train_data)
        train_data = DataFeatureEntity(data_x=train_data, data_y=self._data.train_data[self._ml_config.y_column])
        test_data = self._feature_strategy.feature_generate(self._data.test_data)
        test_data = DataFeatureEntity(data_x=test_data, data_y=self._data.test_data[self._ml_config.y_column])
        self._model.train(train_data, test_data)
        metric_model = self._model.train_report(self._data)
        # Merge feature metrics and model metrics for the report
        self.metric_value_dict = {**metric_feature, **metric_model}

    def prob(self, data: pd.DataFrame):
        # Transform raw input through the fitted feature strategy, then predict probabilities
        feature = self._feature_strategy.feature_generate(data)
        prob = self._model.prob(feature)
        return prob

    def score(self, data: pd.DataFrame):
        return self._model.score(data)

    def score_rule(self, data: pd.DataFrame):
        return self._model.score_rule(data)

    def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None) -> pd.DataFrame:
        # Score both datasets, using rule-adjusted scores when rules are configured
        if len(self._ml_config.rules) != 0:
            y1 = self.score_rule(x1)
            y2 = self.score_rule(x2)
        else:
            y1 = self.score(x1)
            y2 = self.score(x2)
        # Bin x1 by score, then reuse the same bin edges for x2 so the PSI is comparable
        x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
        x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
        model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin)
        print(f"Model PSI: {model_psi['psi'].sum()}")
        return model_psi

    def report(self):
        # "模型报告.docx" is the Word report file name ("model report")
        save_path = self._ml_config.f_get_save_path("模型报告.docx")
        ReportWord.generate_report(self.metric_value_dict, self._model.get_report_template_path(), save_path=save_path)
        print(f"Model report saved to: {save_path}")

    def save(self):
        self._ml_config.config_save()
        self._feature_strategy.feature_save()
        self._model.model_save()

    @staticmethod
    def load(path: str):
        # Rebuild a pipeline from a saved config, then restore the fitted feature strategy and model
        ml_config = MlConfigEntity.from_config(path)
        pipeline = Pipeline(ml_config=ml_config)
        pipeline._feature_strategy.feature_load(path)
        pipeline._model.model_load(path)
        return pipeline

    def variable_analyse(self, column: str, format_bin=None):
        self._feature_strategy.variable_analyse(self._data, column, format_bin)

    def rules_test(self):
        # Apply the configured rules to a zero-score copy of the training data to inspect their effect
        rules = self._ml_config.rules
        df = self._data.train_data.copy()
        df[ConstantEnum.SCORE.value] = [0] * len(df)
        f_add_rules(df, rules)


if __name__ == "__main__":
    pass
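    # Minimal usage sketch (assumption, not part of the original code): the keyword
    # arguments below are guessed from the attributes read elsewhere in this file
    # (y_column, feature_strategy, model_type); check the MlConfigEntity and
    # DataSplitEntity constructors and replace the placeholder paths/columns before
    # uncommenting.
    #
    # train_df = pd.read_csv("train.csv")
    # test_df = pd.read_csv("test.csv")
    # data = DataSplitEntity(train_data=train_df, test_data=test_df)
    # pipeline = Pipeline(data=data, y_column="label", feature_strategy="woe", model_type="lr")
    # pipeline.train()    # feature selection + model fitting, collects report metrics
    # pipeline.report()   # write the Word report
    # pipeline.save()     # persist config, feature strategy and model
    #
    # Drift check between two samples (e.g. train vs. out-of-time):
    # psi_df = pipeline.psi(train_df, test_df)
    #
    # Reload a previously saved pipeline and score new data:
    # pipeline2 = Pipeline.load("<saved artifacts directory>")
    # scores = pipeline2.score(test_df)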