# -*- coding:utf-8 -*- """ @author: isaacqyang @time: 2023/12/28 @desc: """ import json import os import time import lark_oapi as lark from lark_oapi.api.drive.v1 import CreateExportTaskRequest, ExportTask, CreateExportTaskResponse, GetExportTaskRequest, \ GetExportTaskResponse, DownloadExportTaskRequest, DownloadExportTaskResponse from config import BaseConfig def f_doc_export(token:str) -> str: # 飞书在线文档转word app_id = BaseConfig.app_id app_secret = BaseConfig.app_secret word_save_dir = BaseConfig.word_save_dir client = lark.Client.builder() \ .app_id(app_id) \ .app_secret(app_secret) \ .log_level(lark.LogLevel.DEBUG) \ .build() # 构造请求对象 request1: CreateExportTaskRequest = CreateExportTaskRequest.builder() \ .request_body(ExportTask.builder() .file_extension("docx") .token(token) .type("docx") .build()) \ .build() # 发起请求 response1: CreateExportTaskResponse = client.drive.v1.export_task.create(request1) # 处理失败返回 if not response1.success(): lark.logger.error( f"client.drive.v1.export_task.create failed, code: {response1.code}, msg: {response1.msg}, log_id: {response1.get_log_id()}, resp: \n{json.dumps(json.loads(response1.raw.content), indent=4, ensure_ascii=False)}") return # 处理业务结果 lark.logger.info(lark.JSON.marshal(response1.data, indent=4)) ticket = response1.data.ticket time.sleep(5) # 构造请求对象 request2: GetExportTaskRequest = GetExportTaskRequest.builder() \ .ticket(ticket) \ .token(token) \ .build() # 发起请求 response2: GetExportTaskResponse = client.drive.v1.export_task.get(request2) # 处理失败返回 if not response2.success(): lark.logger.error( f"client.drive.v1.export_task.get failed, code: {response2.code}, msg: {response2.msg}, log_id: {response2.get_log_id()}, resp: \n{json.dumps(json.loads(response2.raw.content), indent=4, ensure_ascii=False)}") return # 处理业务结果 lark.logger.info(lark.JSON.marshal(response2.data, indent=4)) file_token = response2.data.result.file_token # 构造请求对象 request3: DownloadExportTaskRequest = DownloadExportTaskRequest.builder() \ .file_token(file_token) \ .build() # 发起请求 response3: DownloadExportTaskResponse = client.drive.v1.export_task.download(request3) # 处理失败返回 if not response3.success(): lark.logger.error( f"client.drive.v1.export_task.download failed, code: {response3.code}, msg: {response3.msg}, log_id: {response3.get_log_id()}") return # 处理业务结果 save_path = os.path.join(word_save_dir, response3.file_name) with open(save_path, "wb") as f: f.write(response3.file.read()) return save_path if __name__ == "__main__": f_doc_export('YKNBdbs10oA3pCxTdnAczcvOnxc')