|
@@ -9,10 +9,10 @@ Email: me@gentlecp.com
|
|
|
Create Date: 2021/6/19
|
|
|
-----------------End-----------------------------
|
|
|
"""
|
|
|
+import asyncio
|
|
|
import json
|
|
|
import re
|
|
|
import sys
|
|
|
-import threading
|
|
|
import time
|
|
|
import uuid
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
@@ -20,11 +20,15 @@ from xml.etree.ElementTree import fromstring
|
|
|
|
|
|
import requests
|
|
|
import uvicorn
|
|
|
-from fastapi import FastAPI, Response, Request, BackgroundTasks, Body
|
|
|
+from anyio import CapacityLimiter
|
|
|
+from anyio.lowlevel import RunVar
|
|
|
+from fastapi import FastAPI, Response, Request, Body, BackgroundTasks
|
|
|
|
|
|
from WXBizMsgCrypt3 import WXBizMsgCrypt
|
|
|
from commom import get_logger, request_id_context
|
|
|
|
|
|
+timeout = 30
|
|
|
+
|
|
|
logger = get_logger()
|
|
|
|
|
|
# 加载配置文件
|
|
@@ -52,6 +56,13 @@ coze_access_token = config['coze_access_token']
|
|
|
wxcpt = WXBizMsgCrypt(token, aeskey, corpid)
|
|
|
|
|
|
app = FastAPI()
|
|
|
+executor = ThreadPoolExecutor(max_workers=200)
|
|
|
+
|
|
|
+
|
|
|
+@app.on_event("startup")
|
|
|
+def startup():
|
|
|
+ print("start")
|
|
|
+ RunVar("_default_thread_limiter").set(CapacityLimiter(210))
|
|
|
|
|
|
|
|
|
# def call_llm(prompt: str, bot_id: str,coze_access_token:str):
|
|
@@ -95,28 +106,29 @@ def call_llm(prompt: str, bot_id: str, coze_access_token: str):
|
|
|
json=req_data)
|
|
|
chat_id = res_chat.json()["data"]["id"]
|
|
|
while True:
|
|
|
+ # logger.info(f"{prompt=}")
|
|
|
res_retrieve = requests.get(
|
|
|
f" https://api.coze.cn/v3/chat/retrieve?chat_id={chat_id}&conversation_id={conversation_id}",
|
|
|
headers=req_head)
|
|
|
res_json = res_retrieve.json()
|
|
|
# 首先判断网络状态是否为200
|
|
|
if res_retrieve.status_code != 200:
|
|
|
- logger.error(f"网络状态码失败,错误码:{res_retrieve.status_code}")
|
|
|
+ # logger.error(f"网络状态码失败,错误码:{res_retrieve.status_code}")
|
|
|
coze_response = f"网络状态码失败,错误码:{res_retrieve.status_code}"
|
|
|
return coze_response
|
|
|
# 判断状态码是否为0
|
|
|
if res_json["code"] != 0:
|
|
|
- logger.error(f"API调用失败,错误码:{res_json['code']}")
|
|
|
+ # logger.error(f"API调用失败,错误码:{res_json['code']}")
|
|
|
coze_response = f"API调用失败,错误码:{res_json['code']}"
|
|
|
return coze_response
|
|
|
# 打印并记录状态
|
|
|
- logger.info(res_json["data"]["status"])
|
|
|
+ # logger.info(res_json["data"]["status"])
|
|
|
status = res_json["data"]["status"]
|
|
|
# 检查是否为错误状态
|
|
|
error_statuses = {"failed", "requires_action", "canceled"}
|
|
|
if status in error_statuses:
|
|
|
error_message = res_json["data"]["last_error"]
|
|
|
- logger.error(f"对话错误,状态:{status},错误信息:{error_message}")
|
|
|
+ # logger.error(f"对话错误,状态:{status},错误信息:{error_message}")
|
|
|
coze_response = f"对话错误,状态:{status},错误信息:{error_message}"
|
|
|
return coze_response
|
|
|
# 如果状态为completed,则获取消息
|
|
@@ -164,7 +176,7 @@ def qiwei_post(username: str, answer: str, agentid: str):
|
|
|
|
|
|
|
|
|
# 问题传入字节服务器进行回答后发送给企业微信,行内服务器只进行接收然后发给字节,防止网络延迟
|
|
|
-def post_consumer_api(user_query, decrypt_data, request_id):
|
|
|
+async def post_consumer_api(user_query, decrypt_data, request_id):
|
|
|
data = {
|
|
|
"user_query": user_query,
|
|
|
"decrypt_data": decrypt_data,
|
|
@@ -173,14 +185,27 @@ def post_consumer_api(user_query, decrypt_data, request_id):
|
|
|
request_id_context.set(request_id)
|
|
|
url = "https://101.126.81.2:18090/consumer"
|
|
|
try:
|
|
|
- logger.info(f"post_consumer_api 被执行{user_query}")
|
|
|
- t1 = threading.Thread(target=requests.post, kwargs={"url": url, "json": data, "verify": False})
|
|
|
- t1.start()
|
|
|
+ logger.info(f"post_consumer_api {user_query=}")
|
|
|
+
|
|
|
+ def consumer(data):
|
|
|
+ re = requests.post(url=url, json=data, verify=False)
|
|
|
+ return re.text
|
|
|
+
|
|
|
+ loop = asyncio.get_event_loop()
|
|
|
+ result = await loop.run_in_executor(executor, consumer, data)
|
|
|
+ logger.info(f"post_consumer_api {result=}")
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+ # t1 = threading.Thread(target=requests.post, kwargs={"url": url, "json": data, "verify": False})
|
|
|
+ # t1.start()
|
|
|
+
|
|
|
# response = requests.post(url, json=data, verify=False) # 忽略SSL证书验证
|
|
|
# response.raise_for_status() # 检查响应状态码是否为200
|
|
|
# logger.info(f"post_consumer_api 请求成功: {response.json()}")
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
logger.error(f"post_consumer_api 请求失败: {e}")
|
|
|
+ return "error"
|
|
|
|
|
|
|
|
|
# 创建一个字典来存储用户的bot_id状态
|
|
@@ -190,11 +215,8 @@ user_bot_id_mapping = {}
|
|
|
user_welcome_status = {}
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
@app.post("/consumer")
|
|
|
-def consumer(
|
|
|
+async def consumer(
|
|
|
request_id: str = Body(...),
|
|
|
user_query: str = Body(...),
|
|
|
decrypt_data: dict = Body(...),
|
|
@@ -236,14 +258,14 @@ def consumer(
|
|
|
# 根据用户发送的消息内容切换bot_id
|
|
|
if msg_type == 'event' and event == 'click' and event_key == '#sendmsg#_0_0#7599826213209000':
|
|
|
bot_id = "7456977536891846697" # 当用户发送渝快振兴贷时使用的bot_id
|
|
|
- # change_message = "您好,已切换为渝快振兴贷产品助手,请输入问题。"
|
|
|
- # qiwei_post(username, change_message, agentid)
|
|
|
+ change_message = "您好,已切换为渝快振兴贷产品助手,请输入问题。"
|
|
|
+ qiwei_post(username, change_message, agentid)
|
|
|
user_bot_id_mapping[username] = bot_id # 更新用户的bot_id状态
|
|
|
return Response(content="")
|
|
|
elif msg_type == 'event' and event == 'click' and event_key == '#sendmsg#_0_1#7599826213209001':
|
|
|
bot_id = "7445101065005154313" # 当用户发送房快贷时使用的bot_id
|
|
|
- # change_message = "您好,已切换为房快贷产品助手,请输入问题。"
|
|
|
- # qiwei_post(username, change_message, agentid)
|
|
|
+ change_message = "您好,已切换为房快贷产品助手,请输入问题。"
|
|
|
+ qiwei_post(username, change_message, agentid)
|
|
|
user_bot_id_mapping[username] = bot_id # 更新用户的bot_id状态
|
|
|
return Response(content="")
|
|
|
else:
|
|
@@ -263,7 +285,16 @@ def consumer(
|
|
|
user_query = json.dumps(multimodal_content, ensure_ascii=False)
|
|
|
|
|
|
# 返回coze结果
|
|
|
- coze_response = call_llm(prompt=user_query, bot_id=bot_id, coze_access_token=coze_access_token)
|
|
|
+ loop = asyncio.get_event_loop()
|
|
|
+ future = loop.run_in_executor(executor, call_llm, user_query, bot_id, coze_access_token)
|
|
|
+
|
|
|
+ try:
|
|
|
+ coze_response = await asyncio.wait_for(future, timeout)
|
|
|
+ except asyncio.exceptions.TimeoutError as err:
|
|
|
+ return Response(content="timeout")
|
|
|
+
|
|
|
+ # coze_response = call_llm(prompt=user_query, bot_id=bot_id, coze_access_token=coze_access_token)
|
|
|
+
|
|
|
# answer = coze_response['messages'][1]['content']#v2
|
|
|
# answer = coze_response['data'][1]['content'].replace(" ","") #v3 删除图片url中的空格
|
|
|
|
|
@@ -284,20 +315,22 @@ def consumer(
|
|
|
|
|
|
# print(f"结果:{answer}")
|
|
|
logger.info(f"结果:{answer}")
|
|
|
- # 主动发结果给qiwei
|
|
|
qiwei_post(username, answer, agentid)
|
|
|
+ return Response(content=answer)
|
|
|
+ # 主动发结果给qiwei
|
|
|
+
|
|
|
|
|
|
# choice_answer = "若需要切换产品助手请输入产品名称(渝快振兴贷、房快贷),无需切换请忽略"
|
|
|
# qiwei_post(username, choice_answer, agentid)
|
|
|
|
|
|
|
|
|
@app.get("/ok")
|
|
|
-async def ok():
|
|
|
+def ok():
|
|
|
return "ok"
|
|
|
|
|
|
|
|
|
@app.get("/bot")
|
|
|
-async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
|
|
|
+def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
|
|
|
ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
|
|
|
if ret == 0:
|
|
|
return Response(content=sEchoStr.decode('utf-8'))
|
|
@@ -306,9 +339,6 @@ async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
|
|
|
logger.info(sEchoStr)
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
#
|
|
|
@app.post("/bot")
|
|
|
async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request, background_tasks: BackgroundTasks):
|
|
@@ -321,22 +351,21 @@ async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request,
|
|
|
for node in list(fromstring(sMsg.decode('utf-8'))):
|
|
|
decrypt_data[node.tag] = node.text
|
|
|
|
|
|
- # decrypt_data = body
|
|
|
- print(decrypt_data)
|
|
|
+ # print(body)
|
|
|
# 获取用户发送的消息内容
|
|
|
+ # decrypt_data = json.loads(body)
|
|
|
user_query = decrypt_data.get('Content', '')
|
|
|
# logger.info(f"start: {user_query}")
|
|
|
|
|
|
# print(event_key)
|
|
|
|
|
|
-
|
|
|
-
|
|
|
# 处理其他类型的消息
|
|
|
logger.info(f"start: {user_query}")
|
|
|
# background_tasks.add_task(post_consumer_api, user_query, decrypt_data, request_id, bot_id)
|
|
|
|
|
|
- post_consumer_api(user_query, decrypt_data, request_id)
|
|
|
- return Response(content="")
|
|
|
+ result = await post_consumer_api(user_query, decrypt_data, request_id)
|
|
|
+ # return Response(content="")
|
|
|
+ return Response(content=result)
|
|
|
|
|
|
# user_query = decrypt_data.get('Content', '')
|
|
|
# logger.info(f"start: {user_query}")
|