Pattern for queue + thread pool with in-order processing?

Discussion in 'Java' started by meselfo, Mar 22, 2009.

  1. meselfo

    meselfo Guest

    I'm writing a server that accepts connections from multiple network
    clients. Messages from these clients are submitted as tasks to worker
    threads through a bounded queue. Messages coming from the _SAME_
    client need to be processed in the order they were received from that
    client. How do I enforce this requirement?

    I'm using ThreadPoolExecutor from the Java API, which has a built-in
    queue for submitting tasks. If I stamp messages from the same client
    with a sequence number and make worker threads aware of this number so
    that they pause processing a message if the number indicates that
    another message from the same client should finish processing first,
    then I'm effectively blocking that worker thread; thus a client
    sending many messages quickly could reduce the number of effective worker
    threads. There must be some sort of pattern for processing some
    tasks submitted to a pool of worker threads in order?
     
    meselfo, Mar 22, 2009
    #1

  2. hi there,

    "meselfo" <> wrote in message
    > I'm writing a server that accepts connections from multiple network
    > clients. Messages from these clients are submitted as tasks to worker
    > threads through a bounded queue. Messages coming from the _SAME_
    > client need to be processed in the order they were received from that
    > client. How do I enforce this requirement?
    >
    > [snip]
    >

    If you can ensure a finite number of different clients, or expect it
    to be reasonably bounded, then I think a good solution would be to dedicate a
    single Queue per client. You would insert into the appropriate Queue depending
    on the client ID or, in JMS terms, a Correlation ID. I think you have a
    very good use case for the ArrayBlockingQueue implementation:
    http://java.sun.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html

    With this setup I think you will have minimal contention, since multiple
    threads will consume from multiple queues without having to worry about
    breaking the per-client ordering invariant (as long as each client's
    queue is drained by only one thread at a time). However, I would be
    curious whether there are better-performing alternatives.
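    A minimal sketch of the per-client queue idea, under stated assumptions: the class name `PerClientQueues`, its `submit` method, the queue capacity and the policy of one dedicated consumer thread per client are all hypothetical choices, not from the post. Ordering holds only because each client's `ArrayBlockingQueue` is drained by exactly one thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class PerClientQueues {
    // One bounded queue per client ID (assumes a bounded number of clients).
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    private final int capacity;
    private final BiConsumer<String, String> handler;

    public PerClientQueues(int capacity, BiConsumer<String, String> handler) {
        this.capacity = capacity;
        this.handler = handler;
    }

    /** Called by the IO thread; blocks if this client's bounded queue is full. */
    public void submit(String clientId, String message) throws InterruptedException {
        BlockingQueue<String> queue = queues.get(clientId);
        if (queue == null) {
            BlockingQueue<String> created = new ArrayBlockingQueue<>(capacity);
            queue = queues.putIfAbsent(clientId, created);
            if (queue == null) {  // first message from this client: start its consumer
                queue = created;
                startConsumer(clientId, created);
            }
        }
        queue.put(message);
    }

    /** Exactly one daemon thread drains each client's queue, so FIFO order is preserved. */
    private void startConsumer(String clientId, BlockingQueue<String> queue) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(clientId, queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // exit quietly when interrupted
            }
        }, "consumer-" + clientId);
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) throws Exception {
        Map<String, List<String>> seen = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(6);
        PerClientQueues pcq = new PerClientQueues(4, (client, msg) -> {
            seen.computeIfAbsent(client, k -> Collections.synchronizedList(new ArrayList<>())).add(msg);
            done.countDown();
        });
        for (int i = 1; i <= 3; i++) {
            pcq.submit("A", "a" + i);
            pcq.submit("B", "b" + i);
        }
        done.await(5, TimeUnit.SECONDS);
        System.out.println("A " + seen.get("A"));  // A [a1, a2, a3]
        System.out.println("B " + seen.get("B"));  // B [b1, b2, b3]
    }
}
```

    A thread per client is only reasonable for the bounded client count assumed above; with many clients, the pool-based designs discussed below avoid that cost.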

    HTH,
    Best regards,
    Giovanni
     
    Giovanni Azua, Mar 22, 2009
    #2

  3. meselfo wrote:
    > I'm writing a server that accepts connections from multiple network
    > clients. Messages from these clients are submitted as tasks to worker
    > threads through a bounded queue. Messages coming from the _SAME_
    > client need to be processed in the order they were received from that
    > client. How do I enforce this requirement?
    >
    > I'm using ThreadPoolExecutor from the Java API, which has a built-in
    > queue for submitting tasks. If I stamp messages from the same client
    > with a sequence number and make worker threads aware of this number so
    > that they pause processing a message if the number indicates that
    > another message from the same client should finish processing first,
    > then I'm effectively blocking that worker thread; thus a client
    > sending many messages quickly could reduce the number of effective worker
    > threads.


    Right, you don't want to change how the worker threads execute. You
    want to change the order in which messages are pulled off the queue.

    > There must be some sort of pattern for processing some
    > tasks submitted to a pool of worker threads in order?


    There are two things you need to ensure:

    1. No messages from client A get processed as long as any other
    messages from client A are still being processed.
    2. When it's safe to process another message from client A, process
    the earliest one.

    You get 2 for free, as it's how queues work. All you really need to
    do is:

    A. Keep data about which clients have messages currently being
    processed. The simplest thing here is a Set into which clients are
    added when a job starts and removed when it ends.
    B. Create a queue implementation that skips messages whose clients are
    in the set from part A. That is, if messages from C1 and C2, but not
    C3, are currently being processed, and the queue looks like

    C1.5, C1.6, C2.4, C3.6

    the next item returned would be C3.6. Now, if the C1 processing
    completes, the next item returned will be C1.5.

    The tricky bit here, I think, will be implementing your queue's poll()
    method, since it needs to re-check whether it can return an item both when

    i. a new item is added to the queue, and
    ii. message-processing completes (i.e. a client is removed from the
    Set), which may make an existing queue item newly eligible to be
    returned.
     
    Mike Schilling, Mar 22, 2009
    #3
  4. Mike Schilling wrote:
    > meselfo wrote:
    >> I'm writing a server that accepts connections from multiple network
    >> clients. Messages from these clients are submitted as tasks to worker
    >> threads through a bounded queue. Messages coming from the _SAME_
    >> client need to be processed in the order they were received from that
    >> client. How do I enforce this requirement?
    >>
    >> I'm using ThreadPoolExecutor from the Java API, which has a built-in
    >> queue for submitting tasks. If I stamp messages from the same client
    >> with a sequence number and make worker threads aware of this number so
    >> that they pause processing a message if the number indicates that
    >> another message from the same client should finish processing first,
    >> then I'm effectively blocking that worker thread; thus a client
    >> sending many messages quickly could reduce the number of effective worker
    >> threads.

    >
    > Right, you don't want to change how the worker threads execute. You
    > want to change the order in which messages are pulled off the queue.
    >
    >> There must be some sort of pattern for processing some
    >> tasks submitted to a pool of worker threads in order?

    >
    > There are two things you need to ensure:
    >
    > 1. No messages from client A get processed as long as any other
    > messages from client A are still being processed.
    > 2. When it's safe to process another message from client A, process
    > the earliest one.
    >
    > You get 2 for free, as it's how queues work. All you really need to
    > do is:


    Here's a simpler version (since it doesn't require implementing a new
    kind of BlockingQueue):
    >
    > A. Keep data about which clients have messages currently being
    > processed. The simplest thing here is a Set into which clients are
    > added when a job starts and removed when it ends.


    This changes slightly:

    A. Keep data about which clients have messages which have been queued
    to the ThreadPoolExecutor but are not yet fully processed. The simplest
    thing here is a Set into which clients are added when a message is queued
    and removed when it is done being processed.

    > B. Create a queue implementation that skips messages whose clients
    > are in the set from part A. That is, if messages from C1 and C2, but
    > not C3, are currently being processed, and the queue looks like
    >
    > C1.5, C1.6, C2.4, C3.6
    >
    > the next item returned would be C3.6. Now, if the C1 processing
    > completes, the next item returned will be C1.5.


    Instead of this,

    B1. Use a standard BlockingQueue implementation for the
    ThreadPoolExecutor, but instead of adding new messages to it directly:
    B2. For each client, create a LinkedList of messages to be processed,
    which will eventually be moved to the BlockingQueue, and do the following:

    WHEN a message is received from a client
        IF the client is currently in the Set described above
        THEN
            Append that message to that client's LinkedList
        ELSE
            Append it directly to the BlockingQueue
            Add the client to the Set

    WHEN a client finishes processing a message
        IF the client's LinkedList is empty
        THEN
            Remove the client from the Set
        ELSE
            Move the oldest message from the client's LinkedList to
            the BlockingQueue
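    The simpler version maps onto a plain `ExecutorService` roughly as sketched below. Assumptions: the class name `OrderedDispatcher`, the `onMessage` entry point and the `BiConsumer` handler are hypothetical; one `Map` stands in for both the Set (key present) and each client's LinkedList (the value, an `ArrayDeque` here). In this sketch the worker that finishes a message moves the next one into the pool itself, the option discussed later in the thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class OrderedDispatcher {
    private final ExecutorService pool;
    private final BiConsumer<String, String> handler;
    // Key present = client is "in the Set"; value = that client's backlog (the LinkedList).
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    public OrderedDispatcher(ExecutorService pool, BiConsumer<String, String> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    /** WHEN a message is received from a client. */
    public void onMessage(String client, String message) {
        synchronized (inFlight) {
            Deque<String> backlog = inFlight.get(client);
            if (backlog != null) {                     // client already has a message in the pool
                backlog.addLast(message);
                return;
            }
            inFlight.put(client, new ArrayDeque<>());  // add the client to the Set
        }
        submit(client, message);                       // append directly to the pool's queue
    }

    private void submit(String client, String message) {
        pool.execute(() -> {
            try {
                handler.accept(client, message);
            } finally {
                onFinished(client);
            }
        });
    }

    /** WHEN a client finishes processing a message (runs on the worker thread). */
    private void onFinished(String client) {
        String next;
        synchronized (inFlight) {
            next = inFlight.get(client).pollFirst();
            if (next == null) {
                inFlight.remove(client);               // remove the client from the Set
                return;
            }
        }
        submit(client, next);                          // move the oldest backlog message
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(15);
        OrderedDispatcher d = new OrderedDispatcher(pool, (client, msg) -> {
            seen.computeIfAbsent(client, k -> Collections.synchronizedList(new ArrayList<>()))
                .add(Integer.parseInt(msg));
            done.countDown();
        });
        for (int i = 1; i <= 5; i++) {
            for (String client : new String[] {"A", "B", "C"}) {
                d.onMessage(client, String.valueOf(i));
            }
        }
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        // [1, 2, 3, 4, 5] [1, 2, 3, 4, 5] [1, 2, 3, 4, 5]
        System.out.println(seen.get("A") + " " + seen.get("B") + " " + seen.get("C"));
    }
}
```

    Because workers resubmit follow-up messages, a ThreadPoolExecutor with a bounded queue and the default AbortPolicy could reject them with a RejectedExecutionException; a real server would need to decide how to handle that case.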
     
    Mike Schilling, Mar 22, 2009
    #4
  5. meselfo

    meselfo Guest


    > This changes slightly:
    >
    > A. Keep data about which clients have messages which have been queued
    > to the ThreadPoolExecutor but are not yet fully processed. The simplest
    > thing here is a Set into which clients are added when a message is queued
    > and removed


    I'm using NIO for non-blocking read() operations in the IO thread, so one
    thread handles all client accepts and reads. I'm using the Selector API
    in the IO thread for accept/read readiness notification.

    I was thinking of using the SelectionKey.attach() method to store
    data specific to the client, such as a flag that indicates whether a
    message from that client has been submitted to the worker pool. The
    selection key is passed along with the message to the worker thread. The
    flag would then be cleared by the worker thread once it has finished
    processing the message.
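    The attachment idea might look roughly like this minimal sketch. `ClientState`, `tryMarkSubmitted` and `markFinished` are hypothetical names, and a `Pipe` source channel stands in for a client `SocketChannel` so the example runs without a network. The flag is an `AtomicBoolean` because the IO thread sets it and a worker thread clears it.

```java
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

public class KeyAttachmentSketch {
    /** Hypothetical per-client state stored on the key via register(..., att) or attach(). */
    static final class ClientState {
        // true while a message from this client is queued to or running in the pool
        final AtomicBoolean inFlight = new AtomicBoolean(false);
    }

    /** IO thread: claim the client before submitting; false means one is already in flight. */
    static boolean tryMarkSubmitted(SelectionKey key) {
        return ((ClientState) key.attachment()).inFlight.compareAndSet(false, true);
    }

    /** Worker thread: clear the flag once the message has been fully processed. */
    static void markFinished(SelectionKey key) {
        ((ClientState) key.attachment()).inFlight.set(false);
    }

    public static void main(String[] args) throws Exception {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();  // the source stands in for a client SocketChannel
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ, new ClientState());

            System.out.println(tryMarkSubmitted(key));  // true: hand this message to the pool
            System.out.println(tryMarkSubmitted(key));  // false: hold the next one back for now
            Thread worker = new Thread(() -> markFinished(key));
            worker.start();
            worker.join();
            System.out.println(tryMarkSubmitted(key));  // true: the worker cleared the flag
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

    Using compareAndSet keeps the check and the update atomic, so the IO thread never sees a stale "free" flag while a worker is clearing it.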


    > Instead of this,
    >
    > B1. Use a standard BlockingQueue implementation for the
    > ThreadPoolExecutor, but instead of adding new messages to it directly:
    > B2. For each client, create a LinkedList of messages to be processed,
    > which will eventually be moved to the BlockingQueue, and do the following:
    >
    > WHEN a message is received from a client
    >     IF the client is currently in the Set described above
    >     THEN
    >         Append that message to that client's LinkedList
    >     ELSE
    >         Append it directly to the BlockingQueue
    >         Add the client to the Set
    >
    > WHEN a client finishes processing a message
    >     IF the client's LinkedList is empty
    >     THEN
    >         Remove the client from the Set
    >     ELSE
    >         Move the oldest message from the client's LinkedList to
    >         the BlockingQueue


    I've been playing with something like your suggestion too. I was thinking
    that I could store any excess messages in the selection key's attachment
    until the flag mentioned above was cleared to indicate that the client has
    no messages currently in the queue and no message being processed by a
    worker thread. Like this:

    WHEN a worker thread finishes processing a message
        IF more unsubmitted messages exist attached to the same
        client's selection key
        THEN
            Let the worker thread invoke wakeup() on the IO thread's
            selector so the IO thread can submit the next message.

    But wakeup() is giving me a headache. Basically, wakeup() interrupts a
    blocked select() invocation in the IO thread, and if the IO thread is not
    currently blocked, then the next invocation of select() will return right
    away. If a lot of worker threads are bombarding the IO thread with wakeup(),
    then the IO thread will be starved of time to do readiness notification,
    and the queue feeding the worker pool would be starved. Thus I would get a
    fluctuating pattern where reading is starved until the worker threads have
    no more jobs and stop invoking wakeup(), at which point the queue is filled
    up again, the worker threads go back to work, and the IO is starved again...
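    One detail from the `Selector.wakeup()` Javadoc matters here: invoking it more than once between two successive selection operations has the same effect as invoking it just once. A common mitigation, not something proposed in this thread, is to let workers hand work to the IO thread through a concurrent queue, guard the `wakeup()` call with a flag, and have the IO thread drain everything queued on each trip around its loop, so a burst of completions costs roughly one extra loop iteration rather than one per message. `IoTaskQueue`, `submitFromWorker` and `drainOnIoThread` are hypothetical names; `wakeupCalls` exists only to make the effect visible.

```java
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IoTaskQueue {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
    final AtomicInteger wakeupCalls = new AtomicInteger();  // demonstration only

    public IoTaskQueue(Selector selector) { this.selector = selector; }

    /** Worker thread: enqueue work for the IO thread; only the first caller per round wakes it. */
    public void submitFromWorker(Runnable task) {
        tasks.add(task);
        if (wakeupPending.compareAndSet(false, true)) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    /** IO thread, after each select(): clear the flag first, then run everything queued. */
    public int drainOnIoThread() {
        wakeupPending.set(false);  // tasks added from now on will request a fresh wakeup
        int ran = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) throws Exception {
        try (Selector selector = Selector.open()) {
            IoTaskQueue io = new IoTaskQueue(selector);
            AtomicInteger executed = new AtomicInteger();
            Thread[] workers = new Thread[4];
            for (int w = 0; w < workers.length; w++) {
                workers[w] = new Thread(() -> {
                    for (int i = 0; i < 25; i++) io.submitFromWorker(executed::incrementAndGet);
                });
                workers[w].start();
            }
            for (Thread t : workers) t.join();

            selector.select();  // returns at once because a wakeup is pending
            System.out.println("wakeup() calls: " + io.wakeupCalls.get());  // 1
            System.out.println("tasks run: " + io.drainOnIoThread());         // 100
        }
    }
}
```

    Clearing the flag before draining matters: a task enqueued after the clear triggers a new wakeup, and one enqueued before it is picked up by the drain that follows, so no task is left waiting without a pending wakeup.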

    Maybe you were thinking of letting the worker threads submit the next
    unprocessed message themselves if the same client has more messages in
    the linked list? This would avoid the wakeup(), but I'm not too keen on
    having anyone other than the IO thread dispatch jobs to the worker pool.
    It complicates the design when different types of threads dispatch jobs,
    but it would give me an undisturbed flow of messages to the worker pool.

    The wakeup() call is unfortunately also haunting me when I need to write
    data using non-blocking writes through the Selector API. If a worker thread
    wants to write data back to a client, then I need to submit the return
    message back to the IO thread so it can write the data in a non-blocking
    fashion. If I let the worker thread write data by itself, it would
    effectively block the thread even if it uses a Selector, since the worker
    thread really can't pick another job from the queue while doing readiness
    selection on a previous write operation. If the worker thread calls
    wakeup() on the IO thread to get a write operation going, then again I'm
    starving the readiness notification in the IO thread, since a lot of
    worker threads will want to write data and the IO thread would thus be
    bombarded with wakeup().
    If I create a dedicated IO thread for write operations only, I wouldn't
    starve the accept/read operations of the other IO thread, but the wakeup()
    calls that signal the write thread to poll() messages sent to it for write
    operations would still starve it of write readiness notifications.
    I could try blocking the IO thread using select(some short interval)
    so no wakeup() is needed, but I would hate to have this artificial time
    delay on write operations.
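    The write hand-off described above (workers queue the reply, the IO thread performs the non-blocking write) is usually done by enabling `OP_WRITE` interest only while a client has data pending. A minimal sketch under assumptions: `Outbound`, `reply`, `armWrites` and `onWritable` are hypothetical names, a `Pipe` sink stands in for a client `SocketChannel`, and the wakeup frequency itself is not addressed. Only the IO thread changes `interestOps`; workers just append to concurrent queues.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WriteQueueSketch {
    /** Hypothetical per-client outbound queue, stored as the SelectionKey attachment. */
    static final class Outbound {
        final Queue<ByteBuffer> pending = new ConcurrentLinkedQueue<>();
    }

    private final Selector selector;
    private final Queue<SelectionKey> needsWrite = new ConcurrentLinkedQueue<>();

    WriteQueueSketch(Selector selector) { this.selector = selector; }

    /** Worker thread: queue a reply for the IO thread instead of writing it directly. */
    void reply(SelectionKey key, ByteBuffer data) {
        ((Outbound) key.attachment()).pending.add(data);
        needsWrite.add(key);
        selector.wakeup();
    }

    /** IO thread, once per loop iteration: enable OP_WRITE for keys with queued replies. */
    void armWrites() {
        SelectionKey key;
        while ((key = needsWrite.poll()) != null) {
            if (key.isValid()) {
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            }
        }
    }

    /** IO thread: write what the channel accepts; drop OP_WRITE once the queue is empty. */
    void onWritable(SelectionKey key) throws IOException {
        WritableByteChannel channel = (WritableByteChannel) key.channel();
        Queue<ByteBuffer> pending = ((Outbound) key.attachment()).pending;
        ByteBuffer buf;
        while ((buf = pending.peek()) != null) {
            channel.write(buf);
            if (buf.hasRemaining()) {
                return;  // kernel buffer full: keep OP_WRITE and wait for the next round
            }
            pending.poll();
        }
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }

    public static void main(String[] args) throws Exception {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();  // the sink stands in for a client SocketChannel
            pipe.sink().configureBlocking(false);
            SelectionKey key = pipe.sink().register(selector, 0, new Outbound());
            WriteQueueSketch io = new WriteQueueSketch(selector);

            Thread worker = new Thread(() ->
                io.reply(key, ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII))));
            worker.start();
            worker.join();

            Outbound out = (Outbound) key.attachment();
            while (!out.pending.isEmpty()) {  // a few iterations of the IO loop
                io.armWrites();
                selector.select(100);
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isValid() && k.isWritable()) io.onWritable(k);
                }
                selector.selectedKeys().clear();
            }

            ByteBuffer in = ByteBuffer.allocate(5);
            while (in.hasRemaining()) pipe.source().read(in);  // source is still blocking
            System.out.println(new String(in.array(), StandardCharsets.US_ASCII));  // hello
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

    If a worker queues data just after onWritable() has emptied the queue, the key is also in needsWrite, so the next armWrites() re-enables OP_WRITE and nothing is stranded.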
     
    meselfo, Mar 23, 2009
    #5
  6. meselfo wrote:
    >
    > Maybe you were thinking of letting the worker threads submit the next
    > unprocessed message themselves if the same client has more messages in
    > the linked list? This would avoid the wakeup(), but I'm not too keen on
    > having anyone other than the IO thread dispatch jobs to the worker pool.
    > It complicates the design when different types of threads dispatch jobs,
    > but it would give me an undisturbed flow of messages to the worker pool.


    Why does it complicate the design? The BlockingQueue is designed to accept
    items from many threads; just take a bit of care with the synchronization.

    As to the output side, I don't have enough experience with NIO to make any
    sensible suggestions.
     
    Mike Schilling, Mar 23, 2009
    #6
  7. Cupdoo

    Cupdoo

    Joined: Sep 11, 2011
    Messages: 1
    Quite an interesting problem. I stumbled over this post when I was looking for a solution to exactly the same problem: dispatching several Runnables to a thread pool, but forcing some of these Runnables to be executed in strict order.

    As suggested here, I tried to use a ThreadPoolExecutor with a custom blocking queue. However, it turned out that Sun's implementation bypasses the queue whenever fewer than corePoolSize threads are running: execute() then starts a new worker thread and hands it the task directly, even if other workers are idle. I ended up writing my own thread pool with a custom queue. Works like a charm :)
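    The bypass can be observed directly. Per the `ThreadPoolExecutor` Javadoc, while fewer than `corePoolSize` threads are running, `execute()` creates a new thread for the task instead of queuing it; `prestartAllCoreThreads()` starts the core threads up front, after which new tasks go through the queue's `offer()` (a task can still skip the queue if `offer()` returns false and the pool is below `maximumPoolSize`). The `CountingQueue` subclass and the `offersFor` helper are hypothetical, written only for this demonstration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueBypassDemo {
    /** Counts how often the executor actually hands a task to the queue. */
    static final class CountingQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        final AtomicInteger offers = new AtomicInteger();

        @Override
        public boolean offer(Runnable task) {
            offers.incrementAndGet();
            return super.offer(task);
        }
    }

    /** Submits no-op tasks to a 2-thread pool and reports how many went through offer(). */
    static int offersFor(int tasks, boolean prestart) throws InterruptedException {
        CountingQueue queue = new CountingQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);
        if (prestart) {
            pool.prestartAllCoreThreads();
        }
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return queue.offers.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(offersFor(3, false));  // 1: the first two tasks bypass the queue
        System.out.println(offersFor(3, true));   // 3: every task goes through offer()
    }
}
```

    Prestarting the core threads is one way to route every task through a custom queue, though a skipping queue would still need a hook such as ThreadPoolExecutor's afterExecute() to learn when a client's message has finished.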

    For those who might find this thread as well: I've uploaded the source to https://github.com/EarlOfWenc/snippets/tree/master/java/DependencyThreadPool
     
    Cupdoo, Sep 11, 2011
    #7
