#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-----------------File Info-----------------------
Name: web.py
Description: web api support
Author: GentleCP
Email: me@gentlecp.com
Create Date: 2021/6/19
-----------------End-----------------------------

Bridge between a WeCom (Enterprise WeChat) application and a Coze bot.

Flow implemented in this module:

1. ``POST /bot`` receives an encrypted WeCom callback, decrypts it and hands
   the text to ``post_consumer_api``.
2. ``post_consumer_api`` forwards the request to the ``/consumer`` endpoint of
   the relay host from a background thread.
3. ``POST /consumer`` calls the Coze chat API (``call_llm``), waits for the
   conversation to finish and pushes the result back to the WeCom user.
"""
import json
import re
import sys
import threading
import time
import uuid
from typing import Optional
from xml.etree.ElementTree import fromstring

import requests
import uvicorn
from fastapi import FastAPI, Response, Request, BackgroundTasks, Body

from WXBizMsgCrypt3 import WXBizMsgCrypt
from commom import get_logger, request_id_context

logger = get_logger()

# Load the configuration file.
with open('znjd_config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# Credentials are read exclusively from the configuration file; never put
# real tokens or secrets into this source file (not even in comments).
token = config['token']                          # WeCom application callback token
aeskey = config['aeskey']                        # WeCom application EncodingAESKey
corpid = config['corpid']                        # WeCom corp id
corpsecret = config['corpsecret']                # WeCom application secret
coze_access_token = config['coze_access_token']  # Coze access token
bot_id = config['bot_id']                        # Coze bot id

COZE_API_BASE = "https://api.coze.cn"
QIWEI_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"

# Relay endpoint that runs the slow LLM call; overridable through the optional
# ``consumer_url`` config key, defaulting to the previously hard-coded address.
CONSUMER_URL = config.get('consumer_url', "https://101.126.81.2:18093/consumer")

# Seconds to wait for a single WeCom / Coze HTTP call.
HTTP_TIMEOUT = 30
# Upper bound, in seconds, for polling one Coze chat until it finishes.
LLM_MAX_WAIT = 600
# (connect, read) timeout for the relay call: /consumer only answers after the
# whole LLM round trip, so the read timeout must exceed LLM_MAX_WAIT.
CONSUMER_TIMEOUT = (10, LLM_MAX_WAIT + 120)

# Chat statuses that call_llm reports as errors instead of polling further.
COZE_ERROR_STATUSES = frozenset({"failed", "requires_action", "canceled"})

wxcpt = WXBizMsgCrypt(token, aeskey, corpid)
app = FastAPI()


def _coze_payload(response):
    """Validate a Coze HTTP response.

    Returns ``(payload, None)`` when the HTTP status is 200 and the JSON body
    carries ``code == 0``; otherwise logs the problem and returns
    ``(None, message)`` where *message* is the user-facing error text.
    """
    # Check the HTTP status first: an error page may not be JSON at all.
    if response.status_code != 200:
        message = f"网络状态码失败,错误码:{response.status_code}"
        logger.error(message)
        return None, message
    payload = response.json()
    code = payload.get("code")
    if code != 0:
        message = f"API调用失败,错误码:{code}"
        logger.error(message)
        return None, message
    return payload, None


def _extract_report_url(content: str) -> str:
    """Return the part of *content* starting at the first ``http``, or ``""``.

    Spaces are removed first, as the original code did, so that a link split
    by spaces is stitched back together.
    """
    compact = content.replace(" ", "")
    start = compact.find('http')
    # find() returns -1 when there is no link; slicing with -1 would yield the
    # last character of the text instead of an empty result.
    return compact[start:] if start >= 0 else ""


def call_llm(user_query: str, bot_id: str, coze_access_token: str, company_name: str,
             poll_interval: float = 1.0, max_wait: float = LLM_MAX_WAIT):
    """Run one Coze conversation and return the generated report link.

    Creates a conversation, starts a chat with *user_query*, polls the chat
    until it reaches a final status and then reads the last ``answer`` message.

    Args:
        user_query: Text sent to the bot as the user message.
        bot_id: Coze bot id.
        coze_access_token: Bearer token for the Coze API.
        company_name: Name used in the user-facing texts; returned unchanged.
        poll_interval: Seconds to sleep between two status polls.
        max_wait: Maximum number of seconds to poll before giving up.

    Returns:
        A tuple ``(coze_response, company_name, url)``. ``url`` is the report
        link, or ``""`` when no report is available, in which case
        ``coze_response`` holds the error / status text.

    Raises:
        requests.RequestException: On network failures or timeouts.
    """
    req_head = {
        "Authorization": f"Bearer {coze_access_token}",
        "Content-Type": "application/json",
    }
    req_data = {
        "bot_id": bot_id,
        "user_id": "123456789",
        "stream": False,
        "auto_save_history": True,
        "additional_messages": [
            {
                "role": "user",
                "content": user_query,
                "content_type": "text",
            }
        ],
    }

    res_create = requests.post(f"{COZE_API_BASE}/v1/conversation/create",
                               headers=req_head, timeout=HTTP_TIMEOUT)
    payload, error = _coze_payload(res_create)
    if error:
        return error, company_name, ""
    conversation_id = payload["data"]["id"]

    res_chat = requests.post(f"{COZE_API_BASE}/v3/chat",
                             params={"conversation_id": conversation_id},
                             headers=req_head, json=req_data, timeout=HTTP_TIMEOUT)
    payload, error = _coze_payload(res_chat)
    if error:
        return error, company_name, ""
    chat_id = payload["data"]["id"]

    chat_ref = {"chat_id": chat_id, "conversation_id": conversation_id}
    deadline = time.monotonic() + max_wait
    while True:
        res_retrieve = requests.get(f"{COZE_API_BASE}/v3/chat/retrieve",
                                    params=chat_ref, headers=req_head, timeout=HTTP_TIMEOUT)
        payload, error = _coze_payload(res_retrieve)
        if error:
            return error, company_name, ""

        # Log and remember the current chat status.
        status = payload["data"]["status"]
        logger.info(status)

        if status in COZE_ERROR_STATUSES:
            error_message = payload["data"].get("last_error")
            coze_response = f"对话错误,状态:{status},错误信息:{error_message}"
            logger.error(coze_response)
            return coze_response, company_name, ""

        if status == "completed":
            res_message = requests.get(f"{COZE_API_BASE}/v3/chat/message/list",
                                       params=chat_ref, headers=req_head, timeout=HTTP_TIMEOUT)
            payload, error = _coze_payload(res_message)
            if error:
                return error, company_name, ""
            # Take the last message of type 'answer'.
            last_answer_record = next(
                (record for record in reversed(payload['data']) if record['type'] == 'answer'),
                None,
            )
            url = _extract_report_url(last_answer_record['content']) if last_answer_record else ""
            if url:
                coze_response = f'[{company_name}报告下载链接]({url}) - 原始链接:[{url}]({url})'
            else:
                # No answer message, or the answer contains no link.
                coze_response = f"{company_name}报告未生成"
            return coze_response, company_name, url

        # Bound the polling so a stuck chat cannot block the worker forever.
        if time.monotonic() >= deadline:
            coze_response = f"对话超时,状态:{status}"
            logger.error(coze_response)
            return coze_response, company_name, ""
        time.sleep(poll_interval)


def qiwei_get():
    """Fetch a WeCom access token for the configured corp id and secret.

    Returns:
        The ``access_token`` string from the ``gettoken`` response.

    Raises:
        RuntimeError: If the response carries no ``access_token``.
        requests.RequestException: On network failures or timeouts.
    """
    res = requests.get(f"{QIWEI_API_BASE}/gettoken",
                       params={"corpid": corpid, "corpsecret": corpsecret},
                       timeout=HTTP_TIMEOUT)
    payload = res.json()
    qw_access_token = payload.get("access_token")
    if not qw_access_token:
        logger.error(f"qiwei gettoken failed: {payload}")
        raise RuntimeError(
            f"qiwei gettoken failed: errcode={payload.get('errcode')}, errmsg={payload.get('errmsg')}"
        )
    return qw_access_token


def _qiwei_send(req_data: dict) -> None:
    """POST *req_data* to the WeCom ``message/send`` API and log the reply."""
    res = requests.post(f"{QIWEI_API_BASE}/message/send",
                        params={"access_token": qiwei_get()},
                        json=req_data, timeout=HTTP_TIMEOUT)
    logger.info(res.json())


def qiwei_post_text(username: str, answer: str, agentid: str):
    """Send *answer* as a plain text message to the WeCom user *username*.

    Args:
        username: WeCom user id of the recipient (``touser``).
        answer: Text content of the message.
        agentid: Id of the WeCom application sending the message.
    """
    req_data = {
        "touser": username,
        "toparty": "",
        "totag": "",
        "msgtype": "text",
        "agentid": agentid,
        "text": {"content": answer},
        "safe": 0,
        "enable_id_trans": 0,
        "enable_duplicate_check": 0,
        "duplicate_check_interval": 1800,
    }
    _qiwei_send(req_data)


def qiwei_post_card(username: str, answer: str, agentid: str, company_name: str, url: str):
    """Send a text card linking to the generated report to a WeCom user.

    Args:
        username: WeCom user id of the recipient (``touser``).
        answer: Unused by the card; kept for interface compatibility.
        agentid: Id of the WeCom application sending the message.
        company_name: Company name shown in the card description.
        url: Link opened when the card is clicked.
    """
    req_data = {
        "touser": username,
        "toparty": "",
        "totag": "",
        "msgtype": "textcard",
        "agentid": agentid,
        "textcard": {
            "title": "报告已生成(请在十分钟内点击下载)",
            # NOTE(review): the description is reproduced as it appears in the
            # reviewed copy (text surrounded by line breaks); any markup around
            # it may have been lost -- confirm against the deployed version.
            "description": f"\n{company_name}报告下载链接\n",
            "url": url,
            "btntxt": "",
        },
        "enable_id_trans": 0,
        "enable_duplicate_check": 0,
        "duplicate_check_interval": 1800,
    }
    _qiwei_send(req_data)


def _post_to_consumer(url: str, data: dict, request_id: str) -> None:
    """Thread worker: POST *data* to the consumer endpoint and log the outcome."""
    # Presumably request_id_context is a ContextVar, which a new thread does
    # not inherit; set it again so the worker's log lines carry the id.
    request_id_context.set(request_id)
    try:
        # NOTE(security): verify=False disables TLS certificate validation for
        # the relay host (kept from the original behavior); prefer passing the
        # relay's CA bundle via ``verify=`` once one is available.
        res = requests.post(url=url, json=data, verify=False, timeout=CONSUMER_TIMEOUT)
        logger.info(f"post_consumer_api 请求成功: {res.status_code}")
    except requests.exceptions.RequestException as e:
        logger.error(f"post_consumer_api 请求失败: {e}")


# The question is forwarded to the relay server, which asks the LLM and sends
# the answer to WeCom; this server only receives and forwards, so the WeCom
# callback is not delayed by the slow LLM call.
def post_consumer_api(user_query, decrypt_data, request_id, company_name):
    """Forward a decrypted WeCom message to ``/consumer`` without blocking.

    The HTTP call runs in a separate thread; failures are logged by the
    worker and never raised to the caller.

    Args:
        user_query: Prompt text built for the bot.
        decrypt_data: Decrypted callback fields (tag -> text).
        request_id: Id used to correlate the log lines of one request.
        company_name: Raw message content sent by the user.
    """
    data = {
        "user_query": user_query,
        "decrypt_data": decrypt_data,
        "request_id": request_id,
        "company_name": company_name,
    }
    request_id_context.set(request_id)
    logger.info(f"post_consumer_api 被执行{user_query}")
    t1 = threading.Thread(target=_post_to_consumer, args=(CONSUMER_URL, data, request_id))
    t1.start()
    logger.info(f"post_consumer_api started worker thread: {t1.name}")


@app.post("/consumer")
def consumer(
        request_id: str = Body(...),
        user_query: str = Body(...),
        decrypt_data: dict = Body(...),
        company_name: str = Body(...),
):
    """Run the Coze bot for one forwarded message and push the result to WeCom.

    Sends a "loading" text first, then either a text card with the report link
    or, when no link was produced, the error / status text as a plain message.
    """
    request_id_context.set(request_id)
    username = decrypt_data.get('FromUserName', '')
    agentid = decrypt_data.get('AgentID', '')

    qiwei_post_text(username, "正在加载,请稍后,预计2分钟...", agentid)
    logger.info("正在加载,请稍后,预计2分钟...")
    logger.info(f"consumer 请求:{user_query}")

    try:
        coze_response, company_name, url = call_llm(
            user_query=user_query,
            bot_id=bot_id,
            coze_access_token=coze_access_token,
            company_name=company_name,
        )
    except Exception:
        # Endpoint boundary: tell the user instead of leaving them waiting,
        # then let the framework report the failure as before.
        logger.exception("call_llm failed")
        qiwei_post_text(username, f"{company_name}报告生成失败,请稍后重试", agentid)
        raise

    answer = coze_response
    logger.info(f"结果:{answer}")

    # Push the result to the WeCom user. A text card needs a link, so without
    # one the explanatory text is delivered as a plain message instead.
    if url:
        qiwei_post_card(username, answer, agentid, company_name, url)
    else:
        qiwei_post_text(username, answer, agentid)


@app.get("/ok")
async def ok():
    """Liveness probe."""
    return "ok"


@app.get("/bot")
async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
    """Answer the WeCom callback-URL verification request.

    Returns the decrypted echo string on success and HTTP 403 when
    ``VerifyURL`` reports a non-zero result.
    """
    ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
    if ret == 0:
        return Response(content=sEchoStr.decode('utf-8'))
    logger.error(f"VerifyURL failed, ret={ret}: {sEchoStr}")
    return Response(content="", status_code=403)


# Dictionary for per-user bot_id state (not referenced elsewhere in this file).
user_bot_id_mapping = {}


@app.post("/bot")
async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request,
               background_tasks: BackgroundTasks):
    """Receive a WeCom message callback and hand it to the consumer relay.

    Always answers quickly with an empty body; the actual work is started via
    ``post_consumer_api``. Returns HTTP 400 when the payload cannot be
    decrypted.
    """
    body = await request.body()
    request_id = str(uuid.uuid4())
    request_id_context.set(request_id)

    ret, sMsg = wxcpt.DecryptMsg(body.decode('utf-8'), msg_signature, timestamp, nonce)
    if ret != 0:
        logger.error(f"DecryptMsg failed, ret={ret}")
        return Response(content="", status_code=400)

    decrypt_data = {node.tag: node.text for node in fromstring(sMsg.decode('utf-8'))}

    # Message content sent by the user.
    company_name = decrypt_data.get('Content') or ''
    logger.info(f"start user_query: {company_name}")
    logger.info(f"start request_id: {request_id}")
    if not company_name.strip():
        # Callbacks without text content would only trigger an empty LLM query.
        logger.info(f"ignore callback without Content, MsgType={decrypt_data.get('MsgType')}")
        return Response(content="")

    user_query = f"query:{company_name} request_id: {request_id}"
    logger.info(f"start prompt: {user_query}")
    post_consumer_api(user_query, decrypt_data, request_id, company_name)
    return Response(content="")


if __name__ == "__main__":
    # Optional first CLI argument: port number (falls back to 18067).
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        port = 18067
    request_id_context.set("app start")
    logger.info(f'{port=}')
    print(f'{port=}')
    uvicorn.run("coze_znjd_api:app", port=port, host='0.0.0.0', reload=False,
                ssl_keyfile="./key.pem", ssl_certfile="./cert.pem")