Răsfoiți Sursa

Initial commit

zhusc 5 luni în urmă
părinte
comite
1fecb523ad

+ 14 - 0
user_events/data/__init__.py

@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/10/30
+@desc: 数据加载、加工相关
+"""
+from .loader.data_loader_excel import DataLoaderExcel
+from .loader.data_loader_base import DataLoaderBase
+from .loader.data_loader_mysql import DataLoaderMysql
+
+__all__ = ['DataLoaderBase', 'DataLoaderMysql', 'DataLoaderExcel']
+
+if __name__ == "__main__":
+    pass

+ 9 - 0
user_events/data/insight/__init__.py

@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/1
+@desc:  数据统计分析
+"""
+
+if __name__ == "__main__":
+    pass

+ 31 - 0
user_events/data/insight/data_explore.py

@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/13
+@desc: 数据探索
+"""
+import pandas as pd
+
+from commom import f_save_train_df
+
+
+class DataExplore():
+
+    def __init__(self, ):
+        pass
+
+    def distribution(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        数据分布,缺失率,中位数,众数,偏离度等
+        """
+        pass
+
+    def save(self, df):
+        """
+        数据探索结果固化
+        """
+        f_save_train_df("distribution", df)
+
+
+if __name__ == "__main__":
+    pass

+ 9 - 0
user_events/data/loader/__init__.py

@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/1
+@desc:  数据加载相关
+"""
+
+if __name__ == "__main__":
+    pass

+ 24 - 0
user_events/data/loader/data_loader_base.py

@@ -0,0 +1,24 @@
+# -*- coding:utf-8 -*-
+"""
+@author: yq
+@time: 2024/1/2
+@desc: 数据加载基类
+"""
+import abc
+
+import pandas as pd
+
+
+class DataLoaderBase(metaclass=abc.ABCMeta):
+
+    @abc.abstractmethod
+    def get_connect(self):
+        pass
+
+    @abc.abstractmethod
+    def close_connect(self):
+        pass
+
+    @abc.abstractmethod
+    def get_data(self, *args, **kwargs) -> pd.DataFrame:
+        pass

+ 36 - 0
user_events/data/loader/data_loader_excel.py

@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/10/31
+@desc: 
+"""
+import pandas as pd
+
+from commom import get_logger
+from .data_loader_base import DataLoaderBase
+
+logger = get_logger()
+
+
+class DataLoaderExcel(DataLoaderBase):
+    def __init__(self, ):
+        pass
+
+    def get_connect(self):
+        pass
+
+    def close_connect(self):
+        pass
+
+    def get_data(self, file_path: str, sheet_name: str = 0) -> pd.DataFrame:
+        df: pd.DataFrame = pd.read_excel(file_path, sheet_name=sheet_name, index_col=False, dtype=str)
+        columns = df.columns.to_list()
+        columns_new = []
+        for idx, column in enumerate(columns):
+            column = str(column)
+            if idx != 0 and "Unnamed:" in column:
+                columns_new.append(columns_new[-1])
+            else:
+                columns_new.append(column)
+        df.columns = columns_new
+        return df

+ 48 - 0
user_events/data/loader/data_loader_mysql.py

@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/10/31
+@desc: 
+"""
+import pandas as pd
+import pymysql
+
+from commom import get_logger
+from entitys import DbConfigEntity
+from .data_loader_base import DataLoaderBase
+
+logger = get_logger()
+
+
+class DataLoaderMysql(DataLoaderBase):
+    def __init__(self, db_config: DbConfigEntity):
+        self.db_config = db_config
+        self.conn = None
+
+    def get_connect(self):
+        #  TODO 后续改成线程池
+        if self.conn == None:
+            self.conn = pymysql.connect(host=self.db_config.host, port=self.db_config.port, user=self.db_config.user,
+                                        passwd=self.db_config.passwd, db=self.db_config.db)
+        return self.conn
+
+    def close_connect(self):
+        if self.conn != None:
+            try:
+                self.conn.close()
+            except Exception as msg:
+                logger.error("关闭数据库失败:\n" + str(msg))
+            self.conn = None
+
+    def get_data(self, sql: str) -> pd.DataFrame:
+        cursor = self.get_connect().cursor()
+        cursor.execute(sql)
+        sql_results = cursor.fetchall()
+        column_desc = cursor.description
+        # 获取列名
+        columns = [column_desc[i][0] for i in range(len(column_desc))]
+        # 得到的data为二维元组,逐行取出,转化为列表,再转化为df
+        df = pd.DataFrame([list(i) for i in sql_results], columns=columns)
+        cursor.close()
+        self.close_connect()
+        return df

+ 9 - 0
user_events/data/process/__init__.py

@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/1
+@desc:  数据处理
+"""
+
+if __name__ == "__main__":
+    pass

+ 39 - 0
user_events/data/process/data_process.py

@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/13
+@desc: 数据加工
+"""
+
+import pandas as pd
+
+from commom import f_save_train_df
+from entitys import DataProcessConfigEntity
+
+
+class DataProcess():
+
+    def __init__(self, data_process_config: DataProcessConfigEntity):
+        self._data_process_config = data_process_config
+
+    def data_fill(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        数据填充
+        """
+        pass
+
+    def data_filter(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        数据筛选,删除缺失率高的特征或样本
+        """
+        pass
+
+    def save(self, df):
+        """
+        加工结果固化
+        """
+        f_save_train_df("distribution", df)
+
+
+if __name__ == "__main__":
+    pass

+ 14 - 0
user_events/db_script/mysql/test.sql

@@ -0,0 +1,14 @@
+-- Test fixture: table `t1` with an auto-increment id and three nullable FLOAT columns.
+CREATE TABLE `t1` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `c1` FLOAT NULL,
+  `c2` FLOAT NULL,
+  `c3` FLOAT NULL,
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='指标测试表';
+
+-- Seed two sample rows with explicit ids.
+-- NOTE(review): the inserts target `test.t1` while CREATE TABLE uses the unqualified
+-- name `t1`; presumably the script runs with `test` as the default schema - confirm.
+INSERT INTO test.t1
+(id, c1, c2, c3)
+VALUES(1, 1.0, 2.0, 3.0);
+INSERT INTO test.t1
+(id, c1, c2, c3)
+VALUES(2, 2.0, 3.0, 4.0);

+ 13 - 0
user_events/monitor/__init__.py

@@ -0,0 +1,13 @@
+# -*- coding:utf-8 -*-
+"""
+@author: yq
+@time: 2022/10/24
+@desc: 指标监控
+"""
+
+from .monitor_metric import MonitorMetric
+
+__all__ = ['MonitorMetric']
+
+if __name__ == "__main__":
+    pass

+ 41 - 0
user_events/monitor/monitor_metric.py

@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/1
+@desc: 监控报告
+"""
+import threading
+from typing import Dict
+
+from entitys import MonitorMetricConfigEntity, MetricFucEntity
+from .report_generate import Report
+
+
+class MonitorMetric():
+
+    def __init__(self, monitor_metric_config_path: str):
+        self._monitor_metric_config = MonitorMetricConfigEntity.from_config(monitor_metric_config_path)
+        self.lock = threading.Lock()
+        self._metric_value_dict: Dict[str, MetricFucEntity] = {}
+
+    @property
+    def metric_value_dict(self):
+        return self._metric_value_dict
+
+    def _update_metric_value_dict(self, key, value):
+        with self.lock:
+            self._metric_value_dict[key] = value
+
+    #  TODO 多线程计算指标
+    def calculate_metric(self, *args, **kwargs):
+        metric_dict = self._monitor_metric_config.metric_dict
+        for metric_code, metric_clazz in metric_dict.items():
+            metric_value = metric_clazz.calculate(*args, **kwargs)
+            self._update_metric_value_dict(metric_code, metric_value)
+
+    def generate_report(self):
+        Report.generate_report(self._metric_value_dict, self._monitor_metric_config.template_path)
+
+
+if __name__ == "__main__":
+    pass

+ 191 - 0
user_events/monitor/report_generate.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/11/8
+@desc: 
+"""
+import os
+from typing import Dict
+
+from docx import Document
+from docx.enum.table import WD_ALIGN_VERTICAL
+from docx.enum.text import WD_ALIGN_PARAGRAPH
+from docx.oxml import OxmlElement
+from docx.oxml.ns import qn
+from docx.shared import Inches, Cm
+
+from commom import GeneralException, f_get_datetime
+from config import BaseConfig
+from entitys import MetricFucEntity
+from enums import ResultCodesEnum, PlaceholderPrefixEnum
+
+
+class Report():
+
+    @staticmethod
+    def _set_cell_width(cell):
+        text = cell.text
+        if len(text) >= 10:
+            cell.width = Cm(2)
+        elif len(text) >= 15:
+            cell.width = Cm(2.5)
+        elif len(text) >= 25:
+            cell.width = Cm(3)
+        else:
+            cell.width = Cm(1.5)
+
+    @staticmethod
+    def _set_cell_format(cell):
+        cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
+        cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
+
+    @staticmethod
+    def _merge_cell_column(pre_cell, curr_cell):
+        if curr_cell.text == pre_cell.text:
+            column_name = curr_cell.text
+            pre_cell.merge(curr_cell)
+            pre_cell.text = column_name
+            for run in pre_cell.paragraphs[0].runs:
+                run.bold = True
+            Report._set_cell_format(pre_cell)
+            Report._set_cell_width(pre_cell)
+
+    @staticmethod
+    def _set_table_singleBoard(table):
+        # 将table 的所有单元格四个边设置为 0.5 镑, 黑色, 实线
+
+        def _set_table_boarder(table, **kwargs):
+            """
+            Set table`s border
+            Usage:
+            set_table_border(
+                cell,
+                top={"sz": 12, "val": "single", "color": "#FF0000"},
+                bottom={"sz": 12, "color": "#00FF00", "val": "single"},
+                left={"sz": 24, "val": "dashed"},
+                right={"sz": 12, "val": "dashed"},
+            )
+            """
+            borders = OxmlElement('w:tblBorders')
+            for tag in ('bottom', 'top', 'left', 'right', 'insideV', 'insideH'):
+                edge_data = kwargs.get(tag)
+                if edge_data:
+                    any_border = OxmlElement(f'w:{tag}')
+                    for key in ["sz", "val", "color", "space", "shadow"]:
+                        if key in edge_data:
+                            any_border.set(qn(f'w:{key}'), str(edge_data[key]))
+                    borders.append(any_border)
+                    table._tbl.tblPr.append(borders)
+
+        return _set_table_boarder(
+            table,
+            top={"sz": 4, "val": "single", "color": "#000000"},
+            bottom={"sz": 4, "val": "single", "color": "#000000"},
+            left={"sz": 4, "val": "single", "color": "#000000"},
+            right={"sz": 4, "val": "single", "color": "#000000"},
+            insideV={"sz": 4, "val": "single", "color": "#000000"},
+            insideH={"sz": 4, "val": "single", "color": "#000000"}
+        )
+
+    @staticmethod
+    def _get_placeholder(placeholder_prefix_enum: PlaceholderPrefixEnum, metric_code: str):
+        return "{{" + f"{placeholder_prefix_enum.value}{metric_code}" + "}}"
+
+    @staticmethod
+    def _fill_value_placeholder(doc: Document, metric_value_dict: Dict[str, MetricFucEntity]):
+        # 替换指标
+        for paragraph in doc.paragraphs:
+            text = paragraph.text
+            for metric_code, metric_fuc_entity in metric_value_dict.items():
+                placeholder = Report._get_placeholder(PlaceholderPrefixEnum.VALUE, metric_code)
+                metric_value = metric_fuc_entity.value
+                if metric_value is None:
+                    continue
+                text = text.replace(placeholder, metric_value)
+            # 段落中多个runs时执行,最后一个run改成替换好的文本,其他run置空
+            if len(paragraph.runs[:-1]) > 0:
+                for run in paragraph.runs[:-1]:
+                    run.text = ''
+                paragraph.runs[-1].text = text
+
+    @staticmethod
+    def _fill_table_placeholder(doc: Document, metric_value_dict: Dict[str, MetricFucEntity]):
+        # 替换表格
+        for paragraph in doc.paragraphs:
+            for metric_code, metric_fuc_entity in metric_value_dict.items():
+                placeholder = Report._get_placeholder(PlaceholderPrefixEnum.TABLE, metric_code)
+                metric_table = metric_fuc_entity.table
+                if metric_table is None:
+                    continue
+                if not placeholder in paragraph.text:
+                    continue
+                # 清除占位符
+                for run in paragraph.runs:
+                    run.text = run.text.replace(placeholder, "")
+                table = doc.add_table(rows=metric_table.shape[0] + 1, cols=metric_table.shape[1])
+                table.alignment = WD_ALIGN_PARAGRAPH.CENTER
+                paragraph._element.addnext(table._element)
+                # 列名
+                for column_idx, column_name in enumerate(metric_table.columns):
+                    cell = table.cell(0, column_idx)
+                    cell.text = str(column_name)
+                    for run in cell.paragraphs[0].runs:
+                        run.bold = True
+                    Report._set_cell_format(cell)
+                    Report._set_cell_width(cell)
+                    # 合并相同的列名
+                    if column_idx != 0 and BaseConfig.merge_table_column:
+                        pre_cell = table.cell(0, column_idx - 1)
+                        Report._merge_cell_column(pre_cell, cell)
+                # 值
+                for row_idx, row in metric_table.iterrows():
+                    for column_idx, value in enumerate(row):
+                        cell = table.cell(row_idx + 1, column_idx)
+                        cell.text = str(value)
+                        Report._set_cell_format(cell)
+                        Report._set_cell_width(cell)
+                        # 合并第一行数据也为列的情况
+                        if row_idx == 0:
+                            Report._merge_cell_column(table.cell(0, column_idx), cell)
+
+                Report._set_table_singleBoard(table)
+                # 禁止自动调整表格
+                if len(metric_table.columns) <= 12:
+                    table.autofit = False
+
+    @staticmethod
+    def _fill_image_placeholder(doc: Document, metric_value_dict: Dict[str, MetricFucEntity]):
+        # 替换图片
+        for paragraph in doc.paragraphs:
+            for metric_code, metric_fuc_entity in metric_value_dict.items():
+                placeholder = Report._get_placeholder(PlaceholderPrefixEnum.IMAGE, metric_code)
+                image_path = metric_fuc_entity.image_path
+                if image_path is None:
+                    continue
+                if not placeholder in paragraph.text:
+                    continue
+                if not os.path.exists(image_path):
+                    raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"文件【{image_path}】不存在")
+                # 清除占位符
+                for run in paragraph.runs:
+                    if placeholder not in run.text:
+                        continue
+                    run.text = run.text.replace(placeholder, "")
+                    run.add_picture(image_path, width=Inches(6))
+
+    @staticmethod
+    def generate_report(metric_value_dict: Dict[str, MetricFucEntity], template_path: str):
+        if os.path.exists(template_path):
+            doc = Document(template_path)
+        else:
+            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"监控模板文件【{template_path}】不存在")
+
+        Report._fill_value_placeholder(doc, metric_value_dict)
+        Report._fill_table_placeholder(doc, metric_value_dict)
+        Report._fill_image_placeholder(doc, metric_value_dict)
+        new_path = template_path.replace(".docx", f"{f_get_datetime()}.docx")
+        doc.save(f"./{new_path}")
+
+
+if __name__ == "__main__":
+    pass