Search code examples
cudasynchronizationgpgpucuda-streamscuda-events

Is there a way to block and unblock a CUDA stream arbitrarily?


I need to pause the execution of all calls in a stream from a certain point in one part of the program until another part of the program decides to unpause this stream at an arbitrary time. This is the requirement of the application I'm working on, I can't work around that. Ideally I want to use the graph API (e.g. cudaGraphAddMemcpyNode), but regular async calls (e.g. cudaMemcpyAsync) are acceptable too if graphs can't do this for some reason. From reading CUDA's documentation I thought that there is an obvious way to do this, but it turned out to be way more complicated.

This is my first attempt, distilled to a simple example:

    cudaGraphCreate(&cuda_graph_cpy, 0);
    cudaGraphAddMemcpyNode1D(&memcpy_h2d_node, cuda_graph_cpy, NULL, 0, device_buf, host_buf, BUF_SIZE * sizeof(char), cudaMemcpyDefault);
    cudaGraphAddEventWaitNode(&wait_node, cuda_graph_cpy, &memcpy_h2d_node, 1, cuda_event);
    cudaGraphAddMemcpyNode1D(&memcpy_d2h_node, cuda_graph_cpy, &wait_node, 1, host_buf, device_buf, BUF_SIZE * sizeof(char), cudaMemcpyDefault);
    cudaGraphInstantiate(&cuda_graph_exec_cpy, cuda_graph_cpy, NULL, NULL, 0);

    cudaGraphCreate(&cuda_graph_set, 0);
    cudaGraphAddMemsetNode(&memset_node, cuda_graph_set, NULL, 0, &memset_params);
    cudaGraphAddEventRecordNode(&record_set_node, cuda_graph_set, &memset_node, 1, cuda_event);
    cudaGraphInstantiate(&cuda_graph_exec_set, cuda_graph_set, NULL, NULL, 0);

    cudaGraphLaunch(cuda_graph_exec_cpy, cuda_stream_cpy);
    cudaGraphLaunch(cuda_graph_exec_set, cuda_stream_set);
    cudaStreamSynchronize(cuda_stream_cpy);

So I create and instantiate one linear graph that: does a host-to-device copy, waits for cuda_event, does a device-to-host copy. Then I create and instantiate another linear graph that: does a memset on the device memory, records cuda_event.

After that I launch the first graph on cuda_stream_cpy, then launch the second graph on cuda_stream_set, then synchronize on cuda_stream_cpy.

In the end I expected to modify host_buf, but instead it is left untouched because the first graph/stream didn't actually wait for anything and proceeded with the second copy immediately.

After rewriting the code with regular async calls instead of graphs and getting the same behavior, reading everything I could find in Google on this topic, and experimenting with flags and adding more cudaEventRecord/cudaGraphAddEventRecordNode calls in different places, I realized that the event semantics of CUDA doesn't seem to be capable of the behavior I need? The issue seems to be that both record and wait calls have to be made around the same time, and it's impossible to decouple them. If there is no event record enqueued yet, the wait async call or graph node doesn't block the stream, and the stream keeps going.

So what I would like to do is to either replace cudaGraphAddEventWaitNode/cudaGraphAddEventRecordNode in the code sample above, or add something to the sample so that the code works the way I described: the wait node actually blocks the stream until the record node (or its replacement?) unblocks it.

I also found in CUDA something called the "external semaphores" that could be doing what I want (with cudaGraphAddExternalSemaphoresWaitNode/cudaGraphAddExternalSemaphoresSignalNode instead) but they seem to be impossible to create without also using Vulkan or DirectX, which is something I can't bring into the application. I tried to pass a shared memory object's file descriptor to cudaImportExternalSemaphore for cudaExternalSemaphoreHandleTypeOpaqueFd, but that didn't work.

EDIT 1: I tried to integrate the wait kernel suggested by @RobertCrovella into my prototype, but it gets stuck on the first graph's launch. Here's the reproducer:

#include "cuda_runtime_api.h"

#include <stdio.h>
#include <stdlib.h>

#define BUF_SIZE 1024
#define TEST_POS_OLD 512
#define TEST_POS_NEW 10
#define OLD_VAL 5
#define NEW_VAL 23

#define CUDA_CHKERR(x) res = x; if (res != cudaSuccess) goto fail;


__global__ void wait_kernel(volatile unsigned char *event, unsigned char val)
{
    while (*event == val);
}

int main()
{
    cudaError_t res = cudaSuccess;
    const char *err_str = NULL;
    const char *err_name = NULL;

    cudaStream_t cuda_stream_cpy;
    cudaStream_t cuda_stream_set;

    cudaGraph_t cuda_graph_cpy;
    cudaGraphExec_t cuda_graph_exec_cpy;
    cudaGraph_t cuda_graph_set;
    cudaGraphExec_t cuda_graph_exec_set;

    cudaGraphNode_t memcpy_h2d_node;
    cudaGraphNode_t memcpy_d2h_node;
    cudaGraphNode_t memset_node;
    cudaGraphNode_t signal_node;
    cudaGraphNode_t wait_node;

    unsigned char *event;
    unsigned char test = 0;

    dim3 grid(1,1,1);
    dim3 block(1,1,1);
    struct cudaKernelNodeParams kernel_node_params = {};
    struct cudaMemsetParams memset_params = {};
    void *wait_kernel_args[2] = {(void *) &event, (void *) &test};

    char *host_buf = NULL;
    void *device_buf = NULL;

    printf("Creating the event...\n");
    CUDA_CHKERR(cudaMalloc(&event, sizeof(event[0])));
    printf("cudaMalloc\n");
    CUDA_CHKERR(cudaMemset(event, 0, sizeof(event[0])));
    printf("cudaMemset\n");

    printf("Allocating the host buffer and setting the test value...\n");
    host_buf = (char *) malloc(BUF_SIZE * sizeof(char));
    for (int i = 0; i < BUF_SIZE; i++) {
        host_buf[i] = OLD_VAL;
    }

    CUDA_CHKERR(cudaMalloc(&device_buf, BUF_SIZE * sizeof(char)));
    printf("cudaMalloc\n");

    CUDA_CHKERR(cudaStreamCreate(&cuda_stream_cpy));
    printf("cudaStreamCreate cpy\n");
    CUDA_CHKERR(cudaStreamCreate(&cuda_stream_set));
    printf("cudaStreamCreate set\n");

    CUDA_CHKERR(cudaGraphCreate(&cuda_graph_cpy, 0));
    printf("cudaGraphCreate cpy\n");

    CUDA_CHKERR(cudaGraphAddMemcpyNode1D(&memcpy_h2d_node, cuda_graph_cpy, NULL, 0, device_buf, host_buf, BUF_SIZE * sizeof(char), cudaMemcpyDefault));
    printf("cudaGraphAddMemcpyNode1D H2D\n");
    memset(&kernel_node_params, 0, sizeof(cudaKernelNodeParams));
    kernel_node_params.func = (void *)wait_kernel;
    kernel_node_params.gridDim = grid;
    kernel_node_params.blockDim = block;
    kernel_node_params.sharedMemBytes = 0;
    kernel_node_params.kernelParams = wait_kernel_args;
    kernel_node_params.extra = NULL;
    CUDA_CHKERR(cudaGraphAddKernelNode(&wait_node, cuda_graph_cpy, &memcpy_h2d_node, 1, &kernel_node_params));
    printf("cudaGraphAddKernelNode (wait)\n");
    CUDA_CHKERR(cudaGraphAddMemcpyNode1D(&memcpy_d2h_node, cuda_graph_cpy, &wait_node, 1, host_buf, device_buf, BUF_SIZE * sizeof(char), cudaMemcpyDefault));
    printf("cudaGraphAddMemcpyNode1D D2H\n");

    CUDA_CHKERR(cudaGraphInstantiate(&cuda_graph_exec_cpy, cuda_graph_cpy, NULL, NULL, 0));
    printf("cudaGraphInstantiate cpy\n");

    CUDA_CHKERR(cudaGraphCreate(&cuda_graph_set, 0));
    printf("cudaGraphCreate set\n");

    memset(&memset_params, 0, sizeof(cudaMemsetParams));
    memset_params.dst = device_buf;
    memset_params.value = NEW_VAL;
    memset_params.pitch = 0;
    memset_params.elementSize = sizeof(char);
    memset_params.width = 512;
    memset_params.height = 1;
    CUDA_CHKERR(cudaGraphAddMemsetNode(&memset_node, cuda_graph_set, NULL, 0, &memset_params));
    printf("cudaGraphAddMemsetNode\n");
    memset(&memset_params, 0, sizeof(cudaMemsetParams));
    memset_params.dst = event;
    memset_params.value = 1;
    memset_params.pitch = 0;
    memset_params.elementSize = 1;
    memset_params.width = 1;
    memset_params.height = 1;
    CUDA_CHKERR(cudaGraphAddMemsetNode(&signal_node, cuda_graph_set, &memset_node, 1, &memset_params));
    printf("cudaGraphAddMemsetNode (signal)\n");

    CUDA_CHKERR(cudaGraphInstantiate(&cuda_graph_exec_set, cuda_graph_set, NULL, NULL, 0));
    printf("cudaGraphInstantiate set\n");

    CUDA_CHKERR(cudaGraphLaunch(cuda_graph_exec_cpy, cuda_stream_cpy));
    printf("cudaGraphLaunch cpy\n");
    CUDA_CHKERR(cudaGraphLaunch(cuda_graph_exec_set, cuda_stream_set));
    printf("cudaGraphLaunch set\n");
    CUDA_CHKERR(cudaStreamSynchronize(cuda_stream_cpy));
    printf("cudaStreamSynchronize cpy\n");

    CUDA_CHKERR(cudaGraphExecDestroy(cuda_graph_exec_cpy));
    printf("cudaGraphExecDestroy\n");
    CUDA_CHKERR(cudaGraphExecDestroy(cuda_graph_exec_set));
    printf("cudaGraphExecDestroy\n");
    CUDA_CHKERR(cudaGraphDestroy(cuda_graph_cpy));
    printf("cudaGraphDestroy\n");
    CUDA_CHKERR(cudaGraphDestroy(cuda_graph_set));
    printf("cudaGraphDestroy\n");

    CUDA_CHKERR(cudaStreamDestroy(cuda_stream_cpy));
    printf("cudaStreamDestroy cpy\n");
    CUDA_CHKERR(cudaStreamDestroy(cuda_stream_set));
    printf("cudaStreamDestroy set\n");

    if (host_buf[TEST_POS_OLD] == OLD_VAL) {
        printf("host_buf[TEST_POS_OLD] is correct.\n");
    } else {
        printf("host_buf[TEST_POS_OLD] is not correct!\n");
    }
    if (host_buf[TEST_POS_NEW] == NEW_VAL) {
        printf("host_buf[TEST_POS_NEW] is correct.\n");
    } else {
        printf("host_buf[TEST_POS_NEW] is not correct!\n");
        if (host_buf[TEST_POS_OLD] == host_buf[TEST_POS_NEW]) printf("They are equal!\n");
    }

    return 0;
 fail:
    err_name = cudaGetErrorName(res);
    err_str = cudaGetErrorString(res);
    printf("%s: %s\n", err_name, err_str);
    return 1;
}

EDIT 2: The issue was indeed the host memory allocation, with that correction my code works properly.


Solution

  • Although the comments say otherwise, you are effectively building a dependency between the two graphs. With a bit of refactoring, my suggestion would be to combine those activities into one graph, and express the dependency using the mechanisms available in graph capture.

    However with the goals:

    1. Two graphs
    2. (apparently) using the graph API (not stream capture)

    we could realize this in a fashion similar to mentioned stream memops (in the comments, only available in the driver API). Basically, we create a kernel that is waiting on device memory location, to synchronize one graph to another. The graph that is waiting will launch the kernel to synchronize. The other graph signals via a memset node.

    Here is an example:

    $ cat t2217.cu
    #include <iostream>
    #include <vector>
    #include <cstdio>
    #include <cstdlib>
    #define cudaCheckErrors(msg) \
        do { \
            cudaError_t __err = cudaGetLastError(); \
            if (__err != cudaSuccess) { \
                fprintf(stderr, "Fatal error: %s (%s at %s:%d)\n", \
                    msg, cudaGetErrorString(__err), \
                    __FILE__, __LINE__); \
                fprintf(stderr, "*** FAILED - ABORTING\n"); \
                exit(1); \
            } \
        } while (0)
    
    __global__ void calc1kernel(float *data, float val, size_t n){
    
      size_t idx = threadIdx.x+blockDim.x*blockIdx.x;
      while (idx < n){
        data[idx] += val;
        idx += gridDim.x*blockDim.x;}
    }
    __global__ void calc2kernel(float *data, float val, size_t n){
    
      size_t idx = threadIdx.x+blockDim.x*blockIdx.x;
      while (idx < n){
        data[idx] *= val;
        idx += gridDim.x*blockDim.x;}
    }
    __global__ void waitkernel(volatile unsigned char *signal, unsigned char val){
    
      while (*signal == val);
    }
    
    // CUDA Graph 1:
    //                calc1kernelnode
    //                     |
    //                memsetnode
    // CUDA Graph 2:
    //                waitkernel
    //                     |
    //                calc2kernelnode
    
    
    int main(int argc, char *argv[]){
    
      size_t data_size = 32;
      cudaStream_t s1, s2;
      cudaGraph_t g1, g2;
      float *data, val;
      unsigned char *sig;
      // allocate for data on the device
      cudaMalloc(&data, data_size*sizeof(data[0]));
      cudaCheckErrors("CUDAMalloc failure");
      cudaMalloc(&sig, sizeof(sig[0]));
      cudaCheckErrors("CUDAMalloc failure");
      cudaMemset(sig, 0, sizeof(sig[0]));
      cudaCheckErrors("CUDAMemset failure");
      cudaMemset(data, 0, data_size*sizeof(data[0]));
      cudaCheckErrors("CUDAMemset failure");
      // create the graph
      cudaGraphCreate(&g1, 0);
      cudaCheckErrors("CUDAGraphCreate failure");
      cudaGraphCreate(&g2, 0);
      cudaCheckErrors("CUDAGraphCreate failure");
      cudaStreamCreate(&s1);
      cudaCheckErrors("CUDAStreamCreate failure");
      cudaStreamCreate(&s2);
      cudaCheckErrors("CUDAStreamCreate failure");
      dim3 grid(1,1,1);
      dim3 block(1,1,1);
      cudaGraphNode_t calc1kernelnode, calc2kernelnode, waitkernelnode, memsetnode;
      // add nodes and their dependencies to the first graph
      cudaKernelNodeParams kernelNodeParams = {0};
      // first add calc1kernelnode, which has no dependencies
      val = 3.0f;
      memset(&kernelNodeParams, 0, sizeof(cudaKernelNodeParams));
      void *kernelargs[3] = {(void *)&data, (void *)&val, (void *)&data_size};
      kernelNodeParams.func = (void *)calc1kernel;
      kernelNodeParams.gridDim = grid;
      kernelNodeParams.blockDim = block;
      kernelNodeParams.sharedMemBytes = 0;
      kernelNodeParams.kernelParams = kernelargs;
      kernelNodeParams.extra = NULL;
      cudaGraphAddKernelNode(&calc1kernelnode, g1, NULL, 0, &kernelNodeParams);
      cudaCheckErrors("CUDAGraphAddKernelNode failure");
      // now add the memsetnode, which has 1 dependency on calc1kernelnode
      cudaMemsetParams memsetParams = {0};
      memset(&memsetParams, 0, sizeof(cudaMemsetParams));
      memsetParams.dst = sig;
      memsetParams.elementSize = 1;
      memsetParams.height = 1;
      memsetParams.pitch = 1;
      memsetParams.value = 1;
      memsetParams.width = 1;
      cudaGraphAddMemsetNode(&memsetnode, g1, &calc1kernelnode, 1, &memsetParams);
      cudaCheckErrors("CUDAGraphAddMemsetNode failure");
      // graph 1 is now defined, next step is to instantiate an executable version of it
      size_t num_nodes = 0;
      cudaGraphNode_t *nodes1 = NULL;
      cudaGraphGetNodes(g1, nodes1, &num_nodes);
      cudaCheckErrors("CUDAGraphGetNodes failure");
      printf("graph 1 num nodes: %lu\n", num_nodes);
      cudaGraphExec_t graphExec1, graphExec2;
      cudaGraphInstantiate(&graphExec1, g1, NULL, NULL, 0);
      cudaCheckErrors("CUDAGraphInstantiate failure");
      // add nodes and their dependencies to the second graph
      // first add waitkernelnode, which has no dependencies
      unsigned char test = 0;
      memset(&kernelNodeParams, 0, sizeof(cudaKernelNodeParams));
      void *waitkernelargs[2] = {(void *) &sig, (void *) &test };
      kernelNodeParams.func = (void *)waitkernel;
      kernelNodeParams.gridDim = grid;
      kernelNodeParams.blockDim = block;
      kernelNodeParams.sharedMemBytes = 0;
      kernelNodeParams.kernelParams = waitkernelargs;
      kernelNodeParams.extra = NULL;
      cudaGraphAddKernelNode(&waitkernelnode, g2, NULL, 0, &kernelNodeParams);
      cudaCheckErrors("CUDAGraphAddKernelNode failure");
      // now add the calc2kernelnode, which has 1 dependency on waitkernelnode
      memset(&kernelNodeParams, 0, sizeof(cudaKernelNodeParams));
      kernelNodeParams.func = (void *)calc2kernel;
      kernelNodeParams.gridDim = grid;
      kernelNodeParams.blockDim = block;
      kernelNodeParams.sharedMemBytes = 0;
      kernelNodeParams.kernelParams = kernelargs;
      kernelNodeParams.extra = NULL;
      cudaGraphAddKernelNode(&calc2kernelnode, g2, &waitkernelnode, 1, &kernelNodeParams);
      cudaCheckErrors("CUDAGraphAddKernelNode failure");
      // graph 2 is now defined, next step is to instantiate an executable version of it
      cudaGraphNode_t *nodes2 = NULL;
      cudaGraphGetNodes(g2, nodes2, &num_nodes);
      cudaCheckErrors("CUDAGraphGetNodes failure");
      printf("graph 2 num nodes: %lu\n", num_nodes);
      cudaGraphInstantiate(&graphExec2, g2, NULL, NULL, 0);
      cudaCheckErrors("CUDAGraphInstantiate failure");
      // now launch the graphs
      cudaGraphLaunch(graphExec2, s2);
      cudaCheckErrors("CUDAGraphLaunch failure");
      cudaGraphLaunch(graphExec1, s1);
      cudaCheckErrors("CUDAGraphLaunch failure");
      cudaStreamSynchronize(s1);
      cudaCheckErrors("graph execution failure");
      cudaStreamSynchronize(s2);
      cudaCheckErrors("graph execution failure");
      float *result = new float[data_size];
      cudaMemcpy(result, data, data_size*sizeof(float), cudaMemcpyDeviceToHost);
      std::cout << "result[0] = " << result[0] << std::endl;
      // clean up
      cudaFree(data);
      cudaStreamDestroy(s1);
      cudaGraphDestroy(g1);
      cudaGraphExecDestroy(graphExec1);
      cudaStreamDestroy(s2);
      cudaGraphDestroy(g2);
      cudaGraphExecDestroy(graphExec2);
    }
    $ nvcc -o t2217 t2217.cu
    $ ./t2217
    graph 1 num nodes: 2
    graph 2 num nodes: 2
    result[0] = 9
    $
    

    The result of 9 indicates that even though graph 2 was launched first, it successfully waited until the synchronization point in graph 1, before it allowed its calc kernel to run.

    The given example (in the question) shows use of the runtime API, as does my answer.

    If you want to use the driver API, as already indicated in the comments, it should be possible to do this directly via batched memops using cuGraphAddBatchMemOpNode. A memset node, or similar, is also still needed.

    This kind of interlock is something that can give rise to hangs and deadlocks if used improperly. Note the various warnings given:

    Warning: Improper use of this API may deadlock the application. Synchronization ordering established through this API is not visible to CUDA. CUDA tasks that are (even indirectly) ordered by this API should also have that order expressed with CUDA-visible dependencies such as events. ...

    Regarding your EDIT 1:

    If I change this:

    host_buf = (char *) malloc(BUF_SIZE * sizeof(char));
    

    to this:

    CUDA_CHKERR(cudaHostAlloc(&host_buf, BUF_SIZE*sizeof(char), cudaHostAllocDefault));
    

    your code runs correctly for me. In CUDA, in order for a D->H or H->D transfer to be (guaranteed to be) asynchronous and non-blocking, the host buffer must be a pinned buffer. When we apply this to graphs, the requirement is more stringent:

    General requirements:

    Memcpy nodes:

    Only copies involving device memory and/or pinned device-mapped host memory are permitted.

    (emphasis added)

    memory allocated with malloc is neither device memory, nor is it pinned device-mapped host memory.