
Paper "END-TO-END OPTIMIZED IMAGE COMPRESSION": Source Code Analysis and Paper Reading (Preprocessing + Analysis Transform)


0 Preface

For the background to this write-up, links to related resources, and an overview, see this earlier article.


1 Preprocessing

1.1 Reading the image

def read_bmp(filename):
  """Loads a BMP image file."""
  string = tf.read_file(filename)
  image = tf.image.decode_image(string, channels=0)
  # Read and decode; for a grayscale BMP the loaded data is a 2-D matrix.
  image = tf.cast(image, tf.float32)  # cast to float32
  image /= 255  # normalize to [0, 1]
  return image

1.2 Dimension expansion

  # Suppose the input matrix has shape [256, 256].
  x = tf.expand_dims(x, 0)  # add a batch dimension: [1, 256, 256]
  x.set_shape([1, None, None, 1])  # declare the static shape, including the
                                   # channel dimension: [1, 256, 256, 1]
                                   # (set_shape only records static shape info)
  # At this point x is [1, 256, 256, 1].
  x_shape = tf.shape(x)  # record the shape for decompression later; possibly optional

PS: I am not sure the expansion to a 4-D tensor is strictly required; I believe the dimension expansion mainly exists to fit the signatures of TensorFlow's functions. The batch dimension could perhaps be omitted.
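The preprocessing chain (decode, cast to float32, normalize, expand dimensions) can be sketched without TensorFlow. Here a random uint8 array stands in for the decoded BMP; the data is illustrative only:

```python
import numpy as np

# Stand-in for the decoded 256x256 grayscale BMP (hypothetical data).
image = np.random.randint(0, 256, size=(256, 256), dtype=np.uint8)

# Cast to float32 and normalize to [0, 1], mirroring tf.cast + `image /= 255`.
x = image.astype(np.float32) / 255.0

# Add batch and channel dimensions: [256, 256] -> [1, 256, 256, 1],
# the NHWC layout the convolution layers expect.
x = x[np.newaxis, :, :, np.newaxis]

print(x.shape)  # (1, 256, 256, 1)
```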


2 Analysis transform

Definition of the analysis transform class:

class AnalysisTransform(tf.keras.layers.Layer):
  """The analysis transform."""

  def __init__(self, num_filters, *args, **kwargs):
    self.num_filters = num_filters
    super(AnalysisTransform, self).__init__(*args, **kwargs)

  def build(self, input_shape):
    self._layers = [
        tfc.SignalConv2D(
            self.num_filters, (5, 5), name="layer_0", corr=True, strides_down=2,
            padding="same_zeros", use_bias=True,
            activation=tfc.GDN(name="gdn_0")),
        tfc.SignalConv2D(
            self.num_filters, (5, 5), name="layer_1", corr=True, strides_down=2,
            padding="same_zeros", use_bias=True,
            activation=tfc.GDN(name="gdn_1")),
        # The walkthrough below only goes this far; the three layers are
        # in fact identical in structure.
        tfc.SignalConv2D(
            self.num_filters, (5, 5), name="layer_2", corr=True, strides_down=2,
            padding="same_zeros", use_bias=True,
            activation=tfc.GDN(name="gdn_2")),
    ]
    super(AnalysisTransform, self).build(input_shape)

  def call(self, tensor):
    for layer in self._layers:
      tensor = layer(tensor)
    return tensor

Taking a single layer as an example:

tfc.SignalConv2D(
    self.num_filters,      # number of filters; sets the output channel count
                           # (e.g. 36 filters -> 36 output channels)
    (5, 5),                # kernel size
    name="layer_0",        # layer name
    corr=True,             # correlation (True) rather than convolution
    strides_down=2,        # downsampling stride
    padding="same_zeros",  # zero padding
    use_bias=True,         # whether an additive constant is applied to each output channel
    activation=tfc.GDN(name="gdn_0")  # GDN activation function
),
  • Call order:
    init() -> build() -> call()

2.1 Initialization: init()

This is mostly the assignment of member variables.
(An FPGA may not support object-oriented programming; how these parameters are passed is left to the hardware designers.)

  def __init__(self, filters, kernel_support,
               corr=False, strides_down=1, strides_up=1, padding="valid",
               extra_pad_end=True, channel_separable=False,
               data_format="channels_last",
               activation=None, use_bias=False, use_explicit=True,
               kernel_initializer=tf.initializers.variance_scaling(),
               bias_initializer=tf.initializers.zeros(),
               kernel_regularizer=None, bias_regularizer=None,
               kernel_parameterizer=parameterizers.RDFTParameterizer(),
               bias_parameterizer=None,
               **kwargs):  # the values above are the defaults

    # call the parent class constructor
    super(_SignalConv, self).__init__(**kwargs)

    self._filters = int(filters) #√
    self._kernel_support = self._normalized_tuple(
        kernel_support, "kernel_support") # =(5,5)
    self._corr = bool(corr) #√
    self._strides_down = self._normalized_tuple(strides_down, "strides_down") #√
    self._strides_up = self._normalized_tuple(strides_up, "strides_up") #√
    self._padding = str(padding).lower()
    try:#√
      self._pad_mode = {
          "valid": None,
          "same_zeros": "CONSTANT",
          "same_reflect": "REFLECT",
      }[self.padding]
    except KeyError:
      raise ValueError("Unsupported padding mode: '{}'".format(padding))
    self._extra_pad_end = bool(extra_pad_end)
    self._channel_separable = bool(channel_separable)
    self._data_format = str(data_format)
    self._activation = activation
    self._use_bias = bool(use_bias)#√
    self._use_explicit = bool(use_explicit)
    self._kernel_initializer = kernel_initializer
    self._bias_initializer = bias_initializer
    self._kernel_regularizer = kernel_regularizer
    self._bias_regularizer = bias_regularizer
    self._kernel_parameterizer = kernel_parameterizer
    self._bias_parameterizer = bias_parameterizer

    if self.data_format not in ("channels_first", "channels_last"):
      raise ValueError("Unknown data format: '{}'.".format(self.data_format))

    self.input_spec = tf.keras.layers.InputSpec(ndim=self._rank + 2)

2.2 build()

  def build(self, input_shape):
    # Get the shape of the input tensor, e.g. [1, 256, 256, 1].
    input_shape = tf.TensorShape(input_shape)
    channel_axis = {"channels_first": 1, "channels_last": -1}[self.data_format]
    input_channels = input_shape.as_list()[channel_axis]
    if input_channels is None:
      raise ValueError("The channel dimension of the inputs must be defined.")
    # Concretize the input spec: ndim = 2 + 2.
    self.input_spec = tf.keras.layers.InputSpec(
        ndim=self._rank + 2, axes={channel_axis: input_channels})
    # self.input_spec = InputSpec(ndim=4, axes={-1: 1})

    # Kernel shape = (5, 5) + (1, 36) = (5, 5, 1, 36): a 4-D kernel.
    kernel_shape = self.kernel_support + (input_channels, self.filters)
    if self.channel_separable:
      output_channels = self.filters * input_channels
    else:
      output_channels = self.filters  # number of output channels = 36

    # 2.2.1 Invoke parameterizers.RDFTParameterizer().
    kernel_parameterizer = self.kernel_parameterizer
    if kernel_parameterizer is None:
      getter = self.add_weight
    else:
      getter = functools.partial(
          kernel_parameterizer, getter=self.add_weight)

    self._kernel = getter(
        name="kernel", shape=kernel_shape, dtype=self.dtype,
        initializer=self.kernel_initializer,
        regularizer=self.kernel_regularizer)

    # 2.2.2 Initialize the bias: shape=(36,), dtype=float32, all zeros.
    if self.use_bias:
      bias_parameterizer = self.bias_parameterizer
      if bias_parameterizer is None:
        getter = self.add_weight  #√
      else:
        getter = functools.partial(
            bias_parameterizer, getter=self.add_weight)
      self._bias = getter(
          name="bias", shape=(output_channels,), dtype=self.dtype,
          initializer=self.bias_initializer, regularizer=self.bias_regularizer)
    super(_SignalConv, self).build(input_shape)
  • The parameterizer is invoked to populate the kernel; afterwards it is a float32 tensor of shape (5, 5, 1, 36).
  • Since training does not need to be implemented here, this step can be replaced by loading the trained model parameters and assigning them.
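The kernel-shape arithmetic in build() is plain tuple concatenation; the following snippet reproduces it with this layer's values (5x5 kernel, 1 input channel, 36 filters):

```python
# Values taken from the walkthrough above.
kernel_support = (5, 5)
input_channels = 1
filters = 36

# kernel_shape = (5, 5) + (1, 36) = (5, 5, 1, 36)
kernel_shape = kernel_support + (input_channels, filters)

# Output channel count, per the channel_separable branch.
channel_separable = False
output_channels = filters * input_channels if channel_separable else filters

print(kernel_shape, output_channels)  # (5, 5, 1, 36) 36
```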

2.3 call()

2.3.1 Initialization

    inputs = tf.convert_to_tensor(inputs)
    outputs = inputs
    kernel = self.kernel
    corr = self.corr

2.3.2 Zero padding

1) Compute the number of padding pixels at the head and tail of each dimension.

padding = padding_ops.same_padding_for_kernel(
    self.kernel_support, corr, self.strides_up)
# With kernel_support=(5, 5), corr=True, strides_up=(1, 1), this gives
# padding = [(2, 2), (2, 2)]: 2 dimensions, each padded by 2 pixels at both ends.

The function being called:

def same_padding_for_kernel(shape, corr, strides_up=None):
  rank = len(shape)
  if strides_up is None:
    strides_up = rank * (1,)

  if corr:
    padding = [(s // 2, (s - 1) // 2) for s in shape] #√
  else:
    padding = [((s - 1) // 2, s // 2) for s in shape]

  padding = [((padding[i][0] - 1) // strides_up[i] + 1,
              (padding[i][1] - 1) // strides_up[i] + 1) for i in range(rank)] #√
  return padding
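Since same_padding_for_kernel is pure Python, the claimed result is easy to check directly; this copy simply repeats the library logic quoted above:

```python
def same_padding_for_kernel(shape, corr, strides_up=None):
    # Same logic as the library function quoted above.
    rank = len(shape)
    if strides_up is None:
        strides_up = rank * (1,)
    if corr:
        padding = [(s // 2, (s - 1) // 2) for s in shape]
    else:
        padding = [((s - 1) // 2, s // 2) for s in shape]
    return [((padding[i][0] - 1) // strides_up[i] + 1,
             (padding[i][1] - 1) // strides_up[i] + 1) for i in range(rank)]

# This layer's values: 5x5 kernel, correlation mode, no upsampling.
print(same_padding_for_kernel((5, 5), corr=True, strides_up=(1, 1)))
# [(2, 2), (2, 2)]
```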

2) The "pre-padding" is set to zero, i.e. no pre-padding is performed.

prepadding = self._rank * ((0, 0),)
# prepadding = ((0, 0), (0, 0))

3) Padding
(This step actually happens during the convolution computation; it is explained here first for clarity.)
Same padding is used: zeros are filled in according to the amounts computed in 1).
For example, padding = [(2, 2), (2, 2)] means both spatial dimensions are padded by 2 pixels at each end, i.e. each of the image's 4 borders is extended by a 2-pixel-wide strip of zeros.
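These padding amounts can be applied with np.pad; a tiny 4x4 stand-in image shows each border growing by two zero pixels:

```python
import numpy as np

img = np.arange(16, dtype=np.float32).reshape(4, 4)  # tiny stand-in image
# [(2, 2), (2, 2)]: pad 2 pixels at the head and tail of both dimensions.
padded = np.pad(img, pad_width=[(2, 2), (2, 2)], mode="constant")
print(padded.shape)  # (8, 8): every border grew by a 2-pixel strip of zeros
```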

2.3.3 The convolution computation

convolution / correlation
1) Perform correlation with downsampling (here `outputs` still holds the input tensor):

outputs = self._correlate_down_explicit(outputs, kernel, padding)

def _correlate_down_explicit(self, inputs, kernel, padding):
    # Computes correlation followed by downsampling, with arbitrary zero
    # padding.
    data_format = self._op_data_format
    strides = self._padded_tuple(self.strides_down, 1)  # = (1, 2, 2, 1)
    padding = self._padded_tuple(padding, (0, 0))  # = ((0, 0), (2, 2), (2, 2), (0, 0))
    do_cast = inputs.dtype.is_integer

    if self._rank == 1 and not self.channel_separable:
      pass  # 1-D case omitted here
    elif self._rank == 2 and not self.channel_separable:
      # `tf.nn.conv2d` performs correlations followed by optional downsampling.
      if do_cast:  # integer inputs; ignored here
        inputs = tf.cast(inputs, tf.float32)
      # The convolution itself: inputs is a 4-D tensor, e.g. [1, 256, 256, 1];
      # kernel has shape (5, 5, 1, 36).
      outputs = tf.nn.conv2d(
          inputs, kernel, strides=strides, padding=padding,
          data_format=data_format)

      if do_cast:  # integer inputs; ignored here
        outputs = tf.cast(tf.math.round(outputs), self.accum_dtype)
    else:
      self._raise_notimplemented()

    return outputs

Inputs:
kernel shape [filter_height, filter_width, in_channels, out_channels] = [5, 5, 1, 36]
input tensor [batch, in_height, in_width, in_channels] = [1, 256, 256, 1]
tf.nn.conv2d performs the following operations:

  1. Flattens the filter (kernel) into a 2-D matrix of shape [filter_height * filter_width * in_channels, output_channels] = [25, 36].
  2. Extracts image patches from the input tensor to form a virtual tensor of shape [batch, out_height, out_width, filter_height * filter_width * in_channels] = [1, 128, 128, 25] (out_height and out_width are halved by the stride of 2).
  3. For each patch, right-multiplies the patch vector by the kernel matrix:

output[b, i, j, k] =
    sum_{di, dj, q} input[b, strides[1] * i + di, strides[2] * j + dj, q] * filter[di, dj, q, k]

In essence, the operation above amounts to:

  • If the input is a [1, 256, 256, c] image and the kernel is [5, 5, c, 36], there are 36 convolution kernels of shape [5, 5, c]. Each kernel slides over the [256, 256, c] image with a stride of 2, producing one [128, 128] feature map. When c = 1, corresponding elements are multiplied directly; when c ≠ 1, a c-dimensional dot product is taken at each position.
  • With all 36 kernels, the output is a (1, 128, 128, 36) feature map.
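The three conv2d steps can be reproduced with a naive numpy loop. This sketch is not the library implementation, and it uses small stand-in sizes (an 8x8 input, 4 filters) so the loop stays fast, but the flattened patch-times-kernel product is exactly the operation described above:

```python
import numpy as np

def correlate_down(inputs, kernel, stride, pad):
    """Naive correlation + downsampling with explicit zero padding (NHWC)."""
    b, h, w, cin = inputs.shape
    kh, kw, _, cout = kernel.shape
    x = np.pad(inputs, [(0, 0), pad[0], pad[1], (0, 0)], mode="constant")
    oh = (x.shape[1] - kh) // stride + 1
    ow = (x.shape[2] - kw) // stride + 1
    out = np.zeros((b, oh, ow, cout), dtype=inputs.dtype)
    for i in range(oh):
        for j in range(ow):
            patch = x[:, i * stride:i * stride + kh, j * stride:j * stride + kw, :]
            # Flatten the patch and right-multiply by the flattened kernel:
            # steps 1-3 of the description above.
            out[:, i, j, :] = patch.reshape(b, -1) @ kernel.reshape(-1, cout)
    return out

# Small stand-ins (the real layer uses a 256x256 input and 36 filters).
x = np.random.rand(1, 8, 8, 1).astype(np.float32)
k = np.random.rand(5, 5, 1, 4).astype(np.float32)
y = correlate_down(x, k, stride=2, pad=[(2, 2), (2, 2)])
print(y.shape)  # (1, 4, 4, 4): spatial size halved, channels = filter count
```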

References:
Source code
TensorFlow code analysis
How does tf.nn.conv2d implement convolution?
Explaining Tensorflow Code for a Convolutional Neural Network
What does tf.nn.conv2d do in tensorflow?

2.3.4 Adding the bias

outputs = tf.nn.bias_add(outputs, bias)

outputs: shape (1, 128, 128, 36)
bias: shape (36,)

  • tf.nn.bias_add adds one bias value per channel: bias[k] is broadcast over the entire (128, 128) map of output channel k.
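Per-channel bias addition is plain broadcasting, which numpy reproduces directly (stand-in shapes here, not the real 128x128x36 tensors):

```python
import numpy as np

outputs = np.zeros((1, 4, 4, 3), dtype=np.float32)  # stand-in feature maps
bias = np.array([0.1, 0.2, 0.3], dtype=np.float32)  # one value per channel
# Broadcasting adds bias[k] to every spatial position of channel k,
# matching tf.nn.bias_add for channels-last data.
outputs = outputs + bias
print(outputs[0, 0, 0])  # [0.1 0.2 0.3]
```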

2.3.5 Activation function: GDN (forward)

outputs = self.activation(outputs) 

The formula is as follows; gamma and beta are trainable parameters that must be loaded from the trained model.

  y[i] = x[i] / sqrt(beta[i] + sum_j(gamma[j, i] * x[j]^2))

The corresponding code (norm_pool is finally multiplied with the inputs to produce the output):

norm_pool = tf.linalg.matmul(tf.math.square(inputs), self.gamma)
norm_pool = tf.nn.bias_add(norm_pool, self.beta)
norm_pool = tf.math.rsqrt(norm_pool)
outputs = inputs * norm_pool
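The GDN formula above is easy to sketch in numpy for channels-last data; beta and gamma here are illustrative stand-ins for the trained parameters, not real model values:

```python
import numpy as np

def gdn_forward(x, beta, gamma):
    """GDN forward pass: y[i] = x[i] / sqrt(beta[i] + sum_j gamma[j, i] * x[j]^2).
    x has shape (..., C); gamma is (C, C); beta is (C,)."""
    norm_pool = np.square(x) @ gamma      # sum_j gamma[j, i] * x[j]^2
    norm_pool = norm_pool + beta          # + beta[i]
    norm_pool = 1.0 / np.sqrt(norm_pool)  # rsqrt
    return x * norm_pool                  # divide x by the normalization pool

C = 4
x = np.random.randn(1, 8, 8, C).astype(np.float32)
beta = np.ones(C, dtype=np.float32)        # stand-in for the trained beta
gamma = 0.1 * np.eye(C, dtype=np.float32)  # stand-in for the trained gamma
y = gdn_forward(x, beta, gamma)
print(y.shape)  # (1, 8, 8, C), same shape as the input
```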