1. 程式人生 > >語音差分編碼(DPCM)的實現與改進——Python實現

語音差分編碼(DPCM)的實現與改進——Python實現

介紹

這是視聽覺訊號處理的第二個實驗——語音差分編碼(DPCM)。總體來講,思路上還是比較簡單的,很容易理解。如果程式設計能力好的話,相信很快就能完成。奈何我太菜了,寫了幾個晚上才算搞定。做了點擴充套件,添加了自己神奇的想法,在這裡記錄一下。先附上程式碼地址:視聽覺訊號處理實驗二

DPCM 原理

DPCM 的原理很簡單,就是利用訊號取樣點之間的關聯性,即每個取樣點與相鄰的取樣點之間的差別很小,因此,就可以利用該特性進行壓縮。總地來說,就是先儲存第一個取樣點的數值,再儲存每個取樣點與前一個取樣點之間的差值,作為壓縮的資料。這樣的話,就可以利用第一個取樣點,加上差值,求出第二個取樣點,然後再加差值…一直持續下去,就可以求出所有采樣點的數值了,也就完成了語音還原。而且,由於沒個取樣點之間相差很小,因此,差值也不會很大,所以就可以利用較少的位元數來儲存壓縮的資料了,這樣也就實現了壓縮。

量化誤差

由於,這裡的差值可能過大,為便於儲存,一般設定一個量化因子,比如,如果量化因子是100的話,差值 400 就可以對映到 4 ,這樣的話壓縮資料可以用更少的位元儲存,但容易出現量化誤差。如果將所有的差值固定在一定的範圍內(比如這次實驗,儲存位元為4,差值範圍就是 -8 到 7 ,再乘上量化因子)。因此,如果差值過大或者過小,超出範圍了,就只能按照邊界值來定。而這樣的話,就會導致量化誤差。

由於計算出來的差值需要被量化,即對映到 -7 到 8。由於量化過程中,可能會因為差值超出可以正確量化的範圍,導致量化值精度不夠,從而可能導致解壓過程中計算出來的值與被壓縮資料不同。這種量化過程中出現的誤差就是量化誤差。

邊壓縮,邊解壓

如果一個數據出現了量化誤差,那麼後面的資料在還原的過程中就會在錯誤的資料上進行還原,這樣的話,會讓之前出現的誤差一直積累下去,影響後面所有的資料還原。那麼該怎麼解決呢,最簡單的方式就是邊壓縮,邊解壓,利用上一個還原的資料再對當前的資料進行壓縮,這樣的話,即使產生量化誤差,也只是影響一個取樣點,而不會影響後續取樣點的還原

Python 實現

這裡並沒有真正地進行解壓,只是將壓縮過程中的解壓陣列儲存起來了。可以通過讀取壓縮檔案,並根據解壓公式來進行解壓。在改進版的實現中寫了這一過程。

# -*- coding: utf-8 -*-
import wave
import os
import numpy as np
 # 壓縮檔案
def compressWaveFile(wave_data) :       
    quantized_num = 100                         # 量化因子
    diff_value = []
    compressed_data = []
    decompressed_data = []
     diff_value = [wave_data[0]]
    compressed_data = [wave_data[0]]
    decompressed_data = [wave_data[0]]
    for index in range(len(wave_data)) :
        if index == 0 :
            continue
        diff_value.append(wave_data[index] - compressed_data[index - 1])
        compressed_data.append(calCompressedData(diff_value[index], quantized_num))
        decompressed_data.append(decompressed_data[index - 1] + compressed_data[index] * quantized_num)
    return compressed_data, decompressed_data
 # 計算 對映
def calCompressedData(diff_value, quantized_num) :
    if diff_value > 7 * quantized_num :
        return 7
    elif diff_value < -8 * quantized_num :
        return -8
    for i in range(16) :
        j = i - 8
        if (j - 1) * quantized_num < diff_value and diff_value <= j * quantized_num :
            return j
 for i in range(10) :
    f = wave.open("./語料/" + str(i + 1) + ".wav","rb")
    # getparams() 一次性返回所有的WAV檔案的格式資訊
    params = f.getparams()
    # nframes 取樣點數目
    nchannels, sampwidth, framerate, nframes = params[:4]
    # readframes() 按照取樣點讀取資料
    str_data = f.readframes(nframes)            # str_data 是二進位制字串
    # 以上可以直接寫成 str_data = f.readframes(f.getnframes())
    # 轉成二位元組陣列形式(每個取樣點佔兩個位元組)
    wave_data = np.fromstring(str_data, dtype = np.short)
    print( "取樣點數目:" + str(len(wave_data)))          #輸出應為取樣點數目
    f.close()
    compressed_data, decompressed_data = compressWaveFile(wave_data)
    # 寫壓縮檔案
    with open("./壓縮檔案/" + str(i + 1) + ".dpc", "wb") as f :
        for num in compressed_data :
            f.write(np.int16(num))
    # 寫還原檔案
    with open("./還原檔案/" + str(i + 1) + ".pcm", "wb") as f :
        for num in decompressed_data :
            f.write(np.int16(num)) 

改進策略

整體演算法是,先對每個取樣點進行取絕對值然後加一的運算,將所有采樣點的值都變換到大於等於1的區間,然後對這個變換後的值取 log, 儲存取完 log 之後的相鄰資料之間的差值。由於這裡的壓縮檔案需要特意儲存一下每個取樣點的符號(使用 1 位元),然後再進行解密。相當於每個取樣點利用了加密檔案的 5 個位元。

加密過程:

  1. 首先,對每個取樣點進行變換,變換到取絕對值加一。

  2. 計算差值,根據公式:
    差值公式
    將所有差值存到一個數組 d[0:n] 中

  3. 計算對映,將計算所得到的差值進行量化,即將差值對映到 -8 到 7 這個區間(壓縮成4位元,便於儲存)。將量化資料儲存起來,壓縮到檔案中時,需要使用該資訊。

  4. 然後,儲存差值,且為了避免誤差進行積累,一邊解密,一邊加密。

  5. 最後,需要計算整體取樣點的符號,然後利用 1 位元進行儲存,每 16 個符號為一組,組成一個 16 位元的無符號整數。(注:這一步可以跟上面的演算法並行完成。)

最後計算得到的加密檔案格式如下:

檔案格式

解密過程:

  1. 讀取壓縮檔案,將符號數和差值數區分開,分別儲存到不同的陣列中。

  2. 然後,對差值部分進行解壓,利用公式:
    解壓公式
    解壓所得到的是原來取樣點的絕對值加一,因此,先進行減一操作,然後根據對應的符號數再的符號位來判斷該取樣點的符號。

  3. 最後,得到一個所有采樣點的陣列。寫入 .pcm 檔案中。

改進後的Python 程式碼

# -*- coding: utf-8 -*-
import wave
import os
import numpy as np
import math

quantized_num = 0.12               # 量化因子

# 壓縮檔案
def compressWaveFile(wave_data) :       
    diff_value = []                   # 儲存差值
    compressed_data = []              # 儲存壓縮資料
    decompressed_data = []            # 儲存解壓資料
    # 初始化 將第一個取樣點存起來 第一個取樣點不進行加密
    diff_value = [wave_data[0]]
    compressed_data = [wave_data[0]]
    decompressed_data = [wave_data[0]]
    # 壓縮每個資料
    for index in range(len(wave_data)) :
        if index == 0 :
            continue
        # 做差的時候要取對數,對數的 自變數 x >= 0, 由於樣本點有正有負,因此這裡先取絕對值加一
        waveData_abs = abs(wave_data[index]) + 1
        decompressedData_abs = abs(decompressed_data[index - 1]) + 1 
        # 相當於對變換後的值,即取絕對值加一後的值進行加密
        diff_value.append(math.log(waveData_abs) - math.log(decompressedData_abs))
        compressed_data.append(calCompressedData(diff_value[index], quantized_num))
        # 這裡進行解密,並直接將解密出來的數值進行減一操作
        de_num = math.exp(math.log(abs(decompressed_data[index - 1]) + 1) + compressed_data[index] * quantized_num) - 1
        # 判斷加密之前的樣本點符號是正還是負, 如果是負數,那麼解密出來的也應該是負數,需要乘-1
        if wave_data[index] < 0 :
            decompressed_data.append((-1) * de_num)
            continue
        decompressed_data.append(de_num)
    return compressed_data, decompressed_data

# 將所有樣本點的符號儲存在
def calSig(wave_data) :
    sig_array = []
    sig_num = np.uint16(0)
    # 除去第一個取樣點 每 64 個數據一組,將每組的符號位儲存在一個 16 位無符號整數中
    for index in range(len(wave_data)) :
        if index == 0 :
            continue
        if index % 16 == 1 :
            if index != 1 :
                sig_array.append(sig_num)
            if wave_data[index] < 0 :
                sig_num = np.uint16(1)
            else :
                sig_num = np.uint16(0)
        if wave_data[index] < 0 :
            sig_num = np.uint16((sig_num << 1) + 1)  # 負數 左移 1 位,加 1
        else :
            sig_num = np.uint16(sig_num << 1)        # 正數 左移 1 位
        # 最後幾位也要存起來
        if index == len(wave_data) - 1 :
            sig_array.append(sig_num)
    sig_array.insert(0, len(sig_array))
    # print("符號陣列大小:" + str(len(sig_array)))
    return sig_array

# 計算 對映 將差值對映到 -8 到 7 之間
def calCompressedData(diff_value, quantized_num) :
    if diff_value > 7 * quantized_num :
        return 7
    elif diff_value < -8 * quantized_num :
        return -8
    for i in range(16) :
        j = i - 8
        if (j - 1) * quantized_num < diff_value and diff_value <= j * quantized_num :
            return j

# 計算信噪比
def calSignalToNoiseRatio(wave_data, decompressed_data) :
    sum_son = np.int64(0)
    sum_mum = np.int64(0)
    for i in range(len(decompressed_data)) :
        sum_son = sum_son + int(decompressed_data[i]) * int(decompressed_data[i])
        sub = decompressed_data[i] - wave_data[i]
        sum_mum = sum_mum + sub * sub
    return 10 * math.log10(float(sum_son) / float(sum_mum))

# 讀取壓縮檔案
def readCompressedFile(compressed_str) :
    compressed_data = []          #用來儲存壓縮資料的陣列
    # 取出前兩個壓縮資料,即第一個樣本點 和 符號數的個數
    data_first = np.fromstring(compressed_str[0:2], dtype = np.uint16)
    compressed_data.append(data_first[0])
    data_next = np.fromstring(compressed_str[2:(data_first[0] + 1) * 2], dtype = np.uint16)
    # print("第一個資料:" + str(data_first[0]) + "\t 讀取長度 :" + str(len(data_first)) + "\t" + str(len(data_next)))
    for num in data_next :
        compressed_data.append(num)
    # 第一個取樣點
    com_first = np.fromstring(compressed_str[(data_first[0] + 1) * 2 : (data_first[0] + 2) * 2], dtype = np.short)
    compressed_data.append(com_first[0])
    # 去除第一個樣本點,剩餘所有資料都以 4 bit 儲存
    compressed_str = compressed_str[(data_first[0] + 2) * 2:len(compressed_str)]
    compressed_data_append = np.fromstring(compressed_str, dtype = np.uint8)
    # 將讀取出來的資料裝進壓縮陣列中,每一個數據,拆成兩個 4 bit 數
    for num in compressed_data_append :
        # 儲存的時候,是轉成 4 bit 無符號整數儲存的, 解密時,需要轉換回來
        compressed_data.append((num >> 4) - 8)
        compressed_data.append(((np.uint8(num << 4)) >> 4) - 8)
    return compressed_data

# 解密 還原檔案
def decompressWaveFile(compressed_data) :
    # 取出符號陣列
    sig_num = compressed_data[0]
    sig_array = compressed_data[1: sig_num + 1]
    decompressed_data = []
    # 將符號陣列從壓縮陣列中去除
    compressed_data = compressed_data[sig_num + 1 : len(compressed_data)]
    # 將第一個取樣點加入解密陣列中
    decompressed_data.append(compressed_data[0])
    for i in range(len(compressed_data)) :
        if i == 0 : 
            continue
        de_num = math.exp(math.log(abs(decompressed_data[i - 1]) + 1) + compressed_data[i]* quantized_num) - 1
        # 去除第一個取樣點的佔位
        t = i - 1
        if np.uint16(1 << (15 - (t % 16))) & sig_array[int(t / 16)] != 0 :
            decompressed_data.append((-1) * de_num)
            continue
        decompressed_data.append(de_num)
    return decompressed_data

for i in range(10) :
    f = wave.open("./語料/" + str(i + 1) + ".wav","rb")
    # getparams() 一次性返回所有的WAV檔案的格式資訊
    params = f.getparams()
    # nframes 取樣點數目
    nchannels, sampwidth, framerate, nframes = params[:4]
    # readframes() 按照取樣點讀取資料
    str_data = f.readframes(nframes)            # str_data 是二進位制字串
    # 以上可以直接寫成 str_data = f.readframes(f.getnframes())
    # 轉成二位元組陣列形式(每個取樣點佔兩個位元組)
    wave_data = np.fromstring(str_data, dtype = np.short)
    print( "取樣點數目:" + str(len(wave_data)))          #輸出應為取樣點數目
    f.close()
    compressed_data, decompressed_data = compressWaveFile(wave_data)

    # 計算符號陣列
    sig_array = calSig(wave_data)

    # 寫壓縮檔案
    with open("./壓縮檔案/" + str(i + 1) + ".dpc", "wb") as f :
        # 寫入樣本符號
        for sig_num in sig_array : 
            f.write(np.uint16(sig_num))
        # 寫入差值
        num = 0
        f.write(np.int16(compressed_data[0]))
        for j in range(len(compressed_data)) :
            # 第一個資料已經壓縮
            if j == 0 :
                continue
            # 壓縮資料 如果有最後一個數據沒拼上一個子節,則丟棄該樣本點
            elif j % 2 == 1 :
                num = np.uint8((compressed_data[j] + 8 )<< 4)
            else :
                num = np.uint8(num + np.uint8(compressed_data[j] + 8))
                f.write(num)

    # 讀壓縮檔案 解壓
    with open("./壓縮檔案/" + str(i + 1) + ".dpc", "rb") as f :
        compressed_data = readCompressedFile(f.read())
        decompressed_data = decompressWaveFile(compressed_data)

    # # 測試 寫壓縮檔案
    # with open("./還原檔案/" + str(i + 1) + ".txt", "w") as f :
    #     for num in compressed_data :
    #         f.write(str(num) + "\n")

    # 寫還原檔案
    with open("./還原檔案/" + str(i + 1) + ".pcm", "wb") as f :
        for num in decompressed_data :
            f.write(np.int16(num))
    print("檔案 " + str(i + 1) + " 的信噪比:" + str(calSignalToNoiseRatio(wave_data, decompressed_data)))

總結

總的來說,這個實驗還是挺自由的。我比較喜歡這個實驗老師的風格,隨意。而且老師強調,做實驗不用太拘束,隨便寫,就跟玩一樣。真的挺喜歡這個觀點的,實驗的主要目的就是讓我們熟悉語音,掌握語音的操作和演算法,總是按照那麼多條條框框來,重心就很容易偏。