"""Ad-hoc jobs over outbound-call data: DB exports, NLU labelling, Redis
session inspection and an administrative-division scraper.

Each top-level function is a standalone job; choose which one runs in the
``__main__`` section at the bottom of the file.
"""
import base64
import collections
import contextlib
import datetime
import glob
import hashlib
import json
import os
import pickle
import re
import socket
import sys
import time

import jieba
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas
import pandas as pd
import pymysql
import redis
import requests
import threadpool
import tqdm
from lxml import etree
from openpyxl import Workbook, load_workbook
from pypinyin import Style, lazy_pinyin, pinyin
from rediscluster import StrictRedisCluster
from scipy.io import wavfile

# Shared worker pool for fan-out jobs (used by task_heiniu).
pool = threadpool.ThreadPool(50)

# Seconds to wait on the lookup services below, so one stuck endpoint cannot
# hang a whole batch job.
HTTP_TIMEOUT = 60

# NOTE(review): database credentials are hard-coded in source. Move them to a
# config file / secret store and rotate them.
_DS_DB = {'host': '39.103.215.119', 'port': 3308, 'user': 'ds_user', 'passwd': 'Moxi123#'}
_HEINIU_DB = {'host': '39.103.215.119', 'port': 3308, 'user': 'zhangjian', 'passwd': 'Lingxi@123'}
_ADS_DB = {'host': 'am-8vbwn20384jdq3vq185480.zhangbei.ads.aliyuncs.com', 'port': 3306,
           'user': 'ds_user', 'passwd': 'Moxi123#'}

_DM_DETAIL_URL = 'https://sale.xi-ai.com/dataCenter/dm/detail'
_DETAILED_RECORD_URL = 'http://8.142.85.77:8630/report/getDetailedRecord/'
# Intranet alternatives: http://172.26.2.56:8679/nlu and http://172.26.2.56:8455/level_search
_NLU_URL = 'http://8.142.85.77:8679/nlu'
_FAQ_URL = 'http://8.142.85.77:8455/level_search'

# standard_query values that get_labels() does not record as user labels.
_IGNORED_USER_INTENTS = frozenset({'', 'NOINTENT', '肯定态度', '否定态度', '无态度'})

# Browser-like headers for stats.gov.cn. Conditional-request headers
# (If-Modified-Since / If-None-Match) are deliberately omitted: a stale
# validator can make the server answer 304 with an empty body.
_STATS_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Cache-Control': 'max-age=0',
    'Connection': 'keep-alive',
    'Cookie': 'SF_cookie_1=37059734',
    'Host': 'www.stats.gov.cn',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
}

# Fragments removed (in this order) from a province name for the printed list.
_PROVINCE_NAME_SUFFIXES = ('省', '市', '壮族自治区', '回族自治区', '维吾尔自治区', '自治区')
# Fragments removed (in this order) from a province name used as the mapping value;
# '市' is kept here so municipalities stay e.g. "北京市".
_PROVINCE_KEY_SUFFIXES = ('省', '壮族自治区', '回族自治区', '维吾尔自治区', '自治区')
# Fragments removed (in this order) from a prefecture name.
# NOTE(review): multi-ethnic names (e.g. 湘西土家族苗族自治州) are only partly
# stripped by this list; extend it if exact short names matter.
_PREFECTURE_SUFFIXES = (
    '地区', '哈萨克自治州', '蒙古自治州', '回族自治州', '藏族自治州', '彝族自治州',
    '傈僳族自治州', '白族自治州', '傣族自治州', '苗族自治州', '土家族自治州',
    '蒙古族自治州', '侗族自治州', '羌族自治州', '朝鲜族自治州', '自治州', '市',
)


def _connect_mysql(config):
    """Open a pymysql connection (utf8mb4, rows returned as dicts) from a config dict."""
    return pymysql.connect(charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, **config)


def _fetch_detailed_record(session_id):
    """Return the ``result`` list of dialogue turns from getDetailedRecord for one session."""
    response = requests.get(_DETAILED_RECORD_URL, params={'sessionId': session_id}, timeout=HTTP_TIMEOUT)
    return json.loads(response.text)['result']


def _strip_all(text, fragments):
    """Remove every occurrence of each fragment from *text*, applying them in order."""
    for fragment in fragments:
        text = text.replace(fragment, '')
    return text


def task_shuidi():
    """Refresh the NLU labels of every "AX" session in the baoxian NLU result table.

    For each distinct dm_session_id starting with "AX" in
    ds_temp.baoxian_human_dialogue_nlu_result, fetch the dialogue labels via
    get_labels() and update that row's user_label, seat_label and all_label.
    Commits after every row so progress survives an interruption.
    """
    sql_nlu_result = 'select distinct dm_session_id from ds_temp.baoxian_human_dialogue_nlu_result;'
    # Parameterized: label text comes from an external API and may contain quotes.
    sql_update = ('update ds_temp.baoxian_human_dialogue_nlu_result '
                  'set user_label=%s, seat_label=%s, all_label=%s '
                  'where dm_session_id=%s')

    with contextlib.closing(_connect_mysql(_DS_DB)) as connection:
        cursor = connection.cursor()
        cursor.execute(sql_nlu_result)
        session_ids = []
        for row in tqdm.tqdm(cursor.fetchall()):
            session_id = row['dm_session_id']
            if session_id and session_id.startswith('AX'):
                session_ids.append(session_id)

        for session_id in tqdm.tqdm(session_ids):
            user_labels, seat_labels = get_labels(session_id)
            # Skip empty parts so all_label never starts/ends with a stray comma.
            all_labels = ','.join(part for part in (user_labels, seat_labels) if part)
            cursor.execute(sql_update, (user_labels, seat_labels, all_labels, session_id))
            connection.commit()


def get_labels(dm_session_id):
    """Fetch one dialogue from the data-center API and collect its labels.

    Args:
        dm_session_id: session id passed as ``sessionId`` to the detail API.

    Returns:
        A pair of comma-joined strings:
        * the ``standard_query`` of each USER turn's ``idlResultJson``,
          skipping values in _IGNORED_USER_INTENTS and non-string values;
        * the SEAT turns' ``contentLabel`` entries starting with "坐席" or "用户".
    """
    response = requests.get(_DM_DETAIL_URL, params={'sessionId': dm_session_id}, timeout=HTTP_TIMEOUT)
    turns = json.loads(response.text)["data"]

    user_labels = []
    seat_labels = []
    for turn in turns:
        if turn["speakerType"] == "SEAT" and turn["contentLabel"]:
            seat_labels.extend(label for label in turn["contentLabel"]
                               if label.startswith(("坐席", "用户")))
        if turn["speakerType"] == "USER":
            try:
                intent = json.loads(turn["idlResultJson"])["standard_query"]
            except (KeyError, TypeError, ValueError):
                # Missing or malformed NLU payload: this turn contributes no label.
                continue
            if isinstance(intent, str) and intent not in _IGNORED_USER_INTENTS:
                user_labels.append(intent)
    return ",".join(user_labels), ",".join(seat_labels)


def platform_task(call_id, voice_path, workspace_id, company_id, session_id, agent_id, call_start_time,
                  call_end_time, answer_start_time, transfer_time, answer_end_time, talk_time, IP):
    """Submit one call recording to the local quality-test service (port 9503).

    The response is ignored. No timeout is set because the processing time of
    this service is not known here.
    """
    url = 'http://localhost:9503/quality_test?&customerId={}&wavPath={}&workspaceId={}&sessionId={}&' \
          'companyId={}&agentId={}&createTime={}&endTime={}&answerTime={}&transferTime={}&answerEndTime={}&talkTime={}&IP={}'.format(
              call_id, voice_path, workspace_id, session_id, company_id, agent_id, call_start_time,
              call_end_time, answer_start_time, transfer_time, answer_end_time, talk_time, IP)
    requests.get(url)


def task_heiniu(min_date, max_date):
    """Split company 2170's human-answered calls into "deal" and "no deal" and dispatch them.

    A customer counts as a deal if it has a breakpoint record since
    2022-06-01. Prints the number of deal customers, the total
    human_answer_duration of no-deal calls and the number of no-deal calls,
    then submits both groups to the shared thread pool.

    NOTE(review): min_date and max_date are currently unused; the date ranges
    are hard-coded in the SQL below.
    """
    sql_deal_customers = '''
        SELECT customer_id
        from ods_outbound_sale_platform.ods_breakpoint_data
        WHERE company_id = 2170 and date(breakpoint_time) >= '2022-06-01'
        GROUP by customer_id
    '''
    sql_call = '''
        select customer_id, company_id, case_id, dm_session_id, voice_path,
               human_answer_duration, call_start_time, answer_time
        from data_center_temp.outbound_call_result
        where call_start_time > "2022-06-27 09:00:00"
          and call_start_time < "2022-08-01 23:59:00"
          and call_status = "normalConnection"
          and human_answer_duration is not null
          and company_id = 2170;
    '''

    with contextlib.closing(_connect_mysql(_HEINIU_DB)) as connection:
        cursor = connection.cursor()
        cursor.execute(sql_deal_customers)
        # A set gives O(1) membership tests in the loop below.
        deal_customer_ids = {row['customer_id'] for row in tqdm.tqdm(cursor.fetchall())}
        print(len(deal_customer_ids))
        cursor.execute(sql_call)
        calls = cursor.fetchall()

    deal_parameter = []
    not_deal_parameter = []
    total_duration = 0
    for call in tqdm.tqdm(calls):
        is_deal = call['customer_id'] in deal_customer_ids
        args = [call['voice_path'], call['human_answer_duration'], call['case_id'], call['company_id'],
                call['customer_id'], call['dm_session_id'], call['call_start_time'], call['answer_time'],
                '黑牛成交' if is_deal else '黑牛未成交']
        if is_deal:
            deal_parameter.append((args, None))
        else:
            total_duration += call['human_answer_duration']
            not_deal_parameter.append((args, None))
    print(total_duration)
    print(len(not_deal_parameter))

    # NOTE(review): `task` (defined in this module) takes no parameters, so each
    # of these 9-argument work requests presumably fails with TypeError inside
    # the pool and is only reported by threadpool's exception handler. The
    # intended worker is not visible in this file; confirm before relying on it.
    for parameters in (deal_parameter, not_deal_parameter):
        for work_request in threadpool.makeRequests(task, parameters):
            pool.putRequest(work_request)
        pool.wait()


def dm_task(call_id, case_id, customer_id,):
    """Ask the real-time labelling service to label one call (user/seat workspaces 248/247)."""
    print(call_id)
    url = 'http://172.26.4.253:9900/realTimeLabel?callId={}&workspaceIdUser={}&workspaceIdSeat={}'.format(
        call_id, '248', '247')
    requests.get(url, timeout=500)


def task():
    """Look up case id and phone number for company-2187 rows of ./2022-08-09.xlsx.

    Reads the first sheet of ./2022-08-09.xlsx (skipping its header row). For
    each row whose third column equals 2187, queries
    data_center_temp.outbound_call_case by customer_id (first column) and writes
    the first matching case's id and phone number plus the row's first four
    columns to ./2022-08-09_2187.xlsx. Rows without a matching case are kept
    with empty id/phone.

    NOTE(review): the input columns are presumably
    (customer_id, session_id, company_id, 文本, 意图); confirm against the file.
    """
    sql = ('select ocr.id, ocr.phone_number '
           'from data_center_temp.outbound_call_case ocr '
           'where ocr.customer_id = %s;')

    wb = load_workbook('./2022-08-09.xlsx')
    ws = wb[wb.sheetnames[0]]
    wb_w = Workbook()
    ws_w = wb_w.active
    # Header matches the six values written per row below.
    ws_w.append(['id', 'phone_number', 'customer_id', 'session_id', 'company_id', '文本'])

    with contextlib.closing(_connect_mysql(_DS_DB)) as connection:
        cursor = connection.cursor()
        for row in tqdm.tqdm(ws.iter_rows(min_row=2, values_only=True)):
            if row[2] != 2187:
                continue
            customer_id = row[0]
            # Bound as a string, matching the quoted literal the query always used.
            cursor.execute(sql, (str(customer_id),))
            case = cursor.fetchone() or {}
            ws_w.append([case.get('id'), case.get('phone_number'), customer_id, row[1], row[2], row[3]])
    wb_w.save('./2022-08-09_2187.xlsx')


def send_task(wav_path, duration, customer_id, company_id, call_id, ip,):
    """Submit one recording to the local human_company service (port 9506); the response is ignored."""
    url = 'http://localhost:9506/human_company?callDuration={}&' \
          'wavPath={}&customerId={}&companyId={}&callId={}&IP={}'.format(
              duration, wav_path, customer_id, company_id, call_id, ip)
    requests.get(url)


def _intent_row(intent, update_time):
    """Build an nlu_result row from an NLU intention ``value`` dict."""
    return [intent['qtype'], intent['standard_query'], intent['original_query'], intent['a'], update_time]


def _faq_row(workspace, query, update_time):
    """Build the nlu_result row used when the NLU service gives no usable intent."""
    faq_query, faq_intent, _ = faq_result(workspace, query)
    return ['默认分类', '{}_{}'.format(faq_intent, faq_query), query, 'NOINTENT', update_time]


def nlu_result(workspace, query):
    """Classify *query* with the NLU service, falling back to FAQ search.

    Returns a list ``[type, intent, query, answer, update_time]``:
    * top NLU intention present -> its qtype, standard_query, original_query, answer;
    * workspace '210' with no intention but slots -> '默认类型' and the
      space-joined first value of each slot;
    * otherwise -> '默认分类', "<faq intent>_<faq query>", the query, 'NOINTENT'.
    ``update_time`` is the local time as 'YYYY-mm-dd HH:MM:SS'.
    """
    update_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    # params= encodes the user text, which may contain '&', '#', '?' or spaces.
    response = requests.get(_NLU_URL,
                            params={'session_id': -1, 'workspace': workspace, 'current_query': query},
                            timeout=HTTP_TIMEOUT)
    response = json.loads(response.text)

    intentions = response.get('intention') or []
    top_intent = intentions[0].get('value') if intentions else None
    if top_intent:
        return _intent_row(top_intent, update_time)
    if workspace == '210' and not intentions and response.get('slot'):
        slot_values = [slot['slot_value'][0] for slot in response['slot']]
        return ['默认类型', ' '.join(slot_values), query, '', update_time]
    # Every remaining case (including workspace '210' with neither intention nor
    # slot, which used to return None) falls back to FAQ search.
    return _faq_row(workspace, query, update_time)


def faq_result(workspace, query):
    """Search the FAQ service; return (original_query, standard_query, semantic) of the best hit.

    The 'ch' hit list is preferred over 'h'; returns ('', '', '') when both are
    empty or missing.
    """
    response = requests.get(_FAQ_URL, params={'systemId': workspace, 'query': query}, timeout=HTTP_TIMEOUT)
    response = json.loads(response.text)
    for bucket in ('ch', 'h'):
        hits = response.get(bucket)
        if hits:
            best = hits[0]
            return best['original_query'], best['standard_query'], best['semantic']
    return '', '', ''


def linshi():
    """Export USER turns where an NLU intent co-occurs with the "明确同意" slot.

    Scans company 2201's normalConnection/transFail calls between
    2022-09-24 08:00:00 and 2022-09-25 23:59:59, fetches each dialogue's
    detailed record and writes one (customer_id, session_id, query, intent)
    row per matching USER turn to ./2199_周末明确同意与意图标签共存.xlsx.

    NOTE(review): the output file name says 2199 but the query selects company
    2201; confirm which is intended.
    """
    sql = 'select dm_session_id, customer_id ' \
          'from data_center_temp.outbound_call_result ' \
          'where call_start_time > "2022-09-24 08:00:00" ' \
          'and call_start_time < "2022-09-25 23:59:59" ' \
          'and company_id = 2201 ' \
          'and call_status in ("normalConnection", "transFail");'
    with contextlib.closing(_connect_mysql(_ADS_DB)) as connection:
        cursor = connection.cursor()
        cursor.execute(sql)
        results = cursor.fetchall()
    print(len(results))

    wb = Workbook()
    ws = wb.active
    # Header matches the four values written per row below.
    ws.append(['customer_id', 'session_id', '内容', '意图'])
    for call in tqdm.tqdm(results):
        customer_id = call['customer_id']
        session_id = call['dm_session_id']
        for turn in _fetch_detailed_record(session_id):
            if turn['speakerType'] != 'USER':
                continue
            try:
                nlu = json.loads(turn['idlResultJson'])
            except (KeyError, TypeError, ValueError):
                continue
            intents = nlu.get('intention')
            if not (intents and intents[0].get('value', '')):
                continue
            slots = nlu.get('slot') or []
            # One row per turn, even if several slots carry "明确同意".
            if any(slot.get('slotValue') and slot['slotValue'][0] == '明确同意' for slot in slots):
                ws.append([customer_id, session_id, nlu['query'], intents[0]['value']['standard_query']])
    wb.save('./2199_周末明确同意与意图标签共存.xlsx')


def linshi_task(customer_id, ws):
    """Append every turn of one customer's "赠险激活-0802" calls to worksheet *ws*.

    Selects the customer's normalConnection calls since 2022-08-15 08:00:00 and,
    for each, appends one row per turn
    (customer_id, session_id, speakerType, msgContent, space-joined contentLabel)
    followed by an empty separator row.
    """
    sql = 'select dm_session_id ' \
          'from data_center_temp.outbound_call_result ' \
          'where customer_id = %s ' \
          'and call_start_time > "2022-08-15 08:00:00" ' \
          'and dm_version = "赠险激活-0802" ' \
          'and call_status = "normalConnection";'
    with contextlib.closing(_connect_mysql(_DS_DB)) as connection:
        cursor = connection.cursor()
        cursor.execute(sql, (customer_id,))
        calls = cursor.fetchall()

    for call in calls:
        session_id = call['dm_session_id']
        for turn in _fetch_detailed_record(session_id):
            labels = turn['contentLabel']
            ws.append([customer_id, session_id, turn['speakerType'], turn['msgContent'],
                       ' '.join(labels) if labels else ''])
        ws.append([])


def check_session_id():
    """Debug helper: dump one dialogue session stored in the Redis cluster.

    Prints the session JSON, every key/value of the first dialogue item's
    dstOutput entries, the last batchId found in their slotValue dicts, the
    number of dstOutput entries and the "sex" of share_userinfo in the slot pool.
    """
    # NOTE(review): hard-coded Redis credentials; move to configuration and rotate.
    redis_nodes = [{'host': 'dm-8vbhbqm1mjard4hw4epd.redis.zhangbei.rds.aliyuncs.com', 'port': 6379}, ]
    redis_pwd = 'Lingxird12345!@#'
    redisconn = StrictRedisCluster(startup_nodes=redis_nodes, password=redis_pwd)

    session_str = redisconn.get('dialogueSession:348:1-2144-343676701-1581650637767151616-171')
    if session_str is None:
        print('session not found')
        return
    session_history = json.loads(session_str.decode("utf-8"))
    print(session_history)

    dst_output = session_history['dialogueHistory']['dialogueItemList'][0]['dstOutput']
    batch_id = ''
    for item in dst_output:
        for key in item:
            print(key, item[key])
        slot_value = item.get('slotValue', '')
        if isinstance(slot_value, dict) and slot_value.get('batchId', ''):
            batch_id = slot_value['batchId']
    print('batch_id:{}'.format(batch_id) if batch_id else 'batch_id:空')
    print(len(dst_output))

    values = session_history.get('slotPool', {}).get('slotVOMap', {}).get('share_userinfo', {}).get('values', [])
    print('性别:', values[0].get('sex', '') if values else '')


def get_pinyin():
    """Print a fixed list of surname characters with their TONE3 pinyin.

    Prints the '|'-joined characters, a char -> pinyin dict and a
    pinyin -> set-of-chars dict (homophone groups).
    """
    content = '王李张陈刘杨黄吴周赵徐朱林孙胡马郑郭高何罗谢梁许唐宋曾邓蔡曹叶冯彭肖韩袁蒋潘沈余董魏丁程苏吕陆田于卢杜钟姚姜汪任夏谭方廖崔金范石邱江邹顾薛秦贾付熊尹孟韦侯雷钱严戴陶邵白段史汤赖闫洪毛贺万黎孔葛欧龚郝龙施武温倪庄颜章殷常覃向易莫康伍邢樊俞聂翟齐文傅乔牛耿兰代庞关安翁柳詹柯梅季祝鲁左甘申岳阮焦华骆游盛宁么户扈闷呢你却雀确啊哎埃凹摆采彩仓苍草曾柴昌池迟茌揣歹的狄迪敌翟迭牒娥俄讹额恩盖还好郝蒋卡佧懒朗勒区上若洒睢他它她谈谭潭檀央仰耶冶野乙以蚁倚逊雅演金'
    last_name_list = list(content)
    last_name_dict = dict()
    last_name_set_dict = dict()
    for last_name in content:
        syllable = lazy_pinyin(last_name, style=Style.TONE3, neutral_tone_with_five=True)[0]
        last_name_dict[last_name] = syllable
        last_name_set_dict.setdefault(syllable, set()).add(last_name)
    print('|'.join(last_name_list))
    print(last_name_dict)
    print(last_name_set_dict)


def _fetch_stats_page(url):
    """GET a stats.gov.cn page and parse it; returns None if lxml could not build a tree."""
    response = requests.get(url, headers=_STATS_HEADERS, timeout=HTTP_TIMEOUT)
    response.encoding = 'utf-8'
    return etree.HTML(response.text)


def get_position():
    """Scrape the 2021 division index on stats.gov.cn and print prefecture -> province.

    For each province link, calls get_city() to fill the mapping, then prints
    each mapping entry, the whole mapping and the list of short province names.
    """
    position_dict = dict()
    url = 'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/2021/'
    html = _fetch_stats_page(url)
    sheng_list = []
    if html is not None:
        # One path scoped to the province rows; a bare '//td/a' evaluated inside a
        # per-row loop would match every anchor in the page once per row.
        for anchor in html.xpath('//tr[@class="provincetr"]/td/a'):
            name = anchor.text
            href = anchor.get('href')
            if not name or not href:
                continue
            sheng_list.append(_strip_all(name, _PROVINCE_NAME_SUFFIXES))
            get_city(url + href, position_dict, _strip_all(name, _PROVINCE_KEY_SUFFIXES))
    for key in position_dict:
        print(key, position_dict[key])
    print(position_dict)
    print(sheng_list)


def get_city(url, position_dict, sheng):
    """Add ``short prefecture name -> sheng`` entries to *position_dict* from one province page.

    Code cells (text ending in '0') and '市辖区' are skipped; prefecture names
    are shortened with _PREFECTURE_SUFFIXES.
    """
    html = _fetch_stats_page(url)
    if html is None:
        return
    for anchor in html.xpath('//tr[@class="citytr"]/td/a'):
        name = anchor.text
        if not name or name.endswith('0') or name == '市辖区':
            continue
        position_dict[_strip_all(name, _PREFECTURE_SUFFIXES)] = sheng


if __name__ == '__main__':
    # task_heiniu('2022-06-24', '2022-06-28')
    # task_shuidi()
    # linshi()
    # check_session_id()
    # get_pinyin()
    get_position()

    content = '姓王怎么了'
    match = re.search(r'(姓.)', content)
    if match:
        print(match.group())