Thread-Safe Generic List Queue in C


Steven Carpenter

I wrote this implementation of a List Queue in C that attempts to be thread-safe through the use of pthread mutexes. The Queue entries hold void pointers to data so the Queue can be generic in type. I would like to improve the code if I can. If anyone has any recommendations for improvements, or can see any areas where I've done something wrong, I would appreciate your help.

Note, one area that I know needs improvement is the q_delete() method. The pthread_mutex_destroy() documentation specifically says not to call it while a mutex is being held; however, I know that by releasing the mutex it will potentially be given to the next thread that is waiting for it. Is there some type of atomic way to unlock & destroy, or a better design that anyone knows of?

------------------------"q.h"-below-----------------------------

#ifndef Q_H
#define Q_H

#include <pthread.h>
#include <stdbool.h>

/* Q entry */
typedef struct q_entry_st {
    void *data;
    struct q_entry_st *next;
} q_entry_t;

/* Q structure */
typedef struct q_st {
    struct q_entry_st *back;
    struct q_entry_st *front;
    int size;               /* current size */
    pthread_mutex_t mutex;
} q_t;

/* Queue Methods */
q_t *q_new(void);
void q_delete(q_t *q,
              void (*delete_entry)(q_entry_t *entry));
void q_enqueue(q_t *q, q_entry_t *entry);
q_entry_t *q_dequeue(q_t *q);
q_entry_t *q_peek(q_t *q);
bool q_is_empty(q_t *q);
void q_print(q_t *q, void (*print_entry)(q_entry_t *entry));

#endif /* Q_H */

--------------------------"q.c"-----------------------------------

#include "q.h"
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>

q_t *q_new(void)
{
    q_t *q;

    /* Allocate queue */
    q = calloc(1, sizeof(q_t));
    if (!q) return NULL;

    /* Initialize pthread mutex */
    if (pthread_mutex_init(&q->mutex, NULL) != 0) {
        free(q);
        return NULL;
    }

    return q;
}

void q_delete(q_t *q,
              void (*delete_entry)(q_entry_t *entry))
{
    if (!q || !delete_entry) return;

    pthread_mutex_lock(&q->mutex);

    /* Delete all entries. Walk the list directly rather than calling
     * q_dequeue()/q_is_empty(), which would try to re-lock the mutex
     * we already hold and deadlock. */
    while (q->front) {
        q_entry_t *entry = q->front;
        q->front = entry->next;
        (*delete_entry)(entry);
    }

    pthread_mutex_unlock(&q->mutex);

    /* Destroy pthread mutex.
     * Potential race conditions here... */
    pthread_mutex_destroy(&q->mutex);

    /* Free queue */
    free(q);
}

void q_enqueue(q_t *q, q_entry_t *entry)
{
    if (!q || !entry) return;

    entry->next = NULL;

    pthread_mutex_lock(&q->mutex);

    /* First entry case. Check q->size directly: calling q_is_empty()
     * here would try to re-lock the mutex we hold and deadlock. */
    if (q->size == 0) {
        q->front = q->back = entry;
    }
    /* Every other entry case */
    else {
        q->back->next = entry;
        q->back = entry;
    }

    q->size++;

    pthread_mutex_unlock(&q->mutex);
}

q_entry_t *q_dequeue(q_t *q)
{
    if (!q) return NULL;

    q_entry_t *entry = NULL;

    pthread_mutex_lock(&q->mutex);

    /* Check q->size directly rather than via q_is_empty(), which
     * would re-lock the mutex we hold and deadlock. */
    if (q->size > 0) {

        /* Grab entry */
        entry = q->front;

        /* Only element? */
        if (q->size == 1) {
            q->front = q->back = NULL;
        }
        else {
            q->front = entry->next;
        }

        q->size--;
    }

    pthread_mutex_unlock(&q->mutex);

    return entry;
}

q_entry_t *q_peek(q_t *q)
{
    if (!q) return NULL;

    q_entry_t *entry = NULL;

    pthread_mutex_lock(&q->mutex);

    /* Direct size check: q_is_empty() would deadlock here. */
    if (q->size > 0) {
        entry = q->front;
    }

    pthread_mutex_unlock(&q->mutex);

    return entry;
}

bool q_is_empty(q_t *q)
{
    if (!q) return true;

    pthread_mutex_lock(&q->mutex);
    bool empty = (q->size == 0);
    pthread_mutex_unlock(&q->mutex);

    return empty;
}

/* Prints list front-to-back order */
void q_print(q_t *q,
             void (*print_entry)(q_entry_t *entry))
{
    if (!q || !print_entry) return;

    q_entry_t *entry;

    pthread_mutex_lock(&q->mutex);

    for (entry = q->front; entry; entry = entry->next) {
        /* print */
        (*print_entry)(entry);
    }

    pthread_mutex_unlock(&q->mutex);
}

-------------------------"main.c"-below-------------------

#include <stdio.h>
#include <stdlib.h>
#include "q.h"

void print_int(q_entry_t *entry);

void print_int(q_entry_t *entry)
{
    if (!entry) {
        printf("Error printing\n");
        return;
    }

    int *x = entry->data;

    printf("Number: %d\n", *x);
}

int main(void)
{
    q_t *q = q_new();

    q_entry_t *a, *b, *c;
    int *ai, *bi, *ci;

    a = calloc(1, sizeof(q_entry_t));
    ai = calloc(1, sizeof(int));

    b = calloc(1, sizeof(q_entry_t));
    bi = calloc(1, sizeof(int));

    c = calloc(1, sizeof(q_entry_t));
    ci = calloc(1, sizeof(int));

    if (!q || !a || !ai || !b || !bi || !c || !ci) {
        fprintf(stderr, "Allocation failed\n");
        return EXIT_FAILURE;
    }

    *ai = 1;
    a->data = ai;

    *bi = 2;
    b->data = bi;

    *ci = 3;
    c->data = ci;

    q_enqueue(q, a);
    q_print(q, &print_int);
    printf("\n");

    q_enqueue(q, b);
    q_print(q, &print_int);
    printf("\n");

    q_enqueue(q, c);
    q_print(q, &print_int);
    printf("\n");

    return 0;
}
 

Kaz Kylheku

I wrote this implementation of a List Queue in C that attempts to be
thread-safe by the use of pthread mutexes.

Nobody needs this kind of thing. Multi-threaded programs need a flexible
list or queue data structure, which can be protected in ways that the
designers see fit.

What if a list has to be kept atomically synchronized with several other
objects? The program needs a single mutex to protect all of them; a mutex
encapsulated in the list is useless, and only adds overhead on each call
to the list module.

Also, your queue operations are all non-blocking. When q_dequeue notices
that the queue is empty, it balks: it unlocks the mutex and leaves.

Thus, implementing a producer/consumer type situation requires polling (or some
"out-of-band" signaling mechanism to say "I put something in the queue,
please check").
Steven said:
...by releasing the mutex it will potentially be given to the next thread
that is waiting for it. Is there some type of atomic way to unlock &
destroy or a better design that anyone knows of?

There is no safe way to destroy an object which is reachable to other threads.

Even if the mutex could be destroyed atomically somehow, that would not
help. "Atomic unlock and destroy" is nonsense. Okay, the mutex is atomically
destroyed, whatever that means, but some thread has a pointer to it, and
subsequently uses the memory anyway; memory which is free, or perhaps has
been reassigned to some new object.

The reason that destroying a locked mutex is erroneous in POSIX is that it is a
kind of logical contradiction. A mutex is locked because it guards something
against concurrent access. But when we are reclaiming an object, it means that
the object is no longer in use, and about to be gone for good. This means that
it should be unreachable to all threads other than the caller. But if the
object is unreachable, there is no need to lock the mutex; there isn't
any more concurrent access.

So, long story short, your q_delete method does not have to lock the mutex in
the first place. It is not correct to call q_delete if the queue might
still be in use.
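In other words, something like this suffices (a sketch only; it assumes the
caller guarantees exclusive access, with types as in your q.h, and a
free_entry callback I made up for illustration):

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct q_entry_st { void *data; struct q_entry_st *next; } q_entry_t;
typedef struct { q_entry_t *back, *front; int size; pthread_mutex_t mutex; } q_t;

static int entries_deleted;   /* just to make the example observable */

/* Example delete callback: frees the payload and the entry. */
static void free_entry(q_entry_t *entry)
{
    entries_deleted++;
    free(entry->data);
    free(entry);
}

/* Assumes the caller guarantees no other thread can still reach q. */
void q_delete_exclusive(q_t *q, void (*delete_entry)(q_entry_t *entry))
{
    if (!q || !delete_entry) return;

    q_entry_t *entry = q->front;
    while (entry) {                 /* no locking: the queue is unreachable */
        q_entry_t *next = entry->next;
        (*delete_entry)(entry);
        entry = next;
    }

    pthread_mutex_destroy(&q->mutex);  /* legal: mutex is unlocked and unused */
    free(q);
}
```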
 

Steven Carpenter

Kaz said:
Nobody needs this kind of thing. Multi-threaded programs need a flexible
list or queue data structure, which can be protected in ways that the
designers see fit.

What if a list has to be kept atomically synchronized with several other
objects? The program needs a single mutex to protect all of them; a mutex
encapsulated in the list is useless, and only adds overhead on each call
to the list module.
After reading your response, I agree. Adding mutexes into the queue itself can have adverse effects, as you've explained. I'll remove the pthreads so that locking will be the responsibility of the person using the queue.

Kaz said:
Thus, implementing a producer/consumer type situation requires polling (or some
"out-of-band" signaling mechanism to say "I put something in the queue,
please check").

Do you have any recommendations for implementing a producer/consumer setup with a queue? I've seen using pthread_cond_t variables that the producer can signal to the consumer once there is work to do. Are there any other options?
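Something like this is what I had in mind (just a sketch; the bq_* names and
types are mine for illustration, not from the queue above):

```c
#include <pthread.h>
#include <stdlib.h>

/* Illustrative blocking-queue types (not the posted q_t). */
typedef struct node { void *data; struct node *next; } node_t;

typedef struct {
    node_t *front, *back;
    pthread_mutex_t mutex;
    pthread_cond_t nonempty;   /* signaled when an item is enqueued */
} bq_t;

void bq_init(bq_t *q)
{
    q->front = q->back = NULL;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->nonempty, NULL);
}

void bq_put(bq_t *q, void *data)
{
    node_t *n = malloc(sizeof *n);
    n->data = data;
    n->next = NULL;

    pthread_mutex_lock(&q->mutex);
    if (q->back)
        q->back->next = n;
    else
        q->front = n;
    q->back = n;
    pthread_cond_signal(&q->nonempty);  /* wake one waiting consumer */
    pthread_mutex_unlock(&q->mutex);
}

/* Blocks until an item is available. */
void *bq_take(bq_t *q)
{
    pthread_mutex_lock(&q->mutex);
    while (!q->front)                   /* loop guards against spurious wakeups */
        pthread_cond_wait(&q->nonempty, &q->mutex);

    node_t *n = q->front;
    q->front = n->next;
    if (!q->front)
        q->back = NULL;
    pthread_mutex_unlock(&q->mutex);

    void *data = n->data;
    free(n);
    return data;
}
```

The key detail is that pthread_cond_wait() atomically releases the mutex while
waiting and re-acquires it before returning, which is exactly the
"wait without polling" behavior that a bare mutex can't give you.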

....
Kaz said:
So, long story short, your q_delete method does not have to lock the mutex in
the first place. It is not correct to call q_delete if the queue might
still be in use.

Thanks for your response; everything you said was very helpful. I plan on improving the queue and re-posting the source code for further review. Do you have any other suggestions or opinions? Maybe some additional features the queue should have, or other methods to generally provide a more useful data structure.

One area I would like to improve is the return values for some of the methods. It seems that if the application code calls q_enqueue(), q_delete(), or q_is_empty() with NULL arguments, there should be more information given to the caller indicating something about failure. I've seen this information passed back through the errno variable before, would you say that is appropriate or have any alternate suggestions?
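For example, something like this (a sketch only; q_enqueue2 and the
stripped-down types are just for illustration, following the usual POSIX
convention of 0 on success and -1 with errno set on failure):

```c
#include <errno.h>
#include <stddef.h>

/* Stripped-down types for the sketch (mutex omitted for brevity). */
typedef struct q_entry_st { void *data; struct q_entry_st *next; } q_entry_t;
typedef struct { q_entry_t *front, *back; int size; } q_t;

/* Returns 0 on success; -1 with errno = EINVAL on bad arguments. */
int q_enqueue2(q_t *q, q_entry_t *entry)
{
    if (!q || !entry) {
        errno = EINVAL;
        return -1;
    }

    entry->next = NULL;
    if (q->back)
        q->back->next = entry;
    else
        q->front = entry;
    q->back = entry;
    q->size++;
    return 0;
}
```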
 

Noob

Steven said:
I wrote this implementation of a List Queue in C that attempts
to be thread-safe by the use of pthread mutexes.

I think comp.unix.programmer and comp.programming.threads
are more appropriate. Regards.
 

James Kuyper

I think comp.unix.programmer and comp.programming.threads
are more appropriate. Regards.

If it were converted to use C2011 <threads.h> rather than <pthread.h>, this
would be the more appropriate place to discuss it.
 

Noob

James said:
Steven said:
I wrote this implementation of a List Queue in C that attempts
to be thread-safe by the use of pthread mutexes.

I think comp.unix.programmer and comp.programming.threads
are more appropriate [for discussing threads]. Regards.

If it were converted to use C2011 <threads.h> rather than <pthread.h>,
this would be the more appropriate place to discuss it.

The problem with C11 threads is that they are spanking new.
Perhaps your suggestion will make more sense in 1-3 years.
 

James Kuyper

Noob said:
Steven Carpenter wrote:

I wrote this implementation of a List Queue in C that attempts
to be thread-safe by the use of pthread mutexes.

I think comp.unix.programmer and comp.programming.threads
are more appropriate [for discussing threads]. Regards.

If it were converted to use C2011 <threads.h> rather than <pthread.h>,
this would be the more appropriate place to discuss it.

The problem with C11 threads is that they are spanking new.
Perhaps your suggestion will make more sense in 1-3 years.

Well, being inexperienced in multi-threaded code, I'd be happy to read a
discussion, in this forum, by threading experts, about how to convert
your code to use C2011 threads (assuming it doesn't depend upon any
feature of <pthread.h> that is missing from <threads.h>).
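To sketch what I mean, the mapping is mostly mechanical (this assumes a libc
that actually ships <threads.h>, e.g. glibc 2.28+; the q11_* names are
illustrative, not from the posted code):

```c
#include <threads.h>
#include <stdlib.h>
#include <stdbool.h>

typedef struct q11_entry { void *data; struct q11_entry *next; } q11_entry_t;

typedef struct {
    q11_entry_t *front, *back;
    int size;
    mtx_t mutex;                        /* pthread_mutex_t -> mtx_t */
} q11_t;

q11_t *q11_new(void)
{
    q11_t *q = calloc(1, sizeof *q);
    if (!q) return NULL;

    /* pthread_mutex_init(&m, NULL) -> mtx_init(&m, mtx_plain) */
    if (mtx_init(&q->mutex, mtx_plain) != thrd_success) {
        free(q);
        return NULL;
    }
    return q;
}

void q11_enqueue(q11_t *q, q11_entry_t *entry)
{
    if (!q || !entry) return;

    entry->next = NULL;
    mtx_lock(&q->mutex);                /* pthread_mutex_lock -> mtx_lock */
    if (q->back)
        q->back->next = entry;
    else
        q->front = entry;
    q->back = entry;
    q->size++;
    mtx_unlock(&q->mutex);              /* pthread_mutex_unlock -> mtx_unlock */
}

bool q11_is_empty(q11_t *q)
{
    if (!q) return true;
    mtx_lock(&q->mutex);
    bool empty = (q->size == 0);
    mtx_unlock(&q->mutex);
    return empty;
}
```

pthread_cond_t maps to cnd_t the same way (cnd_init/cnd_wait/cnd_signal), and
mtx_destroy() replaces pthread_mutex_destroy(), with the same caveat about not
destroying a mutex another thread can still reach.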
 

Sean Burke

Kaz said:
Nobody needs this kind of thing. Multi-threaded programs need a flexible
list or queue data structure, which can be protected in ways that the
designers see fit.

What if a list has to be kept atomically synchronized with several other
objects? The program needs a single mutex to protect all of them; a mutex
encapsulated in the list is useless, and only adds overhead on each call
to the list module.

While this is often true of container classes in general -
that you need to synchronize addition/removal/lookup with
other operations outside the container, so an internal
mutex is redundant - well, this is certainly NOT true of
message or work queues. Which you yourself point out below:
Kaz said:
Also, your queue operations are all non-blocking. When q_dequeue notices
that the queue is empty, it balks: unlocks the mutex and leaves.
Thus, implementing a producer/consumer type situation requires polling (or some
"out-of-band" signaling mechanism to say "I put something in the queue,
please check").

Hmm, some sort of blocking mechanism... hey, what about pthread_cond_wait()? Yes, that would work, but it would require... wait for it ... a mutex!
Kaz said:
There is no safe way to destroy an object which is reachable to other threads.

Agreed. The solution to this, is to know when your object
is no longer reachable by other threads. You can depend on
the structure of your application to ensure this, but if you
want to have "built-in" safety, you generally want to use
reference counts.

I think that the best way to implement reference-counted
shared objects in C, is to use handles. A handle is simply
an integer that maps to a pointer via some global map.

In this approach, threads do not have a pointer to the queue,
they only get a handle. In order to perform operations on the
queue, such as pushing or popping an item, they call APIs
which perform the following steps:

1) look up the handle in the global map.

This lookup must atomically increment the queue's reference
count. (Notice how, as we just discussed, a mutex embedded
in the map object, such as a hash table, would be redundant,
because we also need a mutex to synchronize the lookup with
the reference-count-increment.)

2) pass the pointer obtained from step 1 to private calls
that perform the desired operation - enqueue, dequeue, etc.

3) decrement the queue's reference count, and return the
result of step 2) to the caller.

If you implement your queue in this way, then the only
permanent pointer to your object is in the global map.
All other pointers are private and temporary - they exist
only for the span of an API call.

So, when you want to destroy the queue, you first delete
its handle from the global map, so that no new references
can be obtained. Then you set a flag in the object, to
destroy the queue when its reference count drops to zero.
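A bare-bones sketch of that mechanism (a fixed-size table standing in for a
real map; the handle_* names are mine for illustration, not libnifty's API):

```c
#include <pthread.h>
#include <stdlib.h>

#define MAX_HANDLES 64

typedef struct {
    void *obj;        /* NULL slot = free */
    int refs;         /* live references handed out */
    int doomed;       /* set once the handle has been deleted */
} slot_t;

static slot_t table[MAX_HANDLES];
static pthread_mutex_t table_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Register an object; returns a handle, or -1 if the table is full. */
int handle_new(void *obj)
{
    pthread_mutex_lock(&table_mutex);
    for (int h = 0; h < MAX_HANDLES; h++) {
        if (!table[h].obj) {
            table[h].obj = obj;
            table[h].refs = 0;
            table[h].doomed = 0;
            pthread_mutex_unlock(&table_mutex);
            return h;
        }
    }
    pthread_mutex_unlock(&table_mutex);
    return -1;
}

/* Step 1: look up a handle, atomically taking a reference. */
void *handle_acquire(int h)
{
    void *obj = NULL;
    pthread_mutex_lock(&table_mutex);
    if (h >= 0 && h < MAX_HANDLES && table[h].obj && !table[h].doomed) {
        table[h].refs++;
        obj = table[h].obj;
    }
    pthread_mutex_unlock(&table_mutex);
    return obj;
}

/* Step 3: drop the reference; free the object once doomed and unreferenced. */
void handle_release(int h)
{
    void *to_free = NULL;
    pthread_mutex_lock(&table_mutex);
    if (--table[h].refs == 0 && table[h].doomed) {
        to_free = table[h].obj;
        table[h].obj = NULL;
    }
    pthread_mutex_unlock(&table_mutex);
    free(to_free);
}

/* Delete the handle so no new references can be taken. */
void handle_delete(int h)
{
    void *to_free = NULL;
    pthread_mutex_lock(&table_mutex);
    table[h].doomed = 1;
    if (table[h].refs == 0) {          /* nobody mid-call: free right now */
        to_free = table[h].obj;
        table[h].obj = NULL;
    }
    pthread_mutex_unlock(&table_mutex);
    free(to_free);
}
```

Step 2 is whatever private enqueue/dequeue call you wrap between
handle_acquire() and handle_release(); the object cannot be freed out from
under you while your reference is held.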

This may sound like a lot of trouble, but fortunately,
it is possible to implement the core mechanism once,
and share it among a variety of thread-safe objects.
In fact, I illustrate how to do this in libnifty,
which you can download from SourceForge:

http://sourceforge.net/projects/libnifty/

It includes an example shared queue, implemented
as I have described.

It also includes a very useful package for
scheduling tasks to execute in the future,
also using the same core mechanisms for handles
and reference counting that I have just described.
 
