# -*- coding : utf-8 -*- """测试gpu性能""" import re import sys import oss2 import json import time import tqdm import pymysql import requests import datetime import threadpool import logging import sys # 初始化日志对象 logging.basicConfig( # 日志级别 level = logging.INFO, # 日志格式 # 时间、代码所在文件名、代码行号、日志级别名字、日志信息 format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', # 打印日志的时间 datefmt = '%Y-%m-%d %H:%M:%S', ) try: type = sys.argv[1] except: type = "transfer" try: start_date = sys.argv[2] except: start_date = datetime.datetime.now().strftime('%Y-%m-%d') try: end_date = sys.argv[3] except: end_date = datetime.datetime.now().strftime('%Y-%m-%d') try: current_date = sys.argv[4] except: current_date = datetime.datetime.now().strftime('%Y-%m-%d') current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') pool = threadpool.ThreadPool(30) def send_wechat_warning(msg): headers = {'Content-Type': 'application/json'} text = {'content': msg} webhook = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=2bf76dd0-85a7-4c7b-b9b3-5f62e2232499' data = { 'msgtype': 'text', 'text': text } data = json.dumps(data) response = requests.request('POST', webhook, headers=headers, data=data, timeout=1) return response def get_already_human_dialogue(): connection = pymysql.connect(host="47.92.193.147", port=3306, user="root", passwd="Moxi123#", db="task_dialogue_config", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) cursor = connection.cursor() sql = 'select distinct(call_id) ' \ 'from human_dialogue_limit_and_interest where call_start_time >= "{} 08:00:00" ' \ 'and call_start_time <= "{} 21:00:00"; ' cursor.execute(sql.format(start_date, end_date)) results = cursor.fetchall() cursor.close() connection.close() records = [result["call_id"] for result in results] return records def get_already_human_dialogue_nlu_result(): connection = pymysql.connect(host="47.92.193.147", port=3306, user="root", passwd="Moxi123#", db="task_dialogue_config", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) cursor = connection.cursor() sql = """select distinct(call_id) from human_dialogue_nlu_result_limit_and_interest where create_time > "{} 21:00:00" """ cursor.execute(sql.format(start_date)) results = cursor.fetchall() cursor.close() connection.close() records = [result["call_id"] for result in results] return records def get_outbound_call_result(): connection = pymysql.connect(host="39.103.215.119", port=3308, user="ds_user", passwd="Moxi123#", # db="data_center_temp", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) cursor = connection.cursor() sql_human = """select ocr.customer_id, ocr.company_id, ocr.case_id, ocr.dm_session_id, ocr.voice_path, ocr.call_start_time, ocr.answer_time, ocr.transfer_time, ocr.call_end_time, ocr.dm_version, ocr.robot_answer_duration, ocr.human_answer_duration from ods_outbound_data_platform.outbound_call_result ocr where (ocr.batch_id like '%H%' or ocr.batch_id like '%高分B%') and ocr.call_start_time > "{} 08:00:00" and ocr.call_start_time < "{} 21:00:00" and ocr.company_id = 2186 and ocr.call_status = "normalConnection" and ocr.human_answer_duration > 1 """.format(start_date, end_date) cursor.execute(sql_human) results = cursor.fetchall() return results def transfer(workspace_id, task_type): # 已插入的记录 already_records = get_already_human_dialogue() results = get_outbound_call_result() args_human_transfer = list() args_transfer_transfer = list() for result in tqdm.tqdm(results): IP = "360" case_id = result['case_id'] agent_id = '' voice_path = result['voice_path'] customer_id = result['customer_id'] company_id = result['company_id'] session_id = result['dm_session_id'] call_start_time = result['call_start_time'] answer_time = result['answer_time'] transfer_time = result['transfer_time'] call_end_time = result['call_end_time'] robot_duration = result['robot_answer_duration'] talk_time = result['human_answer_duration'] dm_version = result['dm_version'] if voice_path and session_id not in already_records: # 1call # 机打实转 if robot_duration: args_human_transfer.append(([case_id, customer_id, voice_path, workspace_id, company_id, session_id, agent_id, call_start_time, answer_time, answer_time, transfer_time if transfer_time else '', call_end_time, talk_time, '{}_实转'.format(IP)], None)) # 纯人 else: args_transfer_transfer.append(([case_id, customer_id, voice_path, workspace_id, company_id, session_id, agent_id, call_start_time, answer_time, answer_time, transfer_time if transfer_time else '', call_end_time, talk_time, '{}_纯人'.format(IP)], None)) msg = "额度利率\n{}\n{}\ntransfer\n实转待完成:{}\n纯人待完成:{}" send_wechat_warning(msg.format(start_date, end_date, len(args_human_transfer), len(args_transfer_transfer))) requests = threadpool.makeRequests(call_asr, args_transfer_transfer) [pool.putRequest(req) for req in requests] pool.wait() requests = threadpool.makeRequests(call_asr, args_human_transfer) [pool.putRequest(req) for req in requests] pool.wait() def call_asr(call_id, customer_id, voice_path, workspace_id, company_id, session_id, agent_id, call_start_time, call_end_time, answer_start_time, transfer_time, answer_end_time, talk_time, IP): # 只转文本 url = 'http://localhost:9505/quality_test?&customerId={}&wavPath={}&workspaceId={}&sessionId={}&' \ 'companyId={}&agentId={}&createTime={}&endTime={}&answerTime={}&transferTime={}&answerEndTime={}&talkTime={}&IP={}'.format( session_id, voice_path, workspace_id, session_id, company_id, agent_id, call_start_time, call_end_time, answer_start_time, transfer_time, answer_end_time, talk_time, IP ) response = requests.get(url) # 插入记录 def insert(): # 获取human_dialogue_new已有记录 already_records = get_already_human_dialogue() # 获取最新结果 with open("/home/work/workdir/project/quality_test/360_line/asr_output/{}.txt".format(current_date), "r") as f: lines = f.readlines() ll = [line.strip().split(",") for line in lines] # 清楚空格等符号 ll = [[i.strip() for i in l] for l in ll] # 过滤已有记录 lt = [tuple(l) for l in ll if l[0] not in already_records] connection = pymysql.connect(host="47.92.193.147", port=3306, user="root", passwd="Moxi123#", db="task_dialogue_config", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) cursor = connection.cursor() # 插入记录 sql = 'insert into human_dialogue_limit_and_interest ' \ '(call_id, content_text, identity, workspace_id, company_id, call_start_time, call_end_time, answer_start_time, answer_end_time, talk_time, type)' \ 'values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);' try: # 执行sql语句 cursor.executemany(sql, lt) # 提交事务 connection.commit() send_wechat_warning('额度利率插入{}条数据成功'.format(len(lt))) except Exception as e: print(e) # 如果出现异常,回滚 connection.rollback() send_wechat_warning('额度利率插入{}条数据失败'.format(len(lt))) finally: # 关闭数据库连接 connection.close() def recognize(): original_records = get_already_human_dialogue() target_records = get_already_human_dialogue_nlu_result() params = [[record, 'edulilv'] for record in original_records if record not in target_records] send_wechat_warning("额度利率总量{}通\n已打标签{}通\n待打标签{}通".format(len(original_records), len(target_records), len(params))) params = [(param, None) for param in params] reqs = threadpool.makeRequests(call_dm, params) [pool.putRequest(req) for req in reqs] pool.wait() # 调用dm def call_dm(session_id, company_id): url = 'http://172.26.4.253:9900/realTimeLabel?callId={}&workspaceIdUser={}&workspaceIdSeat={}&in_table={}&out_table={}&company_id={}'.format(session_id, '222', '364', 'human_dialogue_limit_and_interest', 'human_dialogue_nlu_result_limit_and_interest', company_id) response = requests.get(url) if __name__ == '__main__': if type == "transfer": transfer('74743', '1') send_wechat_warning("额度利率\n{}\n{}\ntransfer\n已完成".format(start_date, end_date)) elif type == "insert": insert() elif type == "recognize": recognize() send_wechat_warning("额度利率\n{}\n{}\nrecognize\n已完成".format(start_date, end_date))