1. 程式人生 > 其它 >Pytorch中的dataloader以及處理變長資料

Pytorch中的dataloader以及處理變長資料

技術標籤:深度學習自然語言處理

起初,我最開始單獨訓練一個網路來完成landmark點回歸任務和分類任務,訓練的資料是txt格式,在訓練之前對資料進行分析,發現分類任務中存在嚴重的資料樣本不均衡的問題,那麼我事先針對性的進行資料取樣均衡操作,重新得到訓練和測試的txt資料和標籤,保證了整個訓練和測試資料的樣本均衡性。由於我的整個專案是檢測+點回歸+分類,起初檢測和點回歸+分類是分兩步實現的,檢測是通過讀取XML格式來進行訓練,現在要統一整個專案的訓練和測試過程,要將點回歸+分類的訓練測試過程也按照讀取XML格式來進行,那麼就遇到一個問題,如何針對性的去給樣本偏少的樣本進行均衡,由於在dataset類中,返回的影象和標籤都是針對每個index返回一個結果,在dataset類中進行操作似乎不太可行,那麼就想到在dataloader中進行操作,通過dataloader中的引數sample來完成針對性取樣。

還有一個問題是關於num_workers的設定,因為我有對比過,在我的單機RTX 2080Ti上和八卡伺服器TITAN RTX上(僅使用單卡,其它卡有在跑其它任務),使用相同的num_workers,在單機上的訓練速度反而更快,於是猜想可能和CPU或者記憶體有關係,下面會具體分析。

首先來看下下dataloader中的各個引數的含義。

類的定義為:torch.utils.data.DataLoader,其中包含的引數有:

torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, \
    batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, \
    drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)

dataset:定義的dataset類返回的結果。

batchsize:每個bacth要載入的樣本數,預設為1。

shuffle:在每個epoch中對整個資料集data進行shuffle重排,預設為False。

sample:定義從資料集中載入資料所採用的策略,如果指定的話,shuffle必須為False;batch_sample類似,表示一次返回一個batch的index。

num_workers:表示開啟多少個執行緒數去載入你的資料,預設為0,代表只使用主程序。

collate_fn:表示合併樣本列表以形成小批量的Tensor物件。

pin_memory:表示要將load進來的資料是否要拷貝到pin_memory區中,其表示生成的Tensor資料是屬於記憶體中的鎖頁記憶體區,這樣將Tensor資料轉義到GPU中速度就會快一些,預設為False。

drop_last:當你的整個資料長度不能夠整除你的batchsize,選擇是否要丟棄最後一個不完整的batch,預設為False。

注:這裡簡單科普下pin_memory,通常情況下,資料在記憶體中要麼以鎖頁的方式存在,要麼儲存在虛擬記憶體(磁碟)中,設定為True後,資料直接儲存在鎖頁記憶體中,後續直接傳入cuda;否則需要先從虛擬記憶體中傳入鎖頁記憶體中,再傳入cuda,這樣就比較耗時了,但是對於記憶體的大小要求比較高。

下面針對num_workers,sample和collate_fn分別進行說明:

1. 設定num_workers:

pytorch中dataloader一次性建立num_workers個子執行緒,然後用batch_sampler將指定batch分配給指定worker,worker將它負責的batch載入進RAM,dataloader就可以直接從RAM中找本輪迭代要用的batch。如果num_worker設定得大,好處是尋batch速度快,因為下一輪迭代的batch很可能在上一輪/上上一輪...迭代時已經載入好了。壞處是記憶體開銷大,也加重了CPU負擔(worker載入資料到RAM的程序是進行CPU複製)。如果num_worker設為0,意味著每一輪迭代時,dataloader不再有自主載入資料到RAM這一步驟,只有當你需要的時候再載入相應的batch,當然速度就更慢。num_workers的經驗設定值是自己電腦/伺服器的CPU核心數,如果CPU很強、RAM也很充足,就可以設定得更大些,對於單機來說,單跑一個任務的話,直接設定為CPU的核心數最好。

2. 定義sample:(假設dataset類返回的是:data, label)

from torch.utils.data.sampler import WeightedRandomSampler
## 如果label為1,那麼對應的該類別被取出來的概率是另外一個類別的2倍
weights = [2 if label == 1 else 1 for data, label in dataset]
sampler = WeightedRandomSampler(weights,num_samples=10, replacement=True)
dataloader = DataLoader(dataset, batch_size=16, sampler=sampler)

PyTorch中提供的這個sampler模組,用來對資料進行取樣。預設採用SequentialSampler,它會按順序一個一個進行取樣。常用的有隨機取樣器:RandomSampler,當dataloader的shuffle引數為True時,系統會自動呼叫這個取樣器,實現打亂資料。這裡使用另外一個很有用的取樣方法:WeightedRandomSampler,它會根據每個樣本的權重選取資料,在樣本比例不均衡的問題中,可用它來進行重取樣。replacement用於指定是否可以重複選取某一個樣本,預設為True,即允許在一個epoch中重複取樣某一個數據。

3. 定義collate_fn:

def detection_collate(batch):
    """Custom collate fn for dealing with batches of images that have a different
    number of associated object annotations (bounding boxes).

    Arguments:
        batch: (tuple) A tuple of tensor images and lists of annotations

    Return:
        A tuple containing:
            1) (tensor) batch of images stacked on their 0 dim
            2) (list of tensors) annotations for a given image are stacked on
                                 0 dim
    """
    targets = []
    imgs = []
    for sample in batch:
        imgs.append(sample[0])
        targets.append(torch.FloatTensor(sample[1]))
    return torch.stack(imgs, 0), targets

使用dataloader時加入collate_fn引數,即可合併樣本列表以形成小批量的Tensor物件,如果你的標籤不止一個的話,還可以支援自定義,在上述方法中再額外新增對應的label即可。

data_loader = torch.utils.data.DataLoader(dataset, args.batch_size,
    num_workers=args.num_workers, sampler=sampler, shuffle=False, 
    collate_fn=detection_collate, pin_memory=True, drop_last=True)

現在的問題:有的時候,特別對於NLP任務來說,輸入的資料可能不是定長的,比如多個句子的長度一般不會一致,這時候使用DataLoader載入資料時,不定長的句子會被胡亂切分,這肯定是不行的。

解決方法是重寫DataLoader的collate_fn,具體方法如下:

# 假如每一個樣本為:
sample = {
    # 一個句子中各個詞的id
    'token_list' : [5, 2, 4, 1, 9, 8],
    # 結果y
    'label' : 5,
}
 
 
# 重寫collate_fn函式,其輸入為一個batch的sample資料
def collate_fn(batch):
    # 因為token_list是一個變長的資料,所以需要用一個list來裝這個batch的token_list
  token_lists = [item['token_list'] for item in batch]
   
  # 每個label是一個int,我們把這個batch中的label也全取出來,重新組裝
  labels = [item['label'] for item in batch]
  # 把labels轉換成Tensor
  labels = torch.Tensor(labels)
  return {
    'token_list': token_lists,
    'label': labels,
  }
 
 
# 在使用DataLoader載入資料時,注意collate_fn引數傳入的是重寫的函式
DataLoader(trainset, batch_size=4, shuffle=True, num_workers=4, collate_fn=collate_fn)

Pytorch載入變長度序列資料

在處理序列資料集時,有時會遇到變長度的樣本。此時因為尺寸不一致,無法直接利用pytorch中dataloader的預設載入方式(沿著批維度直接Stack)。

處理這種資料集,一種辦法是可以事先記錄每個樣本的長度,並把所有的資料集樣本補全至最長的樣本長度,這樣所有樣本長度一致,可以直接載入。但是會有一個問題,就是例如在使用RNN建模時,這些padding的0值會對模型造成額外影響.參考這篇文章

pytorch中通過函式torch.nn.utils.rnn.pack_padded_sequence()以及torch.nn.utils.rnn.pad_packed_sequence()來解決這個問題。torch.nn.utils.rnn.pack_padded_sequence()通過利用

pad之後的樣本和每個原始序列的長度對補全後的樣本進行pack。這樣RNN模型在計算時,根據原來的樣本長度就知道每個樣本在何時結束,從而避免額外的pad的0值的影響。計算完之後通過torch.nn.utils.rnn.pad_packed_sequence()將輸出的格式轉換為pack之前的格式。

collate_fn

另一種辦法是通過自定義collate_fn,並將其傳入DataLoader,從而實現自定義的批資料聚合方式。這裡給出一些示例。

這篇文章給出了一種解決思路

示例1

問題背景

想要使用pytorch 框架中的 Dataset 和 Dataloader 類,將變長序列整合為batch資料 (主要是對長短不一的序列進行補齊),通過自定義collate_fn函式,實現對變長資料的處理。

主要思路

Dataset 主要負責讀取單條資料,建立索引方式。
Dataloader 負責將資料聚合為batch。

測試環境: python 3.6 ,pytorch 1.2.0

資料路徑:

data路徑下儲存的是待儲存的資料樣本。
舉例:其中的 1.json 樣本格式為:

定義資料集class,進行資料索引

資料集class定義程式碼:

import os
import numpy as np
import torch
from torch.utils.data import Dataset
from tqdm import tqdm
class time_series_dataset(Dataset):
    def __init__(self, data_root):
        """
        :param data_root:   資料集路徑
        """
        self.data_root = data_root
        file_list = os.listdir(data_root)
        file_prefix = []
        for file in file_list:
            if '.json' in file:
                file_prefix.append(file.split('.')[0])
        file_prefix = list(set(file_prefix))
        self.data = file_prefix
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index):
        prefix = self.data[index]
        import json
        with open(self.data_root+prefix+'.json','r',encoding='utf-8') as f:
            data_dic=json.load(f)
        feature = np.array(data_dic['feature'])
        length=len(data_dic['feature'])
        feature = torch.from_numpy(feature)
        label = np.array(data_dic['label'])
        label = torch.from_numpy(label)
        sample = {'feature': feature, 'label': label, 'id': prefix,'length':length}
        return sample

這裡dataset將每個樣本的資料,標籤、以及每個樣本的長度都包裹在一個字典裡並返回。

資料集例項化:

dataset = time_series_dataset("./data/") # "./data/" 為資料集檔案儲存路徑

基於此資料集的實際資料格式如下:
舉例: dataset[0]

 {'feature': tensor([17, 14, 16, 18, 14, 16], dtype=torch.int32),
  'label': tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0,
          0], dtype=torch.int32),
  'id': '2',
  'length': 6}

定義collate_fn函式,傳入Dataloader類

自定義collate_fn程式碼

from torch.nn.utils.rnn import pad_sequence

def collate_func(batch_dic):
    batch_len=len(batch_dic) # 批尺寸
    max_seq_length=max([dic['length'] for dic in batch_dic]) # 一批資料中最長的那個樣本長度
    mask_batch=torch.zeros((batch_len,max_seq_length)) # mask
    fea_batch=[]
    label_batch=[]
    id_batch=[]
    for i in range(len(batch_dic)): # 分別提取批樣本中的feature、label、id、length資訊
        dic=batch_dic[i]
        fea_batch.append(dic['feature'])
        label_batch.append(dic['label'])
        id_batch.append(dic['id'])
        mask_batch[i,:dic['length']]=1 # mask
    res={}
    res['feature']=pad_sequence(fea_batch,batch_first=True) # 將資訊封裝在字典res中
    res['label']=pad_sequence(label_batch,batch_first=True)
    res['id']=id_batch
    res['mask']=mask_batch
    return res

pytorch中的dataloader返回的是一個list,也即collate_func的輸入是一個列表。

說明: mask 欄位用以儲存變長序列的實際長度,補零的部分記為0,實際序列對應位置記為1。返回資料的格式及包含的欄位,根據自己的需求進行定義。

這一段似乎用對映map更合適:

for i in range(len(batch_dic)):
        dic=batch_dic[i]
        fea_batch.append(dic['feature'])
        label_batch.append(dic['label'])
        id_batch.append(dic['id'])
        mask_batch[i,:dic['length']]=1
     fea_batch = list(map(lambda x: x['feature'], batch_dic))
     label_batch = list(map(lambda x: x['label'], batch_dic))
     id_batch = list(map(lambda x: x['id'], batch_dic))

Dataloader例項化呼叫程式碼:

 train_loader = DataLoader(dataset, batch_size=3, num_workers=1, shuffle=True,collate_fn=collate_func)

完整流程程式碼

import os
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
class time_series_dataset(Dataset):
    def __init__(self, data_root):
        """
        :param data_root:   資料集路徑
        """
        self.data_root = data_root
        file_list = os.listdir(data_root)
        file_prefix = []
        for file in file_list:
            if '.json' in file:
                file_prefix.append(file.split('.')[0])
        file_prefix = list(set(file_prefix))
        self.data = file_prefix
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index):
        prefix = self.data[index]
        import json
        with open(self.data_root+prefix+'.json','r',encoding='utf-8') as f:
            data_dic=json.load(f)
        feature = np.array(data_dic['feature'])
        length=len(data_dic['feature'])
        feature = torch.from_numpy(feature)
        label = np.array(data_dic['label'])
        label = torch.from_numpy(label)
        sample = {'feature': feature, 'label': label, 'id': prefix,'length':length}
        return sample
def collate_func(batch_dic):
    #from torch.nn.utils.rnn import pad_sequence
    batch_len=len(batch_dic)
    max_seq_length=max([dic['length'] for dic in batch_dic])
    mask_batch=torch.zeros((batch_len,max_seq_length))
    fea_batch=[]
    label_batch=[]
    id_batch=[]
    for i in range(len(batch_dic)):
        dic=batch_dic[i]
        fea_batch.append(dic['feature'])
        label_batch.append(dic['label'])
        id_batch.append(dic['id'])
        mask_batch[i,:dic['length']]=1
    res={}
    res['feature']=pad_sequence(fea_batch,batch_first=True)
    res['label']=pad_sequence(label_batch,batch_first=True)
    res['id']=id_batch
    res['mask']=mask_batch
    return res
if __name__ == "__main__":
    dataset = time_series_dataset("./data/")
    batch_size=3
    train_loader = DataLoader(dataset, batch_size=batch_size, num_workers=4, shuffle=True,collate_fn=collate_func)
    for batch_idx, batch in tqdm(enumerate(train_loader),total=int(len(train_loader.dataset) / batch_size) + 1):
        inputs,labels,masks,ids=batch['feature'],batch['label'],batch['mask'],batch['id']
        break

示例2

from torch.nn.utils.rnn import pack_sequence
from torch.utils.data import DataLoader

def my_collate(batch):
    # batch contains a list of tuples of structure (sequence, target)
    data = [item[0] for item in batch]
    data = pack_sequence(data, enforce_sorted=False)
    targets = [item[1] for item in batch]
    return [data, targets]

# ...
# later in you code, when you define you DataLoader - use the custom collate function
loader = DataLoader(dataset,
                      batch_size,
                      shuffle,
                      collate_fn=my_collate, # use custom collate function here
                      pin_memory=True)

示例3

沿一般的維度填充

I wrote a simple code that maybe someone here can re-use. I wanted to make something that pads a generic dim, and I don’t use an RNN of any type so PackedSequence was a bit of overkill for me. It’s simple, but it works for me.

def pad_tensor(vec, pad, dim):
    """
    args:
        vec - tensor to pad
        pad - the size to pad to
        dim - dimension to pad

    return:
        a new tensor padded to 'pad' in dimension 'dim'
    """
    pad_size = list(vec.shape)
    pad_size[dim] = pad - vec.size(dim)
    return torch.cat([vec, torch.zeros(*pad_size)], dim=dim)


class PadCollate:
    """
    a variant of callate_fn that pads according to the longest sequence in
    a batch of sequences
    """

    def __init__(self, dim=0):
        """
        args:
            dim - the dimension to be padded (dimension of time in sequences)
        """
        self.dim = dim

    def pad_collate(self, batch):
        """
        args:
            batch - list of (tensor, label)

        reutrn:
            xs - a tensor of all examples in 'batch' after padding
            ys - a LongTensor of all labels in batch
        """
        # find longest sequence
        max_len = max(map(lambda x: x[0].shape[self.dim], batch))
        # pad according to max_len
        batch = map(lambda (x, y):
                    (pad_tensor(x, pad=max_len, dim=self.dim), y), batch)
        # stack all
        xs = torch.stack(map(lambda x: x[0], batch), dim=0)
        ys = torch.LongTensor(map(lambda x: x[1], batch))
        return xs, ys

    def __call__(self, batch):
        return self.pad_collate(batch)

to be used with the data loader:

train_loader = DataLoader(ds, ..., collate_fn=PadCollate(dim=0))

If you are going to pack your padded sequences later, you can also immediately sort the batches from longest sequence to shortest:

如果你打算後續對padded的樣本進行pack操作,你可以對批樣本從長到短進行排序:(這種做法是比較實用的,因為通常後續需要進行pack操作)

def sort_batch(batch, targets, lengths):
    """
    Sort a minibatch by the length of the sequences with the longest sequences first
    return the sorted batch targes and sequence lengths.
    This way the output can be used by pack_padded_sequences(...)
    """
    seq_lengths, perm_idx = lengths.sort(0, descending=True)
    seq_tensor = batch[perm_idx]
    target_tensor = targets[perm_idx]
    return seq_tensor, target_tensor, seq_lengths

def pad_and_sort_batch(DataLoaderBatch):
    """
    DataLoaderBatch should be a list of (sequence, target, length) tuples...
    Returns a padded tensor of sequences sorted from longest to shortest,
    """
    batch_size = len(DataLoaderBatch)
    batch_split = list(zip(*DataLoaderBatch))

    seqs, targs, lengths = batch_split[0], batch_split[1], batch_split[2]
    max_length = max(lengths)

    padded_seqs = np.zeros((batch_size, max_length))
    for i, l in enumerate(lengths):
        padded_seqs[i, 0:l] = seqs[i][0:l]

    return sort_batch(torch.tensor(padded_seqs), torch.tensor(targs).view(-1,1), torch.tensor(lengths))

假設你的Dataset具有以下形式:

def __getitem__(self, idx):
         return self.sequences[idx], torch.tensor(self.targets[idx]), self.sequence_lengths[idx]

使用時將pad_and_sort collator傳到 DataLoader:

train_gen = Data.DataLoader(train_data, batch_size=128, shuffle=True, collate_fn=pad_and_sort_batch)

示例5

def collate_fn_padd(batch):
    '''
    Padds batch of variable length

    note: it converts things ToTensor manually here since the ToTensor transform
    assume it takes in images rather than arbitrary tensors.
    '''
    ## get sequence lengths
    lengths = torch.tensor([ t.shape[0] for t in batch ]).to(device)
    ## padd
    batch = [ torch.Tensor(t).to(device) for t in batch ]
    batch = torch.nn.utils.rnn.pad_sequence(batch)
    ## compute mask
    mask = (batch != 0).to(device)
    return batch, lengths, mask

參考:

https://blog.csdn.net/lrs1353281004/article/details/106129660

https://discuss.pytorch.org/t/dataloader-for-various-length-of-data/6418