# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2024/12/5
@desc: Gradio callbacks for uploading project data, previewing sheets and
       exposing the generated code of a project.
"""
import json
import os
import shutil
import time
from typing import List, Optional

import gradio as gr
import pandas as pd

from commom import f_read_file
from config import BaseConfig
from .manager import engine

DATA_SUB_DIR = "data"
UPLOAD_DATA_PREFIX = "prefix_upload_data_"


def _clean_base_dir(data):
    """Delete everything in the project directory except ``DATA_SUB_DIR``."""
    base_dir = _get_base_dir(data)
    file_name_list: List[str] = os.listdir(base_dir)
    for file_name in file_name_list:
        # The uploaded data lives in DATA_SUB_DIR and must survive a clean.
        if file_name in [DATA_SUB_DIR]:
            continue
        file_path = os.path.join(base_dir, file_name)
        if os.path.isdir(file_path):
            shutil.rmtree(file_path)
        else:
            os.remove(file_path)


def _check_save_dir(data):
    """Return True when a project name is set.

    Raises:
        gr.Error: if the "project_name" value is None or empty.
    """
    project_name = engine.get(data, "project_name")
    if not project_name:
        raise gr.Error(message='项目名称不能为空', duration=5)
    return True


def _get_prefix_file(save_path, prefix) -> Optional[str]:
    """Return the path of the first file in *save_path* whose name starts
    with *prefix*, or None when there is no such file.
    """
    file_name_list: List[str] = os.listdir(save_path)
    for file_name in file_name_list:
        # Prefixed files are always written as ``prefix + basename`` (see
        # _get_save_path), so match on the start of the name rather than on
        # any substring.
        if file_name.startswith(prefix):
            return os.path.join(save_path, file_name)
    return None


def _get_base_dir(data):
    """Return ``BaseConfig.base_dir`` joined with the project name.

    NOTE: callers are expected to have validated the project name first;
    ``os.path.join`` raises TypeError when it is None.
    """
    project_name = engine.get(data, "project_name")
    base_dir = os.path.join(BaseConfig.base_dir, project_name)
    return base_dir


def _get_upload_data(data) -> pd.ExcelFile:
    """Open the uploaded workbook of the project.

    The caller owns the returned ``pd.ExcelFile`` and must close it
    (preferably by using it as a context manager).

    Raises:
        FileNotFoundError: if no uploaded data file exists for the project.
    """
    file_path = _get_upload_data_path(data)
    if file_path is None:
        save_path = os.path.join(_get_base_dir(data), DATA_SUB_DIR)
        raise FileNotFoundError(f"No uploaded data file found in {save_path}.")
    return pd.ExcelFile(file_path)


def _get_upload_data_path(data) -> Optional[str]:
    """Return the path of the uploaded data file, or None if there is none."""
    base_dir = _get_base_dir(data)
    save_path = os.path.join(base_dir, DATA_SUB_DIR)
    file_path = _get_prefix_file(save_path, UPLOAD_DATA_PREFIX)
    return file_path


def _get_node_func_dict(data) -> Optional[dict]:
    """Load the JSON file named ``BaseConfig.node_map_name`` from the project
    directory.

    Returns:
        The decoded JSON content, or None when the file does not exist.
    """
    node_func_dict_path = _get_save_path(data, BaseConfig.node_map_name)
    if not os.path.exists(node_func_dict_path):
        return None
    with open(node_func_dict_path, mode="r", encoding="utf8") as f:
        return json.load(f)


def f_project_is_exist(data):
    """Warn the user when the project name is empty or already in use."""
    project_name = engine.get(data, "project_name")
    if not project_name:
        gr.Warning(message='项目名称不能为空', duration=5)
    elif os.path.exists(_get_base_dir(data)):
        gr.Warning(message='项目名称已被使用', duration=5)


def _get_save_path(data, file_name: str, sub_dir="", name_prefix=""):
    """Build the path under the project directory where *file_name* is stored.

    The target directory (``base_dir/sub_dir``) is created if missing. When
    *name_prefix* is given, an existing file carrying that prefix is removed
    first and the prefix is prepended to the base name of *file_name*.
    """
    base_dir = _get_base_dir(data)
    save_path = os.path.join(base_dir, sub_dir)
    os.makedirs(save_path, exist_ok=True)
    # Remove any existing file carrying the prefix first.
    if name_prefix:
        file = _get_prefix_file(save_path, name_prefix)
        if file:
            os.remove(file)
    save_path = os.path.join(save_path, name_prefix + os.path.basename(file_name))
    return save_path


def f_data_upload(data):
    """Copy the uploaded file into the project and show its flow sheet.

    Returns:
        A mapping of components to ``gr.update`` payloads.

    Raises:
        gr.Error: if the project name is empty (via ``_check_save_dir``).
    """
    if not _check_save_dir(data):
        return None
    file_data = engine.get(data, "file_data")
    data_path = _get_save_path(data, file_data.name, DATA_SUB_DIR, UPLOAD_DATA_PREFIX)
    shutil.copy(file_data.name, data_path)
    # The context manager closes the workbook even if the flow sheet is
    # missing and ``parse`` raises.
    with _get_upload_data(data) as excel:
        df = excel.parse(sheet_name=BaseConfig.flow_sheet_name)
        columns = excel.sheet_names
    return {
        engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True),
        engine.get_elem_by_id("sheet_name"): gr.update(choices=columns, value=BaseConfig.flow_sheet_name),
        engine.get_elem_by_id("download_demo"): gr.update(visible=False),
        engine.get_elem_by_id("code_view"): gr.update(value=None, visible=False),
        engine.get_elem_by_id("download_code"): gr.update(value=None, visible=False),
    }


def f_get_sheet_data(data):
    """Show the selected sheet and, when available, the code mapped to it.

    The code view is only updated when the node map exists and contains an
    entry for the selected sheet; otherwise only the data view is updated.
    """
    sheet_name = engine.get(data, "sheet_name")
    with _get_upload_data(data) as excel:
        df = excel.parse(sheet_name=sheet_name)

    updates = {engine.get_elem_by_id("data_upload"): gr.update(value=df)}

    node_func_dict = _get_node_func_dict(data)
    # A sheet without an entry in the node map is not an error: there is
    # simply no code to display for it.
    code_file = node_func_dict.get(sheet_name) if node_func_dict else None
    if code_file is not None:
        code = f_read_file(_get_save_path(data, code_file))
        updates[engine.get_elem_by_id("code_view")] = gr.update(value=code, label=sheet_name)
    return updates


def f_download_code(data):
    """Point the download component at the project's code archive.

    Raises:
        FileNotFoundError: if the archive does not exist.
    """
    file_path = _get_save_path(data, BaseConfig.code_zip_name)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"{file_path} not found.")
    return {engine.get_elem_by_id("download_code"): gr.update(value=file_path)}


def f_verify_param(data):
    """Check that the uploaded workbook contains the flow sheet.

    Returns:
        True when the check passes.

    Raises:
        gr.Error: if ``BaseConfig.flow_sheet_name`` is not a sheet of the
            uploaded workbook.
    """
    with _get_upload_data(data) as excel:
        columns = excel.sheet_names
    if BaseConfig.flow_sheet_name not in columns:
        raise gr.Error(message=f'【{BaseConfig.flow_sheet_name}】sheet不能为空', duration=5)
    return True


def f_project_load(data):
    """Load an existing project into the UI.

    When the project directory and its node map exist, the flow sheet, its
    code, the sheet list, the code archive and the uploaded file are shown.
    In every other case (empty name, unknown project, no node map) the data,
    code and download components are hidden.
    """

    def _hidden_state():
        return {
            engine.get_elem_by_id("data_upload"): gr.update(visible=False),
            engine.get_elem_by_id("code_view"): gr.update(visible=False),
            engine.get_elem_by_id("download_code"): gr.update(visible=False),
        }

    project_name = engine.get(data, "project_name")
    if not project_name:
        gr.Warning(message='项目名称不能为空', duration=5)
        # Stop here: building the base dir from an empty name would either
        # raise (None) or resolve to the root storage directory ("").
        return _hidden_state()

    if not os.path.exists(_get_base_dir(data)):
        return _hidden_state()

    node_func_dict = _get_node_func_dict(data)
    if node_func_dict is None:
        return _hidden_state()

    with _get_upload_data(data) as excel:
        df = excel.parse(sheet_name=BaseConfig.flow_sheet_name)
        columns = excel.sheet_names
    code = f_read_file(_get_save_path(data, node_func_dict[BaseConfig.flow_sheet_name]))
    code_zip_file_path = _get_save_path(data, BaseConfig.code_zip_name)
    upload_data_path = _get_upload_data_path(data)
    return {
        engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True),
        engine.get_elem_by_id("code_view"): gr.update(value=code, visible=True, label=BaseConfig.flow_sheet_name),
        engine.get_elem_by_id("sheet_name"): gr.update(choices=columns, value=BaseConfig.flow_sheet_name),
        engine.get_elem_by_id("download_demo"): gr.update(visible=False),
        engine.get_elem_by_id("download_code"): gr.update(value=code_zip_file_path, visible=True),
        engine.get_elem_by_id("file_data"): gr.update(value=upload_data_path),
    }


def f_code_generate(data, progress=gr.Progress(track_tqdm=True)):
    """Generator callback driving code generation and the related UI state.

    Yields:
        Mappings of components to ``gr.update`` payloads: first a reset that
        hides the code view and download button, finally the generated code
        and archive.

    Raises:
        gr.Error: if parameter verification fails, if processing the uploaded
            workbook raises, or if the node map is missing.
    """

    def _reset_component_state():
        return {
            engine.get_elem_by_id("download_code"): gr.update(visible=False),
            engine.get_elem_by_id("code_view"): gr.update(visible=False),
        }

    progress(0, desc="Starting")
    # NOTE: only consumed by the currently disabled StrategyParse call below.
    all_param = engine.get_all(data)
    # Validate parameters.
    if not f_verify_param(data):
        yield _reset_component_state()
        return
    # Clear the storage directory.
    # _clean_base_dir(data)
    yield _reset_component_state()
    progress(0.01)
    time.sleep(2)
    try:
        # The context manager guarantees the workbook is closed on both the
        # success and the failure path.
        with _get_upload_data(data) as excel:
            # strategy_parse = StrategyParse(**all_param)
            # strategy_parse.f_parse_strategy(excel, progress)
            pass
    except Exception as err:
        yield _reset_component_state()
        raise gr.Error(message=f"系统错误【{err}】", duration=5) from err

    code_zip_file_path = _get_save_path(data, BaseConfig.code_zip_name)
    node_func_dict = _get_node_func_dict(data)
    if node_func_dict is None:
        # Without the node map there is no generated code to show; report it
        # instead of failing on a None subscript.
        yield _reset_component_state()
        raise gr.Error(message=f"系统错误【{BaseConfig.node_map_name} not found】", duration=5)
    flow_code = f_read_file(_get_save_path(data, node_func_dict[BaseConfig.flow_sheet_name]))
    progress(1)
    yield {
        engine.get_elem_by_id("generate_progress"): gr.update(value="生成完成"),
        engine.get_elem_by_id("download_code"): gr.update(value=code_zip_file_path, visible=True),
        engine.get_elem_by_id("code_view"): gr.update(value=flow_code, label=BaseConfig.flow_sheet_name, visible=True),
    }