專案使用encode_使用PaddlePaddle實現聲紋識別
技術標籤:專案使用encode
前言
本章介紹如何使用PaddlePaddle實現簡單的聲紋識別模型,首先你需要熟悉音訊分類,沒有了解的可以檢視這篇文章《基於PaddlePaddle實現聲音分類》
。基於這個知識基礎之上,我們訓練一個聲紋識別模型,通過這個模型我們可以識別說話的人是誰,可以應用在一些需要音訊驗證的專案。
環境準備
主要介紹libsora,PyAudio,pydub的安裝,其他的依賴包根據需要自行安裝。
Python 3.7
PaddlePaddle 1.7
安裝libsora
最簡單的方式就是使用pip命令安裝,如下:
pip install pytest-runner
pip install librosa
如果pip命令安裝不成功,那就使用原始碼安裝,下載原始碼:https://github.com/librosa/librosa/releases/, windows的可以下載zip壓縮包,方便解壓。
pip install pytest-runner
tar xzf librosa-<版本號>.tar.gz 或者 unzip librosa-<版本號>.tar.gz
cd librosa-<版本號>/
python setup.py install
如果出現libsndfile64bit.dll': error 0x7e
錯誤,請指定安裝版本0.6.3,如pip install librosa==0.6.3
安裝ffmpeg, 下載地址:http://blog.gregzaal.com/how-to-install-ffmpeg-on-windows/,筆者下載的是64位,static版。
然後到C盤,筆者解壓,修改檔名為ffmpeg
,存放在C:\Program Files\
目錄下,並新增環境變數C:\Program Files\ffmpeg\bin
最後修改原始碼,路徑為C:\Python3.7\Lib\site-packages\audioread\ffdec.py
,修改32行程式碼,如下:
COMMANDS = ('C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe', 'avconv')
安裝PyAudio
使用pip安裝命令,如下:
pip install pyaudio
在安裝的時候需要使用到C++庫進行編譯,如果讀者的系統是windows,Python是3.7,可以在這裡下載whl安裝包,下載地址:https://github.com/intxcc/pyaudio_portaudio/releases
安裝pydub
使用pip命令安裝,如下:
pip install pydub
建立資料
本教程筆者使用的是Free ST Chinese Mandarin Corpus資料集,這個資料集一共有855個人的語音資料,有102600條語音資料。如果讀者有其他更好的資料集,可以混合在一起使用。
如何已經讀過筆者《基於PaddlePaddle實現聲音分類》這篇文章,應該知道語音資料小而多,最好的方法就是把這些音訊檔案生成二進位制檔案,加快訓練速度。所以建立create_data.py
用於生成二進位制檔案。
首先是建立一個數據列表,資料列表的格式為,建立這個列表主要是方便之後的讀取,也是方便讀取使用其他的語音資料集,不同的語音資料集,可以通過編寫對應的生成資料列表的函式,把這些資料集都寫在同一個資料列表中,這樣就可以在下一步直接生成二進位制檔案了。
def get_data_list(audio_path, list_path):
files = os.listdir(audio_path)
f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')
sound_sum = 0
s = set()for file in files:if '.wav' not in file:continue
s.add(file[:15])
sound_path = os.path.join(audio_path, file)if sound_sum % 100 == 0:
f_test.write('%s\t%d\n' % (sound_path.replace('\\', '/'), len(s) - 1))else:
f_train.write('%s\t%d\n' % (sound_path.replace('\\', '/'), len(s) - 1))
sound_sum += 1
f_test.close()
f_train.close()if __name__ == '__main__':get_data_list('dataset/ST-CMDS-20170001_1-OS', 'dataset')
有了上面建立的資料列表,就可以把語音資料轉換成訓練資料了,主要是把語音資料轉換成梅爾頻譜(Mel Spectrogram),使用librosa可以很方便得到音訊的梅爾頻譜,使用的API為librosa.feature.melspectrogram()
,輸出的是numpy值,可以直接用tensorflow訓練和預測。關於梅爾頻譜具體資訊讀者可以自行了解,跟梅爾頻譜同樣很重要的梅爾倒譜(MFCCs)更多用於語音識別中,對應的API為librosa.feature.mfcc()
。在轉換過程中,筆者還使用了librosa.effects.split
裁剪掉靜音部分的音訊,這樣可以減少訓練資料的噪聲,提供訓練準確率。筆者目前預設每條語音的長度為2.04秒,這個讀者可以根據自己的情況修改語音的長度,如果要修改訓練語音的長度,需要根據註釋的提示修改相應的資料值。如果語音長度比較長的,程式會隨機裁剪20次,以達到資料增強的效果。
class DataSetWriter(object):
def __init__(self, prefix):
# 建立對應的資料檔案
self.data_file = open(prefix + '.data', 'wb')
self.header_file = open(prefix + '.header', 'wb')
self.label_file = open(prefix + '.label', 'wb')
self.offset = 0
self.header = ''
def add_data(self, key, data):
# 寫入影象資料
self.data_file.write(struct.pack('I', len(key)))
self.data_file.write(key.encode('ascii'))
self.data_file.write(struct.pack('I', len(data)))
self.data_file.write(data)
self.offset += 4 + len(key) + 4
self.header = key + '\t' + str(self.offset) + '\t' + str(len(data)) + '\n'
self.header_file.write(self.header.encode('ascii'))
self.offset += len(data)
def add_label(self, label):
# 寫入標籤資料
self.label_file.write(label.encode('ascii') + '\n'.encode('ascii'))
# 格式二進位制轉換
def convert_data(data_list_path, output_prefix):
# 讀取列表
data_list = open(data_list_path, "r").readlines()print("train_data size:", len(data_list))
# 開始寫入資料
writer = DataSetWriter(output_prefix)for record in tqdm(data_list):try:
path, label = record.replace('\n', '').split('\t')
wav, sr = librosa.load(path, sr=16000)
intervals = librosa.effects.split(wav, top_db=20)
wav_output = []
# [可能需要修改] 裁剪的音訊長度:16000 * 秒數
wav_len = int(16000 * 2.04)for sliced in intervals:
wav_output.extend(wav[sliced[0]:sliced[1]])for i in range(20):
# 裁剪過長的音訊,過短的補0if len(wav_output) > wav_len:
l = len(wav_output) - wav_len
r = random.randint(0, l)
wav_output = wav_output[r:wav_len + r]else:
wav_output.extend(np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32))
wav_output = np.array(wav_output)
# 轉成梅爾頻譜
ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).reshape(-1).tolist()
# [可能需要修改] 梅爾頻譜的shape,librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).shapeif len(ps) != 128 * 128: continue
data = struct.pack('%sd' % len(ps), *ps)
# 寫入對應的資料
key = str(uuid.uuid1())
writer.add_data(key, data)
writer.add_label('\t'.join([key, label.replace('\n', '')]))if len(wav_output) <= wav_len:break
except Exception as e:print(e)if __name__ == '__main__':convert_data('dataset/train_list.txt', 'dataset/train')convert_data('dataset/test_list.txt', 'dataset/test')
建立reader.py
用於在訓練時讀取資料。編寫一個ReadData
類,用讀取上一步生成的二進位制檔案,通過.header
中的key
和每條資料的偏移量,將.data
的資料讀取出來,並通過key
來繫結data
和label
的對應關係。
class ReadData(object):
def __init__(self, prefix_path):
self.offset_dict = {}for line in open(prefix_path + '.header', 'rb'):
key, val_pos, val_len = line.split('\t'.encode('ascii'))
self.offset_dict[key] = (int(val_pos), int(val_len))
self.fp = open(prefix_path + '.data', 'rb')
self.m = mmap.mmap(self.fp.fileno(), 0, access=mmap.ACCESS_READ)print('loading label')
# 獲取label
self.label = {}for line in open(prefix_path + '.label', 'rb'):
key, label = line.split(b'\t')
self.label[key] = [int(label.decode().replace('\n', ''))]print('finish loading data:', len(self.label))
# 獲取影象資料
def get_data(self, key):
p = self.offset_dict.get(key, None)if p is None:return None
val_pos, val_len = preturn self.m[val_pos:val_pos + val_len]
# 獲取影象標籤
def get_label(self, key):return self.label.get(key)
# 獲取所有keys
def get_keys(self):return self.label.keys()
使用上面的工具,建立train_reader
和est_reader
函式,用於在訓練讀取訓練資料和測試資料,train_reader
多了np.random.shuffle(keys)
操作,作用是為了每一輪的訓練,資料都是打亂的,使得每次一輪的輸入資料順序都不一樣。如果讀取修改了輸入語音的長度,需要相應修改mapper()
函式中的值。
def mapper(sample):
data, label = sample
# [可能需要修改] 梅爾頻譜的shape
data = list(struct.unpack('%sd' % (128 * 128), data))
data = np.array(data).reshape((1, 128, 128)).astype(np.float32)assert (data is not None), 'data is None'return data, label
def train_reader(data_path, batch_size):
def reader():
readData = ReadData(data_path)
keys = readData.get_keys()
keys = list(keys)
np.random.shuffle(keys)
batch_data, batch_label = [], []for key in keys:
data = readData.get_data(key)assert (data is not None)
label = readData.get_label(key)assert (label is not None)
sample = (data, label)
d, label = mapper(sample)
batch_data.append([d])
batch_label.append(label)if len(batch_data) == batch_size:yield np.vstack(batch_data), np.vstack(batch_label).astype(np.int64)
batch_data, batch_label = [], []return reader
def test_reader(data_path, batch_size):
def reader():
readData = ReadData(data_path)
keys = readData.get_keys()
keys = list(keys)
batch_data, batch_label = [], []for key in keys:
data = readData.get_data(key)assert (data is not None)
label = readData.get_label(key)assert (label is not None)
sample = (data, label)
d, label = mapper(sample)
batch_data.append([d])
batch_label.append(label)if len(batch_data) == batch_size:yield np.vstack(batch_data), np.vstack(batch_label).astype(np.int64)
batch_data, batch_label = [], []return reader
訓練模型
建立train.py
開始訓練模型,搭建一個CNN分類模型,資料輸入層設定為[None, 1, 128, 128]
,這個大小就是梅爾頻譜的shape,如果讀者使用了其他的語音長度,也需要修改這個值。save_path
是所有模型的儲存路徑,init_model
是初始化模型的路徑,CLASS_DIM
為分類的總數,Free ST Chinese Mandarin Corpus資料集一共有855個人的語音資料,所以這裡分類總數為855。
# 儲存模型路徑
save_path = 'models/'
# 初始化模型路徑
init_model = None
# 類別總數
CLASS_DIM = 855
# [可能需要修改] 梅爾頻譜的shape
audio = fluid.data(name='audio', shape=[None, 1, 128, 128], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')
# 卷積神經網路
def cnn(input, class_dim):
conv1 = fluid.layers.conv2d(input=input,
num_filters=20,
filter_size=5,
act='relu')
conv2 = fluid.layers.conv2d(input=conv1,
num_filters=50,
filter_size=5,
act='relu')
pool1 = fluid.layers.pool2d(input=conv2, pool_type='avg', global_pooling=True)
drop = fluid.layers.dropout(x=pool1, dropout_prob=0.5)
f1 = fluid.layers.fc(input=drop, size=128, act='relu')
bn = fluid.layers.batch_norm(f1)
f2 = fluid.layers.fc(input=bn, size=128, act='relu')
f3 = fluid.layers.fc(input=f2, size=class_dim, act='softmax')return f3
# 獲取網路模型
model = cnn(audio, CLASS_DIM)
# 獲取損失函式和準確率函式
cost = fluid.layers.cross_entropy(input=model, label=label)
avg_cost = fluid.layers.mean(cost)
acc = fluid.layers.accuracy(input=model, label=label)
# 獲取訓練和測試程式
test_program = fluid.default_main_program().clone(for_test=True)
# 定義優化方法
optimizer = fluid.optimizer.AdamOptimizer(learning_rate=1e-3,
regularization=fluid.regularizer.L2Decay(
regularization_coeff=0.001))
opts = optimizer.minimize(avg_cost)
# 獲取自定義資料
train_reader = reader.train_reader('dataset/train', batch_size=32)
test_reader = reader.test_reader('dataset/test', batch_size=32)
如果讀者之前已經訓練過,可以使用初始化模型恢復訓練。通過修改place
可以選擇使用CPU訓練還是GPU訓練。
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
# 載入初始化模型if init_model:
fluid.load(program=fluid.default_main_program(),
model_path=init_model,
executor=exe,
var_list=fluid.io.get_program_parameter(fluid.default_main_program()))print("Init model from: %s." % init_model)
開始執行訓練,目前是訓練500輪,在訓練過程中是從打包的二進位制檔案中讀取訓練資料的。每訓練00個batch列印一次訓練日誌,每一輪訓練結束,執行一次測試和儲存模型。在儲存預測模型時,儲存的是最後分類層的上一層,這樣在執行預測時,就可以輸出語音的特徵值,通過使用這些特徵值就可以實現聲紋識別了。
for pass_id in range(500):
# 進行訓練for batch_id, data in enumerate(train_reader()):
train_cost, train_acc = exe.run(program=fluid.default_main_program(),
feed={audio.name: data[0], label.name: data[1]},
fetch_list=[avg_cost, acc])
# 每100個batch列印一次資訊if batch_id % 100 == 0:print('Pass:%d, Batch:%d, Cost:%0.5f, Accuracy:%0.5f' %(pass_id, batch_id, train_cost[0], train_acc[0]))
# 進行測試
test_accs = []
test_costs = []for batch_id, data in enumerate(test_reader()):
test_cost, test_acc = exe.run(program=test_program,
feed={audio.name: data[0], label.name: data[1]},
fetch_list=[avg_cost, acc])
test_accs.append(test_acc[0])
test_costs.append(test_cost[0])
# 求測試結果的平均值
test_cost = (sum(test_costs) / len(test_costs))
test_acc = (sum(test_accs) / len(test_accs))print('Test:%d, Cost:%0.5f, Accuracy:%0.5f' % (pass_id, test_cost, test_acc))
# 儲存引數if not os.path.exists(os.path.join(save_path, 'params')):
os.makedirs(os.path.join(save_path, 'params'))
fluid.save(program=fluid.default_main_program(),
model_path=os.path.join(os.path.join(save_path, 'params'), "model"))print("Saved model to: %s" % os.path.join(save_path, 'params'))
# 儲存預測模型if not os.path.exists(os.path.join(save_path, 'infer')):
os.makedirs(os.path.join(save_path, 'infer'))
fluid.io.save_inference_model(dirname=os.path.join(save_path, 'infer'), feeded_var_names=[audio.name], target_vars=[feature], executor=exe)print("Saved model to: %s" % os.path.join(save_path, 'infer'))
聲紋對比
下面開始實現聲紋對比,建立infer_contrast.py
程式,編寫兩個函式,分類是載入資料和執行預測的函式,在這個載入資料函式中裁剪資料的長度必須要跟訓練時的輸入長度一樣。而在執行預測之後得到資料的是語音的特徵值。
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
save_path = 'models/infer'[infer_program,
feeded_var_names,
target_var] = fluid.io.load_inference_model(dirname=save_path, executor=exe)
# 讀取音訊資料
def load_data(data_path):
wav, sr = librosa.load(data_path, sr=16000)
intervals = librosa.effects.split(wav, top_db=20)
wav_output = []for sliced in intervals:
wav_output.extend(wav[sliced[0]:sliced[1]])
# [可能需要修改] 裁剪的音訊長度:16000 * 秒數
wav_len = int(16000 * 2.04)
# 裁剪過長的音訊,過短的補0if len(wav_output) > wav_len:
wav_output = wav_output[:wav_len]else:
wav_output.extend(np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32))
wav_output = np.array(wav_output)
# 獲取梅爾頻譜
ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
ps = ps[np.newaxis, np.newaxis, ...]return ps
def infer(audio_path):
data = load_data(audio_path)
# 執行預測
feature = exe.run(program=infer_program,
feed={feeded_var_names[0]: data},
fetch_list=target_var)[0]return feature[0]
有了上面兩個函式,就可以做聲紋識別了。我們輸入兩個語音,通過預測函式獲取他們的特徵資料,使用這個特徵資料可以求他們的對角餘弦值,得到的結果可以作為他們相識度。對於這個相識度的閾值,讀者可以根據自己專案的準確度要求進行修改。
if __name__ == '__main__':
# 要預測的兩個人的音訊檔案
person1 = 'dataset/ST-CMDS-20170001_1-OS/20170001P00001A0101.wav'
person2 = 'dataset/ST-CMDS-20170001_1-OS/20170001P00001A0001.wav'
feature1 = infer(person1)
feature2 = infer(person2)
# 對角餘弦值
dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))if dist > 0.7:print("%s 和 %s 為同一個人,相似度為:%f" % (person1, person2, dist))else:print("%s 和 %s 不是同一個人,相似度為:%f" % (person1, person2, dist))
聲紋識別
在上面的聲紋對比的基礎上,我們建立infer_recognition.py
實現聲紋識別。同樣是使用上面聲紋對比的資料載入函式和預測函式,通過這兩個同樣獲取語音的特徵資料。
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
save_path = 'models/infer'
person_feature = []
person_name = [][infer_program,
feeded_var_names,
target_var] = fluid.io.load_inference_model(dirname=save_path, executor=exe)
def load_data(data_path):
wav, sr = librosa.load(data_path, sr=16000)
intervals = librosa.effects.split(wav, top_db=20)
wav_output = []for sliced in intervals:
wav_output.extend(wav[sliced[0]:sliced[1]])
# [可能需要修改] 裁剪的音訊長度:16000 * 秒數
wav_len = int(16000 * 2.04)
# 裁剪過長的音訊,過短的補0if len(wav_output) > wav_len:
wav_output = wav_output[:wav_len]else:
wav_output.extend(np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32))
wav_output = np.array(wav_output)
# 獲取梅爾頻譜
ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
ps = ps[np.newaxis, np.newaxis, ...]return ps
def infer(audio_path):
data = load_data(audio_path)
feature = exe.run(program=infer_program,
feed={feeded_var_names[0]: data},
fetch_list=target_var)[0]return feature[0]
不同的是筆者增加了load_audio_db()
和recognition()
,第一個函式是載入語音庫中的語音資料,這些音訊就是相當於已經註冊的使用者,他們註冊的語音資料會存放在這裡,如果有使用者需要通過聲紋登入,就需要拿到使用者的語音和語音庫中的語音進行聲紋對比,如果對比成功,那就相當於登入成功並且獲取使用者註冊時的資訊資料。完成識別的主要在recognition()
函式中,這個函式就是將輸入的語音和語音庫中的語音一一對比。
def load_audio_db(audio_db_path):
audios = os.listdir(audio_db_path)for audio in audios:
path = os.path.join(audio_db_path, audio)
name = audio[:-4]
feature = infer(path)
person_name.append(name)
person_feature.append(feature)
def recognition(path):
name = ''
pro = 0
feature = infer(path)for i, person_f in enumerate(person_feature):
dist = np.dot(feature, person_f) / (np.linalg.norm(feature) * np.linalg.norm(person_f))if dist > pro:
pro = dist
name = person_name[i]return name, pro
有了上面的聲紋識別的函式,讀者可以根據自己專案的需求完成聲紋識別的方式,例如筆者下面提供的是通過錄音來完成聲紋識別。首先必須要載入語音庫中的語音,語音庫資料夾為audio_db
,然後使用者回車後錄音3秒鐘,然後程式會自動錄音,並使用錄音到的音訊進行聲紋識別,去匹配語音庫中的語音,獲取使用者的資訊。通過這樣方式,讀者也可以修改成通過服務請求的方式完成聲紋識別,例如提供一個API供APP呼叫,使用者在APP上通過聲紋登入時,把錄音到的語音傳送到後端完成聲紋識別,再把結果返回給APP,前提是使用者已經使用語音註冊,併成功把語音資料存放在audio_db
資料夾中。
if __name__ == '__main__':load_audio_db('audio_db')
# 錄音引數
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
RECORD_SECONDS = 3
WAVE_OUTPUT_FILENAME = "infer_audio.wav"
# 開啟錄音
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)while True:try:
i = input("按下回車鍵開機錄音,錄音3秒中:")print("開始錄音......")
frames = []for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
data = stream.read(CHUNK)
frames.append(data)print("錄音已結束!")
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
# 識別對比音訊庫的音訊
name, p = recognition(WAVE_OUTPUT_FILENAME)if p > 0.7:print("識別說話的為:%s,相似度為:%f" % (name, p))else:print("音訊庫沒有該使用者的語音")
except:
pass
Github地址: https://github.com/yeyupiaoling/VoiceprintRecognition_PaddlePaddle