1. 程式人生 > >CUDA並行算法系列之FFT快速卷積

CUDA並行算法系列之FFT快速卷積

卷積定義

在維基百科上,卷積定義為:

連續卷積公式

離散卷積定義為:

離散卷積公式

[ 0, 1, 2, 3][0, 1, 2]的卷積例子如下圖所示:

卷積示例

Python實現(直接卷積)

根據離散卷積的定義,用Python實現:

def conv(a, b):
    N = len(a)
    M = len(b)
    YN = N + M - 1
    y = [0.0 for i in range(YN)]
    for n in range(YN):
        for m in range(M):
            if 0 <= n - m and n - m < N:
                y[n] += a[n - m] * b[m]
    return
y

把陣列b逆序,則可以不交叉計算卷積(使用numpyarray[::-1]即可實現逆序):

import numpy as np
def conv2(a, b):
    N = len(a)
    M = len(b)
    YN = N + M - 1
    y = [0.0 for i in range(YN)]
    b = np.array(b)[::-1]       # 逆序
    for n in range(YN):
        for m in range(M):
            k = n - M + m + 1;
            if
0 <= k and k < N: y[n] += a[k] * b[m] return y

測試

可以利用numpy.convolve來檢驗計算結果的正確性:

if __name__ == '__main__':
    a = [ 0, 1, 2, 3 ]
    b = [ 0, 1, 2 ]
    print(conv2(a, b))
    print(np.convolve(a, b))

完整程式碼可以在Github上找到。

利用FFT快速卷積

時域的卷積和頻域的乘法是等價的,同時時域的乘法和頻域的卷積也是等價的。基於這個這個前提,可以把待卷積的陣列進行FFT變換,在頻域做乘法,然後再進行IFFT變換即可得到卷積結果。演算法流程描述如下:

  1. N=len(a), M = len(b), 其中a, b為待卷積的陣列,將長度增加到L>=N+M1,L=2n,nZ,即 L=2logN+M12+1
  2. 增加a, b的長度到L,後面補零。
  3. 分別計算afft=fft(a)bfft=fft(b)
  4. abfft=afft×bfft
  5. 用IFFT計算abaft的FFT逆變換,取前(N + M - 1)個值即為卷積結果。

FFT快速卷積Python程式碼如下:

def convfft(a, b):
    N = len(a)
    M = len(b)
    YN = N + M - 1
    FFT_N = 2 ** (int(np.log2(YN)) + 1)
    afft = np.fft.fft(a, FFT_N)
    bfft = np.fft.fft(b, FFT_N)
    abfft = afft * bfft
    y = np.fft.ifft(abfft).real[:YN]
    return y

測試

對比直接卷積、FFT卷積、numpy的卷積結果:

if __name__ == '__main__':
    a = [ 0, 1, 2, 3 ]
    b = [ 0, 1, 2 ]
    print(conv2(a, b))
    print(convfft(a, b))
    print(np.convolve(a, b))

可以看到,3個版本的計算結果是一致的。完整程式碼可以在Github上找到。

效能分析

複雜度分析

直接卷積的時間複雜度為o(MN),即o(n2)
FFT的時間複雜度為o(nlogn),FFT卷積複雜度為3次FFT+L次乘法,3o(nlogn)+o(n)=o(nlogn),及o(nlogn)
在實際應用中,卷積核(b)被提前計算,則只需2次FFT變換。

執行測試

分別測試3個版本在陣列長度為n * 1000 + 10, n=0,1,…,9的執行時間,並繪製執行時間曲線,編寫如下測試程式碼:

def time_test():
    import time
    import matplotlib.pyplot as plt

    def run(func, a, b):
        n = 1
        start = time.clock()
        for j in range(n):
            func(a, b)
        end = time.clock()
        run_time = end - start
        return run_time / n

    n_list = []
    t1_list = []
    t2_list = []
    t3_list = []
    for i in range(10):
        count = i * 1000 + 10
        print(count)
        a = np.ones(count)
        b = np.ones(count)
        t1 = run(conv, a, b)    # 直接卷積
        t2 = run(conv2, a, b)
        t3 = run(convfft, a, b) # FFT卷積
        n_list.append(count)
        t1_list.append(t1)
        t2_list.append(t2)
        t3_list.append(t3)

    # plot
    plt.plot(n_list, t1_list, label='conv')
    plt.plot(n_list, t2_list, label='conv2')
    plt.plot(n_list, t3_list, label='convfft')
    plt.legend()
    plt.title(u"convolve times")
    plt.ylabel(u"run times(ms/point)")
    plt.xlabel(u"length")
    plt.show()

執行得到的曲線圖如下:

卷積執行時間

從圖中可知,FFT卷積比直接卷積速度要快很多。完整程式碼可以在Github上找到

CUDA實現

直接卷積

只需要把外層迴圈並行化就可以在CUDA上實現卷積,程式碼如下:

// 直接計算卷積
__global__ void conv_kernel(const float *ina, const float *inb, float *out, size_t len_a, size_t len_b, size_t len_out)
{
    const int tid = blockIdx.x * blockDim.x + threadIdx.x;
    if (tid >= len_out)
    {
        return;
    }

    float sum = 0.0f;
    for (int m = 0; m < len_b; ++m)
    {
        int k = tid - m;
        if (0 <= k && k < len_a)
        {
            sum += ina[k] * inb[m];
        }
    }
    out[tid] = sum;
}

當然,可以使用共享記憶體和常量記憶體(卷積核存入常量記憶體)進行優化,優化的程式碼請檢視Github

cuFFT卷積

使用CUDA的cuFFT可以方便的進行快速傅立葉變換,cuFFT的詳細說明可以檢視NVIDIA的官方文件。本文主要使用到一下兩個函式:

  • cufftExecR2C:實數到複數的快速傅立葉變換(FFT)
  • cufftExecC2R:複數到實數的快速傅立葉逆變換(IFFT)

基於cuFFT的實數到複數的快速傅立葉變換程式碼如下:

void fft(float *in, Complex *out, size_t size)
{
    cufftHandle plan;
    cufftPlan1d(&plan, size, CUFFT_R2C, 1);
    cufftExecR2C(plan, in, out);
    cufftDestroy(plan);
}

基於cuFFT的複數到實數的快速傅立葉逆變換程式碼如下:

void ifft(Complex *in, float *out, size_t size)
{
    cufftHandle plan;
    cufftPlan1d(&plan, size, CUFFT_C2R, 1);
    cufftExecC2R(plan, in, out);
    cufftDestroy(plan);
}

其中Complex被定義為float2typedef float2 Complex;

有了FFT,那麼基於CUDA的卷積程式碼可如下編寫:

void convfft( float *ina, float *inb, float *out, size_t len_out, size_t L, size_t numThreads)
{
    thrust::device_vector<Complex> d_a_fft(L);
    thrust::device_vector<Complex> d_b_fft(L);
    thrust::device_vector<Complex> d_c_fft(L);

    Complex *raw_point_a_fft = thrust::raw_pointer_cast(&d_a_fft[0]);
    Complex *raw_point_b_fft = thrust::raw_pointer_cast(&d_b_fft[0]);
    Complex *raw_point_c_fft = thrust::raw_pointer_cast(&d_c_fft[0]);

    fft(ina, raw_point_a_fft, L);
    fft(inb, raw_point_b_fft, L);

    // 計算 d_c_fft = d_a_fft * d_b_fft;

    ifft(raw_point_c_fft, out, L);
}

最後只剩下乘法運算了,可以自己編寫一個複數乘法的核心,也可以使用thrushtransform。使用thrush實現複數乘法,首先定義一個複數乘法操作函式(可以參考Transformations):

struct complex_multiplies_functor
{
    const int N;

    complex_multiplies_functor(int _n) : N(_n) {}

    __host__ __device__ Complex operator()(const Complex &a, const Complex &b) const
    {
        Complex c;
        c.x = (a.x * b.x - a.y * b.y) / N;
        c.y = (a.x * b.y + a.y * b.x) / N;
        return c;
    }
};

然後使用thrush::transform即可完成計算:

// 計算 d_c_fft = d_a_fft * d_b_fft;
thrust::transform(d_a_fft.begin(), d_a_fft.end(), d_b_fft.begin(), d_c_fft.begin(), complex_multiplies_functor(L));

結語

本文首先簡要介紹了卷積運算,然後使用Python實現了卷積執行的程式碼,接著討論了基於FFT的快速卷積演算法,並使用Python實現了FFT卷積,接著對直接卷積和基於FFT的快速卷積演算法的效能進行了分析,從實驗結果可以看出,FFT卷積相比直接卷積具有更快的執行速度。最後,基於CUDA實現了直接卷積演算法,並且使用cuFFT和thrush在CUDA平臺實現了基於FFT的快速卷積演算法。

參考文獻