# -*- coding : utf-8 -*- import os import tqdm import json import time import glob import uuid import wave import logging import librosa import operator import requests import webrtcvad import websocket import traceback import contextlib import collections import numpy as np from scipy.io import wavfile from typing import List from pydub import AudioSegment from utillib import PlatformBase from openpyxl import Workbook, load_workbook wav_dir = './wav_dir' # ASR_URL = 'ws://8.142.222.140/asr/v0.7' ASR_URL = 'ws://172.26.215.90/asr/v0.7' INTENT_URL = 'http://8.142.69.133:8677/predict?type=222&sessionId=-1&query={}' class Chat_record(object): def __init__(self, content, start_time, end_time, role, result_id): self.content = content self.start_time = start_time self.end_time = end_time self.role = role self.result_id = result_id self.webhook = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=25ea61a4-35b8-4ca3-9e8a-527136a9a367' self.webhook_token = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cfc9a7ec-fae4-4802-9844-caae493e5ab4' def add_legal(self, legal_type, detail): self.legal = False self.legal_type = legal_type self.detail = detail class LxAsrError(Exception): pass class LxAsrClient: StartTranscription = "StartTranscription" StopTranscription = "StopTranscription" TranscriptionStarted = "TranscriptionStarted" TranscriptionResultChanged = "TranscriptionResultChanged" SentenceEnd = "SentenceEnd" TranscriptionCompleted = "TranscriptionCompleted" def __init__(self, ws_addr: str): self._ws_addr = ws_addr def send_pcm(self, pcm: bytes, sample_rate: int = 8000) -> List[str]: assert sample_rate in (8000, 16000) sentences: List[str] = [] payload = { "sample_rate": sample_rate, "enable_punctuation": True, "punctuation_threshold": 300, } task_id = str(uuid.uuid1()) start_msg = { "header": { "event_name": self.StartTranscription, "task_id": task_id, }, "payload": payload, } stop_msg = { "header": { "event_name": self.StopTranscription, } } ws = websocket.create_connection(self._ws_addr, suppress_origin=True) ws.send(json.dumps(start_msg)) resp = ws.recv() resp_dict = json.loads(resp) if resp_dict["header"]["event_name"] != self.TranscriptionStarted: raise LxAsrError(resp_dict["header"]["status_message"]) ws.send(pcm, opcode=websocket.ABNF.OPCODE_BINARY) ws.send(json.dumps(stop_msg)) while True: resp = ws.recv() resp_dict = json.loads(resp) event_name = resp_dict["header"]["event_name"] if event_name == self.SentenceEnd: sentences.append(resp_dict["payload"]["result"]) elif event_name == self.TranscriptionCompleted: break elif resp_dict["header"]["status"] != 200: raise LxAsrError(resp_dict["header"]["status_message"]) try: ws.close() except Exception as e: logging.error("asr ws close error: %s" % str(e)) return sentences class SelfAsr(): def __init__(self): pass def asr_send(self, pcm: bytes, sample_rate: int = 8000) -> List[str]: client = LxAsrClient(ASR_URL) return client.send_pcm(pcm, sample_rate) def read_wave(self, path): """Reads a .wav file. Takes the path, and returns (PCM audio data, sample rate). """ with contextlib.closing(wave.open(path, 'rb')) as wf: num_channels = wf.getnchannels() assert num_channels == 1 sample_width = wf.getsampwidth() assert sample_width == 2 sample_rate = wf.getframerate() assert sample_rate in (8000, 16000, 32000, 48000) pcm_data = wf.readframes(wf.getnframes()) return pcm_data, sample_rate class Frame(object): """Represents a "frame" of audio data.""" def __init__(self, bytes, timestamp, duration): self.bytes = bytes self.timestamp = timestamp self.duration = duration def frame_generator(self, frame_duration_ms, audio, sample_rate): """Generates audio frames from PCM audio data. Takes the desired frame duration in milliseconds, the PCM data, and the sample rate. Yields Frames of the requested duration. """ n = int(sample_rate * (frame_duration_ms / 1000.0) * 2) offset = 0 timestamp = 0.0 duration = (float(n) / sample_rate) / 2.0 while offset + n < len(audio): yield self.Frame(audio[offset:offset + n], timestamp, duration) timestamp += duration offset += n def vad_collector(self, sample_rate, frame_duration_ms, padding_duration_ms, vad, frames): """Filters out non-voiced audio frames. Given a webrtcvad.Vad and a source of audio frames, yields only the voiced audio. Uses a padded, sliding window algorithm over the audio frames. When more than 90% of the frames in the window are voiced (as reported by the VAD), the collector triggers and begins yielding audio frames. Then the collector waits until 90% of the frames in the window are unvoiced to detrigger. The window is padded at the front and back to provide a small amount of silence or the beginnings/endings of speech around the voiced frames. Arguments: sample_rate - The audio sample rate, in Hz. frame_duration_ms - The frame duration in milliseconds. padding_duration_ms - The amount to pad the window, in milliseconds. vad - An instance of webrtcvad.Vad. frames - a source of audio frames (sequence or generator). Returns: A generator that yields PCM audio data. """ num_padding_frames = int(padding_duration_ms / frame_duration_ms) # We use a deque for our sliding window/ring buffer. ring_buffer = collections.deque(maxlen=num_padding_frames) # We have two states: TRIGGERED and NOTTRIGGERED. We start in the # NOTTRIGGERED state. triggered = False time_list = [] voiced_frames = [] for frame in frames: is_speech = vad.is_speech(frame.bytes, sample_rate) if not triggered: ring_buffer.append((frame, is_speech)) num_voiced = len([f for f, speech in ring_buffer if speech]) # If we're NOTTRIGGERED and more than 90% of the frames in # the ring buffer are voiced frames, then enter the # TRIGGERED state. if num_voiced > 0.9 * ring_buffer.maxlen: triggered = True time_list.append(ring_buffer[0][0].timestamp) # print('starttime', ring_buffer[0][0].timestamp) # We want to yield all the audio we see from now until # we are NOTTRIGGERED, but we have to start with the # audio that's already in the ring buffer. for f, s in ring_buffer: voiced_frames.append(f) ring_buffer.clear() else: # We're in the TRIGGERED state, so collect the audio data # and add it to the ring buffer. voiced_frames.append(frame) ring_buffer.append((frame, is_speech)) num_unvoiced = len([f for f, speech in ring_buffer if not speech]) # If more than 90% of the frames in the ring buffer are # unvoiced, then enter NOTTRIGGERED and yield whatever # audio we've collected. if num_unvoiced > 0.9 * ring_buffer.maxlen: time_list.append(frame.timestamp + frame.duration) # print('endtime =', frame.timestamp + frame.duration) triggered = False yield b''.join([f.bytes for f in voiced_frames]), time_list voiced_frames = [] time_list.clear() if triggered: time_list.append(frame.timestamp + frame.duration) # If we have any leftover voiced audio when we run out of input, # yield it. if voiced_frames: yield b''.join([f.bytes for f in voiced_frames]), time_list def asr_audio(self, wav) -> List[dict]: # print('wav name =', wav) if librosa.load(wav, sr=8000)[0].size < 3: # print(wav, 'no wav data open error') return [] set_vadmode = 3 # 0,1,2,3 wav_result: List[dict] = [] audio, sample_rate = self.read_wave(wav) vad = webrtcvad.Vad(set_vadmode) frames = self.frame_generator(30, audio, sample_rate) frames = list(frames) segments = self.vad_collector(sample_rate, 30, 300, vad, frames) for i, segment in enumerate(segments): part_result = {} part_result['start_time'] = segment[1][0] part_result['end_time'] = segment[1][1] part_result['wav_name'] = wav asr_result: List[str] = self.asr_send(segment[0], sample_rate) if asr_result != []: part_result['result'] = "".join(asr_result) wav_result.append(part_result) return wav_result def download_to_wav(self, url, id, ip): # response = request.urlopen(url) response = requests.get(url) self.file_path = '' if url[-3:] == 'wav': self.file_path = '{}/{}_{}.wav'.format(wav_dir, ip, id) with open(self.file_path, 'wb') as fp: fp.write(response.content) # fp.write(response.read()) fp.flush() elif url[-3:] == 'mp3': self.file_path = '{}/{}_{}.mp3'.format(wav_dir, ip, id) c = 'wget "{}" -c -T 10 -t 10 -O "{}"'.format(url, self.file_path) os.system(c) c = 'ffmpeg -i "{}" -f wav "{}"'.format(self.file_path, self.file_path[:-3] + 'wav') os.system(c) os.remove(self.file_path) self.file_path = '{}/{}_{}.wav'.format(wav_dir, ip, id) # song.export( self.file_path, format='wav' ) elif url.split('?')[0].split('/')[-1][-3:] == 'mp3': self.file_path = '{}/{}_{}.mp3'.format(wav_dir, ip, id) c = 'wget "{}" -c -T 10 -t 10 -O "{}"'.format(url, self.file_path) os.system(c) c = 'ffmpeg -i "{}" -f wav "{}"'.format(self.file_path, self.file_path[:-3] + 'wav') os.system(c) os.remove(self.file_path) self.file_path = self.file_path[:-3] + 'wav' elif url.split('?')[0].split('/')[-1][-4:] == 'flac': self.file_path = '{}/{}_{}.flac'.format(wav_dir, ip, id) self.file_path_human = '' c = 'wget "{}" -c -T 10 -t 10 -O "{}"'.format(url, self.file_path) os.system(c) c = 'ffmpeg -i "{}" -f wav "{}"'.format(self.file_path, self.file_path[:-4] + 'wav') os.system(c) os.remove(self.file_path) self.file_path = self.file_path[:-4] + 'wav' elif url.split('?')[0].split('/')[-1][-3:] == 'wav': self.file_path = '{}/{}_{}.wav'.format(wav_dir, ip, id) with open(self.file_path, 'wb') as fp: fp.write(response.content) fp.flush() def get_human_port_from(self, human_answer_duration): song = AudioSegment.from_wav(self.file_path) try: human_part = song[-1 * (int(human_answer_duration) + 3) * 1000:] human_part.export(self.file_path, format='wav') except Exception: pass def convert_to_record(self, results, role, result_id): records = [] for result in results: # print( result ) if result.get('result', ''): content = result.get('result', '') start_time = result['start_time'] end_time = result['end_time'] record = Chat_record(content, start_time, end_time, role, result_id) records.append(record) return records def sort_records(self, chat_records): cmpfun = operator.attrgetter('start_time') chat_records.sort(key=cmpfun) return chat_records def sample_from_file(self): try: samplerate, data = wavfile.read(self.file_path) except Exception as err: try: c = 'ffmpeg -i "{}" "{}"'.format(self.file_path, self.file_path[:-4] + '_fix.wav') os.system(c) os.remove(self.file_path) self.file_path = self.file_path[:-4] + '_fix.wav' samplerate, data = wavfile.read(self.file_path) except Exception as err: traceback.print_exc() os.remove(self.file_path) return 0, 0, 0 time = data.shape[0] // samplerate return samplerate, data, time def get_content(self, id): # self.file_path = './saleCallRecord_RPA_huadan_2021-12-16_a5062471-bf90-452c-95f4-a05b47986980_D638325469292063042148.wav' samplerate, data, time = self.sample_from_file() if samplerate == 0 and data == 0: return 0, 0, 0, 0 time = data.shape[0] // samplerate left = [] right = [] if len(data.shape) == 2: for item in data: left.append(item[0]) right.append(item[1]) self.file_path_left = self.file_path[:-4] + '_left.wav' self.file_path_right = self.file_path[:-4] + '_right.wav' wavfile.write(self.file_path_left, samplerate, np.array(left)) wavfile.write(self.file_path_right, samplerate, np.array(right)) left_result = self.asr_audio(self.file_path_left) right_result = self.asr_audio(self.file_path_right) left_record = self.convert_to_record(left_result, 'staff', id) right_record = self.convert_to_record(right_result, 'user', id) os.remove(self.file_path) os.remove(self.file_path_left) os.remove(self.file_path_right) right_record.extend(left_record) right_record = self.sort_records(right_record) return right_record, time, 2, 1 else: result = self.asr_audio(self.file_path) record = self.convert_to_record(result, 'staff', id) os.remove(self.file_path) return record, time, 1, 1 def optimize_record(self, chat_record): content_list = list() if chat_record: for c in chat_record: if c.content not in ['', ',', '正在呼叫请稍候,', '用户已挂机', '对对对对对对对,', '对对对对对对,', '对对对对对,', '对对对对,', '多多多多多多多,', '多多多多多多,', '多多多多多,', '多多多多,', '多多多', ]: # c.content = c.content.replace(',', '') content_list.append([c.content, 0 if c.role == 'staff' else 1]) return content_list def transform_from_file(self): wb = Workbook() ws = wb.active ws.append(['ID', '内容', '角色']) file_path = './baixin_audio/*' file_paths = glob.glob(file_path) for file_path in tqdm.tqdm(file_paths): if file_path.endswith('.flac') or file_path.endswith('.mp3'): file_path_new = file_path.split('.') file_path_new[-1] = 'wav' c = 'ffmpeg -i "{}" -f wav "{}"'.format(file_path, '.'.join(file_path_new)) os.system(c) # os.remove(file_path) self.file_path = '.'.join(file_path_new) elif file_path.endswith('.wav'): self.file_path = file_path try: # print(file_path) customer_id = file_path.split('/')[-1].split('.')[0] # print( customer_id ) chat_record, time, channel, sign = self.get_content(customer_id) content_list = self.optimize_record(chat_record) for i, c in enumerate(content_list): content, type = c ws.append([customer_id, content, type]) except Exception as err: pass wb.save('./百信.xlsx') def nlu_result(workspace, query): update_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(round(time.time() * 1000)) / 1000)) response = requests.get( 'http://8.142.85.77:8680/nlu?session_id=-1&workspace={}¤t_query={}'.format(workspace, query)) # 'http://8.142.239.53:8679/nlu?session_id=-1&workspace={}¤t_query={}'.format(workspace, query)) response = json.loads(response.text) if workspace == '210': slot = list() if response.get('intention', ''): if response.get('intention', '')[0].get('value', ''): intent = response.get('intention', '')[0].get('value', '') # print(query, intent, type(intent)) q_type = intent['qtype'] answer = intent['a'] query = intent['original_query'] intent = intent['standard_query'] return [q_type, intent, query, answer, update_time] else: q, i, s = faq_result(workspace, query) return ['默认分类', '{}_{}'.format(i, q), query, 'NOINTENT', update_time] elif response.get('slot', ''): for slots in response['slot']: slot.append(slots['slot_value'][0]) return ['默认类型', ' '.join(slot), query, '', update_time] else: if response.get('intention', '') and response['intention'][0].get('value', ''): intent = response.get('intention', '')[0].get('value', '') # print( query, intent, type(intent) ) q_type = intent['qtype'] answer = intent['a'] query = intent['original_query'] intent = intent['standard_query'] return [q_type, intent, query, answer, update_time] else: q, i, s = faq_result(workspace, query) return ['默认分类', '{}_{}'.format(i, q), query, 'NOINTENT', update_time] def faq_result(workspace, query): response = requests.get( 'http://8.142.85.77:8456/level_search?systemId={}&query={}'.format(workspace, query)) # 'http://8.142.239.53:8455/level_search?systemId={}&query={}'.format(workspace, query)) response = json.loads(response.text) if response['ch']: query = response['ch'][0]['original_query'] intent = response['ch'][0]['standard_query'] semantic = response['ch'][0]['semantic'] return query, intent, semantic elif response['h']: query = response['h'][0]['original_query'] intent = response['h'][0]['standard_query'] semantic = response['h'][0]['semantic'] return query, intent, semantic return '', '', '' if __name__ == '__main__': # result = asr_audio('./56cedc5d-9480-457a-aed9-ba680029c4d5_left.wav') # print( result ) self_asr = SelfAsr() self_asr.transform_from_file() '''# wb = load_workbook('./美团.xlsx') wb = load_workbook('../../2022-5-26_金条质检纯人.xlsx') ws = wb[wb.sheetnames[0]] customer_dict = dict() for i, row in enumerate(ws.values): if i != 0 and row[0]: customer_id = row[0] if customer_id not in customer_dict: customer_dict[customer_id] = list() if row[1].startswith('http'): customer_dict[customer_id].append([row[1], '0']) else: # customer_dict[customer_id].append([row[1], row[2]]) customer_dict[customer_id].append([row[1][2:], row[1][0]]) wb_w = Workbook() ws_w = wb_w.active ws_w.append(['客户ID', '是否合格', '违规话术']) customer_finnal_dict = dict() for key in tqdm.tqdm(customer_dict): role = 1 for content in customer_dict[key]: if '工号' in content[0] or '京东金融' in content[0]: role = int(content[1]) break if key not in customer_finnal_dict: customer_finnal_dict[key] = list() for content in customer_dict[key]: ws_w.append([key, content[0], 1 if role == int(content[1]) else 0]) customer_finnal_dict[key].append([key, content[0], 1 if role == int(content[1]) else 0]) ws_w.append([]) # wb_w.save('./美团_new.xlsx') wb_w.save('./2022-5-26_纯人_金条质检纯人.xlsx') wb = load_workbook('./美团.xlsx') ws = wb[wb.sheetnames[0]] wb_w = Workbook() ws_w = wb_w.active ws_w.append(['客户ID', '内容', '角色', '识别意图', '命中语料']) for i, row in tqdm.tqdm(enumerate(ws.values)): # print(type(row[2]), row[2]==0) if row[1] and row[2] == 1: # print( type(row[2]), row[2] ) result = nlu_result('329', row[1]) ws_w.append([row[0], row[1], '用户', result[1], result[2]]) elif row[1] and row[2] == 0: ws_w.append([row[0], row[1], '坐席', '', '']) else: ws_w.append([]) wb_w.save('./美团_intent.xlsx')''' '''file_path = './菁卡话单-质检量 (2).xlsx' wb = load_workbook(file_path) ws = wb[wb.sheetnames[0]] customer_reason = dict() for i, row in enumerate(ws.values): if i != 0: customer_id = row[2] reason = row[14] result = row[15] customer_reason[customer_id] = [reason, result] file_path = '../../2022-6-7_菁卡质检.xlsx' wb = load_workbook(file_path) ws = wb[wb.sheetnames[0]] wb_w = Workbook() ws_w = wb_w.active customer_dict = dict() for i, row in enumerate(ws.values): if i == 0: ws_w.append(row) elif row[0]: customer_id = row[0] if customer_id not in customer_dict: customer_dict[customer_id] = 1 ws_w.append([row[0], row[1], customer_reason[customer_id][0], customer_reason[customer_id][1]]) else: ws_w.append([row[0], row[1]]) wb_w.save('./2022-6-7_菁卡质检.xlsx')'''