1. 程式人生 > 實用技巧 >目標跟蹤初探(DeepSORT)

目標跟蹤初探(DeepSORT)

目前主流的目標跟蹤演算法都是基於Tracking-by-Detecton策略,即基於目標檢測的結果來進行目標跟蹤。DeepSORT運用的就是這個策略,上面的視訊是DeepSORT對人群進行跟蹤的結果,每個bbox左上角的數字是用來標識某個人的唯一ID號。

這裡就有個問題,視訊中不同時刻的同一個人,位置發生了變化,那麼是如何關聯上的呢?答案就是匈牙利演算法和卡爾曼濾波。

  • 匈牙利演算法可以告訴我們當前幀的某個目標,是否與前一幀的某個目標相同。
  • 卡爾曼濾波可以基於目標前一時刻的位置,來預測當前時刻的位置,並且可以比感測器(在目標跟蹤中即目標檢測器,比如Yolo等)更準確的估計目標的位置。

在DeepSORT中,匈牙利演算法用來將前一幀中的跟蹤框tracks與當前幀中的檢測框detections進行關聯,通過外觀資訊(appearance information)和馬氏距離(Mahalanobis distance),或者IOU來計算代價矩陣

卡爾曼濾波(Kalman Filter)

卡爾曼濾波被廣泛應用於無人機、自動駕駛、衛星導航等領域,簡單來說,其作用就是基於感測器的測量值來更新預測值,以達到更精確的估計。

假設我們要跟蹤小車的位置變化,如下圖所示,藍色的分佈是卡爾曼濾波預測值,棕色的分佈是感測器的測量值,灰色的分佈就是預測值基於測量值更新後的最優估計。

Kalman Filter

在目標跟蹤中,需要估計track的以下兩個狀態:

  • 均值(Mean):表示目標的位置資訊,由bbox的中心座標 (cx, cy),寬高比r,高h,以及各自的速度變化值組成,由8維向量表示為 x = [cx, cy, r, h, vx, vy, vr, vh],各個速度值初始化為0。
  • 協方差(Covariance ):表示目標位置資訊的不確定性,由8x8的對角矩陣表示,矩陣中數字越大則表明不確定性越大,可以以任意值初始化。

卡爾曼濾波分為兩個階段:(1)預測track在下一時刻的位置,(2) 基於detection來更新預測的位置。

DeepSort工作流程

DeepSORT對每一幀的處理流程如下:

檢測器得到bbox → 生成detections → 卡爾曼濾波預測→ 使用匈牙利演算法將預測後的tracks和當前幀中的detecions進行匹配(級聯匹配和IOU匹配) → 卡爾曼濾波更新

Frame 0:檢測器檢測到了3個detections,當前沒有任何tracks,將這3個detections初始化為tracks
Frame 1:檢測器又檢測到了3個detections,對於Frame 0中的tracks,先進行預測得到新的tracks,然後使用匈牙利演算法將新的tracks與detections進行匹配,得到(track, detection)匹配對,最後用每對中的detection更新對應的track


目標跟蹤初探(DeepSORT)

求索 ChenJ pprp 清歡守護者 小小將 Lyon

最近由於工作原因,首次接觸到了目標跟蹤任務,這幾天讀了一些該領域的優秀論文,真心感覺目標跟蹤任務的難度和複雜度要比分類和目標檢測高不少,具有更大的挑戰性。

如果你跟我一樣是正在學習目標跟蹤的新手,希望本文能讓你對目標跟蹤任務和DeepSORT演算法的工作流程有個初步的瞭解,如果你是該領域的前輩,歡迎對文中的不足之處進行指正,多多指教。


本文首先將介紹在目標跟蹤任務中常用的匈牙利演算法(Hungarian Algorithm)和卡爾曼濾波(Kalman Filter),然後介紹經典演算法DeepSORT的工作流程以及對相關原始碼進行解析。

DeepSORT Demo

目前主流的目標跟蹤演算法都是基於Tracking-by-Detecton策略,即基於目標檢測的結果來進行目標跟蹤。DeepSORT運用的就是這個策略,上面的視訊是DeepSORT對人群進行跟蹤的結果,每個bbox左上角的數字是用來標識某個人的唯一ID號。

這裡就有個問題,視訊中不同時刻的同一個人,位置發生了變化,那麼是如何關聯上的呢?答案就是匈牙利演算法和卡爾曼濾波。

  • 匈牙利演算法可以告訴我們當前幀的某個目標,是否與前一幀的某個目標相同。
  • 卡爾曼濾波可以基於目標前一時刻的位置,來預測當前時刻的位置,並且可以比感測器(在目標跟蹤中即目標檢測器,比如Yolo等)更準確的估計目標的位置。

匈牙利演算法(Hungarian Algorithm)

首先,先介紹一下什麼是分配問題(Assignment Problem):假設有N個人和N個任務,每個任務可以任意分配給不同的人,已知每個人完成每個任務要花費的代價不盡相同,那麼如何分配可以使得總的代價最小。

舉個例子,假設現在有3個任務,要分別分配給3個人,每個人完成各個任務所需代價矩陣(cost matrix)如下所示(這個代價可以是金錢、時間等等):

怎樣才能找到一個最優分配,使得完成所有任務花費的代價最小呢?

匈牙利演算法(又叫KM演算法)就是用來解決分配問題的一種方法,它基於定理:

如果代價矩陣的某一行或某一列同時加上或減去某個數,則這個新的代價矩陣的最優分配仍然是原代價矩陣的最優分配。

演算法步驟(假設矩陣為NxN方陣):

  1. 對於矩陣的每一行,減去其中最小的元素
  2. 對於矩陣的每一列,減去其中最小的元素
  3. 用最少的水平線或垂直線覆蓋矩陣中所有的0
  4. 如果線的數量等於N,則找到了最優分配,演算法結束,否則進入步驟5
  5. 找到沒有被任何線覆蓋的最小元素,每個沒被線覆蓋的行減去這個元素,每個被線覆蓋的列加上這個元素,返回步驟3

繼續拿上面的例子做演示:

step1 每一行最小的元素分別為15、20、20,減去得到:

step2 每一列最小的元素分別為0、20、5,減去得到:

step3 用最少的水平線或垂直線覆蓋所有的0,得到:

step4 線的數量為2,小於3,進入下一步;

step5 現在沒被覆蓋的最小元素是5,沒被覆蓋的行(第一和第二行)減去5,得到:

被覆蓋的列(第一列)加上5,得到:

跳轉到step3,用最少的水平線或垂直線覆蓋所有的0,得到:

step4:線的數量為3,滿足條件,演算法結束。顯然,將任務2分配給第1個人、任務1分配給第2個人、任務3分配給第3個人時,總的代價最小(0+0+0=0):

所以原矩陣的最小總代價為(40+20+25=85):

sklearn裡的linear_assignment()函式以及scipy裡的linear_sum_assignment()函式都實現了匈牙利演算法,兩者的返回值的形式不同:

import numpy as np 
from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignment
 

cost_matrix = np.array([
    [15,40,45],
    [20,60,35],
    [20,40,25]
])
 
matches = linear_assignment(cost_matrix)
print('sklearn API result:\n', matches)
matches = linear_sum_assignment(cost_matrix)
print('scipy API result:\n', matches)
 

"""Outputs
sklearn API result:
 [[0 1]
  [1 0]
  [2 2]]
scipy API result:
 (array([0, 1, 2], dtype=int64), array([1, 0, 2], dtype=int64))
"""

在DeepSORT中,匈牙利演算法用來將前一幀中的跟蹤框tracks與當前幀中的檢測框detections進行關聯,通過外觀資訊(appearance information)和馬氏距離(Mahalanobis distance),或者IOU來計算代價矩陣。

原始碼解讀:

#  linear_assignment.py
def min_cost_matching(distance_metric, max_distance, tracks, detections, 
                      track_indices=None, detection_indices=None):
    ...
    
    # 計算代價矩陣
    cost_matrix = distance_metric(tracks, detections, track_indices, detection_indices)
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
    
    # 執行匈牙利演算法,得到匹配成功的索引對,行索引為tracks的索引,列索引為detections的索引
    row_indices, col_indices = linear_assignment(cost_matrix)
 
    matches, unmatched_tracks, unmatched_detections = [], [], []
 
    # 找出未匹配的detections
    for col, detection_idx in enumerate(detection_indices):
        if col not in col_indices:
            unmatched_detections.append(detection_idx)
     
    # 找出未匹配的tracks
    for row, track_idx in enumerate(track_indices):
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    
    # 遍歷匹配的(track, detection)索引對
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        # 如果相應的cost大於閾值max_distance,也視為未匹配成功
        if cost_matrix[row, col] > max_distance:
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
 
    return matches, unmatched_tracks, unmatched_detections

卡爾曼濾波(Kalman Filter)

卡爾曼濾波被廣泛應用於無人機、自動駕駛、衛星導航等領域,簡單來說,其作用就是基於感測器的測量值來更新預測值,以達到更精確的估計。

假設我們要跟蹤小車的位置變化,如下圖所示,藍色的分佈是卡爾曼濾波預測值,棕色的分佈是感測器的測量值,灰色的分佈就是預測值基於測量值更新後的最優估計。

Kalman Filter

在目標跟蹤中,需要估計track的以下兩個狀態:

  • 均值(Mean):表示目標的位置資訊,由bbox的中心座標 (cx, cy),寬高比r,高h,以及各自的速度變化值組成,由8維向量表示為 x = [cx, cy, r, h, vx, vy, vr, vh],各個速度值初始化為0。
  • 協方差(Covariance ):表示目標位置資訊的不確定性,由8x8的對角矩陣表示,矩陣中數字越大則表明不確定性越大,可以以任意值初始化。

卡爾曼濾波分為兩個階段:(1)預測track在下一時刻的位置,(2) 基於detection來更新預測的位置。

下面將介紹這兩個階段用到的計算公式。(這裡不涉及公式的原理推導,因為我也不清楚原理(ಥ_ಥ) ,只是說明一下各個公式的作用)

預測

基於track在t-1時刻的狀態來預測其在t時刻的狀態。

在公式1中,x為track在t-1時刻的均值,F稱為狀態轉移矩陣,該公式預測t時刻的x':

矩陣F中的dt是當前幀和前一幀之間的差,將等號右邊的矩陣乘法展開,可以得到cx'=cx+dt*vx,cy'=cy+dt*vy...,所以這裡的卡爾曼濾波是一個勻速模型(Constant Velocity Model)。

在公式2中,P為track在t-1時刻的協方差,Q為系統的噪聲矩陣,代表整個系統的可靠程度,一般初始化為很小的值,該公式預測t時刻的P'。

原始碼解讀:

#  kalman_filter.py
def predict(self, mean, covariance):
    """Run Kalman filter prediction step.
    
    Parameters
    ----------
    mean: ndarray, the 8 dimensional mean vector of the object state at the previous time step.
    covariance: ndarray, the 8x8 dimensional covariance matrix of the object state at the previous time step.
 
    Returns
    -------
    (ndarray, ndarray), the mean vector and covariance matrix of the predicted state. 
     Unobserved velocities are initialized to 0 mean.
    """
    std_pos = [
        self._std_weight_position * mean[3],
        self._std_weight_position * mean[3],
        1e-2,
        self._std_weight_position * mean[3]]
    std_vel = [
        self._std_weight_velocity * mean[3],
        self._std_weight_velocity * mean[3],
        1e-5,
        self._std_weight_velocity * mean[3]]
    
    motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))  # 初始化噪聲矩陣Q
    mean = np.dot(self._motion_mat, mean)  # x' = Fx
    covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov  # P' = FPF(T) + Q
 
    return mean, covariance

更新

基於t時刻檢測到的detection,校正與其關聯的track的狀態,得到一個更精確的結果。

在公式3中,z為detection的均值向量,不包含速度變化值,即z=[cx, cy, r, h],H稱為測量矩陣,它將track的均值向量x'對映到檢測空間,該公式計算detection和track的均值誤差;

在公式4中,R為檢測器的噪聲矩陣,它是一個4x4的對角矩陣,對角線上的值分別為中心點兩個座標以及寬高的噪聲,以任意值初始化,一般設定寬高的噪聲大於中心點的噪聲,該公式先將協方差矩陣P'對映到檢測空間,然後再加上噪聲矩陣R;

公式5計算卡爾曼增益K,卡爾曼增益用於估計誤差的重要程度;

公式6和公式7得到更新後的均值向量x和協方差矩陣P。

原始碼解讀:

#  kalman_filter.py
def project(self, mean, covariance):
    """Project state distribution to measurement space.
        
    Parameters
    ----------
    mean: ndarray, the state's mean vector (8 dimensional array).
    covariance: ndarray, the state's covariance matrix (8x8 dimensional).

    Returns
    -------
    (ndarray, ndarray), the projected mean and covariance matrix of the given state estimate.
    """
    std = [self._std_weight_position * mean[3],
           self._std_weight_position * mean[3],
           1e-1,
           self._std_weight_position * mean[3]]
        
    innovation_cov = np.diag(np.square(std))  # 初始化噪聲矩陣R
    mean = np.dot(self._update_mat, mean)  # 將均值向量對映到檢測空間,即Hx'
    covariance = np.linalg.multi_dot((
        self._update_mat, covariance, self._update_mat.T))  # 將協方差矩陣對映到檢測空間,即HP'H^T
    return mean, covariance + innovation_cov


def update(self, mean, covariance, measurement):
    """Run Kalman filter correction step.

    Parameters
    ----------
    mean: ndarra, the predicted state's mean vector (8 dimensional).
    covariance: ndarray, the state's covariance matrix (8x8 dimensional).
    measurement: ndarray, the 4 dimensional measurement vector (x, y, a, h), where (x, y) is the 
                 center position, a the aspect ratio, and h the height of the bounding box.
    Returns
    -------
    (ndarray, ndarray), the measurement-corrected state distribution.
    """
    # 將mean和covariance對映到檢測空間,得到Hx'和S
    projected_mean, projected_cov = self.project(mean, covariance)
    # 矩陣分解(這一步沒看懂)
    chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
    # 計算卡爾曼增益K(這一步沒看明白是如何對應上公式5的,求線代大佬指教)
    kalman_gain = scipy.linalg.cho_solve(
            (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
            check_finite=False).T
    # z - Hx'
    innovation = measurement - projected_mean
    # x = x' + Ky
    new_mean = mean + np.dot(innovation, kalman_gain.T)
    # P = (I - KH)P'
    new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
        
    return new_mean, new_covariance

DeepSort工作流程

DeepSORT對每一幀的處理流程如下:

檢測器得到bbox → 生成detections → 卡爾曼濾波預測→ 使用匈牙利演算法將預測後的tracks和當前幀中的detecions進行匹配(級聯匹配和IOU匹配) → 卡爾曼濾波更新

Frame 0:檢測器檢測到了3個detections,當前沒有任何tracks,將這3個detections初始化為tracks
Frame 1:檢測器又檢測到了3個detections,對於Frame 0中的tracks,先進行預測得到新的tracks,然後使用匈牙利演算法將新的tracks與detections進行匹配,得到(track, detection)匹配對,最後用每對中的detection更新對應的track

檢測

使用Yolo作為檢測器,檢測當前幀中的bbox:

#  demo_yolo3_deepsort.py
def detect(self):
    while self.vdo.grab():
	...
	bbox_xcycwh, cls_conf, cls_ids = self.yolo3(im)  # 檢測到的bbox[cx,cy,w,h],置信度,類別id
	if bbox_xcycwh is not None:
    	    # 篩選出人的類別
    	    mask = cls_ids == 0
  	    bbox_xcycwh = bbox_xcycwh[mask]
  	    bbox_xcycwh[:, 3:] *= 1.2
   	    cls_conf = cls_conf[mask]
            ...

生成detections

將檢測到的bbox轉換成detections:

#  deep_sort.py
def update(self, bbox_xywh, confidences, ori_img):
    self.height, self.width = ori_img.shape[:2]
    # 提取每個bbox的feature
    features = self._get_features(bbox_xywh, ori_img)
    # [cx,cy,w,h] -> [x1,y1,w,h]
    bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
    # 過濾掉置信度小於self.min_confidence的bbox,生成detections
    detections = [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf > self.min_confidence]
    # NMS (這裡self.nms_max_overlap的值為1,即保留了所有的detections)
    boxes = np.array([d.tlwh for d in detections])
    scores = np.array([d.confidence for d in detections])
    indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
    detections = [detections[i] for i in indices]
    ...

卡爾曼濾波預測階段

使用卡爾曼濾波預測前一幀中的tracks在當前幀的狀態:

#  track.py
def predict(self, kf):
    """Propagate the state distribution to the current time step using a 
       Kalman filter prediction step.
    Parameters
    ----------
    kf: The Kalman filter.
    """
    self.mean, self.covariance = kf.predict(self.mean, self.covariance)  # 預測
    self.age += 1  # 該track自出現以來的總幀數加1
    self.time_since_update += 1  # 該track自最近一次更新以來的總幀數加1

匹配

首先對基於外觀資訊的馬氏距離計算tracks和detections的代價矩陣,然後相繼進行級聯匹配和IOU匹配,最後得到當前幀的所有匹配對、未匹配的tracks以及未匹配的detections:

#  tracker.py
def _match(self, detections):
    def gated_metric(racks, dets, track_indices, detection_indices):
        """
        基於外觀資訊和馬氏距離,計算卡爾曼濾波預測的tracks和當前時刻檢測到的detections的代價矩陣
        """
        features = np.array([dets[i].feature for i in detection_indices])
        targets = np.array([tracks[i].track_id for i in track_indices]
	# 基於外觀資訊,計算tracks和detections的餘弦距離代價矩陣
        cost_matrix = self.metric.distance(features, targets)
	# 基於馬氏距離,過濾掉代價矩陣中一些不合適的項 (將其設定為一個較大的值)
        cost_matrix = linear_assignment.gate_cost_matrix(self.kf, cost_matrix, tracks, 
                      dets, track_indices, detection_indices)
        return cost_matrix

    # 區分開confirmed tracks和unconfirmed tracks
    confirmed_tracks = [i for i, t in enumerate(self.tracks) if t.is_confirmed()]
    unconfirmed_tracks = [i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

    # 對confirmd tracks進行級聯匹配
    matches_a, unmatched_tracks_a, unmatched_detections = \
        linear_assignment.matching_cascade(
            gated_metric, self.metric.matching_threshold, self.max_age,
            self.tracks, detections, confirmed_tracks)

    # 對級聯匹配中未匹配的tracks和unconfirmed tracks中time_since_update為1的tracks進行IOU匹配
    iou_track_candidates = unconfirmed_tracks + [k for k in unmatched_tracks_a if
                                                 self.tracks[k].time_since_update == 1]
    unmatched_tracks_a = [k for k in unmatched_tracks_a if
                          self.tracks[k].time_since_update != 1]
    matches_b, unmatched_tracks_b, unmatched_detections = \
        linear_assignment.min_cost_matching(
            iou_matching.iou_cost, self.max_iou_distance, self.tracks,
            detections, iou_track_candidates, unmatched_detections)
	
    # 整合所有的匹配對和未匹配的tracks
    matches = matches_a + matches_b
    unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
    
    return matches, unmatched_tracks, unmatched_detections


# 級聯匹配原始碼  linear_assignment.py
def matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections, 
                     track_indices=None, detection_indices=None):
    ...
    unmatched_detections = detection_indice
    matches = []
    # 由小到大依次對每個level的tracks做匹配
    for level in range(cascade_depth):
	# 如果沒有detections,退出迴圈
        if len(unmatched_detections) == 0:  
            break
	# 當前level的所有tracks索引
        track_indices_l = [k for k in track_indices if 
                           tracks[k].time_since_update == 1 + level]
	# 如果當前level沒有track,繼續
        if len(track_indices_l) == 0: 
            continue
		
	# 匈牙利匹配
        matches_l, _, unmatched_detections = min_cost_matching(distance_metric, max_distance, tracks, detections, 
                                                               track_indices_l, unmatched_detections)
        
	matches += matches_l
	unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections

卡爾曼濾波更新階段

對於每個匹配成功的track,用其對應的detection進行更新,並處理未匹配tracks和detections:

#  tracker.py
def update(self, detections):
    """Perform measurement update and track management.
    Parameters
    ----------
    detections: List[deep_sort.detection.Detection]
                A list of detections at the current time step.
    """
    # 得到匹配對、未匹配的tracks、未匹配的dectections
    matches, unmatched_tracks, unmatched_detections = self._match(detections)

    # 對於每個匹配成功的track,用其對應的detection進行更新
    for track_idx, detection_idx in matches:
        self.tracks[track_idx].update(self.kf, detections[detection_idx])
    
	# 對於未匹配的成功的track,將其標記為丟失
	for track_idx in unmatched_tracks:
        self.tracks[track_idx].mark_missed()
	
    # 對於未匹配成功的detection,初始化為新的track
    for detection_idx in unmatched_detections:
        self._initiate_track(detections[detection_idx])
    
	...

參考

SIMPLE ONLINE AND REALTIME TRACKING WITH A DEEP ASSOCIATION METRIC

編輯於 07-23 目標跟蹤 深度學習(Deep Learning) 計算機視覺

文章被以下專欄收錄

CV自習室 希望來過這裡的同學都有所收穫。

推薦閱讀

Deep SORT多目標跟蹤演算法程式碼解析(上)

pprp發表於Giant...

【MOT】詳解DeepSORT多目標追蹤模型

周威發表於深度學習【...

帶你入門多目標跟蹤(二)SORT&DeepSORT

ZihaoZhao

Deep SORT多目標跟蹤演算法程式碼解析(下)

由於知乎對文章篇幅有限制,所以分上下篇發。上一篇為: pprp:Deep SORT多目標跟蹤演算法程式碼解析(上)上篇將梳理SORT、Deep SORT,講解DeepSORT程式碼部分的各個模組,以類圖為主。 下篇主要是…

pprp發表於Giant...

85 條評論

寫下你的評論...
    • 智秩2019-11-12 請問這種應用需要用到機器學習或深度學習嗎?
    • 求索(作者)回覆智秩2019-11-12 當然,用到了深度學習
    • 智秩回覆求索(作者)2019-11-12 不太懂,請問哪一步用到深度學習了?
    • Elon2019-11-13 就問問你離智慧社群的要求還有多遠?如果要大規模商業落地,離商家的要求又有多遠……
    • 劉政回覆Elon2019-11-15

      這個離落地還有很多優化的地方 現在基本上只是有這種功能 誤差還是比較大的 比如 跟蹤的A時候 B從A身邊走過有幾幀把A全部遮擋了 之後A的ID就會改變了

    • Elon回覆劉政2019-11-15 識別準確率-這恐怕就商業落地上是視覺模式識別最致命的地方。
  • 禿爵達文西2019-11-13 用了yolo作為檢測器以後,整體的效能不會太高吧。
  • 求索(作者)回覆禿爵達文西2019-11-13

    我認為yolo是效能很好的檢測演算法了,不論是在準確率還是速率方面,在工業界應用得也非常多。目標跟蹤演算法的整體效能不光看檢測器的效能,還要看目標的ID變換次數,這就基於演算法中所用的匹配策略了。

  • 深度學不動2019-11-13 寫的很好,對這個領域的新手很有幫助,謝謝大佬[贊][贊][贊]
  • 求索(作者)回覆深度學不動2019-11-13 多謝鼓勵,共同學習