# -*- coding: utf-8 -*-
"""Tornado service exposing call-recording quality-inspection endpoints.

Routes registered at the bottom of this file:

* ``/quality_test``      -> :class:`Quality_test`
* ``/quality_new_test``  -> :class:`Quality_new_test`
* ``/call_back``         -> :class:`Call_back`
* ``/get_num``           -> :class:`Get_num`
* ``/asr_data``          -> :class:`ASR_data`
"""
import datetime
import glob
import logging
import os
import threading
import time
from urllib import parse

import nest_asyncio
import pymysql
import requests
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tqdm
from openpyxl import Workbook, load_workbook
from tornado.web import RequestHandler

import wav_to_server_rule_bak as rule
from utillib import PlatformBase

nest_asyncio.apply()

logger = logging.getLogger(__name__)

port = 9505
wav_dir = './wav_dir'

# Transcript fragments that are dropped before rule recognition.
_NOISE_UTTERANCES = (
    '', ',', '正在呼叫请稍候,', '用户已挂机',
    '对对对对对对对,', '对对对对对对,', '对对对对对,', '对对对对,',
    '多多多多多多多,', '多多多多多多,', '多多多多多,', '多多多多,', '多多多',
)

# URL prefixes (text before the first '.') whose second host label gets the
# '-internal' suffix appended.
_INTERNAL_HOST_PREFIXES = ('https://lingxi-ai', 'https://lxai')


def _use_internal_endpoint(wav_path):
    """Append ``-internal`` to the second dot-separated label of *wav_path*.

    The rewrite only happens when the text before the first dot is one of
    ``_INTERNAL_HOST_PREFIXES`` and the second label does not already end
    with ``-internal``.  Paths without a dot are returned unchanged (the
    length check avoids an ``IndexError`` on such input).
    """
    parts = wav_path.split('.')
    if (len(parts) > 1
            and parts[0] in _INTERNAL_HOST_PREFIXES
            and not parts[1].endswith('-internal')):
        parts[1] += '-internal'
    return '.'.join(parts)


def _append_signed_query(wav_path, expires, signature,
                         aws_access_key='', oss_access_key=''):
    """Re-attach signed-URL query parameters to *wav_path*.

    Nothing is appended unless both *expires* and *signature* are non-empty.
    An AWS key takes precedence over an OSS key; when neither is given only
    ``Expires`` and ``Signature`` are appended.  Only the signature *value*
    is percent-encoded, so the ``Signature=`` key stays intact.
    """
    if not (expires and signature):
        return wav_path
    pieces = [wav_path]
    if aws_access_key:
        pieces.append('AWSAccessKeyId=' + aws_access_key)
    elif oss_access_key:
        pieces.append('OSSAccessKeyId=' + oss_access_key)
    pieces.append('Expires=' + expires)
    pieces.append('Signature=' + parse.quote(signature))
    return '&'.join(pieces)


class Quality_test(PlatformBase):
    """Handler for ``/quality_test``.

    Relies on ``download_to_wav``, ``get_human_port_from`` and
    ``get_content`` inherited from ``PlatformBase`` (defined in ``utillib``,
    not visible in this file).
    """

    def optimize_record(self, chat_record):
        """Convert chat records into ``[content, speaker, start, end]`` rows.

        Records whose ``content`` is one of ``_NOISE_UTTERANCES`` are
        skipped.  ``speaker`` is ``0`` when ``role == 'staff'`` and ``1``
        otherwise.  A falsy *chat_record* yields an empty list.
        """
        if not chat_record:
            return []
        return [
            [c.content, 0 if c.role == 'staff' else 1, c.start_time, c.end_time]
            for c in chat_record
            if c.content not in _NOISE_UTTERANCES
        ]

    def task(self, customer_id, workspace_id, company_id, session_id,
             agent_id, create_time, end_time, answer_time, transfer_time,
             answer_end_time, talk_time, wav_path, voice_type, ip,
             file_name=''):
        """Download the recording, fetch its transcript and run the rules.

        Returns:
            ``0`` when ``get_content`` reports failure through its fourth
            return value; otherwise the second value returned by
            ``rule.recognition``.
        """
        if not voice_type:
            voice_type = 0
        self.download_to_wav(wav_path, customer_id, ip)
        if talk_time:
            voice_type = 1
            self.get_human_port_from(talk_time)

        # Only the record list and the success flag are used here.
        chat_record, _, _, sign = self.get_content(customer_id)
        if not sign:
            return 0

        content_list = self.optimize_record(chat_record)
        _, content_list = rule.recognition(
            content_list, customer_id, workspace_id, company_id, session_id,
            agent_id, create_time, end_time, answer_time, transfer_time,
            answer_end_time, talk_time, wav_path, voice_type, ip, file_name)
        return content_list

    def get(self):
        """Run the inspection synchronously and write the result as JSON.

        Responds with ``{'msg': 'success', 'data': content_list[1:]}``.
        When ``task`` reports failure (``0``/``None``) the response is
        ``{'msg': 'fail', 'data': []}`` instead of raising on the slice.
        """
        ip = self.get_argument('IP', '')
        customer_id = self.get_argument('customerId', '')
        wav_path = self.get_argument('wavPath', '')
        workspace_id = self.get_argument('workspaceId', '')
        company_id = self.get_argument('companyId', '')
        session_id = self.get_argument('sessionId', '')
        create_time = self.get_argument('createTime', '')
        end_time = self.get_argument('endTime', '')
        answer_time = self.get_argument('answerTime', '')
        transfer_time = self.get_argument('transferTime', '')
        answer_end_time = self.get_argument('answerEndTime', '')
        talk_time = self.get_argument('talkTime', '')
        agent_id = self.get_argument('agentId', '')
        file_name = self.get_argument('fileName', '')
        voice_type = self.get_argument('voiceType', '')

        wav_path = _use_internal_endpoint(wav_path)
        # The signing parameters arrive as separate request arguments and
        # are appended back onto the recording URL here.
        wav_path = _append_signed_query(
            wav_path,
            expires=self.get_argument('Expires', ''),
            signature=self.get_argument('Signature', ''),
            aws_access_key=self.get_argument('AWSAccessKeyId', ''),
            oss_access_key=self.get_argument('OSSAccessKeyId', ''),
        )

        content_list = self.task(
            customer_id, workspace_id, company_id, session_id, agent_id,
            create_time, end_time, answer_time, transfer_time,
            answer_end_time, talk_time, wav_path, voice_type, ip, file_name)

        if content_list is None or content_list == 0:
            self.write({'msg': 'fail', 'data': []})
            return
        self.write({'msg': 'success', 'data': content_list[1:]})


class Quality_new_test(Quality_test):
    """Handler for ``/quality_new_test``; replies with plain ``success``."""

    def task(self, customer_id, wav_path, ip, create_time, end_time,
             workspace_id, company_id, answer_time, answer_end_time,
             talk_time, transfer_time, voice_type):
        """Download the recording, fetch its transcript and run the rules.

        Returns ``0`` when ``get_content`` reports failure, otherwise
        ``None``; the recognition result is not returned.
        """
        if not voice_type:
            voice_type = 0
        self.download_to_wav(wav_path, customer_id, ip)
        if talk_time:
            voice_type = 1
            self.get_human_port_from(talk_time)

        chat_record, _, _, sign = self.get_content(customer_id)
        if not sign:
            return 0

        content_list = self.optimize_record(chat_record)
        # NOTE(review): this positional argument order differs from the call
        # in Quality_test.task although both target rule.recognition --
        # confirm against that function's signature.
        rule.recognition(
            content_list, customer_id, wav_path, ip, workspace_id,
            company_id, create_time, end_time, voice_type, answer_time,
            answer_end_time, talk_time, transfer_time)

    def get(self):
        """Run the inspection synchronously and write ``success``."""
        ip = self.get_argument('IP', '')
        customer_id = self.get_argument('customerId', '')
        wav_path = self.get_argument('wavPath', '')
        workspace_id = self.get_argument('workspaceId', '')
        company_id = self.get_argument('companyId', '')
        create_time = self.get_argument('createTime', '')
        end_time = self.get_argument('endTime', '')
        answer_time = self.get_argument('answerTime', '')
        transfer_time = self.get_argument('transferTime', '')
        answer_end_time = self.get_argument('answerEndTime', '')
        talk_time = self.get_argument('talkTime', '')
        voice_type = self.get_argument('voiceType', '')

        # Only the signature value is quoted (previously the whole
        # 'Signature=...' pair was quoted, escaping the '='), and an empty
        # access key no longer leaves a dangling '&&' in the URL.
        wav_path = _append_signed_query(
            wav_path,
            expires=self.get_argument('Expires', ''),
            signature=self.get_argument('Signature', ''),
            oss_access_key=self.get_argument('OSSAccessKeyId', ''),
        )

        self.task(customer_id, wav_path, ip, create_time, end_time,
                  workspace_id, company_id, answer_time, answer_end_time,
                  talk_time, transfer_time, voice_type)
        self.write('success')


class transform(RequestHandler):
    """Forward a check result to ``http://<ip>:8004/writeBack``.

    This handler is not present in the route table built in ``main``.
    """

    def get(self):
        """Relay ``customerId`` and ``checkResult`` with a 3 second timeout."""
        ip = self.get_argument('ip', '')
        custom_id = self.get_argument('customerId', '')
        check_result = self.get_argument('checkResult', '')
        # The requests keyword is ``timeout``; ``time_out`` raises TypeError.
        # ``params`` lets requests URL-encode the forwarded values.
        requests.get(
            'http://{}:8004/writeBack'.format(ip),
            params={'customerId': custom_id, 'checkResult': check_result},
            timeout=3)


class Get_num(RequestHandler):
    """Handler for ``/get_num``."""

    def get(self):
        """Write the number of ``*.xlsx`` files in ``./<ip>_data_dir``."""
        ip = self.get_argument('ip', '')
        workbook_paths = glob.glob('./{}_data_dir/*.xlsx'.format(ip))
        wav_url = self.get_argument('wavPath', '')
        print(wav_url)
        self.write(str(len(workbook_paths)))


class Call_back(RequestHandler):
    """Handler for ``/call_back``: merge per-call workbooks into one file."""

    def task(self, ip, company):
        """Concatenate every workbook in ``./<ip>_data_dir`` into one.

        Rows from the first sheet of each source workbook are appended,
        followed by a separator row.  The result is saved in the same
        directory as ``<year>-<month>-<day>_<ip>.xlsx``.  *company* is
        accepted but not used.
        """
        # Take a single timestamp so the date parts cannot straddle midnight.
        today = datetime.datetime.now()
        date = '{}-{}-{}'.format(today.year, today.month, today.day)

        merged = Workbook()
        sheet = merged.active
        sheet['A1'] = '话单ID'
        sheet['B1'] = '录音内容'

        for xlsx in glob.glob('./{}_data_dir/*.xlsx'.format(ip)):
            try:
                source = load_workbook(xlsx)
                source_sheet = source[source.sheetnames[0]]
                for row in source_sheet.values:
                    sheet.append(row)
                sheet.append(['', ])
            except Exception:
                # Best effort: skip the unreadable workbook but leave a trace
                # instead of failing silently.
                logger.exception('skipping workbook %s', xlsx)

        merged.save('./{}_data_dir/{}_{}.xlsx'.format(ip, date, ip))

    def get(self):
        """Start the merge in a background thread; no body is written."""
        ip = self.get_argument('ip', '')
        company = self.get_argument('company', '')
        worker = threading.Thread(target=self.task, args=(ip, company))
        worker.start()


class ASR_data(RequestHandler):
    """Handler for ``/asr_data``: replay stored calls through quality_test."""

    # intentId -> list whose first element is used as the company_id filter.
    _WORKSPACE_BY_INTENT = {
        '10168': ['170', '312'],
        '10172': ['262', '303'],
        '10174': ['261', '303'],
    }

    def task(self, start_date, end_date, intent_id, records):
        """Query matching call results and request a quality test for each.

        Args:
            start_date: exclusive lower bound for ``call_start_time``.
            end_date: exclusive upper bound for ``call_start_time``.
            intent_id: key into ``_WORKSPACE_BY_INTENT``.
            records: maximum number of rows to fetch (integer-like string).

        Invalid *intent_id* or *records* values are logged and the task
        returns without querying.
        """
        workspace = self._WORKSPACE_BY_INTENT.get(intent_id)
        if workspace is None:
            logger.error('asr_data: unknown intentId %r', intent_id)
            return
        try:
            limit = int(records)
        except (TypeError, ValueError):
            logger.error('asr_data: invalid records value %r', records)
            return

        # Request-supplied values are bound as parameters rather than being
        # formatted into the SQL text.
        sql = ('select ocr.dm_session_id, ocr.voice_path '
               'from outbound_call_result ocr '
               'where ocr.call_start_time > %s '
               'and ocr.call_start_time < %s '
               'and ocr.company_id = %s '
               'and ocr.robot_answer_duration > 30 '
               'and ocr.call_status = "normalConnection" '
               'limit %s')

        # NOTE(review): database credentials are hard-coded in source; they
        # should be moved to configuration.
        connection = pymysql.connect(
            host="47.92.76.236", port=13306, user="readonly",
            passwd="Moxi123#", db="outbound_platform", charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor)
        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    sql, (start_date, end_date, int(workspace[0]), limit))
                results = cursor.fetchall()
        finally:
            connection.close()

        for data in tqdm.tqdm(results):
            # NOTE(review): this targets port 9504 while this module listens
            # on 9505 -- confirm that is the intended service.
            url = 'http://localhost:9504/quality_test?customerId={}&wavPath={}&IP={}'.format(
                data['dm_session_id'], data['voice_path'],
                '{}_asr'.format(intent_id))
            requests.get(url)
            time.sleep(5)

    def get(self):
        """Start the replay in a background thread; no body is written."""
        records = self.get_argument('records', '')
        start_date = self.get_argument('start', '')
        end_date = self.get_argument('end', '')
        intent_id = self.get_argument('intentId', '')
        worker = threading.Thread(
            target=self.task,
            args=(start_date, end_date, intent_id, records))
        worker.start()


def main():
    """Build the application and serve it from 50 forked processes."""
    tornado.options.parse_command_line()
    base_dir = os.path.dirname(__file__)
    app = tornado.web.Application(
        [
            (r'/quality_test', Quality_test),
            (r'/quality_new_test', Quality_new_test),
            (r'/call_back', Call_back),
            (r'/get_num', Get_num),
            (r'/asr_data', ASR_data),
        ],
        static_path=os.path.join(base_dir, "static"),
        template_path=os.path.join(base_dir, "template"),
        debug=False,
    )
    http_server = tornado.httpserver.HTTPServer(app)
    # Multi-process startup pairs bind() with start(n); listen() is the
    # single-process entry point and must not be combined with start().
    http_server.bind(port)
    http_server.start(50)
    print("starting")
    tornado.ioloop.IOLoop.current().start()
    # Only reached once the IOLoop has been stopped.
    print("starting ... ")


if __name__ == '__main__':
    main()