monitor_metric.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2024/11/1
  5. @desc: 监控报告
  6. """
  7. import threading
  8. from typing import Dict
  9. from entitys import MonitorConfigEntity, MetricFucResultEntity
  10. from .report_generate import Report
  11. class MonitorMetric():
  12. def __init__(self, monitor_config_path: str):
  13. self._monitor_config = MonitorConfigEntity.from_config(monitor_config_path)
  14. self.lock = threading.Lock()
  15. self._metric_value_dict: Dict[str, MetricFucResultEntity] = {}
  16. @property
  17. def metric_value_dict(self):
  18. return self._metric_value_dict
  19. def _update_metric_value_dict(self, key, value):
  20. with self.lock:
  21. self._metric_value_dict[key] = value
  22. # TODO 多线程计算指标
  23. def calculate_metric(self, *args, **kwargs):
  24. metric_dict = self._monitor_config.metric_dict
  25. for metric_code, metric_clazz in metric_dict.items():
  26. metric_value = metric_clazz.calculate(*args, **kwargs)
  27. self._update_metric_value_dict(metric_code, metric_value)
  28. def generate_report(self):
  29. Report.generate_report(self._metric_value_dict, self._monitor_config.template_path)
  30. if __name__ == "__main__":
  31. pass