I'm setting up a prototype for a DAQ system for Zynq FPGAs. I receive data from a server through ethernet, write it to a FIFO using the DMA and viceversa using two different pthreads. However, threads work correctly only if printf are executed. I expect there is a memory leak or some lines leading to a undefined behaviour, but I can't spot it.
Placing output to stderr has the same result. Changing addresses does nothing different.
Sorry for the shameful code, but I tried replacing almost every line by now to spot the problem.
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <termios.h>
#include <sys/mman.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/stat.h>
#include <stdint.h>
#include <errno.h>
#include <time.h>
#define PORT 8080
#define SA struct sockaddr
#define MM2S_CONTROL_REGISTER 0x00
#define MM2S_STATUS_REGISTER 0x04
#define MM2S_START_ADDRESS 0x18
#define MM2S_LENGTH 0x28
#define S2MM_CONTROL_REGISTER 0x30
#define S2MM_STATUS_REGISTER 0x34
#define S2MM_DESTINATION_ADDRESS 0x48
#define S2MM_LENGTH 0x58
#define VIRT_ADDR 0x40400000
#define FIFO_ADDR 0x0c000000
#define SEND_ADDR 0x0a000000
#define RECV_ADDR 0x0b000000
#define TIMEOUT 10
#define MAX_TRANSF 12274
unsigned int dma_set(unsigned int *dma_virtual_address, int offset, unsigned int value);
unsigned int dma_get(unsigned int *dma_virtual_address, int offset);
int dma_mm2s_sync(unsigned int *dma_virtual_address);
int dma_s2mm_sync(unsigned int *dma_virtual_address);
unsigned int dma_s2mm_status(unsigned int *dma_virtual_address);
unsigned int dma_mm2s_status(unsigned int *dma_virtual_address);
void memdump(void *virtual_address, int byte_count);
void print_status(unsigned int status);
unsigned long int elements = 0;
clock_t start_time;
typedef struct arg_struct {
unsigned int *virtual_address;
unsigned int *virtual_buffer;
unsigned int *head;
unsigned int *tail;
unsigned int buffsize;
unsigned int fifosize;
unsigned int sockfd;
pthread_mutex_t *lock;
char verbose;
} arguments;
unsigned int dma_set(unsigned int *dma_virtual_address, int offset, unsigned int value) {
dma_virtual_address[offset >> 2] = value;
}
unsigned int dma_get(unsigned int *dma_virtual_address, int offset) {
return dma_virtual_address[offset >> 2];
}
int dma_mm2s_sync(unsigned int *dma_virtual_address) {
unsigned int mm2s_status = dma_get(dma_virtual_address, MM2S_STATUS_REGISTER);
while (!(mm2s_status & 1 << 12) || !(mm2s_status & 1 << 1)) {
dma_s2mm_status(dma_virtual_address);
dma_mm2s_status(dma_virtual_address);
mm2s_status = dma_get(dma_virtual_address, MM2S_STATUS_REGISTER);
}
}
int dma_s2mm_sync(unsigned int *dma_virtual_address) {
unsigned int s2mm_status = dma_get(dma_virtual_address, S2MM_STATUS_REGISTER);
while (!(s2mm_status & 1 << 12) || !(s2mm_status & 1 << 1)) {
dma_s2mm_status(dma_virtual_address);
dma_mm2s_status(dma_virtual_address);
s2mm_status = dma_get(dma_virtual_address, S2MM_STATUS_REGISTER);
}
}
unsigned int dma_s2mm_status(unsigned int *dma_virtual_address) {
return dma_get(dma_virtual_address, S2MM_STATUS_REGISTER);
}
unsigned int dma_mm2s_status(unsigned int *dma_virtual_address) {
return dma_get(dma_virtual_address, MM2S_STATUS_REGISTER);
}
void print_mm2s_status(unsigned int status) {
fprintf(stderr, "[*] Stream to memory-mapped status (0x%08x@0x%02x):", status, S2MM_STATUS_REGISTER);
print_status(status);
}
void print_s2mm_status(unsigned int status) {
fprintf(stderr, "[*] Memory-mapped to stream status (0x%08x@0x%02x):", status, MM2S_STATUS_REGISTER);
print_status(status);
}
void print_status(unsigned int status) {
if (status & 0x00000001) fprintf(stderr, " halted");
else fprintf(stderr, " running");
if (status & 0x00000002) fprintf(stderr, " idle");
if (status & 0x00000008) fprintf(stderr, " SGIncld");
if (status & 0x00000010) fprintf(stderr, " DMAIntErr");
if (status & 0x00000020) fprintf(stderr, " DMASlvErr");
if (status & 0x00000040) fprintf(stderr, " DMADecErr");
if (status & 0x00000100) fprintf(stderr, " SGIntErr");
if (status & 0x00000200) fprintf(stderr, " SGSlvErr");
if (status & 0x00000400) fprintf(stderr, " SGDecErr");
if (status & 0x00001000) fprintf(stderr, " IOC_Irq");
if (status & 0x00002000) fprintf(stderr, " Dly_Irq");
if (status & 0x00004000) fprintf(stderr, " Err_Irq");
fprintf(stderr, "\n");
}
void memdump(void *virtual_address, int byte_count) {
char * p = virtual_address;
int offset;
for (offset = 0; offset < byte_count; offset++) {
fprintf(stderr, "%02x", p[offset]);
if (offset % 4 == 3) {
fprintf(stderr, " ");
}
}
}
void DMATransfer(unsigned int *virtual_address, long unsigned int src, long unsigned int dest, unsigned int length, char verbose) {
unsigned int s2mm_status = 0;
unsigned int mm2s_status = 0;
dma_set(virtual_address, S2MM_CONTROL_REGISTER, 4);
dma_set(virtual_address, MM2S_CONTROL_REGISTER, 4);
if (verbose > 0) {
print_s2mm_status(dma_s2mm_status(virtual_address));
print_mm2s_status(dma_mm2s_status(virtual_address));
}
dma_set(virtual_address, S2MM_CONTROL_REGISTER, 0);
dma_set(virtual_address, MM2S_CONTROL_REGISTER, 0);
if (verbose > 0) {
print_s2mm_status(dma_s2mm_status(virtual_address));
print_mm2s_status(dma_mm2s_status(virtual_address));
}
dma_set(virtual_address, S2MM_DESTINATION_ADDRESS, dest);
dma_set(virtual_address, MM2S_START_ADDRESS, src);
if (verbose > 0) {
print_s2mm_status(dma_s2mm_status(virtual_address));
print_mm2s_status(dma_mm2s_status(virtual_address));
}
dma_set(virtual_address, S2MM_CONTROL_REGISTER, 0xf001);
dma_set(virtual_address, MM2S_CONTROL_REGISTER, 0xf001);
if (verbose > 0) {
print_s2mm_status(dma_s2mm_status(virtual_address));
print_mm2s_status(dma_mm2s_status(virtual_address));
}
dma_set(virtual_address, S2MM_LENGTH, length);
dma_set(virtual_address, MM2S_LENGTH, length);
if (verbose > 0) {
print_s2mm_status(dma_s2mm_status(virtual_address));
print_mm2s_status(dma_mm2s_status(virtual_address));
}
dma_mm2s_sync(virtual_address);
dma_s2mm_status(virtual_address);
if (verbose > 0) {
print_s2mm_status(dma_s2mm_status(virtual_address));
print_mm2s_status(dma_mm2s_status(virtual_address));
}
}
int GetCPULoad() {
int FileHandler;
char FileBuffer[1024];
float load;
FileHandler = open("/proc/loadavg", O_RDONLY);
if(FileHandler < 0) {
return -1;
}
read(FileHandler, FileBuffer, sizeof(FileBuffer) - 1);
sscanf(FileBuffer, "%f", &load);
close(FileHandler);
return (int)(load * 100);
}
void *sender(void *params) {
arguments *args = params;
if (args->head == NULL) {
fprintf(stderr, "[-] Head pointer not valid\n");
exit(0);
}
if (args->tail == NULL) {
fprintf(stderr, "[-] Tail pointer not valid\n");
exit(0);
}
if (args->virtual_address == NULL) {
fprintf(stderr, "[-] AXI DMA register pointer not valid\n");
exit(0);
}
if (args->virtual_buffer == NULL) {
fprintf(stderr, "[-] Send buffer pointer not valid\n");
exit(0);
}
unsigned long int units_sent = 0;
unsigned int myhead = 0;
unsigned int mytail = 0;
for (;;) {
pthread_mutex_lock(args->lock);
myhead = *(args->head);
mytail = *(args->tail);
pthread_mutex_unlock(args->lock);
fprintf(stderr, "[*] Send Head: %d Tail: %d\n", myhead, mytail);
if (myhead != mytail) {
int remaining = args->buffsize;
int sent = 0;
int src = FIFO_ADDR + mytail * args->buffsize;
if (args->verbose > 2) {
fprintf(stderr, "[*] Sender: DMA is transferring data from 0x%x to 0x%x\n", src, SEND_ADDR);
}
unsigned int length = args->buffsize;
unsigned int verb = args->verbose > 2 ? 1 : 0;
pthread_mutex_lock(args->lock);
while (remaining > 0) {
length = remaining < MAX_TRANSF ? remaining : remaining % MAX_TRANSF;
DMATransfer(args->virtual_address, src + sent, SEND_ADDR, length, verb);
remaining -= args->buffsize;
sent += remaining;
}
pthread_mutex_unlock(args->lock);
elements--;
units_sent++;
if (args->verbose > 2) {
fprintf(stderr, "[*] %f elements in FIFO: %lu\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, elements);
fprintf(stderr, "[*] %f DMA tranfer to buffer: %d\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, units_sent);
}
remaining = args->buffsize;
sent = 0;
int result = 0;
pthread_mutex_lock(args->lock);
while (remaining > 0) {
result = send(args->sockfd, args->virtual_buffer + sent, remaining, 0);
if (result > 0) {
remaining -= result;
sent += remaining;
} else if (result < 0) {
fprintf(stderr, "[-] Error retrieving configuration from the server\n");
exit(0);
}
}
*(args->tail) = (mytail + 1) % (args->fifosize + 1);
pthread_mutex_unlock(args->lock);
//memset(args->virtual_buffer, 0, args->buffsize);
if (args->verbose > 2) {
fprintf(stderr, "[*] %f Unit sent: %d\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, units_sent);
}
if (args->verbose > 0) {
fprintf(stderr, "[*] Packet retrieved");
}
if (args->verbose > 1) {
fprintf(stderr, " content: ");
memdump(args->virtual_buffer, args->buffsize);
}
if (args->verbose > 0) {
fprintf(stderr, "\n");
}
if (args->verbose > 2) {
fprintf(stderr, "[*] %f CPU Usage: %d\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, GetCPULoad());
}
}
printf("0000000000000000000000000\n");
}
}
void *receiver(void *params) {
arguments *args = params;
if (args->head == NULL) {
fprintf(stderr, "[-] Head pointer not valid\n");
exit(0);
}
if (args->tail == NULL) {
fprintf(stderr, "[-] Tail pointer not valid\n");
exit(0);
}
if (args->virtual_address == NULL) {
fprintf(stderr, "[-] AXI DMA register pointer not valid\n");
exit(0);
}
if (args->virtual_buffer == NULL) {
fprintf(stderr, "[-] Recv buffer pointer not valid\n");
exit(0);
}
unsigned long int units_received = 0;
unsigned int myhead = 0;
unsigned int mytail = 0;
for (;;) {
pthread_mutex_lock(args->lock);
myhead = *(args->head);
mytail = *(args->tail);
pthread_mutex_unlock(args->lock);
fprintf(stderr, "[*] Recv Head: %d Tail: %d\n", myhead, mytail);
if (mytail != myhead + 1) {
int remaining = args->buffsize;
int received = 0;
int result = 0;
pthread_mutex_lock(args->lock);
while (remaining > 0) {
result = recv(args->sockfd, args->virtual_buffer + received, remaining, 0);
fprintf(stderr, "[*] Recv result: %d\n", result);
if (result > 0) {
remaining -= result;
received += result;
} else if (result == 0) {
fprintf(stderr, "[-] Remote side closed his end of the connection before all data was received\n");
exit(0);
} else if (result < 0) {
fprintf(stderr, "[-] Error retrieving configuration from the server\n");
exit(0);
}
}
printf("++++++++++++++++++++++++++++\n");
pthread_mutex_unlock(args->lock);
units_received++;
if (args->verbose > 2) {
fprintf(stderr, "[*] %f Unit recv: %d\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, units_received);
}
remaining = args->buffsize;
received = 0;
int dest = FIFO_ADDR + myhead * args->buffsize;
if (args->verbose > 2) {
fprintf(stderr, "[*] Receiver: DMA is transferring data from 0x%x to 0x%x\n", RECV_ADDR, dest);
}
unsigned int length = args->buffsize;
unsigned int verb = args->verbose > 2 ? 1 : 0;
pthread_mutex_lock(args->lock);
while (remaining > 0) {
printf("############################\n");
length = remaining < MAX_TRANSF ? remaining : remaining % MAX_TRANSF;
DMATransfer(args->virtual_address, RECV_ADDR, dest + received, length, verb);
remaining -= args->buffsize;
received += args->buffsize;
}
printf("*************************\n");
*(args->head) = (myhead + 1) % (args->fifosize + 1);
pthread_mutex_unlock(args->lock);
//memset(args->virtual_buffer, 0, args->buffsize);
elements++;
if (args->verbose > 2) {
fprintf(stderr, "[*] %f elements in FIFO: %lu\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, elements);
fprintf(stderr, "[*] %f DMA tranfer to DDR: %d\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, units_received);
}
if (args->verbose > 0) {
fprintf(stderr, "[*] Packet received");
}
if (args->verbose > 1) {
fprintf(stderr, " content: ");
memdump(args->virtual_buffer, args->buffsize);
}
if (args->verbose > 0) {
fprintf(stderr, "\n");
}
if (args->verbose > 2) {
fprintf(stderr, "[*] %f CPU Usage: %d\n", ((double)(clock() - start_time)) / CLOCKS_PER_SEC, GetCPULoad());
}
}
printf("77777777777777777777777777777\n");
}
}
int isValidIpAddress(char *ipAddress) {
struct sockaddr_in sa;
int result = inet_pton(AF_INET, ipAddress, &(sa.sin_addr));
if (result != 0) {
return 0;
} else {
return 1;
}
}
int main(int argc, char *argv[]) {
if (argc < 3 || argc > 5) {
fprintf(stderr, "\nUsage: DAQTest [IP address] [fifo size]\nExample: DAQTest 192.168.1.81 64\n\n");
fprintf(stderr, "Optional flags: -v Verbose (print operations)\n");
fprintf(stderr, " -vv Very verbose (also print data content)\n");
fprintf(stderr, " -vvv Extremely verbose (also print DMA info)\n\n");
exit(0);
}
if (isValidIpAddress(argv[1]) == 1) {
fprintf(stderr, "[-] Invalid ip address\n");
exit(0);
}
int fifosize = atoi(argv[2]);
if (fifosize < 0 || fifosize > 8192) {
fprintf(stderr, "[-] Invalid fifo size\n");
exit(0);
}
char verbose = 0;
if (argc == 4) {
if (strcmp(argv[3], "-v") == 0) {
verbose = 1;
} else if (strcmp(argv[3], "-vv") == 0) {
verbose = 2;
} else if (strcmp(argv[3], "-vvv") == 0) {
verbose = 3;
} else {
fprintf(stderr, "[-] Unwanted parameter\n");
exit(0);
}
}
struct sockaddr_in servaddr, cli;
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == sockfd) {
fprintf(stderr, "[-] Socket creation failed\n");
exit(0);
}
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr(argv[1]);
servaddr.sin_port = htons(PORT);
/*
struct timeval tv;
tv.tv_sec = TIMEOUT;
tv.tv_usec = 0;
setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
*/
if (connect(sockfd, (SA * ) & servaddr, sizeof(servaddr)) != 0) {
fprintf(stderr, "[-] Connection failed\n");
exit(0);
}
fprintf(stderr, "[+] Connected to the server\n");
int buffsize = 0;
char* recv_buffer = (char*)&buffsize;
int remaining = sizeof(int);
int received = 0;
int result = 0;
while (remaining > 0) {
result = recv(sockfd, recv_buffer + received, remaining, 0);
if (result > 0) {
remaining -= result;
received += result;
} else if (result == 0) {
fprintf(stderr, "[-] Remote side closed his end of the connection before all data was received\n");
exit(0);
} else if (result < 0) {
fprintf(stderr, "[-] Error retrieving configuration from the server\n");
exit(0);
}
}
//fprintf(stderr, "[*] Page size: %ld\n", sysconf(_SC_PAGE_SIZE));
int dh = open("/dev/mem", O_RDWR | O_SYNC);
unsigned int *virtual_address = mmap(NULL, 65535, PROT_READ | PROT_WRITE, MAP_SHARED, dh, VIRT_ADDR);
unsigned int *virtual_sendbuff = mmap(NULL, buffsize, PROT_READ | PROT_WRITE, MAP_SHARED, dh, SEND_ADDR);
unsigned int *virtual_recvbuff = mmap(NULL, buffsize, PROT_READ | PROT_WRITE, MAP_SHARED, dh, RECV_ADDR);
unsigned int *virtual_fifo = mmap(NULL, (fifosize + 1) * buffsize, PROT_READ | PROT_WRITE, MAP_SHARED, dh, FIFO_ADDR);
if (virtual_address == MAP_FAILED) {
fprintf(stderr, "[-] AXI DMA registers mmap failed\n");
}
if (virtual_sendbuff == MAP_FAILED) {
fprintf(stderr, "[-] Send buffer mmap failed\n");
}
if (virtual_sendbuff == MAP_FAILED) {
fprintf(stderr, "[-] Send buffer mmap failed\n");
}
if (virtual_recvbuff == MAP_FAILED) {
fprintf(stderr, "[-] Receiver buffer mmap failed\n");
}
if (virtual_fifo == MAP_FAILED) {
fprintf(stderr, "[-] Fifo mmap failed\n");
}
memset(virtual_address, 0, buffsize);
memset(virtual_sendbuff, 0, buffsize);
memset(virtual_recvbuff, 0, buffsize);
memset(virtual_fifo, 0, buffsize);
int head = 0, tail = 0;
pthread_t sendth, recvth;
pthread_mutex_t lock;
pthread_mutex_init(&lock, NULL);
arguments send_args;
send_args.virtual_address = virtual_address;
send_args.virtual_buffer = virtual_sendbuff;
send_args.head = &head;
send_args.tail = &tail;
send_args.buffsize = buffsize;
send_args.fifosize = fifosize;
send_args.sockfd = sockfd;
send_args.lock = &lock;
send_args.verbose = verbose;
arguments recv_args;
recv_args.virtual_address = virtual_address;
recv_args.virtual_buffer = virtual_recvbuff;
recv_args.head = &head;
recv_args.tail = &tail;
recv_args.buffsize = buffsize;
recv_args.fifosize = fifosize;
recv_args.sockfd = sockfd;
recv_args.lock = &lock;
recv_args.verbose = verbose;
start_time = clock();
if (pthread_create(&sendth, NULL, sender, &send_args)) {
fprintf(stderr, "[-] Error creating sender thread\n");
exit(0);
}
if (pthread_create(&recvth, NULL, receiver, &recv_args)) {
fprintf(stderr, "[-] Error creating receiver thread\n");
exit(0);
}
if (pthread_join(sendth, NULL)) {
fprintf(stderr, "[-] Error joining sender thread\n");
exit(0);
}
if (pthread_join(recvth, NULL)) {
fprintf(stderr, "[-] Error joining receiver thread\n");
exit(0);
}
pthread_mutex_destroy(&lock);
close(sockfd);
fprintf(stderr, "[+] Exit\n");
return 0;
}
Try replacing the print statements with delays.
As you're doing networking, you're speed constrained by your network connection. If you do not take this into considerations, your transfer buffers may overflow. The print statements add a certain delay, which might prevent this. Replacing them with actual delays would check for this.
A better solution - if this were indeed the problem - would then be to check for buffer availability before writing.