I'm building a UDP non-blocking client and server socket pair with SIGIO facilitating the reception of the data on the server. Here's my situation. I have a SIGIO activating when the client sends a message to me. However, the messages are approx. 3 bursts of 1024-byte packets (i.e. the client sends 3 packets all at once). I want the server to trigger on SIGIO on the first packet, and in the handler I have a while loop that goes through up to 2 of the 3 packets and reads them (for this current run). However, I have several runs of these 3 packet sends, and they are separated by 5 seconds. So I want the first packet to trigger the SIGIO, the while empties the buffer, and inside the signal handler I cancel the one SIGIO pending signal that might be caused from the remaining 3-1 packets on the first run, and re-enable SIGIO for the next run first packet (in 5 secs). It'll be easier to explain if you just look at the code:
Client:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <math.h>
#include <time.h>
static const unsigned int TIMEOUT_SECS = 2; // Seconds between retransmits
void error(const char *msg, const char *detail);
int main(int argc, char *argv[]) {
if ((argc < 6) || (argc > 7))
error("Parameter(s)", "<Server Address/Name> <Echo File> <Server Port/Service> <Maximum packet length> <Number of trials> [<Seconds between trials>]");
char *server = argv[1];
FILE *fp = fopen(argv[2], "rb");
if (fp == NULL)
error("Error opening file", "");
fseek(fp, -1, SEEK_END);
unsigned long stringlen = (getc(fp) == '\n' ? (ftell(fp) - 1) : ftell(fp));
rewind(fp);
char *string = malloc(stringlen * sizeof(char));
if (string == NULL)
error("malloc() failed", "");
for (int i = 0; i < stringlen; i++) {
*(string + i) = fgetc(fp);
}
printf("%lu\n", stringlen);
fclose(fp);
in_port_t service = (in_port_t) strtol(argv[3], NULL, 10);
int maxpack = (int) strtol(argv[4], NULL, 10);
int trials = (int) strtol(argv[5], NULL, 10); // must coordinate with server
if (trials < 1)
error("invalid trials parameter", "entered a nonpositive number");
int sleeptime = (argc == 7) ? (int) strtol(argv[6], NULL, 10) : 5;
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sock < 0)
error("socket() failed", "");
struct timeval timeout;
timeout.tv_sec = TIMEOUT_SECS;
timeout.tv_usec = 0;
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeout, sizeof(timeout)) < 0)
error("setsockopt() failed", "");
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &timeout, sizeof(timeout)) < 0)
error("setsockopt() failed", "");
struct sockaddr_in servAddr;
memset(&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
int rc = inet_pton(AF_INET, server, &servAddr.sin_addr.s_addr);
if (rc == 0)
error("inet_pton() failed", "invalid address string");
else if (rc < 0)
error("inet_pton() failed", "");
servAddr.sin_port = htons(service);
int packnum = (int) ceil((double) stringlen / maxpack);
for (int num = 0; num < trials; num++) {
struct timespec start, end;
uint64_t diff;
ssize_t numBytes;
for (int i = 0; i < packnum; i++) {
if (i == 0)
clock_gettime(CLOCK_MONOTONIC, &start);
numBytes = sendto(sock, string + i * maxpack, maxpack, 0, (struct sockaddr* ) &servAddr, sizeof(servAddr));
if (numBytes < 0)
error("sendto() failed", "");
}
struct sockaddr_in fromAddr;
socklen_t fromAddrlen = sizeof(fromAddr);
char buffer[strlen(argv[3]) + 1 + packnum + 1];
memset(&buffer, 0, sizeof(buffer));
do {
numBytes = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *) &fromAddr, &fromAddrlen);
if (numBytes < 0)
error("recvfrom() failed", "");
buffer[numBytes] = '\0';
} while ((int) strtol(buffer, NULL, 10) == num);
clock_gettime(CLOCK_MONOTONIC, &end);
diff = 1000000000L * (end.tv_sec - start.tv_sec) + end.tv_nsec - start.tv_nsec;
printf("Received: %s\n", buffer);
printf("Time: %f ms\n", (double) (diff / (double) 1000000));
sleep(sleeptime);
}
free(string);
close(sock);
exit(0);
}
void error(const char *msg, const char *detail) {
fputs(msg, stderr);
if (*detail != '\0') {
fputs(": ", stderr);
fputs(detail, stderr);
}
fputc('\n', stderr);
exit(1);
}
Server:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <limits.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#define ITOA_BASE_N (sizeof(unsigned)*CHAR_BIT + 2)
void error(const char *msg, const char *detail);
void PrintSocketAddress(const struct sockaddr *address, FILE *stream);
void SIGIOHandler(int signalType);
char * itoa_base(char *s, int x, int base);
void recvflush(int sockfd);
#define ITOA(x,b) itoa_base((char [ITOA_BASE_N]){0} , (x), (b))
int sock, acknum, maxpack; // GLOBAL for signal handler
volatile sig_atomic_t run, packets; // VOLATILE FOR signal handler modification
struct sigaction handler;
int main(int argc, char *argv[]) {
if (argc != 5)
error("Parameter(s)", "<Server Port/Service> <Number of ACKs> <Maximum packet length> <Number of trials>");
in_port_t service = (in_port_t) strtol(argv[1], NULL, 10);
acknum = (int) strtol(argv[2], NULL, 10);
maxpack = (int) strtol(argv[3], NULL, 10);
int trials = (int) strtol(argv[4], NULL, 10); // must coordinate with client
run = 0;
sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sock < 0)
error("socket() failed", strerror(errno));
struct sockaddr_in servAddr;
memset(&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
servAddr.sin_port = htons(service);
if (bind(sock, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0)
error("bind() failed", strerror(errno));
handler.sa_handler = SIGIOHandler;
if (sigfillset(&handler.sa_mask) < 0)
error("sigfillset() failed", "mask not set");
handler.sa_flags = 0;
if (sigaction(SIGIO, &handler, 0) < 0)
error("sigaction() failed", "SIGIO behavior unable to be modified");
if (fcntl(sock, F_SETOWN, getpid()) < 0)
error("Unable to set process owner to us", "");
if (fcntl(sock, F_SETFL, O_NONBLOCK | FASYNC) < 0)
error(
"Unable to put client sock into non-blocking/async mode", "");
recvflush(sock);
for (;;) {
if (run == trials) {
close(sock);
exit(0);
}
printf(".");
fflush(stdout);
sleep(1);
}
}
void SIGIOHandler(int signalType) {
printf("SIGIO\n");
packets = 0;
run++;
struct sockaddr_in clntAddr;
socklen_t clntLen = sizeof(clntAddr);
ssize_t numBytes;
//FILE *fp = fopen("output.txt", "ab");
//if (fp == NULL)
//error("Error opening file", "");
char buffer[maxpack + 1];
//char *output = malloc(2000 * (maxpack + 1) * sizeof(char));
//if (output == NULL)
//error("malloc() failed", "");
memset(buffer, 0, sizeof(buffer));
//memset(output, 0, sizeof(output));
while (packets < 2) { // read up to 2 packets (1024 bytes each) from buffer
numBytes = recvfrom(sock, buffer, maxpack, 0, (struct sockaddr *) &clntAddr, &clntLen);
if (numBytes < 0) {
if (errno != EWOULDBLOCK) // only acceptable error
error("recvfrom() failed", strerror(errno));
else
printf("%d EWOULDBLOCK", numBytes);
} else {
if (numBytes > 0) {
packets++;
buffer[numBytes] = '\0';
//memmove(output + maxpack * packets, buffer, strlen(buffer));
}
printf("Run-Packet numBytes: %d-%d %d\n", run, packets, numBytes);
printf("\t%s\n", buffer);
}
}
sigset_t mask;
sigpending(&mask);
if (sigismember(&mask, SIGIO))
printf("pending SIGIO signal\n");
printf("setting ignore\n");
struct sigaction change;
change.sa_handler = SIG_IGN; // set ignore handler to cancel pending SIGIO signals from 1024 - 1 packets
if (sigfillset(&change.sa_mask) < 0)
error("sigfillset() failed", "mask not set");
change.sa_flags = 0;
if (sigaction(SIGIO, &change, 0) < 0)
error("sigaction() failed", "SIGIO behavior unable to be modified");
sigpending(&mask);
if (sigismember(&mask, SIGIO))
printf("pending SIGIO signal\n"); // should NOT show up
recvflush(sock); // flush ignored bytes
//for (int i = 0; i < strlen(output); i++)
//putc(output[i], fp);
//putc('\0', fp);
//fclose(fp);
//free(output);
char *ack = ITOA(run, 10);
char *append = ITOA(packets, 10);
strcat(ack, "/"); // fix client side
strcat(ack, append);
for (int i = 0; i < acknum; i++) {
numBytes = sendto(sock, ack, sizeof(ack), 0, (struct sockaddr *) &clntAddr, clntLen);
if (numBytes < 0)
error("sendto() failed", strerror(errno));
}
if (sigaction(SIGIO, &handler, 0) < 0)
error("sigaction() failed", "SIGIO behavior unable to be modified"); // reenable SIGIO for subsequent runs
printf("HERE\n");
}
char * itoa_base(char *s, int x, int base) {
s += ITOA_BASE_N - 1;
*s = '\0';
if (base >= 2 && base <= 36) {
int x0 = x;
do {
*(--s) = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"[abs(x % base)];
x /= base;
} while (x);
if (x0 < 0) {
*(--s) = '-';
}
}
return s;
}
void recvflush(int sockfd) {
ssize_t num;
do {
num = recvfrom(sockfd, NULL, 0, 0, NULL, NULL);
if (num < 0 && errno != EWOULDBLOCK)
error("recvfrom() failed", strerror(errno));
} while (errno != EWOULDBLOCK);
}
Now, I start the server first with parameters 8080 10 1024 2
and pipe it to look at the first 10,000 chars using | head -c 10000
. Then, I start the client with parameters 192.168.4.101 test.txt 8080 1024 2
. The test.txt
is filled with approx. 3000 junk characters, which should split into 3 packets of 1024 (notice ceil
in client). However, whenever I send the information, a very peculiar thing happens.
Sometimes, everything works perfectly and I receive the output as expected, with 5 seconds in between buffer printf
s. Other times, it screws up and generates a EWOULDBLOCK error (see printf("%d EWOULDBLOCK", numBytes);
in server code) forever. Every time it generates this error, it still reads the ignored packet (since I'm only reading 2 of the 3 packets and flushing the rest with recvflush
) even though I'm flushing the receive buffer. Supposedly, recvflush
clears out the entire buffer to prepare for subsequent runs but for some reason the ignored packet stays there. Help!
Thanks
You are assuming that all 3 packets are already in the socket buffer at the time the SIGIO triggers your signal handler. But the SIGIO gets triggered on the first packet which means there might be only a single packet or two packets in the socket buffer.
Assuming there is only a single packet the second recv
will fail with EWOULDBLOCK since there are no more packets. Similar the recv
in recvflush
will fail since the missing packets are not there yet. And no SIGIO will be triggered either if the packets are not there so any calls of sigpending
will not be able to detect that there are outstanding data.