TensorFlow LSTM+CTC for End-to-End Recognition of Variable-Length Digit Strings
The previous article, "TensorFlow end-to-end OCR: second-generation ID card number recognition", implemented recognition of fixed-length 18-digit strings and ultimately reached 98% accuracy. In real applications, however, the string length is often unknown in advance; besides training the character-recognition model's parameters, we then also need to train a character-segmentation model. This article implements method 2 mentioned in that article: using LSTM+CTC to recognize digit strings of variable length.
Dependencies
The dependencies are essentially the same as in the previous article.
Background
- LSTM (Long Short-Term Memory): an RNN with a special structure that can handle the long-range dependency problem ordinary RNNs cannot. For an introduction, see the translated post "Understanding LSTM Networks".
- CTC (Connectionist Temporal Classification): suited to time-series problems where the alignment between input features and output labels is uncertain; CTC optimizes the model parameters and the alignment boundaries simultaneously, end to end. In this article's example, a 32 x 256 image can be split into at most 256 columns, so the input feature sequence has length at most 256, while the maximum output label length is set to 18; this is exactly the kind of problem CTC can handle. One way to understand CTC: suppose a 32 x 256 image carries the digit-string label "123". Split the image by columns (CTC optimizes this segmentation), then classify each slice, obtaining the probability that the slice is each digit or the special character "-" (slices in which nothing is recognizable are marked "-"). This yields, for every independently modeled unit of the input feature sequence (every slice, the "-" class included), a class probability distribution. From these distributions we compute the probability P(123) that the label sequence is "123", defined as the sum over all frame-level paths that collapse to "123", where a path may contain '-' and consecutive repeats of '1', '2', '3', as shown in the figure below:
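To make this path-summing idea concrete, below is a minimal brute-force sketch (my own illustration, not from the original post). It enumerates every length-3 frame path over a toy 3-class alphabet, collapses consecutive repeats, drops blanks, and sums the probabilities of the paths that collapse to the target label; tf.nn.ctc_loss computes the same quantity efficiently with dynamic programming.

import itertools
import numpy as np

# toy alphabet: index 0 is the CTC blank '-'
alphabet = ['-', '1', '2']

def collapse(path):
    # CTC collapsing rule: merge consecutive repeats, then drop blanks
    merged = [k for k, _ in itertools.groupby(path)]
    return ''.join(s for s in merged if s != '-')

# per-frame class probabilities, shape (max_time_step=3, num_classes=3)
probs = np.array([[0.2, 0.7, 0.1],
                  [0.3, 0.4, 0.3],
                  [0.1, 0.1, 0.8]])

# P("12") = sum over all frame paths that collapse to "12", e.g.
# ('1','2','-'), ('1','-','2'), ('-','1','2'), ('1','1','2'), ('1','2','2')
p_label = sum(
    np.prod([probs[t, c] for t, c in enumerate(path)])
    for path in itertools.product(range(len(alphabet)), repeat=len(probs))
    if collapse([alphabet[c] for c in path]) == '12')
print(p_label)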
This article uses the TF framework's built-in CTC implementation, tf.nn.ctc_loss; our training objective is to minimize ctc_loss. The official signature is:
ctc_loss(
    labels,
    inputs,
    sequence_length,
    preprocess_collapse_repeated=False,
    ctc_merge_repeated=True,
    time_major=True
)
inputs: the input (training) data, a 3-D float tensor of shape [max_time_step, batch_size, num_classes]; with time_major=False the shape is [batch_size, max_time_step, num_classes].
The overall data flow:
image_batch
->[batch_size,max_time_step,num_features]->lstm
->[batch_size,max_time_step,cell.output_size]->reshape
->[batch_size*max_time_step,num_hidden]->affine projection AW+b
->[batch_size*max_time_step,num_classes]->reshape
->[batch_size,max_time_step,num_classes]->transpose
->[max_time_step,batch_size,num_classes]
In this article the input image size is (32, 256), so num_features is 32 and max_time_step is 256, the maximum number of column slices. Here cell.output_size == num_hidden; the values of num_hidden and num_classes are given in the constant definitions below.
labels: the OCR target labels, stored as a sparse tensor; the training-data-generation section below explains this.
sequence_length: a 1-D tensor [max_time_step, ..., max_time_step] of length batch_size, every entry equal to max_time_step.
So what we need to do is convert each image's label (the string to be recognized), the image data, and the image length into labels, inputs, and sequence_length.
Implementation
Defining some constants
# define some constants
# image size, 32 x 256
OUTPUT_SHAPE = (32, 256)
# maximum number of training epochs
num_epochs = 10000
# LSTM
num_hidden = 64
num_layers = 1
obj = gen_id_card()
num_classes = obj.len + 1 + 1  # 10 digits + blank + CTC blank
# initial learning rate
INITIAL_LEARNING_RATE = 1e-3
DECAY_STEPS = 5000
REPORT_STEPS = 100
LEARNING_RATE_DECAY_FACTOR = 0.9  # the learning rate decay factor
MOMENTUM = 0.9
DIGITS = '0123456789'
BATCHES = 10
BATCH_SIZE = 64
TRAIN_SIZE = BATCHES * BATCH_SIZE
Generating the training data
Training-data generation is basically the same as in the previous article; the only change is an option to generate strings of random length, in the following method:
def gen_text(self, is_ran=False):
    text = ''
    vecs = np.zeros((self.max_size * self.len))
    # the only change: optionally pick a random length
    if is_ran == True:
        size = random.randint(1, self.max_size)
    else:
        size = self.max_size
    for i in range(size):
        c = random.choice(self.char_set)
        vec = self.char2vec(c)
        text = text + c
        vecs[i*self.len:(i+1)*self.len] = np.copy(vec)
    return text, vecs
# generate one training batch
def get_next_batch(batch_size=128):
    obj = gen_id_card()
    # (batch_size, 256, 32)
    inputs = np.zeros([batch_size, OUTPUT_SHAPE[1], OUTPUT_SHAPE[0]])
    codes = []
    for i in range(batch_size):
        # generate a string of random length
        image, text, vec = obj.gen_image(True)
        # np.transpose: (32*256,) => (32,256) => (256,32)
        inputs[i, :] = np.transpose(image.reshape((OUTPUT_SHAPE[0], OUTPUT_SHAPE[1])))
        # store each label as a list of characters in codes
        codes.append(list(text))
    # e.g. with batch_size=2 and the samples "12" and "1",
    # targets is [['1','2'], ['1']]
    targets = [np.asarray(i) for i in codes]
    # convert targets to a sparse representation
    sparse_targets = sparse_tuple_from(targets)
    # (batch_size,): every sequence_length is 256, the maximum number of column slices
    seq_len = np.ones(inputs.shape[0]) * OUTPUT_SHAPE[1]
    return inputs, sparse_targets, seq_len
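As a quick sanity check (my own illustration, not from the original post), the shapes for a two-sample batch look like this:

inputs, sparse_targets, seq_len = get_next_batch(2)
print(inputs.shape)   # (2, 256, 32): batch_size x max_time_step x num_features
print(seq_len)        # [256. 256.]: one max_time_step entry per sample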
Let's look at what a sparse matrix is. Baidu Baike defines it as follows:
A matrix in which the zero elements far outnumber the non-zero elements, and the non-zero elements are distributed without any regular pattern, is called a sparse matrix.
It is easy to see why the OCR training labels form a sparse matrix. Suppose we generate a batch of 64 samples, each a digit string of length 1 to 18; that gives a (64, 18) matrix whose positions holding digits are the non-zero elements and whose empty positions are zeros. Because the labels have variable length, the non-zero elements follow no regular pattern, and the label must store each digit together with its position.
Here is how targets is converted into this sparse representation:
# convert a list of sequences to a sparse representation
def sparse_tuple_from(sequences, dtype=np.int32):
    """
    Create a sparse representation of x.
    Args:
        sequences: a list of lists of type dtype where each element is a sequence
    Returns:
        A tuple with (indices, values, shape)
    """
    indices = []
    values = []
    for n, seq in enumerate(sequences):
        indices.extend(zip([n] * len(seq), range(len(seq))))
        values.extend(seq)
    indices = np.asarray(indices, dtype=np.int64)
    values = np.asarray(values, dtype=dtype)
    shape = np.asarray([len(sequences), indices.max(0)[1] + 1], dtype=np.int64)
    return indices, values, shape
indices: a 2-D int64 matrix holding the coordinates of the non-zero entries
values: a 1-D tensor holding the value at each position in indices
dense_shape: a 1-D tensor giving the shape of the dense matrix
Taking the two strings "12" and "1" from before as an example, the sparse representation is:
indices = [[0,0],[0,1],[1,0]]
values = [1,2,1]
dense_shape = [2,2] (two digit strings, maximum length 2)
which represents the dense tensor:
[[1,2],[1,0]]
Given the list-to-sparse conversion, we naturally also need the inverse, converting a sparse tensor back to a list of sequences:
def decode_sparse_tensor(sparse_tensor):
    # group the flat entries by row index, i.e. by sequence
    decoded_indexes = list()
    current_i = 0
    current_seq = []
    for offset, i_and_index in enumerate(sparse_tensor[0]):
        i = i_and_index[0]
        if i != current_i:
            decoded_indexes.append(current_seq)
            current_i = i
            current_seq = list()
        current_seq.append(offset)
    decoded_indexes.append(current_seq)
    result = []
    for index in decoded_indexes:
        result.append(decode_a_seq(index, sparse_tensor))
    return result

def decode_a_seq(indexes, sparse_tensor):
    # map each value back to its character in DIGITS
    decoded = []
    for m in indexes:
        ch = DIGITS[sparse_tensor[1][m]]
        decoded.append(ch)
    return decoded
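As a quick round trip (my own illustration, not from the original post), the two helpers invert each other; taking the "12" and "1" example again, with the strings already mapped to class indices:

seqs = [np.asarray([1, 2]), np.asarray([1])]    # "12" and "1" as class indices
sparse = sparse_tuple_from(seqs)
print(sparse[0])                     # indices: [[0 0] [0 1] [1 0]]
print(sparse[1])                     # values:  [1 2 1]
print(sparse[2])                     # shape:   [2 2]
print(decode_sparse_tensor(sparse))  # [['1', '2'], ['1']]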
Building the network and training
With the data preparation done, we can build the LSTM+CTC training model. The TF mechanics of building an LSTM are not explained in detail here; readers can look them up on their own.
def get_train_model():
    inputs = tf.placeholder(tf.float32, [None, None, OUTPUT_SHAPE[0]])
    # the sparse tensor required by ctc_loss
    targets = tf.sparse_placeholder(tf.int32)
    # 1-D vector of sequence lengths, [batch_size]
    seq_len = tf.placeholder(tf.int32, [None])
    # define the LSTM network
    cell = tf.contrib.rnn.LSTMCell(num_hidden, state_is_tuple=True)
    stack = tf.contrib.rnn.MultiRNNCell([cell] * num_layers, state_is_tuple=True)
    # pass the stacked cell, not the single cell, so num_layers takes effect
    outputs, _ = tf.nn.dynamic_rnn(stack, inputs, seq_len, dtype=tf.float32)
    shape = tf.shape(inputs)
    # [batch_size, 256]
    batch_s, max_timesteps = shape[0], shape[1]
    # [batch_size*max_time_step, num_hidden]
    outputs = tf.reshape(outputs, [-1, num_hidden])
    W = tf.Variable(tf.truncated_normal([num_hidden,
                                         num_classes],
                                        stddev=0.1), name="W")
    b = tf.Variable(tf.constant(0., shape=[num_classes]), name="b")
    # [batch_size*max_timesteps, num_classes]
    logits = tf.matmul(outputs, W) + b
    # [batch_size, max_timesteps, num_classes]
    logits = tf.reshape(logits, [batch_s, -1, num_classes])
    # swap dimensions 0 and 1 => [max_timesteps, batch_size, num_classes]
    logits = tf.transpose(logits, (1, 0, 2))
    return logits, inputs, targets, seq_len, W, b
Training the model
def train():
    global_step = tf.Variable(0, trainable=False)
    learning_rate = tf.train.exponential_decay(INITIAL_LEARNING_RATE,
                                               global_step,
                                               DECAY_STEPS,
                                               LEARNING_RATE_DECAY_FACTOR,
                                               staircase=True)
    logits, inputs, targets, seq_len, W, b = get_train_model()
    # targets is a sparse tensor
    loss = tf.nn.ctc_loss(labels=targets, inputs=logits, sequence_length=seq_len)
    cost = tf.reduce_mean(loss)
    #optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=MOMENTUM).minimize(cost, global_step=global_step)
    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss, global_step=global_step)
    # As described above, each slice gets a class probability distribution;
    # ctc_beam_search_decoder keeps the K most probable hypotheses at each step.
    # The greedy alternative, ctc_greedy_decoder, keeps only the single most
    # probable one (the K=1 case).
    decoded, log_prob = tf.nn.ctc_beam_search_decoder(logits, seq_len, merge_repeated=False)
    acc = tf.reduce_mean(tf.edit_distance(tf.cast(decoded[0], tf.int32), targets))
    init = tf.global_variables_initializer()

    def report_accuracy(decoded_list, test_targets):
        original_list = decode_sparse_tensor(test_targets)
        detected_list = decode_sparse_tensor(decoded_list)
        true_number = 0
        if len(original_list) != len(detected_list):
            print("len(original_list)", len(original_list), "len(detected_list)", len(detected_list),
                  " test and detect length don't match")
            return
        print("T/F: original(length) <-------> detected(length)")
        for idx, number in enumerate(original_list):
            detect_number = detected_list[idx]
            hit = (number == detect_number)
            print(hit, number, "(", len(number), ") <-------> ", detect_number, "(", len(detect_number), ")")
            if hit:
                true_number = true_number + 1
        print("Test Accuracy:", true_number * 1.0 / len(original_list))

    def do_report():
        test_inputs, test_targets, test_seq_len = get_next_batch(BATCH_SIZE)
        test_feed = {inputs: test_inputs,
                     targets: test_targets,
                     seq_len: test_seq_len}
        dd, log_probs, accuracy = session.run([decoded[0], log_prob, acc], test_feed)
        report_accuracy(dd, test_targets)

    def do_batch():
        train_inputs, train_targets, train_seq_len = get_next_batch(BATCH_SIZE)
        feed = {inputs: train_inputs, targets: train_targets, seq_len: train_seq_len}
        b_loss, b_targets, b_logits, b_seq_len, b_cost, steps, _ = session.run([loss, targets, logits, seq_len, cost, global_step, optimizer], feed)
        print(b_cost, steps)
        if steps > 0 and steps % REPORT_STEPS == 0:
            do_report()
            save_path = saver.save(session, "ocr.model", global_step=steps)
        return b_cost, steps

    with tf.Session() as session:
        session.run(init)
        saver = tf.train.Saver(tf.global_variables(), max_to_keep=100)
        for curr_epoch in range(num_epochs):
            print("Epoch.......", curr_epoch)
            train_cost = train_ler = 0
            for batch in range(BATCHES):
                start = time.time()
                c, steps = do_batch()
                train_cost += c * BATCH_SIZE
                seconds = time.time() - start
                print("Step:", steps, ", batch seconds:", seconds)
            train_cost /= TRAIN_SIZE
            train_inputs, train_targets, train_seq_len = get_next_batch(BATCH_SIZE)
            val_feed = {inputs: train_inputs,
                        targets: train_targets,
                        seq_len: train_seq_len}
            val_cost, val_ler, lr, steps = session.run([cost, acc, learning_rate, global_step], feed_dict=val_feed)
            log = "Epoch {}/{}, steps = {}, train_cost = {:.3f}, train_ler = {:.3f}, val_cost = {:.3f}, val_ler = {:.3f}, time = {:.3f}s, learning_rate = {}"
            print(log.format(curr_epoch + 1, num_epochs, steps, train_cost, train_ler, val_cost, val_ler, time.time() - start, lr))
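For completeness, here is a minimal inference sketch (my own addition, not part of the original post). Run it in a fresh process so the graph contains only one copy of the model; it restores the latest "ocr.model" checkpoint saved by train() (assumed to sit in the current directory) and decodes one freshly generated test image:

def predict():
    logits, inputs, targets, seq_len, W, b = get_train_model()
    decoded, log_prob = tf.nn.ctc_beam_search_decoder(logits, seq_len, merge_repeated=False)
    saver = tf.train.Saver()
    with tf.Session() as session:
        # assumes the checkpoints were written to the current directory
        saver.restore(session, tf.train.latest_checkpoint('.'))
        test_inputs, _, test_seq_len = get_next_batch(1)
        # decoded[0] evaluates to a SparseTensorValue, so decode_sparse_tensor applies
        dd = session.run(decoded[0], feed_dict={inputs: test_inputs, seq_len: test_seq_len})
        print(decode_sparse_tensor(dd))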
Results
By the 80th epoch, accuracy on the 64 test samples reached 64%.
[Figure: test output after the 80th epoch]
By the 100th epoch, accuracy on the 64 test samples reached 100%, and it stayed at essentially 100% from then on.
[Figure: test accuracy reaching 100%]
Afterword
The complete code is hosted on my Github.
The generated training images are fairly ideal, noise-free data, which is why accuracy reaches 100% after only 100 epochs. In real applications images may carry noise such as stray line segments or scattered dots; readers can add some noise to the generated training set and see how well the model trains then.
The strings in this article are drawn from just the 10 classes 0-9. If we later add the 52 upper- and lower-case English letters, or the 3500+ common Chinese characters, will the model still recognize strings well as the number of classes keeps growing, and how quickly will it converge?
While writing the example code for this article I leaned heavily on other people's code and models and mostly did not dig into the deeper principles. This post is a record for now; the mathematical models and derivations are left for later study.