ROS 教程3 機器人語音 語音識別理解合成控制 ASR NLU TTS
阿新 • • 發佈:2018-12-08
機器人語音 語音識別理解合成控制 ASR NLU TTS
一、語音處理總體框架
1. 語音識別(ASR , Automatic Speech Recognition )
2. 語義理解(NLU , Natural Language Understanding)
3. 語音合成(TTS , Text To Speech)
1. 語音識別 **ASR**:支援的包: 國外:CMU Sphinx ——> pocketsphinx 國內:科大訊飛等。。
2. 語義理解 NLU: 圖靈
3. 語音合成 TTS: 國外:Festival ——> sound_play 是 ros-indigo-audio-common 的一部分 國內:科大訊飛等。。。
二、國外庫
1、語音識別 pocketsphinx
1) 安裝
sudo apt-get install gstreamer0.10-pocketsphinx # 原生系統
sudo apt-get install ros-indigo-pocketsphinx # ros介面支援
sudo apt-get install ros-indigo-audio-common # 包含了sound_play TTS
sudo apt-get install libasound2 # 語音驅動
sudo apt-get install gstreamer0.10-gconf # GStreamer元件
2) 測試
pocketsphinx包 包含了 一個 節點 recognizer.py 獲取硬體 語音的輸入流,在已有的語音庫裡 匹配語音相應的單詞 並發佈到 /recognizer/output 話題
適合 robocup 的一個語音庫測試:
roslaunch pocketsphinx robocup.launch
說話測試 顯示 話題訊息:
rostopic echo /recognizer/output
檢視語音庫:
roscd pocketsphinx/demo
more robocup.corpus
顯示 只有說了語音庫內的語音才能得到較滿意的結果
# robocup.launch
<launch>
<node name="recognizer" pkg="pocketsphinx" type="recognizer.py" output="screen"> # 識別器
<param name="lm" value="$(find pocketsphinx)/demo/robocup.lm"/> # 語言模型 線上工具根據語言庫生成
<param name="dict" value="$(find pocketsphinx)/demo/robocup.dic" /> # 語言詞典
</node>
</launch>
# recognizer.py 檔案
import roslib; roslib.load_manifest('pocketsphinx')
import rospy
import pygtk
pygtk.require('2.0')
import gtk
import gobject
import pygst
pygst.require('0.10')
gobject.threads_init()
import gst
from std_msgs.msg import String
from std_srvs.srv import *
import os
import commands
class recognizer(object):
    """GStreamer based speech recognizer exposed as a ROS node.

    The node reads audio from a GStreamer source, feeds it through the
    ``vader`` and ``pocketsphinx`` elements and publishes every final
    hypothesis as a ``std_msgs/String`` on ``~output``.  Recognition can be
    started and stopped at runtime through the ``~start`` and ``~stop``
    services.

    Private parameters read by the node:
        ~mic_name: PulseAudio source name (see ``pacmd list-sources``).
        ~source:   GStreamer source description, used if ~mic_name is unset.
        ~lm:       path of the language model file.
        ~dict:     path of the pronunciation dictionary file.
    """

    def __init__(self):
        """Initialise the node and start recognizing if lm/dict are set."""
        # Start node
        rospy.init_node("recognizer")

        # Find the name of your microphone by typing
        # `pacmd list-sources` in the terminal.
        self._device_name_param = "~mic_name"
        self._lm_param = "~lm"
        self._dic_param = "~dict"

        # Configure mics with gstreamer launch config
        if rospy.has_param(self._device_name_param):
            self.device_name = rospy.get_param(self._device_name_param)
            self.device_index = self.pulse_index_from_name(self.device_name)
            self.launch_config = "pulsesrc device=" + str(self.device_index)
            rospy.loginfo("Using: pulsesrc device=%s name=%s",
                          self.device_index, self.device_name)
        elif rospy.has_param('~source'):
            # common sources: 'alsasrc'
            self.launch_config = rospy.get_param('~source')
        else:
            self.launch_config = 'gconfaudiosrc'

        rospy.loginfo("Launch config: %s", self.launch_config)

        self.launch_config += " ! audioconvert ! audioresample " \
                              + '! vader name=vad auto-threshold=true ' \
                              + '! pocketsphinx name=asr ! fakesink'

        # Configure ROS settings
        self.started = False
        rospy.on_shutdown(self.shutdown)
        # An explicit queue_size selects asynchronous publishing and avoids
        # the rospy warning emitted when it is omitted.
        self.pub = rospy.Publisher('~output', String, queue_size=10)
        rospy.Service("~start", Empty, self.start)
        rospy.Service("~stop", Empty, self.stop)

        if rospy.has_param(self._lm_param) and rospy.has_param(self._dic_param):
            self.start_recognizer()
        else:
            rospy.logwarn("lm and dic parameters need to be set to start recognizer.")

    def start_recognizer(self):
        """Build the GStreamer pipeline and switch it to PLAYING.

        Nothing is created when the recognizer is already running or when
        the language model / dictionary parameter is missing; in the latter
        case an error is logged and ``self.started`` stays False.
        """
        rospy.loginfo("Starting recognizer... ")

        # A second start would otherwise replace a live pipeline without
        # ever stopping it.
        if self.started:
            rospy.logwarn("Recognizer is already running; ignoring start request.")
            return

        # Validate the configuration before touching GStreamer so that a
        # missing parameter does not leave a half-configured pipeline behind.
        if not rospy.has_param(self._lm_param):
            rospy.logerr('Recognizer not started. Please specify a language model file.')
            return
        if not rospy.has_param(self._dic_param):
            rospy.logerr('Recognizer not started. Please specify a dictionary.')
            return
        lm = rospy.get_param(self._lm_param)
        dic = rospy.get_param(self._dic_param)

        self.pipeline = gst.parse_launch(self.launch_config)
        self.asr = self.pipeline.get_by_name('asr')
        self.asr.connect('partial_result', self.asr_partial_result)
        self.asr.connect('result', self.asr_result)
        self.asr.set_property('configured', True)
        self.asr.set_property('dsratio', 1)

        # Configure language model
        self.asr.set_property('lm', lm)
        self.asr.set_property('dict', dic)

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus_id = self.bus.connect('message::application',
                                       self.application_message)
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.started = True

    @staticmethod
    def _parse_pulse_index(listing, name):
        """Return the index (as a string) of source *name* in *listing*.

        *listing* is the text printed by ``pacmd list-sources``; the index
        is taken from the line directly above the matching ``name: <...>``
        line.  Returns None when no such source is listed.
        """
        import re

        index_pattern = re.compile(r'index: ([0-9]+)')
        wanted = 'name: <' + name + '>'
        previous = None
        for line in listing.splitlines():
            if wanted in line and previous is not None:
                found = index_pattern.search(previous)
                if found:
                    return found.group(1)
            previous = line
        return None

    def pulse_index_from_name(self, name):
        """Translate a PulseAudio source name into its numeric index.

        Args:
            name: source name as shown by ``pacmd list-sources``.

        Returns:
            The source index as a string.

        Raises:
            Exception: if ``pacmd`` cannot be run or lists no such source.
        """
        import subprocess

        # Run pacmd directly (no shell) so the name cannot be interpreted
        # as shell syntax, and do the filtering in Python.
        try:
            listing = subprocess.check_output(['pacmd', 'list-sources'],
                                              universal_newlines=True)
        except (OSError, subprocess.CalledProcessError):
            listing = ''

        index = self._parse_pulse_index(listing, name)
        if index is None:
            raise Exception("Error. pulse index doesn't exist for name: " + name)
        return index

    def stop_recognizer(self):
        """Stop the pipeline and detach the bus handler, if running."""
        if self.started:
            self.pipeline.set_state(gst.STATE_NULL)
            self.pipeline.remove(self.asr)
            self.bus.disconnect(self.bus_id)
            self.started = False

    def shutdown(self):
        """Delete any remaining parameters so they don't affect next launch."""
        for param in [self._device_name_param, self._lm_param, self._dic_param]:
            if rospy.has_param(param):
                rospy.delete_param(param)

        # Shutdown the GTK thread.
        gtk.main_quit()

    def start(self, req):
        """Handle the ``~start`` service call."""
        self.start_recognizer()
        rospy.loginfo("recognizer started")
        return EmptyResponse()

    def stop(self, req):
        """Handle the ``~stop`` service call."""
        self.stop_recognizer()
        rospy.loginfo("recognizer stopped")
        return EmptyResponse()

    def asr_partial_result(self, asr, text, uttid):
        """Forward partial result signals on the bus to the main thread."""
        struct = gst.Structure('partial_result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def asr_result(self, asr, text, uttid):
        """Forward result signals on the bus to the main thread."""
        struct = gst.Structure('result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def application_message(self, bus, msg):
        """Receive application messages from the bus and dispatch them."""
        msgtype = msg.structure.get_name()
        if msgtype == 'partial_result':
            self.partial_result(msg.structure['hyp'], msg.structure['uttid'])
        elif msgtype == 'result':
            self.final_result(msg.structure['hyp'], msg.structure['uttid'])

    def partial_result(self, hyp, uttid):
        """Log the partial hypothesis at debug level."""
        rospy.logdebug("Partial: " + hyp)

    def final_result(self, hyp, uttid):
        """Publish the lower-cased final hypothesis on ``~output``."""
        msg = String()
        msg.data = str(hyp.lower())
        rospy.loginfo(msg.data)
        self.pub.publish(msg)
if __name__ == "__main__":
    # Create the recognizer node, then block in the GTK main loop until
    # shutdown() calls gtk.main_quit().
    node = recognizer()
    gtk.main()
#!/usr/bin/python
# -*- coding:utf-8 -*-
### 修改後的 檔案
import roslib
roslib.load_manifest('pocketsphinx')
import rospy
import pygtk # Python輕鬆建立具有圖形使用者介面的程式 播放音樂等
pygtk.require('2.0')
import gtk # GNU Image Manipulation Program (GIMP) Toolkit
import gobject # 亦稱Glib物件系統,是一個程式庫,它可以幫助我們使用C語言編寫面向物件程式
import pygst # 與 pygtk 相關
pygst.require('0.10')
gobject.threads_init()# 初始化
import gst
from std_msgs.msg import String
from std_srvs.srv import *
import os
import commands
class recognizer(object):
    """GStreamer based speech recognizer exposed as a ROS node.

    GStreamer is a multimedia framework for building, editing and playing
    media pipelines.  This variant of the node additionally takes an
    acoustic model (``~hmm``) and publishes the hypothesis without
    lower-casing it.

    Private parameters read by the node:
        ~mic_name: PulseAudio source name (see ``pacmd list-sources``).
        ~source:   GStreamer source description, used if ~mic_name is unset.
        ~lm:       path of the language model file.
        ~dict:     path of the pronunciation dictionary file.
        ~hmm:      path of the acoustic (hidden Markov) model.
    """

    def __init__(self):
        """Initialise the node and start recognizing if lm/dict are set."""
        # Create the node
        rospy.init_node("recognizer")

        # Names of the private parameters
        self._device_name_param = "~mic_name"  # microphone
        self._lm_param = "~lm"                 # language model
        self._dic_param = "~dict"              # pronunciation dictionary
        # Acoustic model (hidden Markov model); differs per language,
        # e.g. Chinese vs. English.
        self._hmm_param = "~hmm"

        # Build the microphone part of the gstreamer launch config
        if rospy.has_param(self._device_name_param):
            # Use the explicitly named microphone
            self.device_name = rospy.get_param(self._device_name_param)
            self.device_index = self.pulse_index_from_name(self.device_name)
            self.launch_config = "pulsesrc device=" + str(self.device_index)
            rospy.loginfo("Using: pulsesrc device=%s name=%s",
                          self.device_index, self.device_name)
        elif rospy.has_param('~source'):
            # common sources: 'alsasrc'
            self.launch_config = rospy.get_param('~source')
        else:
            self.launch_config = 'gconfaudiosrc'

        rospy.loginfo("麥克風配置: %s", self.launch_config)

        self.launch_config += " ! audioconvert ! audioresample " \
                              + '! vader name=vad auto-threshold=true ' \
                              + '! pocketsphinx name=asr ! fakesink'

        # Configure ROS settings
        self.started = False
        rospy.on_shutdown(self.shutdown)
        # Publish std_msgs/String on ~output.  An explicit queue_size
        # selects asynchronous publishing and avoids the rospy warning
        # emitted when it is omitted.
        self.pub = rospy.Publisher('~output', String, queue_size=10)
        rospy.Service("~start", Empty, self.start)  # start service
        rospy.Service("~stop", Empty, self.stop)    # stop service

        # Only auto-start when a language model and a dictionary are given
        if rospy.has_param(self._lm_param) and rospy.has_param(self._dic_param):
            self.start_recognizer()
        else:
            rospy.logwarn("啟動語音識別器必須指定語言模型lm,以及語言字典dic.")

    def start_recognizer(self):
        """Build the GStreamer pipeline and switch it to PLAYING.

        Nothing is created when the recognizer is already running or when
        one of the lm / dict / hmm parameters is missing; in the latter
        case an error is logged and ``self.started`` stays False.
        """
        rospy.loginfo("開始語音識別... ")

        # A second start would otherwise replace a live pipeline without
        # ever stopping it.
        if self.started:
            rospy.logwarn("Recognizer is already running; ignoring start request.")
            return

        # Validate the configuration before touching GStreamer so that a
        # missing parameter does not leave a half-configured pipeline behind.
        if not rospy.has_param(self._lm_param):
            rospy.logerr('請配置一個語言模型 lm.')
            return
        if not rospy.has_param(self._dic_param):
            rospy.logerr('請配置一個語言字典 dic.')
            return
        if not rospy.has_param(self._hmm_param):
            rospy.logerr('請配置一個語言識別模型 hmm.')
            return
        lm = rospy.get_param(self._lm_param)
        dic = rospy.get_param(self._dic_param)
        hmm = rospy.get_param(self._hmm_param)

        # Parse the launch description and hook up the recognizer element
        self.pipeline = gst.parse_launch(self.launch_config)
        self.asr = self.pipeline.get_by_name('asr')
        self.asr.connect('partial_result', self.asr_partial_result)
        self.asr.connect('result', self.asr_result)
        # NOTE(review): unlike the stock script, the 'configured' property
        # is deliberately not set here; presumably so that the custom hmm
        # takes effect -- TODO confirm against the pocketsphinx element.
        self.asr.set_property('dsratio', 1)

        self.asr.set_property('lm', lm)     # language model
        self.asr.set_property('dict', dic)  # dictionary
        self.asr.set_property('hmm', hmm)   # acoustic model

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus_id = self.bus.connect('message::application',
                                       self.application_message)
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.started = True

    @staticmethod
    def _parse_pulse_index(listing, name):
        """Return the index (as a string) of source *name* in *listing*.

        *listing* is the text printed by ``pacmd list-sources``; the index
        is taken from the line directly above the matching ``name: <...>``
        line.  Returns None when no such source is listed.
        """
        import re

        index_pattern = re.compile(r'index: ([0-9]+)')
        wanted = 'name: <' + name + '>'
        previous = None
        for line in listing.splitlines():
            if wanted in line and previous is not None:
                found = index_pattern.search(previous)
                if found:
                    return found.group(1)
            previous = line
        return None

    def pulse_index_from_name(self, name):
        """Translate a PulseAudio source (microphone) name into its index.

        Args:
            name: source name as shown by ``pacmd list-sources``.

        Returns:
            The source index as a string.

        Raises:
            Exception: if ``pacmd`` cannot be run or lists no such source.
        """
        import subprocess

        # Run pacmd directly (no shell) so the name cannot be interpreted
        # as shell syntax, and do the filtering in Python.
        try:
            listing = subprocess.check_output(['pacmd', 'list-sources'],
                                              universal_newlines=True)
        except (OSError, subprocess.CalledProcessError):
            listing = ''

        index = self._parse_pulse_index(listing, name)
        if index is None:
            raise Exception("Error. pulse index doesn't exist for name: " + name)
        return index

    def stop_recognizer(self):
        """Stop the pipeline and detach the bus handler, if running."""
        if self.started:
            self.pipeline.set_state(gst.STATE_NULL)
            self.pipeline.remove(self.asr)
            self.bus.disconnect(self.bus_id)
            self.started = False

    def shutdown(self):
        """Delete the node's parameters so they don't affect the next launch."""
        # ~hmm is removed together with the other parameters; the original
        # list omitted it, so a stale acoustic model could leak into the
        # next launch.
        for param in [self._device_name_param, self._lm_param,
                      self._dic_param, self._hmm_param]:
            if rospy.has_param(param):
                rospy.delete_param(param)

        # Shut down the GTK main loop.
        gtk.main_quit()

    def start(self, req):
        """Handle the ``~start`` service call."""
        self.start_recognizer()
        rospy.loginfo("識別器啟動")
        return EmptyResponse()

    def stop(self, req):
        """Handle the ``~stop`` service call."""
        self.stop_recognizer()
        rospy.loginfo("識別器停止")
        return EmptyResponse()

    def asr_partial_result(self, asr, text, uttid):
        """Forward partial result signals on the bus to the main thread."""
        struct = gst.Structure('partial_result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def asr_result(self, asr, text, uttid):
        """Forward result signals on the bus to the main thread."""
        struct = gst.Structure('result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def application_message(self, bus, msg):
        """Receive application messages from the bus and dispatch them."""
        msgtype = msg.structure.get_name()
        if msgtype == 'partial_result':
            self.partial_result(msg.structure['hyp'], msg.structure['uttid'])
        elif msgtype == 'result':
            self.final_result(msg.structure['hyp'], msg.structure['uttid'])

    def partial_result(self, hyp, uttid):
        """Log the partial hypothesis at debug level."""
        rospy.logdebug("Partial: " + hyp)

    def final_result(self, hyp, uttid):
        """Publish the final hypothesis, unchanged, on ``~output``."""
        msg = String()       # topic message type
        msg.data = str(hyp)  # recognized text
        rospy.loginfo(msg.data)
        self.pub.publish(msg)
if __name__ == "__main__":
    # Create the recognizer node, then block in the GTK main loop until
    # shutdown() calls gtk.main_quit().
    node = recognizer()
    gtk.main()
3) 建立新的語音單詞
a) 建立語音單詞語句檔案 一行一句 的 txt檔案
例如:
roscd rbx1_speech/config
more nav_commands.txt
pause speech
continue speech
move forward
move backward
move back
move left
move right
...
## 中文
voice_ctr.txt
前進
後退
左轉
右轉
向左轉
向右轉
停止
加速
減速
b) 編譯生成語音庫
通過線上的一個 語言模型(lm)生成
http://www.speech.cs.cmu.edu/tools/lmtool-new.html
上傳語言檔案 Upload a sentence corpus file: Browse
線上編譯 COMPILE KNOWLEDGE BASE
下載 編譯好的檔案
使用 .dic 字典檔案 音節/音素 字典檔案
.lm 語言模型檔案 出現的概率
注意中文 .dic檔案是空的
需要自己生成
使用別人生成好的比較全的.dic檔案查詢自己定義的單詞的 音節
例如:
cd ewenwan/catkin_ws/src/voice_system/model/lm/zh/zh_CN/
grep 停止 mandarin_notone.dic
>>>
停止 t ing zh ib
停止聽寫 t ing zh ib t ing x ie
停止錄音 t ing zh ib l u y in
停止注水 t ing zh ib zh u sh ui
呼吸停止 h u x i t ing zh ib
自動停止 z if d ong t ing zh ib
# 編寫 詞典檔案
# voice_ctr.dic
前進 q ian j in
後退 h ou t ui
左轉 z uo zh uan
右轉 y ou zh uan
向左轉 x iang z uo zh uan
向右轉 x iang y ou zh uan
停止 t ing zh ib
加速 j ia s u
減速 j ian s u
c) 編寫自己的launch啟動檔案
voice_nav_commands.launch
<launch>
<node name="recognizer" pkg="pocketsphinx" type="recognizer.py" output="screen"> #識別器
<param name="lm" value="$(find rbx1_