monitor_metric.py 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2024/11/1
  5. @desc: 监控报告
  6. """
  7. import threading
  8. from typing import Dict
  9. from entitys import MonitorMetricConfigEntity, MetricFucEntity
  10. class MonitorMetric():
  11. def __init__(self, monitor_metric_config_path: str):
  12. self._monitor_metric_config = MonitorMetricConfigEntity.from_config(monitor_metric_config_path)
  13. self.lock = threading.Lock()
  14. self._metric_value_dict: Dict[str, MetricFucEntity] = {}
  15. @property
  16. def metric_value_dict(self):
  17. return self._metric_value_dict
  18. def _update_metric_value_dict(self, key, value):
  19. with self.lock:
  20. self._metric_value_dict[key] = value
  21. # TODO 多线程计算指标
  22. def calculate_metric(self, *args, **kwargs):
  23. metric_dict = self._monitor_metric_config.metric_dict
  24. for metric_code, metric_clazz in metric_dict.items():
  25. metric_value = metric_clazz.calculate(*args, **kwargs)
  26. self._update_metric_value_dict(metric_code, metric_value)
  27. if __name__ == "__main__":
  28. pass