快速使用Tensorflow讀取7萬資料集!
一、Brief概述
這篇文章中,我們使用知名的圖片資料庫[THE MNIST DATABASE]作為我們的圖片來源,它的資料內容是一共七萬張28x28畫素的手寫陣列圖片。
並被分成六萬張訓練集與一萬張測試集,其中訓練集裡面,又有五千張圖片唄用來作為驗證使用,該資料庫是公認影象處理的"Hello World"入門級別庫,在此之前已經有數不清的研究,圍繞著這個模型展開。
不過初次看到這個庫之後,肯定是對其長相產生許多的疑問,我們從外觀上既看不到圖片本身,也看不到任何的索引線索,他就是四個壓縮包分別名稱如下圖:
對資料庫以此方法打包的理由需要從計算機對資料的運算過程和記憶體開始說起,人類直觀的影象是眼睛接收的光訊號,這些不同顏色的光用資料的方式儲存起來後由兩種主要的格式與其對應的格式內容:
1. .jpeg: height, width, channels;
2. .png: height, width, channels, alpha;
(注意:.png儲存格式的圖片含有透明度的資訊,在處理圖片的時候可以捨棄)
這些影象使用模組如opencv匯入到python中後,是以列表的方式呈現排列的資料,並且每次令image = cv2.imread()這類方式把資料指向到一個image物件時。
都是把資料存入記憶體的一個過程,在記憶體裡面的資料好處是可以非常快速的呼叫並處理,直到這個狀態我們才算佈置完資料被丟進演算法前的狀態。
然而,影象資料匯入記憶體的轉換並不是那麼的迅捷,首先必須先解析每個畫素的座標和顏色值,再把每一次讀取到的圖片資料值合起來後,放入快取中。
這樣的流程在移動和讀取上都顯然沒有優勢,因此我們需要把資料迴歸到其最基本的本質[二進位制]上。
二、Binary Data二進位制資料
Reasons for using binary data, 使用二進位制資料的理由
如果我們手上有成批的圖片資料,把它們傳入演算法中算結果的過程,就好比一個人怕上樓梯,坐上滑水道的入口,等待經歷一段未知的短暫旅程。
滑水道由很多個通道,一次可以讓假設五個人準備滑下,而這時候如果後面替補的人速度不夠快,就會造成該入口一定時間的空缺,直接導致效率低下。
而這個比喻中的滑水道入口,代表的是深度學習GPU計算埠,準備下滑的人代表資料本身,而我們現在需要優化的,就是如何讓GPU在還沒有處理完這一個資料之前,就已經為它準備好下一批預處理資料。
讓CPU永遠保持工作狀態可以進一步提升整體運算的效率。方法之一就是讓資料迴歸到[二進位制]的本質。
二進位制是資料在電腦硬碟儲存狀態的原貌,也是資料被處理時,最本質的狀態,因此批量圖片資料第一件要被處理的事情就是讓他們以二進位制的姿態被放入到記憶體中。
此舉就好比排隊玩滑水道的人們都要事前把鞋子手錶眼睛脫掉,帶著最需要的東西上去排隊後,等輪到自己時,一屁股坐上去擺好姿勢後就可以開始,沒有其他的冗餘動作拖慢時間。
而我選擇的入門資料庫MNIST已經很貼心的幫我們處理好預處理的部分,分為四個類別:
- 測試集影象資料: t10k-images-idx3-ubyte.gz;
- 測試集影象標籤: t10k-labels-idx1-ubyte.gz;
- 訓練集影象資料: train-images-idx3-ubyte.gz;
- 訓練集影象標籤: train-labels-idx1-ubyte.gz。
影象識別基本上都是屬於機器學習中的監督學習門類,因此四個類別其中兩個是對應圖片集的標籤集,都是使用二進位制的方法儲存檔案。
三、The approach to load images讀取資料的方法
既然知道了資料庫裡面的結構使二進位制資料,接下來就可以使用python裡面的模組包解析資料,壓縮檔案為.gz因此對應到開啟此檔案型別的模組名為gzip,程式碼如下:
import gzip, os
import numpy as np
location = input('The directory of MNIST dataset:')
path = os.path.join(location, 'train-images-idx3-ubyte.gz')
try:
with gzip.open(path, 'rb') as fi:
data_i = np.frombuffer(fi.read(), dtype = np.int8, offset=16)
images_flat_all = data_i.reshape(-1, 784)
print(images_flat_all)
print('------Separation -----')
print('Size of images_flat: ', len(images_flat_all))
except:
print("The file directory doesn't exist!")
### ------Result is shown below -----###
THe directory of MNIST dataset:/home/abc/MNIST_DATA
[[0 0 0 ... 0 0 0]
[0 0 0 ... 0 0 0]
[0 0 0 ... 0 0 0]
...
[0 0 0 ... 0 0 0]
[0 0 0 ... 0 0 0]
[0 0 0 ... 0 0 0]]
------ Separation -----
Size of images_flat: 60000
path_label = os.path.join(location, 'train-labels-idx1-ubyte.gz')
with gzip.open(path_label, 'rb') as fl:
data_l = np.frombuffer(fl.read(), dtype=np.int8, offset = 8)
print(data_l)
print('-----Separation -----')
print('Size of images_labels: ', len(data_l), type(data_l[0]))
### ----- Result is shown below ------ ###
[5 0 4 ... 5 6 8]
------Separation -----
Size of images_labels: 60000 <class 'numpy.int8'>
程式碼分為上下半段,上半段的程式碼用來提取MNIST DATASET中訓練集的六萬個影象樣本,每一個樣本都是由28×28尺寸的圖片資料拉直成一個1×784 長度的向量形式記錄下來。
下半段的程式碼則是提取對應訓練集影象的標籤,表示每一個圖片所描繪的數字實際上是多少,同樣也是六萬個標籤。(注:資料儲存格式同理測試集與其他種類資料庫。)
四、Explanation to the code程式碼說明
基於我們隊神經網路的瞭解,
一張圖片被用來放入神經網路解析的時候,需要把一個代表影象之二維矩陣的每條row拼成一個長條的一維向量,以此一向量作為一張圖片的計量單位。
而MNIST進一步把六萬張圖片的一維向量拼起來,形成一個超級長的向量後,以二進位制的方式儲存在電腦中,因此如果要讓人們可以影象化的看懂內部資料,就需要下面步驟還原資料:
-
使用 gzip.open 的 'rb' 讀取二進位制模式開啟指定的壓縮檔案;
-
為了轉換資料成為 np.array ,使用 .frombuffer;
-
原本的二進位制資料格式使用 dtype 修改成人類讀得懂的八進位制格式;
-
MNIST 原始資料中直到第十六位數才開始描述影象資訊,而資料標籤則是第八位就開始描述資訊,因此 offset 設定從第十六或是八位開始讀取;
-
讀出來的資料是一整條六萬個向量拼起來的資料,因此需要重新拼接資料, .reshape(-1, 784) 中的 -1 像一個未知數一樣,資料整形的過程中,只要 column = 784,那 row 是多少就是多少;
-
剝離出對應的標籤時,最後還需要對其使用 one_hot() 資料的轉換,讓標籤以例如 [0, 0, 0, 1, 0, 0, 0, 0, 0, 0] 的形式表示 "3" 的意思,目的是方便套入損失函式中運算,並尋找最優解。
把資料使用 numpy 陣列描述好處是處理效率高,且此庫和大多數資料處理的庫都相容,不論是便利性和效率都是很大的優勢。
後面兩個連結 "numpy.frombuffer" "在NumPy中使用動態陣列" 進一步深入的講述了函式的用法。
五、Linear Model線性模型
在理解資料集的資料格式和呼叫方法後,接下來就是把最簡單的線性模型應用到資料集中,並經過多次的梯度下降演算法迭代,找出我們為此模型定義的損失函式最小值。
回顧第一章的內容,一個線性函式的程式碼如下:
import numpy as np
import tensorflow as tf
x_data = np.random.rand(100).astype(np.float32)
y_data = x_data * 0.1 + 0.3
weight = tf.Variable(tf.random_uniform(shape=[1], minval=-1.0, maxval=1.0))
bias = tf.Variable(tf.zeros(shape=[1]))
y = weight * x_data + bias
loss = tf.reduce_mean(tf.square(y - y_data))
optimizer = tf.train.GradientDescentOptimizer(0.5)
training = optimizer.minimize(loss)
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
for step in range(101):
sess.run(training)
if step % 10 == 0:
print('Round {}, weight: {}, bias: {}'
.format(step, sess.run(weight[0]), sess.run(bias[0])))
其中我們可以看到沿著x軸上對應的y有兩組解,其中的y_data是我們預設的正解,而另外一個由wx + b計算產生的y則是我們要用來擬合正解的未知解,對應同一樣東西x的兩個不同的y軸值接下來需要被套入一個選定的損失函式中。
上面選中的是方差法,使用該方法算出損失函式後接著用reduce_mean()取平均,然後使用梯度下降演算法把該值降到儘可能低的地步。
同理影象資料的歸類問題,圖片的每一個畫素資料就好比一次上面計算的過程,如同x的角色,是正確標籤和預測標籤所共享的一個維度資料。
而y_data所對應的則是正確的標籤,預測的標籤則是經過一系列線性加法乘法與歸一化運算處理後才得出來的結果。
影象資料有一點在計算上看起來不同上面示例的地方是: 每一個畫素的計算被統一包含進了一個大的矩陣中,被作為整體運算的其中一個小單元平行處理,大大的加速整體運算的程序。
但是計算機處理物件的快取是有限的,我們需要適量的把影象資料放入快取中做平行處理,如果過載了則整個計算框架就會崩潰。
六、MNIST in Linear Model
梳理了一遍線性模型與MNIST資料集的組成元素後,接下來就是基於 Tensorflow搭建一個線性迴歸的手寫數字識別演算法,有以下幾點需要重新宣告:
-
batch size: 每一批次訓練圖片的數量需要調控以免記憶體不夠;
-
loss function: 損失函式的原理是計算預測和實際答案之間的差距。
接下來就是制定訓練步驟:
-
需要一個很簡單方便的方法呼叫我們需要的 MNIST 資料,因此需要寫一個類;
-
開始搭建 Tensorflow 資料流圖,用節點設計一個 wx + b 的線性運算;
-
把運算結果和實際標籤帶入損失函式中求出損失值;
-
使用梯度下降法求出損失值的最小值;
-
迭代訓練後,檢視訓練結果的準確率;
-
檢查錯誤判斷的圖片被歸類成了什麼標籤。
import gzip, os
import numpy as np
################ Step No.1 to well manage the dataset. ################
class MNIST:
# Images size is told in the official website 28*28 px.
image_size = 28
image_size_flat = image_size * image_size
# Let the validation set flexible when making an instance.
def __init__(self, val_ratio=0.1, data_dir='MNIST_data'):
self.val_ratio = val_ratio
self.data_dir = data_dir
# Load 4 files to individual lists with one string pixels.
img_train = self.load_flat_images('train-images-idx3-ubyte.gz')
lab_train = self.load_labels('train-labels-idx1-ubyte.gz')
img_test = self.load_flat_images('t10k-images-idx3-ubyte.gz')
lab_test = self.load_labels('t10k-labels-idx1-ubyte.gz')
# Determine the actual number of training / validation sets.
self.val_train_num = round(len(img_train) * self.val_ratio)
self.main_train_num = len(img_train) - self.val_train_num
# The normalized image pixels value can be more convenient when training.
# dtype=np.int64 would be more general when applying to Tensorflow.
self.img_train = img_train[0:self.main_train_num] / 255.0
self.lab_train = lab_train[0:self.main_train_num].astype(np.int)
self.img_train_val = img_train[self.main_train_num:] / 255.0
self.lab_train_val = lab_train[self.main_train_num:].astype(np.int)
# Also convert the format of testing set.
self.img_test = img_test / 255.0
self.lab_test = lab_test.astype(np.int)
# Extract the same codes from "load_flat_images" and "load_labels".
# This method won't be called during training procedure.
def load_binary_to_num(self, dataset_name, offset):
path = os.path.join(self.data_dir, dataset_name)
with gzip.open(path, 'rb') as binary_file:
# The datasets files are stored in 8 bites, mind the format.
data = np.frombuffer(binary_file.read(), np.uint8, offset=offset)
return data
# This method won't be called during training procedure.
def load_flat_images(self, dataset_name):
# Images offset position is 16 by default format
data = self.load_binary_to_num(dataset_name, offset=16)
images_flat_all = data.reshape(-1, self.image_size_flat)
return images_flat_all
# This method won't be called during training procedure.
def load_labels(self, dataset_name):
# Labels offset position is 8 by default format.
labels_all = self.load_binary_to_num(dataset_name, offset=8)
return labels_all
# This method would be called for training usage.
def one_hot(self, labels):
# Properly use numpy module to mimic the one hot effect.
class_num = np.max(self.lab_test) + 1
convert = np.eye(class_num, dtype=float)[labels]
return convert
#---------------------------------------------------------------------#
path = '/home/abc/MNIST_data'
data = MNIST(val_ratio=0.1, data_dir=path)
import tensorflow as tf
flat_size = data.image_size_flat
label_num = np.max(data.lab_test) + 1
################ Step No.2 to construct tensor graph. ################
x_train= tf.placeholder(dtype=tf.float32, shape=[None, flat_size])
t_label_oh = tf.placeholder(dtype=tf.float32, shape=[None, label_num])
t_label = tf.placeholder(dtype=tf.int64, shape=[None])
################ These are the values ################
# Initialize the beginning weights and biases by random_normal method.
weights = tf.Variable(tf.random_normal([flat_size, label_num],
mean=0.0, stddev=1.0,
dtype=tf.float32))
biases = tf.Variable(tf.random_normal([label_num], mean=0.0, stddev=1.0,
dtype=tf.float32))
########### that we wish to get by training ##########
logits = tf.matmul(x_train, weights) + biases # < Annotation No.1 >
# Shrink the distances between values into 0 to 1 by softmax formula.
p_label_soh = tf.nn.softmax(logits)
# Pick the position of largest value along y axis.
p_label = tf.argmax(p_label_soh, axis=1)
#---------------------------------------------------------------------#
####### Step No.3 to get a loss value by certain loss function. #######
# This softmax function can not accept input being "softmaxed" before.
CE = tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=t_label_oh)
# Shrink all loss values in a matrix to only one averaged loss.
loss = tf.reduce_mean(CE)
#---------------------------------------------------------------------#
#### Step No.4 get a minimized loss value using gradient descent. ####
# Decrease this only averaged loss to a minimum value by using gradient descent.
optimizer = tf.train.AdamOptimizer(learning_rate=0.5).minimize(loss)
#---------------------------------------------------------------------#
# First return a boolean list values by tf.equal function
correct_predict = tf.equal(p_label, t_label)
# And cast them into 0 and 1 values so that its average value would be accuracy.
accuracy = tf.reduce_mean(tf.cast(correct_predict, dtype=tf.float32))
sess = tf.Session()
sess.run(tf.global_variables_initializer())
###### Step No.5 iterate the training set and check the accuracy. #####
# The trigger to train the linear model with a defined cycles.
def optimize(iteration, batch_size=32):
for i in range(iteration):
total = len(data.lab_train)
random = np.random.randint(0, total, size=batch_size)
# Randomly pick training images / labels with a defined batch size.
x_train_batch = data.img_train[random]
t_label_batch_oh = data.one_hot(data.lab_train[random])
batch_dict = {
x_train: x_train_batch,
t_label_oh: t_label_batch_oh
}
sess.run(optimizer, feed_dict=batch_dict)
# The trigger to check the current accuracy value
def Accuracy():
# Use the totally separate dataset to test the trained model
test_dict = {
x_train: data.img_test,
t_label_oh: data.one_hot(data.lab_test),
t_label: data.lab_test
}
Acc = sess.run(accuracy, feed_dict=test_dict)
print('Accuracy on Test Set: {0:.2%}'.format(Acc))
#---------------------------------------------------------------------#
### Step No.6 plot wrong predicted pictures with its predicted label.##
import matplotlib.pyplot as plt
# We can decide how many wrong predicted images are going to be shown up.
# We can focus on the specific wrong predicted labels
def wrong_predicted_images(pic_num=[3, 4], label_number=None):
test_dict = {
x_train: data.img_test,
t_label_oh: data.one_hot(data.lab_test),
t_label: data.lab_test
}
correct_pred, p_lab = sess.run([correct_predict, p_label],
feed_dict=test_dict)
# To reverse the boolean value in order to pick up wrong labels
wrong_pred = (correct_pred == False)
# Pick up the wrong doing elements from the corresponding places
wrong_img_test = data.img_test[wrong_pred]
wrong_t_label = data.lab_test[wrong_pred]
wrong_p_label = p_lab[wrong_pred]
fig, axes = plt.subplots(pic_num[0], pic_num[1])
fig.subplots_adjust(hspace=0.3, wspace=0.3)
edge = data.image_size
for ax in axes.flat:
# If we were not interested in certain label number,
# pick up the wrong predicted images randomly.
if label_number is None:
i = np.random.randint(0, len(wrong_t_label),
size=None, dtype=np.int)
pic = wrong_img_test[i].reshape(edge, edge)
ax.imshow(pic, cmap='binary')
xlabel = "True: {0}, Pred: {1}".format(wrong_t_label[i],
wrong_p_label[i])
# If we are interested in certain label number,
# pick up the specific wrong images number randomly.
else:
# Mind that np.where return a "tuple" that should be indexing.
specific_idx = np.where(wrong_t_label==label_number)[0]
i = np.random.randint(0, len(specific_idx),
size=None, dtype=np.int)
pic = wrong_img_test[specific_idx[i]].reshape(edge, edge)
ax.imshow(pic, cmap='binary')
xlabel = "True: {0}, Pred: {1}".format(wrong_t_label[specific_idx[i]],
wrong_p_label[specific_idx[i]])
ax.set_xlabel(xlabel)
# Pictures don't need any ticks, so we remove them in both dimensions
ax.set_xticks([])
ax.set_yticks([])
plt.show()
#---------------------------------------------------------------------#
Accuracy() # Accuracy before doing anything
optimize(10); Accuracy() # Iterate 10 times
optimize(1000); Accuracy() # Iterate 10 + 1000 times
optimize(10000); Accuracy() # Iterate 10 + 1000 + 10000 times
### ----- Results are shown below ----- ###
Accuracy on Test Set: 11.51%
Accuracy on Test Set: 68.37%
Accuracy on Test Set: 86.38%
Accuracy on Test Set: 89.34%
Annotation No.1 tf.matmul(x_train, weights)
這個環節是在瞭解整個神經網路訓練原理後,最重要的一個子標題,計算的矩陣模型中必須兼顧 random_batch 提取隨意多的資料集,同時符合矩陣乘法的運算原理,如下圖描述:
矩陣位置前後順序很重要,由於資料集本身經過我們處理後,就是左邊矩陣的格式,在期望輸出為右邊矩陣的情況下,只能是 x·w 的順序,以 x 的隨機列數來決定後面預測的標籤列數, w 則決定有幾個歸類標籤。
Reason of using one_hot()
資料集經過一番線性運算後得出的結果如上圖所見,只能是 size=[None, 10] 的大小,但是資料集給的標籤答案是數字本身,因此我們需要一個手段把數字轉換成 10 個元素組成的向量,而第一選擇方法就是 one_hot() ,同時使用 one_hot 的結果來計算損失函式.
七、Finally
呼叫上面定義的函式,如下程式碼:
wrong_predicted_images(pic_num=[3, 3], label_number=5)
其中可以自行選擇想要一次陳列幾張圖片,每次陳列的圖片都是隨機選擇,並同時可以選擇想檢視的標籤類別,如上面一行函式設定為 5 ,則就只顯示標籤 5 的錯誤判斷圖片和誤判結果。最後等整個框架計算完畢後,需要執行下面程式碼結束 tf.Session ,釋放記憶體:
sess.close()