Stumped. How to add a limit to a LinkedBlockingQueue

B

Bogus Exception

First of all, thanks to all that have helped this beginning Java
programmer with my questions in the past. I'm hoping that once again
the experts here can save my butt!

I have a working implementation using LinkedBlockingQueues in a
producer to multiple consumers model. I have successfully crashed on
heap memory without much trouble, though. I've written all the offer()
and poll() routines to have timouts when blocked. Problem is, I can't
figure out how to make the LinkedBlockingQueues block!

The queues are passing lists of objects between them. A list can have
any number of objects in them, but at least 1. So, starting at the
basics and building up, an object is defined as having an interface:

public interface AnObjectInterface {
public HashMap<String, String> hmTime = new HashMap<String, String>();
}

...and the object being:

public class AnObject implements Serializable, AnObjectInterface {
static final long serialVersionUID = 1; // needed for serializable..
public LinkedHashMap<String, String> hmTime = new
LinkedHashMap<String, String>(); // time, comment
}

hmTime is written to by each producer & consumer, like a log. So the
class that is the list of objects is:

public class ObjectList implements Externalizable {
static final long serialVersionUID = 1;
public ObjectList(){}
/*
* This LinkedList holds the actual objects
*/
public LinkedList<AnObject> objectList = new LinkedList<AnObject>();

[methods writeExternal and readExternal mercifully ommitted :)]
}

Now as we're working up the heirarchy, this class is used to create the
actual queues that are given to the producers and consumers:

public class ObjectListQueue extends LinkedBlockingQueue<ObjectList> {
static final long serialVersionUID = 1;
}

Messing with the above class for 2 days to constrain the size of the
LinkedBlockingQueue has created zero joy!

So those are all the actors (except for the actual producers and
consumers). So the following is currently used to create the queues:

(In this simple example-which works-there is an object generator, a
single interim consumer/producer, and a final consumer. P = producer,
CP = producer/consumer, C = consumer)

P -> CP -> C

[...]
ObjectListQueue qCPin = new ObjectListQueue(), qCin = new
ObjectListQueue();
// outbound queues (arrays, with each destination as an element)
ObjectListQueue[] aGout = {qCPin}, aCPout = {qCin};
[...]

Now all that is left to hand things off to the players:

ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ObjectListGenerator(aGout));
exec.execute(new ObjectListConsumerProducer(qCPin, aCPout));
exec.execute(new ObjectListConsumer(qCin));

Thats it. It all works great. Problem is when either the
ObjectListConsumerProducer or ObjectListConsumer process (consume)
slowly, the queue gradually grows (really?!). When the heap gets down
to about 470k, the queues crash. FYI, no command line changes except
"-server". This equates to about 13,000 lists of 10 objects each in a
single LinkedBlockingQueue.

So I guess I'm beating my head on the keyboard trying to figure out how
to make the blocking queues block! The error might be in the way I've
got the whole app structured, or in how I'm initializing the
LinkedBlockingQueues. I've tried all I can think of, including
generics, but probably poorly as I feel I'm sitting here with all my
references opened up jumping from one approach to another blindly,
hoping one will show some promise.

Please forgive me if there are subtle typos, as I reduced the actual
code to only include those sections of interest, and may have
accidentally made a typo. I hope its obvious that the code does in fact
work.

Thanks VERY much in advance!

Bogus Exception
 
P

Patricia Shanahan

Bogus Exception wrote:
....
Thats it. It all works great. Problem is when either the
ObjectListConsumerProducer or ObjectListConsumer process (consume)
slowly, the queue gradually grows (really?!). When the heap gets down
to about 470k, the queues crash. FYI, no command line changes except
"-server". This equates to about 13,000 lists of 10 objects each in a
single LinkedBlockingQueue.
....

What does the LinkedBlockingQueue constructor call look like? It will be
the super call at the start of your ObjectListQueue constructor. The
important thing is whether it specifies a reasonable capacity.

Patricia
 
B

Bogus Exception

Patricia,

Thanks for writing!

Patricia said:
What does the LinkedBlockingQueue constructor call look like? It will be
the super call at the start of your ObjectListQueue constructor. The
important thing is whether it specifies a reasonable capacity.

The only connection to the LinkedBlockingQueue class is (from above):

public class ObjectListQueue extends LinkedBlockingQueue<ObjectList> {
static final long serialVersionUID = 1;
}

Instances that have LinkedBlockingQueue properties are instantiated
from the ObjectListQueue class. An example is:

ObjectListQueue qCPin = new ObjectListQueue(), qCin = new
ObjectListQueue();

...which is from the static class first run in the app. This approach is
from Bruce Eckel's TIJ4, page 1218. Does taking this approach mean that
a queue size can't be specified?

TIA!

Bogus Exception
 
P

Patricia Shanahan

Bogus said:
Patricia,

Thanks for writing!



The only connection to the LinkedBlockingQueue class is (from above):

public class ObjectListQueue extends LinkedBlockingQueue<ObjectList> {
static final long serialVersionUID = 1;
}

Instances that have LinkedBlockingQueue properties are instantiated
from the ObjectListQueue class. An example is:

ObjectListQueue qCPin = new ObjectListQueue(), qCin = new
ObjectListQueue();

..which is from the static class first run in the app. This approach is
from Bruce Eckel's TIJ4, page 1218. Does taking this approach mean that
a queue size can't be specified?

What does the ObjectListQueue constructor look like? Not the calls to
it, its implementation.

Patricia
 
B

Bogus Exception

Patricia,

Thanks so much for writing!

Patricia said:
What does the ObjectListQueue constructor look like? Not the calls to
it, its implementation.

(from above) I instantiate these objects in the top level class with:

[...]
ObjectListQueue qCPin = new ObjectListQueue(), qCin = new
ObjectListQueue();
// outbound queues (arrays, with each destination as an element)
ObjectListQueue[] aGout = {qCPin}, aCPout = {qCin};
[...]

Where input queues are single (multiple producers can talk to a single
consumer), and output queues are arrays (a single producer can send to
pultiple consumers).

Is that clear? Sorry if it looks confusing! :)

TIA!

Bogus Exception
 
P

Patricia Shanahan

Bogus said:
Patricia,

Thanks so much for writing!

Patricia said:
What does the ObjectListQueue constructor look like? Not the calls to
it, its implementation.

(from above) I instantiate these objects in the top level class with:

[...]
ObjectListQueue qCPin = new ObjectListQueue(), qCin = new
ObjectListQueue();
// outbound queues (arrays, with each destination as an element)
ObjectListQueue[] aGout = {qCPin}, aCPout = {qCin};
[...]

Where input queues are single (multiple producers can talk to a single
consumer), and output queues are arrays (a single producer can send to
pultiple consumers).

Is that clear? Sorry if it looks confusing! :)

TIA!

Bogus Exception

I'm still not getting the critical data, although the fact that it is
invisible to you probably gives the answer.

In the implementation of ObjectListQueue there is a constructor, either
explicit or default. If you did not provide one yourself, or if you
provide one but it does not contain an explicit "super" call, it will
call the parameterless constructor for the immediate superclass.

In this case, the parameterless LinkedBlockingQueue sets capacity
Integer.MAX_VALUE, which is a good idea if, and only if, something else
is limiting the number of items on the queue.

LinkedBlockingQueue also has a constructor that takes a capacity as
parameter. You need to call that constructor from the ObjectListQueue
constructor, which means you must have an explicit ObjectListQueue
constructor, not just a compiler-supplied default.

Patricia
 
B

Bogus Exception

Patricia,

Thanks again for writing. Your assistance is great!

In order for me to use a direct instantiation of LinkedBlockingQueue, I
can't use an array. This is true with or without an imposed queue
limit. This is why this approach was needed.

For example, this is OK:

final int QUEUE_SIZE = 10;
LinkedBlockingQueue<ObjectList>
qCPin = new LinkedBlockingQueue<ObjectList>(QUEUE_SIZE),
qECin = new LinkedBlockingQueue<ObjectList>(QUEUE_SIZE);

Which is great for inbound queues, which LinkedBlockingQueue allows
multiple producers by default. The trouble starts when we try:

LinkedBlockingQueue<ObjectList>[] aGout = new
LinkedBlockingQueue<ObjectList>[1];

...or even:

LinkedBlockingQueue<ObjectList>[] aGout = {qCPin};

Either yields the same error:
Cannot create a generic array of LinkedBlockingQueue<ObjectList>

The only way I found around this problem was the code you saw in my
first post, which simply inheritted from LinkedBlockingQueue. Since I
need an array of queues (one producer can send to many consumers), the
standard approaches I've seen on the net don't work (that is, where
there is one producer and one consumer). Eckel's TIJ4 code was the only
example that worked with arrays.

I hope I'm explaining this in a way that makes sense.

TIA!!!

Bogus Exception
 
P

Patricia Shanahan

Bogus said:
Patricia,

Thanks again for writing. Your assistance is great!

In order for me to use a direct instantiation of LinkedBlockingQueue, I
can't use an array. This is true with or without an imposed queue
limit. This is why this approach was needed.

For example, this is OK:

final int QUEUE_SIZE = 10;
LinkedBlockingQueue<ObjectList>
qCPin = new LinkedBlockingQueue<ObjectList>(QUEUE_SIZE),
qECin = new LinkedBlockingQueue<ObjectList>(QUEUE_SIZE);

Which is great for inbound queues, which LinkedBlockingQueue allows
multiple producers by default. The trouble starts when we try:

LinkedBlockingQueue<ObjectList>[] aGout = new
LinkedBlockingQueue<ObjectList>[1];

..or even:

LinkedBlockingQueue<ObjectList>[] aGout = {qCPin};

Either yields the same error:
Cannot create a generic array of LinkedBlockingQueue<ObjectList>

The only way I found around this problem was the code you saw in my
first post, which simply inheritted from LinkedBlockingQueue. Since I
need an array of queues (one producer can send to many consumers), the
standard approaches I've seen on the net don't work (that is, where
there is one producer and one consumer). Eckel's TIJ4 code was the only
example that worked with arrays.

Take a look at this, and see if it helps:

import java.util.concurrent.LinkedBlockingQueue;

public class ArrayTest {

public static void main(String[] args) {
MyQueue[] quarray = new MyQueue[5];
quarray[2] = new MyQueue();
for(int i=0; i<7; i++){
System.out.println(quarray[2].offer("Element "+i));
}
}

static class MyQueue extends LinkedBlockingQueue<String>{
MyQueue(){
super(5);
}
}

}

Obviously, you can use any number you like instead of "5".

Patricia
 
B

Bogus Exception

Patricia,

Thanks so much for exchanging these emails with me.

Apparently the solution is as you specified. I added:

MyQueue(){
super(5);
}

To the class that extends LinkedBlockingQueue, and that was all the
change I needed to make.

Thanks so much for your assistance! I can now go back to programming!

Bogus Exception
 
P

Patricia Shanahan

Bogus said:
Patricia,

Thanks so much for exchanging these emails with me.

Apparently the solution is as you specified. I added:

MyQueue(){
super(5);
}

To the class that extends LinkedBlockingQueue, and that was all the
change I needed to make.

Thanks so much for your assistance! I can now go back to programming!

Remember to tune the capacity at some point. I would be very surprised
if 5 is the right choice. It may couple the producer and consumer too
tightly, so that you waste time on context switches.

Patricia
 
B

Bogus Exception

Patricia said:
Remember to tune the capacity at some point. I would be very surprised
if 5 is the right choice. It may couple the producer and consumer too
tightly, so that you waste time on context switches.

Patricia, thanks _again_ for helping out. It has really saved me a lot
of headache!

As for the limit, I've moved it to various numbers to test. Putting the
count low allows me to test blocking handling. I never would have
thought to access the default constructor of the superclass called via
extends via a constructor. That is a very elegant solution, and didn't
necessitate re-engineering anything!

Since the queue capacity can't be changed after instantiation (wish it
could), I have to figure out the right amount at initial program
compile time. It would certainly be nice to adjust on the fly. I know I
can drain/add members in a queue to another, adjusted, queue, but that
might be just too complicated a solution. I'll get by for a while by
simply aiming low at runtime, since my program doesn't know ahead of
time how many queues will be needed.

Thanks again for your help!

Bogus Exception
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,770
Messages
2,569,583
Members
45,074
Latest member
StanleyFra

Latest Threads

Top