utils.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2024/12/5
  5. @desc:
  6. """
  7. import os
  8. import shutil
  9. from typing import List
  10. import gradio as gr
  11. import pandas as pd
  12. from config import BaseConfig
  13. from strategy_parse import StrategyParse
  14. from .manager import engine
# Name of the sub-directory, inside a project's base directory, where the
# uploaded data file is stored. It is the one entry _clean_base_dir keeps.
DATA_SUB_DIR = "data"
# Marker prepended to the stored copy of an uploaded file so that it can be
# found again (and replaced) by name.
UPLOAD_DATA_PREFIX = "prefix_upload_data_"
  17. def _clean_base_dir(data):
  18. base_dir = _get_base_dir(data)
  19. file_name_list: List[str] = os.listdir(base_dir)
  20. for file_name in file_name_list:
  21. if file_name in [DATA_SUB_DIR]:
  22. continue
  23. file_path = os.path.join(base_dir, file_name)
  24. if os.path.isdir(file_path):
  25. shutil.rmtree(file_path)
  26. else:
  27. os.remove(file_path)
  28. def _check_save_dir(data):
  29. project_name = engine.get(data, "project_name")
  30. if project_name is None or len(project_name) == 0:
  31. raise gr.Error(message='项目名称不能为空', duration=5)
  32. return True
  33. def _get_prefix_file(save_path, prefix):
  34. file_name_list: List[str] = os.listdir(save_path)
  35. for file_name in file_name_list:
  36. if prefix in file_name:
  37. return os.path.join(save_path, file_name)
  38. def _get_base_dir(data):
  39. project_name = engine.get(data, "project_name")
  40. base_dir = os.path.join(BaseConfig.base_dir, project_name)
  41. return base_dir
  42. def _get_upload_data(data) -> pd.ExcelFile:
  43. base_dir = _get_base_dir(data)
  44. save_path = os.path.join(base_dir, DATA_SUB_DIR)
  45. file_path = _get_prefix_file(save_path, UPLOAD_DATA_PREFIX)
  46. excel = pd.ExcelFile(file_path)
  47. return excel
  48. def f_project_is_exist(data):
  49. project_name = engine.get(data, "project_name")
  50. if project_name is None or len(project_name) == 0:
  51. gr.Warning(message='项目名称不能为空', duration=5)
  52. elif os.path.exists(_get_base_dir(data)):
  53. gr.Warning(message='项目名称已被使用', duration=5)
  54. def _get_save_path(data, file_name: str, sub_dir="", name_prefix=""):
  55. base_dir = _get_base_dir(data)
  56. save_path = os.path.join(base_dir, sub_dir)
  57. os.makedirs(save_path, exist_ok=True)
  58. # 有前缀标示的先删除
  59. if name_prefix:
  60. file = _get_prefix_file(save_path, name_prefix)
  61. if file:
  62. os.remove(file)
  63. save_path = os.path.join(save_path, name_prefix + os.path.basename(file_name))
  64. return save_path
  65. def f_data_upload(data):
  66. if not _check_save_dir(data):
  67. return
  68. file_data = engine.get(data, "file_data")
  69. data_path = _get_save_path(data, file_data.name, DATA_SUB_DIR, UPLOAD_DATA_PREFIX)
  70. shutil.copy(file_data.name, data_path)
  71. excel = _get_upload_data(data)
  72. df = excel.parse(sheet_name="流程")
  73. columns = excel.sheet_names
  74. excel.close()
  75. return {
  76. engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True),
  77. engine.get_elem_by_id("sheet_name"): gr.update(choices=columns),
  78. }
  79. def f_get_sheet_data(data):
  80. sheet_name = engine.get(data, "sheet_name")
  81. excel = _get_upload_data(data)
  82. df = excel.parse(sheet_name=sheet_name)
  83. excel.close()
  84. return {
  85. engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True)
  86. }
  87. def f_download_code(data):
  88. file_path = _get_save_path(data, "code.zip")
  89. if os.path.exists(file_path):
  90. return {engine.get_elem_by_id("download_code"): gr.update(value=file_path)}
  91. else:
  92. raise FileNotFoundError(f"{file_path} not found.")
  93. def f_verify_param(data):
  94. excel = _get_upload_data(data)
  95. columns = excel.sheet_names
  96. excel.close()
  97. if "流程" not in columns:
  98. raise gr.Error(message=f'【流程】sheet不能为空', duration=5)
  99. return True
  100. def f_code_generate(data, progress=gr.Progress(track_tqdm=True)):
  101. def _reset_component_state():
  102. return {engine.get_elem_by_id("download_code"): gr.update(visible=False),
  103. engine.get_elem_by_id("code_view"): gr.update(visible=False)
  104. }
  105. progress(0, desc="Starting")
  106. all_param = engine.get_all(data)
  107. # 清空储存目录
  108. _clean_base_dir(data)
  109. # 校验参数
  110. if not f_verify_param(data):
  111. yield _reset_component_state()
  112. yield _reset_component_state()
  113. progress(0.01)
  114. excel = _get_upload_data(data)
  115. strategy_parse = StrategyParse(**all_param)
  116. strategy_parse.f_parse_strategy(excel, progress)
  117. excel.close()
  118. code_file_path = _get_save_path(data, "code.zip")
  119. progress(1)
  120. yield {engine.get_elem_by_id("generate_progress"): gr.update(value="生成完成"),
  121. engine.get_elem_by_id("download_code"): gr.update(value=code_file_path, visible=True)}