Pattern for queue + thread pool with in-order processing?

meselfo

I'm writing a server that accepts connections from multiple network
clients. Messages from these clients are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client need to be processed in the order they were received from that
client. How do I enforce this requirement?

I'm using ThreadPoolExecutor from the Java API, which has a built-in
queue for submitting tasks. Suppose I stamp messages from the same client
with a sequence number and make the worker threads aware of this number, so
that they pause processing a message when the number indicates that
another message from the same client should finish processing first.
Then I'm effectively blocking that worker thread, so a client
sending many messages quickly could reduce the number of effective worker
threads. There must be some sort of pattern for processing some of the
tasks submitted to a pool of worker threads in order?
 
Giovanni Azua

hi there,

meselfo said:
I'm writing a server that accepts connections from multiple network
clients. Messages from these clients are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client need to be processed in the order they were received from that
client. How do I enforce this requirement?

[snip]
If you can guarantee a finite number of distinct clients, or expect it
to be reasonably bounded, then I think a good solution would be to dedicate a
single Queue per client. You would insert into the appropriate Queue depending
on the Client ID or, in JMS terms, a Correlation ID. I think you have a
very good use case for the ArrayBlockingQueue implementation:
http://java.sun.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html

With this setup I think you will have minimal contention, since multiple
threads will consume from multiple queues without having to worry about
breaking the per-client-ordering invariant. However, I would be curious as
to whether there are better performing alternatives.
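
A minimal sketch of the queue-per-client idea, under some assumptions that are not in the posts above: the class name PerClientLanes and its methods are hypothetical, each client gets its own consumer thread (so this only makes sense while the number of clients stays small), and a single IO thread calls submit() so that "order received" is well defined.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not from the thread): one bounded queue and one
// consumer thread per client ID, so each client's messages run in FIFO order.
class PerClientLanes {
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();
    private final int capacity;

    PerClientLanes(int capacity) {
        this.capacity = capacity;
    }

    // Throws RejectedExecutionException if this client's queue is full
    // (the default rejection policy of ThreadPoolExecutor).
    void submit(String clientId, Runnable task) {
        lanes.computeIfAbsent(clientId, id -> new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(capacity)))
             .execute(task);
    }

    // Stops accepting work and waits for the queued messages to drain.
    boolean shutdownAndWait(long timeoutMillis) {
        lanes.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes.values()) {
                if (!lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is one thread per client rather than a shared pool, which is why the bound on the number of clients matters.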

HTH,
Best regards,
Giovanni
 
Mike Schilling

meselfo said:
I'm writing a server that accepts connections from multiple network
clients. Messages from these clients are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client need to be processed in the order they were received from that
client. How do I enforce this requirement?

I'm using ThreadPoolExecutor from the Java API, which has a built-in
queue for submitting tasks. Suppose I stamp messages from the same client
with a sequence number and make the worker threads aware of this number, so
that they pause processing a message when the number indicates that
another message from the same client should finish processing first.
Then I'm effectively blocking that worker thread, so a client
sending many messages quickly could reduce the number of effective worker
threads.

Right, you don't want to change how the worker threads execute. You
want to change the order in which messages are pulled off the queue.
There must be some sort of pattern for processing some of the
tasks submitted to a pool of worker threads in order?

There are two things you need to ensure:

1. No messages from client A get processed as long as any other
messages from client A are still being processed.
2. When it's safe to process another message from client A, process
the earliest one.

You get 2 for free, as it's how queues work. All you really need to
do is:

A. Keep data about which clients have messages currently being
processed. The simplest thing here is a Set into which clients are
added when a job starts and removed when it ends.
B. Create a queue implementation that skips messages whose clients are
in the set from part A. That is, if messages from C1 and C2, but not
C3, are currently being processed, and the queue looks like

C1.5, C1.6, C2.4, C3.6

the next item returned would be C3.6. Now, if the C1 processing
completes, the next item returned will be C1.5.

The tricky bit here, I think, will be implementing your queue's poll()
method, since a blocked call needs to re-check whether it can complete both when

i. a new item is added to the queue, and
ii. message-processing completes (i.e. a client is removed from the
Set), which may make an existing queue item newly eligible to be
returned.
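
The skipping queue described above could be sketched roughly as follows. This is a standalone illustration, not a full BlockingQueue implementation: the names ClientSkippingQueue, Message and done() are made up for the example, the queue is unbounded, and a client is marked busy at the moment one of its messages is handed out.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;

// Hypothetical sketch: hands out the oldest message whose client is idle.
class ClientSkippingQueue {
    static final class Message {
        final String client;
        final int seq;

        Message(String client, int seq) {
            this.client = client;
            this.seq = seq;
        }
    }

    private final LinkedList<Message> pending = new LinkedList<>();
    private final Set<String> busy = new HashSet<>();

    // Case (i): a new item may be eligible right away, so wake the waiters.
    synchronized void put(Message m) {
        pending.addLast(m);
        notifyAll();
    }

    // Non-blocking: returns null if every pending message belongs to a busy client.
    synchronized Message poll() {
        for (Iterator<Message> it = pending.iterator(); it.hasNext(); ) {
            Message m = it.next();
            if (!busy.contains(m.client)) {
                it.remove();
                busy.add(m.client); // the client stays busy until done() is called
                return m;
            }
        }
        return null;
    }

    // Blocking: waits until put() or done() makes some message eligible.
    synchronized Message take() throws InterruptedException {
        Message m;
        while ((m = poll()) == null) {
            wait();
        }
        return m;
    }

    // Case (ii): finishing a message may make an older queued item eligible.
    synchronized void done(Message m) {
        busy.remove(m.client);
        notifyAll();
    }
}
```

With C1 and C2 busy and C1.5, C1.6, C2.4, C3.6 pending, poll() returns C3.6; after done() is called for the C1 message in progress, the next poll() returns C1.5.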
 
Mike Schilling

Mike said:
Right, you don't want to change how the worker threads execute. You
want to change the order in which messages are pulled off the queue.


There are two things you need to ensure:

1. No messages from client A get processed as long as any other
messages from client A are still being processed.
2. When it's safe to process another message from client A, process
the earliest one.

You get 2 for free, as it's how queues work. All you really need to
do is:

Here's a simpler version (since it doesn't require implementing a new
kind of BlockingQueue)
A. Keep data about which clients have messages currently being
processed. The simplest thing here is a Set into which clients are
added when a job starts and removed when it ends.

This changes slightly:

A. Keep data about which clients have messages which have been queued
to the ThreadPoolExecutor but are not yet fully processed. The
simplest thing here is a Set into which clients are added when a message
is queued and removed when it is done being processed.
B. Create a queue implementation that skips messages whose clients are
in the set from part A. That is, if messages from C1 and C2, but not
C3, are currently being processed, and the queue looks like

C1.5, C1.6, C2.4, C3.6

the next item returned would be C3.6. Now, if the C1 processing
completes, the next item returned will be C1.5.

Instead of this,

B1. Use a standard BlockingQueue implementation for the ThreadPoolExecutor,
but instead of adding new messages to it directly:
B2. For each client, create a LinkedList of messages to be processed, which
will eventually be moved to the BlockingQueue, and do the following:

WHEN a message is received from a client
    IF the client is currently in the Set described above
    THEN
        Append that message to that client's LinkedList.
    ELSE
        Append it directly to the BlockingQueue
        Add the client to the Set

WHEN a client finishes processing a message
    IF the client's LinkedList is empty
    THEN
        Remove the client from the Set
    ELSE
        Move the oldest message from the client's LinkedList to
        the BlockingQueue
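
In Java, the two WHEN rules above might look something like the sketch below. The class name OrderedDispatcher and the String client ID are assumptions made for the example. The Set and the per-client lists are folded into one map (a client is "in the Set" while it has a map entry), and the sketch assumes the executor neither rejects tasks nor runs them on the calling thread.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical sketch: at most one message per client is ever in the pool.
class OrderedDispatcher {
    private final Executor pool;
    // A key is present while that client has a message queued or running;
    // the value holds the client's messages that are waiting their turn.
    private final Map<String, Queue<Runnable>> backlog = new HashMap<>();

    OrderedDispatcher(Executor pool) {
        this.pool = pool;
    }

    // WHEN a message is received from a client
    synchronized void submit(String clientId, Runnable message) {
        Queue<Runnable> waiting = backlog.get(clientId);
        if (waiting != null) {
            waiting.add(message);                       // client is in the Set
        } else {
            backlog.put(clientId, new LinkedList<>());  // add client to the Set
            pool.execute(wrap(clientId, message));
        }
    }

    private Runnable wrap(String clientId, Runnable message) {
        return () -> {
            try {
                message.run();
            } finally {
                finished(clientId);
            }
        };
    }

    // WHEN a client finishes processing a message (runs on the worker thread)
    private synchronized void finished(String clientId) {
        Runnable next = backlog.get(clientId).poll();
        if (next == null) {
            backlog.remove(clientId);                   // remove client from the Set
        } else {
            pool.execute(wrap(clientId, next));         // move the oldest message to the pool
        }
    }
}
```

Note that the worker thread itself hands the next message to the pool in finished(), so no worker ever blocks waiting for an earlier message from the same client.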
 
meselfo

This changes slightly:

A. Keep data about which clients have messages which have been queued
to the ThreadPoolExecutor but are not yet fully processed. The
simplest thing here is a Set into which clients are added when a message
is queued and removed when it is done being processed.

I'm using NIO for non-blocking read() operations in the IO thread. Thus one
thread is handling all client accepts and reads. I'm using the Selector API
in the IO thread for accept/read readiness notification.

I was thinking of using the selection key's attach() method to store
data specific to the client, such as a flag that indicates whether a message
from that client has been submitted to the worker pool. The selection key
is passed along with the message to the worker thread. The flag
would then be cleared by the worker thread once it has finished processing
the message.

Instead of this,

B1. Use a standard BlockingQueue implementation for the ThreadPoolExecutor,
but instead of adding new messages to it directly:
B2. For each client, create a LinkedList of messages to be processed, which
will eventually be moved to the BlockingQueue, and do the following:

WHEN a message is received from a client
    IF the client is currently in the Set described above
    THEN
        Append that message to that client's LinkedList.
    ELSE
        Append it directly to the BlockingQueue
        Add the client to the Set

WHEN a client finishes processing a message
    IF the client's LinkedList is empty
    THEN
        Remove the client from the Set
    ELSE
        Move the oldest message from the client's LinkedList to
        the BlockingQueue

I've been playing with something like your suggestion too. I was thinking that
I could store any excess messages in the selection key's attachment until the
flag mentioned above is cleared, indicating that the client has no messages
currently in the queue and no message being processed by a worker thread.
Like this:

WHEN a worker thread finishes processing a message
    IF more unsubmitted messages exist attached to the same client's selection key
    THEN
        Let the worker thread invoke wakeup() on the IO thread's selector
        so the IO thread can submit the next message.

But wakeup() is giving me a headache. Basically, wakeup() interrupts a blocked select()
invocation in the IO thread, and if the IO thread is not currently blocked then the
next invocation of select() will return right away. If a lot of worker threads are
bombarding the IO thread with wakeup(), the IO thread will be starved of time
to do readiness notification, and the queue feeding the worker pool would be starved.
Thus I would get a fluctuating pattern where reading is starved until the worker threads
have no more jobs and stop invoking wakeup(), at which point the queue is filled up again,
the worker threads go back to work, and the IO is starved again...

Maybe you were thinking of letting the worker threads submit the next unprocessed message
themselves if the same client has more messages in the linked list? This would avoid the
wakeup(), but I'm not too keen on having anyone but the IO thread dispatch jobs to the
worker pool. It complicates the design when different types of threads dispatch jobs, but
it would give me an undisturbed flow of messages to the worker pool.

wakeup() is unfortunately also haunting me when I need to write data using non-blocking
writes through the Selector API. If a worker thread wants to write data back to a client,
then I need to submit the return message back to the IO thread so it can write the data in
a non-blocking fashion. If I let the worker thread write the data by itself, it would
effectively block the thread even if it uses a Selector, since the worker thread really
can't pick another job from the queue while doing readiness selection on a previous write
operation. If the worker thread calls wakeup() on the IO thread to get a write operation
going, then again I'm starving the readiness notification in the IO thread, since a lot of
worker threads will want to write data and the IO thread would thus be bombarded with
wakeup().
If I create a dedicated IO thread for write operations only, I wouldn't starve the
accept/read operations of the other IO thread, but the wakeup() calls that signal the write
thread to poll() the messages sent to it would still starve it of write readiness
notifications.
I could try blocking the IO thread using select(some short interval) so no wakeup() is
needed, but I would hate to have this artificial time delay on write operations.
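
One common way to keep the number of wakeup() calls down is to have workers drop their requests into a thread-safe mailbox and call wakeup() only when no wakeup is already pending. The sketch below illustrates that idea only. IoLoopMailbox, post() and runOnce() are hypothetical names, the wakeupCalls counter exists just to make the effect visible, and real key handling is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: workers post work for the IO thread, and at most one
// wakeup() is issued per pass of the IO loop.
class IoLoopMailbox {
    private final Selector selector;
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
    final AtomicInteger wakeupCalls = new AtomicInteger(); // for illustration only

    private IoLoopMailbox(Selector selector) {
        this.selector = selector;
    }

    static IoLoopMailbox open() {
        try {
            return new IoLoopMailbox(Selector.open());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called by worker threads, e.g. to request a write or the next submit.
    void post(Runnable ioWork) {
        mailbox.add(ioWork);
        if (wakeupPending.compareAndSet(false, true)) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    // Called by the IO thread: one pass of its loop. Returns the number of
    // mailbox items that were run.
    int runOnce(long timeoutMillis) {
        try {
            selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Clear the flag before draining, so a post() that arrives during
        // the drain triggers a fresh wakeup instead of being missed.
        wakeupPending.set(false);
        int ran = 0;
        for (Runnable r; (r = mailbox.poll()) != null; ran++) {
            r.run();
        }
        // ... handle selector.selectedKeys() here ...
        return ran;
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The Selector documentation also states that invoking wakeup() more than once between two successive selection operations has the same effect as invoking it once, so the flag mainly saves the cost of the redundant calls.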
 
Mike Schilling

meselfo said:
Maybe you were thinking of letting the worker threads submit the next unprocessed message
themselves if the same client has more messages in the linked list? This would avoid the
wakeup(), but I'm not too keen on having anyone but the IO thread dispatch jobs to the
worker pool. It complicates the design when different types of threads dispatch jobs, but
it would give me an undisturbed flow of messages to the worker pool.

Why does it complicate the design? The BlockingQueue is designed to accept
items from many threads; just take a bit of care with the synchronization.

As to the output side, I don't have enough experience with NIO to make any
sensible suggestions.
 
Joined
Sep 11, 2011
Messages
1
Reaction score
0
Quite an interesting problem. I stumbled over this post when I was looking for a solution to exactly the same problem: dispatching several runnables to a thread pool, but forcing some of these runnables to be executed in strict order.

As suggested here, I tried to use a ThreadPoolExecutor with a custom blocking queue. However, it turned out that Sun's implementation bypasses the queue if there are idling threads in the pool. I ended up writing my own ThreadPool with a custom Queue. Works like a charm :)
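
A small experiment makes the bypass visible. As far as the documented behaviour of ThreadPoolExecutor goes, execute() hands a task straight to a newly started thread while fewer than corePoolSize threads are running, and only offers it to the queue after that. The names QueueBypassDemo and CountingQueue below are made up for this sketch; prestartAllCoreThreads() is a real ThreadPoolExecutor method that starts the core threads up front so every task goes through the queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueBypassDemo {
    // A queue that counts how many tasks the executor offers to it.
    static final class CountingQueue extends LinkedBlockingQueue<Runnable> {
        final AtomicInteger offers = new AtomicInteger();

        @Override
        public boolean offer(Runnable r) {
            offers.incrementAndGet();
            return super.offer(r);
        }
    }

    // Submits three tasks to a pool with two core threads and reports how
    // many of them passed through the queue.
    static int offersSeen(boolean prestart) {
        CountingQueue queue = new CountingQueue();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);
        if (prestart) {
            pool.prestartAllCoreThreads();
        }
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        return queue.offers.get();
    }

    public static void main(String[] args) {
        System.out.println(offersSeen(false)); // prints 1: two tasks skipped the queue
        System.out.println(offersSeen(true));  // prints 3: all tasks were queued
    }
}
```

So a custom ordering queue only sees every task if the core threads are already running, or if tasks are placed in the queue by some route other than execute().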

For those who might find this thread as well: I've uploaded the source to https://github.com/EarlOfWenc/snippets/tree/master/java/DependencyThreadPool
 
