通過線性迴歸來理解pytorch的計算邏輯
首先,宣告本文非原創,參考部落格:https://shartoo.github.io/2019/10/28/-understand-pytorch/
只是想自己記錄一下,更好的理解pytorch的計算。
1、線性迴歸問題
假定我們以一個線性迴歸問題來逐步解釋pytorch過程中的一些操作和邏輯。線性迴歸公式如下:
1.1 先用普通的numpy來展示線性迴歸過程
隨機生成100個數據,並以一定的隨機概率擾動資料集,訓練集和驗證集八二分,如下:
1 # 資料生成 2 np.random.seed(42) 3 x = np.random.rand(100, 1) 4 y = 1 + 2 * x + .1 * np.random.randn(100, 1)5 6 # Shuffles the indices 7 idx = np.arange(100) 8 np.random.shuffle(idx) 9 10 # Uses first 80 random indices for train 11 train_idx = idx[:80] 12 # Uses the remaining indices for validation 13 val_idx = idx[80:] 14 15 # Generates train and validation sets 16 x_train, y_train = x[train_idx], y[train_idx]17 x_val, y_val = x[val_idx], y[val_idx]
上面這是我們已經知道的是一個線性迴歸資料分佈,並且迴歸的引數是a=1,b=2如果我們只知道資料x_train
和y_train
,需要求這兩個引數a,b呢,一般是使用梯度下降方法。
注意,下面的梯度下降方法是全量梯度,一次計算了所有的資料的梯度,只是在迭代了1000個epoch,通常訓練時會把全量資料分成多個batch,每次都是小批量更新。
# 初始化線性迴歸的引數 a 和 b np.random.seed(42) a = np.random.randn(1) b = np.random.randn(1) print("初始化的 a : %d 和 b : %d"%(a,b)) leraning_rate = 1e-2 epochs = 1000 for epoch in range(epochs): pred = a+ b*x_train # 計算預測值和真實值之間的誤差 error = y_train-pred # 使用MSE 來計算迴歸誤差 loss = (error**2).mean() # 計算引數 a 和 b的梯度 a_grad = -2*error.mean() b_grad = -2*(x_train*error).mean() # 更新引數:用學習率和梯度 a = a-leraning_rate*a_grad b = b -leraning_rate*b_grad print("最終獲得引數為 a : %.2f, b :%.2f "%(a,b))
得到的輸出如下:
初始化的 a : 0 和 b : 0
最終獲得引數為 a : 0.98, b :1.94
2 pytorhc 來解決迴歸問題
2.1 pytorch的一些基礎問題
- 如果將numpy陣列轉化為pytorch的tensor呢?使用
torch.from_numpy(data)
- 如果想將計算的資料放入GPU計算:
data.to(device)
(其中的device就是GPU或cpu) - 資料型別轉換示例:
data.float()
- 如果確定資料位於CPU還是GPU:
data.type()
會得到類似於torch.cuda.FloatTensor
的結果,表明在GPU中 - 從GPU中把資料轉化成numpy:先取出到cpu中,再轉化成numpy陣列。
data.cpu().numpy()
2.2 使用pytorch構建引數
如何區分普通資料和引數/權重呢?需要計算梯度的是引數,否則就是普通資料。引數需要用梯度來更新,我們需要選項requires_grad=True
。使用了這個選項就是告訴pytorch,我們要計算此變數的梯度了。
我們可以使用如下幾種方式來構建引數:
1、此方法構建出來的引數全部都在cpu中:
a = torch.randn(1, requires_grad=True, dtype=torch.float) b = torch.randn(1, requires_grad=True, dtype=torch.float) print(a, b)
2、此方法嘗試把tensor引數傳入到gpu:
a = torch.randn(1, requires_grad=True, dtype=torch.float).to(device) b = torch.randn(1, requires_grad=True, dtype=torch.float).to(device) print(a, b)
此時如果檢視輸出,會發現兩個tensor ,a和b的梯度選項沒了(沒了requires_grad=True)
tensor([0.5158], device='cuda:0', grad_fn=<CopyBackwards>) tensor([0.0246], device='cuda:0', grad_fn=<CopyBackwards>)
3、先將tensor傳入gpu,然後再使用requires_grad_()
選項來重構tensor的屬性。
a = torch.randn(1, dtype=torch.float).to(device) b = torch.randn(1, dtype=torch.float).to(device) # and THEN set them as requiring gradients... a.requires_grad_() b.requires_grad_() print(a, b)
4、最佳策略當然是初始化的時候直接賦予requires_grad=True
屬性了
# We can specify the device at the moment of creation - RECOMMENDED! torch.manual_seed(42) a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) print(a, b)
檢視tensor屬性:
tensor([0.6226], device='cuda:0', requires_grad=True) tensor([1.4505], device='cuda:0', requires_grad=True)
2.3 自動求導 Autograd
Autograd是Pytorch的自動求導包,有了它,我們就不必擔憂偏導數和鏈式法則等一系列問題。Pytorch計算所有梯度的方法是backward()
。計算梯度之前,我們需要先計算損失,那麼需要呼叫對應(損失)變數的求導方法,如loss.backward()
。
- 計算所有變數的梯度(假設損失變數是loss):
loss.back()
- 獲取某個變數的實際的梯度值(假設變數為att):
att.grad
- 由於梯度是累加的,每次用梯度更新引數之後,需要清零(假設梯度變數是att):
att.zero_()
,下劃線是一種運算子,相當於直接作用於原變數上,等同於att=0
(不要手動賦值,因為此過程可能涉及到GPU、CPU之間資料傳輸,容易出錯)
我們接下來嘗試下手工更新引數和梯度
lr = 1e-1 n_epochs = 1000 torch.manual_seed(42) a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) for epoch in range(n_epochs): yhat = a + b * x_train_tensor error = y_train_tensor - yhat loss = (error ** 2).mean() # 這個是numpy的計算梯度的方式 # a_grad = -2 * error.mean() # b_grad = -2 * (x_tensor * error).mean() # 告訴pytorch計算損失loss,計算所有變數的梯度 loss.backward() # 列印結果 print(a.grad) print(b.grad) # 1. 手動更新引數,會出錯 AttributeError: 'NoneType' object has no attribute 'zero_' # 錯誤的原因是,我們重新賦值時會丟掉變數的 梯度屬性 # a = a - lr * a.grad # b = b - lr * b.grad # print(a) # 2. 再次手動更新引數,這次我們沒有重新賦值,而是使用in-place的方式賦值 RuntimeError: a leaf Variable that requires grad has been used in an in- place operation. # 這是因為 pytorch 給所有需要計算梯度的python操作以及依賴都納入了動態計算圖,稍後會解釋 # a -= lr * a.grad # b -= lr * b.grad # 3. 如果我們真想手動更新,不使用pytorch的計算圖呢,必須使用no_grad來將此引數移除自動計算梯度變數之外。 # 這是源於pytorch的動態計算圖DYNAMIC GRAPH,後面會有詳細的解釋 with torch.no_grad(): a -= lr * a.grad b -= lr * b.grad # PyTorch is "clingy" to its computed gradients, we need to tell it to let it go... a.grad.zero_() b.grad.zero_() print(a, b)
2.4 動態計算圖
如果想視覺化計算圖,可以使用輔助包torchviz,需要自己安裝。使用其make_dot(變數)
方法來視覺化與當前給定變數相關的計算圖。示例
torch.manual_seed(42) a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) yhat = a + b * x_train_tensor error = y_train_tensor - yhat loss = (error ** 2).mean() make_dot(yhat)
使用make_dot(yhat)
會得到相關的三個計算圖如下:
各個元件,解釋如下
- 藍色盒子:作為引數的tensor,需要pytorch計算梯度的
- 灰色盒子:與計算梯度相關的或者計算梯度依賴的,python操作
- 綠色盒子:與灰色盒子一樣,區別是,它是計算梯度的起始點(假設
backward()
方法是需要視覺化圖的變數呼叫的)-計算圖自底向上構建。
上圖的error
(圖中)和loss
(圖右),與左圖的唯一區別就是中間步驟(灰色盒子)的數目。看左邊的綠色盒子,有兩個箭頭指向該綠色盒子,代表兩個變數相加。a
和b*x
。再看該圖中的灰色盒子,它執行的是乘法計算,即b*x
,但是為啥只有一個箭頭指向呢,只有來自藍色盒子的引數b
,為啥沒有資料x
?因為我們不需要為資料x
計算梯度(不計算梯度的變數不會出現在計算圖中)。那麼,如果我們去掉變數的requires_grad
屬性(設定為False)會怎樣?
a_nongrad = torch.randn(1,requires_grad=False,dtype=torch.float,device=device) b = torch.randn(1,requires_grad=True,dtype=torch.float,device=device) yhat = a_nongrad+b*x_train_tensor
可以看到,對應引數a
的藍色盒子沒有了,所以很簡單明瞭,不計算梯度,就不出現在計算圖中。
3 優化器 Optimizer
到目前為止,我們都是手動計算梯度並更新引數的,如果有非常多的變數。我們可以使用pytorch的優化器,像SGD
或者Adam
。
優化器需要指定需要優化的引數,以及學習率,然後使用step()
方法來更新,此外,我們不必再一個個的去將梯度賦值為0了,只需要使用優化器的zero_grad()
方法即可。。
程式碼示例,使用SGD優化器更新引數a
和b
的梯度。
torch.manual_seed(42) a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) print(a, b) lr = 1e-1 n_epochs = 1000 # Defines a SGD optimizer to update the parameters optimizer = optim.SGD([a, b], lr=lr) for epoch in range(n_epochs): # 第一步,計算損失 yhat = a + b * x_train_tensor error = y_train_tensor - yhat loss = (error ** 2).mean() # 第二步,後傳損失 loss.backward() # 不用再手動更新引數了 # with torch.no_grad(): # a -= lr * a.grad # b -= lr * b.grad # 使用優化器的step方法一步到位 optimizer.step() # 也不用告訴pytorch需要對哪些梯度清零操作了,優化器的zero_grad()一步到位 # a.grad.zero_() # b.grad.zero_() optimizer.zero_grad() print(a, b)
4 計算損失loss
pytorch提供了很多損失函式,可以直接呼叫。簡單使用如下:
torch.manual_seed(42) a = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) b = torch.randn(1, requires_grad=True, dtype=torch.float, device=device) print(a, b) lr = 1e-1 n_epochs = 1000 # 此處定義了損失函式為MSE loss_fn = nn.MSELoss(reduction='mean') optimizer = optim.SGD([a, b], lr=lr) for epoch in range(n_epochs): yhat = a + b * x_train_tensor # 不用再手動計算損失了 # error = y_tensor - yhat # loss = (error ** 2).mean() # 直接呼叫定義好的損失函式即可 loss = loss_fn(y_train_tensor, yhat) loss.backward() optimizer.step() optimizer.zero_grad() print(a, b)
5 模型
pytorch中模型由一個繼承自Module的Python類來定義。需要實現兩個最基本的方法
__init__(self)
:定義了模型由哪幾部分組成,當前模型只有兩個變數a
和b
。模型可以定義更多的引數,並且可以將其他模型或者網路層定義為其引數forwad(self,x)
:真實執行計算的方法,它對給定輸入x
輸出模型預測值。不要顯示呼叫此forward(x)
方法,而是直接呼叫模型本身,即model(x)
。
簡單的迴歸模型如下:
class ManualLinearRegression(nn.Module): def __init__(self): super().__init__() # To make "a" and "b" real parameters of the model, we need to wrap them with nn.Parameter self.a = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float)) self.b = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float)) def forward(self, x): # 計算預測結果 return self.a + self.b * x
在__init__(self)
方法中,我們使用Parameters()
類定義了兩個引數a
和b
,告訴Pytorch,這兩個tensor要被作為模型的引數的屬性。這樣,我們就可以使用模型的parameters()
方法來找到模型每次迭代時的所有引數值了,即便模型是巢狀模型都可以找得到,這樣就能將引數喂入優化器optimizer來計算了(而非手動維護一張引數表)。並且,我們可以使用模型的state_dict()
方法來獲取所有引數的當前值。
注意:模型應當與資料出於相同位置(GPU/CPU),如果資料時GPU tensor,我們的模型也必須在GPU中
程式碼示例如下:
torch.manual_seed(42) # Now we can create a model and send it at once to the device model = ManualLinearRegression().to(device) # 獲取所有引數的當前值 print(model.state_dict()) lr = 1e-1 n_epochs = 1000 loss_fn = nn.MSELoss(reduction='mean') optimizer = optim.SGD(model.parameters(), lr=lr) for epoch in range(n_epochs): # 注意,模型一般都有個train()方法,但是不要手動呼叫,此處只是為了說明此時是在訓練,防止有些模型在訓練模型和驗證模型時操作不一致,訓練時有dropout之類的 model.train() # yhat = a + b * x_tensor yhat = model(x_train_tensor) loss = loss_fn(y_train_tensor, yhat) loss.backward() optimizer.step() optimizer.zero_grad() print(model.state_dict())
6 訓練
我們定義了optimizer
,loss function
,model
為模型三要素,同時需要提供訓練時用的特徵(feature
)和對應的標籤(label
)資料。一個完整的模型訓練有以下組成
- 模型三要素
- 優化器optimizer
- 損失函式loss
- 模型 model
- 資料
- 特徵資料feature
- 資料標籤label
我們可以寫一個包含模型三要素的通用的訓練函式:
def make_train_step(model, loss_fn, optimizer): # 定義一個訓練函式 def train_step(x, y): # Sets model to TRAIN mode model.train() # 模型預測 yhat = model(x) # 計算損失 loss = loss_fn(y, yhat) # 計算梯度 loss.backward() # 更新引數以及梯度清零 optimizer.step()
optimizer.zero_grad() # Returns the loss return loss.item() # Returns the function that will be called inside the train loop return train_step
然後在每個epoch時迭代模型訓練
# Creates the train_step function for our model, loss function and optimizer train_step = make_train_step(model, loss_fn, optimizer) losses = [] # For each epoch... for epoch in range(n_epochs): # Performs one train step and returns the corresponding loss loss = train_step(x_train_tensor, y_train_tensor) losses.append(loss) # Checks model's parameters print(model.state_dict())
最後梳理一下pytorch計算的整個流程:
1、建立線性迴歸的模型類;
2、建立資料;
3、訓練呼叫模型進行預測;
4、計算損失(預先定義損失函式);
5、optimizer.zero_grad() 清空過往梯度;
6、loss.backward() 反向傳播,計算當前梯度;
7、optimizer.step() 根據梯度更新網路引數;
8、儲存模型;
最後,附上一個完整的線性迴歸訓練及預測模型的程式碼:
#!D:/CODE/python # -*- coding: utf-8 -*- # @Time : 2020/8/11 19:02 # @Author : Alex-bd # @Site : # @File : pytorch計算邏輯.py # @Software: PyCharm # Functional description:通過線性迴歸學習pytorch計算邏輯 import os os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' import torch import matplotlib.pyplot as plt def create_linear_data(nums_data, if_plot=False): """ Create data for linear model Args: nums_data: how many data points that wanted Returns: x with shape (nums_data, 1) """ x = torch.linspace(0, 1, nums_data) x = torch.unsqueeze(x, dim=1) b = 3 a = 1 y = a + b * x + torch.rand(x.size()) if if_plot: plt.scatter(x.numpy(), y.numpy(), c=x.numpy()) plt.show() data = {"x": x, "y": y} return data data = create_linear_data(300, if_plot=True) print(data["x"].size()) class LinearRegression(torch.nn.Module): """ Linear Regressoin Module, the input features and output features are defaults both 1 """ def __init__(self): super().__init__() # self.linear = torch.nn.Linear(1, 1) self.a = torch.nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float)) self.b = torch.nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float)) def forward(self, x): out = self.a + self.b * x return out # # linear = LinearRegression() # print(linear) class Linear_Model(): def __init__(self): """ Initialize the Linear Model """ self.learning_rate = 0.001 self.epoches = 5000 self.loss_function = torch.nn.MSELoss() self.create_model() def create_model(self): self.model = LinearRegression() self.optimizer = torch.optim.SGD(self.model.parameters(), lr=self.learning_rate) def train(self, data, model_save_path="model.pth"): """ Train the model and save the parameters Args: model_save_path: saved name of model data: (x, y) = data, and y = kx + b Returns: None """ x = data["x"] y = data["y"] for epoch in range(self.epoches): prediction = self.model(x) loss = self.loss_function(prediction, y) self.optimizer.zero_grad() loss.backward() self.optimizer.step() if epoch % 50 == 0: print("epoch: {}, loss is: {}".format(epoch, loss.item())) torch.save(self.model.state_dict(), "linear.pth") def test(self, x, model_path="linear.pth"): """ Reload and test the model, plot the prediction Args: model_path: the model's path and name data: (x, y) = data, and y = kx + b Returns: None """ x = data["x"] y = data["y"] self.model.load_state_dict(torch.load(model_path)) prediction = self.model(x) print('a= ', self.model.a, end=" ") print("b= ", self.model.b) plt.scatter(x.numpy(), y.numpy(), c=x.numpy()) plt.plot(x.numpy(), prediction.detach().numpy(), color="r") plt.show() def compare_epoches(self, data): x = data["x"] y = data["y"] num_pictures = 16 fig = plt.figure(figsize=(10, 10)) current_fig = 0 for epoch in range(self.epoches): prediction = self.model(x) loss = self.loss_function(prediction, y) self.optimizer.zero_grad() loss.backward() self.optimizer.step() if epoch % (self.epoches / num_pictures) == 0: current_fig += 1 plt.subplot(4, 4, current_fig) plt.scatter(x.numpy(), y.numpy(), c=x.numpy()) plt.plot(x.numpy(), prediction.detach().numpy(), color="r") plt.show() linear = Linear_Model() data = create_linear_data(100) linear.train(data) # linear.test(data) # linear.compare_epoches(data)