In the following code, each thread runs the sell_tickets() function, which takes the mutex lock and decrements the number of tickets. To limit the number of active threads, the checkin() and checkout() threads form a producer-consumer model built on semaphores.
But in the unit test, it seems that the consumer function checkout() calls pthread_join() before the producer checkin() has even created the new threads. Is anything wrong? Is it because threads don't share the stack memory segment, since I didn't malloc() heap space for some of the parameters?
/**
* Using semaphore to limit maximum thread number.
*/
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <sys/stat.h>
/**
 * Argument bundle for one sell_tickets() thread.
 * pthread_create() forwards a single pointer to the start routine, so all
 * the values an agent needs are packed into this one structure.
 */
typedef struct {
    /* identifier of the simulated agent */
    unsigned agent_id;
    /* number of tickets this agent still has to sell today */
    unsigned tickets_tosell;
    /* mutex guarding the shared tickets pool */
    pthread_mutex_t *pool_lock;
    /* counter of tickets left in the shared pool */
    unsigned *tickets_pool;
} agent;
/**
 * Initialise every field of an agent record.
 *
 * a            record to fill in
 * agentid      identifier stored in agent_id
 * ticketsnum   stored in tickets_tosell
 * lock         mutex stored in pool_lock
 * tickets_pool pool counter stored in tickets_pool
 */
static void new_agent(agent *a, unsigned agentid, unsigned ticketsnum, pthread_mutex_t *lock, unsigned *tickets_pool) {
    const agent filled = {
        .agent_id = agentid,
        .tickets_tosell = ticketsnum,
        .pool_lock = lock,
        .tickets_pool = tickets_pool,
    };
    *a = filled;
}
/**
 * Thread start routine, matching void *(*start_rtn)(void *).
 * ----------------------------------------------------------
 * Sells the agent's tickets one at a time. Each sale decrements the shared
 * pool under pool_lock and then decrements the agent's own tickets_tosell.
 *
 * agent_addr  pointer to the agent record for this thread
 * returns     the address of the agent's agent_id field, which the joining
 *             thread reads to learn which agent finished
 */
static void *sell_tickets(void *agent_addr) {
    agent *a = agent_addr;
    while (a->tickets_tosell > 0) {
        pthread_mutex_lock(a->pool_lock);
        if (*a->tickets_pool == 0) {
            /* The pool counter is unsigned: decrementing it here would wrap
             * around to UINT_MAX instead of going negative. */
            pthread_mutex_unlock(a->pool_lock);
            break;
        }
        (*a->tickets_pool)--;
        /* Both values are unsigned, so they are printed with %u. */
        printf("agent@%u sells a ticket, %u tickets left in pool.\n", a->agent_id, *a->tickets_pool);
        pthread_mutex_unlock(a->pool_lock);
        /* tickets_tosell belongs to this agent only; no lock is needed. */
        a->tickets_tosell--;
        printf("agent@%u has %u tickets to sell.\n", a->agent_id, a->tickets_tosell);
    }
    return &a->agent_id;
}
/**
 * Everything the producer and consumer threads share about the pool of
 * agent threads.
 */
typedef struct {
    pthread_t *pool_addr;   /* first slot of the shared array of thread ids */
    unsigned pool_size;     /* number of slots in that array */

    sem_t *producer_sem;    /* producer-side semaphore of the pair */
    sem_t *consumer_sem;    /* consumer-side semaphore of the pair */

    pthread_t *producer_tid;    /* where the producer thread's id is stored */
    pthread_t *consumer_tid;    /* where the consumer thread's id is stored */
} threads_pool;
/**
 * Initialise a threads_pool record; each argument is stored in the field
 * of the same meaning (ptid -> producer_tid, ctid -> consumer_tid).
 */
static void new_threads_pool(threads_pool *tp,
                             pthread_t *pool_addr, unsigned pool_size,
                             sem_t *producer_sem, sem_t *consumer_sem,
                             pthread_t *ptid, pthread_t *ctid) {
    const threads_pool filled = {
        .pool_addr = pool_addr,
        .pool_size = pool_size,
        .producer_sem = producer_sem,
        .consumer_sem = consumer_sem,
        .producer_tid = ptid,
        .consumer_tid = ctid,
    };
    *tp = filled;
}
/**
 * Pair of semaphores used as a start-up rendezvous between the checkin()
 * and checkout() threads.
 */
typedef struct {
    sem_t *checkin_b;   /* posted by checkout(), waited on by checkin() */
    sem_t *checkout_b;  /* posted by checkin(), waited on by checkout() */
} barrier;
/**
 * Initialise a barrier record from its two semaphores.
 */
static void new_barrier(barrier *b, sem_t *inb, sem_t *outb) {
    const barrier filled = {.checkin_b = inb, .checkout_b = outb};
    *b = filled;
}
/**
 * Size of one ticket-selling simulation.
 */
typedef struct {
    unsigned num_agents;    /* how many agents take part */
    unsigned num_tickets;   /* how many tickets the shared pool starts with */
} project;
/**
 * Initialise a project record with its agent and ticket counts.
 */
static void new_project(project *p, unsigned num_agents, unsigned num_tickets) {
    const project filled = {.num_agents = num_agents, .num_tickets = num_tickets};
    *p = filled;
}
/**
 * Single argument handed to both checkin() and checkout(): pointers to the
 * project description, the shared thread pool and the start-up barrier.
 */
typedef struct {
    project *pj;        /* agent and ticket counts */
    threads_pool *tp;   /* pool slots, semaphores and thread ids */
    barrier *b;         /* start-up rendezvous semaphores */
} project_params;
/**
 * Initialise a project_params record from its three component pointers.
 */
static void new_project_params(project_params *pp, project *pj, threads_pool *tp, barrier *b) {
    const project_params filled = {.pj = pj, .tp = tp, .b = b};
    *pp = filled;
}
/**
 * Producer thread.
 *
 * After a rendezvous with checkout(), creates one sell_tickets() thread per
 * agent. Before each creation it waits on producer_sem, and after each
 * successful creation it posts consumer_sem, so the number of live agent
 * threads is bounded by the initial value of producer_sem. Thread ids are
 * stored round-robin in the pool slots (slot = i % pool_size).
 *
 * The tickets pool, its mutex and the agent records all live in this
 * function's frame, so the function joins the consumer thread before the
 * frame is released.
 *
 * params  pointer to a project_params record
 * returns always NULL
 */
static void *checkin(void *params) {
    project_params *pp = params;
    const unsigned num_agents = pp->pj->num_agents;

    /* Rendezvous: neither checkin() nor checkout() proceeds until both run. */
    sem_post(pp->b->checkout_b);
    sem_wait(pp->b->checkin_b);

    unsigned tickets_pool = pp->pj->num_tickets;    /* shared resource */
    pthread_mutex_t tickets_pool_lock;              /* protects tickets_pool */
    /* A variable-length array must have a positive length. */
    agent agents[num_agents > 0 ? num_agents : 1];

    pthread_mutex_init(&tickets_pool_lock, NULL);
    for (unsigned i = 0; i < num_agents; i++) {
        const unsigned id = i + 1;
        new_agent(&agents[i], id, pp->pj->num_tickets / num_agents, &tickets_pool_lock, &tickets_pool);

        sem_wait(pp->tp->producer_sem);
        pthread_t *slot = pp->tp->pool_addr + (i % pp->tp->pool_size);
        int err = pthread_create(slot, NULL, sell_tickets, &agents[i]);
        if (err != 0) {
            printf("error[%d]: can't create thread.\n", err);
            /* checkout() reads agent ids through pointers into this frame,
             * so it must be stopped and reaped before the frame goes away. */
            pthread_cancel(*pp->tp->consumer_tid);
            pthread_join(*pp->tp->consumer_tid, NULL);
            /* NOTE(review): agent threads that were started but not yet
             * joined by checkout() still point into this frame; confirm how
             * they should be reaped on this path. */
            pthread_mutex_destroy(&tickets_pool_lock);
            pthread_exit(NULL);
        }
        printf("thread of agent@%u created, thread id = @%lx\n", id, (unsigned long)*slot);
        sem_post(pp->tp->consumer_sem);
    }

    /* Wait for checkout() to finish joining every agent thread.
     * NOTE(review): POSIX allows a thread to be joined only once, so no
     * other thread should join the consumer as well. */
    pthread_join(*pp->tp->consumer_tid, NULL);
    pthread_mutex_destroy(&tickets_pool_lock);

    if (tickets_pool == 0) {
        printf("Today's tickets are sold out!\n");
    } else {
        printf("%u tickets left at the end of the day!\n", tickets_pool);
    }
    return NULL;
}
/**
 * Consumer thread.
 *
 * After a rendezvous with checkin(), performs num_agents rounds: wait on
 * consumer_sem, join the thread stored in pool slot i % pool_size, then
 * post producer_sem so checkin() may reuse a slot.
 *
 * params  pointer to a project_params record
 * returns always NULL
 */
static void *checkout(void *params) {
    project_params *pp = params;

    /* Rendezvous: neither checkin() nor checkout() proceeds until both run. */
    sem_post(pp->b->checkin_b);
    sem_wait(pp->b->checkout_b);

    for (unsigned i = 0; i < pp->pj->num_agents; i++) {
        const unsigned slot = i % pp->tp->pool_size;
        void *tret = NULL;  /* exit value of the joined thread */

        sem_wait(pp->tp->consumer_sem);
        int errcode = pthread_join(*(pp->tp->pool_addr + slot), &tret);
        /* Always hand the slot back, otherwise checkin() could block forever. */
        sem_post(pp->tp->producer_sem);

        if (errcode != 0) {
            /* tret was not written; dereferencing it would crash. */
            printf("error[%d]: can't join the thread in pool slot %u.\n", errcode, slot);
        } else if (tret == NULL || tret == PTHREAD_CANCELED) {
            printf("the thread in pool slot %u ended without reporting an agent id.\n", slot);
        } else {
            printf("agent@%u has finished his job!\n", *(unsigned *)tret);
        }
    }
    return NULL;
}
/**
* producer-consumer model using semaphore
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
unsigned num_agents = 30;
unsigned num_tickets = 300;
/*
* project
*/
project pj;
new_project(&pj, num_agents, num_tickets);
/*
* thread pool of maximum 10 thread
*/
unsigned threads_pool_size = 10;
pthread_t pool[threads_pool_size];
/*
* checkin() and checkout() threads
*/
pthread_t checkin_tid, checkout_tid;
int checkin_err, checkout_err;
/* a pair of producer-consumer semaphores */
sem_t *checkin_sem, *checkout_sem;
checkin_sem = sem_open("/checkin_sem", O_CREAT, S_IRWXG, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT, S_IRWXG, 0);
/* barrier */
barrier b;
sem_t *checkin_b, *checkout_b;
checkin_b = sem_open("/checkin_b", O_CREAT, S_IRWXG, 0);
checkout_b = sem_open("/checkout_b", O_CREAT, S_IRWXG, 0);
new_barrier(&b, checkin_b, checkout_b);
/* wrap checkin() checkout() parameters into a single structure */
threads_pool tp;
new_threads_pool(&tp, &pool[0], threads_pool_size, checkin_sem, checkout_sem, &checkin_tid, &checkout_tid);
project_params pp;
new_project_params(&pp, &pj, &tp, &b);
/* create checkin(), checkout() threads */
checkin_err = pthread_create(&checkin_tid, NULL, checkin, &pp);
checkout_err = pthread_create(&checkout_tid, NULL, checkout, &pp);
if (checkin_err != 0 || checkout_err != 0) {
if (checkin_err != 0) printf("error: checkin() thread creation failed!\n");
if (checkout_err != 0) printf("error: checkout() thread creation failed!\n");
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(checkin_tid, NULL);
pthread_join(checkout_tid, NULL);
/*
* free resources
*/
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
}
/* Program entry point: runs the simulation once. */
int main(void)
{
    run();
    return 0;
}
lldb shows that pthread_join() returns error code 3 (ESRCH).
Process 46631 stopped
* thread #3, stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
frame #0: 0x0000000100003b39 agtktp`checkout(params=0x00007ffeefbff618) at agents_tickets_pool.c:221:46
217 errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
218 pthread_t tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
219 printf("thread@%u = %lx\n", i + 1, (unsigned long)tid);
220 sem_post(pp->tp->producer_sem);
-> 221 printf("agent@%d has finished his job!\n", *(unsigned *)tret);
222 }
223 pthread_exit(NULL);
224 }
Target 0: (agtktp) stopped.
(lldb) fr v errcode
(int) errcode = 3
(lldb) fr v tid
(pthread_t) tid = 0x0000000000000050
=== 01/03/2021 UPDATE 2 ===
I've posted the solution. Feel free to correct me if something is wrong. Thanks everyone, especially @Semion.
=== END 01/03/2021 UPDATE 2 ===
=== 01/03/2021 UPDATE 1 ===
I have now fixed the problem and found that I had made a typical mistake in the following code.
I need some time to re-edit this question and write an answer, to make it useful for later POSIX semaphore users. Please DO NOT close this question (it already has 2 close votes). Thanks.
=== END 01/03/2021 UPDATE 1 ===
The bug is fixed now. The problem is in sem_close().
According to "UNIX Network Programming, Vol. 2", ch. 10, sem_close() closes a semaphore but does not remove it from the system. POSIX named semaphores are "kernel-persistent".
So basically I was using the same pair of old semaphores, named /checkin_sem and /checkout_sem, for each run. Once the program exits improperly and leaves unexpected values in the semaphores, all subsequent runs crash. That's why some people said that the program works well on their machine: they had no legacy semaphores on their system.
To solve this, we should use sem_unlink() instead, which removes the semaphore name from the system; the destruction of the semaphore itself is postponed until the last sem_close() occurs. Since the close operation occurs automatically on process termination, we don't have to call sem_close() explicitly.
/* Remove the four semaphore names from the system so that the next run
 * does not pick up leftover semaphores. */
sem_unlink("/checkin_sem");
sem_unlink("/checkout_sem");
sem_unlink("/checkin_b");
sem_unlink("/checkout_b");
And to prevent a legacy semaphore from affecting our program, it's better to add O_EXCL when creating the semaphore, which makes sem_open() fail with an error when the semaphore already exists.
/* O_CREAT | O_EXCL: sem_open() reports an error instead of reusing a
 * semaphore that already exists under this name. */
checkin_sem = sem_open("/checkin_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
Finally, for the permission flags: if the program isn't concerned with inter-process communication, simple read and write permission for the owner is enough, which is S_IREAD | S_IWRITE or S_IRUSR | S_IWUSR.
Here's the final version of this producer-consumer threads pool.
/**
* Using semaphore to limit maximum thread number.
*/
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
/**
 * Argument bundle for one sell_tickets() thread.
 * pthread_create() forwards a single pointer to the start routine, so all
 * the values an agent needs are packed into this one structure.
 */
typedef struct {
    /* identifier of the simulated agent */
    unsigned agent_id;
    /* number of tickets this agent still has to sell today */
    unsigned tickets_tosell;
    /* counter of tickets left in the shared pool */
    unsigned *tickets_pool;
    /* mutex guarding the shared tickets pool */
    pthread_mutex_t *pool_lock;
} agent;
/**
 * Initialise every field of an agent record.
 *
 * a            record to fill in
 * agentid      identifier stored in agent_id
 * tickets_num  stored in tickets_tosell
 * pool         pool counter stored in tickets_pool
 * lock         mutex stored in pool_lock
 */
static void new_agent(agent *a, unsigned agentid, unsigned tickets_num, unsigned *pool, pthread_mutex_t *lock) {
    const agent filled = {
        .agent_id = agentid,
        .tickets_tosell = tickets_num,
        .tickets_pool = pool,
        .pool_lock = lock,
    };
    *a = filled;
}
/**
 * Thread start routine, matching void *(*start_rtn)(void *).
 * ----------------------------------------------------------
 * Sells the agent's tickets one at a time. Each sale decrements the shared
 * pool under pool_lock and then decrements the agent's own tickets_tosell.
 *
 * agent_addr  pointer to the agent record for this thread
 * returns     the address of the agent's agent_id field, which the joining
 *             thread reads to learn which agent finished
 */
static void *sell_tickets(void *agent_addr) {
    agent *a = agent_addr;
    while (a->tickets_tosell > 0) {
        pthread_mutex_lock(a->pool_lock);   /* begin of critical section */
        if (*a->tickets_pool == 0) {
            /* The pool counter is unsigned: decrementing it here would wrap
             * around to UINT_MAX instead of going negative. */
            pthread_mutex_unlock(a->pool_lock);
            break;
        }
        (*a->tickets_pool)--;
        /* Both values are unsigned, so they are printed with %u. */
        fprintf(stdout, "agent@%u sells a ticket, %u tickets left in pool.\n", a->agent_id, *a->tickets_pool);
        fflush(stdout);
        pthread_mutex_unlock(a->pool_lock); /* end of critical section */
        /* tickets_tosell belongs to this agent only; no lock is needed. */
        a->tickets_tosell--;
        fprintf(stdout, "agent@%u has %u tickets to sell.\n", a->agent_id, a->tickets_tosell);
        fflush(stdout);
    }
    return &a->agent_id;
}
/**
 * Global description of the ticket-selling simulation.
 */
struct {
    unsigned num_agents;    /* how many agents take part */
    unsigned num_tickets;   /* how many tickets the shared pool starts with */
} project;
/**
 * Global pool of agent thread ids shared by producer() and consumer().
 */
struct {
    unsigned pool_size; /* number of slots in the pool */
    pthread_t *pool;    /* first slot of the array of thread ids */
} threads_pool;
/**
 * Thread ids of the producer() and consumer() threads.
 */
struct {
    pthread_t producer; /* id of the thread running producer() */
    pthread_t consumer; /* id of the thread running consumer() */
} producer_consumer;
/**
 * The producer/consumer semaphore pair.
 */
struct {
    sem_t *producer_sem;    /* waited on by producer(), posted by consumer() */
    sem_t *consumer_sem;    /* waited on by consumer(), posted by producer() */
} sem;
/**
 * Start-up rendezvous between producer() and consumer().
 * Barrier is not implemented in mac os, so a pair of semaphores is used
 * instead.
 */
struct {
    sem_t *producer_b;  /* posted by consumer(), waited on by producer() */
    sem_t *consumer_b;  /* posted by producer(), waited on by consumer() */
} barrier;
/**
 * Producer thread.
 *
 * After a rendezvous with consumer(), creates one sell_tickets() thread per
 * agent. Before each creation it waits on sem.producer_sem, and after each
 * successful creation it posts sem.consumer_sem, so the number of live
 * agent threads is bounded by the initial value of sem.producer_sem. Thread
 * ids are stored round-robin in the pool slots (slot = i % pool_size).
 *
 * The tickets pool, its mutex and the agent records all live in this
 * function's frame, so the function joins the consumer thread before the
 * frame is released.
 *
 * arg     unused
 * returns always NULL
 */
static void *producer(void *arg) {
    (void)arg;
    const unsigned num_agents = project.num_agents;

    /* Rendezvous: neither producer() nor consumer() proceeds until both run. */
    sem_post(barrier.consumer_b);
    sem_wait(barrier.producer_b);

    unsigned tickets_pool = project.num_tickets;    /* shared by all agents */
    pthread_mutex_t tickets_pool_lock;              /* protects tickets_pool */
    /* A variable-length array must have a positive length. */
    agent agents[num_agents > 0 ? num_agents : 1];

    pthread_mutex_init(&tickets_pool_lock, NULL);
    for (unsigned i = 0; i < num_agents; i++) {
        const unsigned id = i + 1;
        new_agent(&agents[i], id, project.num_tickets / num_agents, &tickets_pool, &tickets_pool_lock);

        sem_wait(sem.producer_sem);
        pthread_t *slot = threads_pool.pool + i % threads_pool.pool_size;
        int err = pthread_create(slot, NULL, sell_tickets, &agents[i]);
        if (err != 0) {
            fprintf(stdout, "error[%d]: can't create thread.\n", err);
            fflush(stdout);
            /* consumer() reads agent ids through pointers into this frame,
             * so it must be stopped and reaped before the frame goes away. */
            pthread_cancel(producer_consumer.consumer);
            pthread_join(producer_consumer.consumer, NULL);
            /* NOTE(review): agent threads that were started but not yet
             * joined by consumer() still point into this frame; confirm how
             * they should be reaped on this path. */
            pthread_mutex_destroy(&tickets_pool_lock);
            pthread_exit(NULL);
        }
        fprintf(stdout, "thread of agent@%u created, thread id = @%lx\n", id, (unsigned long)*slot);
        fflush(stdout);
        sem_post(sem.consumer_sem);
    }

    /* Wait for consumer() to finish joining every agent thread.
     * NOTE(review): POSIX allows a thread to be joined only once, so no
     * other thread should join the consumer as well. */
    pthread_join(producer_consumer.consumer, NULL);
    pthread_mutex_destroy(&tickets_pool_lock);

    if (tickets_pool == 0) {
        fprintf(stdout, "Today's tickets are sold out!\n");
    } else {
        fprintf(stdout, "%u tickets left at the end of the day!\n", tickets_pool);
    }
    fflush(stdout);
    return NULL;
}
/**
 * Consumer thread.
 *
 * After a rendezvous with producer(), performs project.num_agents rounds:
 * wait on sem.consumer_sem, join the thread stored in pool slot
 * i % pool_size, then post sem.producer_sem so producer() may reuse a slot.
 *
 * arg     unused
 * returns always NULL
 */
static void *consumer(void *arg) {
    (void)arg;

    /* Rendezvous: neither producer() nor consumer() proceeds until both run. */
    sem_post(barrier.producer_b);
    sem_wait(barrier.consumer_b);

    for (unsigned i = 0; i < project.num_agents; i++) {
        const unsigned slot = i % threads_pool.pool_size;
        void *tret = NULL;  /* exit value of the joined thread */

        sem_wait(sem.consumer_sem);
        int errcode = pthread_join(*(threads_pool.pool + slot), &tret);
        /* Always hand the slot back, otherwise producer() could block forever. */
        sem_post(sem.producer_sem);

        if (errcode != 0) {
            /* tret was not written; dereferencing it would crash. */
            fprintf(stdout, "error[%d]: can't join the thread in pool slot %u.\n", errcode, slot);
        } else if (tret == NULL || tret == PTHREAD_CANCELED) {
            fprintf(stdout, "the thread in pool slot %u ended without reporting an agent id.\n", slot);
        } else {
            fprintf(stdout, "agent@%u has finished his job!\n", *(unsigned *)tret);
        }
        fflush(stdout);
    }
    return NULL;
}
/**
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
/* project */
project.num_agents = 30;
project.num_tickets = 300;
/* threads pool */
threads_pool.pool_size = 10;
pthread_t pool[threads_pool.pool_size];
threads_pool.pool = &pool[0];
/* producer-consumer semaphore */
sem.producer_sem = sem_open("/producer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool.pool_size);
sem.consumer_sem = sem_open("/consumer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
/* barrier */
barrier.producer_b = sem_open("/producer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
barrier.consumer_b = sem_open("/consumer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
/* create producer(), consumer() threads */
int producer_err = pthread_create(&producer_consumer.producer, NULL, producer, NULL);
int consumer_err = pthread_create(&producer_consumer.consumer, NULL, consumer, NULL);
if (producer_err != 0 || consumer_err != 0) {
if (producer_err != 0) {
fprintf(stdout, "error: checkin() thread creation failed!\n");
fflush(stdout);
}
if (consumer_err != 0) {
fprintf(stdout, "error: checkout() thread creation failed!\n");
fflush(stdout);
}
sem_unlink("/producer_sem");
sem_unlink("/consumer_sem");
sem_unlink("/producer_b");
sem_unlink("/consumer_b");
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(producer_consumer.producer, NULL);
pthread_join(producer_consumer.consumer, NULL);
/*
* free resources
* --------------
* Always use sem_unlink() to remove the semaphore, but not sem_close().
* Close a semaphore does NOT remove the semaphore from system shared memory.
* Close operation occurs automatically on process termination, we don't have
* to write sem_close() explicitly.
*/
sem_unlink("/producer_sem");
sem_unlink("/consumer_sem");
sem_unlink("/producer_b");
sem_unlink("/consumer_b");
}
/* Program entry point: runs the simulation once. */
int main(void)
{
    run();
    return 0;
}