#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-----------------File Info-----------------------
Name: web.py
Description: web api support
Author: GentleCP
Email: me@gentlecp.com
Create Date: 2021/6/19
-----------------End-----------------------------
"""
import json
import re
import sys
import time
import uuid
from xml.etree.ElementTree import fromstring

import requests
import uvicorn
from fastapi import FastAPI, Response, Request, BackgroundTasks, Body

from WXBizMsgCrypt3 import WXBizMsgCrypt
from commom import get_logger, request_id_context

logger = get_logger()

# Load the configuration file. All credentials live there; never hard-code
# (or leave commented-out copies of) tokens and secrets in this module.
with open('cqrcb_config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# Parameters extracted from the configuration file.
token = config['token']                          # WeChat Work callback token
aeskey = config['aeskey']                        # WeChat Work callback AES key
corpid = config['corpid']                        # WeChat Work corp id
corpsecret = config['corpsecret']                # WeChat Work app secret
coze_access_token = config['coze_access_token']  # Coze access token
bot_id = config['bot_id']                        # Coze bot id

COZE_API_BASE = "https://api.coze.cn"
QIWEI_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
CONSUMER_URL = "https://101.126.81.2:18088/consumer"

HTTP_TIMEOUT = 30          # seconds, for every individual outbound request
COZE_POLL_INTERVAL = 1     # seconds between two chat status polls
COZE_MAX_WAIT = 300        # seconds to wait for a chat to finish before giving up
# The consumer endpoint answers only after the whole Coze round trip, so the
# read timeout has to be longer than COZE_MAX_WAIT. (connect, read) in seconds.
CONSUMER_TIMEOUT = (10, COZE_MAX_WAIT + 120)
COZE_ERROR_STATUSES = frozenset({"failed", "requires_action", "canceled"})
DEFAULT_PORT = 18088

# Markdown image syntax: ![alt](http(s)://url)
MARKDOWN_IMAGE_RE = re.compile(r'!\[(.*?)\]\((https?://[^)]+)\)')

wxcpt = WXBizMsgCrypt(token, aeskey, corpid)
app = FastAPI()


def _chat_with_coze(prompt: str, bot_id: str, coze_access_token: str) -> str:
    """Run one Coze v3 chat round trip and return the reply text.

    Creates a conversation, starts a non-streaming chat in it, polls the chat
    status and, once it is ``completed``, fetches the message list.

    Transport errors and malformed response bodies are NOT handled here; they
    propagate to the caller (see ``call_llm``).
    """
    req_head = {
        "Authorization": f"Bearer {coze_access_token}",
        "Content-Type": "application/json",
    }
    req_data = {
        "bot_id": bot_id,
        "user_id": "123456789",
        "stream": False,
        "auto_save_history": True,
        "additional_messages": [
            {
                "role": "user",
                "content": prompt,
                "content_type": "text",
            }
        ],
    }

    res_create = requests.post(
        f"{COZE_API_BASE}/v1/conversation/create",
        headers=req_head,
        timeout=HTTP_TIMEOUT,
    )
    conversation_id = res_create.json()["data"]["id"]

    res_chat = requests.post(
        f"{COZE_API_BASE}/v3/chat",
        params={"conversation_id": conversation_id},
        headers=req_head,
        json=req_data,
        timeout=HTTP_TIMEOUT,
    )
    chat_id = res_chat.json()["data"]["id"]

    query = {"chat_id": chat_id, "conversation_id": conversation_id}
    deadline = time.monotonic() + COZE_MAX_WAIT
    while time.monotonic() < deadline:
        res_retrieve = requests.get(
            f"{COZE_API_BASE}/v3/chat/retrieve",
            params=query,
            headers=req_head,
            timeout=HTTP_TIMEOUT,
        )
        # Check the HTTP status BEFORE decoding the body: an error page is
        # not guaranteed to be JSON.
        if res_retrieve.status_code != 200:
            logger.error(f"网络状态码失败,错误码:{res_retrieve.status_code }")
            return f"网络状态码失败,错误码:{res_retrieve.status_code }"

        res_json = res_retrieve.json()
        # A non-zero business code means the API call itself failed.
        if res_json["code"] != 0:
            logger.error(f"API调用失败,错误码:{res_json['code']}")
            return f"API调用失败,错误码:{res_json['code']}"

        status = res_json["data"]["status"]
        logger.info(status)

        if status in COZE_ERROR_STATUSES:
            error_message = res_json["data"].get("last_error")
            logger.error(f"对话错误,状态:{status},错误信息:{error_message}")
            return f"对话错误,状态:{status},错误信息:{error_message}"

        if status == "completed":
            res_message = requests.get(
                f"{COZE_API_BASE}/v3/chat/message/list",
                params=query,
                headers=req_head,
                timeout=HTTP_TIMEOUT,
            )
            # NOTE(review): the reply is taken from index 1 of the message
            # list, as the original code did -- confirm this ordering holds.
            # Spaces are stripped to repair image URLs in the reply (this
            # removes every space in the text, not only those inside URLs).
            return res_message.json()['data'][1]['content'].replace(" ", "")

        time.sleep(COZE_POLL_INTERVAL)

    logger.error(f"对话超时,等待超过{COZE_MAX_WAIT}秒")
    return f"对话超时,等待超过{COZE_MAX_WAIT}秒"


def call_llm(prompt: str, bot_id: str, coze_access_token: str):
    """Ask the Coze bot ``prompt`` and return its answer as a string.

    Args:
        prompt: The user's question.
        bot_id: Id of the Coze bot to query.
        coze_access_token: Bearer token for the Coze API.

    Returns:
        The bot's reply text on success, otherwise a human-readable error
        description. This function reports failures through its return value
        and does not raise for network or response-format errors.
    """
    try:
        return _chat_with_coze(prompt, bot_id, coze_access_token)
    except (requests.exceptions.RequestException, ValueError, LookupError, TypeError) as e:
        # ValueError: body was not JSON; LookupError/TypeError: JSON did not
        # have the expected shape.
        logger.error(f"Coze请求异常:{e}")
        return f"Coze请求异常:{e}"


def qiwei_get():
    """Fetch and return a WeChat Work access token for the configured app.

    Raises:
        requests.exceptions.RequestException: on transport failure.
        KeyError: if the response carries no ``access_token``.
    """
    res = requests.get(
        f"{QIWEI_API_BASE}/gettoken",
        params={"corpid": corpid, "corpsecret": corpsecret},
        timeout=HTTP_TIMEOUT,
    )
    return res.json()["access_token"]


def qiwei_post(username: str, answer: str, agentid: str):
    """Send ``answer`` as a text message to ``username`` via WeChat Work.

    Args:
        username: Recipient user id (the ``touser`` field).
        answer: Message text.
        agentid: Id of the WeChat Work application sending the message.

    The API response is logged; nothing is returned.
    """
    req_data = {
        "touser": username,
        "toparty": "",
        "totag": "",
        "msgtype": "text",
        "agentid": agentid,
        "text": {"content": answer},
        "image": {
            "media_id": "MEDIA_ID"
        },
        "safe": 0,
        "enable_id_trans": 0,
        "enable_duplicate_check": 0,
        "duplicate_check_interval": 1800,
    }
    res = requests.post(
        f"{QIWEI_API_BASE}/message/send",
        params={"access_token": qiwei_get()},
        json=req_data,
        timeout=HTTP_TIMEOUT,
    )
    logger.info(res.json())


def qiwei_post_loading(username: str, answer: str, agentid: str):
    """Send a text message to ``username``; same behaviour as ``qiwei_post``.

    Kept as a separate name for callers; it delegates instead of duplicating
    the payload.
    """
    qiwei_post(username, answer, agentid)


def post_consumer_api(user_query, decrypt_data, request_id):
    """Forward a received question to the ``/consumer`` endpoint.

    This process only receives the WeChat Work callback; the consumer server
    obtains the answer and sends it back to WeChat Work, to keep network
    latency out of the callback path.

    Args:
        user_query: The text the user sent.
        decrypt_data: Dict of the decrypted callback fields.
        request_id: Correlation id, stored in ``request_id_context``.

    Failures are logged, never raised (this runs as a background task).
    """
    data = {
        "user_query": user_query,
        "decrypt_data": decrypt_data,
        "request_id": request_id,
    }
    try:
        request_id_context.set(request_id)
        # SECURITY: verify=False disables TLS certificate validation. Kept
        # because the original does it; the proper fix is to pass the
        # server's certificate / CA bundle via ``verify=``.
        response = requests.post(
            CONSUMER_URL, json=data, verify=False, timeout=CONSUMER_TIMEOUT
        )
        response.raise_for_status()  # raise on 4xx/5xx
        logger.info(f"post_consumer_api 请求成功: {response.json()}")
    except (requests.exceptions.RequestException, ValueError) as e:
        logger.error(f"post_consumer_api 请求失败: {e}")


def _markdown_images_to_links(text: str) -> str:
    """Replace Markdown images in ``text`` with HTML text links.

    ``![alt](url)`` becomes ``<a href="url">alt</a>``; an empty alt text is
    replaced by a numbered placeholder (示例图片1, 示例图片2, ...).

    NOTE(review): the original replacement emitted only the alt text and left
    the captured URL unused, although its comment said it produced HTML
    links. The anchor tag is restored here on that basis -- confirm.
    """
    image_counter = 0

    def replace_with_counter(match):
        nonlocal image_counter
        image_counter += 1
        alt_text = match.group(1) or f"示例图片{image_counter}"
        url = match.group(2)
        return f'<a href="{url}">{alt_text}</a>'

    return MARKDOWN_IMAGE_RE.sub(replace_with_counter, text)


@app.post("/consumer")
async def consumer(
        request: Request
):
    """Answer a forwarded question and push the answer to WeChat Work.

    Expects a JSON body with ``request_id``, ``user_query`` and
    ``decrypt_data``. Sends a "loading" notice first, then the Coze answer.
    """
    body = json.loads((await request.body()).decode())
    request_id = body["request_id"]
    request_id_context.set(request_id)
    user_query = body["user_query"]
    decrypt_data = body["decrypt_data"]

    username = decrypt_data.get('FromUserName', '')
    agentid = decrypt_data.get('AgentID', '')

    qiwei_post(username, "正在加载,请稍后...", agentid)
    logger.info("正在加载,请稍后...")
    logger.info(f"consumer 请求:{user_query}")

    # Fetch the Coze result (an error description if the call failed).
    coze_response = call_llm(prompt=user_query, bot_id=bot_id, coze_access_token=coze_access_token)

    # Turn Markdown image links into numbered HTML text links.
    answer = _markdown_images_to_links(coze_response)
    logger.info(f"结果:{answer}")

    # Push the result to WeChat Work.
    qiwei_post(username, answer, agentid)


@app.get("/ok")
async def ok():
    """Liveness probe; always returns ``"ok"``."""
    return "ok"


@app.get("/bot")
async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
    """Handle the WeChat Work callback-URL verification request.

    Returns the decrypted echo string when verification succeeds, otherwise
    logs the failure and answers 403.
    """
    ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
    if ret == 0:
        return Response(content=sEchoStr.decode('utf-8'))
    logger.error(f"VerifyURL failed, ret={ret}, echo={sEchoStr}")
    return Response(content="", status_code=403)


@app.post("/bot")
async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request, background_tasks: BackgroundTasks):
    """Receive an encrypted WeChat Work message and queue it for answering.

    The message is decrypted, its XML fields are flattened into a dict, and
    forwarding to the consumer is scheduled as a background task so the
    callback returns immediately with an empty body.
    """
    body = await request.body()
    request_id = str(uuid.uuid4())
    request_id_context.set(request_id)

    ret, sMsg = wxcpt.DecryptMsg(body.decode('utf-8'), msg_signature, timestamp, nonce)
    if ret != 0 or sMsg is None:
        # Signature/decrypt failure: there is no payload to parse.
        logger.error(f"DecryptMsg failed, ret={ret}")
        return Response(content="", status_code=400)

    decrypt_data = {node.tag: node.text for node in fromstring(sMsg.decode('utf-8'))}
    user_query = decrypt_data.get('Content', '')
    logger.info(f"start: {user_query}")

    background_tasks.add_task(post_consumer_api, user_query, decrypt_data, request_id)
    return Response(content="")


if __name__ == "__main__":
    # Optional first CLI argument: the port to listen on.
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        port = DEFAULT_PORT
    logger.info(f'{port=}')
    uvicorn.run("coze_bot_api:app", port=port, host='0.0.0.0', reload=False, ssl_keyfile="./key.pem",
                ssl_certfile="./cert.pem")