This changes slightly:
A. Keep data about which clients have messages that have been queued
to the ThreadPoolExecutor but are not yet fully processed. The
simplest thing here is a Set into which clients are added when a
message is queued and removed

I'm using NIO for non-blocking read() operations in the IO thread, so
one thread handles all client accepts and reads. I'm using the
Selector API in the IO thread for accept/read readiness notification.
I was thinking of using the selection key's attach() method to store
data specific to the client, such as a flag that indicates whether a
message from that client has been submitted to the worker pool. The
selection key is passed along with the message to the worker thread.
The worker thread would then clear the flag once it has finished
processing the message.
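
To make the flag idea concrete, here is a minimal sketch. The names
ClientState, tryMarkInFlight and clearInFlight are made up for
illustration, and a Pipe stands in for a real client socket so the
snippet runs on its own. It is only one possible shape, not a finished
design.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical per-client state stored on the selection key via attach().
final class ClientState {
    // true while a message from this client is queued or being processed
    final AtomicBoolean inFlight = new AtomicBoolean(false);
}

final class AttachDemo {
    // IO thread: returns true if a message for this client may be submitted now.
    static boolean tryMarkInFlight(SelectionKey key) {
        ClientState state = (ClientState) key.attachment();
        return state.inFlight.compareAndSet(false, true);
    }

    // Worker thread: called once the message has been fully processed.
    static void clearInFlight(SelectionKey key) {
        ((ClientState) key.attachment()).inFlight.set(false);
    }

    // Small demo; the Pipe is an assumption that replaces a client SocketChannel.
    static boolean[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
                key.attach(new ClientState());

                boolean first = tryMarkInFlight(key);   // nothing in flight yet
                boolean second = tryMarkInFlight(key);  // first message still in flight
                clearInFlight(key);                     // worker finished
                boolean third = tryMarkInFlight(key);   // next message may go
                return new boolean[] { first, second, third };
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The AtomicBoolean is used because the IO thread sets the flag and a
worker thread clears it.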

Instead of this,
B1. Use a standard BlockingQueue implementation for the
ThreadPoolExecutor, but instead of adding new messages to it directly:
B2. For each client, create a LinkedList of messages to be processed,
which will eventually be moved to the BlockingQueue, and do the
following:
WHEN a message is received from a client
    IF the client is currently in the Set described above
    THEN
        Append that message to that client's LinkedList.
    ELSE
        Append it directly to the BlockingQueue
        Add the client to the Set
WHEN a client finishes processing a message
    IF the client's LinkedList is empty
    THEN
        Remove the client from the Set
    ELSE
        Move the oldest message from the client's LinkedList to
        the BlockingQueue
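
The two WHEN rules above can be sketched in Java roughly as follows.
The class and method names (PerClientDispatcher, onMessage,
onFinished) are hypothetical, and a plain BlockingQueue stands in for
the queue feeding the ThreadPoolExecutor. Both methods are
synchronized on the assumption that the IO thread and the worker
threads may call them concurrently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

// Sketch of the Set + per-client LinkedList scheme; names are illustrative only.
final class PerClientDispatcher<C, M> {
    private final BlockingQueue<M> workQueue;                    // feeds the worker pool
    private final Set<C> busy = new HashSet<>();                 // clients with a message queued or in progress
    private final Map<C, LinkedList<M>> backlog = new HashMap<>();

    PerClientDispatcher(BlockingQueue<M> workQueue) {
        this.workQueue = workQueue;
    }

    // WHEN a message is received from a client
    synchronized void onMessage(C client, M message) {
        if (busy.contains(client)) {
            // Client already has work outstanding: hold the message back.
            backlog.computeIfAbsent(client, c -> new LinkedList<>()).addLast(message);
        } else {
            workQueue.add(message);
            busy.add(client);
        }
    }

    // WHEN processing of a client's message has finished
    synchronized void onFinished(C client) {
        LinkedList<M> pending = backlog.get(client);
        if (pending == null || pending.isEmpty()) {
            busy.remove(client);
            backlog.remove(client);
        } else {
            // Release the oldest held-back message; the client stays in the Set.
            workQueue.add(pending.removeFirst());
        }
    }
}
```

With this shape at most one message per client is ever in the work
queue or in progress, so messages from one client are processed in
arrival order.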

I've been playing with something like your suggestion too. I was
thinking that I could store any excess messages in the selection key
attachment until the flag mentioned above is cleared, indicating that
the client has no message currently in the queue and no message being
processed by a worker thread.
Like this:
WHEN a worker thread finishes processing a message
    IF more unsubmitted messages exist, attached to the same
    client's selection key
    THEN
        Let the worker thread invoke wakeup() on the IO thread's
        selector so the IO thread can submit the next message.
But wakeup() is giving me a headache. Basically, wakeup() interrupts a
blocked select() invocation in the IO thread, and if the IO thread is
not currently blocked, the next invocation of select() will return
right away. If a lot of worker threads are bombarding the IO thread
with wakeup(), the IO thread will be starved of time to do readiness
notification, and the queue feeding the worker pool would be starved
in turn. Thus I would get a fluctuating pattern where reading is
starved until the worker threads have no more jobs and stop invoking
wakeup(), at which point the queue fills up again, the worker threads
go back to work, and the IO is starved again...
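
For reference, the wakeup() behaviour described above can be observed
with a small stand-alone test. This is only a sketch: WakeupDemo is a
made-up name and the selector has no channels registered. The Selector
javadoc also states that invoking wakeup() more than once between two
successive selection operations has the same effect as invoking it
once, which is what the second select() below shows.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeupDemo {
    // Returns { millis spent in the first select, millis spent in the second select }.
    static long[] demo() {
        try (Selector selector = Selector.open()) {
            // Several wakeups arrive while no select() is in progress.
            selector.wakeup();
            selector.wakeup();
            selector.wakeup();

            long t0 = System.nanoTime();
            selector.select(2000);   // returns right away because of the pending wakeup
            long t1 = System.nanoTime();
            selector.select(300);    // the earlier wakeups were consumed together, so this blocks
            long t2 = System.nanoTime();

            return new long[] { (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000 };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```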

Maybe you were thinking of letting the worker threads submit the next
unprocessed message themselves if the same client has more messages in
the linked list? This would avoid the wakeup(), but I'm not too keen
on having anyone but the IO thread dispatch jobs to the worker pool.
It complicates the design when different types of threads dispatch
jobs, but it would give me an undisturbed flow of messages to the
worker pool.

wakeup() is unfortunately also haunting me when I need to write data
using non-blocking writes through the Selector API. If a worker thread
wants to write data back to a client, I need to submit the return
message back to the IO thread so it can write the data in a
non-blocking fashion. If I let the worker thread write the data
itself, it would effectively block the thread even if it uses a
Selector, since the worker thread can't pick another job from the
queue while doing readiness selection on a previous write operation.
If the worker thread calls wakeup() on the IO thread to get a write
operation going, then again I'm starving the readiness notification in
the IO thread, since a lot of worker threads will want to write data
and the IO thread would thus be bombarded with wakeup().
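
The hand-off of return messages to the IO thread could look roughly
like the sketch below. WriteHandoff, submit and selectAndDrain are
hypothetical names, and replies are plain Strings to keep it short. A
real server would presumably queue ByteBuffers per channel and enable
OP_WRITE on the key instead of just collecting them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: workers enqueue replies, the IO thread drains them after select() returns.
final class WriteHandoff {
    private final Selector selector;
    private final Queue<String> pendingWrites = new ConcurrentLinkedQueue<>();

    WriteHandoff(Selector selector) {
        this.selector = selector;
    }

    // Worker thread: hand a reply to the IO thread and wake it up.
    void submit(String reply) {
        pendingWrites.add(reply);
        selector.wakeup();
    }

    // IO thread: one loop iteration. Returns every reply queued so far.
    List<String> selectAndDrain() throws IOException {
        selector.select();   // returns on channel readiness or on wakeup()
        List<String> drained = new ArrayList<>();
        for (String reply; (reply = pendingWrites.poll()) != null; ) {
            drained.add(reply);   // placeholder for the actual non-blocking write
        }
        return drained;
    }

    // Demo: three submissions are picked up by a single pass of the IO loop.
    static List<String> demo() {
        try (Selector selector = Selector.open()) {
            WriteHandoff handoff = new WriteHandoff(selector);
            handoff.submit("reply-1");
            handoff.submit("reply-2");
            handoff.submit("reply-3");
            return handoff.selectAndDrain();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch a single pass drains everything queued so far, so
several submissions can be served by one return from select().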

If I create a dedicated IO thread for write operations only, I
wouldn't starve the accept/read operations of the other IO thread, but
using wakeup() to signal the write thread to poll() the messages sent
to it for writing would still starve it of write readiness
notifications.

I could try blocking the IO thread using select(some short interval)
so that no wakeup() is needed, but I would hate to have this
artificial time delay on write operations.