1. 程式人生 > 實用技巧 >LSTM實現文字生成

LSTM實現文字生成

在時間序列預測的例子中,資料的時間步長為1,是有問題的。
故使用一個新的例項:用LSTM實現文字生成
輸入資料:50個單片語成一個訓練樣本,輸出為同樣長度的序列。一個多對多的模型。
資料集:莎士比亞作品。
整體描述:對莎士比亞的作品進行訓練。為了測試我們的工作方式,我們將提供模型候選短語,例如thou art more,並檢視模型是否可以找出短語後面應該包含的單詞。
程式碼來自:https://wizardforcel.gitbooks.io/tf-ml-cookbook-2e-zh/content/71.html

一、模型構建

1.模型引數

# Set RNN Parameters
min_word_freq = 5  # Trim the less frequent words off
rnn_size = 128  # RNN Model size
epochs = 10  # Number of epochs to cycle through data
batch_size = 100  # Train on this many examples at once
learning_rate = 0.001  # Learning rate
training_seq_len = 50  # how long of a word group to consider
embedding_size = rnn_size  # Word embedding size
save_every = 500  # How often to save model checkpoints
eval_every = 50  # How often to evaluate the test sentences
prime_texts = ['thou art more', 'to be or not to', 'wherefore art thou']

2.模型定義

# Define LSTM RNN Model
class LSTM_Model():
    # 這是一個多對多的模型。
    def __init__(self, embedding_size, rnn_size, batch_size, learning_rate,
                 training_seq_len, vocab_size, infer_sample=False):
        self.embedding_size = embedding_size # 詞嵌入維度,每個詞變為了128維的向量。
        self.rnn_size = rnn_size # 隱層大小128
        self.vocab_size = vocab_size #  單詞總數
        self.infer_sample = infer_sample # 區分訓練還是預測階段
        self.learning_rate = learning_rate # 學習率
        
        if infer_sample:
            self.batch_size = 1  # 預測階段,batch_size = 1,即輸入一個樣本
            self.training_seq_len = 1 # 一個樣本的長度為1,即只有一個單詞。
        else:
            self.batch_size = batch_size
            self.training_seq_len = training_seq_len # 訓練階段輸入文字的長度為50
        
        self.lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.rnn_size)
        self.initial_state = self.lstm_cell.zero_state(self.batch_size, tf.float32)
        '''
        輸入的樣本x:[batch_size,50] 50個單詞作為一個樣本。
        輸入的標籤y:[batch_size,50] 和單詞一一對應。
        '''
        self.x_data = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
        self.y_output = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
        
        with tf.variable_scope('lstm_vars'):
            # Softmax Output Weights
            W = tf.get_variable('W', [self.rnn_size, self.vocab_size], tf.float32, tf.random_normal_initializer())
            b = tf.get_variable('b', [self.vocab_size], tf.float32, tf.constant_initializer(0.0))
        
            # Define Embedding
            embedding_mat = tf.get_variable('embedding_mat', [self.vocab_size, self.embedding_size],
                                            tf.float32, tf.random_normal_initializer())
            # 此時embedding_output的維度 [batch_size,train_sen_len,self.embedding_size]
            # [100, 50, 128]
            embedding_output = tf.nn.embedding_lookup(embedding_mat, self.x_data)
            # 把嵌入向量,需要文字長度整除 embedding_output[1]
            # 所有rnn_inputs 一共有50個 維度為 [100,1,128]的tensor,然後遍歷每個tensor,把第二維去掉。
            rnn_inputs = tf.split(axis=1, num_or_size_splits=self.training_seq_len, value=embedding_output)
            # [(100,128),50個]
            rnn_inputs_trimmed = [tf.squeeze(x, [1]) for x in rnn_inputs]
        
        # If we are inferring (generating text), we add a 'loop' function
        # Define how to get the i+1 th input from the i th output
        def inferred_loop(prev):
            # Apply hidden layer
            prev_transformed = tf.matmul(prev, W) + b
            # Get the index of the output (also don't run the gradient)
            prev_symbol = tf.stop_gradient(tf.argmax(prev_transformed, 1))
            # Get embedded vector
            out = tf.nn.embedding_lookup(embedding_mat, prev_symbol)
            return out

3. 模型的輸出

decoder = tf.contrib.legacy_seq2seq.rnn_decoder
outputs, last_state = decoder(rnn_inputs_trimmed,
                            self.initial_state,
                            self.lstm_cell,
                            loop_function=inferred_loop if infer_sample else None)
self.final_state = last_state

本段程式碼使用了tf.contrib.legacy_seq2seq.rnn_decoder方法。老版本的seq2seq的實現,新版本使用 tf.contrib.seq2seq。

tf.contrib.legacy_seq2seq.rnn_decoder(
    decoder_inputs,
    initial_state,
    cell,
    loop_function=None,
    scope=None
)

decoder_inputs:一個列表,其長度為num_steps,每個元素是[batch_size, input_size]的2-D維的tensor。
initial_state:2-D tensor,cell的初始化狀態。
cell:使用的LSTM網路。
loop_function:如果不為空,則將該函式應用於第i個輸出以得到第i+1個輸入。在預測階段,上一個時刻的輸出,經過loop_function函式,得到的值作為當前時刻解碼器的輸入。訓練階段設定為了None。

兩個輸出:
outputs : A list of the same length as decoder_inputs of 2D Tensors with shape [batch_size x output_size] containing generated outputs.
state :The state of each cell at the final time-step. It is a 2D Tensor of shape [batch_size x cell.state_size].


# 指定最後一個維度為128,其他維度合併在一起。
output = tf.reshape(tf.concat(axis=1, values=outputs), [-1, self.rnn_size]) #output.shape()=[5000, 128]   

outputs是一個長度為50的列表,即 50* [100,128] 。 按axis=1進行合併,結果的shape為(100,128*50=6400)
然後reshape 為 (5000,128)。可以理解為這次訓練的輸出為5000個單詞的embedding。

t1 = [[1, 2, 3], [4, 5, 6]]
t2 = [[7, 8, 9], [10, 11, 12]]
tf.concat([t1, t2], 0)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
tf.concat([t1, t2], 1)  # [[1, 2, 3, 7, 8, 9], [4, 5, 6, 10, 11, 12]]

最後經過一個全連線層,得到每個單詞的分佈 (5000,詞彙表大小)
經過sotfmax,得到概率。

self.logit_output = tf.matmul(output, W) + b  #[5000,詞彙表大小]
self.model_output = tf.nn.softmax(self.logit_output)

4.損失函式

loss_fun = tf.contrib.legacy_seq2seq.sequence_loss_by_example
loss = loss_fun([self.logit_output], [tf.reshape(self.y_output, [-1])],
                        [tf.ones([self.batch_size * self.training_seq_len])])
self.cost = tf.reduce_sum(loss) / (self.batch_size * self.training_seq_len)

關於這裡的損失函式,這個函式用於計算所有examples(假設一句話有n個單詞,一個單詞及單詞所對應的label就是一個example,所有example就是一句話中所有單詞)的加權交叉熵損失。
sequence_loss_by_example的做法是,針對logits中的每一個num_step,即[batch_size, vocab_size], 對所有vocab_size個預測結果,得出預測值最大的那個類別,與target中的值相比較計算Loss值
loss shape 為 (5000,) ,通過求平均得到平均的交叉熵損失值。

tf.contrib.legacy_seq2seq.sequence_loss_by_example(
    logits,
    targets,
    weights,
    average_across_timesteps=True,
    softmax_loss_function=None,
    name=None
)

logtit:List of 2D Tensors of shape [batch_size x num_decoder_symbols].此時為 [[5000,詞彙表大小]]
targert:List of 1D batch-sized int32 Tensors of the same length as logits. 此時shape為(5000,),每個值代表一個標籤的真實值。
weights:List of 1D batch-sized float-Tensors of the same length as logits。這裡每個樣本的權重都為1。

5. 優化器

tf.gradients計算損失的梯度,進行梯度裁剪,將梯度作為引數傳給優化器tf.train.AdamaOptimizer()得到優化器。
優化器呼叫apply_gradients方法進行變數更新。

gradients, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tf.trainable_variables()), 4.5)
optimizer = tf.train.AdamOptimizer(self.learning_rate)
self.train_op = optimizer.apply_gradients(zip(gradients, tf.trainable_variables()))

二.訓練模型

1。從原始資料得到輸入格式的樣本。

# 一共多少個batch
num_batches = int(len(s_text_ix)/(batch_size * training_seq_len)) + 1
# 樣本切分
batches = np.array_split(s_text_ix, num_batches)
# Reshape each split into [batch_size, training_seq_len]
batches = [np.resize(x, [batch_size, training_seq_len]) for x in batches]
targets = [np.roll(x, -1, axis=1) for x in batches] # 

關於 np.roll,是陣列的元素進行平移。前一個詞預測後一個詞。

x = np.arange(10)
x2 = np.reshape(x, (2,5))
'''
array([[0, 1, 2, 3, 4],
       [5, 6, 7, 8, 9]])
'''
np.roll(x2, -11, axis=1)
'''
array([[1, 2, 3, 4, 0],
       [6, 7, 8, 9, 5]])
'''

2 通過feed_dict傳遞引數,訓練lstm_model.train_op

lstm_model = LSTM_Model(rnn_size, batch_size, learning_rate, 
                     training_seq_len, vocab_size) 
for ix, batch in enumerate(batches):
    training_dict = {lstm_model.x_data: batch, lstm_model.y_output: targets[ix]}
    c, h = lstm_model.initial_state
    training_dict[c] = state.c
    training_dict[h] = state.h
    
    temp_loss, state, _ = sess.run([lstm_model.cost, lstm_model.final_state, lstm_model.train_op],
                                   feed_dict=training_dict) 

二、預測階段

目標:輸入一個句子,得到後續10個單詞作為輸出
預測階段輸入單詞後如何得到輸出的單詞。
使用相同的模型(具有相同的權重)來批量訓練並從示例文字生成文字。如果沒有采用內部抽樣方法的課程,這將很難做到。

    def sample(self, sess, words=ix2vocab, vocab=vocab2ix, num=10, prime_text='thou art'):
        state = sess.run(self.lstm_cell.zero_state(1, tf.float32))
        word_list = prime_text.split()
        for word in word_list[:-1]:
            x = np.zeros((1, 1))
            x[0, 0] = vocab[word]
            feed_dict = {self.x_data: x, self.initial_state: state}
            [state] = sess.run([self.final_state], feed_dict=feed_dict)

        out_sentence = prime_text
        word = word_list[-1]
        for n in range(num):
            x = np.zeros((1, 1))
            x[0, 0] = vocab[word]
            feed_dict = {self.x_data: x, self.initial_state: state}
            [model_output, state] = sess.run([self.model_output, self.final_state], feed_dict=feed_dict)
            sample = np.argmax(model_output[0])
            if sample == 0:
                break
            word = words[sample]
            out_sentence = out_sentence + ' ' + word
        return out_sentence

1、此時有已經訓練好的模型:lstm_cell。
2、lstm_cell狀態0初始化 state
3、輸入的單詞通過單詞-索引的字典轉換為數值索引 x。並通過 feed_dict = {self.x_data:x,self.initial_state:state}的方式把變數傳到神經網路
4、[state] = sess.run([self.final_state],feed_dict=feed_dict),預測階段只輸出最後一個狀態的值。
5、 state傳遞到下一個時間步。
6、到輸入序列最後一個單詞的時候,把輸出和狀態同時返回,傳遞給下一步,迴圈生成10個單詞的文字。