producer/consumer remove problem

Discussion in 'Java' started by Jeff, Oct 21, 2004.

  1. Jeff

    Jeff Guest

    For a specific producer to distribute to several consumers, I have a simple
    extension of HashSet of consumers. However, a problem sometimes occurs
    while a consumer is removing himself from the HashSet.

    The problem occurs when distribute() calls the .eventObserved() method of
    the consumer that is trying to remove himself. The .eventObserved() method
    never returns. I think that is because the consumer's thread is waiting in
    ConsumerSet's remove(). The producing caller and consumer are always
    different threads.

    To solve the problem, I'm considering having remove() spawn a thread to do
    the remove so that remove does not wait on synchronized. distribute() is
    called A LOT, but remove() is called rarely.

    Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
    Is there a better solution, or at least more Java-like?

    public class ConsumerSet extends HashSet {
        public void distribute(Object message) {
            synchronized (this) {
                for (Iterator i = iterator(); i.hasNext();)
                    ((Consumer) i.next()).eventObserved(message);
            }
        }

        public void add(Consumer consumer) {
            synchronized (this) {
                super.add(consumer);
            }
        }

        public void remove(Consumer consumer) {
            synchronized (this) {
                super.remove(consumer);
            }
        }
    } // ConsumerSet

    // usually an extension of Thread
    public interface Consumer {
        /**
         * Invoked when an event is observed
         */
        public void eventObserved(Object toProcess);
    }



    --
    Jeff
     
    Jeff, Oct 21, 2004
    #1

  2. Jeff

    xarax Guest

    "Jeff" <> wrote in message
    news:8tSdd.6713$Ug4.1382@trndny01...
    > For a specific producer to distribute to several consumers , I have a simple
    > extension of HashSet of consumers. However, a problem sometimes occurs
    > while a consumer is removing himself from the HashSet.
    >
    > The problem occurs when distribute() calls the .eventObserved() method of
    > the consumer that is trying to remove himself. The .eventObserved() method
    > never returns. I think that is because the consumer's thread is waiting in
    > ConsumerSet's remove(). The producing caller and consumer are always
    > different threads.


    That's your intent, but likely not what you implemented.

    > To solve the problem, I'm considering having remove() spawn a thread to do
    > the remove so that remove does not wait on synchronized. distribute() is
    > called A LOT, but remove() is called rarely.
    >
    > Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
    > Is there a better solution, or at least more Java-like?
    >
    > public class ConsumerSet extends HashSet {
    > public void distribute(Object message) {
    > synchronized( this ) {
    > for (Iterator i = iterator(); i.hasNext();)
    > ((Consumer) i.next()).eventObserved(message);
    > }
    > }
    >
    > public void add( Consumer consumer ) {
    > synchronized (this) {
    > super.add( consumer );
    > }
    > }
    >
    > public void remove( Consumer consumer ) {
    > synchronized ( this ) {
    > super.remove( consumer );
    > }
    > }
    > } // ConsumerSet
    >
    > // usually be an extension of Thread
    > public interface Consumer {
    > /**
    > * Invoked when an event is observed
    > */
    > public void eventObserved( Object toProcess );
    > }


    This is likely your problem. You seem to have one
    thread calling eventObserved(Object), which is defined
    on the Thread object of another thread. Calling a method
    on a Thread object does not make that method run on the
    thread the object represents; it runs on the caller's
    thread. For the other thread to run eventObserved(), it
    must call the method itself.
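
    A small sketch of the point above, assuming a consumer that extends Thread as in the original design. The class and thread names (ThreadConsumerDemo, ThreadConsumer, "producer-thread") are made up for illustration. The demo records which thread actually executes eventObserved() when another thread calls it.

```java
// Hypothetical demo; none of these names come from the original post.
class ThreadConsumerDemo {

    // A consumer that extends Thread, as in the original design.
    static class ThreadConsumer extends Thread {
        volatile String observedOn; // name of the thread that ran eventObserved()

        ThreadConsumer() {
            super("consumer-thread");
        }

        public void eventObserved(Object toProcess) {
            // Records whichever thread is executing this call.
            observedOn = Thread.currentThread().getName();
        }

        @Override
        public void run() {
            // The consumer's own thread does nothing in this demo.
        }
    }

    // Calls eventObserved() from a thread named "producer-thread" and
    // returns the name of the thread that actually executed the method.
    static String whoRunsTheCallback() throws InterruptedException {
        final ThreadConsumer consumer = new ThreadConsumer();
        consumer.start();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                consumer.eventObserved("message");
            }
        }, "producer-thread");
        producer.start();
        producer.join();
        consumer.join();
        return consumer.observedOn;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "eventObserved() ran on: producer-thread"
        System.out.println("eventObserved() ran on: " + whoRunsTheCallback());
    }
}
```

    The method body runs on the producer's thread even though it is declared on the consumer's Thread subclass.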

    Your design is likely flawed.

    If you have a producer thread and multiple consumer
    threads, then you need something like an event queue.
    The producer thread places a node onto the queue. Other
    consumer threads are waiting for a node to appear on
    the queue. One of the consumer threads will pull the
    node off of the queue and process it.

    The Observer/Observed pattern doesn't work for multiple
    threads. You need a synchronized event queue where the
    consumer threads will wait() until the queue is non-empty,
    and the producer thread will notifyAll() when it puts a
    new node onto the queue.
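
    The queue described above could be sketched roughly as follows. This is a minimal illustration, not a production class; EventQueue, put() and take() are hypothetical names. Consumers block in wait() while the queue is empty, and the producer calls notifyAll() after adding a node.

```java
import java.util.LinkedList;

// Hypothetical minimal event queue; class and method names are illustrative.
class EventQueue {
    private final LinkedList<Object> nodes = new LinkedList<Object>();

    // Producer side: append a node and wake any waiting consumers.
    public synchronized void put(Object node) {
        nodes.addLast(node);
        notifyAll();
    }

    // Consumer side: block until a node is available, then remove it.
    public synchronized Object take() throws InterruptedException {
        while (nodes.isEmpty()) {   // loop guards against spurious wakeups
            wait();
        }
        return nodes.removeFirst();
    }
}
```

    With this shape, each node is handed to exactly one of the waiting consumer threads.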

    If you need some mutex classes, you can download the
    source at http://mindprod.com/products.html, look for
    the Mutex download.

    Also, J2SE 5.0 has new interfaces and classes that are
    very similar to the mutex download (see above).

    Hope this helps.

    --
    ----------------------------
    Jeffrey D. Smith
    Farsight Systems Corporation
    24 BURLINGTON DRIVE
    LONGMONT, CO 80501-6906
    http://www.farsight-systems.com
    z/Debug debugs your Systems/C programs running on IBM z/OS for FREE!
     
    xarax, Oct 21, 2004
    #2

  3. Jeff wrote:

    > For a specific producer to distribute to several consumers , I have a simple
    > extension of HashSet of consumers. However, a problem sometimes occurs
    > while a consumer is removing himself from the HashSet.


    I don't see the point of extending HashSet here. I think you are
    placing responsibilities on your extended version that more properly
    belong on the producer object.

    > The problem occurs when distribute() calls the .eventObserved() method of
    > the consumer that is trying to remove himself. The .eventObserved() method
    > never returns. I think that is because the consumer's thread is waiting in
    > ConsumerSet's remove().


    It is conceivable that you are getting deadlocks this way, but it would
    depend on how the consumer's eventObserved() method was written. If I
    interpret your code and comments rightly, then eventObserved() will be
    executed by the producer's thread, whereas ConsumerSet.remove() will be
    executed by the consumer's thread.

    > The producing caller and consumer are always
    > different threads.


    This is causing you confusion. Do not put application logic into Thread
    subclasses; use Runnables instead. This gives you a more consistent
    object model in the first place, but more importantly, it tends to
    reduce confusion about who can do what, when, and to whom.
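
    As a rough illustration of keeping logic in a Runnable rather than a Thread subclass, here is a sketch with a hypothetical CountingTask. The task is an ordinary object with its own synchronized state; the threads are only the things that execute it, and the same task can be handed to more than one thread.

```java
// Hypothetical task; the names are illustrative only.
class CountingTask implements Runnable {
    private int count;   // guarded by "this"

    public void run() {
        for (int i = 0; i < 1000; i++) {
            increment();
        }
    }

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }

    // Runs one task object on two plain Thread instances and returns the total.
    static int runOnTwoThreads() throws InterruptedException {
        CountingTask task = new CountingTask();
        Thread a = new Thread(task, "worker-a");
        Thread b = new Thread(task, "worker-b");
        a.start();
        b.start();
        a.join();
        b.join();
        return task.getCount();
    }
}
```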

    > To solve the problem, I'm considering having remove() spawn a thread to do
    > the remove so that remove does not wait on synchronized. distribute() is
    > called A LOT, but remove() is called rarely.


    No, don't. Throwing more threads at a synchronization problem just
    makes for a messier synchronization problem.

    > Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
    > Is there a better solution, or at least more Java-like?
    >
    > public class ConsumerSet extends HashSet {
    > public void distribute(Object message) {
    > synchronized( this ) {
    > for (Iterator i = iterator(); i.hasNext();)
    > ((Consumer) i.next()).eventObserved(message);
    > }
    > }


    The distribute() method belongs on your producer object, not on the set.
    The set should not be exposed directly to consumer objects; instead
    they should register themselves with the producer, which will add them
    to the Set. This obviates any need for type-safe add() and remove()
    methods, so this whole class becomes superfluous.

    > public void add( Consumer consumer ) {
    > synchronized (this) {
    > super.add( consumer );
    > }
    > }
    >
    > public void remove( Consumer consumer ) {
    > synchronized ( this ) {
    > super.remove( consumer );
    > }
    > }
    > } // ConsumerSet
    >
    > // usually be an extension of Thread


    Should NOT be an extension of Thread. Probably should not even be an
    implementation of Runnable. May be an object shared between the
    producer thread and some other thread.

    > public interface Consumer {
    > /**
    > * Invoked when an event is observed
    > */
    > public void eventObserved( Object toProcess );
    > }


    Your producer should look something like this:

    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;

    public class Producer implements Runnable {

        // Registered consumers; every access synchronizes on the set itself
        private Set consumerSet = new HashSet();

        public void registerConsumer(Consumer c) {
            synchronized (consumerSet) {
                consumerSet.add(c);
            }
        }

        public void unregisterConsumer(Consumer c) {
            synchronized (consumerSet) {
                consumerSet.remove(c);
            }
        }

        // Delivers the event to every registered consumer on the calling thread
        protected void fireEvent(Object event) {
            synchronized (consumerSet) {
                for (Iterator it = consumerSet.iterator(); it.hasNext(); ) {
                    ((Consumer) it.next()).eventObserved(event);
                }
            }
        }

        public void run() {
            // do stuff that ends up invoking fireEvent() periodically
        }
    }

    (Rather resembles your ConsumerSet, doesn't it?)

    Encapsulating the consumer set in this way prevents any unexpected
    synchronization on it that might cause deadlock. Your documentation for
    the Consumer interface should remark that its eventObserved() method
    must execute quickly and that the scope of its execution must not
    include any attempt to unregister the consumer (which would not
    deadlock, but might throw a ConcurrentModificationException).

    You should also keep in mind that registering and unregistering event
    listeners (err... consumers) can block on completion of fireEvent(), and
    therefore no thread should invoke registerConsumer() or
    unregisterConsumer() while holding the monitor for an object that the
    relevant Consumer's eventObserved() method needs to lock. [That is very
    likely what is causing your deadlock now.] In particular, it may not be
    possible to prevent a Consumer from observing events after a request to
    unregister it has been dispatched (but before the unregisterConsumer()
    returns).
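
    One common variation, not what the code in this post does, is to copy the registered consumers while holding the lock and then invoke the callbacks outside it. The sketch below assumes that approach; SnapshotProducer and consumerCount() are hypothetical names, and Consumer mirrors the interface quoted earlier. The trade-off is the one already noted: a consumer may still receive an event that was in flight when it unregistered.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Mirrors the Consumer interface from the original post.
interface Consumer {
    void eventObserved(Object toProcess);
}

// Hypothetical variant of the producer that notifies from a snapshot.
class SnapshotProducer {
    private final Set<Consumer> consumers = new HashSet<Consumer>();

    public void registerConsumer(Consumer c) {
        synchronized (consumers) {
            consumers.add(c);
        }
    }

    public void unregisterConsumer(Consumer c) {
        synchronized (consumers) {
            consumers.remove(c);
        }
    }

    public int consumerCount() {
        synchronized (consumers) {
            return consumers.size();
        }
    }

    public void fireEvent(Object event) {
        List<Consumer> snapshot;
        synchronized (consumers) {
            // Hold the lock only long enough to copy the current registrations.
            snapshot = new ArrayList<Consumer>(consumers);
        }
        // Callbacks run without holding the lock, so a consumer can unregister
        // itself from inside eventObserved() without a
        // ConcurrentModificationException, and unregisterConsumer() on another
        // thread never waits for a callback to finish.
        for (Consumer c : snapshot) {
            c.eventObserved(event);
        }
    }
}
```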


    John Bollinger
     
    John C. Bollinger, Oct 21, 2004
    #3
  4. Jeff

    Jeff Guest

    Thanks for the thoughtful response. I need to clarify the problem.

    ALL consumers must consume the object. It's really a network multicast
    requirement in software. The queue implementation fails to achieve this
    requirement because the first consumer dequeues the object. The second
    consumer also needs the object, but it's gone from the queue.

    I considered the 1.5 enhancements, but I would rather not move to 1.5 yet.

     
    Jeff, Oct 21, 2004
    #4
  5. Jeff

    xarax Guest

    "Jeff" <> wrote in message
    news:zDVdd.17078$fP3.74@trndny05...
    > Thanks for the thoughtful response. I need to clarify the problem.
    >
    > ALL consumers must consume the object. It's really a network multicast
    > requirement in software. The queue implementation fails to achieve this
    > requirement because the first consumer dequeues the object. The second
    > consumer also needs the object, but it's gone from the queue.
    >
    > I considered the 1.5 enhancements, but I would rather not move to 1.5 yet.
    >
    >

    Please don't top post.

    It would seem that the consumable object must be posted
    to each and every consumer queue via the distribute()
    method. Then each consumer thread can process the consumable.
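
    A rough sketch of that idea, assuming one queue per consumer. Mailbox, Multicaster, subscribe() and unsubscribe() are hypothetical names chosen for illustration. distribute() posts the same object to every mailbox, and each consumer thread takes from its own mailbox, so every consumer sees every message.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical per-consumer queue, filled by Multicaster.distribute().
class Mailbox {
    private final LinkedList<Object> messages = new LinkedList<Object>();

    public synchronized void post(Object message) {
        messages.addLast(message);
        notifyAll();   // wake the consumer thread waiting in take()
    }

    public synchronized Object take() throws InterruptedException {
        while (messages.isEmpty()) {
            wait();
        }
        return messages.removeFirst();
    }
}

// Hypothetical producer-side registry of mailboxes.
class Multicaster {
    private final List<Mailbox> mailboxes = new ArrayList<Mailbox>();

    public synchronized Mailbox subscribe() {
        Mailbox box = new Mailbox();
        mailboxes.add(box);
        return box;
    }

    public synchronized void unsubscribe(Mailbox box) {
        mailboxes.remove(box);
    }

    // Posts the same message to every mailbox. post() only appends and
    // notifies, so the lock on this Multicaster is held briefly and no
    // consumer code runs on the producer's thread.
    public synchronized void distribute(Object message) {
        for (Mailbox box : mailboxes) {
            box.post(message);
        }
    }
}
```

    Each consumer thread would loop on its own mailbox's take() and call unsubscribe() when it is done; unsubscribe() only waits for a distribute() that is in progress, never for another consumer's processing.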

    Of course, by now you understand that extending Thread
    to add the eventObserved() method is worse than useless,
    because it clouds the semantics of your design, doesn't
    offer any functionality to the Thread class, and can
    confuse the casual reader into thinking that somehow
    the target thread is processing the method call.
     
    xarax, Oct 22, 2004
    #5