Kaynağa Gözat

add: request_id

yq 3 ay önce
ebeveyn
işleme
6cdfed2a02
4 değiştirilmiş dosya ile 108 ekleme ve 6 silme
  1. 12 0
      commom/__init__.py
  2. 65 0
      commom/logger.py
  3. 19 0
      commom/traceId_util.py
  4. 12 6
      coze_bot_api.py

+ 12 - 0
commom/__init__.py

@@ -0,0 +1,12 @@
+# -*- coding:utf-8 -*-
+"""
+@author: yq
+@time: 2021/11/9
+@desc: 
+"""
+from .logger import get_logger
+from .traceId_util import request_id_context
+
+__all__ = ['get_logger','request_id_context']
+
+

+ 65 - 0
commom/logger.py

@@ -0,0 +1,65 @@
+# -*- coding:utf-8 -*-
+"""
+@author: yq
+@time: 2022/8/29
+@desc:
+"""
+
+import datetime
+import logging
+import logging.handlers
+import os
+import threading
+import time
+from os.path import dirname, realpath
+
+import pytz
+
+from commom.traceId_util import TraceIdFilter
+
+
+def my_time(*args):
+    return time.strptime(datetime.datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
+                         "%Y-%m-%d %H:%M:%S")
+
+
+_instance_lock = threading.Lock()
+logger_map = {}
+
+
+def get_logger(logger_name: str = None) -> logging.Logger:
+    if logger_name is None:
+        logger_name = "app"
+    if logger_name in logger_map.keys():
+        return logger_map.get(logger_name)
+    with _instance_lock:
+        if logger_name in logger_map.keys():
+            return logger_map.get(logger_name)
+
+        _logger = logging.Logger(logger_name)
+        _logger.setLevel(logging.INFO)
+        _logger.addFilter(TraceIdFilter())
+
+        formatter = logging.Formatter(
+            '[%(asctime)s] [requestId-%(requestId)s] [%(levelname)s] [%(threadName)s] [%(filename)s] [func:%(funcName)s line:%(lineno)d]\n %(message)s')
+        formatter.converter = my_time
+
+        log_path = os.path.join(dirname(dirname(realpath(__file__))), "logs")
+        filename = os.path.join(log_path, f"{logger_name}.log")
+
+        if not os.path.exists(dirname(filename)):
+            os.makedirs(dirname(filename))
+        print(f"日志路径:{filename}")
+
+        handler = logging.handlers.TimedRotatingFileHandler(filename, when="MIDNIGHT", interval=30, backupCount=4,
+                                                            encoding="utf8", atTime=datetime.time(0, 0, 0, 0))
+        handler.setFormatter(formatter)
+        _logger.addHandler(handler)
+
+        console_handler = logging.StreamHandler()
+        console_handler.setFormatter(formatter)
+        _logger.addHandler(console_handler)
+
+        logger_map[logger_name] = _logger
+
+        return _logger

+ 19 - 0
commom/traceId_util.py

@@ -0,0 +1,19 @@
+# -*- coding:utf-8 -*-
+"""
+@author: yq
+@time: 2023/5/16
+@desc: 
+"""
+
+import logging
+from contextvars import ContextVar
+
+request_id_context = ContextVar('request_id')
+
+
+class TraceIdFilter(logging.Filter):
+
+    def filter(self, record):
+        record.requestId = request_id_context.get()
+        # record.requestId = "requestId"
+        return True

+ 12 - 6
coze_bot_api.py

@@ -12,6 +12,8 @@ Create Date: 2021/6/19
 import re
 import time
 import sys
+import uuid
+
 from fastapi import FastAPI, Response, Request, BackgroundTasks, Body
 from WXBizMsgCrypt3 import WXBizMsgCrypt
 from xml.etree.ElementTree import fromstring
@@ -19,7 +21,7 @@ import uvicorn
 import requests
 import json
 
-from commom import get_logger
+from commom import get_logger, request_id_context
 
 logger = get_logger()
 
@@ -151,13 +153,15 @@ def qiwei_post(username: str, answer: str,agentid:str):
     #return res.json()
 
 #问题传入字节服务器进行回答后发送给企业微信,行内服务器只进行接收然后发给字节,防止网络延迟
-def post_consumer_api(user_query, decrypt_data):
+def post_consumer_api(user_query, decrypt_data, request_id):
     data = {
         "user_query": user_query,
-        "decrypt_data": decrypt_data
+        "decrypt_data": decrypt_data,
+        "request_id": request_id,
     }
     url = "https://101.126.81.2:18088/consumer"
     try:
+        request_id_context.set(request_id)
         response = requests.post(url, json=data, verify=False)  # 忽略SSL证书验证
         response.raise_for_status()  # 检查响应状态码是否为200
         logger.info(f"post_consumer_api 请求成功: {response.json()}")
@@ -175,8 +179,8 @@ request: Request
     body = json.loads(body)
     user_query = body["user_query"]
     decrypt_data = body["decrypt_data"]
-
-
+    request_id = body["request_id"]
+    request_id_context.set(request_id)
     logger.info(f"consumer 请求:{user_query}")
     username = decrypt_data.get('FromUserName', '')
     agentid = decrypt_data.get('AgentID', '')
@@ -224,13 +228,15 @@ async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
 async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request, background_tasks: BackgroundTasks):
     #start_time = time.time()
     body = await request.body()
+    request_id = str(uuid.uuid4())
+    request_id_context.set(request_id)
     ret, sMsg = wxcpt.DecryptMsg(body.decode('utf-8'), msg_signature, timestamp, nonce)
     decrypt_data = {}
     for node in list(fromstring(sMsg.decode('utf-8'))):
         decrypt_data[node.tag] = node.text
     user_query = decrypt_data.get('Content', '')
     logger.info(f"start: {user_query}")
-    background_tasks.add_task(post_consumer_api, user_query, decrypt_data)
+    background_tasks.add_task(post_consumer_api, user_query, decrypt_data, request_id)
     # data = {
     #     "user_query":user_query,
     #     "decrypt_data":decrypt_data