multiple, independent references to collections and blockingqueue iterators

F

falcon

I have a fairly large collection of data (queue), this queue has
several different different consumers which must be able to access the
queue without effecting other consumers' view of the queue. In other
words, if consumerA pops an item off the queue, consumerB should still
see that same item in the list until it pops it off.

Obviously I'll need one primary queue and one 'reference' queue for
each of the consumers. What is the best way to do this? I know I
shouldn't just make a complete copy of the whole thing, I can save
space by having references to objects in the primary queue...wondering
if there is an established design pattern...better yet, existing code!

Secondly, this will take place in a concurrent environment. My
consumers may come and go (that's why the primary queue will need to
retain its data as long as the application is running...never assuming
that no more consumers will need it).

I thought that since BlockingQueue is designed to be used specifically
in a Producer/Consumer scenario, its iterator might solve the problem
for me, but apparently if I get my blocking queue's iterator, and after
that some item is added the queue, my iterator may not reflect the
change (which obviously means that the iterator is useless for
concurrent environments).

Any ideas? Thanks!
 
C

castillo.bryan

You could use an Observer pattern. I called them subscribers in the
pseudo code below. Basically, your producer has a Set of
Observers/Subscribers that receive items when they are produced. Each
Observer/Subscriber would have its own queue. When an item is sent
from a Producer to an Observer/Subscriber, the item is placed on the
Observers local queue, and the observer pulls off of its own local
queue, in its own thread. When the observer's thread has completed, it
can remove itself from the producers list of subscribers.

You may not like the fact that each Observer has its own Queue, but I
think it would be much cleaner that way.


Produer
- Set subscribers
- publishItem(Item i) {
foreach subscriber in (subscribers) {
subscriber.publishItem(item);
}
}

- subscribe(Subscriber subscriber) {
subscribers.add(subscriber);
}

- unsubscribe(Subscriber subscriber) {
subscribers.remove(subscriber);
}

Subscriber
- Producer producer
- BlockingQueue queue

- publishItem (item) {
queue.put(item);
}

- run() {
producer.subscribe(this);
while (!done) {
Item item = queue.take();
doSomethingWithItem(item);
}
producer.unsubscribe(this);
}
 
C

Chris Uppal

falcon said:
I have a fairly large collection of data (queue), this queue has
several different different consumers which must be able to access the
queue without effecting other consumers' view of the queue. In other
words, if consumerA pops an item off the queue, consumerB should still
see that same item in the list until it pops it off.

In that case why not use several queues ?

Obviously I'll need one primary queue and one 'reference' queue for
each of the consumers.

What is the "primary queue" for ?

Secondly, this will take place in a concurrent environment. My
consumers may come and go (that's why the primary queue will need to
retain its data as long as the application is running...never assuming
that no more consumers will need it).

So things get put onto the "primary queue" and /never/ get taken off ?

I don't really understand what you are trying to do here, but it sounds very
strange. Normally you'd expect to have one shared queue with several threads
removing things from it, and I don't at all see why you should have several
queues with the same contents.

-- chris
 
F

falcon

I'll have one queue with data...think of it as a database table.
Several consumers will 'query' this table, they may filter according to
some predicate...but most importantly, they may decide to --unfilter--
to see all values (hence the need to keep all values).
 
C

castillo.bryan

Perhaps you should use a database table. You could use an auto
incrementing column and have each consumer rember where it left off.
If you know all of your data will fit in memory, you can use an in
memory HSQL table. Then if you have to, you can switch to a table type
stored on disk, or switch to a completely different RDBMS.

If you insist on doing what you were trying to do with the
BlockingQueue, you can control your access to the queue with a
synchronized block.


import java.util.concurrent.*;

public class BlockingQueueTest {

private BlockingQueue<String> queue = new
ArrayBlockingQueue<String>(100);
private static long tzero = System.currentTimeMillis();

private static void msg(String msg) {
System.err.println("[" + (System.currentTimeMillis() - tzero) + "] "
+ msg);
}

private void addSomeItems() throws Exception {
for (int i=0; i<10; i++) {
synchronized (queue) {
msg("putting item");
queue.put("Item: " + i);
}
Thread.sleep(500);
}
}

private void iterateItems() throws Exception {
synchronized (queue) {
for (String str : queue) {
msg("Get item: " + str);
Thread.sleep(1000);
}
}
}

public void run() throws Exception {
msg("running.....");
Thread t = new Thread() {
public void run() {
try {
addSomeItems();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
Thread.sleep(2000);
iterateItems();
t.join();
}

public static void main(String[] args) {
try {
new BlockingQueueTest().run();
}
catch (Exception e) {
e.printStackTrace();
}
}

}

output

C:\bryanc\dev\j15_test>java BlockingQueueTest
[0] running.....
[0] putting item
[500] putting item
[1000] putting item
[1500] putting item
[2000] Get item: Item: 0
[3000] Get item: Item: 1
[4000] Get item: Item: 2
[5000] Get item: Item: 3
[6000] putting item
[6500] putting item
[7000] putting item
[7516] putting item
[8016] putting item
[8516] putting item
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,774
Messages
2,569,596
Members
45,130
Latest member
MitchellTe
Top