TensorFlow LSTM+CTC for End-to-End Recognition of Variable-Length Digit Strings

The previous article, "TensorFlow end-to-end OCR: recognizing second-generation ID card numbers", implemented recognition of fixed-length 18-digit strings and ultimately reached 98% accuracy. In real applications, however, we often cannot know the length of the string in advance; in that case, besides training the parameters of the character-recognition model, we would also have to train a character-segmentation model. This article implements method 2 mentioned in that article: using LSTM+CTC to recognize digit strings of variable length.

Environment Dependencies

The environment dependencies are essentially the same as in the previous article.

Background Knowledge

  1. LSTM (Long Short-Term Memory): an RNN with a special cell structure that can handle the long-term dependencies ordinary RNNs cannot. For an introduction, see the translated article "Understanding LSTM Networks".

  2. CTC (Connectionist Temporal Classification): suited to sequence problems where the alignment between input features and output labels is uncertain; CTC optimizes the model parameters and the alignment/segmentation boundaries end to end at the same time. In this article's example the images are 32 x 256 and can be split into at most 256 columns, so the input feature sequence has at most 256 steps while the output label length is capped at 18, exactly the kind of problem CTC can optimize. One way to think about it: take a 32 x 256 image whose label is "123", split the image by columns (CTC optimizes this segmentation), and classify each slice as a digit or as the special character "-" when it cannot be recognized. This gives, for every independently modeled unit of the input feature sequence (each slice, the "-" node included), a class-probability distribution. From these distributions we compute P(123), the probability that the label sequence is "123", defined here as the sum over all sub-sequences, where a sub-sequence may contain '-' and consecutive repeats of '1', '2', '3', as shown in the figure below:

(Figure: the sum of the probabilities of all alignment paths)
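To make the "sum over all sub-sequences" idea concrete, here is a minimal sketch in plain Python (illustrative only, not part of the training code) of the CTC collapsing rule: an alignment path over the time steps is reduced by first merging consecutive repeats and then dropping the blank symbol '-', and P(123) is the sum of the probabilities of every path that collapses to "123".

def collapse_ctc_path(path, blank='-'):
    # Reduce a CTC alignment path: merge consecutive repeats, then drop blanks
    out = []
    prev = None
    for ch in path:
        if ch != prev and ch != blank:
            out.append(ch)
        prev = ch
    return ''.join(out)

for p in ['1-2-3', '112-3', '1-22-33', '123---']:
    print(p, '->', collapse_ctc_path(p))   # every path collapses to '123'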

This article uses the CTC implementation packaged with TensorFlow, tf.nn.ctc_loss; our final objective is to minimize ctc_loss. The official definition is:

ctc_loss(
    labels,
    inputs,
    sequence_length,
    preprocess_collapse_repeated=False,
    ctc_merge_repeated=True,
    time_major=True
)

inputs: the input (training) data, a 3-D float tensor of shape [max_time_step, batch_size, num_classes]; when time_major=False the shape is [batch_size, max_time_step, num_classes].
The overall data flow:
image_batch
->[batch_size, max_time_step, num_features] -> lstm
->[batch_size, max_time_step, cell.output_size] -> reshape
->[batch_size*max_time_step, num_hidden] -> affine projection A*W + b
->[batch_size*max_time_step, num_classes] -> reshape
->[batch_size, max_time_step, num_classes] -> transpose
->[max_time_step, batch_size, num_classes]

In this article the input images are (32, 256), so num_features is 32 and max_time_step is 256, the maximum number of time steps (columns). cell.output_size == num_hidden, and the values of num_hidden and num_classes are given in the constants defined below; a NumPy sketch checking these shapes follows the parameter descriptions.

labels: the ground-truth labels of the OCR result, given as a sparse tensor; the training-data-generation section below explains how it is built.

sequence_length: a 1-D array [max_time_step, …, max_time_step] of length batch_size, every entry equal to max_time_step.

So what we need to do is convert the image labels (the strings to be recognized), the image data, and the image lengths into labels, inputs, and sequence_length.
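The shape bookkeeping in the data flow above can be checked with a NumPy-only sketch (dummy random arrays stand in for the LSTM output; the sizes match the constants defined in the next section):

import numpy as np

batch_size, max_time_step, num_features = 64, 256, 32
num_hidden, num_classes = 64, 12

# stand-in for the LSTM output [batch_size, max_time_step, cell.output_size]
lstm_out = np.random.rand(batch_size, max_time_step, num_hidden)
# reshape => [batch_size*max_time_step, num_hidden]
flat = lstm_out.reshape((-1, num_hidden))
# affine projection A*W + b => [batch_size*max_time_step, num_classes]
W = np.random.rand(num_hidden, num_classes)
b = np.zeros(num_classes)
logits = flat.dot(W) + b
# reshape => [batch_size, max_time_step, num_classes]
logits = logits.reshape((batch_size, -1, num_classes))
# transpose => [max_time_step, batch_size, num_classes], the layout ctc_loss expects
logits = logits.transpose((1, 0, 2))
print(logits.shape)   # (256, 64, 12)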

Implementation

Define some constants

# Define some constants
# Image size, 32 x 256
OUTPUT_SHAPE = (32,256)

# Maximum number of training epochs
num_epochs = 10000
# LSTM
num_hidden = 64
num_layers = 1

obj = gen_id_card()
num_classes = obj.len + 1 + 1  # 10 digits + blank + ctc blank

# Initial learning rate
INITIAL_LEARNING_RATE = 1e-3
DECAY_STEPS = 5000
REPORT_STEPS = 100
LEARNING_RATE_DECAY_FACTOR = 0.9  # The learning rate decay factor
MOMENTUM = 0.9

DIGITS='0123456789'
BATCHES = 10
BATCH_SIZE = 64
TRAIN_SIZE = BATCHES * BATCH_SIZE

Generating the Training Data

Generating the training data set is essentially the same as in the previous article; the only change is an option to generate strings of random length. The corresponding method is:

def gen_text(self, is_ran=False):
        text = ''
        vecs = np.zeros((self.max_size * self.len))
        
        # The only change: optionally pick a random length
        if is_ran == True:
            size = random.randint(1, self.max_size)
        else:
            size = self.max_size
            
        for i in range(size):
            c = random.choice(self.char_set)
            vec = self.char2vec(c)
            text = text + c
            vecs[i*self.len:(i+1)*self.len] = np.copy(vec)
        return text,vecs
# Generate one training batch
def get_next_batch(batch_size=128):
    obj = gen_id_card()
    # (batch_size, 256, 32)
    inputs = np.zeros([batch_size, OUTPUT_SHAPE[1],OUTPUT_SHAPE[0]])
    codes = []

    for i in range(batch_size):
        # Generate a digit string of random length
        image, text, vec = obj.gen_image(True)
        # np.transpose: (32*256,) => (32,256) => (256,32)
        inputs[i,:] = np.transpose(image.reshape((OUTPUT_SHAPE[0],OUTPUT_SHAPE[1])))
        # Store each label as a list of characters in codes
        codes.append(list(text))
    # E.g. with batch_size=2 and the two samples "12" and "1", targets is [['1','2'],['1']]
    targets = [np.asarray(i) for i in codes]
    # Convert targets into a sparse representation
    sparse_targets = sparse_tuple_from(targets)
    # (batch_size,): every sequence_length value is 256, the maximum number of columns
    seq_len = np.ones(inputs.shape[0]) * OUTPUT_SHAPE[1]

    return inputs, sparse_targets, seq_len

Let us first look at what a sparse matrix is. Below is the definition from Baidu Baike:

A matrix in which the zero elements far outnumber the non-zero elements, and the non-zero elements are distributed without any regular pattern, is called a sparse matrix.

It is easy to see why the OCR training labels form a sparse matrix: suppose we generate a batch of 64 samples, each a digit string of length 1 to 18; this yields a (64, 18) matrix whose digit positions are the non-zero elements and whose remaining positions are zeros. Because the labels have variable length, the non-zero elements follow no regular pattern, so the label has to store the position of each digit along with the digit itself.

Let us see how targets is converted into a sparse representation in TensorFlow:

# Convert a list of sequences into a sparse representation
def sparse_tuple_from(sequences, dtype=np.int32):
    """
    Create a sparse representation of x.
    Args:
        sequences: a list of lists of type dtype where each element is a sequence
    Returns:
        A tuple with (indices, values, shape)
    """
    indices = []
    values = []
    
    for n, seq in enumerate(sequences):
        indices.extend(zip([n] * len(seq), xrange(len(seq))))
        values.extend(seq)
 
    indices = np.asarray(indices, dtype=np.int64)
    values = np.asarray(values, dtype=dtype)
    shape = np.asarray([len(sequences), np.asarray(indices).max(0)[1] + 1], dtype=np.int64)
 
    return indices, values, shape

indices: a 2-D int64 matrix holding the coordinates of the non-zero elements
values: a 1-D tensor holding the value at each position given in indices
dense_shape: a 1-D tensor giving the shape of the sparse matrix

Taking the two strings "12" and "1" from before, the resulting sparse representation should be
indices = [[0,0],[0,1],[1,0]]
values = [1,2,1]
dense_shape = [2,2] (two digit strings, maximum length 2)
which represents the dense tensor:

[[1,2],[1,0]]
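As a quick sanity check (illustrative; the real code passes the character lists built in get_next_batch), feeding those two integer sequences into sparse_tuple_from reproduces exactly this tuple:

indices, values, shape = sparse_tuple_from([[1, 2], [1]])
print(indices)   # [[0 0] [0 1] [1 0]]
print(values)    # [1 2 1]
print(shape)     # [2 2]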

With a method for turning a list of sequences into a sparse representation, we naturally also need the reverse, turning a sparse representation back into a list of sequences:

def decode_sparse_tensor(sparse_tensor):
    decoded_indexes = list()
    current_i = 0
    current_seq = []
    for offset, i_and_index in enumerate(sparse_tensor[0]):
        i = i_and_index[0]
        if i != current_i:
            decoded_indexes.append(current_seq)
            current_i = i
            current_seq = list()
        current_seq.append(offset)
    decoded_indexes.append(current_seq)
    result = []
    for index in decoded_indexes:
        result.append(decode_a_seq(index, sparse_tensor))
    return result
    
def decode_a_seq(indexes, spars_tensor):
    decoded = []
    for m in indexes:
        str = DIGITS[spars_tensor[1][m]]
        decoded.append(str)
    return decoded
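Round-tripping the "12"/"1" example through both helpers (illustrative) recovers the original strings:

sparse = sparse_tuple_from([[1, 2], [1]])
print(decode_sparse_tensor(sparse))   # [['1', '2'], ['1']]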

Building the Network and Training

With the data preparation done, we can build the LSTM+CTC training model. I will not go into how TensorFlow implements LSTM here; readers can look that up on their own.

def get_train_model():
    inputs = tf.placeholder(tf.float32, [None, None, OUTPUT_SHAPE[0]])
    
    # Sparse placeholder required by ctc_loss
    targets = tf.sparse_placeholder(tf.int32)
    
    # 1-D vector of sequence lengths, [batch_size]
    seq_len = tf.placeholder(tf.int32, [None])
    
    # Define the LSTM network
    cell = tf.contrib.rnn.LSTMCell(num_hidden, state_is_tuple=True)
    stack = tf.contrib.rnn.MultiRNNCell([cell] * num_layers, state_is_tuple=True)
    outputs, _ = tf.nn.dynamic_rnn(stack, inputs, seq_len, dtype=tf.float32)
    
    shape = tf.shape(inputs)
    #[batch_size,256]
    batch_s, max_timesteps = shape[0], shape[1]
    
    #[batch_size*max_time_step,num_hidden]
    outputs = tf.reshape(outputs, [-1, num_hidden])
    W = tf.Variable(tf.truncated_normal([num_hidden,
                                          num_classes],
                                         stddev=0.1), name="W")
    b = tf.Variable(tf.constant(0., shape=[num_classes]), name="b")
    #[batch_size*max_timesteps,num_classes]
    logits = tf.matmul(outputs, W) + b
    #[batch_size,max_timesteps,num_classes]
    logits = tf.reshape(logits, [batch_s, -1, num_classes])
    # Transpose: swap dimensions 0 and 1 => [max_timesteps,batch_size,num_classes]
    logits = tf.transpose(logits, (1, 0, 2))
    
    return logits, inputs, targets, seq_len, W, b
    

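Before wiring the model into the training loop, a quick smoke test (a sketch, assuming the constants and helpers defined above) confirms that the logits come out in the [max_time_step, batch_size, num_classes] layout that ctc_loss expects when time_major=True:

logits, inputs, targets, seq_len, W, b = get_train_model()
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # a tiny batch of 2 samples, just to check shapes
    batch_inputs, _, batch_seq_len = get_next_batch(2)
    out = sess.run(logits, {inputs: batch_inputs, seq_len: batch_seq_len.astype(np.int32)})
    print(out.shape)   # (256, 2, 12) => (max_time_step, batch_size, num_classes)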
Training the Model

def train():
    global_step = tf.Variable(0, trainable=False)
    learning_rate = tf.train.exponential_decay(INITIAL_LEARNING_RATE,
                                                global_step,
                                                DECAY_STEPS,
                                                LEARNING_RATE_DECAY_FACTOR,
                                                staircase=True)
    logits, inputs, targets, seq_len, W, b = get_train_model()
    
    # targets is a sparse tensor
    loss = tf.nn.ctc_loss(labels=targets,inputs=logits, sequence_length=seq_len)
    cost = tf.reduce_mean(loss)
    
    #optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate,momentum=MOMENTUM).minimize(cost, global_step=global_step)
    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss,global_step=global_step)
    
    # As explained earlier, each slice gets a class-probability distribution;
    # ctc_beam_search_decoder keeps the K most probable paths at each step.
    # The greedy alternative, ctc_greedy_decoder, keeps only the single most probable path (K=1).
    decoded, log_prob = tf.nn.ctc_beam_search_decoder(logits, seq_len, merge_repeated=False)
    
    acc = tf.reduce_mean(tf.edit_distance(tf.cast(decoded[0], tf.int32), targets))
    
    init = tf.global_variables_initializer()

    def report_accuracy(decoded_list, test_targets):
        original_list = decode_sparse_tensor(test_targets)
        detected_list = decode_sparse_tensor(decoded_list)
        true_number = 0

        if len(original_list) != len(detected_list):
            print("len(original_list)", len(original_list), "len(detected_list)", len(detected_list),
                  " test and detect length doesn't match")
            return
        print("T/F: original(length) <-------> detected(length)")
        for idx, number in enumerate(original_list):
            detect_number = detected_list[idx]
            hit = (number == detect_number)
            print(hit, number, "(", len(number), ") <-------> ", detect_number, "(", len(detect_number), ")")
            if hit:
                true_number = true_number + 1
        print("Test Accuracy:", true_number * 1.0 / len(original_list))

    def do_report():
        test_inputs,test_targets,test_seq_len = get_next_batch(BATCH_SIZE)
        test_feed = {inputs: test_inputs,
                     targets: test_targets,
                     seq_len: test_seq_len}
        dd, log_probs, accuracy = session.run([decoded[0], log_prob, acc], test_feed)
        report_accuracy(dd, test_targets)
 
    def do_batch():
        train_inputs, train_targets, train_seq_len = get_next_batch(BATCH_SIZE)
        
        feed = {inputs: train_inputs, targets: train_targets, seq_len: train_seq_len}
        
        b_loss,b_targets, b_logits, b_seq_len,b_cost, steps, _ = session.run([loss, targets, logits, seq_len, cost, global_step, optimizer], feed)
        
        print(b_cost, steps)
        if steps > 0 and steps % REPORT_STEPS == 0:
            do_report()
            save_path = saver.save(session, "ocr.model", global_step=steps)
        return b_cost, steps
    
    with tf.Session() as session:
        session.run(init)
        saver = tf.train.Saver(tf.global_variables(), max_to_keep=100)
        for curr_epoch in xrange(num_epochs):
            print("Epoch.......", curr_epoch)
            train_cost = train_ler = 0
            for batch in xrange(BATCHES):
                start = time.time()
                c, steps = do_batch()
                train_cost += c * BATCH_SIZE
                seconds = time.time() - start
                print("Step:", steps, ", batch seconds:", seconds)
            
            train_cost /= TRAIN_SIZE
            
            train_inputs, train_targets, train_seq_len = get_next_batch(BATCH_SIZE)
            val_feed = {inputs: train_inputs,
                        targets: train_targets,
                        seq_len: train_seq_len}
 
            val_cost, val_ler, lr, steps = session.run([cost, acc, learning_rate, global_step], feed_dict=val_feed)
 
            log = "Epoch {}/{}, steps = {}, train_cost = {:.3f}, train_ler = {:.3f}, val_cost = {:.3f}, val_ler = {:.3f}, time = {:.3f}s, learning_rate = {}"
            print(log.format(curr_epoch + 1, num_epochs, steps, train_cost, train_ler, val_cost, val_ler, time.time() - start, lr))
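As noted in the comments inside train(), the beam-search decoder can be swapped for the greedy one when decoding speed matters more than accuracy; the substitution is a single line (a sketch using the standard TF 1.x API; its second return value is a negative sum of logits rather than a beam log-probability, but it is only fetched for logging here):

decoded, log_prob = tf.nn.ctc_greedy_decoder(logits, seq_len, merge_repeated=True)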

Training Results

By the 80th epoch, accuracy on the 64 test samples reached 64%.

(Figure: test results after the 80th epoch)

By the 100th epoch, accuracy on the 64 test samples reached 100%, and it stayed at essentially 100% from then on.

(Figure: test accuracy reaches 100%)

Postscript

The complete code is hosted on my Github.

The generated training images are fairly idealized and noise-free, which is why accuracy reaches 100% after only 100 epochs. In real applications the images may contain noise such as line segments or scattered dots; readers can add some noise to the generated training set themselves and see how well the model trains. A possible starting point is sketched below.
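For example, a hypothetical helper like the one below (not part of the original gen_id_card code; it assumes a (32, 256) grayscale image with pixel values in [0, 1]) adds salt-and-pepper dots and random line segments before the image goes into get_next_batch:

import numpy as np
import random

def add_noise(img, dot_ratio=0.01, num_lines=1):
    # img: (32, 256) grayscale image, pixel values assumed to be in [0, 1]
    noisy = img.copy()
    h, w = noisy.shape
    # scattered salt-and-pepper dots
    num_dots = int(dot_ratio * h * w)
    ys = np.random.randint(0, h, num_dots)
    xs = np.random.randint(0, w, num_dots)
    noisy[ys, xs] = np.random.choice([0.0, 1.0], num_dots)
    # random line segments, drawn point by point
    for _ in range(num_lines):
        x0, x1 = sorted(random.sample(range(w), 2))
        y0, y1 = random.randint(0, h - 1), random.randint(0, h - 1)
        for x in range(x0, x1 + 1):
            y = int(y0 + (y1 - y0) * (x - x0) / max(x1 - x0, 1))
            noisy[y, x] = 1.0
    return noisy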

The strings generated in this article cover only the 10 classes 0-9. If we later add the 26*2 upper- and lower-case English letters, or 3500+ common Chinese characters, will the model still recognize the strings well as the number of classes keeps growing? How quickly will it converge?

While writing the sample code for this article I borrowed heavily from other people's code and models, and I have not really dug into the deeper theory. This post is just a record for now; I will study the mathematical models and derivations later.