# -*- coding: utf-8 -*-
"""Store recognised call dialogues and flag complaint intents in session logs.

The module has two independent parts:

* ``recognition`` writes one call's utterances to the ``human_dialogue``
  table and to a per-IP, per-day Excel file.
* ``linshi_sex_false`` reads a workbook of customers, fetches each session's
  detailed record over HTTP and reports which complaint intents were hit.
"""
import contextlib
import datetime
import glob
import itertools
import json
import os
import re

import pymysql
import requests
import threadpool
import tqdm
from openpyxl import Workbook, load_workbook

# Timestamp layout used for every call/answer/transfer time in this module.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Endpoint returning the detailed record of one dialogue session.
_SESSION_DETAIL_URL = "http://47.92.193.147:8630/report/getDetailedRecord"


def _connect(host, port, db):
    """Open a DictCursor connection to one of the project's MySQL servers."""
    # NOTE(review): credentials are hard-coded in source; they should be
    # moved to configuration or environment variables.
    return pymysql.connect(
        host=host,
        port=port,
        user="root",
        passwd="Moxi123#",
        db=db,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )


def _connect_dialogue_db():
    """Connect to the database that holds the ``human_dialogue`` table."""
    return _connect("47.92.193.147", 3306, "task_dialogue_config")


def _connect_call_platform_db():
    """Connect to the database that holds the ``huadanrecord`` table."""
    return _connect("39.103.234.234", 13306, "outbound_call_platform_sale")


def check_data(customer_id, call_start_time):
    """Re-key dialogue rows stored under a customer id to the real call id.

    Looks up the call record for ``customer_id`` / ``call_start_time`` in
    ``huadanrecord``. If ``human_dialogue`` already has rows whose
    ``call_id`` equals ``customer_id`` inside the 09:00-21:00 window, those
    rows are updated to carry the real call id.

    Args:
        customer_id: Customer identifier; also the provisional ``call_id``
            under which dialogue rows may have been stored.
        call_start_time: Start time of the call.
            NOTE(review): the window query appends " 09:00:00" to this value
            (which suggests a bare date) while the other two queries compare
            it for equality with ``call_start_time`` -- confirm the intended
            format with the caller.

    Returns:
        The ``call_id`` of the matching call record, or ``None`` when no
        call record exists.
    """
    select_dialogue = (
        'select call_id, identity '
        'from human_dialogue '
        'where call_id = %s '
        'and call_start_time > %s '
        'and call_start_time < %s;'
    )
    select_record = (
        'select call_id, call_user, sound_record_url, talk_time '
        'from huadanrecord '
        'where customer_id = %s '
        'and call_start_time = %s;'
    )
    update_dialogue = (
        'update human_dialogue '
        'set call_id = %s '
        'where call_id = %s '
        'and call_start_time = %s;'
    )

    # closing() guarantees both connections are released even on failure.
    with contextlib.closing(_connect_dialogue_db()) as dialogue_conn, \
            contextlib.closing(_connect_call_platform_db()) as platform_conn:
        with dialogue_conn.cursor() as dialogue_cursor:
            dialogue_cursor.execute(select_dialogue, (
                customer_id,
                '{} 09:00:00'.format(call_start_time),
                '{} 21:00:00'.format(call_start_time),
            ))
            existing_rows = dialogue_cursor.fetchall()

        with platform_conn.cursor() as platform_cursor:
            platform_cursor.execute(
                select_record, (customer_id, call_start_time))
            records = platform_cursor.fetchall()

        if not records:
            return None
        # Read the column by name rather than relying on dict value order.
        call_id = records[0]['call_id']

        if existing_rows:
            # human_dialogue is read from (and written to elsewhere in) the
            # dialogue database, so the update must run on that connection.
            with dialogue_conn.cursor() as dialogue_cursor:
                dialogue_cursor.execute(
                    update_dialogue, (call_id, customer_id, call_start_time))
            dialogue_conn.commit()
        return call_id


def _offset_to_timestamp(base_time, offset_seconds):
    """Shift a ``_TIME_FORMAT`` timestamp by whole seconds and re-format it."""
    base = datetime.datetime.strptime(base_time, _TIME_FORMAT)
    shifted = base + datetime.timedelta(seconds=int(offset_seconds))
    return shifted.strftime(_TIME_FORMAT)


def _insert_dialogue_rows(rows):
    """Insert prepared utterance rows into ``human_dialogue`` and commit."""
    insert_sql = (
        'insert into human_dialogue '
        '(call_id, content_text, identity, workspace_id, company_id, '
        'call_start_time, call_end_time, answer_start_time, '
        'answer_end_time, talk_time, type) '
        'values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);'
    )
    with contextlib.closing(_connect_dialogue_db()) as connection:
        with connection.cursor() as cursor:
            # Driver-side parameter binding: utterance text containing quotes
            # can neither break nor inject into the statement.
            cursor.executemany(insert_sql, rows)
        connection.commit()


def recognition(
        conversation, customer_id, workspace_id, company_id, session_id,
        agent_id, create_time, end_time, answer_time, transfer_time,
        answer_end_time, talk_time, wav_path, voice_type, ip,):
    """Persist one call's utterances to MySQL and to a daily Excel file.

    Args:
        conversation: Iterable of 4-item entries
            ``(content, identity, start, end)``. ``identity == 1`` is stored
            as identity 1, anything else as identity 2. ``start`` / ``end``
            are second offsets when ``answer_time`` is set.
        customer_id: Identifier stored as ``call_id`` and used in the Excel
            file name.
        workspace_id: When falsy, nothing is written to the database.
        company_id: Stored with every database row.
        session_id: Written to the sheet header when ``agent_id`` is falsy.
        agent_id: Written to the sheet header (with ``create_time``) when set.
        create_time: Written to the sheet header when ``agent_id`` is set.
        end_time: Unused; kept for interface compatibility.
        answer_time: ``"%Y-%m-%d %H:%M:%S"`` string. When set, utterance
            offsets are converted to absolute timestamps.
        transfer_time: When set, used instead of ``answer_time`` as the base
            for the offset conversion.
        answer_end_time: Stored with every database row.
        talk_time: Stored with every database row.
        wav_path: Recording path written to the sheet header.
        voice_type: Stored in the ``type`` column of every database row.
        ip: Prefix of the output directory ``./<ip>_data_dir/``.

    Returns:
        ``(sign_num, content_list)`` where ``sign_num`` is always 0 and
        ``content_list`` holds ``[customer_id, text]`` pairs: the recording
        path first, then each utterance prefixed with ``"1_"`` or ``"2_"``.

    Note:
        Values are bound as SQL parameters, so a ``None`` argument is stored
        as SQL ``NULL``.
    """
    wb = Workbook()
    ws = wb.active
    sign_num = 0
    content_list = list()

    # The check_data() lookup that used to remap customer_id is disabled.
    try:
        if agent_id:
            ws.append([customer_id, wav_path, create_time, agent_id])
        else:
            ws.append([customer_id, wav_path, session_id])
    except Exception:
        # Best effort: fall back to the minimal header if the richer one
        # cannot be written to the sheet.
        ws.append([customer_id, wav_path])
    content_list.append([customer_id, wav_path])

    base_time = transfer_time if transfer_time else answer_time
    db_rows = []
    for content, speaker, start_offset, end_offset in conversation:
        if answer_time:
            start_at = _offset_to_timestamp(base_time, start_offset)
            end_at = _offset_to_timestamp(base_time, end_offset)
        else:
            start_at, end_at = start_offset, end_offset

        identity = 1 if speaker == 1 else 2
        if workspace_id:
            db_rows.append((
                customer_id, content, identity, workspace_id, company_id,
                start_at, end_at, answer_time, answer_end_time, talk_time,
                voice_type,
            ))
        labelled = str(identity) + '_' + content
        ws.append([customer_id, labelled])
        content_list.append([customer_id, labelled])

    # Only touch the database when there is something to store.
    if workspace_id and db_rows:
        _insert_dialogue_rows(db_rows)

    data_dir = './{}_data_dir/'.format(ip)
    os.makedirs(data_dir, exist_ok=True)

    # Take "now" once so the date cannot change between its components.
    now = datetime.datetime.now()
    date = '{}-{}-{}'.format(now.year, now.month, now.day)

    # Drop spreadsheets left over from previous days; the directory only
    # keeps the current day's files.
    for file_path in glob.glob(data_dir + '*.xlsx'):
        if not os.path.basename(file_path).startswith(date):
            os.remove(file_path)

    wb.save(data_dir + '{}_{}_{}.xlsx'.format(date, customer_id, sign_num))
    return sign_num, content_list


# Standard intents that mark a session as a complaint.
intent_list = [
    '投诉',
    '别给我打电话了',
    '怀疑平台',
    '怎么有我的号码',
    '转人工',
    '怎么注销',
    '已经注销',
    '强烈拒绝'
]

# Worker pool for the (currently disabled) parallel variant of
# linshi_sex_false, which would dispatch linshi_sex_false_task per row.
pool = threadpool.ThreadPool(20)


def get_session_text(session_id, timeout=10):
    """Fetch the detailed record of a dialogue session.

    Args:
        session_id: Session identifier sent as the ``sessionId`` query
            parameter.
        timeout: Seconds to wait for the service before giving up.

    Returns:
        The value of the ``"result"`` key of the JSON response.

    Raises:
        requests.RequestException: On connection failure, timeout or an HTTP
            error status.
    """
    response = requests.get(
        _SESSION_DETAIL_URL,
        params={"sessionId": session_id},
        timeout=timeout,
    )
    response.raise_for_status()
    return json.loads(response.text)["result"]


def _standard_query(reason):
    """Return the standard query of the first intention, or ``''``."""
    try:
        intentions = reason['intention']
        value = intentions[0].get('value', '') if intentions else ''
        return value['standard_query'] if value else ''
    except (AttributeError, IndexError, KeyError, TypeError):
        # Malformed or incomplete recognition payload: treat as "no intent".
        return ''


def _complaint_intents(records):
    """Collect the complaint intents hit by the user turns of a session."""
    matched = []
    for record in records or []:
        if record.get('speakerType') != 'USER':
            continue
        try:
            reason = json.loads(record['idlResultJson'])
        except (KeyError, TypeError, ValueError):
            # Turn without a parseable recognition result.
            continue
        label = _standard_query(reason)
        if label in intent_list:
            matched.append(label)
    return matched


def linshi_sex_false_task(row, ws_w):
    """Check one customer row and append its verdict to the output sheet.

    Args:
        row: Sequence whose first two items are the customer id and the
            dialogue session id.
        ws_w: Worksheet receiving
            ``[customer_id, dm_session_id, flag, labels]`` where ``flag`` is
            1 when any complaint intent was hit and ``labels`` joins the hit
            intents with ``"_"``.
    """
    customer_id = row[0]
    dm_session_id = row[1]
    labels = _complaint_intents(get_session_text(dm_session_id))
    ws_w.append(
        [customer_id, dm_session_id, 1 if labels else 0, '_'.join(labels)])


def linshi_sex_false(input_path='./性别不一致客户.xlsx',
                     output_path='./性别错误——投诉解析.xlsx'):
    """Scan the customer workbook and write the complaint-intent report.

    Reads the first sheet of ``input_path``, skips its header row, processes
    the rows with index 1 to 9999 sequentially and saves the result to
    ``output_path``.

    Args:
        input_path: Workbook whose first two columns are the customer id and
            the dialogue session id.
        output_path: Destination of the generated report workbook.
    """
    wb = load_workbook(input_path)
    ws = wb[wb.sheetnames[0]]

    wb_w = Workbook()
    ws_w = wb_w.active
    ws_w.append(['customer_id', 'dm_session_id', '质检结果', '质检标签'])

    # Row 0 is the header; stop at row 9999 instead of walking the whole
    # sheet.
    for row in tqdm.tqdm(itertools.islice(ws.values, 1, 10000)):
        linshi_sex_false_task(row, ws_w)

    wb_w.save(output_path)


if __name__ == '__main__':
    linshi_sex_false()