1. 程式人生 > 實用技巧 >通過線性迴歸來理解pytorch的計算邏輯

通過線性迴歸來理解pytorch的計算邏輯

首先,宣告本文非原創,參考部落格:https://shartoo.github.io/2019/10/28/-understand-pytorch/

只是想自己記錄一下,更好的理解pytorch的計算。

1、線性迴歸問題

假定我們以一個線性迴歸問題來逐步解釋pytorch過程中的一些操作和邏輯。線性迴歸公式如下:

1.1 先用普通的numpy來展示線性迴歸過程

隨機生成100個數據,並以一定的隨機概率擾動資料集,訓練集和驗證集八二分,如下:

 1 # 資料生成
 2 np.random.seed(42)
 3 x = np.random.rand(100, 1)
 4 y = 1 + 2 * x + .1 * np.random.randn(100, 1)
5 6 # Shuffles the indices 7 idx = np.arange(100) 8 np.random.shuffle(idx) 9 10 # Uses first 80 random indices for train 11 train_idx = idx[:80] 12 # Uses the remaining indices for validation 13 val_idx = idx[80:] 14 15 # Generates train and validation sets 16 x_train, y_train = x[train_idx], y[train_idx]
17 x_val, y_val = x[val_idx], y[val_idx]

上面這是我們已經知道的是一個線性迴歸資料分佈,並且迴歸的引數是a=1,b=2如果我們只知道資料x_trainy_train,需要求這兩個引數a,b呢,一般是使用梯度下降方法。

注意,下面的梯度下降方法是全量梯度,一次計算了所有的資料的梯度,只是在迭代了1000個epoch,通常訓練時會把全量資料分成多個batch,每次都是小批量更新。

# 初始化線性迴歸的引數 a 和 b
np.random.seed(42)
a = np.random.randn(1)
b = np.random.randn(1)
print
("初始化的 a : %d 和 b : %d"%(a,b)) leraning_rate = 1e-2 epochs = 1000 for epoch in range(epochs): pred = a+ b*x_train # 計算預測值和真實值之間的誤差 error = y_train-pred # 使用MSE 來計算迴歸誤差 loss = (error**2).mean() # 計算引數 a 和 b的梯度 a_grad = -2*error.mean() b_grad = -2*(x_train*error).mean() # 更新引數:用學習率和梯度 a = a-leraning_rate*a_grad b = b -leraning_rate*b_grad print("最終獲得引數為 a : %.2f, b :%.2f "%(a,b))

得到的輸出如下:

初始化的 a : 0 和 b : 0
最終獲得引數為 a : 0.98, b :1.94

2 pytorhc 來解決迴歸問題

2.1 pytorch的一些基礎問題

  • 如果將numpy陣列轉化為pytorch的tensor呢?使用torch.from_numpy(data)
  • 如果想將計算的資料放入GPU計算:data.to(device)(其中的device就是GPU或cpu)
  • 資料型別轉換示例:data.float()
  • 如果確定資料位於CPU還是GPU:data.type()會得到類似於torch.cuda.FloatTensor的結果,表明在GPU中
  • 從GPU中把資料轉化成numpy:先取出到cpu中,再轉化成numpy陣列。data.cpu().numpy()

2.2 使用pytorch構建引數

如何區分普通資料和引數/權重呢?需要計算梯度的是引數,否則就是普通資料。引數需要用梯度來更新,我們需要選項requires_grad=True。使用了這個選項就是告訴pytorch,我們要計算此變數的梯度了。

我們可以使用如下幾種方式來構建引數:

1、此方法構建出來的引數全部都在cpu中:

a = torch.randn(1, requires_grad=True, dtype=torch.float)
b = torch.randn(1, requires_grad=True, dtype=torch.float)
print(a, b)

2、此方法嘗試把tensor引數傳入到gpu:

a = torch.randn(1, requires_grad=True, dtype=torch.float).to(device)
b = torch.randn(1, requires_grad=True, dtype=torch.float).to(device)
print(a, b)

此時如果檢視輸出,會發現兩個tensor ,a和b的梯度選項沒了(沒了requires_grad=True)

tensor([0.5158], device='cuda:0', grad_fn=<CopyBackwards>) tensor([0.0246], device='cuda:0', grad_fn=<CopyBackwards>)

3、先將tensor傳入gpu,然後再使用requires_grad_()選項來重構tensor的屬性。

a = torch.randn(1, dtype=torch.float).to(device)
b = torch.randn(1, dtype=torch.float).to(device)
# and THEN set them as requiring gradients...
a.requires_grad_()
b.requires_grad_()
print(a, b)

4、最佳策略當然是初始化的時候直接賦予requires_grad=True屬性了

# We can specify the device at the moment of creation - RECOMMENDED!
torch.manual_seed(42)
a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
print(a, b)

檢視tensor屬性:

tensor([0.6226], device='cuda:0', requires_grad=True) tensor([1.4505], device='cuda:0', requires_grad=True)

2.3 自動求導 Autograd

Autograd是Pytorch的自動求導包,有了它,我們就不必擔憂偏導數和鏈式法則等一系列問題。Pytorch計算所有梯度的方法是backward()。計算梯度之前,我們需要先計算損失,那麼需要呼叫對應(損失)變數的求導方法,如loss.backward()

  • 計算所有變數的梯度(假設損失變數是loss):loss.back()
  • 獲取某個變數的實際的梯度值(假設變數為att):att.grad
  • 由於梯度是累加的,每次用梯度更新引數之後,需要清零(假設梯度變數是att):att.zero_(),下劃線是一種運算子,相當於直接作用於原變數上,等同於att=0(不要手動賦值,因為此過程可能涉及到GPU、CPU之間資料傳輸,容易出錯)

我們接下來嘗試下手工更新引數和梯度

lr = 1e-1
n_epochs = 1000

torch.manual_seed(42)
a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)

for epoch in range(n_epochs):
    yhat = a + b * x_train_tensor
    error = y_train_tensor - yhat
    loss = (error ** 2).mean()

    # 這個是numpy的計算梯度的方式
    # a_grad = -2 * error.mean()
    # b_grad = -2 * (x_tensor * error).mean()
    
    # 告訴pytorch計算損失loss,計算所有變數的梯度
    loss.backward()
    # 列印結果
    print(a.grad)
    print(b.grad)  
    
    # 1. 手動更新引數,會出錯 AttributeError: 'NoneType' object has no attribute 'zero_'
    # 錯誤的原因是,我們重新賦值時會丟掉變數的 梯度屬性
    # a = a - lr * a.grad
    # b = b - lr * b.grad
    # print(a)
    # 2. 再次手動更新引數,這次我們沒有重新賦值,而是使用in-place的方式賦值  RuntimeError: a leaf Variable that requires grad has been used in an in- place operation.
    # 這是因為 pytorch 給所有需要計算梯度的python操作以及依賴都納入了動態計算圖,稍後會解釋
    # a -= lr * a.grad
    # b -= lr * b.grad        

    # 3. 如果我們真想手動更新,不使用pytorch的計算圖呢,必須使用no_grad來將此引數移除自動計算梯度變數之外。
    # 這是源於pytorch的動態計算圖DYNAMIC GRAPH,後面會有詳細的解釋
    with torch.no_grad():
        a -= lr * a.grad
        b -= lr * b.grad
    
    # PyTorch is "clingy" to its computed gradients, we need to tell it to let it go...
    a.grad.zero_()
    b.grad.zero_()
    
print(a, b)

2.4 動態計算圖

如果想視覺化計算圖,可以使用輔助包torchviz,需要自己安裝。使用其make_dot(變數)方法來視覺化與當前給定變數相關的計算圖。示例

torch.manual_seed(42)
a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)

yhat = a + b * x_train_tensor
error = y_train_tensor - yhat
loss = (error ** 2).mean()
make_dot(yhat)

使用make_dot(yhat)會得到相關的三個計算圖如下:

各個元件,解釋如下

  • 藍色盒子:作為引數的tensor,需要pytorch計算梯度的
  • 灰色盒子:與計算梯度相關的或者計算梯度依賴的,python操作
  • 綠色盒子:與灰色盒子一樣,區別是,它是計算梯度的起始點(假設backward()方法是需要視覺化圖的變數呼叫的)-計算圖自底向上構建。

上圖的error(圖中)和loss(圖右),與左圖的唯一區別就是中間步驟(灰色盒子)的數目。看左邊的綠色盒子,有兩個箭頭指向該綠色盒子,代表兩個變數相加。ab*x。再看該圖中的灰色盒子,它執行的是乘法計算,即b*x,但是為啥只有一個箭頭指向呢,只有來自藍色盒子的引數b,為啥沒有資料x?因為我們不需要為資料x計算梯度(不計算梯度的變數不會出現在計算圖中)。那麼,如果我們去掉變數的requires_grad屬性(設定為False)會怎樣?

a_nongrad = torch.randn(1,requires_grad=False,dtype=torch.float,device=device)
b = torch.randn(1,requires_grad=True,dtype=torch.float,device=device)
yhat = a_nongrad+b*x_train_tensor

可以看到,對應引數a的藍色盒子沒有了,所以很簡單明瞭,不計算梯度,就不出現在計算圖中。

3 優化器 Optimizer

到目前為止,我們都是手動計算梯度並更新引數的,如果有非常多的變數。我們可以使用pytorch的優化器,像SGD或者Adam

優化器需要指定需要優化的引數,以及學習率,然後使用step()方法來更新,此外,我們不必再一個個的去將梯度賦值為0了,只需要使用優化器的zero_grad()方法即可。。

程式碼示例,使用SGD優化器更新引數ab的梯度。

torch.manual_seed(42)
a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
print(a, b)

lr = 1e-1
n_epochs = 1000

# Defines a SGD optimizer to update the parameters
optimizer = optim.SGD([a, b], lr=lr)

for epoch in range(n_epochs):
    # 第一步,計算損失
    yhat = a + b * x_train_tensor
    error = y_train_tensor - yhat
    loss = (error ** 2).mean()
    # 第二步,後傳損失
    loss.backward()    
    
    # 不用再手動更新引數了
    # with torch.no_grad():
    # a -= lr * a.grad
    # b -= lr * b.grad
    # 使用優化器的step方法一步到位
    optimizer.step()
    
    # 也不用告訴pytorch需要對哪些梯度清零操作了,優化器的zero_grad()一步到位
    # a.grad.zero_()
    # b.grad.zero_()
    optimizer.zero_grad()
    
print(a, b)

4 計算損失loss

pytorch提供了很多損失函式,可以直接呼叫。簡單使用如下:

torch.manual_seed(42)
a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device)
print(a, b)

lr = 1e-1
n_epochs = 1000

# 此處定義了損失函式為MSE
loss_fn = nn.MSELoss(reduction='mean')

optimizer = optim.SGD([a, b], lr=lr)

for epoch in range(n_epochs):
    yhat = a + b * x_train_tensor
    
    # 不用再手動計算損失了
    # error = y_tensor - yhat
    # loss = (error ** 2).mean()
    # 直接呼叫定義好的損失函式即可
    loss = loss_fn(y_train_tensor, yhat)

    loss.backward()    
    optimizer.step()
    optimizer.zero_grad()
    
print(a, b)

5 模型

pytorch中模型由一個繼承自Module的Python類來定義。需要實現兩個最基本的方法

  1. __init__(self):定義了模型由哪幾部分組成,當前模型只有兩個變數ab。模型可以定義更多的引數,並且可以將其他模型或者網路層定義為其引數
  2. forwad(self,x):真實執行計算的方法,它對給定輸入x輸出模型預測值。不要顯示呼叫此forward(x)方法,而是直接呼叫模型本身,即model(x)

簡單的迴歸模型如下:

class ManualLinearRegression(nn.Module):
    def __init__(self):
        super().__init__()
        # To make "a" and "b" real parameters of the model, we need to wrap them with nn.Parameter
        self.a = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))
        self.b = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))
        
    def forward(self, x):
        # 計算預測結果
        return self.a + self.b * x

__init__(self)方法中,我們使用Parameters()類定義了兩個引數ab,告訴Pytorch,這兩個tensor要被作為模型的引數的屬性。這樣,我們就可以使用模型的parameters()方法來找到模型每次迭代時的所有引數值了,即便模型是巢狀模型都可以找得到,這樣就能將引數喂入優化器optimizer來計算了(而非手動維護一張引數表)。並且,我們可以使用模型的state_dict()方法來獲取所有引數的當前值。

注意:模型應當與資料出於相同位置(GPU/CPU),如果資料時GPU tensor,我們的模型也必須在GPU中

程式碼示例如下:

torch.manual_seed(42)

# Now we can create a model and send it at once to the device
model = ManualLinearRegression().to(device)
# 獲取所有引數的當前值
print(model.state_dict())

lr = 1e-1
n_epochs = 1000

loss_fn = nn.MSELoss(reduction='mean')
optimizer = optim.SGD(model.parameters(), lr=lr)

for epoch in range(n_epochs):
    #  注意,模型一般都有個train()方法,但是不要手動呼叫,此處只是為了說明此時是在訓練,防止有些模型在訓練模型和驗證模型時操作不一致,訓練時有dropout之類的
    model.train()

    # yhat = a + b * x_tensor
    yhat = model(x_train_tensor)
    
    loss = loss_fn(y_train_tensor, yhat)
    loss.backward()    
    optimizer.step()
    optimizer.zero_grad()
    
print(model.state_dict())

6 訓練

我們定義了optimizer,loss function,model為模型三要素,同時需要提供訓練時用的特徵(feature)和對應的標籤(label)資料。一個完整的模型訓練有以下組成

  • 模型三要素
    • 優化器optimizer
    • 損失函式loss
    • 模型 model
  • 資料
    • 特徵資料feature
    • 資料標籤label

我們可以寫一個包含模型三要素的通用的訓練函式:

def make_train_step(model, loss_fn, optimizer):
    # 定義一個訓練函式
    def train_step(x, y):
        # Sets model to TRAIN mode
        model.train()
        # 模型預測
        yhat = model(x)
        # 計算損失
        loss = loss_fn(y, yhat)
        # 計算梯度
        loss.backward()
        # 更新引數以及梯度清零
        optimizer.step()
     optimizer.zero_grad()
# Returns the loss return loss.item() # Returns the function that will be called inside the train loop return train_step

然後在每個epoch時迭代模型訓練

# Creates the train_step function for our model, loss function and optimizer
train_step = make_train_step(model, loss_fn, optimizer)
losses = []

# For each epoch...
for epoch in range(n_epochs):
    # Performs one train step and returns the corresponding loss
    loss = train_step(x_train_tensor, y_train_tensor)
    losses.append(loss)
    
# Checks model's parameters
print(model.state_dict())

最後梳理一下pytorch計算的整個流程:

1、建立線性迴歸的模型類;

2、建立資料;

3、訓練呼叫模型進行預測;

4、計算損失(預先定義損失函式);

5、optimizer.zero_grad() 清空過往梯度;

6、loss.backward() 反向傳播,計算當前梯度;

7、optimizer.step() 根據梯度更新網路引數;

8、儲存模型;

最後,附上一個完整的線性迴歸訓練及預測模型的程式碼:

#!D:/CODE/python
# -*- coding: utf-8 -*-
# @Time : 2020/8/11 19:02
# @Author : Alex-bd
# @Site :
# @File : pytorch計算邏輯.py
# @Software: PyCharm
# Functional description:通過線性迴歸學習pytorch計算邏輯

import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
import torch
import matplotlib.pyplot as plt


def create_linear_data(nums_data, if_plot=False):
    """
    Create data for linear model
    Args:
        nums_data: how many data points that wanted
    Returns:
        x with shape (nums_data, 1)
    """
    x = torch.linspace(0, 1, nums_data)
    x = torch.unsqueeze(x, dim=1)
    b = 3
    a = 1
    y = a + b * x + torch.rand(x.size())

    if if_plot:
        plt.scatter(x.numpy(), y.numpy(), c=x.numpy())
        plt.show()
    data = {"x": x, "y": y}
    return data


data = create_linear_data(300, if_plot=True)
print(data["x"].size())


class LinearRegression(torch.nn.Module):
    """
    Linear Regressoin Module, the input features and output
    features are defaults both 1
    """

    def __init__(self):
        super().__init__()
        # self.linear = torch.nn.Linear(1, 1)
        self.a = torch.nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))
        self.b = torch.nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))

    def forward(self, x):
        out = self.a + self.b * x
        return out

#
# linear = LinearRegression()
# print(linear)


class Linear_Model():
    def __init__(self):
        """
        Initialize the Linear Model
        """
        self.learning_rate = 0.001
        self.epoches = 5000
        self.loss_function = torch.nn.MSELoss()
        self.create_model()

    def create_model(self):
        self.model = LinearRegression()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=self.learning_rate)

    def train(self, data, model_save_path="model.pth"):
        """
        Train the model and save the parameters
        Args:
            model_save_path: saved name of model
            data: (x, y) = data, and y = kx + b
        Returns:
            None
        """
        x = data["x"]
        y = data["y"]
        for epoch in range(self.epoches):
            prediction = self.model(x)
            loss = self.loss_function(prediction, y)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            if epoch % 50 == 0:
                print("epoch: {}, loss is: {}".format(epoch, loss.item()))
        torch.save(self.model.state_dict(), "linear.pth")

    def test(self, x, model_path="linear.pth"):
        """
        Reload and test the model, plot the prediction
        Args:
            model_path: the model's path and name
            data: (x, y) = data, and y = kx + b
        Returns:
            None
        """
        x = data["x"]
        y = data["y"]
        self.model.load_state_dict(torch.load(model_path))
        prediction = self.model(x)
        print('a= ', self.model.a, end=" ")
        print("b= ", self.model.b)

        plt.scatter(x.numpy(), y.numpy(), c=x.numpy())
        plt.plot(x.numpy(), prediction.detach().numpy(), color="r")
        plt.show()

    def compare_epoches(self, data):
        x = data["x"]
        y = data["y"]

        num_pictures = 16
        fig = plt.figure(figsize=(10, 10))
        current_fig = 0
        for epoch in range(self.epoches):
            prediction = self.model(x)
            loss = self.loss_function(prediction, y)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            if epoch % (self.epoches / num_pictures) == 0:
                current_fig += 1
                plt.subplot(4, 4, current_fig)
                plt.scatter(x.numpy(), y.numpy(), c=x.numpy())
                plt.plot(x.numpy(), prediction.detach().numpy(), color="r")
        plt.show()


linear = Linear_Model()
data = create_linear_data(100)
linear.train(data)
# linear.test(data)
# linear.compare_epoches(data)