Optimization Algorithms: SGD

SGD: An Introduction to Stochastic Gradient Descent

The cost function of a machine learning algorithm can usually be decomposed into a sum of per-sample cost functions. In particular, the negative conditional log-likelihood of the training data can be written as

\[J(\theta)=\mathbb{E}_{x,y \sim \hat p_{data}} L(x, y, \theta)=\frac {1} {m} \sum_{i=1}^m L(x^{(i)},y^{(i)},\theta) \]

where \(L\) is the per-sample loss, \(L(x, y, \theta) = -\log p(y \mid x; \theta)\).

For such additive cost functions, gradient descent needs to compute

\[\nabla_{\theta} J(\theta) = \frac {1} {m} \sum_{i=1}^m \nabla_{\theta} L(x^{(i)},y^{(i)},\theta) \]

Rather than using the full training set, we draw a minibatch of samples \(\mathbb{B}=\{x^{(1)},\dots,x^{(m')}\}\). The gradient estimate can then be expressed as

\[g=\frac {1} {m'} \nabla_{\theta} \sum_{i=1}^{m'} L(x^{(i)}, y^{(i)}, \theta) \]

using only the samples from the minibatch \(\mathbb{B}\). Stochastic gradient descent then follows this estimated gradient downhill:

\[\theta \leftarrow \theta - \epsilon \cdot g \]

where \(\epsilon\) is the learning rate.
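In code, one SGD step is simply the minibatch-averaged gradient followed by the update above. Below is a minimal NumPy sketch; `grad_L`, a function returning the per-sample gradient \(\nabla_{\theta} L(x, y, \theta)\), is an assumed placeholder supplied by the caller, not part of the implementation later in this post.

import numpy as np

def sgd_step(theta, batch_x, batch_y, grad_L, eps=0.01):
    """One update theta <- theta - eps * g on a minibatch."""
    m = len(batch_x)
    # g: minibatch estimate of the gradient of J(theta)
    g = sum(grad_L(x, y, theta) for x, y in zip(batch_x, batch_y)) / m
    return theta - eps * g

The per-sample variant used later in this post is the special case \(m' = 1\).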

Practical applications:

Case 1: Logistic regression

Logistic regression models \(\hat y = Logistic(x)=\frac {1} {1+e^{w \cdot x+b}}\). (Note that this form applies the sigmoid to the negated logit \(w \cdot x + b\); the derivation and code below follow it consistently.)

The loss function is the cross-entropy, \(CE(\hat y,y)=-\sum_i {y_i \cdot \log(\hat y_i)}\), which for a single binary label expands to the binary cross-entropy (BCE):

\[Loss(\hat y, y)=-(y \cdot \log(\hat y) + (1-y) \cdot \log(1-\hat y)) \]

The intermediate derivatives are

\[\begin{split} & \frac {dL} {d \hat y} = \frac {1 - y} {1 - \hat y} - \frac {y} {\hat y} \\ & \frac {d\hat y} {dw} = -x \cdot {\hat y} \cdot (1-\hat y) \\ & \frac {d\hat y} {db} = -{\hat y} \cdot (1-\hat y) \end{split} \]

Then the partial derivatives of the loss \(Loss(\hat y, y)\) with respect to \(w\) and \(b\) follow from the chain rule:

\[\begin{split} & \frac {dL} {dw} = \frac {dL} {d \hat y} \cdot \frac {d\hat y} {dw} = (y - \hat y)\cdot x \\ & \frac {dL} {db} = \frac {dL} {d \hat y} \cdot \frac {d\hat y} {db} = y - \hat y \end{split} \]

Substituting into the update rule \(\theta \leftarrow \theta - \epsilon \cdot g\) yields:

\[\begin{split} & w \leftarrow w - \epsilon \cdot \frac {dL} {dw} = w - \epsilon \cdot (y - \hat y)\cdot x \\ & b \leftarrow b - \epsilon \cdot \frac {dL} {db} = b - \epsilon \cdot (y - \hat y) \end{split} \]
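Before turning this into code, the derived gradient can be sanity-checked against a finite-difference approximation. The snippet below is a self-contained check (the helpers `y_hat` and `bce` are local to this check, not part of the implementation that follows):

import numpy as np

def y_hat(w, b, x):
    # the sigmoid as defined above: 1 / (1 + e^(w.x + b))
    return 1. / (1. + np.exp(w @ x + b))

def bce(w, b, x, y):
    p = y_hat(w, b, x)
    return -(y * np.log(p) + (1 - y) * np.log(1 - p))

rng = np.random.default_rng(0)
w, x = rng.standard_normal(4), rng.standard_normal(4)
b, y, h = 0.1, 1.0, 1e-6
analytic = (y - y_hat(w, b, x)) * x  # dL/dw = (y - y_hat) * x
numeric = np.array([(bce(w + h * np.eye(4)[k], b, x, y) - bce(w, b, x, y)) / h
                    for k in range(4)])
print(np.allclose(analytic, numeric, atol=1e-4))  # expected: True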

A runnable implementation of the above algorithm follows:

import numpy as np


# Base class
class Module(object):
    def __init__(self):
        self.loss_pro = 0.   # previous loss (lets backprop adapt the step)
        self.loss_now = 0.   # current loss
        self.x = None
        self.y_pred = None
        self.y_true = None

    def backprop(self, *args, **kwargs):
        r"""One SGD step: apply dLoss/dy_pred * dy_pred/dw to the parameters."""
        pass


# Logistic regression
class Logistic(Module):
    def __init__(self, w=None, b=None, lr=0.01):
        super(Logistic, self).__init__()
        self.lr = lr
        self.w = w
        self.b = b

    def __call__(self, x):
        self.x = x
        inputs_shape = x.shape
        if self.w is None:  # zero init (random init performed worse here)
            self.w = np.zeros(inputs_shape[:-1])[:, np.newaxis]
        if self.b is None:
            self.b = np.zeros((1, 1))
        # sigmoid 1 / (1 + e^(w.x + b)), matching the derivation above
        return 1. / (1. + np.exp(np.matmul(self.w.T, x) + self.b))

    def loss(self, y_pred, y_true, delta=1e-16):
        self.y_pred = y_pred
        self.y_true = y_true
        self.loss_pro = self.loss_now
        y_pred = np.clip(y_pred, delta, 1. - delta)  # avoid log(0)
        self.loss_now = -(y_true * np.log(y_pred) +
                          (1 - y_true) * np.log(1 - y_pred))
        return self.loss_now

    def backprop(self, **kwargs):
        # dL/dw = (y - y_hat) * x and dL/db = y - y_hat, as derived above
        loss_diff = self.y_true - self.y_pred
        self.w -= self.lr * loss_diff * self.x
        self.b -= self.lr * loss_diff
        return True


# Linear regression
class Linear(Module):
    def __init__(self, w=None, b=None, lr=0.01):
        super(Linear, self).__init__()
        self.lr = lr
        self.w = w
        self.b = b

    def __call__(self, x):
        self.x = x
        inputs_shape = x.shape
        if self.w is None:  # random init
            self.w = np.random.randn(*inputs_shape[:-1])[:, np.newaxis]
        if self.b is None:
            self.b = np.random.randn(1, 1)
        return np.matmul(self.w.T, x) + self.b

    def loss(self, y_pred, y_true):
        self.y_pred = y_pred
        self.y_true = y_true
        self.loss_pro = self.loss_now
        self.loss_now = np.sum((y_pred - y_true) ** 2)
        return self.loss_now

    def backprop(self, threshold=0.01, **kwargs):
        # stop once the loss has plateaued
        if abs(self.loss_now - self.loss_pro) < threshold * 5e-3:
            return False
        # crude learning-rate schedule: shrink near convergence,
        # grow slightly while the loss is still dropping
        if abs(self.loss_now - self.loss_pro) < threshold * 64:
            self.lr *= 0.98
        elif self.loss_now < self.loss_pro:
            self.lr *= 1.001
        elif self.loss_now > self.loss_pro:
            self.lr *= 0.998

        # dL/dw = 2 * x * (y_pred - y_true), dL/db = 2 * (y_pred - y_true)
        self.w -= self.lr * 2 * np.matmul(
            self.x, (self.y_pred - self.y_true).T)
        self.b -= self.lr * 2 * (self.y_pred - self.y_true)
        return True


# SGD training loop
def SGD(x, y, model, batch_size=1, epochs=10, threshold=0.001):
    """Fit `model` to (x, y) with per-sample SGD updates.

    Args:
        x: inputs, shape (features, num_samples).
        y: targets, shape (..., num_samples).
        model: a Module providing __call__, loss and backprop.
        batch_size: how often (in samples) to check convergence.
        epochs: maximum passes over the data.
        threshold: stop when the mean loss falls below this value.

    Returns:
        The fitted model.
    """
    num_samples = x.shape[-1]
    if batch_size > num_samples:  # clamp the batch size to the data set size
        batch_size = num_samples
    bool_break = False
    for i in range(epochs):
        loss_sum, loss_count = 0., 0
        for j in range(num_samples):
            y_pred = model(x[..., j:j + 1])
            y_true = y[..., j]
            sgd_loss = model.loss(y_pred, y_true)
            loss_sum += sgd_loss
            loss_count += 1
            # parameters are updated every sample; batch_size only
            # controls how often the stopping criterion is evaluated
            if (j + 1) % batch_size == 0:
                loss_mean = loss_sum / loss_count
                if np.abs(loss_mean) < threshold or np.isnan(loss_mean):
                    bool_break = True
                    break
                loss_sum, loss_count = 0., 0
            if not model.backprop(threshold=threshold):
                bool_break = True
                break
        if bool_break:
            break
    return model


def Sigmod(x, w, b):
    # sigmoid of the negated logit, consistent with the Logistic class above
    return 1. / (1. + np.exp(np.matmul(w.T, x) + b))


def test_SGD_Logistic(x, w, b):
    # noisy binary labels generated from the true parameters (w, b)
    y_prob = Sigmod(x, w, b)
    rand_y = np.random.randn(len(y_prob))
    rand_y = 0.01 * rand_y / np.max(np.abs(rand_y))
    y_logistic = y_prob + rand_y > 0.5

    model = Logistic(lr=0.0005)
    sgd_model = SGD(x, y_logistic, model, batch_size=256,
                    epochs=10000, threshold=.5)
    pred = Sigmod(x, sgd_model.w, sgd_model.b) > 0.5

    print(sgd_model.w)
    print(sgd_model.b)
    print('accuracy:', np.mean(pred == y_logistic))


def test_SGD_Linear(x, w, b):
    # noisy regression targets generated from the true parameters (w, b)
    linear = Linear(w, b)
    y_linear = linear(x)
    rand_y = np.random.randn(len(y_linear))
    y_linear += 0.01 * rand_y / np.max(np.abs(rand_y))

    model = Linear(lr=0.1)
    sgd_model = SGD(x, y_linear, model, batch_size=256,
                    epochs=10000, threshold=.5)
    pred = np.matmul(sgd_model.w.T, x) + sgd_model.b  # was Sigmod by mistake

    print(sgd_model.w)
    print(sgd_model.b)
    print('MSE:', np.mean((pred - y_linear) ** 2))


def test_SGD(model='logistic'):
    w = np.array([1.8, 2.5, 3.1, 2.3])
    b = 0.1
    # Data
    x = np.random.randn(len(w), 1024)
    if model == 'logistic':
        test_SGD_Logistic(x, w, b)
    elif model == 'linear':
        test_SGD_Linear(x, w, b)


if __name__ == '__main__':
    test_SGD('logistic')
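For reference, the same per-sample logistic fit can be reproduced with an off-the-shelf optimizer; `torch.optim.SGD` performs exactly the update \(\theta \leftarrow \theta - \epsilon \cdot g\). The following is a sketch assuming PyTorch is installed, mirroring the setup of test_SGD (1024 samples, 4 features):

import torch

x = torch.randn(1024, 4)
w_true = torch.tensor([1.8, 2.5, 3.1, 2.3])
y = ((x @ w_true + 0.1) < 0).float()  # labels consistent with 1/(1+e^(w.x+b))

w = torch.zeros(4, requires_grad=True)
b = torch.zeros(1, requires_grad=True)
opt = torch.optim.SGD([w, b], lr=0.0005)
loss_fn = torch.nn.BCELoss()

for epoch in range(100):
    for i in range(len(x)):
        y_pred = torch.sigmoid(-(x[i] @ w + b))  # 1 / (1 + e^(w.x + b))
        loss = loss_fn(y_pred, y[i:i + 1])
        opt.zero_grad()
        loss.backward()
        opt.step()
print(w, b)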