
cufftSetStream causes garbage output. Am I doing something wrong?


According to the docs, the cufftSetStream() function

Associates a CUDA stream with a cuFFT plan. All kernel launches made during plan execution are now done through the associated stream [...until...] the stream is changed with another call to cufftSetStream().

Unfortunately, reusing a plan this way turns the results into garbage. Here is an example that demonstrates this by performing a bunch of transforms two ways: once with each stream having its own dedicated plan, and once with a single plan being reused across streams as the documentation above indicates. The former behaves as expected; the reused/cufftSetStream approach has errors in most of the transforms. I observed this on the two cards I've tried (GTX 750 Ti, Titan X) on CentOS 7 Linux with CUDA compilation tools release 7.0, V7.0.27 and release 7.5, V7.5.17.
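
In other words, the pattern I understood the docs to permit is simply this (a conceptual sketch only, with placeholder variables and no error checking; the full repro follows below):

// one plan shared across several streams, re-targeted before each execution
for (int k = 0; k < nstreams; ++k) {
    cufftSetStream(plan, streams[k]);   // associate the shared plan with stream k
    cufftExecC2R(plan, in[k], out[k]);  // launch asynchronously on stream k
}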

EDIT: see the "FIX" comments below for one way to fix things.

#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>

#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}


__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
            buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}

__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
            float e=buf1[i*stride+j] - buf2[i*stride+j];
            if (e*e > 1) // gross error
                atomicAdd(errors,1);
        }
    }
}

void demo(bool reuse_plan)
{
    if (reuse_plan)
        std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
    else
        std::cout << "Giving each stream its own dedicated fft plan ... ";
    int nfft = 1024;
    int batch = 1024;
    int nstreams = 8;
    int nbins = nfft/2+1;
    int nit=100;
    size_t inpitch,outpitch;

    std::vector<cufftComplex*> inbufs(nstreams);
    std::vector<float*> outbufs(nstreams);
    std::vector<float*> checkbufs(nstreams);
    std::vector<cudaStream_t> streams(nstreams);
    std::vector<cufftHandle> plans(nstreams);
    for (int i=0;i<nstreams;++i) {
        ck( cudaStreamCreate(&streams[i]));
        ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
        ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
        ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
        if (i==0 || reuse_plan==false)
            ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
    }

    // fill the input buffers and FFT them to get a baseline for comparison
    for (int i=0;i<nstreams;++i) {
        fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
        ck (cudaGetLastError());
        if (reuse_plan) {
            ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
        }else{
            ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
            ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
        }
        ck( cudaDeviceSynchronize());
    }
    // allocate a buffer for the error count
    int * errors;
    cudaMallocHost((void**)&errors,sizeof(int)*nit);
    memset(errors,0,sizeof(int)*nit);

    /* FIX: an event can protect the plan internal buffers 
    by serializing access to the plan
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev,cudaEventDisableTiming);
    */

    // perform the FFTs and check the outputs on streams
    for (int it=0;it<nit;++it) {
        int k = it % nstreams;
        ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
        if (reuse_plan) {
            // FIX: ck(cudaStreamWaitEvent(streams[k],ev,0 ) );
            ck(cufftSetStream(plans[0],streams[k]));
            ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
            // FIX: ck(cudaEventRecord(ev,streams[k] ) );
        }else{
            ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
        }
        check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
        ck (cudaGetLastError());
    }
    ck(cudaDeviceSynchronize());

    // report number of errors
    int errcount=0;
    for (int it=0;it<nit;++it)
        if (errors[it])
            ++errcount;
    std::cout << errcount << " of " << nit << " transforms had errors\n";

    for (int i=0;i<nstreams;++i) {
        cudaFree(inbufs[i]);
        cudaFree(outbufs[i]);
        cudaFree(checkbufs[i]);
        cudaStreamDestroy(streams[i]);
        if (i==0 || reuse_plan==false)
            cufftDestroy(plans[i]);
    }
    cudaFreeHost(errors);
}

int main(int argc,char ** argv)
{
    demo(false);
    demo(true);
    return 0;
}

Typical output

Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
Reusing the same fft plan with multiple streams via cufftSetStream ... 87 of 100 transforms had errors
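
For completeness, the event-based FIX referenced above serializes access to the shared plan, so only one stream at a time can be using its internal work area. Uncommented, the relevant part of the loop looks like this (same names as in the code above; setup elided):

cudaEvent_t ev;
cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
...
ck( cudaStreamWaitEvent(streams[k], ev, 0) );    // wait for the previous execution of the plan to finish
ck( cufftSetStream(plans[0], streams[k]) );
ck( cufftExecC2R(plans[0], inbufs[k], outbufs[k]) );
ck( cudaEventRecord(ev, streams[k]) );           // mark where this execution's use of the plan ends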


Solution

  • In order to reuse plans the way you want, you need to manage the cuFFT work area manually.

    Each plan has space for intermediate calculation results. If you want to use a plan handle for two or more plan executions at the same time, you need to provide a temporary buffer for each concurrent cufftExec* call.

    You can do this by using cufftSetWorkArea - please have a look at section 3.7 of the cuFFT documentation. Section 2.2 would also help you understand how it works.

    Here's a worked example showing the changes to your code for this (the essential plan-reuse changes are summarized again after the listing):

    $ cat t1241.cu
    #include <cufft.h>
    #include <stdexcept>
    #include <iostream>
    #include <numeric>
    #include <vector>
    
    #define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}
    
    
    __global__
    void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
    {
        for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
            for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
                buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
    }
    
    __global__
    void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
    {
        for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
            for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
                float e=buf1[i*stride+j] - buf2[i*stride+j];
                if (e*e > 1) // gross error
                    atomicAdd(errors,1);
            }
        }
    }
    
    void demo(bool reuse_plan)
    {
        if (reuse_plan)
            std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
        else
            std::cout << "Giving each stream its own dedicated fft plan ... ";
        int nfft = 1024;
        int batch = 1024;
        int nstreams = 8;
        int nbins = nfft/2+1;
        int nit=100;
        size_t inpitch,outpitch;
    
        std::vector<cufftComplex*> inbufs(nstreams);
        std::vector<float*> outbufs(nstreams);
        std::vector<float*> checkbufs(nstreams);
        std::vector<cudaStream_t> streams(nstreams);
        std::vector<cufftHandle> plans(nstreams);
        // if plan reuse, set up independent work areas
        std::vector<char *> wk_areas(nstreams);
        for (int i=0;i<nstreams;++i) {
            ck( cudaStreamCreate(&streams[i]));
            ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
            ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
            ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
            if (i==0 || reuse_plan==false)
                ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
        }
        if (reuse_plan) {
            size_t ws;
            ck(cufftGetSize(plans[0], &ws));
            for (int i = 0; i < nstreams; i++)
                ck(cudaMalloc(&(wk_areas[i]), ws));
            ck(cufftSetAutoAllocation(plans[0], 0));
            ck(cufftSetWorkArea(plans[0], wk_areas[0]));
        }
        // fill the input buffers and FFT them to get a baseline for comparison
        for (int i=0;i<nstreams;++i) {
            fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
            ck (cudaGetLastError());
            if (reuse_plan) {
                ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
            }else{
                ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
                ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
            }
            ck( cudaDeviceSynchronize());
        }
        // allocate a buffer for the error count
        int * errors;
        cudaMallocHost((void**)&errors,sizeof(int)*nit);
        memset(errors,0,sizeof(int)*nit);
    
        // perform the FFTs and check the outputs on streams
        for (int it=0;it<nit;++it) {
            int k = it % nstreams;
            ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
            if (reuse_plan) {
                ck(cufftSetStream(plans[0],streams[k]));
                ck(cufftSetWorkArea(plans[0], wk_areas[k])); // update work area pointer in plan
                ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
            }else{
                ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
            }
            check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
            ck (cudaGetLastError());
        }
        ck(cudaDeviceSynchronize());
    
        // report number of errors
        int errcount=0;
        for (int it=0;it<nit;++it)
            if (errors[it])
                ++errcount;
        std::cout << errcount << " of " << nit << " transforms had errors\n";
    
        for (int i=0;i<nstreams;++i) {
            cudaFree(inbufs[i]);
            cudaFree(outbufs[i]);
            cudaFree(checkbufs[i]);
            cudaFree(wk_areas[i]);
            cudaStreamDestroy(streams[i]);
            if (i==0 || reuse_plan==false)
                cufftDestroy(plans[i]);
        }
        cudaFreeHost(errors);
    }
    
    int main(int argc,char ** argv)
    {
        demo(false);
        demo(true);
        return 0;
    }
    $ nvcc -o t1241 t1241.cu -lcufft
    $ ./t1241
    Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
    Reusing the same fft plan with multiple streams via cufftSetStream ... 0 of 100 transforms had errors
    $
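
    Stripped of the surrounding boilerplate, the plan-reuse changes in the listing above boil down to the following sketch (error checking and cleanup omitted; k is the index of the stream used for a given execution):

    size_t ws;
    cufftGetSize(plans[0], &ws);            // scratch space one execution of the plan needs
    cufftSetAutoAllocation(plans[0], 0);    // tell cuFFT not to manage the work area itself
    for (int i = 0; i < nstreams; i++)
        cudaMalloc(&(wk_areas[i]), ws);     // one private scratch buffer per stream
    ...
    cufftSetStream(plans[0], streams[k]);   // re-target the shared plan at stream k
    cufftSetWorkArea(plans[0], wk_areas[k]);// give this execution its own scratch space
    cufftExecC2R(plans[0], inbufs[k], outbufs[k]);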