# -*- coding : utf-8 -*- import re import os import json import glob import tqdm import datetime import pymysql import logging import requests import threadpool from openpyxl import Workbook, load_workbook # from server import send_wechat_warning pool = threadpool.ThreadPool(15) user_sign_rule = r'(我是京东|工号)' user_pattern = re.compile(user_sign_rule) user_end_rule = r'再见' user_end_pattern = re.compile(user_end_rule) re_dict = { '企微话术': [[r'微信.{0,20}(添加|加上|加)|(添加|加).{0,20}微信', ], True, False], '节日营销话术': [[r'(双(十一|11|十二|12)|元旦|年底|过年|开学|孩子上学|618|五一|十一|寒假|暑假).*(购物|满减|囤货|打折|吃饭|旅游|教育)'], False, False] } def get(date, workspace_id): connection1 = pymysql.connect(host="47.92.193.147", port=3306, user="root", passwd="Moxi123#", db="task_dialogue_config", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) connection2 = pymysql.connect(host="39.103.234.234", port=13306, user="root", passwd="Moxi123#", db="outbound_call_platform_sale", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor) cursor1 = connection1.cursor() cursor2 = connection2.cursor() sql = 'select distinct call_case_id ' \ 'from human_dialogue ' \ 'where workspace_id = {} ' \ 'and call_start_time > "{} 09:00:00" ' \ 'and call_start_time < "{} 23:59:59"; ' sql_id = 'select call_case_id, content_text, identity, workspace_id, company_id, call_start_time, type ' \ 'from human_dialogue ' \ 'where workspace_id = {} ' \ 'and call_start_time > "{} 09:00:00" ' \ 'and call_start_time < "{} 23:59:59" ' \ 'and call_case_id = "{}"; ' sql_info = 'select call_id, call_user, sound_record_url, talk_time ' \ 'from huadanrecord ' \ 'where customer_id = "{}" ' \ 'and call_start_time = "{}"; ' cursor1.execute(sql.format(workspace_id, date, date)) result = cursor1.fetchall() for res in tqdm.tqdm(result): call_case_id = res['call_case_id'] cursor1.execute(sql_id.format(workspace_id, date, date, call_case_id)) datas = cursor1.fetchall() conversation = list() for data in datas: call_start_time = data['call_start_time'] cursor2.execute(sql_info.format(call_case_id, call_start_time)) info = cursor2.fetchall() call_id, call_user, sound_record_url, talk_time = info[0].values() conversation.append([data['content_text'], data['identity']]) sign = recognition(conversation, call_id, call_case_id, call_user, call_start_time, sound_record_url, '素质质检', talk_time) record_all('素质质检', sign, call_case_id, call_start_time, call_user) def recognition(conversation, call_id, customer_id, who, call_date, wav_path, ip, time): wb = Workbook() ws = wb.active ws.append([call_id, customer_id, who, call_date, wav_path]) sign = False sign_check = True sign_nocheck = False finnal_list = list() user_sign = 1 for c in conversation: result = user_pattern.search(c[0]) if result: user_sign = c[1] break for c in conversation: if c[1] == user_sign: # 判断节日营销话术是否执行 intent = intent_judge(c[0], customer_id, ip, 'nlu获取意图错误') if intent.startswith('不需要') or intent in ['额度多少', '额度问题-额度少']: re_dict['节日营销话术执行'][1] = True # 判断用户是否输入敏感客户 判断用户流程完整不完整 intent = intent_judge(c[0], customer_id, ip, 'nlu获取意图错误') result = user_end_pattern.search(c[0]) if result or intent in ['投诉', '别给我打电话了', '怀疑平台']: for false_type in re_dict: re_dict[false_type][1] = False for i, c in enumerate(conversation): content, type = c '''try: content = process_queries(content) except Exception: pass''' if type == user_sign: for false_type in re_dict: # 执行标记,False为不质检,跳过 if re_dict[false_type][1]: continue for re_rule in re_dict[false_type]: pattern = re.compile(re_rule) result = pattern.search(content) if result: re_dict[false_type][2] = True finnal_list.append([call_id, customer_id, '', false_type, '', '', '{}_{}'.format(type, content)]) else: finnal_list.append([call_id, customer_id, '', '', '', '', '{}_{}'.format(type, content)]) else: # 使用线上意图模型识别意图 intent = intent_judge(content, customer_id, ip, 'nlu获取意图错误') if intent in ['投诉', '别给我打电话了', '怀疑平台']: finnal_list.append([call_id, customer_id, '', intent, '', '', '{}_{}'.format(type, content)]) else: finnal_list.append([call_id, customer_id, '', '', '', '', '{}_{}'.format(type, content)]) # 最终对话内容导入 for fina_content in finnal_list: try: ws.append(fina_content) except Exception: pass # 末尾添加总结记录 for false_type in re_dict: if re_dict[false_type][2]: ws.append([call_id, customer_id, '', '质检未说{}'.format(false_type), '', '', '']) if not os.path.exists('./{}_data_dir/'.format(ip)): os.mkdir('./{}_data_dir/'.format(ip)) date = '{}-{}-{}'.format(datetime.datetime.now().year, datetime.datetime.now().month, datetime.datetime.now().day) if glob.glob('./{}_data_dir/*.xlsx'.format(ip)) and \ not glob.glob('./{}_data_dir/*.xlsx'.format(ip))[-1].split('/')[-1].startswith(date): for file_path in glob.glob('./{}_data_dir/*.xlsx'.format(ip)): os.remove(file_path) wb.save('./{}_data_dir/'.format(ip) + '{}_{}_{}_{}.xlsx'.format(date, call_id, sign, time)) return sign def intent_judge(query, customer_id, ip, text): url = 'http://47.92.230.239:8679/nlu?session_id=-1&workspace=222¤t_query={}'.format(str(query)) try: response = requests.get(url) result = json.loads(response.text) intention = result['intention'] if intention and intention[0].get('value', ''): return intention[0]['value']['standard_query'] else: return '' except Exception: pass # send_wechat_warning(customer_id, ip, text) return '' def process_queries(current_query): pass_word = ['不需要', '不用了', '不用', '一点', '考虑', '谢', '在', '看', '问', '想', '天', '刚', '试'] remove_word = [ '不好意思', '对不起', '谢谢你', '哎呀', '你好', '您好', '再见', '谢谢', '好吧', '感谢', '呃', '啊', '哦', '嘞', '喂', '哎', '哈', '哟', '哇', '呦', '拜' '嗯,', '嗯。', '嗯', ',好,', '。好,', ',行,', ',行。', ',对,', ',对。'] for remove in remove_word: if len(current_query.replace(remove, '')) > 2: current_query = current_query.replace(remove, '') # 删除相近相同词 sign = True while sign: i = 0 sign = False record_list = list() final_str = list() while i < len(current_query): repeat_num = 0 for j in range(1, 20): target = current_query[i:i + j] while True: if target == current_query[i + j * (repeat_num + 1):i + j * (repeat_num + 2)]: repeat_num += 1 else: break if repeat_num: record_append = (i, j, repeat_num) break if repeat_num: if target in pass_word or not (u'\u4e00' <= target <= u'\u9fff'): final_str.append(target * 2) else: final_str.append(target) sign = True record_list.append(target) i = i + j * (repeat_num + 1) continue final_str.append(current_query[i]) i += 1 current_query = ''.join(final_str) # 将被标点符号隔开的相同字合并在一块 current_query_result = '' for i in range(len(current_query)): if current_query[i] in [',', '。', '?', '!']: for j in range(1, 5): if current_query[i - j if i - j >= 0 else 0:i] == current_query[i + 1:i + j + 1]: current_query_result = current_query_result[:-1 * j] break current_query_result += current_query[i] current_query = current_query_result for point_sign in ['?。', '?,', ',。', ',,', '。,', '。。']: current_query = current_query.replace(point_sign, ',') if current_query[0] in [',', '。']: current_query = current_query[1:] return current_query def record_all(*args): ip = args[0] date = '{}-{}-{}'.format(datetime.datetime.now().year, datetime.datetime.now().month, datetime.datetime.now().day) if not os.path.exists('./{}_data_dir/{}_反馈客户数据.xlsx'.format(ip, date)): fp = open('./{}_data_dir/{}_反馈客户数据.xlsx'.format(ip, date), 'w') if len(args) == 5: fp.write('质检结果\t客服ID\t电话开始时间\t外拨坐席\n') else: fp.write('质检结果\t客服ID\t电话开始时间\t通话时长\t外拨职场\t外拨小组\t外拨坐席\n') else: fp = open('./{}_data_dir/{}_反馈客户数据.xlsx'.format(ip, date), 'a') if len(args) == 5: ip, sign, customer_id, call_date, name = args fp.write('{}\t{}\t{}\t{}\n'.format(sign, customer_id, call_date, name)) else: ip, sign, customer_id, call_date, time, call_place, call_group, name = args fp.write( '{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(sign, customer_id, call_date, time, call_place, call_group, name)) fp.close() if __name__ == '__main__': get('2021-09-06', '222')