
深度有趣 | 23 Automatic Generation of Lyrics and Classical Poems

Introduction

Use an RNN to automatically generate song lyrics and classical Chinese poems.

RNNs are commonly used for sequence data: by learning the relationships between neighbouring items in a sequence, the model can predict the most likely next item given several consecutive items.

Below are the equations of the most basic (vanilla) RNN; an LSTM (Long Short-Term Memory) or GRU (Gated Recurrent Unit) could of course be used for sequence generation instead.

h_t = \tanh(W_{hh} h_{t-1} + W_{xh} x_t + b_h)

y_t = W_{hy} h_t + b_y
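
The two equations map directly onto NumPy. A minimal one-step sketch with toy sizes and a one-hot input (the hand-written version below does exactly this inside a loop over the sequence):

import numpy as np

vocab_size, hidden_size = 5, 4                      # toy sizes, for illustration only
Wxh = np.random.randn(hidden_size, vocab_size) * 0.01
Whh = np.random.randn(hidden_size, hidden_size) * 0.01
Why = np.random.randn(vocab_size, hidden_size) * 0.01
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))

h = np.zeros((hidden_size, 1))                      # previous hidden state h_{t-1}
x = np.zeros((vocab_size, 1)); x[2] = 1             # one-hot encoding of the current character x_t
h = np.tanh(np.dot(Whh, h) + np.dot(Wxh, x) + bh)   # h_t
y = np.dot(Why, h) + by                             # y_t, unnormalized scores over the vocabulary
p = np.exp(y) / np.sum(np.exp(y))                   # softmax turns the scores into next-character probabilities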

Preparation

Some sequence data; here we mainly use text, such as song lyrics and classical poems.

Manual version

Load the libraries and the lyrics, dropping songs in which English characters make up a large share of the text (probably English songs); 36,616 songs remain.

# -*- coding: utf-8 -*-

import numpy as np

sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip()
        count = 0
        for c in line:
            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):
                count += 1
        if len(line) > 0 and count / len(line) < 0.1:  # skip empty lines and mostly-English lyrics
            sentences.append(line)

print('共%d首歌' % len(sentences))

Build the mappings between characters and ids; there are 10,131 distinct characters.

chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x:x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d個字' % vocab_size, chars[:20])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}
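
As a quick round-trip check of the mapping (run after the code above), a line should encode to ids and decode back unchanged:

line = sentences[0]
ids = [char2id[c] for c in line]
assert ''.join(id2char[i] for i in ids) == line
print(ids[:10])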

Define the training and model hyperparameters, and prepare the training data: the lyrics are cut into windows of 25 characters, and the target sequence is the input shifted by one character, so each position predicts the next character.

hidden_size = 100
maxlen = 25
learning_rate = 0.1

X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen - 1, maxlen):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        Y_data.append([char2id[c] for c in sentence[i + 1: i + maxlen + 1]])

print(len(X_data))

Wxh = np.random.randn(hidden_size, vocab_size) * 0.01
Whh = np.random.randn(hidden_size, hidden_size) * 0.01
Why = np.random.randn(vocab_size, hidden_size) * 0.01
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))

The loss function: it runs the forward pass, accumulates the softmax cross-entropy loss, then backpropagates through time to compute the gradients (clipped to [-5, 5]).

def lossFun(inputs, targets, hprev):
    xs, hs, ys, ps = {}, {}, {}, {}
    hs[-1] = np.copy(hprev)
    loss = 0

    # forward pass
    for t in range(len(inputs)):
        xs[t] = np.zeros((vocab_size, 1))
        xs[t][inputs[t]] = 1
        hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t - 1]) + bh)
        ys[t] = np.dot(Why, hs[t]) + by
        ps[t] = np.exp(ys[t]) / np.sum(np.exp(ys[t]))
        loss += -np.log(ps[t][targets[t], 0])
    
    # backward pass
    dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
    dbh, dby = np.zeros_like(bh), np.zeros_like(by)
    dhnext = np.zeros_like(hs[0])
    for t in reversed(range(len(inputs))):
        dy = np.copy(ps[t])
        dy[targets[t]] -= 1
        dWhy += np.dot(dy, hs[t].T)
        dby += dy
        dh = np.dot(Why.T, dy) + dhnext
        dhraw = (1 - hs[t] * hs[t]) * dh
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t-1].T)
        dhnext = np.dot(Whh.T, dhraw)
    
    for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
        np.clip(dparam, -5, 5, out=dparam)

    return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs) - 1]
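
Because the backward pass is hand-written, it is worth comparing it against numerical gradients on a small input before training. A sketch (the helper name grad_check is mine; it relies on lossFun and the global weights defined above):

def grad_check(inputs, targets, hprev, eps=1e-5):
    # analytic gradients from the hand-written backward pass
    _, dWxh, dWhh, dWhy, dbh, dby, _ = lossFun(inputs, targets, hprev)
    for param, dparam, name in zip([Wxh, Whh, Why, bh, by],
                                   [dWxh, dWhh, dWhy, dbh, dby],
                                   ['Wxh', 'Whh', 'Why', 'bh', 'by']):
        i = np.random.randint(param.size)       # probe one random entry of each parameter
        old = param.flat[i]
        param.flat[i] = old + eps
        loss_plus = lossFun(inputs, targets, hprev)[0]
        param.flat[i] = old - eps
        loss_minus = lossFun(inputs, targets, hprev)[0]
        param.flat[i] = old                     # restore the original value
        numerical = (loss_plus - loss_minus) / (2 * eps)
        print(name, dparam.flat[i], numerical)  # analytic vs. numerical: the two should be very close

# e.g. grad_check(X_data[0], Y_data[0], np.zeros((hidden_size, 1)))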

The sampling function; it is called every so many iterations to generate text from the current weights.

def sample(h, seed_ix, n):
    x = np.zeros((vocab_size, 1))
    x[seed_ix] = 1
    ixes = []
    for t in range(n):
        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)
        y = np.dot(Why, h) + by
        p = np.exp(y) / np.sum(np.exp(y))
        ix = np.random.choice(range(vocab_size), p=p.ravel())
        ixes.append(ix)
        
        x = np.zeros((vocab_size, 1))
        x[ix] = 1
    
    return ixes

Initialize the training variables. Adagrad is used for optimization, so an extra gradient-squared cache is kept for each parameter.

n = 0
mWxh, mWhh, mWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
mbh, mby = np.zeros_like(bh), np.zeros_like(by)
smooth_loss = -np.log(1.0 / vocab_size) * maxlen
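
The parameter update in the loop below is the standard Adagrad rule: each parameter keeps a cache m of accumulated squared gradients, so entries that have received large gradients take smaller steps:

m \leftarrow m + g \odot g

\theta \leftarrow \theta - \eta \cdot g / \sqrt{m + 10^{-8}}

Here g is the gradient of the loss with respect to the parameter \theta, and \eta is the learning rate (0.1 above).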

Train the model. The loop runs indefinitely, printing the smoothed loss and a 200-character sample every 100 iterations.

while True:
    if n == 0 or n == len(X_data): 
        hprev = np.zeros((hidden_size, 1))
        n = 0
    
    X = X_data[n]
    Y = Y_data[n]

    loss, dWxh, dWhh, dWhy, dbh, dby, hprev = lossFun(X, Y, hprev)
    smooth_loss = smooth_loss * 0.999 + loss * 0.001

    for param, dparam, mem in zip([Wxh, Whh, Why, bh, by], [dWxh, dWhh, dWhy, dbh, dby], [mWxh, mWhh, mWhy, mbh, mby]):
        mem += dparam * dparam
        param += -learning_rate * dparam / np.sqrt(mem + 1e-8)

    if n % 100 == 0:
        print('iter %d, loss: %f' % (n, smooth_loss))
        sample_ix = sample(hprev, X[0], 200)
        txt = ''.join(id2char[ix] for ix in sample_ix)
        print(txt)

    n += 1

After 540,000 iterations the model produced the passage below. It is not fluent, but it does seem to have learned some words and bits of syntax.

顏悲 心已中雨著街眼淚不知 留在這時祈忘的自己一樣無常 你我的歡 當時是你能止學了綻放瞥袖 前朝來去勇氣 讓你是一雙睡過以後  因為你飛雪中的街音裡飛   此模糊的愛 只有誰要再多少時 管只是無度美醉不給主題襯  曾流盲雙腳一片城本身邊 來並肩常與盡是一點和缺 好愛得也還記得證著多夢 愛 做人來 這吃碎 我們精神蹲著你的門 口不信心終究理想透完了誰幾度 我都在憑營力的光體 賣愛不說 愛你是我的好

Keras

Keras provides an official example of LSTM-based text generation.

With a few small changes, we can reuse it on the lyrics data from before.

Load the libraries

# -*- coding: utf-8 -*-

from keras.models import Sequential
from keras.layers import Dense, LSTM, Embedding
from keras.callbacks import LambdaCallback
import numpy as np
import random
import sys
import pickle

Load the data, build the character/id mappings, and save the mappings for later use.

sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip()
        count = 0
        for c in line:
            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):
                count += 1
        if len(line) > 0 and count / len(line) < 0.1:  # skip empty lines and mostly-English lyrics
            sentences.append(line)
print('共%d首歌' % len(sentences))

chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x:x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d個字' % vocab_size, chars[:20])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)

Prepare the training data, then define and compile the model. Each sample is a window of 10 characters, and the label is the single character that follows it.

maxlen = 10
step = 3
embed_size = 128
hidden_size = 128
vocab_size = len(chars)
batch_size = 64
epochs = 20

X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen, step):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        y = np.zeros(vocab_size, dtype=bool)  # one-hot label for the next character
        y[char2id[sentence[i + maxlen]]] = 1
        Y_data.append(y)
X_data = np.array(X_data)
Y_data = np.array(Y_data)
print(X_data.shape, Y_data.shape)

model = Sequential()
model.add(Embedding(input_dim=vocab_size, output_dim=embed_size, input_length=maxlen))
model.add(LSTM(hidden_size, input_shape=(maxlen, embed_size)))
model.add(Dense(vocab_size, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
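
As a quick architecture check (the shapes follow from the hyperparameters above), model.summary() should show an Embedding output of (None, 10, 128), an LSTM output of (None, 128), and a Dense output of (None, vocab_size). The input_shape passed to the LSTM layer is redundant, since the Embedding layer already determines it, but it is harmless.

model.summary()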

Define the sampling function, which draws the next character from the predicted distribution using a temperature (diversity) parameter.

def sample(preds, diversity=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds + 1e-10) / diversity
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)
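
The diversity argument is a sampling temperature: dividing the log-probabilities by a value below 1 sharpens the distribution (safer, more repetitive text), while a value above 1 flattens it (more varied but noisier text). A toy illustration of the re-weighting:

p = np.array([0.5, 0.3, 0.2])
for diversity in [0.2, 1.0, 1.5]:
    logp = np.log(p) / diversity
    q = np.exp(logp) / np.sum(np.exp(logp))
    print(diversity, np.round(q, 3))  # 0.2 puts almost all the mass on the most likely character; 1.0 leaves p unchanged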

Define the callback that runs at the end of every epoch: it picks a random seed line and generates 400 characters at several diversity values.

def on_epoch_end(epoch, logs):
    print('-' * 30)
    print('Epoch', epoch)

    index = random.randint(0, len(sentences) - 1)  # randint is inclusive, so subtract 1 to avoid an IndexError
    for diversity in [0.2, 0.5, 1.0]:
        print('----- diversity:', diversity)
        sentence = sentences[index][:maxlen]
        print('----- Generating with seed: ' + sentence)
        sys.stdout.write(sentence)

        for i in range(400):
            x_pred = np.zeros((1, maxlen))
            for t, char in enumerate(sentence):
                x_pred[0, t] = char2id[char]

            preds = model.predict(x_pred, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_char = id2char[next_index]

            sentence = sentence[1:] + next_char

            sys.stdout.write(next_char)
            sys.stdout.flush()

Train the model and save it.

model.fit(X_data, Y_data, batch_size=batch_size, epochs=epochs, callbacks=[LambdaCallback(on_epoch_end=on_epoch_end)])
model.save('song_keras.h5')

The following script loads the model and generates lyrics; a seed line of lyrics must be provided.

# -*- coding: utf-8 -*-

from keras.models import load_model
import numpy as np
import pickle
import sys

maxlen = 10
model = load_model('song_keras.h5')

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

def sample(preds, diversity=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds + 1e-10) / diversity
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

sentence = '能不能給我一首歌的時間'
sentence = sentence[:maxlen]

diversity = 1.0
print('----- Generating with seed: ' + sentence)
print('----- diversity:', diversity)
sys.stdout.write(sentence)

for i in range(400):
    x_pred = np.zeros((1, maxlen))
    for t, char in enumerate(sentence):
        x_pred[0, t] = char2id[char]

    preds = model.predict(x_pred, verbose=0)[0]
    next_index = sample(preds, diversity)
    next_char = id2char[next_index]

    sentence = sentence[1:] + next_char

    sys.stdout.write(next_char)
    sys.stdout.flush()

The generated result is shown below. It looks somewhat better than before, with more meaningful words and short phrases.

能不能給我一首歌的時間 要去人還有古年 你代表我所的 只願為你做下一個成熟 從那個歌聲中  你的別思量 寫你的畫面走過了西陌上雨張 小水沒忘了 我欲再感受   我終於你開心哭過心事流出了我心痛 就看口提幽紋太多 獨自一直行 你也在想 我感到最此的第一次 只想要閒想 穿行多高樓的星雲 看見鞍上雲 青竹瓊樓又新葉 人潮春湧成度過 幸福嗚 風雪落入麗箏悽悽 萬頃枯枝回伸離袖弦  不幸以潮 到底必經認來我不變 都想你 這星辰 暮鼓 WA Lsevemusich hey Live 走進不在乎 不願天涯 如此溫柔 不夠支離 多巧認真和你還太平行 哎呀呀呀 呀呀呀呀呀呀呀啊嘿 餓不好去哪兒呀 那我的聰明? 王王之以下 下也難改徒有愛還能敢相離 撥開你的嘴角 相識的一見 到你的世界所世 才發現我也不會躲藏 讓我決定有人擔心善良 像一個人世界內心長著  夜晚需來又頭 與我專車徵 戰天幾天不懂配遊戲 也是自己應嗎 你給我來的狠也

TensorFlow

Load the libraries

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import glob
import json
from collections import Counter
from tqdm import tqdm
from snownlp import SnowNLP

Load the data from the chinese-poetry corpus, keeping only poems of 24 to 32 characters, converting them to simplified characters with SnowNLP, and wrapping each poem in '[' and ']' as start and end markers; 105,336 poems remain.

poets = []
paths = glob.glob('chinese-poetry/json/poet.*.json')
for path in paths:
    data = open(path, 'r').read()
    data = json.loads(data)
    for item in data:
        content = ''.join(item['paragraphs'])
        if len(content) >= 24 and len(content) <= 32:
            content = SnowNLP(content)
            poets.append('[' + content.han + ']')

poets.sort(key=lambda x: len(x))
print('共%d首詩' % len(poets), poets[0], poets[-1])

Build the character/id mappings; there are 8,072 distinct characters. Ids start from 1 because id 0 is reserved for padding.

chars = []
for item in poets:
    chars += [c for c in item]
print('共%d個字' % len(chars))

chars = sorted(Counter(chars).items(), key=lambda x:x[1], reverse=True)
print('共%d個不同的字' % len(chars))
print(chars[:10])

chars = [c[0] for c in chars]
char2id = {c: i + 1 for i, c in enumerate(chars)}
id2char = {i + 1: c for i, c in enumerate(chars)}

Prepare the training data in batches. Since the poems were sorted by length, each batch contains poems of similar length, and shorter poems are padded with id 0 up to the longest poem in the batch.

batch_size = 64
X_data = []
Y_data = []

for b in range(len(poets) // batch_size):
    start = b * batch_size
    end = b * batch_size + batch_size
    batch = [[char2id[c] for c in poets[i]] for i in range(start, end)]
    maxlen = max(map(len, batch))
    X_batch = np.full((batch_size, maxlen - 1), 0, np.int32)
    Y_batch = np.full((batch_size, maxlen - 1), 0, np.int32)

    for i in range(batch_size):
        X_batch[i, :len(batch[i]) - 1] = batch[i][:-1]
        Y_batch[i, :len(batch[i]) - 1] = batch[i][1:]
    
    X_data.append(X_batch)
    Y_data.append(Y_batch)
    
print(len(X_data), len(Y_data))
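
As a quick check (illustrative only), each Y batch is the corresponding X batch shifted one position to the left, with id 0 used as padding for poems shorter than the longest one in the batch:

print(X_data[0].shape, Y_data[0].shape)   # e.g. (64, 25) and (64, 25) for the batch of shortest poems
print(X_data[0][0, :8])                   # starts with char2id['['], then the first characters of the poem
print(Y_data[0][0, :8])                   # the same ids shifted left by one position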

Define the model structure (a 2-layer LSTM over character embeddings) and the optimizer (Adam with gradient clipping).

hidden_size = 256
num_layer = 2
embedding_size = 256

X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)], 
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

# outputs: batch_size, max_time, hidden_size
# last_states: 2 tuple(two LSTM), 2 tuple(c and h)
#              batch_size, hidden_size
outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)

outputs = tf.reshape(outputs, [-1, hidden_size])                # batch_size * max_time, hidden_size
logits = tf.layers.dense(outputs, units=len(char2id) + 1)       # batch_size * max_time, len(char2id) + 1
logits = tf.reshape(logits, [batch_size, -1, len(char2id) + 1]) # batch_size, max_time, len(char2id) + 1
probs = tf.nn.softmax(logits)                                   # batch_size, max_time, len(char2id) + 1

loss = tf.reduce_mean(tf.contrib.seq2seq.sequence_loss(logits, Y, tf.ones_like(Y, dtype=tf.float32)))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))
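
With the weight tensor set to all ones, sequence_loss here is simply the average next-character cross-entropy over every position of every poem in the batch (padded positions included):

loss = -\frac{1}{B T} \sum_{b=1}^{B} \sum_{t=1}^{T} \log p_\theta(y_{b,t} \mid x_{b,1}, \dots, x_{b,t})

where B is the batch size (64) and T is the padded sequence length of the batch.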

Train the model for 50 epochs, shuffling the batches each epoch and decaying the learning rate as 0.002 * 0.97^epoch.

sess = tf.Session()
sess.run(tf.global_variables_initializer())

for epoch in range(50):
    sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))
    
    data_index = np.arange(len(X_data))
    np.random.shuffle(data_index)
    X_data = [X_data[i] for i in data_index]
    Y_data = [Y_data[i] for i in data_index]
    
    losses = []
    for i in tqdm(range(len(X_data))):
        ls_,  _ = sess.run([loss, optimizer], feed_dict={X: X_data[i], Y: Y_data[i]})
        losses.append(ls_)
    
    print('Epoch %d Loss %.5f' % (epoch, np.mean(losses)))

Save the model and the dictionaries so they can be used on a local machine.

saver = tf.train.Saver()
saver.save(sess, './poet_generation_tensorflow')

import pickle
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)

Use the model on a local machine to generate poems, either freely or as acrostic poems (the given characters start each line).

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import pickle

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

batch_size = 1
hidden_size = 256
num_layer = 2
embedding_size = 256

X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)], 
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)

outputs = tf.reshape(outputs, [-1, hidden_size])
logits = tf.layers.dense(outputs, units=len(char2id) + 1)
probs = tf.nn.softmax(logits)
targets = tf.reshape(Y, [-1])

loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=targets))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))

sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('./'))

def generate():
    states_ = sess.run(initial_state)
    
    gen = ''
    c = '['
    while c != ']':
        gen += c
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})
        probs_ = np.squeeze(probs_)
        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))  # draw the next index from the predicted distribution
        c = id2char[pos]
    
    return gen[1:]

def generate_with_head(head):
    states_ = sess.run(initial_state)
    
    gen = ''
    c = '['
    i = 0
    while c != ']':
        gen += c
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})
        probs_ = np.squeeze(probs_)
        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))

        # at the start of the poem, or right after a comma/period (i.e. the start of a new line), force the next head character
        if (c == '[' or c == '。' or c == ',') and i < len(head):
            c = head[i]
            i += 1
        else:
            c = id2char[pos]
    
    return gen[1:]

print(generate())
print(generate_with_head('深度學習'))

The generated results are shown below. The line lengths and punctuation come out right, and the content looks plausible enough, even if it is hard to make much sense of.

百計無心魄可無,知君又到兩家書。自知君子有天祿,天下名通赤子虛。
深山宜數月交馳,度世曾徒有客期。學子今來能入楚,習家不癭莫辭卑。
