Lesson 12: Generating Classical Chinese Poems with an RNN in TensorFlow
The previous lesson covered how an RNN works internally. This lesson walks through a demo that generates classical Chinese poems.
Data input
# coding:utf-8
"""
Data input
"""
import logging
import collections
import json

import numpy as np


class PoemInput(object):
    def __init__(self, poem_file_path, batch_size):
        self._poem_file_path = poem_file_path
        self._batch_size = batch_size
        self._poems = list()
        self._poem_vectors = list()
        self._batch_num = 0
        self._chunk_size = None
        # dict mapping word -> index
        self._word_index_dict = None
        # dict mapping index -> word
        self._index_word_dict = None

    def process(self):
        self._convert_vector()

    def _read_file(self):
        line_no = 0
        with open(self._poem_file_path, encoding='utf-8') as poem_file:
            for line in poem_file:
                line_no += 1
                line_infos = line.strip().split(':')
                if len(line_infos) != 2:
                    continue
                title = line_infos[0]
                content = line_infos[1]
                content = content.replace(' ', '')
                # skip poems that contain annotation characters
                if u'_' in content or u'(' in content or u'（' in content or u'《' in content or u'[' in content:
                    continue
                # skip poems that are too short or too long
                if len(content) < 5 or len(content) > 79:
                    continue
                # '[' and ']' mark the start and end of every poem
                content = '[' + content + ']'
                # logging.debug(str(line_no) + ': ' + title + ': ' + content)
                self._poems.append(content)

    def _convert_vector(self):
        self._read_file()
        # sort the poems by length
        poetrys = sorted(self._poems, key=lambda line: len(line))
        logging.info(u'total number of Tang poems: ' + str(len(poetrys)))
        # count how often every character occurs
        all_words = []
        for poetry in poetrys:
            all_words += [word for word in poetry]
        counter = collections.Counter(all_words)
        count_pairs = sorted(counter.items(), key=lambda x: -x[1])
        logging.debug('type count_pairs: ' + str(type(count_pairs)))
        for i in range(5):
            logging.debug('count_pairs ' + str(i) + ': ' + str(count_pairs[i]))
        words, counts = zip(*count_pairs)
        logging.debug('type words: ' + str(type(words)))
        logging.debug('counts: ' + str(counts))
        logging.debug('words: ' + ' '.join(words))
        # keep the most frequent characters (here: all of them) and
        # append a space as the padding character
        words = words[:len(words)] + (' ',)
        # map every character to a numeric ID
        self._word_index_dict = dict(zip(words, range(len(words))))
        # map every ID back to its character
        self._index_word_dict = dict(zip(range(len(words)), words))
        logging.debug('index_word_dict: ' + json.dumps(self._index_word_dict, ensure_ascii=False))
        logging.debug('word_index_map: ' + json.dumps(self._word_index_dict, ensure_ascii=False))
        # convert a character to its ID; unknown characters map to len(words),
        # which is why the training code uses len(word_dict) + 1 classes
        to_num = lambda word: self._word_index_dict.get(word, len(words))
        # walk over all poems and convert each one into a list of IDs
        self._poem_vectors = [list(map(to_num, poetry)) for poetry in poetrys]
        logging.debug(u'poem_vectors: ' + str(self._poem_vectors))
        logging.debug(u'poetry: ' + json.dumps(poetrys, ensure_ascii=False))
        self._chunk_size = len(self._poem_vectors) // self._batch_size

    def next_batch(self):
        batch_examples = list()
        for i in range(self._batch_size):
            poem = self._poem_vectors[i + self._batch_num * self._batch_size]
            batch_examples.append(poem)
        self._batch_num += 1
        if self._batch_num == self._chunk_size:  # wrap around and reuse the data
            self._batch_num = 0
        # convert batch_examples into a numpy matrix,
        # padding every row with spaces to the length of the longest poem
        poem_max_len = max(map(len, batch_examples))
        x_data = np.full([self._batch_size, poem_max_len], self._word_index_dict[' '], dtype=np.int32)
        for row in range(self._batch_size):
            x_data[row, :len(batch_examples[row])] = batch_examples[row]
        # each batch is a 2-D matrix: batch_size rows, one poem per row.
        # the label is x_data shifted left by one position:
        # sample a, b, c, d -> label b, c, d, d (note the last label repeats).
        # since a sample looks like [, a, b, c, d, ], the label is a, b, c, d, ], ],
        # so the final label is ']' and predicting ']' at the end is exactly what we want.
        y_data = x_data.copy()
        y_data[:, :-1] = x_data[:, 1:]
        return x_data, y_data

    def convert_poem_vector_2_poem(self, poem_vector):
        """
        Convert a vector of indexes back into the poem text.
        :param poem_vector: vector of indexes
        :return: the poem
        """
        return [self._index_word_dict[word_index] for word_index in poem_vector]

    def convert_poem_2_poem_vector(self, poem):
        return [self._word_index_dict[word] for word in poem]

    @property
    def word_dict(self):
        return self._word_index_dict

    @property
    def index_dict(self):
        return self._index_word_dict
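For reference, a minimal sketch of how PoemInput is driven (it assumes an ./input/poetry.txt where each line looks like "title:content", which is the format _read_file expects):

# coding:utf-8
# Minimal usage sketch for PoemInput.
import logging

from poem_input import PoemInput

logging.basicConfig(level=logging.INFO)

poem_input = PoemInput('./input/poetry.txt', batch_size=4)
poem_input.process()

x_data, y_data = poem_input.next_batch()
# x_data and y_data are both [batch_size, poem_max_len] int32 matrices;
# y_data is x_data shifted left by one position.
print(x_data.shape, y_data.shape)
# round-trip the first sample back into characters
print(''.join(poem_input.convert_poem_vector_2_poem(x_data[0])))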
Building the RNN model
# coding:utf-8
"""
Model definition
"""
import logging

import tensorflow as tf


class Inference(object):
    BASIC_RNN_CELL = 'basic_rnn_cell'
    LSTM_BASIC_CELL = 'lstm_basic_cell'
    GRU_CELL = 'gru_cell'

    def __init__(self, hidden_unit_size, num_layers, class_num, batch_size):
        """
        Initialize the RNN.
        :param hidden_unit_size: number of hidden units
        :param num_layers: number of RNN layers
        :param class_num: number of output classes, i.e. the vocabulary size;
                          every character is one class
        """
        self._hidden_unit_size = hidden_unit_size
        self._num_layers = num_layers
        self._class_num = class_num
        self._batch_size = batch_size

    def inference(self, model_type, inputs, targets):
        # targets is 2-D, so flatten it to 1-D for the per-example loss
        if targets is not None:
            tf.logging.info('1 targets shape: %s' % (str(targets.shape)))
            targets = tf.reshape(targets, [-1])
            tf.logging.info('2 targets shape: %s' % (str(targets.shape)))
        if model_type == Inference.BASIC_RNN_CELL:
            cell_fn = tf.nn.rnn_cell.BasicRNNCell
        elif model_type == Inference.LSTM_BASIC_CELL:
            cell_fn = tf.nn.rnn_cell.BasicLSTMCell
        elif model_type == Inference.GRU_CELL:
            cell_fn = tf.nn.rnn_cell.GRUCell
        else:
            raise RuntimeError('unknown model type: ' + model_type)
        # build one cell object per layer; reusing a single cell object for
        # every layer would share its weights across layers
        multi_cell = tf.nn.rnn_cell.MultiRNNCell(
            [cell_fn(self._hidden_unit_size) for _ in range(self._num_layers)])
        initial_state = multi_cell.zero_state(self._batch_size, tf.float32)
        # build the RNN model; the inputs must be embedded first
        with tf.variable_scope('rnn'):
            softmax_w = tf.get_variable('softmax_w', [self._hidden_unit_size, self._class_num], tf.float32)
            softmax_b = tf.get_variable('softmax_b', [self._class_num], tf.float32)
            # embed the input character IDs
            with tf.device('/cpu:0'):
                embedding = tf.get_variable('embedding', [self._class_num, self._hidden_unit_size])
                inputs_embedding = tf.nn.embedding_lookup(embedding, inputs)
            outputs, last_state = tf.nn.dynamic_rnn(multi_cell, inputs_embedding, initial_state=initial_state,
                                                    scope='rnn', dtype=tf.float32)
        logging.info('outputs shape: ' + str(outputs.shape))
        # flatten [batch_size, time, hidden] to [batch_size * time, hidden]
        outputs = tf.reshape(outputs, [-1, self._hidden_unit_size])
        logit = tf.matmul(outputs, softmax_w) + softmax_b
        prob = tf.nn.softmax(logit)
        loss = None
        cost = None
        if targets is not None:
            loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example([logit],
                                                                      [targets],
                                                                      [tf.ones_like(targets, dtype=tf.float32)])
            cost = tf.reduce_mean(loss)
        return cost, prob, loss, last_state, logit, multi_cell, initial_state
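Before wiring this into training, it helps to see the shape bookkeeping. A minimal sketch (the sizes here are illustrative, not the tutorial's config):

# coding:utf-8
# Shape walk-through for Inference.
import tensorflow as tf

from poem_inference import Inference

batch_size, vocab_size = 4, 6000
x = tf.placeholder(tf.int32, [batch_size, None])  # [batch, time] character IDs
y = tf.placeholder(tf.int32, [batch_size, None])  # same shape, shifted by one

inference = Inference(hidden_unit_size=128, num_layers=2,
                      class_num=vocab_size, batch_size=batch_size)
cost, prob, loss, last_state, logit, _, initial_state = inference.inference(
    Inference.BASIC_RNN_CELL, x, y)

# logit/prob: [batch * time, vocab_size]; loss: [batch * time]; cost: scalar.
# sequence_loss_by_example is per-timestep cross-entropy with all weights 1,
# so cost is the mean cross-entropy over every character position in the batch.
print(logit.shape, prob.shape, cost.shape)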
Training
# coding:utf8
"""
Training
"""
import logging

import tensorflow as tf

import common
from poem_inference import Inference
from poem_input import PoemInput


class Train(object):
    def train(self):
        poem_input = PoemInput('./input/poetry.txt', common.TRAIN_BATCH_SIZE)
        poem_input.process()
        num_class = len(poem_input.word_dict) + 1
        x_placeholder = tf.placeholder(tf.int32, [common.TRAIN_BATCH_SIZE, None])
        y_placeholder = tf.placeholder(tf.int32, [common.TRAIN_BATCH_SIZE, None])
        logging.info('y shape 1: ' + str(y_placeholder.shape))
        inference = Inference(common.HIDDEN_UNIT_SIZE,
                              common.NUM_LAYERS,
                              num_class,
                              common.TRAIN_BATCH_SIZE)
        info = inference.inference(Inference.BASIC_RNN_CELL,
                                   x_placeholder,
                                   y_placeholder)
        cost = info[0]
        learning_rate = tf.Variable(0.01, trainable=False)
        tvars = tf.trainable_variables()
        # clip gradients by global norm to avoid exploding gradients
        grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), 5)
        optimizer = tf.train.AdamOptimizer(learning_rate)
        train_op = optimizer.apply_gradients(zip(grads, tvars))
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(),
                         tf.local_variables_initializer()])
            saver = tf.train.Saver(tf.global_variables())
            decay_steps = common.NUM_STEPS // 10
            output_steps = common.NUM_STEPS // 500
            for step in range(common.NUM_STEPS):
                # multiply the learning rate by 0.97 every decay_steps steps
                session.run(tf.assign(learning_rate, 0.002 * (0.97 ** (step // decay_steps))))
                batch_x, batch_y = poem_input.next_batch()
                cost_result, _ = session.run([cost, train_op],
                                             feed_dict={
                                                 x_placeholder: batch_x,
                                                 y_placeholder: batch_y
                                             })
                if step % output_steps == 0:
                    logging.info('step: %d, loss: %f' % (step, cost_result))
                    saver.save(session, './output/poem', global_step=step)
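The tf.assign call above hand-rolls a staircase exponential decay. An equivalent, arguably more idiomatic alternative (a sketch, not what the tutorial uses) is TensorFlow's built-in schedule:

# Sketch: the same staircase schedule via tf.train.exponential_decay.
global_step = tf.train.get_or_create_global_step()
learning_rate = tf.train.exponential_decay(
    learning_rate=0.002,                  # base rate, matching the hand-rolled version
    global_step=global_step,
    decay_steps=common.NUM_STEPS // 10,
    decay_rate=0.97,
    staircase=True)                       # step function: decay once per decay_steps
optimizer = tf.train.AdamOptimizer(learning_rate)
train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step)

This removes the extra session.run(tf.assign(...)) from the training loop, since the schedule is evaluated inside the graph.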
Common configuration
# coding:utf-8
"""
common definitions
"""
TRAIN_BATCH_SIZE = 64  # batch size used for training
HIDDEN_UNIT_SIZE = 128
NUM_LAYERS = 2
NUM_STEPS = 100000
GEN_BATCH_SIZE = 1  # batch size used for generation
Training entry point
# coding:utf-8
"""
main entry point
"""
import logging

from poem_train import Train

if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s] %(name)s:%(levelname)s: %(message)s [%(filename)s:%(lineno)d]",
        datefmt='%a, %d %b %Y %H:%M:%S',
        filename='./log/rnn_poem_train.log',
        filemode='w'
    )
    train = Train()
    train.train()
Generating poems with the trained model
# coding:utf-8
"""
Poem generation
"""
import json
import logging
import sys

import numpy as np
import tensorflow as tf

import common
from poem_inference import Inference
from poem_input import PoemInput


class PoemGen(object):
    def __init__(self, poem_path, model_path):
        self._poem_input = PoemInput(poem_path, 1)
        self._prepare()
        num_classes = len(self._poem_input.word_dict) + 1
        self._inference = Inference(common.HIDDEN_UNIT_SIZE,
                                    common.NUM_LAYERS,
                                    num_classes,
                                    common.GEN_BATCH_SIZE)
        self._model_path = model_path

    def _prepare(self):
        self._poem_input.process()

    def _to_word(self, prob):
        # sample a character index from the probability distribution:
        # draw a uniform random number in [0, sum(prob)) and find where it
        # falls in the cumulative distribution
        logging.info('prob: ' + str(prob))
        t = np.cumsum(prob)
        logging.info('t: ' + str(t))
        s = np.sum(prob)
        logging.info('s: ' + str(s))
        sample = int(np.searchsorted(t, np.random.rand() * s))
        logging.info('sample: ' + str(sample))
        if sample not in self._poem_input.index_dict:
            logging.info('index_dict: ' + str(self._poem_input.index_dict))
        return self._poem_input.index_dict[sample]
        # alternative: greedy decoding, always take the most probable character
        # target = prob[0]
        # target_max = 1e-20
        # max_index = -1
        # for index in range(len(target)):
        #     if target[index] > target_max:
        #         target_max = target[index]
        #         max_index = index
        # return self._poem_input.index_dict[max_index]

    def gen(self):
        """
        Generate a poem.
        :return: the generated poem
        """
        word_index_vector = self._poem_input.convert_poem_2_poem_vector(['['])
        x = np.array([word_index_vector])
        logging.debug('x shape: ' + str(x.shape))
        logging.debug('x content: ' + str(x))
        x_placeholder = tf.placeholder(tf.int32, [1, None])
        _, prob, _, last_state, _, _, initial_state = self._inference.inference(Inference.BASIC_RNN_CELL,
                                                                                x_placeholder, None)
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(),
                         tf.local_variables_initializer()])
            saver = tf.train.Saver(tf.global_variables())
            saver.restore(session, self._model_path)
            # feed the start token '[' to get the first character
            prob_result, last_state_result = session.run([prob, last_state], feed_dict={
                x_placeholder: x
            })
            word = self._to_word(prob_result)
            poem = '[' + word
            # feed one character at a time, carrying the RNN state forward,
            # until the model emits the end token ']'
            while word != ']':
                word_index_vector = self._poem_input.convert_poem_2_poem_vector([word])
                x = np.array([word_index_vector])
                prob_result, last_state_result = session.run([prob, last_state], feed_dict={
                    x_placeholder: x,
                    initial_state: last_state_result
                })
                word = self._to_word(prob_result)
                poem += word
            logging.info('poem: ' + json.dumps(poem, ensure_ascii=False))
            return poem


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s] %(name)s:%(levelname)s: %(message)s [%(filename)s:%(lineno)d]",
        datefmt='%a, %d %b %Y %H:%M:%S',
        filename='./log/rnn_poem_gen.log',
        filemode='w'
    )
    if len(sys.argv) != 2:
        logging.fatal('please input model path')
        exit(-1)
    poem_gen = PoemGen('./input/poetry.txt', sys.argv[1])
    poem = poem_gen.gen()
    print('poem: ', json.dumps(poem, ensure_ascii=False))
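_to_word implements weighted sampling via the cumulative distribution. The same trick, isolated as a standalone numpy sketch with a toy distribution:

# Standalone sketch of the sampling trick used in _to_word.
import numpy as np

prob = np.array([0.1, 0.6, 0.3])   # toy distribution over 3 "characters"
t = np.cumsum(prob)                # [0.1, 0.7, 1.0]
s = np.sum(prob)                   # 1.0 here; a softmax output sums to ~1
# a uniform draw in [0, s) lands in bucket i with probability prob[i]
sample = int(np.searchsorted(t, np.random.rand() * s))
print(sample)  # 0, 1 or 2, with probabilities 0.1, 0.6, 0.3

Sampling instead of always taking the argmax keeps the output varied; the commented-out greedy block in _to_word would produce the same poem on every run.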
Generated result
The model was trained for only 20,000 steps, so training longer should help. Also, this run used the plain RNN cell rather than LSTM; switching to LSTM should give better results (see the sketch after the sample output).
金藕聳傳木,思天希保官。王化獻茲離,連雲何近聞。努戈峨軾至,軍主就陽川。空使君明愧,囊當白露華。青帘對雪咽,簫雅豈遙遊。恭渟萬古史,四陸覓其行。
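Since Inference already defines LSTM_BASIC_CELL, switching to LSTM is a one-line change in Train.train (a sketch; everything else stays the same):

# In Train.train, build the graph with the LSTM cell instead of the basic RNN cell.
info = inference.inference(Inference.LSTM_BASIC_CELL,
                           x_placeholder,
                           y_placeholder)

The same cell type must then be passed in PoemGen.gen, since generation restores the variables the training graph created.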