# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2024/12/5
@desc: Gradio callbacks for uploading a strategy workbook, previewing its
       sheets, and generating / downloading the resulting code archive.
"""
import os
import shutil
from typing import List, Optional

import gradio as gr
import pandas as pd

from config import BaseConfig
from strategy_parse import StrategyParse
from .manager import engine

DATA_SUB_DIR = "data"
UPLOAD_DATA_PREFIX = "prefix_upload_data_"
# Name of the workbook sheet that must be present in every upload.
PROCESS_SHEET_NAME = "流程"


def _clean_base_dir(data):
    """Delete everything in the project directory except the data sub-dir.

    The project name is validated first: with an empty name the project
    directory would resolve to ``BaseConfig.base_dir`` itself, and this
    function would then wipe every other project stored there.

    Raises:
        gr.Error: if the project name is empty.
    """
    _check_save_dir(data)
    base_dir = _get_base_dir(data)
    file_name_list: List[str] = os.listdir(base_dir)
    for file_name in file_name_list:
        if file_name == DATA_SUB_DIR:
            # Keep the uploaded workbook; only generated artefacts are removed.
            continue
        file_path = os.path.join(base_dir, file_name)
        if os.path.isdir(file_path):
            shutil.rmtree(file_path)
        else:
            os.remove(file_path)


def _check_save_dir(data):
    """Return True when a project name is set; raise ``gr.Error`` otherwise."""
    project_name = engine.get(data, "project_name")
    if not project_name:
        raise gr.Error(message='项目名称不能为空', duration=5)
    return True


def _get_prefix_file(save_path, prefix) -> Optional[str]:
    """Return the path of the first entry in *save_path* whose name contains
    *prefix*, or ``None`` when there is no such entry."""
    for file_name in os.listdir(save_path):
        if prefix in file_name:
            return os.path.join(save_path, file_name)
    return None


def _get_base_dir(data):
    """Return the project directory: ``BaseConfig.base_dir / project_name``."""
    # NOTE(review): project_name comes from the UI and is joined into a path
    # without sanitising; a value containing ".." or an absolute path would
    # point outside BaseConfig.base_dir -- confirm whether that must be
    # rejected here.
    project_name = engine.get(data, "project_name")
    return os.path.join(BaseConfig.base_dir, project_name)


def _get_upload_data(data) -> pd.ExcelFile:
    """Open the uploaded workbook of the current project.

    The caller owns the returned ``pd.ExcelFile`` and must close it,
    preferably by using it as a context manager.

    Raises:
        FileNotFoundError: if no uploaded workbook exists for the project.
    """
    save_path = os.path.join(_get_base_dir(data), DATA_SUB_DIR)
    file_path = _get_prefix_file(save_path, UPLOAD_DATA_PREFIX)
    if file_path is None:
        # Fail with a clear message instead of handing None to pandas.
        raise FileNotFoundError(f"No uploaded data file found in {save_path}.")
    return pd.ExcelFile(file_path)


def f_project_is_exist(data):
    """Warn in the UI when the project name is empty or already in use."""
    project_name = engine.get(data, "project_name")
    if not project_name:
        gr.Warning(message='项目名称不能为空', duration=5)
    elif os.path.exists(_get_base_dir(data)):
        gr.Warning(message='项目名称已被使用', duration=5)


def _get_save_path(data, file_name: str, sub_dir="", name_prefix=""):
    """Build the path under the project directory where *file_name* is stored.

    Side effects: the target directory is created if missing, and when
    *name_prefix* is given any existing file carrying that prefix is removed
    first, so that at most one prefixed file is kept.

    Args:
        data: Gradio component values, resolved through ``engine``.
        file_name: source file name or path; only its base name is used.
        sub_dir: optional sub-directory below the project directory.
        name_prefix: optional prefix prepended to the stored file name.

    Returns:
        The full destination path.
    """
    save_dir = os.path.join(_get_base_dir(data), sub_dir)
    os.makedirs(save_dir, exist_ok=True)
    # Delete the previously stored file that carries the same prefix.
    if name_prefix:
        existing = _get_prefix_file(save_dir, name_prefix)
        if existing:
            os.remove(existing)
    return os.path.join(save_dir, name_prefix + os.path.basename(file_name))


def f_data_upload(data):
    """Store the uploaded workbook and show its process sheet.

    Returns:
        Component updates: the process sheet as the table value and the
        workbook's sheet names as choices of the sheet selector.
    """
    if not _check_save_dir(data):
        return
    file_data = engine.get(data, "file_data")
    data_path = _get_save_path(data, file_data.name, DATA_SUB_DIR, UPLOAD_DATA_PREFIX)
    shutil.copy(file_data.name, data_path)
    # The context manager closes the workbook even if the sheet is missing.
    with _get_upload_data(data) as excel:
        df = excel.parse(sheet_name=PROCESS_SHEET_NAME)
        sheet_names = excel.sheet_names
    return {
        engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True),
        engine.get_elem_by_id("sheet_name"): gr.update(choices=sheet_names),
    }


def f_get_sheet_data(data):
    """Show the sheet currently selected in the sheet selector."""
    sheet_name = engine.get(data, "sheet_name")
    with _get_upload_data(data) as excel:
        df = excel.parse(sheet_name=sheet_name)
    return {
        engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True)
    }


def f_download_code(data):
    """Point the download component at the generated ``code.zip``.

    Raises:
        FileNotFoundError: if the archive has not been generated yet.
    """
    file_path = _get_save_path(data, "code.zip")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"{file_path} not found.")
    return {engine.get_elem_by_id("download_code"): gr.update(value=file_path)}


def f_verify_param(data):
    """Check that the uploaded workbook contains the process sheet.

    Returns:
        True when the sheet is present.

    Raises:
        gr.Error: if the sheet is missing.
    """
    with _get_upload_data(data) as excel:
        sheet_names = excel.sheet_names
    if PROCESS_SHEET_NAME not in sheet_names:
        raise gr.Error(message='【流程】sheet不能为空', duration=5)
    return True


def f_code_generate(data, progress=gr.Progress(track_tqdm=True)):
    """Generate code from the uploaded workbook, yielding UI updates.

    Yields first an update hiding the download and code-view components,
    then, once generation has finished, an update exposing the archive.
    """

    def _reset_component_state():
        return {
            engine.get_elem_by_id("download_code"): gr.update(visible=False),
            engine.get_elem_by_id("code_view"): gr.update(visible=False),
        }

    progress(0, desc="Starting")
    all_param = engine.get_all(data)
    # Clear the storage directory (the uploaded data is kept).
    _clean_base_dir(data)
    # Validate parameters; stop instead of generating from invalid input.
    if not f_verify_param(data):
        yield _reset_component_state()
        return
    yield _reset_component_state()
    progress(0.01)
    # The context manager closes the workbook even if parsing fails.
    with _get_upload_data(data) as excel:
        strategy_parse = StrategyParse(**all_param)
        strategy_parse.f_parse_strategy(excel, progress)
    code_file_path = _get_save_path(data, "code.zip")
    progress(1)
    yield {
        engine.get_elem_by_id("generate_progress"): gr.update(value="生成完成"),
        engine.get_elem_by_id("download_code"): gr.update(value=code_file_path, visible=True),
    }