1. 程式人生 > >ROS 教程3 機器人語音 語音識別理解合成控制 ASR NLU TTS

ROS 教程3 機器人語音 語音識別理解合成控制 ASR NLU TTS

機器人語音 語音識別理解合成控制 ASR NLU TTS

github

一、語音處理總體框架

    1. 語音識別(ASR , Automatic Speech Recognition )
    2. 語義理解(NLU , Natural Language Understanding)
    e. 語音合成(TTS , Text To Speech)

1. 語音識別 **ASR**:支援的包:
    國外:CMU SPhinx  ——>  pocketsphinx
    國內:科大迅飛等。。

2. 語義理解 NLU:  圖靈

3. 語音合成 TTS:
    國外:Festival    ——> sound_play   是 ros-indigo-audio-common 的一部分
    國內:科大迅飛等。。。

二、國外庫

1、語音識別 pocketsphinx

 1) 安裝
  sudo apt-get install gstreamer0.10-pocketsphinx   # 原生系統
  sudo apt-get install ros-indigo-pocketsphinx      # ros介面支援
  sudo apt-get install ros-indigo-audio-common      # 包含了sound_play TTS
  sudo apt-get install libasound2                   # 語音驅動
  sudo apt-get install gstreamer0.10-gconf          # GStreamer元件
  
 2) 測試
   pocketsphinx包 包含了 一個 節點  recognizer.py
   獲取硬體 語音的輸入流,在已有的語音庫裡  匹配語音相應的單詞  併發布到 /recognizer/output 話題   
   適合robotcup的一個語音庫測試:
   roslaunch pocketsphinx robocup.launch
   說話測試
   顯示 話題訊息:
     rostopic echo /recognizer/output
   檢視語音庫:
   roscd pocketsphinx/demo
   more robocup.corpus      顯示
   只有說了語音庫內的語音才能得到較滿意的結果
  # robocup.launch     
 <launch>
  <node name="recognizer" pkg="pocketsphinx" type="recognizer.py" output="screen"> # 識別器
    <param name="lm" value="$(find pocketsphinx)/demo/robocup.lm"/>                # 語言模型 線上工具根據語言庫生成
    <param name="dict" value="$(find pocketsphinx)/demo/robocup.dic"
/> # 語言詞典 </node> </launch>
# recognizer.py 檔案
import roslib; roslib.load_manifest('pocketsphinx')
import rospy

import pygtk
pygtk.require('2.0')
import gtk

import gobject
import pygst
pygst.require('0.10')
gobject.threads_init()
import gst

from std_msgs.msg import String
from std_srvs.srv import *
import os
import commands

class recognizer(object):
    """ GStreamer based speech recognizer. """

    def __init__(self):
        # Start node
        rospy.init_node("recognizer")

        self._device_name_param = "~mic_name"  # Find the name of your microphone by typing pacmd list-sources in the terminal
        self._lm_param = "~lm"
        self._dic_param = "~dict"

        # Configure mics with gstreamer launch config
        if rospy.has_param(self._device_name_param):
            self.device_name = rospy.get_param(self._device_name_param)
            self.device_index = self.pulse_index_from_name(self.device_name)
            self.launch_config = "pulsesrc device=" + str(self.device_index)
            rospy.loginfo("Using: pulsesrc device=%s name=%s", self.device_index, self.device_name)
        elif rospy.has_param('~source'):
            # common sources: 'alsasrc'
            self.launch_config = rospy.get_param('~source')
        else:
            self.launch_config = 'gconfaudiosrc'

        rospy.loginfo("Launch config: %s", self.launch_config)

        self.launch_config += " ! audioconvert ! audioresample " \
                            + '! vader name=vad auto-threshold=true ' \
                            + '! pocketsphinx name=asr ! fakesink'

        # Configure ROS settings
        self.started = False
        rospy.on_shutdown(self.shutdown)
        self.pub = rospy.Publisher('~output', String)
        rospy.Service("~start", Empty, self.start)
        rospy.Service("~stop", Empty, self.stop)

        if rospy.has_param(self._lm_param) and rospy.has_param(self._dic_param):
            self.start_recognizer()
        else:
            rospy.logwarn("lm and dic parameters need to be set to start recognizer.")

    def start_recognizer(self):
        rospy.loginfo("Starting recognizer... ")

        self.pipeline = gst.parse_launch(self.launch_config)
        self.asr = self.pipeline.get_by_name('asr')
        self.asr.connect('partial_result', self.asr_partial_result)
        self.asr.connect('result', self.asr_result)
        self.asr.set_property('configured', True)
        self.asr.set_property('dsratio', 1)

        # Configure language model
        if rospy.has_param(self._lm_param):
            lm = rospy.get_param(self._lm_param)
        else:
            rospy.logerr('Recognizer not started. Please specify a language model file.')
            return

        if rospy.has_param(self._dic_param):
            dic = rospy.get_param(self._dic_param)
        else:
            rospy.logerr('Recognizer not started. Please specify a dictionary.')
            return

        self.asr.set_property('lm', lm)
        self.asr.set_property('dict', dic)

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus_id = self.bus.connect('message::application', self.application_message)
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.started = True

    def pulse_index_from_name(self, name):
        output = commands.getstatusoutput("pacmd list-sources | grep -B 1 'name: <" + name + ">' | grep -o -P '(?<=index: )[0-9]*'")

        if len(output) == 2:
            return output[1]
        else:
            raise Exception("Error. pulse index doesn't exist for name: " + name)

    def stop_recognizer(self):
        if self.started:
            self.pipeline.set_state(gst.STATE_NULL)
            self.pipeline.remove(self.asr)
            self.bus.disconnect(self.bus_id)
            self.started = False

    def shutdown(self):
        """ Delete any remaining parameters so they don't affect next launch """
        for param in [self._device_name_param, self._lm_param, self._dic_param]:
            if rospy.has_param(param):
                rospy.delete_param(param)

        """ Shutdown the GTK thread. """
        gtk.main_quit()

    def start(self, req):
        self.start_recognizer()
        rospy.loginfo("recognizer started")
        return EmptyResponse()

    def stop(self, req):
        self.stop_recognizer()
        rospy.loginfo("recognizer stopped")
        return EmptyResponse()

    def asr_partial_result(self, asr, text, uttid):
        """ Forward partial result signals on the bus to the main thread. """
        struct = gst.Structure('partial_result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def asr_result(self, asr, text, uttid):
        """ Forward result signals on the bus to the main thread. """
        struct = gst.Structure('result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def application_message(self, bus, msg):
        """ Receive application messages from the bus. """
        msgtype = msg.structure.get_name()
        if msgtype == 'partial_result':
            self.partial_result(msg.structure['hyp'], msg.structure['uttid'])
        if msgtype == 'result':
            self.final_result(msg.structure['hyp'], msg.structure['uttid'])

    def partial_result(self, hyp, uttid):
        """ Delete any previous selection, insert text and select it. """
        rospy.logdebug("Partial: " + hyp)

    def final_result(self, hyp, uttid):
        """ Insert the final result. """
        msg = String()
        msg.data = str(hyp.lower())
        rospy.loginfo(msg.data)
        self.pub.publish(msg)

if __name__ == "__main__":
    start = recognizer()
    gtk.main()
#!/usr/bin/python
# -*- coding:utf-8 -*-
### 修改後的 檔案
import roslib
roslib.load_manifest('pocketsphinx')
import rospy
import pygtk # Python輕鬆建立具有圖形使用者介面的程式  播放音樂等
pygtk.require('2.0')
import gtk   #  GNU Image Manipulation Program (GIMP)   Toolkit

import gobject # 亦稱Glib物件系統,是一個程式庫,它可以幫助我們使用C語言編寫面向物件程式
import pygst   # 與 pygtk 相關
pygst.require('0.10')
gobject.threads_init()# 初始化
import gst

from std_msgs.msg import String
from std_srvs.srv import *
import os
import commands

class recognizer(object):
    """GStreamer是一個多媒體框架,它可以允許你輕易地建立、編輯與播放多媒體檔案"""
    # 初始化系統配置
    def __init__(self):
        # 建立節點
        rospy.init_node("recognizer")
        # 全域性引數
        self._device_name_param = "~mic_name"  # 麥克風
        self._lm_param = "~lm"                 # 語言模型 language model  
        self._dic_param = "~dict"              # 語言字典
        self._hmm_param = "~hmm"               # 識別網路  hiden markov model 隱馬爾可夫模型 分中英文模型
        
        
        # 用 gstreamer launch config 配置 麥克風  一些啟動資訊
        if rospy.has_param(self._device_name_param):# 按照指定的麥克風
            self.device_name = rospy.get_param(self._device_name_param)# 麥克風名字
            self.device_index = self.pulse_index_from_name(self.device_name)# 麥克風編號 ID
            self.launch_config = "pulsesrc device=" + str(self.device_index)# 啟動資訊
            rospy.loginfo("Using: pulsesrc device=%s name=%s", self.device_index, self.device_name)
        elif rospy.has_param('~source'):
            # common sources: 'alsasrc'
            self.launch_config = rospy.get_param('~source')
        else:
            self.launch_config = 'gconfaudiosrc'

        rospy.loginfo("麥克風配置: %s", self.launch_config) # "Launch config: %s",self.launch_config

        self.launch_config += " ! audioconvert ! audioresample " \
                            + '! vader name=vad auto-threshold=true ' \
                            + '! pocketsphinx name=asr ! fakesink'

        # 配置ros系統設定
        self.started = False
        rospy.on_shutdown(self.shutdown)# 自主關閉
        self.pub = rospy.Publisher('~output', String)# 釋出 ~output 引數指定的 話題 型別 tring  似乎缺少 指定釋出佇列大小 tring
        
        rospy.Service("~start", Empty, self.start)   # 開始服務
        rospy.Service("~stop", Empty, self.stop)     # 結束服務
        # 檢查模型和字典配置
        if rospy.has_param(self._lm_param) and rospy.has_param(self._dic_param):
            self.start_recognizer()
        else:
            rospy.logwarn("啟動語音識別器必須指定語言模型lm,以及語言字典dic.")
            # rospy.logwarn("lm and dic parameters need to be set to start recognizer.")
                    
    def start_recognizer(self):
        rospy.loginfo("開始語音識別... ")
        # rospy.loginfo("Starting recognizer... ")
        
        self.pipeline = gst.parse_launch(self.launch_config)# 解析 麥克風配置 
        self.asr = self.pipeline.get_by_name('asr')         # 自動語音識別 模型
        self.asr.connect('partial_result', self.asr_partial_result)# 後面的函式
        self.asr.connect('result', self.asr_result)
        #self.asr.set_property('configured', True) # 需要開啟配置  hmm模型
        self.asr.set_property('dsratio', 1)

        # 配置語言模型
        if rospy.has_param(self._lm_param):
            lm = rospy.get_param(self._lm_param)
        else:
            rospy.logerr('請配置一個語言模型 lm.')
            return

        if rospy.has_param(self._dic_param):
            dic = rospy.get_param(self._dic_param)
        else:
            rospy.logerr('請配置一個語言字典 dic.')
            return
        
        if rospy.has_param(self._hmm_param):
            hmm = rospy.get_param(self._hmm_param)
        else:
            rospy.logerr('請配置一個語言識別模型 hmm.')
            return


        self.asr.set_property('lm', lm)   # 設定asr的語言模型
        self.asr.set_property('dict', dic)# 設定asr的字典
        self.asr.set_property('hmm', hmm) # 設定asr的識別模型
        

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus_id = self.bus.connect('message::application', self.application_message)
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.started = True

    # 解析 麥克風名稱 得到 麥克風ID
    def pulse_index_from_name(self, name):
        output = commands.getstatusoutput("pacmd list-sources | grep -B 1 'name: <" + name + ">' | grep -o -P '(?<=index: )[0-9]*'")

        if len(output) == 2:
            return output[1]
        else:
            raise Exception("Error. pulse index doesn't exist for name: " + name)
        
    # 停止識別器
    def stop_recognizer(self):
        if self.started:
            self.pipeline.set_state(gst.STATE_NULL)
            self.pipeline.remove(self.asr)
            self.bus.disconnect(self.bus_id)
            self.started = False
    # 程式關閉
    def shutdown(self):
        """ 刪除所有的引數,以防影響下次啟動"""
        for param in [self._device_name_param, self._lm_param, self._dic_param]:
            if rospy.has_param(param):
                rospy.delete_param(param)

        """ 關閉 GTK 程序. """
        gtk.main_quit()
    # 開始
    def start(self, req):
        self.start_recognizer()
        rospy.loginfo("識別器啟動")
        return EmptyResponse()
    # 停止
    def stop(self, req):
        self.stop_recognizer()
        rospy.loginfo("識別器停止")
        return EmptyResponse()
    
    def asr_partial_result(self, asr, text, uttid):
        """前線部分結果到主執行緒. """
        struct = gst.Structure('partial_result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def asr_result(self, asr, text, uttid):
        """ 前線結果到主執行緒 """
        struct = gst.Structure('result')
        struct.set_value('hyp', text)
        struct.set_value('uttid', uttid)
        asr.post_message(gst.message_new_application(asr, struct))

    def application_message(self, bus, msg):
        """ 從總線上接收應用資料. """
        msgtype = msg.structure.get_name()
        if msgtype == 'partial_result':
            self.partial_result(msg.structure['hyp'], msg.structure['uttid'])
        if msgtype == 'result':
            self.final_result(msg.structure['hyp'], msg.structure['uttid'])
    # 部分結果
    def partial_result(self, hyp, uttid):
        """ Delete any previous selection, insert text and select it. """
        rospy.logdebug("Partial: " + hyp)
        
    # 最終結果
    def final_result(self, hyp, uttid):
        """ Insert the final result. """
        msg = String()# 話題訊息型別
        msg.data = str(hyp)# 識別語音對於成的文字
        rospy.loginfo(msg.data)
        self.pub.publish(msg)

if __name__ == "__main__":
    start = recognizer()
    gtk.main()
 3) 建立新的語音單詞
  a) 建立語音單詞語句檔案  一行一句 的 txt檔案
   例如:
    roscd rbx1_speech/config
    more nav_commands.txt
pause speech
continue speech
move forward
move backward
move back
move left
move right
...
## 中文 
voice_ctr.txt
前進
後退
左轉
右轉
向左轉
向右轉
停止
加速
減速
   b) 編譯生成語音庫
    通過線上的一個 語言模型(lm)生成
    http://www.speech.cs.cmu.edu/tools/lmtool-new.html
    上傳語言檔案  Upload a sentence corpus file: Browse
    線上編譯     COMPILE KNOWLEDGE BASE
    下載         編譯好的檔案

使用 .dic 字典檔案 音節/音素 字典檔案
     .lm  語言默默檔案 出現的概率

注意中文 .dic檔案是空的
需要自己生成

使用別人生成好的比較全的.dic檔案查詢自己定義的單詞的 音節
例如:
cd ewenwan/catkin_ws/src/voice_system/model/lm/zh/zh_CN/
grep 停止 mandarin_notone.dic
>>>
停止	t ing zh ib
停止聽寫	t ing zh ib t ing x ie
停止錄音	t ing zh ib l u y in
停止注水	t ing zh ib zh u sh ui
呼吸停止	h u x i t ing zh ib
自動停止	z if d ong t ing zh ib
# 編寫 詞典檔案
# voice_ctr.dic
前進	t ing zh ib
後退	h ou t ui
左轉	z uo zh uan
右轉	y uo zh uan
向左轉	x iang z uo zh uan
向右轉	x iang y uo zh uan
停止	t ing zh ib
加速	j ia s u
減速	j ian s u
   c) 編寫自己的launch啟動檔案

voice_nav_commands.launch

<launch>
 <node name="recognizer" pkg="pocketsphinx" type="recognizer.py" output="screen"> #識別器
  <param name="lm" value="$(find rbx1_