論文 END-TO-END OPTIMIZED IMAGE COMPRESSION 原始碼解析與論文閱讀——預處理+分析變換
0 前言
1 預處理
1.1 讀取
def read_bmp(filename):
    """Load a BMP image file as a float32 tensor scaled to [0, 1].

    Uses the TF1-style file ops: reads the raw bytes, decodes them into a
    2-D/3-D uint8 matrix, casts to float32 and normalizes by 255.
    """
    raw_bytes = tf.read_file(filename)
    # channels=0 keeps whatever channel count the file itself declares.
    decoded = tf.image.decode_image(raw_bytes, channels=0)
    return tf.cast(decoded, tf.float32) / 255
1.2 維度變換
# Assume the input matrix has shape [256, 256] (a single-channel image).
x = tf.expand_dims(x, 0)  # prepend a batch dimension -> [1, 256, 256]
x.set_shape([1, None, None, 1])  # NOTE(review): set_shape only merges static shape info; it cannot add a rank. For this to yield [1, 256, 256, 1] the image must already carry a trailing channel axis ([H, W, 1]) before expand_dims — confirm against read_bmp.
# At this point the tensor is treated as [1, 256, 256, 1] (NHWC).
x_shape = tf.shape(x)  # record the dynamic shape for decompression later; may be omitted
PS:tf.nn.conv2d 要求輸入為 4 維 NHWC 張量([batch, height, width, channels]),因此 batch 維度不能省略;維度擴充套件正是為了滿足 TensorFlow 卷積函式的介面要求。
2 分析變換
分析變換類定義:
class AnalysisTransform(tf.keras.layers.Layer):
    """The analysis transform.

    Three 5x5 SignalConv2D layers, each downsampling by a stride of 2 and
    followed by a GDN nonlinearity, mapping the input image to the latent
    representation.
    """

    def __init__(self, num_filters, *args, **kwargs):
        # Number of output channels produced by every convolution layer.
        self.num_filters = num_filters
        super(AnalysisTransform, self).__init__(*args, **kwargs)

    def build(self, input_shape):
        # The three layers are structurally identical (only names differ).
        self._layers = [
            tfc.SignalConv2D(
                self.num_filters, (5, 5), name="layer_0", corr=True,
                strides_down=2, padding="same_zeros", use_bias=True,
                activation=tfc.GDN(name="gdn_0")),
            tfc.SignalConv2D(
                self.num_filters, (5, 5), name="layer_1", corr=True,
                strides_down=2, padding="same_zeros", use_bias=True,
                activation=tfc.GDN(name="gdn_1")),
            tfc.SignalConv2D(
                self.num_filters, (5, 5), name="layer_2", corr=True,
                strides_down=2, padding="same_zeros", use_bias=True,
                activation=tfc.GDN(name="gdn_2")),
        ]
        super(AnalysisTransform, self).build(input_shape)

    def call(self, tensor):
        # Apply the layers in sequence.
        for layer in self._layers:
            tensor = layer(tensor)
        # BUG FIX: the original listing dropped this return, so call()
        # returned None instead of the transformed tensor.
        return tensor
以一層為例說明:
tfc.SignalConv2D(
    self.num_filters,  # filter count = output channels, e.g. 36; with stride 2 a [256,256] input gives a [128,128,36] feature map
    (5, 5),  # kernel support (height, width)
    name="layer_0",  # layer name
    corr=True,  # True -> correlation (what CNNs usually call "convolution"); False -> true convolution (flipped kernel)
    strides_down=2,  # downsampling stride
    padding="same_zeros",  # "same" output size via zero padding
    use_bias=True,  # whether an additive constant is applied to each output channel
    activation=tfc.GDN(name="gdn_0")  # GDN activation function
),
- 呼叫順序
init()
->build()
->call()
2.1 初始化init()
一些變數的賦值
FPGA可能無法做面向物件程式設計?具體如何傳參還請你們設計。
def __init__(self, filters, kernel_support,
             corr=False, strides_down=1, strides_up=1, padding="valid",
             extra_pad_end=True, channel_separable=False,
             data_format="channels_last",
             activation=None, use_bias=False, use_explicit=True,
             kernel_initializer=tf.initializers.variance_scaling(),
             bias_initializer=tf.initializers.zeros(),
             kernel_regularizer=None, bias_regularizer=None,
             kernel_parameterizer=parameterizers.RDFTParameterizer(),
             bias_parameterizer=None,
             **kwargs):  # the values above are the defaults
    """Store and validate the layer configuration; no variables are created here."""
    # Call the parent (Keras Layer) constructor.
    super(_SignalConv, self).__init__(**kwargs)
    self._filters = int(filters)
    # Normalize to a per-spatial-dimension tuple, e.g. (5, 5).
    self._kernel_support = self._normalized_tuple(
        kernel_support, "kernel_support")
    self._corr = bool(corr)
    self._strides_down = self._normalized_tuple(strides_down, "strides_down")
    self._strides_up = self._normalized_tuple(strides_up, "strides_up")
    self._padding = str(padding).lower()
    try:
        # Map the padding name to the tf.pad mode used later
        # (None means no padding at all).
        self._pad_mode = {
            "valid": None,
            "same_zeros": "CONSTANT",
            "same_reflect": "REFLECT",
        }[self.padding]
    except KeyError:
        raise ValueError("Unsupported padding mode: '{}'".format(padding))
    self._extra_pad_end = bool(extra_pad_end)
    self._channel_separable = bool(channel_separable)
    self._data_format = str(data_format)
    self._activation = activation
    self._use_bias = bool(use_bias)
    self._use_explicit = bool(use_explicit)
    self._kernel_initializer = kernel_initializer
    self._bias_initializer = bias_initializer
    self._kernel_regularizer = kernel_regularizer
    self._bias_regularizer = bias_regularizer
    self._kernel_parameterizer = kernel_parameterizer
    self._bias_parameterizer = bias_parameterizer
    if self.data_format not in ("channels_first", "channels_last"):
        raise ValueError("Unknown data format: '{}'.".format(self.data_format))
    # Expected input rank = spatial rank + 2 (batch and channel axes).
    self.input_spec = tf.keras.layers.InputSpec(ndim=self._rank + 2)
2.2 build()
def build(self, input_shape):
    """Create the kernel (and optional bias) variables from the input shape."""
    # Static input shape, e.g. [1, 256, 256, 1].
    input_shape = tf.TensorShape(input_shape)
    channel_axis = {"channels_first": 1, "channels_last": -1}[self.data_format]
    input_channels = input_shape.as_list()[channel_axis]
    if input_channels is None:
        raise ValueError("The channel dimension of the inputs must be defined.")
    # Pin down the expected input: rank = spatial rank + 2,
    # e.g. InputSpec(ndim=4, axes={-1: 1}).
    self.input_spec = tf.keras.layers.InputSpec(
        ndim=self._rank + 2, axes={channel_axis: input_channels})
    # Kernel shape = kernel_support + (in_channels, filters),
    # e.g. (5, 5) + (1, 36) = (5, 5, 1, 36) — a 4-D kernel.
    kernel_shape = self.kernel_support + (input_channels, self.filters)
    if self.channel_separable:
        output_channels = self.filters * input_channels
    else:
        output_channels = self.filters  # e.g. 36 output channels
    # Create the kernel, optionally through a parameterizer
    # (e.g. parameterizers.RDFTParameterizer wrapping add_weight).
    kernel_parameterizer = self.kernel_parameterizer
    if kernel_parameterizer is None:
        getter = self.add_weight
    else:
        getter = functools.partial(
            kernel_parameterizer, getter=self.add_weight)
    self._kernel = getter(
        name="kernel", shape=kernel_shape, dtype=self.dtype,
        initializer=self.kernel_initializer,
        regularizer=self.kernel_regularizer)
    # Create the bias: shape (output_channels,) — e.g. (36,) —
    # zero-initialized with the default bias_initializer.
    if self.use_bias:
        bias_parameterizer = self.bias_parameterizer
        if bias_parameterizer is None:
            getter = self.add_weight
        else:
            getter = functools.partial(
                bias_parameterizer, getter=self.add_weight)
        self._bias = getter(
            name="bias", shape=(output_channels,), dtype=self.dtype,
            initializer=self.bias_initializer, regularizer=self.bias_regularizer)
    super(_SignalConv, self).build(input_shape)
- 呼叫了引數化器,給卷積核賦值,賦值後是一個shape=(5, 5, 1, 36), float32型別的矩陣。
- 因為不需要實現訓練,這一步替代為讀取模型引數、賦值
2.3 call()
2.3.1 初始化
# Make sure we operate on a tensor; local aliases for the loop below.
inputs = tf.convert_to_tensor(inputs)
outputs = inputs
kernel = self.kernel
corr = self.corr
2.3.2 零填充
1)計算每個維度首、尾的填充數。
padding = padding_ops.same_padding_for_kernel(
    self.kernel_support, corr, self.strides_up)
# 此處引數為 kernel_support=(5,5)、corr=True、strides_up=(1,1),
# 得 padding = [(2, 2), (2, 2)],即 2 個空間維度首尾各填充 2 個畫素
呼叫的函式
def same_padding_for_kernel(shape, corr, strides_up=None):
    """Return per-dimension (begin, end) padding for 'same'-size filtering.

    `shape` is the kernel support per spatial dimension; `corr` selects
    correlation (True) vs. convolution (False) conventions, which differ in
    where the extra pixel of an even kernel goes; `strides_up` divides the
    padding for upsampled (transposed) filtering and defaults to all ones.
    """
    ndim = len(shape)
    if strides_up is None:
        strides_up = ndim * (1,)
    if corr:
        base = [(k // 2, (k - 1) // 2) for k in shape]
    else:
        base = [((k - 1) // 2, k // 2) for k in shape]
    # Divide by the upsampling factor, rounding up (floor((p-1)/s) + 1).
    return [((lo - 1) // s + 1, (hi - 1) // s + 1)
            for (lo, hi), s in zip(base, strides_up)]
2)“預填充”置零,即不進行預填充。
# "Pre-padding" (only needed for upsampled convolution) is all zeros here,
# i.e. no pre-padding is applied.
prepadding = self._rank * ((0, 0),)
# prepadding = ((0, 0), (0, 0))
3)填充
(這一步實際是在卷積計算時操作,在這裡先說明更清晰)
採用same padding,根據1)的計算結果填充0。
例如:padding = [(2, 2), (2, 2)]
2個維度,首尾各填充2個畫素,即影象的4條邊各拓展2個畫素寬的0。
2.3.3 卷積計算
convolution/correlation 卷積/互相關
1)執行 互相關 下采樣
outputs = self._correlate_down_explicit(outputs=輸入的矩陣, kernel, padding)
↓
def _correlate_down_explicit(self, inputs, kernel, padding):
    """Correlate `inputs` with `kernel` and downsample, with explicit zero padding.

    FIX: the original listing did not parse — the 1-D branch held only a
    comment, and the conv2d call contained prose annotations in place of
    arguments. The annotations are restored as comments.
    """
    data_format = self._op_data_format
    # Stride tuple extended with 1 on the batch/channel axes, e.g. (1, 2, 2, 1).
    strides = self._padded_tuple(self.strides_down, 1)
    # Padding extended with (0, 0) on the batch/channel axes,
    # e.g. ((0, 0), (2, 2), (2, 2), (0, 0)).
    padding = self._padded_tuple(padding, (0, 0))
    do_cast = inputs.dtype.is_integer
    if self._rank == 1 and not self.channel_separable:
        pass  # 1-D case elided in this walkthrough.
    elif self._rank == 2 and not self.channel_separable:
        # `tf.nn.conv2d` performs correlation followed by optional downsampling.
        if do_cast:  # integer inputs: compute in float, round back below
            inputs = tf.cast(inputs, tf.float32)
        # inputs: 4-D tensor, e.g. [1, 256, 256, 1];
        # kernel: e.g. shape (5, 5, 1, 36).
        outputs = tf.nn.conv2d(
            inputs, kernel,
            strides=strides, padding=padding, data_format=data_format)
        if do_cast:
            outputs = tf.cast(tf.math.round(outputs), self.accum_dtype)
    else:
        self._raise_notimplemented()
    return outputs
輸入:
kernel 形狀 [filter_height, filter_width, in_channels, out_channels]
=[5,5,1,36]
輸入矩陣 [batch, in_height, in_width, in_channels]
=[1,256,256,1]
tf.nn.conv2d
執行了以下操作:
- 將濾波器(卷積核)展平為形狀為[filter_height * filter_width * in_channels, output_channels]=[25,36]的二維矩陣.
- 從輸入張量中提取影象patch,以形成形狀為[batch, out_height, out_width, filter_height * filter_width * in_channels]=[1,128,128,25]的虛擬張量(步長為2,故 out_height=out_width=128).
- 對於每個patch,右乘卷積核矩陣和影象patch向量.
output[b, i, j, k] =
sum_{di, dj, q} input[b, strides[1] * i + di, strides[2] * j + dj, q] *filter[di, dj, q, k]
上述操作本質上就是:
- 如果輸入是
[1,256,256,c]
的圖,kernel是[5,5,c,36]
,
共36個[5,5,c]
的卷積核,對於每個卷積核:
以2的步長滑動,對
[256,256,c]
的影象做卷積,產生[128, 128]
的特徵圖一個。c=1,對應元素直接相乘;c≠1,則是做c維向量的點積。
- 共36個,輸出
(1,128,128,36)
的特徵圖。
參考:
原始碼
tensorflow程式碼解析
tf.nn.conv2d是怎樣實現卷積的?
Explaining Tensorflow Code for a Convolutional Neural Network
What does tf.nn.conv2d do in tensorflow?
2.3.4 加bias
# Broadcast-add the per-channel bias over the last (channel) axis.
outputs = tf.nn.bias_add(outputs, bias)
outputs:shape(1,128,128,36)(上一步以步長2下采樣後的卷積輸出)
bias:shape(36,)
- tf.nn.bias_add 將 bias 沿最後一維(通道維)廣播相加:每個通道的 (128,128) 特徵圖都加上該通道對應的同一個 bias 值,即每一個 bias 對應一個輸出通道。
2.3.5 啟用函式 GDN(正向)
outputs = self.activation(outputs)
執行的公式如下,其中gamma、beta是可以訓練的引數,需要讀取模型賦值。
y[i] = x[i] / sqrt(beta[i] + sum_j(gamma[j, i] * x[j]^2))
對應程式碼:
# norm_pool[i] = sum_j gamma[j, i] * x[j]^2 (matmul over the channel axis)
norm_pool = tf.linalg.matmul(tf.math.square(inputs), self.gamma)
# norm_pool[i] += beta[i]
norm_pool = tf.nn.bias_add(norm_pool, self.beta)
# norm_pool[i] = 1 / sqrt(beta[i] + sum_j gamma[j, i] * x[j]^2)
norm_pool = tf.math.rsqrt(norm_pool)
# NOTE(review): the listing stops here; the GDN output is inputs * norm_pool.