#!/usr/bin/env python # -*- coding: utf-8 -*- """ -----------------File Info----------------------- Name: web.py Description: web api support Author: GentleCP Email: me@gentlecp.com Create Date: 2021/6/19 -----------------End----------------------------- """ import json import re import sys import threading import time import uuid from xml.etree.ElementTree import fromstring import requests import uvicorn from fastapi import FastAPI, Response, Request, BackgroundTasks, Body from WXBizMsgCrypt3 import WXBizMsgCrypt from commom import get_logger, request_id_context logger = get_logger() # 加载配置文件 with open('znjd_config.json', 'r') as f: config = json.load(f) # 从配置文件中提取参数 token = config['token'] aeskey = config['aeskey'] corpid = config['corpid'] corpsecret = config['corpsecret'] coze_access_token = config['coze_access_token'] bot_id = config['bot_id'] #port = config['port'] # token = "EcSp"#企业微信应用api信息 # aeskey = "OTZoY8N67kOnGosEpS3jw4Rsjea0Gu6D7X4IWxoYKtY"#企业微信应用api信息 # corpid = "ww5541cfeea51e3188"#企业id # corpsecret = "SbyG25s1LsMsW0nAMiaNprrQIHYrWKQP4f2mNLLDnwE"##api成功后的secret # coze_access_token = "pat_HNBYQOWE5h4r1tzXi8S2PuY4ddoVRH3DpTbE3NsYBjtcWHTYw5ffrVmKPh26hSLW"#豆包access_token # bot_id="7397619068440182793"#豆包机器人id # port = 18090#服务器端口 wxcpt = WXBizMsgCrypt(token, aeskey, corpid) app = FastAPI() # def call_llm(prompt: str, bot_id: str,coze_access_token:str): # req_head = { # "Authorization":f"Bearer {coze_access_token}", # "Content-Type": "application/json", # } # req_data = { # "conversation_id": "123", # "bot_id": bot_id, # "user": "test", # "query": prompt, # "stream": False # } # res = requests.post("https://api.coze.cn/open_api/v2/chat", headers=req_head, json=req_data) # res.raise_for_status() # 检查响应状态码是否为200 # return res.json() def call_llm(user_query: str, bot_id: str,coze_access_token:str,company_name:str): req_head = { "Authorization":f"Bearer {coze_access_token}", "Content-Type": "application/json", } req_data ={ "bot_id": bot_id, "user_id": "123456789", "stream": False, "auto_save_history": True, "additional_messages": [ { "role": "user", "content": user_query, "content_type": "text" } ] } res_create = requests.post(" https://api.coze.cn/v1/conversation/create", headers=req_head) conversation_id = res_create.json()["data"]["id"] res_chat = requests.post(f" https://api.coze.cn/v3/chat?conversation_id={conversation_id}", headers=req_head,json=req_data) chat_id = res_chat.json()["data"]["id"] while True: res_retrieve = requests.get(f" https://api.coze.cn/v3/chat/retrieve?chat_id={chat_id}&conversation_id={conversation_id}", headers=req_head) res_json = res_retrieve.json() # 首先判断网络状态是否为200 if res_retrieve.status_code != 200: logger.error(f"网络状态码失败,错误码:{res_retrieve.status_code }") coze_response = f"网络状态码失败,错误码:{res_retrieve.status_code }" url = "" return coze_response,company_name,url # 判断状态码是否为0 if res_json["code"] != 0 : logger.error(f"API调用失败,错误码:{res_json['code']}") coze_response = f"API调用失败,错误码:{res_json['code']}" url = "" return coze_response,company_name,url # 打印并记录状态 logger.info(res_json["data"]["status"]) status = res_json["data"]["status"] # 检查是否为错误状态 error_statuses = {"failed", "requires_action", "canceled"} if status in error_statuses: error_message = res_json["data"]["last_error"] logger.error(f"对话错误,状态:{status},错误信息:{error_message}") coze_response = f"对话错误,状态:{status},错误信息:{error_message}" url = "" return coze_response,company_name,url # 如果状态为completed,则获取消息 if status == "completed": res_message = requests.get(f"https://api.coze.cn/v3/chat/message/list?chat_id={chat_id}&conversation_id={conversation_id}", headers=req_head) aa = res_message.json() # 假设res_message是已经获取到的响应对象 data = res_message.json()['data'] # 使用列表推导式找到所有type为'answer'的记录,然后取最后一个 last_answer_record = next((record for record in reversed(data) if record['type'] == 'answer'), None) # 如果找到了符合条件的记录,则处理content if last_answer_record: report_link = last_answer_record['content'].replace(" ", "") url_start_index = report_link.find('http') url = report_link[url_start_index:] # coze_response = f'{company_name}的报告已生成:报告下载链接' # coze_response = f'{company_name}报告下载链接' # coze_response = f'{company_name}报告下载链接 - 原始链接:{url}' coze_response = f'[{company_name}报告下载链接]({url}) - 原始链接:[{url}]({url})' else: # 如果没有找到符合条件的记录,则输出提示信息 coze_response = f"{company_name}报告未生成" url = "" # report_link = res_message.json()['data'][-2]['content'].replace(" ", "") # v3 删除图片url中的空格 # coze_response = coze_response['data'][1]['content'].replace(" ", "") # v3 删除图片url中的空格 # 提取URL # 创建HTML链接 return coze_response,company_name,url time.sleep(1) def qiwei_get(): res = requests.get(f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}") qw_access_token = res.json()["access_token"] return qw_access_token def qiwei_post_text(username: str, answer: str,agentid:str): req_data = { "touser": username, "toparty": "", "totag": "", "msgtype": "text", "agentid": agentid, "text": {"content": answer}, "image": { "media_id": "MEDIA_ID" }, "safe": 0, "enable_id_trans": 0, "enable_duplicate_check": 0, "duplicate_check_interval": 1800 } res = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={qiwei_get()}", json=req_data) # print(res.json()) logger.info(res.json()) #return res.json() def qiwei_post_card(username: str, answer: str,agentid:str,company_name:str,url:str): req_data ={ "touser": username, "toparty": "", "totag": "", "msgtype": "textcard", "agentid": agentid, "textcard": { "title": f"报告已生成(请在十分钟内点击下载)", "description": f"