Search code examples
c parallel-processing functional-programming array-map purely-functional

How to implement functional parallel-map in c?


A functional map is a function that applies a callback function to each element in an array and returns a list of callback return values. For example, in pseudocode, map(["hello", "world"], fn(x) => x + " meow") would return ["hello meow", "world meow"]

Since function pointers can be passed as parameters in C, it is possible to implement a functional map like below:

/*
 * Sequentially apply `execute` to each of the `len` elements of `array`.
 *
 * Returns a newly allocated array of `len` results in which slot i holds
 * execute(array[i]). The caller owns the returned array and must free() it.
 * Terminates the process through err(42, ...) if the allocation fails.
 */
void** fp_map(void** array, size_t len, void* (*execute)(void*))
{
    // Allocate memory for return items. calloc rejects a count * size product
    // that would overflow, and at least one slot is requested because a
    // zero-size allocation is allowed to return NULL without being an error.
    void** returns = calloc(len ? len : 1, sizeof *returns);
    if (returns == NULL) err(42, "Malloc failed, buy more ram");

    // Map values; the index is a size_t so it matches the type of len
    for (size_t i = 0; i < len; ++i)
        returns[i] = execute(array[i]);

    return returns;
}

If I write the following anonymous function in my main method, it would map ["hello", "world"] to ["hello meow", "world meow"]:

/*
 * Demo: maps {"hello", "world"} through fp_map with a callback that appends
 * " meow" to each string, prints every result on its own line and releases
 * the allocated memory.
 *
 * The callback is written with GNU C extensions (a nested function wrapped in
 * a statement expression), so this is not portable standard C.
 */
int main() {
    char* arr[] = {"hello", "world"};
    // Derive the element count from the array so no loop can run past the results
    size_t count = sizeof arr / sizeof arr[0];

    char** arr2 = (char**) fp_map((void**) arr, count, ({ void* _func_ (void* x) {
        // Room for the input, the 5 characters of " meow" and the terminating NUL
        char* buf = malloc(sizeof(char) * (strlen(x) + 7));
        if (buf == NULL) err(42, "Malloc failed, buy more ram");
        strcpy(buf, x);
        strcat(buf, " meow");
        return buf;
    }; _func_; }));

    for (size_t i = 0; i < count; ++i) {
        printf("%s\n", arr2[i]);
        // Each string was allocated by the callback
        free(arr2[i]);
    }
    free(arr2);
    return 0;
}

Now, I want to implement a parallel map to speed things up. Since this is purely functional, calls to the callback function with the same parameters would return the same return values. How can I use multithreading so that each call to execute() runs on a different thread, but still have the results return in an ordered array?


Solution

  • I have written the following code. It builds a context for each element, spawns a separate thread for every call to the callback, then joins all the threads and returns the array of results.

    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <threads.h>
    
    /* When `expr` is true, print its source text to stderr and exit with status 1. */
    #define ERRORON(expr) \
        do { \
            if (!(expr)) \
                break; \
            fprintf(stderr, "ERROR: %s\n", #expr); \
            exit(1); \
        } while (0)
    
    /* Element count of a true array `x`; not valid for pointers or array parameters. */
    #define ARRLEN(x) (sizeof(x) / sizeof((x)[0]))
    
    /* Work item handed to one thread: what mythread needs to fill one result slot. */
    struct mythread_context {
        void **returns;             /* output array; the thread writes slot i */
        void *(*execute)(void *);   /* callback applied to the input element */
        void **array;               /* input array; the thread reads slot i */
        size_t i;                   /* index of the element this item covers */
    };
    
    /*
     * Thread entry point. `arg` points to a struct mythread_context; the
     * callback is run on the selected input element and its result is stored
     * in the same slot of the output array. Always returns 0.
     */
    int mythread(void *arg) {
        const struct mythread_context *job = arg;
        const size_t slot = job->i;
        void *input = job->array[slot];
        void *output = job->execute(input);
        job->returns[slot] = output;
        return 0;
    }
    
    void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
        // Allocate memory for return items
        void **returns = malloc(sizeof(*returns) * len);
        ERRORON(!returns);
        // Allocate memory for threads and contextes.
        thrd_t *threads = malloc(sizeof(*threads) * len);
        ERRORON(!threads);
        struct mythread_context *ctxs = malloc(sizeof(*ctxs) * len);
        ERRORON(!ctxs);
        for (size_t i = 0; i < len; ++i) {
            const struct mythread_context thisctx = {
                .returns = returns,
                .execute = execute,
                .array = array,
                .i = i,
            };
            ctxs[i] = thisctx;
            // Start a thread for every returns, execute and array index.
            int ret = thrd_create(&threads[i], mythread, &ctxs[i]);
            ERRORON(ret != thrd_success);
        }
        for (size_t i = 0; i < len; ++i) {
            // Join all threads. They will assing to returns separately concurrently.
            int ret = thrd_join(threads[i], NULL);
            ERRORON(ret != thrd_success);
        }
        free(threads);
        free(ctxs);
        return returns;
    }
    
    /*
     * Map callback: treats `x` as a NUL-terminated string and returns a newly
     * malloc'ed copy with " meow" appended. The caller must free() the result.
     * Prints an error and exits with status 1 if the allocation fails.
     */
    void *appnend_to_char(void *x) {
        static const char suffix[] = " meow";
        const size_t len = strlen(x);
        // sizeof suffix already includes the terminating NUL
        char *buf = malloc(len + sizeof suffix);
        if (!buf) {
            // Copying into a NULL buffer would be undefined behavior
            fprintf(stderr, "ERROR: out of memory\n");
            exit(1);
        }
        memcpy(buf, x, len);
        memcpy(buf + len, suffix, sizeof suffix);
        return buf;
    }
    
    /*
     * Demo driver: maps two strings through fp_map with appnend_to_char,
     * prints each result on its own line, then frees the results and the array.
     */
    int main() {
        const char *words[] = {"hello", "world"};
        const size_t count = ARRLEN(words);
        char **mapped = (char **)fp_map((void **)words, count, appnend_to_char);

        // Print every mapped string.
        size_t idx = 0;
        while (idx < count) {
            printf("%s\n", mapped[idx]);
            ++idx;
        }

        // Release each result string, then the array that held them.
        for (char **cursor = mapped; cursor != mapped + count; ++cursor) {
            free(*cursor);
        }
        free(mapped);
        return 0;
    }
    

    Alternatively, you can integrate seamlessly with OpenMP using just:

    /*
     * Parallel map via OpenMP: apply `execute` to each of the `len` elements
     * of `array`, with the loop marked `#pragma omp parallel for`.
     *
     * Returns a newly allocated array of `len` results in input order (slot i
     * holds execute(array[i])). The caller must free() the returned array.
     * Terminates the process via ERRORON if the allocation fails.
     */
    void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
        // Request at least one element: a zero-size allocation is allowed to
        // return NULL, which ERRORON would otherwise report as a failure.
        // calloc also rejects a count * size product that would overflow.
        void **returns = calloc(len ? len : 1, sizeof(*returns));
        ERRORON(!returns);
        // Every iteration writes a distinct slot of returns.
        #pragma omp parallel for
        for (size_t i = 0; i < len; ++i) {
            returns[i] = execute(array[i]);
        }
        return returns;
    }
    

    Notes:

    • ({ ... }) (a statement expression) and the nested function defined inside it are GCC extensions, not part of the C language. There are no lambdas or anonymous functions in the C programming language.
    • I am not sure if C11 threads.h should be used or rather POSIX pthreads should be preferred. The interface is very similar.
    • The context is rather big; it could be optimized. The number of malloc calls could also be reduced.