2024-08-02 • Speech Recognition
Xiao AI, Siri, Doubao's phone-call feature, WeChat's voice-to-text, Baidu Netdisk's AI subtitles, the Xiaodu smart speaker, and voice control in smart cars such as the Xiaomi SU7 all rely on automatic speech recognition (ASR), an AI technology that works roughly like an AI captioner and an AI narrator.
Broken down, speech recognition technology consists of four main stages: input, encoding, decoding, and output.
First, sound reaches the system as audio represented by segments of waveform.
After signal processing, the audio is split into frames (on the order of milliseconds), and each short segment of waveform is converted into a multi-dimensional feature vector modeled on how the human ear perceives sound.
These frames are then recognized as states (an intermediate unit that is even smaller than a phoneme).
States are combined into phonemes (typically 3 states = 1 phoneme).
Finally, phonemes are assembled into words (e.g. dà jiā hǎo) and strung together into sentences. This is how speech is converted into text. The toy sketch below illustrates the idea.
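As a purely illustrative sketch (the states, phoneme labels, and lexicon below are made up, not a real decoder), the state -> phoneme -> word chain can be pictured like this:

# Toy illustration of the frame -> state -> phoneme -> word chain.
# The states, phonemes and lexicon are invented for illustration only;
# a real ASR decoder uses acoustic and language models, not lookup tables.

# Each phoneme is modeled by 3 consecutive states.
state_to_phoneme = {
    ("s1", "s2", "s3"): "d-a4",   # "dà"
    ("s4", "s5", "s6"): "j-ia1",  # "jiā"
    ("s7", "s8", "s9"): "h-ao3",  # "hǎo"
}

# A tiny lexicon mapping phonemes to words.
lexicon = {"d-a4": "大", "j-ia1": "家", "h-ao3": "好"}

def decode(states):
    """Group states in threes, map each triple to a phoneme, then look up words."""
    phonemes = [state_to_phoneme[tuple(states[i:i + 3])] for i in range(0, len(states), 3)]
    return "".join(lexicon[p] for p in phonemes)

print(decode(["s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9"]))  # prints 大家好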
Speech feature extraction is the process of deriving, from the raw speech signal, the key information that supports downstream speech processing and recognition tasks. Some commonly used speech features and related methods are:
Mel-frequency cepstral coefficients (MFCCs): coefficients derived from a mel-scaled spectrum that compactly describe the spectral envelope; the most widely used feature in ASR.
Mel filterbank energies (MFBs): log energies of mel-scaled filterbanks, the intermediate representation from which MFCCs are computed.
Fourier transform (FT): decomposes a signal into its frequency components.
Short-time Fourier transform (STFT): applies the Fourier transform over short, overlapping windows to capture how the spectrum changes over time.
Cepstral transform: the inverse transform of the log spectrum, useful for separating the vocal-tract envelope from the excitation.
Zero-crossing rate (ZCR): how often the waveform changes sign within a frame; a rough indicator of voiced versus unvoiced speech.
Spectral energy features: the signal's energy in different frequency bands.
Chromagram: energy folded onto the twelve pitch classes; used mainly in music analysis.
Fundamental frequency (F0): the pitch of voiced speech.
Spectrogram: a time-frequency image of the signal's energy.
Dynamic time warping (DTW): strictly an alignment algorithm rather than a feature; it compares utterances of different lengths.
Deep features: representations learned by neural networks rather than hand-crafted.
Spectral contrast features: the difference between spectral peaks and valleys in each sub-band.
Formants: resonant frequencies of the vocal tract that characterize vowels.
These features can be extracted with various algorithms and tools; the librosa library, for example, provides many Python functions for speech analysis and feature extraction. In an automatic speech recognition system, these features are used to train models that recognize and understand different voice commands or utterances.
Below is a simple example of extracting MFCC features with Python and the librosa library. It loads an audio file, computes its STFT, builds a mel spectrogram from the STFT, and then computes the MFCCs.
First, make sure the librosa and soundfile libraries are installed (librosa uses soundfile to read audio). If they are not, install them with:
pip install librosa soundfile
Here is the Python code for extracting MFCCs:
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

# Load the audio file
audio_path = 'your_audio_file.wav'  # replace with the path to your audio file
y, sr = librosa.load(audio_path, sr=None)  # y is the waveform, sr is the sample rate

# Compute the STFT
n_fft = 2048      # FFT size
hop_length = 512  # hop length (frame shift)
stft = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)

# Build a mel-scaled power spectrogram from the STFT magnitude
mel_spec = librosa.feature.melspectrogram(S=np.abs(stft) ** 2, sr=sr)

# Compute MFCCs from the log-mel spectrogram (in decibels)
n_mfcc = 13  # number of MFCC coefficients
mfcc = librosa.feature.mfcc(S=librosa.power_to_db(mel_spec), n_mfcc=n_mfcc)

# Visualize the waveform
plt.figure(figsize=(10, 4))
librosa.display.waveshow(y, sr=sr)
plt.title('Audio waveform')

# Visualize the MFCCs
plt.figure(figsize=(10, 4))
librosa.display.specshow(mfcc, sr=sr, hop_length=hop_length, x_axis='time')
plt.colorbar()
plt.title('MFCC')
plt.show()
In this example we first load the audio file with librosa.load and compute the short-time Fourier transform with librosa.stft. Next, the STFT magnitude is turned into a mel-scaled power spectrogram with librosa.feature.melspectrogram and converted to decibels with librosa.power_to_db, which is the usual input for MFCC computation. Finally, librosa.feature.mfcc computes the MFCCs, and matplotlib is used to visualize the original waveform and the MFCCs. Note that this example only shows how to compute and visualize MFCCs; other features are computed differently, but librosa provides corresponding functions for them as well.
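For instance, here is a brief sketch of how a few of the other features listed above can be obtained with librosa, reusing the same audio file; the parameters are just illustrative defaults:

import librosa

y, sr = librosa.load('your_audio_file.wav', sr=None)  # same file as above

# Zero-crossing rate per frame
zcr = librosa.feature.zero_crossing_rate(y)

# Spectral contrast (peak-to-valley difference per sub-band)
contrast = librosa.feature.spectral_contrast(y=y, sr=sr)

# Chromagram (energy folded onto the 12 pitch classes)
chroma = librosa.feature.chroma_stft(y=y, sr=sr)

# Fundamental frequency (F0) estimate, frame by frame
f0 = librosa.yin(y, fmin=librosa.note_to_hz('C2'), fmax=librosa.note_to_hz('C7'))

print(zcr.shape, contrast.shape, chroma.shape, f0.shape)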
In the context of automatic speech recognition (ASR: automatic speech recognition), "speech" refers to the human voice that an ASR system captures and converts into text. More broadly, "speech" can also denote the technologies, research areas, and services built around spoken language. Examples include training and iteratively optimizing acoustic and language models for speech recognition on the Microsoft cloud platform with big-data cloud computing, developing automation tools and systems for speech recognition modeling, and research and experimentation on cutting-edge speech recognition techniques.
Training an automatic speech recognition (ASR) model is a complex, multi-step process. The following simplified workflow shows how Python and a few popular libraries can be used to train a basic ASR model:
Data collection and preprocessing:
Feature extraction: use libraries such as librosa to extract speech features like MFCCs.
Dataset splitting:
Model design:
Model training:
Model evaluation:
Model optimization:
Model deployment:
Here is an example workflow for training an ASR model with Python and the TensorFlow/Keras libraries:
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split

# Assume X is the feature set and y the corresponding labels
X = ...  # feature set, e.g. MFCC sequences, shape (num_samples, time_steps, num_features)
y = ...  # corresponding labels, one-hot encoded, shape (num_samples, num_classes)

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Build the model
model = models.Sequential()
model.add(layers.InputLayer(input_shape=(None, X.shape[2])))  # X is assumed to be 3-dimensional
model.add(layers.LSTM(128, return_sequences=True))
model.add(layers.LSTM(128))
model.add(layers.Dense(y.shape[1], activation='softmax'))  # output layer

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10)

# Evaluate the model
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=2)
print(f'Test accuracy: {test_acc}')
This example shows how to build a simple LSTM model with Keras and train and evaluate it on a dataset. In practice you would design a more elaborate model for your specific dataset and task and tune it carefully.
Note that this is only an illustrative workflow. Real ASR training is considerably more complex and may involve data augmentation, end-to-end training, attention mechanisms, decoders, and other advanced techniques. You also need to preprocess the data and extract features, and you may need more sophisticated network architectures and training strategies.
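As one concrete illustration of end-to-end training, a CTC loss lets the network learn the alignment between audio frames and output characters by itself. The following is a minimal sketch built on TensorFlow's tf.nn.ctc_loss; the feature and class counts, and the use of full padded lengths, are simplifying assumptions for illustration:

import tensorflow as tf
from tensorflow.keras import layers, models

num_features = 13  # e.g. 13 MFCCs per frame (assumption)
num_classes = 29   # e.g. characters plus the CTC blank symbol (assumption)

# Acoustic model: maps a sequence of feature frames to per-frame character logits.
inputs = layers.Input(shape=(None, num_features), name="features")
x = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(inputs)
logits = layers.Dense(num_classes, name="char_logits")(x)  # unnormalized; CTC applies softmax internally
model = models.Model(inputs, logits)

def ctc_loss(y_true, y_pred):
    # y_true: padded integer character ids, shape (batch, max_label_len)
    # y_pred: per-frame logits, shape (batch, time_steps, num_classes)
    # For brevity the full padded lengths are used for every example;
    # in practice you pass the true, unpadded frame and label lengths.
    batch = tf.shape(y_pred)[0]
    logit_length = tf.fill([batch], tf.shape(y_pred)[1])
    label_length = tf.fill([batch], tf.shape(y_true)[1])
    loss = tf.nn.ctc_loss(
        labels=tf.cast(y_true, tf.int32),
        logits=y_pred,
        label_length=label_length,
        logit_length=logit_length,
        logits_time_major=False,
        blank_index=-1,
    )
    return tf.reduce_mean(loss)

model.compile(optimizer="adam", loss=ctc_loss)
# model.fit(X_train, y_train_char_ids, ...)  # labels are padded integer character ids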
A better approach is to build on a mature framework, such as Speech Studio in Microsoft's Azure AI services.
Speech Studio is a tool within Azure AI services that provides UI-based tooling for building and integrating Azure AI Speech capabilities into your applications.
In Speech Studio you can try out speech to text and text to speech without signing up or writing any code.
Speech Studio also lets you browse, try out, and view sample code for a number of common use cases.
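Before the longer captioning sample below, here is a minimal speech-to-text sketch using the Azure Speech SDK for Python; the subscription key, region, language, and file name are placeholders you must replace with your own values:

import azure.cognitiveservices.speech as speechsdk

# Placeholders: use your own Speech resource key, region, and audio file.
speech_config = speechsdk.SpeechConfig(subscription="YOUR_KEY", region="YOUR_REGION")
speech_config.speech_recognition_language = "zh-CN"
audio_config = speechsdk.audio.AudioConfig(filename="your_audio_file.wav")

recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)
result = recognizer.recognize_once()

if result.reason == speechsdk.ResultReason.RecognizedSpeech:
    print("Recognized:", result.text)
elif result.reason == speechsdk.ResultReason.NoMatch:
    print("No speech could be recognized.")
elif result.reason == speechsdk.ResultReason.Canceled:
    print("Recognition canceled:", result.cancellation_details.reason)

The longer sample that follows is the captioning scenario from the Speech SDK samples; it relies on the sample's accompanying helper modules (caption_helper, helper, user_config_helper) and on a USAGE string that are not reproduced here.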
# Notes:
# - Install the Speech SDK. Run:
#   pip install azure-cognitiveservices-speech

from datetime import datetime, time, timezone
from itertools import groupby, pairwise
from os import linesep, remove
from os.path import exists
from pathlib import Path
from sys import argv
from time import sleep
from typing import Any, List, Optional
import wave
import azure.cognitiveservices.speech as speechsdk  # type: ignore
import caption_helper
import helper
import user_config_helper
class Captioning(object):
    def __init__(self):
        self._user_config = user_config_helper.user_config_from_args(USAGE)
        self._srt_sequence_number = 1
        self._previous_caption: Optional[caption_helper.Caption] = None
        self._previous_end_time: Optional[time] = None
        self._previous_result_is_recognized = False
        self._recognized_lines: List[str] = []
        self._offline_results: List[speechsdk.SpeechRecognitionResult] = []
    def get_timestamp(self, start: time, end: time) -> str:
        time_format = ""
        if self._user_config["use_sub_rip_text_caption_format"]:
            # SRT format requires ',' as decimal separator rather than '.'.
            time_format = "%H:%M:%S,%f"
        else:
            time_format = "%H:%M:%S.%f"
        # Truncate microseconds to milliseconds.
        return "{} --> {}".format(start.strftime(time_format)[:-3], end.strftime(time_format)[:-3])

    def string_from_caption(self, caption: caption_helper.Caption) -> str:
        retval = ""
        if self._user_config["use_sub_rip_text_caption_format"]:
            retval += str(caption.sequence) + linesep
        retval += self.get_timestamp(caption.begin, caption.end) + linesep
        retval += caption.text + linesep + linesep
        return retval
    def adjust_real_time_caption_text(self, text: str, is_recognized_result: bool) -> str:
        # Split the caption text into multiple lines based on max_line_length and lines.
        temp_caption_helper = caption_helper.CaptionHelper(self._user_config["language"], self._user_config["max_line_length"], self._user_config["lines"], [])
        lines = temp_caption_helper.lines_from_text(text)

        # Recognizing results can change with each new result, so we do not save previous recognizing results.
        # Recognized results are final, so we save them in a member value.
        recognizing_lines: List[str] = []
        if is_recognized_result:
            self._recognized_lines = self._recognized_lines + lines
        else:
            recognizing_lines = lines

        caption_lines = self._recognized_lines + recognizing_lines
        return '\n'.join(caption_lines[-self._user_config["lines"]:])
    def caption_from_real_time_result(self, result: speechsdk.SpeechRecognitionResult, is_recognized_result: bool) -> Optional[str]:
        retval: Optional[str] = None

        start_time = helper.time_from_ticks(result.offset)
        end_time = helper.time_from_ticks(result.offset + result.duration)

        # If the end timestamp for the previous result is later
        # than the end timestamp for this result, drop the result.
        # This sometimes happens when we receive a lot of recognizing results close together.
        if self._previous_end_time is not None and self._previous_end_time > end_time:
            pass
        else:
            # Record the end timestamp for this result.
            self._previous_end_time = end_time

            # Convert the SpeechRecognitionResult to a caption.
            # We are not ready to set the text for this caption.
            # First we need to determine whether to clear _recognized_lines.
            caption = caption_helper.Caption(self._user_config["language"], self._srt_sequence_number, helper.add_time_and_timedelta(start_time, self._user_config["delay"]), helper.add_time_and_timedelta(end_time, self._user_config["delay"]), "")
            # Increment the sequence number.
            self._srt_sequence_number += 1

            # If we have a previous caption...
            if self._previous_caption is not None:
                # If the previous result was type Recognized...
                if self._previous_result_is_recognized:
                    # Set the end timestamp for the previous caption to the earliest of:
                    # - the end timestamp for the previous caption plus the remain time.
                    # - the start timestamp for the current caption.
                    previous_end = helper.add_time_and_timedelta(self._previous_caption.end, self._user_config["remain_time"])
                    self._previous_caption.end = previous_end if previous_end < caption.begin else caption.begin
                    # If the gap between the original end timestamp for the previous caption
                    # and the start timestamp for the current caption is larger than remain_time,
                    # clear the cached recognized lines.
                    # Note this needs to be done before we call adjust_real_time_caption_text
                    # for the current caption, because it uses _recognized_lines.
                    if previous_end < caption.begin:
                        self._recognized_lines.clear()
                # If the previous result was type Recognizing, simply set the start timestamp
                # for the current caption to the end timestamp for the previous caption.
                # Note this presumes there will not be a large gap between Recognizing results,
                # because such a gap would cause the previous Recognizing result to be succeeded
                # by a Recognized result.
                else:
                    caption.begin = self._previous_caption.end

                retval = self.string_from_caption(self._previous_caption)

            # Break the caption text into lines if needed.
            caption.text = self.adjust_real_time_caption_text(result.text, is_recognized_result)
            # Save the current caption as the previous caption.
            self._previous_caption = caption
            # Save the result type as the previous result type.
            self._previous_result_is_recognized = is_recognized_result

        return retval
    def captions_from_offline_results(self) -> List[caption_helper.Caption]:
        captions = caption_helper.get_captions(self._user_config["language"], self._user_config["max_line_length"], self._user_config["lines"], list(self._offline_results))
        # Save the last caption.
        last_caption = captions[-1]
        last_caption.end = helper.add_time_and_timedelta(last_caption.end, self._user_config["remain_time"])

        # In offline mode, all captions come from RecognitionResults of type Recognized.
        # Set the end timestamp for each caption to the earliest of:
        # - the end timestamp for this caption plus the remain time.
        # - the start timestamp for the next caption.
        captions_2: List[caption_helper.Caption] = []
        for (caption_1, caption_2) in pairwise(captions):
            end = helper.add_time_and_timedelta(caption_1.end, self._user_config["remain_time"])
            caption_1.end = end if end < caption_2.begin else caption_2.begin
            captions_2.append(caption_1)
        # Re-add the last caption.
        captions_2.append(last_caption)
        return captions_2
    def finish(self) -> None:
        if user_config_helper.CaptioningMode.OFFLINE == self._user_config["captioning_mode"]:
            for caption in self.captions_from_offline_results():
                helper.write_to_console_or_file(text=self.string_from_caption(caption), user_config=self._user_config)
        elif user_config_helper.CaptioningMode.REALTIME == self._user_config["captioning_mode"]:
            # Show the last "previous" caption, which is actually the last caption.
            if self._previous_caption is not None:
                self._previous_caption.end = helper.add_time_and_timedelta(self._previous_caption.end, self._user_config["remain_time"])
                helper.write_to_console_or_file(text=self.string_from_caption(self._previous_caption), user_config=self._user_config)

    def initialize(self):
        if self._user_config["output_file"] is not None and exists(self._user_config["output_file"]):
            remove(self._user_config["output_file"])
        if not self._user_config["use_sub_rip_text_caption_format"]:
            helper.write_to_console_or_file(text="WEBVTT{}{}".format(linesep, linesep), user_config=self._user_config)
        return
    def audio_config_from_user_config(self) -> helper.Read_Only_Dict:
        if self._user_config["input_file"] is None:
            return helper.Read_Only_Dict({
                "audio_config": speechsdk.audio.AudioConfig(use_default_microphone=True),
                "audio_stream_format": None,
                "pull_input_audio_stream_callback": None,
                "pull_input_audio_stream": None,
            })
        else:
            audio_stream_format = None
            if not self._user_config["use_compressed_audio"]:
                reader = wave.open(self._user_config["input_file"], mode=None)
                audio_stream_format = speechsdk.audio.AudioStreamFormat(samples_per_second=reader.getframerate(), bits_per_sample=reader.getsampwidth() * 8, channels=reader.getnchannels())
                reader.close()
            else:
                audio_stream_format = speechsdk.audio.AudioStreamFormat(compressed_stream_format=self._user_config["compressed_audio_format"])
            callback = helper.BinaryFileReaderCallback(filename=self._user_config["input_file"])
            stream = speechsdk.audio.PullAudioInputStream(pull_stream_callback=callback, stream_format=audio_stream_format)
            # We return the BinaryFileReaderCallback, AudioStreamFormat, and PullAudioInputStream
            # because we need to keep them in scope until they are actually used.
            return helper.Read_Only_Dict({
                "audio_config": speechsdk.audio.AudioConfig(stream=stream),
                "audio_stream_format": audio_stream_format,
                "pull_input_audio_stream_callback": callback,
                "pull_input_audio_stream": stream,
            })
    def speech_config_from_user_config(self) -> speechsdk.SpeechConfig:
        speech_config = speechsdk.SpeechConfig(subscription=self._user_config["subscription_key"], region=self._user_config["region"])

        speech_config.set_profanity(self._user_config["profanity_option"])

        if self._user_config["stable_partial_result_threshold"] is not None:
            speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_StablePartialResultThreshold, value=self._user_config["stable_partial_result_threshold"])

        speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_PostProcessingOption, value="TrueText")
        speech_config.speech_recognition_language = self._user_config["language"]

        return speech_config
    def speech_recognizer_from_user_config(self) -> helper.Read_Only_Dict:
        audio_config_data = self.audio_config_from_user_config()
        speech_config = self.speech_config_from_user_config()
        speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config_data["audio_config"])

        if len(self._user_config["phrases"]) > 0:
            grammar = speechsdk.PhraseListGrammar.from_recognizer(recognizer=speech_recognizer)
            for phrase in self._user_config["phrases"]:
                grammar.addPhrase(phrase)

        return helper.Read_Only_Dict({
            "speech_recognizer": speech_recognizer,
            "audio_stream_format": audio_config_data["audio_stream_format"],
            "pull_input_audio_stream_callback": audio_config_data["pull_input_audio_stream_callback"],
            "pull_input_audio_stream": audio_config_data["pull_input_audio_stream"],
        })
    def recognize_continuous(self, speech_recognizer: speechsdk.SpeechRecognizer, format: speechsdk.audio.AudioStreamFormat, callback: helper.BinaryFileReaderCallback, stream: speechsdk.audio.PullAudioInputStream):
        done = False

        def recognizing_handler(e: speechsdk.SpeechRecognitionEventArgs):
            if speechsdk.ResultReason.RecognizingSpeech == e.result.reason and len(e.result.text) > 0:
                # This seems to be the only way we can get information about
                # exceptions raised inside an event handler.
                try:
                    caption = self.caption_from_real_time_result(e.result, False)
                    if caption is not None:
                        helper.write_to_console_or_file(text=caption, user_config=self._user_config)
                except Exception as ex:
                    print('Exception in recognizing_handler: {}'.format(ex))
            elif speechsdk.ResultReason.NoMatch == e.result.reason:
                helper.write_to_console(text="NOMATCH: Speech could not be recognized.{}".format(linesep), user_config=self._user_config)

        def recognized_handler(e: speechsdk.SpeechRecognitionEventArgs):
            if speechsdk.ResultReason.RecognizedSpeech == e.result.reason and len(e.result.text) > 0:
                try:
                    if user_config_helper.CaptioningMode.OFFLINE == self._user_config["captioning_mode"]:
                        self._offline_results.append(e.result)
                    else:
                        caption = self.caption_from_real_time_result(e.result, True)
                        if caption is not None:
                            helper.write_to_console_or_file(text=caption, user_config=self._user_config)
                except Exception as ex:
                    print('Exception in recognized_handler: {}'.format(ex))
            elif speechsdk.ResultReason.NoMatch == e.result.reason:
                helper.write_to_console(text="NOMATCH: Speech could not be recognized.{}".format(linesep), user_config=self._user_config)

        def canceled_handler(e: speechsdk.SpeechRecognitionCanceledEventArgs):
            nonlocal done
            # Notes:
            # SpeechRecognitionCanceledEventArgs inherits the result property from SpeechRecognitionEventArgs. See:
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechrecognitioncanceledeventargs
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechrecognitioneventargs
            # result is type SpeechRecognitionResult, which inherits the reason property from RecognitionResult. See:
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechrecognitionresult
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.recognitionresult
            # e.result.reason is ResultReason.Canceled. To get the cancellation reason, see e.cancellation_details.reason.
            if speechsdk.CancellationReason.EndOfStream == e.cancellation_details.reason:
                helper.write_to_console(text="End of stream reached.{}".format(linesep), user_config=self._user_config)
                done = True
            elif speechsdk.CancellationReason.CancelledByUser == e.cancellation_details.reason:
                helper.write_to_console(text="User canceled request.{}".format(linesep), user_config=self._user_config)
                done = True
            elif speechsdk.CancellationReason.Error == e.cancellation_details.reason:
                # Error output should not be suppressed, even if suppress output flag is set.
                print("Encountered error. Cancellation details: {}{}".format(e.cancellation_details, linesep))
                done = True
            else:
                print("Request was cancelled for an unrecognized reason. Cancellation details: {}{}".format(e.cancellation_details, linesep))
                done = True

        def stopped_handler(e: speechsdk.SessionEventArgs):
            nonlocal done
            helper.write_to_console(text="Session stopped.{}".format(linesep), user_config=self._user_config)
            done = True

        # We only use Recognizing results in real-time mode.
        if user_config_helper.CaptioningMode.REALTIME == self._user_config["captioning_mode"]:
            speech_recognizer.recognizing.connect(recognizing_handler)
        speech_recognizer.recognized.connect(recognized_handler)
        speech_recognizer.session_stopped.connect(stopped_handler)
        speech_recognizer.canceled.connect(canceled_handler)

        speech_recognizer.start_continuous_recognition()

        while not done:
            sleep(5)

        speech_recognizer.stop_continuous_recognition()

        return
if user_config_helper.cmd_option_exists("--help"):
    print(USAGE)
else:
    captioning = Captioning()
    captioning.initialize()
    speech_recognizer_data = captioning.speech_recognizer_from_user_config()
    captioning.recognize_continuous(speech_recognizer=speech_recognizer_data["speech_recognizer"], format=speech_recognizer_data["audio_stream_format"], callback=speech_recognizer_data["pull_input_audio_stream_callback"], stream=speech_recognizer_data["pull_input_audio_stream"])
    captioning.finish()
Speech Studio features:
Speech Studio's no-code approach:
Pronunciation assessment tool:
History of speech synthesis:
History of speech recognition:
Open-source speech recognition projects:
Speech emotion recognition:
For enterprises, a mature and efficient cloud-native pipeline solution is to implement OpenPPL on the Microsoft Azure cloud using an Azure pipeline (the "Azure PPL pipeline").
OpenPPL is a high-performance, multi-backend, self-controlled cloud inference solution that supports efficient deployment of OpenMMLab models. It is an inference engine built on an in-house high-performance operator library, suited to deploying AI models across multiple backends, especially in cloud-native environments. OpenPPL aims to let AI applications run efficiently and reliably on existing computing platforms and to provide AI inference services for cloud scenarios.
"Azure pipeline" here refers to the continuous integration / continuous deployment (CI/CD) pipelines in Azure DevOps, which automate the software delivery process. An automated pipeline enables fast, frequent deployments and higher product quality, reduces human error, and supports a rapid release cadence.
Azure Pipelines is the CI/CD tool within Microsoft's Azure DevOps service; it lets users automate building, testing, and deploying software. OpenPPL is a high-performance deep learning inference engine focused on deploying AI models across a variety of computing platforms, particularly in the cloud.
The steps for implementing OpenPPL on the Microsoft Azure cloud with Azure DevOps' Azure Pipelines can be summarized as follows (a minimal pipeline definition sketch is shown below the list):
Create an Azure DevOps project:
Configure source control:
Create an Azure Pipeline:
Define the build and test steps:
Deploy to a cloud service:
Integrate OpenMMLab models:
Use Azure AI services:
Monitoring and logging:
Security and compliance:
Automate the deployment process:
Documentation and maintenance:
Feedback and iteration:
Automation and scheduling:
Following these steps, you can automate the building, testing, and deployment of OpenPPL on the Microsoft Azure cloud, using the CI/CD capabilities of Azure DevOps to improve development efficiency and system reliability, and thereby provide efficient AI inference services in the cloud. The exact implementation details will vary with your requirements and environment.
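As a rough illustration only, an azure-pipelines.yml for such a flow might look like the sketch below; the trigger, agent image, build command, and output path are assumptions that you would adapt to your own repository and to the OpenPPL build documentation:

# Hypothetical azure-pipelines.yml sketch: build OpenPPL and publish the artifacts on each push.
trigger:
  - main

pool:
  vmImage: 'ubuntu-latest'

steps:
  - script: |
      sudo apt-get update
      sudo apt-get install -y build-essential cmake git
    displayName: 'Install build dependencies'

  - script: ./build.sh   # OpenPPL's build script; options depend on the target backend
    displayName: 'Build OpenPPL'

  - script: echo "run your unit tests or inference benchmarks here"  # placeholder test step
    displayName: 'Test (placeholder)'

  - publish: '$(System.DefaultWorkingDirectory)/pplnn-build'   # assumed build output directory
    artifact: 'openppl-build'
    displayName: 'Publish build output'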
Note that this process requires some cloud computing and DevOps knowledge, as well as an understanding of the Azure platform and the OpenPPL engine. For more detailed guidance, consult the official Azure DevOps and OpenPPL documentation, or experts in the relevant fields.