# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2024/11/1
@desc: Monitoring report
"""
import threading
from typing import Dict

from entitys import MonitorMetricConfigEntity, MetricFucEntity
from .report_generate import Report


class MonitorMetric:
    """Compute the configured monitoring metrics and hand them to the report generator.

    The metric configuration is loaded once at construction time. Computed
    values are stored in an internal dict keyed by metric code; every write
    to that dict is serialized through ``self.lock``.
    """

    def __init__(self, monitor_metric_config_path: str):
        """Load the metric configuration and prepare empty result storage.

        Args:
            monitor_metric_config_path: Path passed verbatim to
                ``MonitorMetricConfigEntity.from_config``.
        """
        self._monitor_metric_config = MonitorMetricConfigEntity.from_config(monitor_metric_config_path)
        # Guards every write to (and the report snapshot of) _metric_value_dict.
        self.lock = threading.Lock()
        # Metric code -> value returned by that metric's ``calculate``.
        self._metric_value_dict: Dict[str, MetricFucEntity] = {}

    @property
    def metric_value_dict(self) -> Dict[str, MetricFucEntity]:
        """Return the live internal mapping of metric code to computed value.

        This is the internal dict itself, not a copy, so callers observe
        later updates and must not mutate it.
        """
        return self._metric_value_dict

    def _update_metric_value_dict(self, key, value) -> None:
        """Store ``value`` under ``key`` while holding the instance lock."""
        with self.lock:
            self._metric_value_dict[key] = value

    # TODO: compute the metrics with multiple threads
    def calculate_metric(self, *args, **kwargs) -> None:
        """Compute every configured metric and record the results.

        All positional and keyword arguments are forwarded unchanged to each
        metric object's ``calculate`` method. Metrics are evaluated
        sequentially, in the iteration order of the configured ``metric_dict``.
        """
        metric_dict = self._monitor_metric_config.metric_dict
        for metric_code, metric_clazz in metric_dict.items():
            metric_value = metric_clazz.calculate(*args, **kwargs)
            self._update_metric_value_dict(metric_code, metric_value)

    def generate_report(self) -> None:
        """Generate the report from the metric values computed so far.

        A shallow snapshot of the results is taken under the lock, so a
        concurrent ``_update_metric_value_dict`` call cannot resize the dict
        while the report generator is reading it. The snapshot and the
        configured ``template_path`` are passed to ``Report.generate_report``.
        """
        with self.lock:
            metric_values = dict(self._metric_value_dict)
        Report.generate_report(metric_values, self._monitor_metric_config.template_path)


if __name__ == "__main__":
    pass