實現基於seq2seq的聊天機器人
前幾篇部落格介紹了基於檢索聊天機器人的實現、seq2seq的模型和程式碼,本篇部落格將從頭實現一個基於seq2seq的聊天機器人。這樣,在強化學習和記憶模型出現之前的對話系統中的模型就差不多介紹完了。後續將著重介紹強化學習和記憶模型在對話系統中的應用。
閒聊機器人在網上有很多有趣的小例子:
他們不一定是用seq2seq來做的,但是實現的效果都是類似的。
本篇部落格主要參考github程式碼DeepQA。可以實現命令列中的互動式對話和前端網頁版的互動式對話。
可以看到,實現的效果就是單輪的閒聊對話,是對話中最簡單的任務,但對於以後理解和實現多輪對話和對話管理幫助也很大。程式碼
資料處理
資料集介紹:Cornell Movie-Dialogs Corpus
該電影對話資料集包含從原始電影指令碼中提取的虛構對話集合:
- 10,292對電影人物之間的220,579次會話交流
- 涉及617部電影中的9,035個角色
- 總共304,713條話語
本次訓練主要使用了movie_lines.txt和movie_conversations.txt。
movie_lines.txt每一行的屬性包括:
- lineID
- characterID (who uttered this phrase)
- movieID
- character name
- text of the utterance
舉一個例子:
各屬性之間以“ +++$+++ ”分割。第一個屬性為lineID,最後一個屬性為對話的文字。
movie_conversations.txt每一行的屬性包括:
- characterID of the first character involved in the conversation
- characterID of the second character involved in the conversation
- movieID of the movie in which the conversation occurred
- list of the utterances that make the conversation
舉一個例子:
仍然以相同的分隔符分割,每一行的最後一個屬性為擷取的對話的片段。如第一行對話片段為[‘L194’,’L195’,’L196’,’L197’],每一個元素代表lineID,將movie_conversations.txt中的lineID替換為movie_lines.txt中的對話文字,就構成了訓練的資料集,即將lineID替換為對話文字後的[‘L194’,’L195’],[‘L195’,’L196’], [‘L196’,’L197’]就構成了三個訓練樣本。
接下來開始寫程式碼。資料處理部分的三板斧:讀取資料、構建詞典、構造資料集,我們已經熟的不能再熟了。
1、讀取資料依然使用pandas:
read_csv中使用正則來匹配分隔符,所以需要對“+++$+++”進行轉義,在movie_lines.txt檔案中只需要使用lineID和對話文字兩列即可,然後將對話文字進行分詞,得到對話文字的單詞列表。
movie_conversations.txt中只需要使用對話片段,即只需要line_ids列即可,讀取的時候是以str格式讀取的,因此需要eval或者literal_eval函式將其還原為列表格式。
# 讀取 movie_lines.txt 和movie_conversations.txt兩個檔案
print("開始讀取資料")
self.lines = pd.read_csv(self.args.line_path, sep=" \+\+\+\$\+\+\+ ", usecols=[0,4],
names=["line_id", "utterance"], dtype={"utterance":str}, engine="python")
self.conversations = pd.read_csv(self.args.conv_path, usecols=[3], names=["line_ids"],
sep=" \+\+\+\$\+\+\+ ", dtype={"line_ids":str}, engine="python")
self.lines.utterance = self.lines.utterance.apply(lambda conv : self.word_tokenizer(conv))
self.conversations.line_ids = self.conversations.line_ids.apply(lambda li : eval(li))
2、構建詞表
為了方便,將文字中所有的單詞都轉為小寫,然後按照單詞出現次數進行排序並分配id。選擇出現次數大於1的單詞作為vocab,減小長尾對生成結果的影響。使用pandas的series來構造word2id和id2word詞表。
# 得到word2id和id2word兩個詞典
print("開始構建詞典")
words = self.lines.utterance.values
words = list(chain(*words))
# 將全部words轉為小寫
print("正在轉為小寫")
words = list(map(str.lower, words))
print("轉化小寫完畢")
sr_words_count = pd.Series(words).value_counts()
# 篩選出 出現次數 大於 1 的詞作為 vocabulary
sr_words_size = np.where(sr_words_count.values > self.args.vacab_filter)[0].size
sr_words_index = sr_words_count.index[0:sr_words_size]
self.sr_word2id = pd.Series(range(self.numToken, self.numToken + sr_words_size), index=sr_words_index)
self.sr_id2word = pd.Series(sr_words_index, index=range(self.numToken, self.numToken + sr_words_size))
self.sr_word2id[self.padToken] = 0
self.sr_word2id[self.goToken] = 1
self.sr_word2id[self.eosToken] = 2
self.sr_word2id[self.unknownToken] = 3
self.sr_id2word[0] = self.padToken
self.sr_id2word[1] = self.goToken
self.sr_id2word[2] = self.eosToken
self.sr_id2word[3] = self.unknownToken
3、構造資料集
生成對話類的資料集只需要構造訓練樣本就可以。前面提到要將movie_conversations.txt中的lineID替換為movie_lines.txt中的對話文字,為了快速索引,需要構建一個以lineID為鍵,對話文字為value的字典,即程式碼中的sr_line_id。然後構造型為[first_conv, first_conv]的樣本。細心的讀者可能注意到,這裡在構造資料集的時候並沒有填充,因為填充的部分解除安裝get_batch的部分了,這樣可以方便程式碼的重用,在構建batch的時候會詳細說明的。至此資料處理部分就完成了。
print("開始生成訓練樣本")
# 將id與line作為字典,以方便生成訓練樣本
self.sr_line_id = pd.Series(self.lines.utterance.values, index=self.lines.line_id.values)
for line_id in tqdm(self.conversations.line_ids.values, ncols=10):
for i in range(len(line_id) - 1):
first_conv = self.sr_line_id[line_id[i]]
second_conv = self.sr_line_id[line_id[i+1]]
# 將文字全部轉化為小寫,然後再將word替換為id
first_conv = self.replace_word_with_id(first_conv)
second_conv = self.replace_word_with_id(second_conv)
# 篩選樣本,將輸入或輸出大於max_length的樣本、輸出中含有UNK的單詞的樣本過濾掉
valid = self.filter_conversations(first_conv, second_conv)
if valid :
temp = [first_conv, second_conv]
self.train_samples.append(temp)
print("生成訓練樣本結束")
def filter_conversations(self, first_conv, second_conv):
# 篩選樣本, 首先將encoder_input 或 decoder_input大於max_length的conversation過濾
# 其次將target中包含有UNK的conversation過濾
valid = True
valid &= len(first_conv) <= self.args.maxLength
valid &= len(second_conv) <= self.args.maxLength
valid &= second_conv.count(self.sr_word2id[self.unknownToken]) == 0
return valid
模型構建
模型構建部分主要使用tensorflow中的tf.contrib.legacy_seq2seq介面的embedding_rnn_seq2seq函式。這個函式在tensorflow中的seq2seq的程式碼詳解中有詳細的解釋。值得注意的是,模型構建時的placeholder是一個列表,即list of [batch_size,]。因此在訓練過程中,生成batch時需要根據對應的placeholder的shape進行填充和變形。此處的模型構建也不復雜,因此不詳細介紹了。
class seq2seq:
def __init__(self, args, text_data):
self.args = args
self.text_data = text_data
# Placeholders
self.encoder_inputs = None
self.decoder_inputs = None
self.decoder_targets = None
self.decoder_weights = None
self.num_encoder_symbols = len(text_data.sr_word2id)
self.num_decoder_symbols = self.num_encoder_symbols
# self.num_encoder_symbols = 10000
# self.num_decoder_symbols = 10000
# important operation
self.outputs = None
self.loss = None
self.build_model()
def build_model(self):
outputProjection = None
# define mutil RNN cell
def create_cell():
cell = tf.contrib.rnn.BasicLSTMCell(self.args.hidden_size)
cell = tf.contrib.rnn.DropoutWrapper(
cell,
input_keep_prob=1.0,
output_keep_prob=self.args.dropout)
return cell
self.cell = tf.contrib.rnn.MultiRNNCell([create_cell() for _ in range(self.args.rnn_layers)])
# define placeholder
with tf.name_scope("encoder_placeholder"):
self.encoder_inputs = [tf.placeholder(tf.int32, [None, ])
for _ in range(self.args.maxLengthEnco)]
with tf.name_scope("decoder_placeholder"):
self.decoder_inputs = [tf.placeholder(tf.int32, [None, ], name='decoder_inputs')
for _ in range(self.args.maxLengthDeco)]
self.decoder_targets = [tf.placeholder(tf.int32, [None, ], name='decoder_targets')
for _ in range(self.args.maxLengthDeco)]
self.decoder_weights = [tf.placeholder(tf.float32, [None, ], name='decoder_weights')
for _ in range(self.args.maxLengthDeco)]
decoder_output, state = tf.contrib.legacy_seq2seq.embedding_rnn_seq2seq(self.encoder_inputs,
self.decoder_inputs,
self.cell,
self.num_encoder_symbols,
self.num_decoder_symbols,
self.args.embedding_size,
output_projection=None,
feed_previous=bool(self.args.test),
dtype=None,
scope=None)
# For testing only
if self.args.test is not None:
if not outputProjection:
self.outputs = decoder_output
else:
self.outputs = [outputProjection(output) for output in decoder_output]
else:
self.loss = tf.contrib.legacy_seq2seq.sequence_loss(logits=decoder_output,
targets=self.decoder_targets,
weights=self.decoder_weights)
tf.summary.scalar('loss', self.loss) # Keep track of the cost
print("模型構建完畢")
模型訓練
訓練的主體結構如下。這和我們之前所寫的程式碼結構都一樣,需要詳細講的是如何得到訓練和預測都可以用的batch,即get_next_batches();如何得到訓練和預測都可以用的feed_dict,即self.seq2seq_model.step(next_batch)。
try:
for i in range(self.args.epoch_nums):
# Generate batches
tic = datetime.datetime.now()
batches = self.text_data.get_next_batches()
for next_batch in tqdm(batches, desc="Training"):
# train_op, summaries, loss = self.seq2seq_model.step(next_batch)
feed_dict = self.seq2seq_model.step(next_batch)
_, summaries, loss = self.sess.run(
(self.train_op, mergedSummaries, self.seq2seq_model.loss),
feed_dict)
self.global_step += 1
self.writer.add_summary(summaries, self.global_step)
# Output training status
if self.global_step % 100 == 0:
perplexity = math.exp(float(loss)) if loss < 300 else float("inf")
tqdm.write("----- Step %d -- Loss %.2f -- Perplexity %.2f" %(self.global_step, loss, perplexity))
if self.global_step % self.args.checkpoint_every == 0:
self.save_session(self.sess, self.global_step)
toc = datetime.datetime.now()
print("Epoch finished in {}".format(toc - tic))
except (KeyboardInterrupt, SystemExit): # If the user press Ctrl+C while testing progress
print('Interruption detected, exiting the program...')
# self.save_session(sess, self.global_step) # Ultimate saving before complete exit
get_next_batches():當一次epoch結束時,首先要進行樣本的shuffle。然後使用yield的方式來產生樣本,得到的batches即為已經混洗過的,len(train_samples)/len(batch_size)個batches。注意samples,是沒有經過資料填充,並且資料shape也不為list of [batch_size,]格式的。因此create_batch將資料填充,並且進行資料變形。
def get_next_batches(self):
"""Prepare the batches for the current epoch
Return:
list<Batch>: Get a list of the batches for the next epoch
"""
self.shuffle()
batches = []
def gen_next_samples():
""" Generator over the mini-batch training samples
"""
for i in range(0, len(self.train_samples), self.args.batch_size):
yield self.train_samples[i:min(i + self.args.batch_size, len(self.train_samples))]
# TODO: Should replace that by generator (better: by tf.queue)
for samples in gen_next_samples():
batch = self.create_batch(samples)
batches.append(batch)
return batches
create_batch():主要進行資料填充和資料變形,以適應embedding_rnn_seq2seq函式輸入引數的要求。
def create_batch(self, samples):
batch = Batch()
batch_size = len(samples)
# 資料填充和資料構造,將模型中四個placeholder都構造好。
for i in range(batch_size):
# Unpack the sample
sample = samples[i]
batch.encoderSeqs.append(list(reversed(
sample[0]))) # Reverse inputs (and not outputs), little trick as defined on the original seq2seq paper
batch.decoderSeqs.append([self.sr_word2id[self.goToken]] + sample[1] + [self.sr_word2id[self.eosToken]]) # Add the <go> and <eos> tokens
batch.targetSeqs.append(
batch.decoderSeqs[-1][1:]) # Same as decoder, but shifted to the left (ignore the <go>)
# Long sentences should have been filtered during the dataset creation
assert len(batch.encoderSeqs[i]) <= self.args.maxLengthEnco
assert len(batch.decoderSeqs[i]) <= self.args.maxLengthDeco
# TODO: Should use tf batch function to automatically add padding and batch samples
# Add padding & define weight
batch.encoderSeqs[i] = [self.sr_word2id[self.padToken]] * (self.args.maxLengthEnco -
len(batch.encoderSeqs[i])) + batch.encoderSeqs[i] # Left padding for the input
batch.weights.append(
[1.0] * len(batch.targetSeqs[i]) + [0.0] * (self.args.maxLengthDeco - len(batch.targetSeqs[i])))
batch.decoderSeqs[i] = batch.decoderSeqs[i] + [self.sr_word2id[self.padToken]] * (
self.args.maxLengthDeco - len(batch.decoderSeqs[i]))
batch.targetSeqs[i] = batch.targetSeqs[i] + [self.sr_word2id[self.padToken]] * (
self.args.maxLengthDeco - len(batch.targetSeqs[i]))
# 資料的reshape,構造為list of [batch_size,]格式的
encoderSeqsT = [] # Corrected orientation
for i in range(self.args.maxLengthEnco):
encoderSeqT = []
for j in range(batch_size):
encoderSeqT.append(batch.encoderSeqs[j][i])
encoderSeqsT.append(encoderSeqT)
batch.encoderSeqs = encoderSeqsT
decoderSeqsT = []
targetSeqsT = []
weightsT = []
for i in range(self.args.maxLengthDeco):
decoderSeqT = []
targetSeqT = []
weightT = []
for j in range(batch_size):
decoderSeqT.append(batch.decoderSeqs[j][i])
targetSeqT.append(batch.targetSeqs[j][i])
weightT.append(batch.weights[j][i])
decoderSeqsT.append(decoderSeqT)
targetSeqsT.append(targetSeqT)
weightsT.append(weightT)
batch.decoderSeqs = decoderSeqsT
batch.targetSeqs = targetSeqsT
batch.weights = weightsT
return batch
self.seq2seq_model.step(next_batch):訓練時,需要將encoder_inputs、decoder_inputs、decoder_targets、decoder_weights四個placeholder都進行feed,否則無法計算loss,也就沒法訓練。預測時,將encoder_inputs和decoder_inputs的第一個時間步長進行feed就可以。
def step(self, batch):
""" Forward/training step operation.
Does not perform run on itself but just return the operators to do so. Those have then to be run
Args:
batch (Batch): Input data on testing mode, input and target on output mode
Return:
(ops), dict: A tuple of the (training, loss) operators or (outputs,) in testing mode with the associated feed dictionary
"""
# Feed the dictionary
feedDict = {}
if not self.args.test: # Training
for i in range(self.args.maxLengthEnco):
feedDict[self.encoder_inputs[i]] = batch.encoderSeqs[i]
for i in range(self.args.maxLengthDeco):
feedDict[self.decoder_inputs[i]] = batch.decoderSeqs[i]
feedDict[self.decoder_targets[i]] = batch.targetSeqs[i]
feedDict[self.decoder_weights[i]] = batch.weights[i]
# ops = (self.optOp, self.lossFct)
else: # Testing (batchSize == 1)
for i in range(self.args.maxLengthEnco):
feedDict[self.encoder_inputs[i]] = batch.encoderSeqs[i]
feedDict[self.decoder_inputs[0]] = [self.text_data.sr_word2id[self.text_data.goToken]]
# ops = (self.outputs,)
# Return one pass operator
return feedDict
訓練結果:
經過了大概七千多步的訓練:
loss降到了二點多,困惑度降到了二十多,互動式預測結果如下圖所示: