# -*- coding: utf-8 -*-
"""Back-fill labels for human-agent call recordings.

The script runs one of three jobs, selected by the first CLI argument:

* ``transfer``  - fetch connected outbound calls and send each recording to
  the local ASR service.
* ``insert``    - load the ASR output file for ``current_date`` into the
  ``human_dialogue_ws`` table.
* ``recognize`` - ask the DM service to label every transcribed call that has
  no NLU result yet.

Usage: ``script.py [type] [start_date] [end_date] [current_date]``
(dates are ``YYYY-MM-DD`` and default to today).
"""
import datetime
import json
import logging
import re
import sys
import time

import oss2
import psycopg2
import psycopg2.extras
import pymysql
import requests
import threadpool
import tqdm

# Initialise logging: timestamp, file name, line number, level, message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# NOTE(review): credentials and the webhook key are hard-coded in source;
# they should be moved to environment variables or a secrets store.
_WECHAT_WEBHOOK = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=2bf76dd0-85a7-4c7b-b9b3-5f62e2232499'
_MYSQL_READ_HOST = "172.26.172.65"
_MYSQL_WRITE_HOST = "47.92.193.147"


def _cli_arg(position, default):
    """Return ``sys.argv[position]``, or *default* when it was not supplied."""
    try:
        return sys.argv[position]
    except IndexError:
        return default


_today = datetime.datetime.now().strftime('%Y-%m-%d')

# Job type. The name shadows the ``type`` builtin; it is kept because it is
# part of this module's existing namespace.
type = _cli_arg(1, "transfer")
# Start / end of the date window, and the date of the ASR output file.
start_date = _cli_arg(2, _today)
end_date = _cli_arg(3, _today)
current_date = _cli_arg(4, _today)
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

pool = threadpool.ThreadPool(30)


def _mysql_connect(host):
    """Open a DictCursor connection to ``task_dialogue_config`` on *host*."""
    return pymysql.connect(host=host,
                           port=3306,
                           user="root",
                           passwd="Moxi123#",
                           db="task_dialogue_config",
                           charset='utf8mb4',
                           cursorclass=pymysql.cursors.DictCursor)


def _fetch_call_ids(sql, params=None):
    """Run *sql* on the read host and return the ``call_id`` of every row.

    The connection is closed even when the query raises.
    """
    connection = _mysql_connect(_MYSQL_READ_HOST)
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            rows = cursor.fetchall()
    finally:
        connection.close()
    return [row["call_id"] for row in rows]


def _run_in_pool(func, args_list):
    """Queue one call of *func* per entry of *args_list* and wait for them all.

    Each entry is an ``(args, kwargs)`` pair as expected by
    ``threadpool.makeRequests``.
    """
    for work_request in threadpool.makeRequests(func, args_list):
        pool.putRequest(work_request)
    pool.wait()


def send_wechat_warning(msg):
    """POST *msg* as a text message to the WeChat Work webhook.

    Returns the ``requests`` response. Raises whatever ``requests`` raises,
    including a timeout after one second.
    """
    headers = {'Content-Type': 'application/json'}
    payload = json.dumps({
        'msgtype': 'text',
        'text': {'content': msg},
    })
    return requests.request('POST', _WECHAT_WEBHOOK, headers=headers,
                            data=payload, timeout=1)


def get_already_human_dialogue():
    """Return the distinct call ids already stored in ``human_dialogue_ws``.

    Only calls started between ``start_date 08:00:00`` and
    ``end_date 21:00:00`` are considered.
    """
    # Dates come from the command line, so bind them as parameters instead of
    # formatting them into the SQL text.
    sql = ("select distinct call_id from human_dialogue_ws "
           "where call_start_time >= %s and call_start_time <= %s")
    window = ("{} 08:00:00".format(start_date), "{} 21:00:00".format(end_date))
    return _fetch_call_ids(sql, window)


def get_already_human_dialogue_nlu_result():
    """Return the distinct call ids present in ``human_dialogue_nlu_result_Jan``."""
    return _fetch_call_ids(
        "select distinct call_id from human_dialogue_nlu_result_Jan")


def get_outbound_call_result(call_date='2024-04-10', company_id=2252):
    """Fetch connected outbound calls with more than 30s of human talk time.

    Args:
        call_date: ``YYYY-MM-DD`` day the call started on.
        company_id: company whose calls are selected.

    Returns:
        A list of plain dicts, one per call, with the selected columns;
        the four timestamps are formatted in the Asia/Shanghai time zone.
    """
    sql = """select ocr.customer_id,
                    ocr.company_id,
                    ocr.case_id,
                    ocr.dm_session_id,
                    ocr.voice_path,
                    TO_CHAR((call_start_time :: TIMESTAMPTZ) AT TIME ZONE 'Asia/Shanghai', 'YYYY-MM-DD HH24:MI:SS') call_start_time,
                    TO_CHAR((answer_time :: TIMESTAMPTZ) AT TIME ZONE 'Asia/Shanghai', 'YYYY-MM-DD HH24:MI:SS') answer_time,
                    TO_CHAR((transfer_time :: TIMESTAMPTZ) AT TIME ZONE 'Asia/Shanghai', 'YYYY-MM-DD HH24:MI:SS') transfer_time,
                    TO_CHAR((call_end_time :: TIMESTAMPTZ) AT TIME ZONE 'Asia/Shanghai', 'YYYY-MM-DD HH24:MI:SS') call_end_time,
                    ocr.dm_version,
                    ocr.robot_answer_duration,
                    ocr.human_answer_duration
             from screen.public.outbound_call_result ocr
             where date(ocr.call_start_time) = %s
               and ocr.company_id = %s
               and ocr.call_status = 'normalConnection'
               and ocr.human_answer_duration > 30
          """
    conn = psycopg2.connect(
        host="hgprecn-cn-2r42t31q3003-cn-zhangjiakou.hologres.aliyuncs.com",
        port=80,
        user="LTAI5tMsnFz7WRxKJyBDK5uv",
        password="SZxfgbrWpDYaHOR4HsU5ENxDOw6Xxt",
        dbname="screen")
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute(sql, (call_date, company_id))
            return [dict(row) for row in cursor.fetchall()]
    finally:
        # Closing the cursor alone does not release the connection.
        conn.close()


def transfer(workspace_id, task_type):
    """Send every not-yet-transcribed recording to the ASR service.

    Calls whose session id is already in ``human_dialogue_ws`` or that have
    no recording path are skipped. Robot-then-human calls are processed
    first, human-only calls second.

    Args:
        workspace_id: workspace id forwarded to the ASR service.
        task_type: unused; kept for interface compatibility.
    """
    already_records = get_already_human_dialogue()
    # NOTE(review): this uses the query's default date, not start_date /
    # end_date, although the notification below reports those - confirm.
    results = get_outbound_call_result()
    print(f'exist:{len(already_records)}, need:{len(results)}')

    known_sessions = set(already_records)  # O(1) membership inside the loop
    IP = '金条'
    agent_id = ''
    robot_transfer_args = []
    human_only_args = []
    for result in tqdm.tqdm(results):
        voice_path = result['voice_path']
        session_id = result['dm_session_id']
        if not voice_path or session_id in known_sessions:
            continue

        answer_time = result['answer_time']
        # A non-zero robot duration means the robot answered first and the
        # call was then transferred; otherwise a human handled it throughout.
        is_robot_transfer = bool(result['robot_answer_duration'])
        label = '{}_实转'.format(IP) if is_robot_transfer else '{}_纯人'.format(IP)
        # NOTE(review): positional order differs from call_asr's parameter
        # names (answer_time lands in ``call_end_time``) - confirm intent.
        call_args = [result['case_id'], result['customer_id'], voice_path,
                     workspace_id, result['company_id'], session_id, agent_id,
                     result['call_start_time'], answer_time, answer_time,
                     result['transfer_time'] or '', result['call_end_time'],
                     result['human_answer_duration'], label]
        target = robot_transfer_args if is_robot_transfer else human_only_args
        target.append((call_args, None))

    msg = "金条回填标签\n{}\n{}\ntransfer\n实转待完成:{}\n纯人待完成:{}"
    send_wechat_warning(msg.format(start_date, end_date,
                                   len(robot_transfer_args),
                                   len(human_only_args)))

    _run_in_pool(call_asr, robot_transfer_args)
    _run_in_pool(call_asr, human_only_args)


def call_asr(call_id, customer_id, voice_path, workspace_id, company_id, session_id, agent_id, call_start_time,
             call_end_time, answer_start_time, transfer_time, answer_end_time, talk_time, IP):
    """Ask the local ASR service to transcribe one recording (text only).

    The arguments are forwarded as query-string parameters; the response is
    ignored.
    """
    # NOTE(review): ``customerId`` is filled with session_id, and call_id /
    # customer_id are never sent - confirm this is what the service expects.
    url = 'http://localhost:9505/quality_test?&customerId={}&wavPath={}&workspaceId={}&sessionId={}&' \
          'companyId={}&agentId={}&createTime={}&endTime={}&answerTime={}&transferTime={}&answerEndTime={}&talkTime={}&IP={}'.format(
        session_id, voice_path, workspace_id, session_id, company_id, agent_id, call_start_time, call_end_time,
        answer_start_time, transfer_time, answer_end_time, talk_time, IP
    )
    requests.get(url)


def insert():
    """Load the ASR output file for ``current_date`` into ``human_dialogue_ws``.

    Each line of the file is a comma-separated record. Records whose call id
    is already stored, or does not contain ``-2252-``, are skipped. A WeChat
    notification reports success or failure.
    """
    known_call_ids = set(get_already_human_dialogue())

    # NOTE(review): the file is opened with the locale's default encoding -
    # confirm what the ASR service writes.
    with open("/home/work/workdir/project/quality_test/qyz_line/asr_output/{}.txt".format(current_date), "r") as f:
        # Split each line into fields and strip surrounding whitespace.
        records = [[field.strip() for field in line.strip().split(",")]
                   for line in f]
    # Drop records that already exist or belong to another company.
    rows = [tuple(record) for record in records
            if record[0] not in known_call_ids and '-2252-' in record[0]]

    sql = 'insert into human_dialogue_ws ' \
          '(call_id, content_text, identity, workspace_id, company_id, call_start_time, call_end_time, answer_start_time, answer_end_time, talk_time, type)' \
          'values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);'

    connection = _mysql_connect(_MYSQL_WRITE_HOST)
    try:
        with connection.cursor() as cursor:
            cursor.executemany(sql, rows)
        connection.commit()
    except Exception:
        logging.exception("insert into human_dialogue_ws failed")
        connection.rollback()
        send_wechat_warning('金条回填标签\ninsert:{}\nerror'.format(len(rows)))
    else:
        # Sent outside the try body so that a webhook failure after a
        # successful commit is not reported as an insert error.
        send_wechat_warning('金条回填标签\ninsert:{}\nfinish'.format(len(rows)))
    finally:
        connection.close()


def recognize():
    """Request DM labelling for every transcribed call without an NLU result."""
    original_records = get_already_human_dialogue()
    target_records = get_already_human_dialogue_nlu_result()
    labelled = set(target_records)
    params = [([record, 'label'], None)
              for record in original_records if record not in labelled]
    send_wechat_warning("金条回填标签\nrecognize\n总量{}通\n已完成{}通\n未完成{}通".format(len(original_records),
                                                                                len(target_records),
                                                                                len(params)))
    _run_in_pool(call_dm, params)


def call_dm(session_id, company_id):
    """Call the DM dialogue-stitching endpoint for one call; the response is ignored."""
    url = 'http://172.26.4.253:9900/realTimeLabel?callId={}&workspaceIdUser={}&workspaceIdSeat={}&in_table={}&out_table={}&company_id={}'.\
        format(session_id, '222', '364', 'human_dialogue_Jan', 'human_dialogue_nlu_result_Jan', company_id)
    requests.get(url)


def main():
    """Run the job selected by the module-level ``type``."""
    if type == "transfer":
        transfer('642876', '1')
    elif type == "insert":
        insert()
    elif type == "recognize":
        recognize()
    else:
        logging.warning("unknown task type %r; nothing to do", type)


if __name__ == '__main__':
    main()