
Kaggle Competition: Getting Started with Handwritten Digit Recognition using KNN, CNN, and Dimensionality Reduction

Introduction

  • Over the past while I have read the "watermelon book" and the "blue book" and gained a general understanding of the common machine learning algorithms, but I lack hands-on practice. So I decided to use the Kaggle platform to improve my practical skills and build up my data analysis ability.
  • My personal plan is to start with simple datasets such as handwritten digit recognition, Titanic, and house price prediction, for which plenty of mature solutions already exist to learn from, then move on to directions relevant to future jobs such as computational advertising and click-through-rate prediction, and, when the time is right, join a live competition with friends.

Dataset

Overview

  • MNIST ("Modified National Institute of Standards and Technology") is one of the most classic datasets in computer vision. It consists of a training set train.csv, a test set test.csv, and a submission example sample_submission.csv; CSV is a data format that uses a comma as the field separator.

  • The training set train.csv contains 40,000 images of 28*28=784 pixels each, with pixel values from 0 to 255 and one label per image. It can be viewed as a 40000 * 785 matrix whose first column stores the labels;

  • The test set test.csv contains 28,000 images of 28*28=784 pixels without labels, i.e. a 28000*784 matrix. A quick pandas check of this layout is sketched below.
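
  A minimal sketch (my addition, not from the original post) that confirms the layout described above with pandas; the paths assume the Kaggle digit-recognizer input directory:

  import pandas as pd

  train = pd.read_csv('/kaggle/input/digit-recognizer/train.csv')
  test = pd.read_csv('/kaggle/input/digit-recognizer/test.csv')
  print(train.shape, test.shape)          # (n_train, 785) and (n_test, 784)
  y = train['label'].values               # first column: the labels
  X = train.drop('label', axis=1).values  # remaining 784 columns: pixel values 0-255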

Reading the Dataset

Different solutions read the data in different ways; here is a short summary of the approaches I came across.

  • csv
  import csv

  def loadTrainData():
      l=[]
      with open('/kaggle/input/digit-recognizer/train.csv') as file:
          lines = csv.reader(file)
          for line in lines:
              l.append(line)
  • open
  # load csv files to numpy arrays
  def load_data(data_dir):
      train_data = open(data_dir + "train.csv").read()
  • numpy
  def load_data(path):
      with np.load(path) as f:
          x_train, y_train = f['x_train'], f['y_train']
          x_test, y_test = f['x_test'], f['y_test']
          return (x_train, y_train), (x_test, y_test)
  • pandas
  train = pd.read_csv('../input/train.csv')

K-Nearest Neighbours (KNN)

  • I will not go over the theory of kNN here. Below is a concise implementation adapted from https://blog.csdn.net/u012162613/article/details/41929171; it mainly uses binarization and the L2 norm as the distance metric.

Implementation A

  from numpy import *
  import csv
  
  # Read the training set
  def loadTrainData():
      l=[]
      with open('/kaggle/input/digit-recognizer/train.csv') as file:
          lines = csv.reader(file)
          for line in lines:
              l.append(line)
          l.remove(l[0])
          l=array(l)
          data, label = l[:,1:], l[:,0]
          label = label[:,newaxis]
          a = normalizing(toInt(data))
          b = toInt(label)
          return a, b
      
  # Convert string entries to integers
  def toInt(array):
      array = mat(array)
      m,n = shape(array)
      newArray = zeros((m,n))
      for i in range(m):
          for j in range(n):
              newArray[i,j]=int(array[i,j])
      return newArray
  
  # Binarize: any non-zero pixel becomes 1
  def normalizing(array):
      m,n = shape(array)
      for i in range(m):
          for j in range(n):
              if array[i,j] != 0:
                  array[i,j]=1
      return array
  
  # Load the test set
  def loadTestData():
      l=[]
      with open('/kaggle/input/digit-recognizer/test.csv') as file:
          lines = csv.reader(file)
          for line in lines:
              l.append(line)
          l.remove(l[0])
          l=array(l)
          data=l
          return normalizing(toInt(data))
  
  def loadTestResult():
      l=[]
      with open('/kaggle/input/digit-recognizer/sample_submission.csv') as file:
          lines = csv.reader(file)
          for line in lines:
              l.append(line)
          l.remove(l[0])
          l=array(l)
          label=l[:,1]
          label = label[:, newaxis]
          return toInt(label)
  
  # Save the results
  def saveResult(result):
      with open('/kaggle/working/knn.csv', 'w', newline='') as myFile:
          myWriter = csv.writer(myFile)
          myWriter.writerow(['ImageId','Label'])
          for i, label in enumerate(result):
              tmp = [i+1, int(label)]
              myWriter.writerow(tmp)
  
  # kNN classification
  def classify(inX, dataSet, labels, k):
      inX = mat(inX)
      dataSet = mat(dataSet)
      labels = mat(labels)
      dataSetSize = dataSet.shape[0]
      diffMat = tile(inX, (dataSetSize, 1)) - dataSet
  
      # compute the L2 (Euclidean) distance to every training sample
      spDiffMat = array(diffMat) ** 2
      spDistances = spDiffMat.sum(axis=1)
      distances = spDistances ** 0.5
      sortedDistIndicies = distances.argsort()
      classCount = {}
      for i in range(k):
          voteIlabel = labels[sortedDistIndicies[i],0]
          classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
      sortedClassCount = sorted(classCount.items(), key=lambda item:item[1], reverse=True)
      return sortedClassCount[0][0]
  
  # Main routine
  def handwritingClassTest():
      trainData,trainLabel=loadTrainData()
      testData =loadTestData()
      testLabel = loadTestResult()
      m,n=shape(testData)
      errorCount=0
      resultList=[]
      for i in range(m):
          classifierResult = classify(testData[i], trainData, trainLabel, 1)
          resultList.append(classifierResult)
          print("the classifier for %d came back with: %d, the real answer is: %d" % (i, classifierResult, testLabel[i]))
          if (classifierResult != testLabel[i]): errorCount += 1.0
      print("\nthe total number of errors is: %d" % errorCount)
      print("\nthe total error rate is: %f" % (errorCount/float(m)))
      saveResult(resultList)
  
  handwritingClassTest()
  • Results: accuracy 96.40% with k=5 and 96.27% with k=1. PS: by my intuition a smaller k should give the higher score here. I then switched to another implementation that does the distance computation with numpy matrix operations and runs much faster than the code above.
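
  • Why the vectorized version is so much faster: all test-to-train distances can be computed at once using the standard expansion (spelled out here for reference)

  \[ \|x_i - t_j\|^2 = \|x_i\|^2 - 2\, x_i \cdot t_j + \|t_j\|^2 \]

  which needs only a single matrix product plus two sums of squares; this is exactly what compute_distances in Implementation B evaluates.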

Implementation B

  import numpy as np
  import matplotlib.pyplot as plt
  from collections import Counter
  import time
  
  %matplotlib inline
  plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
  plt.rcParams['image.interpolation'] = 'nearest'
  plt.rcParams['image.cmap'] = 'gray'
  
  # load csv files to numpy arrays
  def load_data(data_dir):
      train_data = open(data_dir + "train.csv").read()
      train_data = train_data.split("\n")[1:-1]
      train_data = [i.split(",") for i in train_data]
      # print(len(train_data))
      X_train = np.array([[int(i[j]) for j in range(1,len(i))] for i in train_data])
      y_train = np.array([int(i[0]) for i in train_data])
  
      # print(X_train.shape, y_train.shape)
  
      test_data = open(data_dir + "test.csv").read()
      test_data = test_data.split("\n")[1:-1]
      test_data = [i.split(",") for i in test_data]
      # print(len(test_data))
      X_test = np.array([[int(i[j]) for j in range(0,len(i))] for i in test_data])
  
      # print(X_test.shape)
  
      return X_train, y_train, X_test
  
  
  class simple_knn():
      "a simple kNN with L2 distance"
  
      def __init__(self):
          pass
  
      def train(self, X, y):
          self.X_train = X
          self.y_train = y
  
      def predict(self, X, k=1):
          dists = self.compute_distances(X)
          # print("computed distances")
  
          num_test = dists.shape[0]
          y_pred = np.zeros(num_test)
  
          for i in range(num_test):
              k_closest_y = []
              labels = self.y_train[np.argsort(dists[i,:])].flatten()
              # find the k nearest labels
              k_closest_y = labels[:k]
  
              # out of these k nearest labels which one is most common
              # for 5NN [1, 1, 1, 2, 3] returns 1
              # break ties by selecting smaller label
              # for 5NN [1, 2, 1, 2, 3] return 1 even though 1 and 2 appeared twice.
              c = Counter(k_closest_y)
              y_pred[i] = c.most_common(1)[0][0]
  
          return(y_pred)
  
      def compute_distances(self, X):
          num_test = X.shape[0]
          num_train = self.X_train.shape[0]
  
          dot_pro = np.dot(X, self.X_train.T)
          sum_square_test = np.square(X).sum(axis = 1)
          sum_square_train = np.square(self.X_train).sum(axis = 1)
          dists = np.sqrt(-2 * dot_pro + sum_square_train + np.matrix(sum_square_test).T)
  
          return(dists)
  # runs for 13 minutes
  # set-up before batching; the values and the data path below are
  # illustrative assumptions, not taken from the original notebook
  batch_size = 2000
  k = 1
  X_train, y_train, X_test = load_data("../input/digit-recognizer/")
  classifier = simple_knn()
  classifier.train(X_train, y_train)

  predictions = []
  
  for i in range(int(len(X_test)/(2*batch_size))):
      # predicts from i * batch_size to (i+1) * batch_size
      print("Computing batch " + str(i+1) + "/" + str(int(len(X_test)/batch_size)) + "...")
      tic = time.time()
      predts = classifier.predict(X_test[i * batch_size:(i+1) * batch_size], k)
      toc = time.time()
      predictions = predictions + list(predts)
  #     print("Len of predictions: " + str(len(predictions)))
      print("Completed this batch in " + str(toc-tic) + " Secs.")
  
  print("Completed predicting the test data.")
  
  # runs for 13 minutes
  # predict the second half of the test data
  
  for i in range(int(len(X_test)/(2*batch_size)), int(len(X_test)/batch_size)):
      # predicts from i * batch_size to (i+1) * batch_size
      print("Computing batch " + str(i+1) + "/" + str(int(len(X_test)/batch_size)) + "...")
      tic = time.time()
      predts = classifier.predict(X_test[i * batch_size:(i+1) * batch_size], k)
      toc = time.time()
      predictions = predictions + list(predts)
  #     print("Len of predictions: " + str(len(predictions)))
      print("Completed this batch in " + str(toc-tic) + " Secs.")
  
  print("Completed predicting the test data.")
  
  out_file = open("predictions.csv", "w")
  out_file.write("ImageId,Label\n")
  for i in range(len(predictions)):
      out_file.write(str(i+1) + "," + str(int(predictions[i])) + "\n")
  out_file.close()
  • Results: 96.90% with k=5 and 97.11% with k=1. For the same k, Implementation B scores higher than Implementation A because B does not binarize the pixels and therefore keeps more of the image information.

Convolutional Neural Network (CNN)

  • The implementation below is based mainly on PyTorch.

Data Loading

  • Here transform converts a PIL image into a Tensor and normalizes it; a quick check of what the normalization does is sketched after the data-loader code below. For details see: https://zhuanlan.zhihu.com/p/27382990
  # Construct the transform
  import torch
  import numpy as np
  import pandas as pd
  import torchvision.transforms as transforms
  from PIL import Image
  transform = transforms.Compose([
          transforms.ToTensor(),
          transforms.Normalize((0.5,), (0.5,))
      ])
  
  # Get the device we're training on
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  
  def get_digits(df):
      """Loads images as PyTorch tensors"""
      # Load the labels if they exist 
      # (they won't exist for the test data)
      labels = []
      start_inx = 0
      if 'label' in df.columns:
          labels = [v for v in df.label.values]
          start_inx = 1
          
      # Load the digit information
      digits = []
      for i in range(df.pixel0.size):
          digit = df.iloc[i].astype(float).values[start_inx:]
          digit = np.reshape(digit, (28,28))
          digit = transform(digit).type('torch.FloatTensor')
          if len(labels) > 0:
              digits.append([digit, labels[i]])
          else:
              digits.append(digit)
  
      return digits
  • SubsetRandomSampler draws a random split of the data, and num_workers sets how many subprocesses are used to load it. See: https://stackoverflow.com/questions/53998282/how-does-the-number-of-workers-parameter-in-pytorch-dataloader-actually-work
  # Load the training data (the path assumes the Kaggle digit-recognizer input directory)
  train   = pd.read_csv("../input/digit-recognizer/train.csv")
  train_X = get_digits(train)
  
  # Some configuration parameters
  num_workers = 0    # number of subprocesses to use for data loading
  batch_size  = 64   # how many samples per batch to load
  valid_size  = 0.2  # percentage of training set to use as validation
  
  # Obtain training indices that will be used for validation
  num_train = len(train_X)
  indices   = list(range(num_train))
  np.random.shuffle(indices)
  split     = int(np.floor(valid_size * num_train))
  train_idx, valid_idx = indices[split:], indices[:split]
  
  # Define samplers for obtaining training and validation batches
  from torch.utils.data.sampler import SubsetRandomSampler
  train_sampler = SubsetRandomSampler(train_idx)
  valid_sampler = SubsetRandomSampler(valid_idx)
  
  # Construct the data loaders
  train_loader = torch.utils.data.DataLoader(train_X, batch_size=batch_size,
                      sampler=train_sampler, num_workers=num_workers)
  valid_loader = torch.utils.data.DataLoader(train_X, batch_size=batch_size, 
                      sampler=valid_sampler, num_workers=num_workers)
  
  # Test the size and shape of the output
  dataiter = iter(train_loader)
  images, labels = next(dataiter)   # .next() is deprecated in newer PyTorch
  print(type(images))
  print(images.shape)
  print(labels.shape)
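
  • As an aside, a minimal sketch (my addition) of what the ToTensor + Normalize((0.5,), (0.5,)) pipeline above does to an ordinary 8-bit grayscale image: ToTensor scales the pixels into [0, 1] and Normalize then maps them to roughly [-1, 1] via (x - 0.5) / 0.5.

  import numpy as np
  from PIL import Image
  import torchvision.transforms as transforms

  demo_transform = transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize((0.5,), (0.5,)),
  ])

  # a fake 28x28 8-bit grayscale image, purely for the demonstration
  fake = Image.fromarray(np.random.randint(0, 256, (28, 28), dtype=np.uint8), mode='L')
  out = demo_transform(fake)
  print(out.shape, out.min().item(), out.max().item())  # torch.Size([1, 28, 28]), ~ -1.0, ~ 1.0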
  

Network Model

  • The network is split into cnn_layers and fc_layers; personally I find the fc_layers part somewhat over-engineered.
  # Import the necessary modules
  import torch.nn as nn
  
  def calc_out(in_layers, stride, padding, kernel_size, pool_stride):
      """
      Helper function for computing the number of outputs from a
      conv layer
      """
      return int((1+(in_layers - kernel_size + (2*padding))/stride)/pool_stride)
  
  # define the CNN architecture
  class Net(nn.Module):
      def __init__(self):
          super(Net, self).__init__()
  
          # Some helpful values
          inputs      = [1,32,64,64]
          kernel_size = [5,5,3]
          stride      = [1,1,1]
          pool_stride = [2,2,2]
  
          # Layer lists
          layers = []
  
          self.out   = 28
          self.depth = inputs[-1]
          for i in range(len(kernel_size)):
              # Get some variables
              padding = int(kernel_size[i]/2)
  
              # Define the output from this layer
              self.out = calc_out(self.out, stride[i], padding,
                                  kernel_size[i], pool_stride[i])
  
              # convolutional layer 1
              layers.append(nn.Conv2d(inputs[i], inputs[i], kernel_size[i], 
                                         stride=stride[i], padding=padding))
              layers.append(nn.ReLU())
              
              # convolutional layer 2
              layers.append(nn.Conv2d(inputs[i], inputs[i+1], kernel_size[i], 
                                         stride=stride[i], padding=padding))
              layers.append(nn.ReLU())
              # maxpool layer
              layers.append(nn.MaxPool2d(pool_stride[i],pool_stride[i]))
              layers.append(nn.Dropout(p=0.2))
  
          self.cnn_layers = nn.Sequential(*layers)
          
          print(self.depth*self.out*self.out)
          
          # Now for our fully connected layers
          layers2 = []
          layers2.append(nn.Dropout(p=0.2))
          layers2.append(nn.Linear(self.depth*self.out*self.out, 512))
          layers2.append(nn.Dropout(p=0.2))
          layers2.append(nn.Linear(512, 256))
          layers2.append(nn.Dropout(p=0.2))
          layers2.append(nn.Linear(256, 256))
          layers2.append(nn.Dropout(p=0.2))
          layers2.append(nn.Linear(256, 10))
  
          self.fc_layers = nn.Sequential(*layers2)
          
          self.fc_last = nn.Linear(self.depth*self.out*self.out, 10)
  
      def forward(self, x):
          x = self.cnn_layers(x)
          x = x.view(-1, self.depth*self.out*self.out)
          x = self.fc_layers(x)
  #        x = self.fc_last(x)
          
          return x
      
  # create a complete CNN
  model = Net()
  model
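
  • As a quick sanity check (my addition), the feature-map size computed by the calc_out helper above shrinks 28 -> 14 -> 7 -> 3 across the three conv/pool stages, so the flattened input to fc_layers has 64 * 3 * 3 = 576 features:

  size = 28
  for k_sz, st, pool in zip([5, 5, 3], [1, 1, 1], [2, 2, 2]):
      size = calc_out(size, st, int(k_sz / 2), k_sz, pool)
      print(size)              # 14, 7, 3
  print(64 * size * size)      # 576, matching the value printed inside Net.__init__()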
  

Model Training

  • Define the loss function and the optimizer; Adam is used here.
  import torch.optim as optim
  
  # specify loss function
  criterion = nn.CrossEntropyLoss()
  
  # specify optimizer
  optimizer = optim.Adam(model.parameters(), lr=0.0005)
  
  • A hold-out validation split is used: the 20% of the training set reserved above serves as the validation set for evaluating the model and guarding against overfitting.
  # number of epochs to train the model
  n_epochs = 25 # you may increase this number to train a final model
  
  valid_loss_min = np.Inf # track change in validation loss
  
  # Additional rotation transformation
  #rand_rotate = transforms.Compose([
  #    transforms.ToPILImage(),
  #    transforms.RandomRotation(20),
  #    transforms.ToTensor()
  #])
  
  # Get the device
  print(device)
  model.to(device)
  tLoss, vLoss = [], []
  for epoch in range(n_epochs):
  
      # keep track of training and validation loss
      train_loss = 0.0
      valid_loss = 0.0
      
      #########
      # train #
      #########
      model.train()
      for data, target in train_loader:
          # move tensors to GPU if CUDA is available
          data   = data.to(device)
          target = target.to(device)
          
          # clear the gradients of all optimized variables
          optimizer.zero_grad()
          # forward pass: compute predicted outputs by passing inputs to the model
          output = model(data)
          # calculate the batch loss
          loss = criterion(output, target)
          # backward pass: compute gradient of the loss with respect to model parameters
          loss.backward()
          # perform a single optimization step (parameter update)
          optimizer.step()
          # update training loss
          train_loss += loss.item()*data.size(0)
          
      ############
      # validate #
      ############
      model.eval()
      for data, target in valid_loader:
          # move tensors to GPU if CUDA is available
          data   = data.to(device)
          target = target.to(device)
          # forward pass: compute predicted outputs by passing inputs to the model
          output = model(data)
          # calculate the batch loss
          loss = criterion(output, target)
          # update average validation loss 
          valid_loss += loss.item()*data.size(0)
      
      # calculate average losses
      train_loss = train_loss/len(train_loader.dataset)
      valid_loss = valid_loss/len(valid_loader.dataset)
      tLoss.append(train_loss)
      vLoss.append(valid_loss)
          
      # print training/validation statistics 
      print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
          epoch, train_loss, valid_loss))
      
      # save model if validation loss has decreased
      if valid_loss <= valid_loss_min:
          print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
          valid_loss_min,
          valid_loss))
          torch.save(model.state_dict(), 'model_cifar.pt')
          valid_loss_min = valid_loss
  
  • Plot the loss curves.
  # Plot the resulting loss over time
  plt.plot(tLoss, label='Training Loss')
  plt.plot(vLoss, label='Validation Loss')
  plt.legend();
  

Training Results

  • Only the model's performance on the validation set is shown here, presented as a confusion matrix (a sketch of how to compute it is given below).
  • Entry \((i,j)\) of the matrix counts the samples whose true class is \(i\) but that were predicted as \(j\). Ideally there would be no misclassifications and only the diagonal would be non-zero. Our result shows a few mistakes; for example, entry \((9,4)\) counts the 9s that were misread as 4s, which is understandable since 4 and 9 do look alike.
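
  • A minimal sketch (my addition, assuming model, device and valid_loader are defined as above) of computing that validation confusion matrix with scikit-learn:

  import torch
  from sklearn.metrics import confusion_matrix

  model.eval()
  preds, targets = [], []
  with torch.no_grad():
      for data, target in valid_loader:
          output = model(data.to(device))
          _, pred = torch.max(output, 1)
          preds.extend(pred.cpu().numpy())
          targets.extend(target.numpy())

  cm = confusion_matrix(targets, preds)   # cm[i, j]: true digit i predicted as j
  print(cm)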

Test Results

  • Load the trained weights.
  model.load_state_dict(torch.load('model_cifar.pt'));
  
  • Load the test set.
  # Define the test data loader
  test        = pd.read_csv("../input/digit-recognizer/test.csv")
  test_X      = get_digits(test)
  test_loader = torch.utils.data.DataLoader(test_X, batch_size=batch_size, 
                                            num_workers=num_workers)
  
  • Predict and save the results.
  # Create storage objects
  ImageId, Label = [],[]
  
  # Loop through the data and get the predictions
  for data in test_loader:
      # Move tensors to GPU if CUDA is available
      data = data.to(device)
      # Make the predictions
      output = model(data)
      # Get the most likely predicted digit
      _, pred = torch.max(output, 1)
      
      for i in range(len(pred)):        
          ImageId.append(len(ImageId)+1)
          Label.append(pred[i].cpu().numpy())
  
  sub = pd.DataFrame(data={'ImageId':ImageId, 'Label':Label})
  sub.describe()
  
  # Write to csv file ignoring index column
  sub.to_csv("submission.csv", index=False)
  
  • The final score is 98.90%, almost two points higher than KNN. After replacing fc_layers in the model with a single plain fully connected layer, the score rose to 99.21% (that change is sketched below).
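
  • A minimal sketch (my addition) of that simplification: Net already defines self.fc_last, a single Linear layer that maps the flattened features straight to the 10 classes, so only forward needs to change:

  # inside class Net, replacing the original forward
  def forward(self, x):
      x = self.cnn_layers(x)
      x = x.view(-1, self.depth * self.out * self.out)
      x = self.fc_last(x)      # one fully connected layer instead of fc_layers
      return x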

Dimensionality Reduction

  • With high-dimensional data the performance of learning algorithms can degrade badly (the curse of dimensionality), so we use dimensionality-reduction methods to project the data from a high-dimensional space into a lower-dimensional one, where learning becomes much easier.

Principal Component Analysis (PCA)

  • PCA is a linear transformation that projects the original features onto a subspace while preserving as much information as possible; the algorithm looks for the directions (the principal components) along which the variance of the projected data is maximized.

  • Algorithm (the original post shows the steps as a figure; a standard summary follows).
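
    A standard summary (my addition, matching the implementation below): standardize the data matrix to \(X_{std}\); compute the covariance matrix \(\Sigma = \frac{1}{n-1} X_{std}^{T} X_{std}\); take its eigenvalues and eigenvectors; sort the eigenvectors by decreasing eigenvalue; stack the top \(k\) eigenvectors into a projection matrix \(W\) and project with \(Z = X_{std} W\).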

  • Implementation

    # Standardize the data
    import numpy as np
    from sklearn.preprocessing import StandardScaler
    X = train.values
    X_std = StandardScaler().fit_transform(X)
    
    # Calculating eigenvectors and eigenvalues of the covariance matrix
    mean_vec = np.mean(X_std, axis=0)
    cov_mat = np.cov(X_std.T)
    eig_vals, eig_vecs = np.linalg.eig(cov_mat)
    # Create a list of (eigenvalue, eigenvector) tuples
    eig_pairs = [ (np.abs(eig_vals[i]),eig_vecs[:,i]) for i in range(len(eig_vals))]
    
    # Sort the eigenvalue, eigenvector pair from high to low
    eig_pairs.sort(key = lambda x: x[0], reverse= True)
    
    # Calculation of Explained Variance from the eigenvalues
    tot = sum(eig_vals)
    var_exp = [(i/tot)*100 for i in sorted(eig_vals, reverse=True)] # Individual explained variance
    cum_var_exp = np.cumsum(var_exp) # Cumulative explained variance
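
    A minimal sketch (my addition, continuing from eig_pairs above) of the final projection step that the snippet stops short of: stack the top-\(k\) eigenvectors into a matrix and project the standardized data onto them.

    k = 2
    W = np.hstack([eig_pairs[i][1].reshape(-1, 1) for i in range(k)]).real  # 784 x k
    X_proj = X_std.dot(W)   # n_samples x k: the data in the k-dimensional subspace
    print(X_proj.shape)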
    
    
  • Visualization

    • The individual explained variance (black) decreases as the component index grows, while the cumulative explained variance saturates; roughly 90% of the variance can be captured by the first 200 components.

    • Visualize the 30 principal components with the largest variance found by PCA (the code below calls them eigenvalues, but what is plotted are the component directions reshaped into 28x28 images).

      # Invoke SKlearn's PCA method
      from sklearn.decomposition import PCA
      n_components = 30
      pca = PCA(n_components=n_components).fit(train.values)

      # Extract the PCA components (the eigenvectors)
      eigenvalues = pca.components_
      
      n_row = 4
      n_col = 7
      
      # Plot the first n_row * n_col components
      plt.figure(figsize=(13,12))
      for i in list(range(n_row * n_col)):
          offset =0
          plt.subplot(n_row, n_col, i + 1)
          plt.imshow(eigenvalues[i].reshape(28,28), cmap='jet')
          title_text = 'Eigenvalue ' + str(i + 1)
          plt.title(title_text, size=6.5)
          plt.xticks(())
          plt.yticks(())
      plt.show()
      
      

    • Run PCA with 5 components and visualize the first two (the code is omitted in the original; a hedged sketch appears right after the K-means snippet below). The data points fall into several clusters, each corresponding to one digit class.

    • Since PCA is unsupervised and no labels are provided here, we follow it with the K-means clustering algorithm and visualize the result.

      from sklearn.cluster import KMeans # KMeans clustering 
      # Set a KMeans clustering with 9 components ( 9 chosen sneakily ;) as hopefully we get back our 9 class labels)
      kmeans = KMeans(n_clusters=9)
      # Compute cluster centers and predict cluster indices
      X_clustered = kmeans.fit_predict(X_5d)
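
    • For completeness, a minimal sketch (my addition, since the original omits this code) of the 5-component projection X_5d used above, together with a scatter of its first two components:

      from sklearn.decomposition import PCA
      import matplotlib.pyplot as plt

      # project the standardized data onto the first 5 principal components
      X_5d = PCA(n_components=5).fit_transform(X_std)

      # scatter of the first two components (run this before the K-means cell above,
      # which clusters X_5d)
      plt.scatter(X_5d[:, 0], X_5d[:, 1], s=5)
      plt.xlabel('PC1')
      plt.ylabel('PC2')
      plt.show()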
      
      

Linear Discriminant Analysis (LDA)

Reference: https://www.cnblogs.com/pinard/p/6244265.html

  • Like PCA, LDA also performs linear dimensionality reduction, but it is supervised.

  • The algorithm proceeds as follows (the original post shows the steps as a figure; a standard summary follows).
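
    A standard summary (my addition, following the reference above): compute the within-class scatter matrix \(S_w = \sum_{c} \sum_{x \in c} (x - \mu_c)(x - \mu_c)^{T}\) and the between-class scatter matrix \(S_b = \sum_{c} n_c (\mu_c - \mu)(\mu_c - \mu)^{T}\); take the eigenvectors of \(S_w^{-1} S_b\) with the largest eigenvalues (at most one fewer than the number of classes) and project the data onto them.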

  • Implementation and visualization

    from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

    lda = LDA(n_components=5)
    # Taking in as second argument the Target as labels
    X_LDA_2D = lda.fit_transform(X_std, Target.values )
    
    

  • LDA vs PCA

t-SNE (t-Distributed Stochastic Neighbour Embedding)

  • Unlike PCA and LDA, t-SNE is a non-linear, probability-based dimensionality-reduction method.
  • Instead of looking for directions that best separate the information, t-SNE converts the Euclidean distances between data points into conditional probabilities that measure their similarity, and then models the low-dimensional similarities with a Student-t distribution.
  • Implementation and visualization
  # Invoking the t-SNE method
  from sklearn.manifold import TSNE
  tsne = TSNE(n_components=2)
  tsne_results = tsne.fit_transform(X_std)
  

  • Compared with PCA and LDA, the data points are separated much more cleanly: t-SNE preserves the topological structure of the data better. Its drawback is that finding the clusters can get stuck in local minima, which is why some same-coloured clusters end up split into two sub-groups.

References

  • Getting Started with Kaggle: One Article Is All You Need (Kaggle入門,看這一篇就夠了)
  • Big-Data Competition Platform: Getting Started with Kaggle (大資料競賽平臺——Kaggle 入門)
  • Interactive Intro to Dimensionality Reduction
  • MNIST: PyTorch Convolutional Neural Nets
  • kNN from scratch in Python at 97.1%
  • Summary of the Principles of Linear Discriminant Analysis (LDA) (線性判別分析LDA原理總結)