mutex · synthesis · vivado

How can I replace a mutex with a proper function when using Vivado HLS?


Sorry in advance, as I am a beginner with Vivado HLS. I want to synthesize the code below, but Vivado HLS tells me that I cannot use the mutex (or anything that depends on it) and gives the following errors.

ERROR: [SYNCHK 200-11] Global Variable 'readyQMutex' has an unsynthesizable struct type '%union.pthread_mutex_t.2.12.22 = type { %struct.__pthread_mu...' (a member pointer to struct itself).
ERROR: [SYNCHK 200-71] ../fpga_top.c:221: function 'pthread_mutex_lock' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:225: function 'pthread_cond_wait' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:237: function 'pthread_cond_signal' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:238: function 'pthread_mutex_unlock' has no function body.
ERROR: [SYNCHK 200-11] ../fpga_top.c:18: Constant 'workerInfos' has an unsynthesizable type '[4 x %struct.threadInfo.6.16.26]*' (possible cause(s): structure variable cannot be decomposed due to (1) unsupported type conversion; (2) memory copy operation; (3) function pointer used in struct; (4) unsupported pointer comparison).
ERROR: [SYNCHK 200-61] ../fpga_top.c:75: unsupported memory access on variable 'child_task_ID' which is (or contains) an array with unknown size at compile time.
ERROR: [SYNCHK 200-71] ../fpga_top.c:77: function 'pthread_mutex_init' has no function body.
INFO: [SYNCHK 200-10] 8 error(s), 0 warning(s).

I found out that I should write the code to handle this myself; if so, how and what should I write?

#include <stdbool.h>
#include "fpga_top.h"

int outputIndex = 0;
double core_speed[CORE_MAX] = {1.0, 1.0, 1.0, 1.0};
double outputTable[WORKLOAD_MAX*TASK_COUNT_MAX][EXCEL_Column_Size];

int readyQueueHead = 0;
int readyQueueRear = 0;
int readyQueueSize = 0;
char canContinue_ = 1;
int wlCounter = 0;      
bool flag = 1;

// Add Task to assignedQueue
void addToAssignedQueue(int task_ID, int workload_ID, int q)
{
    pthread_mutex_lock(&(workerInfos[q].workerMutex));
    while( workerInfos[q].assignedQSize>=DEEP)
    {
        pthread_cond_wait(&(workerInfos[q].workerWaitHandle_Add), &(workerInfos[q].workerMutex));
    }
    int i = workerInfos[q].assignedQRear;
    workerInfos[q].assignedQueue[i].task_ID = task_ID;
    workerInfos[q].assignedQueue[i].workload_ID = workload_ID;
    workerInfos[q].assignedQRear = (workerInfos[q].assignedQRear + 1) % DEEP;
    workerInfos[q].assignedQSize++;
    // A signal to a worker waiting to read from this queue
    pthread_cond_signal(&(workerInfos[q].workerWaitHandle));
    pthread_mutex_unlock(&(workerInfos[q].workerMutex));
}

// Read from assignedQueue
struct workItem readFromAssignedQueue(int q)
{
    struct threadInfo *workerInfo_ = &workerInfos[q];
    pthread_mutex_lock(&(workerInfo_->workerMutex));

    struct workItem tas_;
    // Initialize the output values (which may not be necessary now)
    tas_.task_ID = -1;
    tas_.workload_ID = -1;
    if(workerInfo_->assignedQSize <= 0)
    {
        struct timespec time_to_wait = {10, 0}; //10 sec wait
        pthread_cond_timedwait(&(workerInfo_->workerWaitHandle), &(workerInfo_->workerMutex), &time_to_wait);
    }
    if(workerInfo_->assignedQSize >0)
    {
        // Reading the assignedQueue if data is available
        tas_ = workerInfo_->assignedQueue[workerInfo_->assignedQHead];
        // Move forward the queue head index rotationally
        workerInfos[q].assignedQHead = (workerInfos[q].assignedQHead + 1) % DEEP;
        // Decreasing the count number of queue elements
        workerInfos[q].assignedQSize--;
        pthread_cond_signal(&(workerInfos[q].workerWaitHandle_Add));
    }
    pthread_mutex_unlock(&(workerInfo_->workerMutex));
    return tas_;
}

// Add Definition of Task to DAG
void addTask(int task_ID, int parentCount, int child_task_ID[], int childCount, int processingTime)
{
    struct Task_Package_Profile *p_task_ = &(taskArray[task_ID]);
    p_task_->parentCount = parentCount;
    p_task_->childCount = childCount;
    p_task_->processingTime = processingTime;
    // Initialize the parentReady variable for all workloads
    for (int i = 0; i < WORKLOAD_MAX;i++) {p_task_->parentReady[i] = 0;}
    // Copy the child's index
    for (int i = 0; i < childCount; i++) {p_task_->child_task_ID[i] = child_task_ID[i];}
    // Make parentReady mutex
    pthread_mutex_init(&(p_task_->parentReadyMutex), NULL);
}

// DAG Definition
void initDag()
{
    int ch0[] = { 1, 2, 3, 4}; addTask( 0, 0, ch0, 4, 10000);

    int ch1[] = { 5, 6, 7, 8}; addTask( 1, 1, ch1, 4, 20000);
    int ch2[] = { 5, 6, 7, 8}; addTask( 2, 1, ch2, 4, 20000);
    int ch3[] = { 5, 6, 7, 8}; addTask( 3, 1, ch3, 4, 20000);
    int ch4[] = { 5, 6, 7, 8}; addTask( 4, 1, ch4, 4, 20000);

    int ch5[] = { 9, 10}; addTask( 5, 4, ch5, 2, 30000);
    int ch6[] = { 9, 10}; addTask( 6, 4, ch6, 2, 30000);
    int ch7[] = { 9, 10}; addTask( 7, 4, ch7, 2, 30000);
    int ch8[] = { 9, 10}; addTask( 8, 4, ch8, 2, 30000);

    int ch9[] = { 11, 12}; addTask( 9, 4, ch9, 2, 40000);
    int ch10[] = { 11, 12}; addTask( 10, 4, ch10, 2, 40000);

    int ch11[] = {}; addTask( 11, 2, ch11, 0, 50000);
    int ch12[] = {}; addTask( 12, 2, ch12, 0, 50000);

    addToReadyQueue(0, 0);              // Root task, addToReadyQueue(int task_ID, int workload_ID)
    readFromReadyQueue();
    //allocateTask(0, 0, 0);                    // allocateTask(int task_ID, int workload_ID, int core_ID)
}

// Add Task to the end of the readyQueue 
void addToReadyQueue(int task_ID, int workload_ID)
{
    pthread_mutex_lock(&readyQMutex);
    while(readyQueueSize >= READY_LOOP_DEEP)
    {
        // Waiting for the queue to be empty if there is no space
        int res = pthread_cond_wait( &readyQWaitHandleAdd, &readyQMutex);
    }
    #ifdef PRINT_ReadyQ
        printf("Task #%d (workload #%d) added to readyQueue %d:%d.\n", task_ID, workload_ID,readyQueueRear, readyQueueSize);
    #endif
    readyQueue[readyQueueRear].task_ID = task_ID;
    readyQueue[readyQueueRear].workload_ID = workload_ID;
    // Move forward the queue rear index in rotation
    readyQueueRear = (readyQueueRear + 1) % READY_LOOP_DEEP;
    // Increasing the number of the queue elements
    readyQueueSize++;
    // The signal is given to workers waiting to read from the queue
    pthread_cond_signal(&readyQWaitHandleRead);
    pthread_mutex_unlock(&readyQMutex);
}

// Read from the beginning of the readyQueue
struct workItem readFromReadyQueue()
{
    struct workItem witem;
    witem.task_ID = -1;
    witem.workload_ID = -1;

    pthread_mutex_lock(&readyQMutex);
    // Waiting to queue if empty
    while(readyQueueSize <= 0)
    {
        pthread_cond_wait( &readyQWaitHandleRead, &readyQMutex);
    }
    // Picking up from queue head
    witem = readyQueue[readyQueueHead];
    // Move forward the queue head index in rotation
    readyQueueHead = (readyQueueHead + 1) % READY_LOOP_DEEP;
    // Reduce the number of queue elements
    readyQueueSize--;
    #ifdef PRINT_ReadyQ
        printf("Task #%d (workload #%d) removed to readyQueue. %d : %d\n", witem.task_ID , witem.workload_ID, readyQueueHead, readyQueueSize);
    #endif
    // The signal is given to workers who are waiting for the queue to be empty
    pthread_cond_signal(&readyQWaitHandleAdd);
    pthread_mutex_unlock(&readyQMutex);
    return witem;
}

// Check if the readyQueue is empty with the corresponding mutex
int isReadyQueueEmpty()
{
    int res = 0;
    pthread_mutex_lock(&readyQMutex);
    res = (readyQueueSize == 0);
    pthread_mutex_unlock(&readyQMutex);
    return res;
}

// Assigning Task to the Worker (Cores)
struct outputsFromFPGA allocateTask(int task_ID, int workload_ID, int core_ID)
{
    if (flag == 1)
    {
        initDag();
        flag = 0;
    }
    #ifdef PRINT_AllocateTask
        printf("Task #%d (workload #%d) assigned to Core #%d;\n", task_ID, workload_ID, core_ID);
    #endif
    addToAssignedQueue( task_ID, workload_ID, core_ID);

    struct outputsFromFPGA FPGAOutputs;
    FPGAOutputs.task_ID = task_ID;
    FPGAOutputs.workload_ID = workload_ID;
    FPGAOutputs.core_ID = core_ID;
    return FPGAOutputs;
}

// Ending each task and inform the children
void taskDone(int task_ID, int workload_ID, int core_ID)
{
    struct Task_Package_Profile task_ = taskArray[task_ID];
    #ifdef PRINT_TaskDone
        printf("taskDone: Task #%d (workload #%d);\n", task_ID, workload_ID);
    #endif
    // Increase the child's parentReady variable and send the children to the ready queue if all parents are finished
    struct Task_Package_Profile *p_task_ = &(taskArray[task_ID]);
    for(int i = 0; i < p_task_->childCount; i++)
    {
        struct Task_Package_Profile *p_childTsk = &(taskArray[p_task_->child_task_ID[i]]);
        int nbParentReady = 0;
        // Increase the parentReady variable
        pthread_mutex_lock(&(p_childTsk->parentReadyMutex));
        nbParentReady = ++(p_childTsk->parentReady[workload_ID]);
        pthread_mutex_unlock(&(p_childTsk->parentReadyMutex));
        // Send the child to the ready queue if all parents are finished
        if (nbParentReady == p_childTsk->parentCount)
            addToReadyQueue(p_task_->child_task_ID[i], workload_ID);
    }
    pthread_mutex_lock(&assignQSizeCheckMutex);
    // Find the most empty assignedQueue and assign ready tasks as much as possible
    while(!isReadyQueueEmpty())
    {   // Finds the best assignedQueue
        int minQueue = 0;
        int minSize =  workerInfos[0].assignedQSize;
        for (int i = 1; i < CORE_MAX; i++)
        {
            if(workerInfos[i].assignedQSize < minSize)
            {
                minSize = workerInfos[i].assignedQSize;
                minQueue = i;
            }
        }
        // The most empty queue should be smaller than Deep so that it can be added to the queue
        if(minSize < DEEP)
        {
            struct workItem witem = readFromReadyQueue();
            struct outputsFromFPGA FPGAOutputs = allocateTask(witem.task_ID, witem.workload_ID, minQueue);
        }
        else
        {
            break;  // All assignedQueue are full
        }
    }
    pthread_mutex_unlock(&assignQSizeCheckMutex);
}

// Check the end of the program that has all the tests done
void finishCheck()
{
    if (wlCounter != WORKLOAD_MAX) return;
    for(int i = 0; i < CORE_MAX; i++)
    {
        if (workerInfos[i].assignedQSize > 0) return;
        if (workerInfos[i].coreState > 0) return;
    }
    if (!isReadyQueueEmpty()) return;
    canContinue_ = 0;
    for(int i = 0; i < CORE_MAX; i++)
        pthread_cond_signal(&(workerInfos[i].workerWaitHandle));
}

Solution

  • Thread synchronization can be done in HLS, as shown in this paper for example, but it is not yet supported in Vivado HLS.

    That said, this does not mean that it is impossible to implement your application in hardware. One approach is to implement every thread as a separate hardware kernel. Shared data can be put in another kernel, which ensures that accesses to the data are synchronized the way you want. The kernels can communicate with the shared object via streaming interfaces; function parameters can be implemented as streaming interfaces with hls::stream (see the first sketch below). After implementing each kernel as an IP module, you can connect them via FIFOs generated with the FIFO Generator in a Vivado block design.

    For example, you could make a control stream from each processing kernel to the shared object that allows the kernels to request access. In the shared object, you use non-blocking reads on these streams to see whether any kernel wants exclusive access. You then accept read or write requests only on the control stream of the kernel that was granted access. The data associated with the reads and writes can be communicated via dedicated data streams between the kernels and the shared object. When a kernel is done using the shared object, it sends a release command, and the shared object starts looking for requests on all control streams again (see the second sketch below). It takes a bit of labor, but it is a feasible solution...
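
    As a rough illustration of the first idea, here is a minimal sketch (not taken from your code) of what one worker could look like as its own kernel. The struct and port names (work_item_t, worker_kernel, assigned_in, done_out) are made up for the example; the point is that a blocking stream read replaces the pthread_cond_wait on the assigned queue, and a stream write replaces the signal back to the scheduler:

        #include <hls_stream.h>

        struct work_item_t {        // hypothetical work descriptor
            int task_ID;
            int workload_ID;
        };

        // One former pthread becomes one kernel; its former arguments and
        // shared queues become hls::stream ports.
        void worker_kernel(hls::stream<work_item_t> &assigned_in,
                           hls::stream<int>         &done_out)
        {
        #pragma HLS INTERFACE axis port=assigned_in
        #pragma HLS INTERFACE axis port=done_out
            // Blocking read: the kernel stalls here until the scheduler
            // kernel writes an item, replacing pthread_cond_wait.
            work_item_t item = assigned_in.read();

            // ... process the task ...

            // Report completion; replaces pthread_cond_signal.
            done_out.write(item.task_ID);
        }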
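
    And here is a minimal sketch of the second idea, the shared object as an arbiter kernel. The command encoding (CMD_REQ, CMD_READ, CMD_WRITE, CMD_REL), the stream names, and the assumption of only two requesting kernels are all made up for the example; read_nb is the non-blocking read of hls::stream:

        #include <hls_stream.h>

        // Hypothetical command encoding for the control streams.
        #define CMD_REQ   0   // request exclusive access
        #define CMD_READ  1   // read the shared value
        #define CMD_WRITE 2   // write a new shared value
        #define CMD_REL   3   // release exclusive access

        void shared_object(hls::stream<int> &ctrl0, hls::stream<int> &ctrl1,
                           hls::stream<int> &wr0,   hls::stream<int> &wr1,
                           hls::stream<int> &rd0,   hls::stream<int> &rd1)
        {
            static int shared_value = 0;  // the state the mutex used to protect
            static int owner = -1;        // -1 = free, 0/1 = kernel holding access

            int cmd;
            if (owner == -1) {
                // Non-blocking reads: poll every control stream for a request.
                if (ctrl0.read_nb(cmd) && cmd == CMD_REQ)      owner = 0;
                else if (ctrl1.read_nb(cmd) && cmd == CMD_REQ) owner = 1;
            } else if (owner == 0) {
                // Serve only kernel 0 until it releases the object.
                if (ctrl0.read_nb(cmd)) {
                    if      (cmd == CMD_READ)  rd0.write(shared_value);
                    else if (cmd == CMD_WRITE) shared_value = wr0.read();
                    else if (cmd == CMD_REL)   owner = -1;
                }
            } else {
                // Serve only kernel 1 until it releases the object.
                if (ctrl1.read_nb(cmd)) {
                    if      (cmd == CMD_READ)  rd1.write(shared_value);
                    else if (cmd == CMD_WRITE) shared_value = wr1.read();
                    else if (cmd == CMD_REL)   owner = -1;
                }
            }
        }

    With this sketch, a requesting kernel can simply issue CMD_REQ followed by its read/write commands and block on its data stream; because the control streams are FIFOs, its commands are only consumed once it has been granted access, and a pending CMD_REQ from another kernel is not lost, just served after the release.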