Triplet-Loss原理及其實現、應用
阿新 • • 發佈:2019-01-30
- 本文個人部落格地址: 點選檢視
- 歡迎下面留言交流
一、 Triplet loss
1、介紹
- 為什麼不適用
softmax
函式呢,softmax
最終的類別數是確定的,而Triplet loss
學到的是一個好的embedding
,相似的影象在embedding
空間裡是相近的,可以判斷是否是同一個人臉。
2、原理
- 輸入是一個三元組
<a, p, n>
a: anchor
p: positive
, 與a
是同一類別的樣本n: negative
, 與a
是不同類別的樣本
- 公式是:
- 所以最終的優化目標是拉近
a, p
的距離, 拉遠a, n
的距離 easy triplets
: 即 ,這種情況不需要優化,天然a, p
的距離很近,a, n
的距離遠hard triplets
: , 即a, p
的距離遠semi-hard triplets
a, n
的距離靠的很近,但是有一個margin
- 所以最終的優化目標是拉近
FaceNet
中是隨機選取semi-hard triplets
進行訓練的, (也可以選擇hard triplets
或者兩者一起進行訓練)
3、訓練方法
3.1 offline
- 訓練集所有資料經過計算得到對應的
embeddings
, 可以得到 很多<i, j, k>
triplet loss
- 效率不高,因為需要過一遍所有的資料得到三元組,然後訓練反向更新網路
3.2 online
- 從訓練集中抽取
B
個樣本,然後計算B
個embeddings
,可以產生 個triplets
(當然其中有不合法的,因為需要的是<a, p, n>
)
- 實際使用中採用此方法,又分為兩種策略 (是在一篇行人重識別的論文中提到的 In Defense of the Triplet Loss for Person Re-Identification),假設 , 其中
P
個身份的人,每個身份的人K
張圖片(一般K
取4
)Batch All
: 計算batch_size
中所有valid
的的hard triplet
和semi-hard triplet
, 然後取平均得到Loss
- 注意因為很多
easy triplets
的情況,所以平均會導致Loss
很小,所以是對所有 valid 的所有求平均 (下面程式碼中會介紹) - 可以產生 個
triplets
PK
個anchor
K-1
個positive
PK-K
個negative
- 注意因為很多
Batch Hard
: 對於每一個anchor
, 選擇距離最大的d(a, p)
和 距離最大的d(a, n)
- 所以公有 個 三元組
triplets
- 所以公有 個 三元組
二、 Tensorflow 中的實現
- 全部程式碼
Tensorflow
中有實現好的triplet loss
介面,這裡自己實現,(實現起來還是有點繞的, 有一些小細節問題)- 使用
numpy
也仿照實現了,便於除錯檢視中間的結果, 全部程式碼
1、Batch All
1.1 計算兩兩embeddings
的距離
numpy
中的實現,便於除錯理解, 點選檢視- 輸入大小是
(batch_size, vector_size)
大小的embeddings
向量 - 因為 , 矩陣相乘 中包含
a*b
的值,物件線上是向量平方的值,所以可以直接使用矩陣計算 - 如果不使用平方,就開根號,
- 注意根號下不能為
0
,0
開根號是沒有問題的,但是Tensorflow
梯度反向傳播是就會導致無窮大,所以加上一個平滑項1e-16
,最後再修改回來。
- 注意根號下不能為
def _pairwise_distance(embeddings, squared=False):
'''
計算兩兩embedding的距離
------------------------------------------
Args:
embedding: 特徵向量, 大小(batch_size, vector_size)
squared: 是否距離的平方,即歐式距離
Returns:
distances: 兩兩embeddings的距離矩陣,大小 (batch_size, batch_size)
'''
# 矩陣相乘,得到(batch_size, batch_size),因為計算歐式距離|a-b|^2 = a^2 -2ab + b^2,
# 其中 ab 可以用矩陣乘表示
dot_product = tf.matmul(embeddings, tf.transpose(embeddings))
# dot_product對角線部分就是 每個embedding的平方
square_norm = tf.diag_part(dot_product)
# |a-b|^2 = a^2 - 2ab + b^2
# tf.expand_dims(square_norm, axis=1)是(batch_size, 1)大小的矩陣,減去 (batch_size, batch_size)大小的矩陣,相當於每一列操作
distances = tf.expand_dims(square_norm, axis=1) - 2.0 * dot_product + tf.expand_dims(square_norm, axis=0)
distances = tf.maximum(distances, 0.0) # 小於0的距離置為0
if not squared: # 如果不平方,就開根號,但是注意有0元素,所以0的位置加上 1e*-16
distances = distances + mask * 1e-16
distances = tf.sqrt(distances)
distances = distances * (1.0 - mask) # 0的部分仍然置為0
return distances
1.2 計算valid mask
numpy
中的實現, 點選檢視- 上面得到了
(batch_size, batch_size)
大小的距離矩陣,然後就可以計算所有embeddings
組成的三元組<i, j, k>
損失 - 但是不是所有的三元組都是
valid
的, 要是<a, p, n>
的形式,所以計算一個3D
的mask
,然後乘上得到的(batch_size, batch_size, batch_size)
的所有三元組的損失即可,如何得到mask
呢 <i, j, k>
要滿足i, j, k
不相等labels[i] == labels[j] and labels[i] != labels[k]
def _get_triplet_mask(labels):
'''
得到一個3D的mask [a, p, n], 對應triplet(a, p, n)是valid的位置是True
----------------------------------
Args:
labels: 對應訓練資料的labels, shape = (batch_size,)
Returns:
mask: 3D,shape = (batch_size, batch_size, batch_size)
'''
# 初始化一個二維矩陣,座標(i, j)不相等置為1,得到indices_not_equal
indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
indices_not_equal = tf.logical_not(indices_equal)
# 因為最後得到一個3D的mask矩陣(i, j, k),增加一個維度,則 i_not_equal_j 在第三個維度增加一個即,(batch_size, batch_size, 1), 其他同理
i_not_equal_j = tf.expand_dims(indices_not_equal, 2)
i_not_equal_k = tf.expand_dims(indices_not_equal, 1)
j_not_equal_k = tf.expand_dims(indices_not_equal, 0)
# 想得到i!=j!=k, 三個不等取and即可, 最後可以得到當下標(i, j, k)不相等時才取True
distinct_indices = tf.logical_and(tf.logical_and(i_not_equal_j, i_not_equal_k), j_not_equal_k)
# 同樣根據labels得到對應i=j, i!=k
label_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
i_equal_j = tf.expand_dims(label_equal, 2)
i_equal_k = tf.expand_dims(label_equal, 1)
valid_labels = tf.logical_and(i_equal_j, tf.logical_not(i_equal_k))
# mask即為滿足上面兩個約束,所以兩個3D取and
mask = tf.logical_and(distinct_indices, valid_labels)
return mask
1.3 計算triplet loss
numpy
中的實現, 點選檢視1.1
中計算得到了兩兩embeddings
的距離,大小(batch_size, batch_size)
, 需要得到所有三元組的triplet loss
, 即(batch_size, batch_size, batch_size)
大小- 為什麼
triplet_loss = anchor_positive_dist - anchor_negative_dist + margin
可以得到所有(i, j, k)
的triplet loss
,- 如下圖,
x0y
平面的是anchor_positive_dist
的距離矩陣(其實是3D
的, 想象一下) x0z
平面是anchor_negative_dist
的距離矩陣(也是3D
的)- 兩個相減, 比如
0-0 = 0
就相當於i=0, j=0
的距離,減去j=0, k=0
的距離 - 以此類推,得到所有三元組的
loss
- 如下圖,
def batch_all_triplet_loss(labels, embeddings, margin, squared=False):
'''
triplet loss of a batch
-------------------------------
Args:
labels: 標籤資料,shape = (batch_size,)
embeddings: 提取的特徵向量, shape = (batch_size, vector_size)
margin: margin大小, scalar
Returns:
triplet_loss: scalar, 一個batch的損失值
fraction_postive_triplets : valid的triplets佔的比例
'''
# 得到每兩兩embeddings的距離,然後增加一個維度,一維需要得到(batch_size, batch_size, batch_size)大小的3D矩陣
# 然後再點乘上valid 的 mask即可
pairwise_dis = _pairwise_distance(embeddings, squared=squared)
anchor_positive_dist = tf.expand_dims(pairwise_dis, 2)
assert anchor_positive_dist.shape[2] == 1, "{}".format(anchor_positive_dist.shape)
anchor_negative_dist = tf.expand_dims(pairwise_dis, 1)
assert anchor_negative_dist.shape[1] == 1, "{}".format(anchor_negative_dist.shape)
triplet_loss = anchor_positive_dist - anchor_negative_dist + margin
mask = _get_triplet_mask(labels)
mask = tf.to_float(mask)
triplet_loss = tf.multiply(mask, triplet_loss)
triplet_loss = tf.maximum(triplet_loss, 0.0)
# 計算valid的triplet的個數,然後對所有的triplet loss求平均
valid_triplets = tf.to_float(tf.greater(triplet_loss, 1e-16))
num_positive_triplets = tf.reduce_sum(valid_triplets)
num_valid_triplets = tf.reduce_sum(mask)
fraction_postive_triplets = num_positive_triplets / (num_valid_triplets + 1e-16)
triplet_loss = tf.reduce_sum(triplet_loss) / (num_positive_triplets + 1e-16)
return triplet_loss, fraction_postive_triplets
2、Batch Hard
numpy
中的實現,點選檢視- 因為最後只有個
triplet
, 從positive
中選擇距離最大的,從negative
中選擇距離最小的即可
2.1 計算positive mask
- 滿足
a!=p and a, p label一致即可
- 之後用
mask
乘上計算的pairwice_distances
, 然後取每行最大值即為每個樣本對應positive
的最大距離
def _get_anchor_positive_triplet_mask(labels):
'''
得到合法的positive的mask, 即2D的矩陣,[a, p], a!=p and a和p相同labels
------------------------------------------------
Args:
labels: 標籤資料,shape = (batch_size, )
Returns:
mask: 合法的positive mask, shape = (batch_size, batch_size)
'''
indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
indices_not_equal = tf.logical_not(indices_equal) # (i, j)不相等
labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1)) # labels相等,
mask = tf.logical_and(indices_not_equal, labels_equal) # 取and即可
return mask
2.2 計算negative mask
- 只需
[a, n]
對應的labels
不一致即可
def _get_anchor_negative_triplet_mask(labels):
'''
得到negative的2D mask, [a, n] 只需a, n不同且有不同的labels
------------------------------------------------
Args:
labels: 標籤資料,shape = (batch_size, )
Returns:
mask: negative mask, shape = (batch_size, batch_size)
'''
labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
mask = tf.logical_not(labels_equal)
return mask
2.3 batch hard loss
- 計算最大
positive
距離時直接取valid
的每一行的最大值即可 - 計算最小
negative
距離時不能直接取每一行的最小值,因為invalid
位置的值為0
,所以可以在invalid
位置加上每一行的最大值,然後就可以取每一行的最小值了
def batch_hard_triplet_loss(labels, embeddings, margin, squared=False):
'''
batch hard triplet loss of a batch, 每個樣本最大的positive距離 - 對應樣本最小的negative距離
------------------------------------
Args:
labels: 標籤資料,shape = (batch_size,)
embeddings: 提取的特徵向量, shape = (batch_size, vector_size)
margin: margin大小, scalar
Returns:
triplet_loss: scalar, 一個batch的損失值
'''
pairwise_distances = _pairwise_distance(embeddings)
mask_anchor_positive = _get_anchor_positive_triplet_mask(labels)
mask_anchor_positive = tf.to_float(mask_anchor_positive)
anchor_positive_dist = tf.multiply(mask_anchor_positive, pairwise_distances)
hardest_positive_dist = tf.reduce_max(anchor_positive_dist, axis=1, keepdims=True) # 取每一行最大的值即為最大positive距離
tf.summary.scalar("hardest_positive_dis", tf.reduce_mean(hardest_positive_dist))
'''取每一行最小值得時候,因為invalid [a, n]置為了0, 所以不能直接取,這裡對應invalid位置加上每一行的最大值即可,然後再取最小的值'''
mask_anchor_negative = _get_anchor_negative_triplet_mask(labels)
mask_anchor_negative = tf.to_float(mask_anchor_negative)
max_anchor_negative_dist = tf.reduce_max(pairwise_distances, axis=1, keepdims=True) # 每一樣最大值
anchor_negative_dist = pairwise_distances + max_anchor_negative_dist * (1.0 - mask_anchor_negative) # (1.0 - mask_anchor_negative)即為invalid位置
hardest_negative_dist = tf.reduce_min(anchor_negative_dist, axis=1, keepdims=True)
tf.summary.scalar("hardest_negative_dist", tf.reduce_mean(hardest_negative_dist))
triplet_loss = tf.maximum(hardest_positive_dist - hardest_negative_dist + margin, 0.0)
triplet_loss = tf.reduce_mean(triplet_loss)
return triplet_loss
三、具體使用
- 使用
mnist
資料集和triplet loss
訓練,最後得到的embeddings
應該是同一類別的靠在一起 - 因為只有
10
個類別,所以直接隨機取batch
大小的資料,這裡batch_size=64
,- 注意如果類別很多時,就不能隨機構建
batch
了, 需要選P
個類別,然後每個類別選K
張圖
- 注意如果類別很多時,就不能隨機構建
3.1 構建模型
- 上一篇介紹了 tensorflow的高階API, 這裡使用
Estimator
構建模型 - 全部程式碼:點選檢視
3.1.1 使用Estimator
params
指定超引數, 這裡儲存為json
格式的檔案,- 配置為:
{
"learning_rate": 1e-3,
"batch_size": 64,
"num_epochs": 20,
"num_channels": 32,
"use_batch_norm": false,
"bn_momentum": 0.9,
"margin": 0.5,
"embedding_size": 64,
"triplet_strategy": "batch_all",
"squared": false,
"image_size": 28,
"num_labels": 10,
"train_size": 50000,
"eval_size": 10000,
"num_parallel_calls": 4,
"save_summary_steps": 50
}
def main(argv):
args = parser.parse_args(argv[1:])
tf.logging.info("建立模型....")
with open(args.model_config) as f:
params = json.load(f)
config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100) # config
cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params) # 建立模型
tf.logging.info("開始訓練模型,共{} epochs....".format(params['num_epochs']))
cls.train(input_fn = lambda: train_input_fn(args.data_dir, params)) # 訓練模型,指定輸入
tf.logging.info("測試集評價模型....")
res = cls.evaluate(input_fn = lambda: test_input_fn(args.data_dir, params)) # 測試模型,指定輸入
for key in res:
print("評價---{} : {}".format(key, res[key]))
3.1.2 model_fn函式
- 下面都有對應註釋
- 計算
embedding_mean_norm
中每一行embeding
公式為: , 然後再取均值
def my_model(features, labels, mode, params):
'''
model_fn指定函式,構建模型,訓練等
---------------------------------
Args:
features: 輸入,shape = (batch_size, 784)
labels: 輸出,shape = (batch_size, )
mode: str, 階段
params: dict, 超引數
'''
is_training = (mode == tf.estimator.ModeKeys.TRAIN)
images = features
images = tf.reshape(images, shape=[-1, params['image_size'], params['image_size'], 1]) # reshape (batch_size, img_size, img_size, 1)
with tf.variable_scope("model"):
embeddings = build_model(is_training, images, params) # 簡歷模型
if mode == tf.estimator.ModeKeys.PREDICT: # 如果是預測階段,直接返回得到embeddings
predictions = {'embeddings': embeddings}
return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
'''呼叫對應的triplet loss'''
labels = tf.cast(labels, tf.int64)
if params['triplet_strategy'] == 'batch_all':
loss, fraction = batch_all_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])
elif params['triplet_strategy'] == 'batch_hard':
loss = batch_hard_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])
else:
raise ValueError("triplet_strategy 配置不正確: {}".format(params['triplet_strategy']))
embedding_mean_norm = tf.reduce_mean(tf.norm(embeddings, axis=1)) # 這裡計算了embeddings的二範數的均值
tf.summary.scalar("embedding_mean_norm", embeddin