ExecutorService and Futures not thread-safe with 8 processors


helaha

Hi all,
I'm implementing a parallel image processing algorithm, but I'm having
some thread problems using ExecutorService and Future on an 8-processor
Vista 64-bit workstation. I get the processed images back, and the
processed image stack has the right size, but some images are duplicated
in the list and therefore some others are missing. This happens
arbitrarily: in an image stack of 20 images, about 1 or 2 images
are wrong. Furthermore, on a single-processor PC the code works very
well, even with 16 threads.
Does anybody have an idea?
Thank you very much in advance.


private static int nStackMax = 0; //number of images
private static Vector<Object> resultVector = null; //this is a result vector of objects (PlanarImage or double)

public static void processImageStackExecuter() throws Exception {
    System.gc();
    System.out.println("processImageStackParallel");
    nStackMax = managerModel.getSize(); //the size of the image stack is determined
    resultArrayList = new Vector<Object>(nStackMax);
    nThread = 16;
    System.out.println("Number of threads: " + nThread);

    class MyCallable implements Callable<Object> {

        private PlanarImage pi;

        MyCallable(PlanarImage pi, int n) {
            this.pi = pi;
        }

        public Object call() {
            System.out.println("ManagerJ: StackProcessing runs in thread: "
                    + Thread.currentThread());
            Object ob = ManagerJ.processImage(pi); //This method does the image processing and gives back a PlanarImage or a double
            return ob;
        }
    }

    ExecutorService exec = Executors.newFixedThreadPool(nThread);
    Vector<Future<Object>> tasks = new Vector<Future<Object>>(nStackMax);

    for (int n = 0; n < nStackMax; n++) {
        System.out.println("ManagerJ: execute: " + (n + 1));
        PlanarImage pi = TankJ.getCurrentTankImageAt(n); //this method gets the current image at position n
        MyCallable MyCall = new MyCallable(pi, n);
        Future<Object> future = exec.submit(MyCall);
        tasks.add(n, future);
    }

    for (int n = 0; n < nStackMax; n++) {
        Future<Object> future = tasks.get(n);
        Object ob = future.get();
        resultVector.add(n, ob);
    }

    exec.shutdown();
    boolean b = exec.awaitTermination(100, TimeUnit.DAYS);
    System.out.println("Stack processing finished:" + b);

    if (resultVector.get(0) instanceof PlanarImage) TankJ.addNewItem(resultVector); //The list of PlanarImages is set to the display
}
 

Lothar Kimmeringer

helaha said:
PlanarImage pi = TankJ.getCurrentTankImageAt(n); //this method gets the current image at position n

Are you sure that this method is thread-safe and not returning the
same image twice?


Regards, Lothar
--
Lothar Kimmeringer E-Mail: (e-mail address removed)
PGP-encrypted mails preferred (Key-ID: 0x8BC3CD81)

Always remember: The answer is forty-two, there can only be wrong
questions!
 

Mark Space

helaha said:
MyCallable(PlanarImage pi, int n) {
this.pi = pi;
}
public Object call(){
System.out.println("ManagerJ: StackProcessing runs in thread: "
+Thread.currentThread());
Object ob = ManagerJ.processImage(pi); //This method does the image processing and gives back a PlanarImage or a double


I agree with Lothar -- if you only see the spam from this method once,
then it's running ok and the problem must be somewhere else. First
thing I notice is that you don't do anything with n here.

You may wish to print the object ID of pi here (just call the default
toString() method) in the call() method, just to make sure each process
gets started with a unique image. If it does, look further into the
processing and output methods. If it doesn't, see Lothar's advice.

You might also want to reduce the number of running threads in the
thread pool to, say, 1, and kick it off only after all tasks are
created. Does the problem go away? If not, your issue may not be
related to threads at all.
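Mark's two checks can be scripted without touching the real image code. The sketch below assumes a pure stand-in function in place of ManagerJ.processImage; the class PoolDiagnostics and its methods are hypothetical. It checks by identity (==) that no input object appears twice, then runs the same work on a one-thread pool and on a larger pool through ExecutorService.invokeAll, which returns the futures in submission order. If the real processing step gives different results in the two runs, it is probably sharing state between calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class PoolDiagnostics {

    // True if no element occurs twice, comparing by object identity (==),
    // i.e. "is the same image object handed out more than once?"
    static boolean allDistinct(List<?> items) {
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
        for (Object o : items) {
            if (!seen.add(o)) {
                return false;
            }
        }
        return true;
    }

    // Applies f to every input on a pool of the given size. Results come back
    // in input order because invokeAll returns futures in submission order.
    static <T, R> List<R> runAll(List<T> inputs, Function<T, R> f, int threads)
            throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T in : inputs) {
                tasks.add(() -> f.apply(in));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> fut : exec.invokeAll(tasks)) {
                results.add(fut.get());
            }
            return results;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> inputs = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            inputs.add(i);
        }
        Function<Integer, Integer> pureStandIn = x -> x * x; // hypothetical, stateless
        System.out.println("inputs distinct: " + allDistinct(inputs));
        List<Integer> oneThread = runAll(inputs, pureStandIn, 1);
        List<Integer> manyThreads = runAll(inputs, pureStandIn, 16);
        System.out.println("same results: " + oneThread.equals(manyThreads));
    }
}
```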
 

helaha

Are you sure that this method is thread-safe and not returning the
same image twice?

Dear Lothar,
many thanks for your reply; it indeed isn't explicitly thread-safe.
I'll have a look at it. On the other hand, the duplicated images aren't
neighbours (e.g. images 2 and 3 identical); rather, image 2 and
image 5 are identical, or images 4 and 7, and so on.

Many thanks,
Helmut
 

Roedy Green

I'm implementing a parallel image processing algorithm, but I'm having
some thread problems using ExecutorService and Future on an 8-processor
Vista 64-bit workstation. I get the processed images back, and the
processed image stack has the right size, but some images are duplicated
in the list and therefore some others are missing. This happens
arbitrarily: in an image stack of 20 images, about 1 or 2 images
are wrong. Furthermore, on a single-processor PC the code works very
well, even with 16 threads.
Does anybody have an idea?

See http://mindprod.com/jgloss/thread.html#BOOKS

and read the recommended books. Getting code working with many
threads is quite a delicate art.
--
Roedy Green Canadian Mind Products
http://mindprod.com

One path leads to despair and utter hopelessness. The other,
to total extinction. Let us pray we have the wisdom to choose correctly.
~ Woody Allen
 

helaha

I agree with Lothar -- if you only see the spam from this method once,
then it's running ok and the problem must be somewhere else.  First
thing I notice is that you don't do anything with n here.

Yes, that's right
You may wish to print the object ID of pi here (just call the default
toString() method) in the call() method, just to make sure each process
gets started with a unique image.  If it does, look further into the
processing and output methods.  If it doesn't, see Lothar's advice.

I'll try that
You might also want to reduce the number of running threads in the
thread pool to, say, 1, and kick it off only after all tasks are
created.  Does the problem go away?  If not, your issue may not be
related to threads at all.

Dear Mark,
with fewer threads the problem occurs less often. With a single
thread the algorithm works on both the 1-processor and the 8-processor
machine. 1 thread on 8 processors works, many threads on 1 processor
works; only many threads on 8 processors fails.

Thank you very much for your advice,
Helmut
 

Lew

helaha said:
I'm implementing a parallel image processing algorithm, but I'm having
some thread problems using ExecutorService and Future on an 8-processor
Vista 64-bit workstation. I get the processed images back, and the
processed image stack has the right size, but some images are duplicated
in the list and therefore some others are missing. This happens
arbitrarily: in an image stack of 20 images, about 1 or 2 images
are wrong. Furthermore, on a single-processor PC the code works very
well, even with 16 threads.
Does anybody have an idea?

There's too much and too little here to answer your question. This code is
uncompilable. The declarations of 'resultArrayList' and 'nThread' are not
shown (and 'resultArrayList' is a strange name given that you implement it
with a 'Vector'). Likewise the implementation of 'managerModel',
'ManagerJ', 'PlanarImage' and 'TankJ'. You have antipatterns (use of
'Vector', static variables and methods, 'instanceof' instead of polymorphism).
A complicated threading model without relevant code shown.

I can't begin to puzzle through all this. Provide an SSCCE that illustrates
your problem.

P.S., please limit indentation on Usenet to no more than four spaces per
level, for readability.
 

Tom Anderson

Are you sure that this method is thread-safe and not returning the same
image twice?

He's only calling that method from the main thread, not from any of the
worker threads, so its thread-safety seems unlikely to be relevant.
However, your advice is still sound - it would be good to verify that it's
not producing the same source image more than once.

The only method that's being called in parallel is this one:

Object ob = ManagerJ.processImage(pi); //This method does the image processing and gives back a PlanarImage or a double

My guess would be that this is not threadsafe - specifically, that
ManagerJ is stashing state in a static variable somewhere, and multiple
calls to it are getting confused. I'd try declaring processImage to be
synchronized, or else putting that call inside a synchronized block that
locks a suitable shared object, e.g.:

synchronized (ManagerJ.class) {
    Object obj = ManagerJ.processImage(pi);
}

Which is a bit nasty, but will work, at least for debugging.

Of course, this synchronization is not a solution to the problem, since it
destroys the advantage of multithreading. I suggest it merely as a
bug-hunting tool.
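A sketch of that debugging lock, assuming a hypothetical UnsafeProcessor that, like the suspected ManagerJ, keeps per-call data in a static field. Here the result variable is declared outside the synchronized block so it is still in scope when call() returns it. Because every call takes the same lock (UnsafeProcessor.class), the write and the read of the static field can no longer interleave across threads, which is also exactly why all parallelism is lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for ManagerJ: it stashes per-call data in a static
// field, so concurrent unsynchronized calls can overwrite each other's data.
class UnsafeProcessor {
    private static int current;

    static int process(int image) {
        current = image;      // shared, per-call state: the bug
        Thread.yield();       // widen the race window for the demo
        return current * 10;  // without a lock this may read another call's value
    }
}

// Debug-only wrapper: all calls share one lock (the class object),
// so only one call runs at a time.
class LockedTask implements Callable<Integer> {
    private final int image;

    LockedTask(int image) {
        this.image = image;
    }

    @Override
    public Integer call() {
        int result; // declared outside the block so it is still in scope below
        synchronized (UnsafeProcessor.class) {
            result = UnsafeProcessor.process(image);
        }
        return result;
    }
}

class LockedDemo {
    static List<Integer> runLocked(int n, int threads) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                futures.add(exec.submit(new LockedTask(i)));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // With the lock, every result matches its input: 0, 10, 20, ...
        System.out.println(runLocked(20, 8));
    }
}
```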

tom
 

Lothar Kimmeringer

Tom said:
He's only calling that method from the main thread, not from any of the
worker threads, so its thread-safety seems unlikely to be relevant.

He left out a lot of code, so what I think may be happening
here is that another thread is also working with TankJ
(which seems to be some kind of utility class allowing
access via static methods) and changes the order of the
underlying pictures.

So while the "main thread" is building up the list of worker
tasks, the underlying list of images handled by TankJ changes,
so that, e.g., TankJ.getCurrentTankImageAt(3) called a few
nanoseconds earlier returns the same image that
TankJ.getCurrentTankImageAt(7) returns a few nanoseconds later.


Regards, Lothar
 

helaha

synchronized (ManagerJ.class) {
        Object obj = ManagerJ.processImage(pi);

}

Dear Tom,
synchronization of my image processing method solved the problem! But
as you stated the parallelism has gone.
So what would you suggest, starting ManagerJ.processImage(pi) as a
thread?
Many thanks so far, this is a major step forward for me.
Helmut
 

helaha

so that, e.g., TankJ.getCurrentTankImageAt(3) called a few
nanoseconds earlier returns the same image that
TankJ.getCurrentTankImageAt(7) returns a few nanoseconds later.

Dear Lothar,
I tried a synchronization similar to Tom's suggestion:
synchronized (TankJ.class) {
    pi = TankJ.getCurrentTankImageAt(n);
}
without success.
Synchronizing on ManagerJ was successful.
Nevertheless, many thanks; that's the right way.

Helmut
 

Tom Anderson

synchronization of my image processing method solved the problem! But as
you stated the parallelism has gone. So what would you suggest, starting
ManagerJ.processImage(pi) as a thread?

You need to look at ManagerJ.processImage and work out why it's not
threadsafe (if that is indeed the problem). Then fix it. Look through it,
and the other methods it calls, and look for places where it stores values
in global variables - any variable which will be the same actual variable
for different executions of the method.

Examples:

import java.awt.image.RenderedImage;
import java.util.*;

public class ManagerJ {
    private static int width;
    private static Map<String, Object> currentImageInfo;

    public static Object processImage(RenderedImage pi) {
        int height = pi.getHeight();
        width = pi.getWidth();
        currentImageInfo = new HashMap<String, Object>();
        List<Integer> dimensions = new ArrayList<Integer>();
        dimensions.add(height);
        dimensions.add(width);
        currentImageInfo.put("DIMENSIONS", dimensions);
        doubleDimensions();
        return currentImageInfo;
    }

    @SuppressWarnings("unchecked")
    private static void doubleDimensions() {
        List<Integer> dims = (List<Integer>) currentImageInfo.get("DIMENSIONS");
        dims.set(0, dims.get(0) * 2);
        dims.set(1, dims.get(1) * 2);
    }
}

In that code, 'height' is the only threadsafe variable. All of the others
are vulnerable to inter-thread clobbering: width is global,
currentImageInfo is global, dims is local but derived from a global, and
dimensions is local, but its value is aliased by dims, which is vulnerable
(so perhaps it's truer to say that the variable itself is safe, but its
value is not).

This kind of analysis requires care and attention to detail, but it's not
really that difficult.
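For comparison, here is a sketch of Tom's example rewritten so that all per-call state stays local: the map and list are created inside the call and passed as arguments instead of being stored in static fields. It takes a java.awt.image.RenderedImage (an interface that PlanarImage implements) so it compiles on a plain JDK; SafeManager is a hypothetical name.

```java
import java.awt.image.BufferedImage;
import java.awt.image.RenderedImage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same computation as Tom's example, but every piece of per-call state is
// created inside the call and passed explicitly, so no two calls share data.
class SafeManager {

    static Map<String, Object> processImage(RenderedImage pi) {
        List<Integer> dimensions = new ArrayList<>();
        dimensions.add(pi.getHeight());
        dimensions.add(pi.getWidth());
        Map<String, Object> imageInfo = new HashMap<>(); // local, not static
        imageInfo.put("DIMENSIONS", dimensions);
        doubleDimensions(dimensions);                    // pass it, don't stash it
        return imageInfo;
    }

    private static void doubleDimensions(List<Integer> dims) {
        dims.set(0, dims.get(0) * 2);
        dims.set(1, dims.get(1) * 2);
    }

    public static void main(String[] args) {
        RenderedImage img = new BufferedImage(4, 3, BufferedImage.TYPE_INT_RGB);
        System.out.println(processImage(img)); // prints {DIMENSIONS=[6, 8]}
    }
}
```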

If ManagerJ is small, you could try posting it here and see if we can spot
anything.

tom
 

helaha

With all your help I figured out that the problem is probably the
massive concurrent access to my image processing method
(Object ob = ManagerJ.processImage(pi);) inside the callable.
Is there a way to regulate access to a method that a large
number of threads want to run, other than the keyword
"synchronized"?

Furthermore I figured out that a short time delay right after the
submission of a callable solves the problem too.

ExecutorService exec = Executors.newFixedThreadPool(nThread);
Vector<Future<Object>> tasks = new Vector<Future<Object>>(nStackMax);
for (int n = 0; n < nStackMax; n++) {
    System.out.println("execute: " + (n + 1));
    PlanarImage pi = TankJ.getCurrentTankImageAt(n);
    MyCallable MyCall = new MyCallable(pi, n);
    Future<Object> future = exec.submit(MyCall);
    tasks.add(n, future);
    try {
        TimeUnit.MILLISECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

(A similar time delay just after the ManagerJ.processImage(pi) statement
inside the callable wasn't successful.)
I know that such a delay isn't a good way to overcome such problems.
Is there another way to regulate only the concurrent calls to a
method without synchronizing the whole method?

Thank you all for your kind help,
Helmut
 

helaha

If ManagerJ is small, you could try posting it here and see if we can spot
anything.

Dear Tom,
Thank you very much for your example, I'll have a look at it.
Here you can see the processImage method. I use the JAI framework with
user-written functions. The current function name, together with the
current image and the current parameter block, is the input for the
JAI.create method.

Concerning the image stack processing, the current function and the
parameterblock (without the source) are identical from image to image
in every thread but the PlanarImage (source) is different.

public static Object processImage(PlanarImage pi) {

    System.out.println("process Image");

    //currFunc is a name for a user-written function
    //it is used by the JAI.create method
    String currFunc = null;

    //pb is a ParameterBlock used by the JAI method
    ParameterBlock pb = null;

    if (pi == null) {
        System.out.println("Current image not defined");
        return null;
    }
    try {
        //this gets the current name of the user function
        //e.g. it is possible to set currFunc = "myinvert".
        currFunc = IqmTools.getCurrImgProcessFunc();
        //this gets the current parameter block
        pb = IqmTools.getCurrParameterBlock();
        //perhaps the following is critical
        pb.removeSources(); // this removes the image
        pb.addSource(pi);   // this adds the current image
    }
    catch (NullPointerException e) {
        System.out.println("Current function and/or parameter block not defined");
        return null;
    }

    //JAI calls the function with the parameter block (including the image)
    Object ob = JAI.create(currFunc, pb, null);
    if (ob instanceof PlanarImage) // instanceof is already false for null
        return ob;
    return null; // the result was not a PlanarImage
}

The result images are all processed with the current function, so I
think the currFunc name and the parameter block (without source) aren't
the problem.
Thank you for your kind help,
Helmut
 

Lew

helaha said:
With all your help I figured out that the problem is probably the
massive concurrent access to my image processing method
(Object ob = ManagerJ.processImage(pi);) inside the callable.
Is there a way to regulate access to a method that a large
number of threads want to run, other than the keyword
"synchronized"?

If different threads need access to common state, read or write, that access
must be synchronized. Period.

To minimize the serializing effect, reduce the synchronized access to the very
minimum body of code (the "critical section") that covers the common state,
all using the *same* object as the lock.

One way to reduce such access on a read is to copy the shared state to local
state in the critical section, then act on the local state outside the
critical section.
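A minimal sketch of Lew's copy-then-work pattern, assuming hypothetical SharedSettings and CopyThenWork classes in place of the real shared function name and parameters. The only synchronized code copies the shared fields into an immutable snapshot; the long-running work then uses only that snapshot and holds no lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical shared settings, written by one thread (e.g. the GUI) and read
// by the workers. Every access goes through the same lock ("this").
class SharedSettings {
    private String function = "invert";
    private final List<Double> params = new ArrayList<>();

    synchronized void update(String newFunction, List<Double> newParams) {
        function = newFunction;
        params.clear();
        params.addAll(newParams);
    }

    // Critical section kept minimal: copy the shared state, nothing else.
    synchronized Snapshot snapshot() {
        return new Snapshot(function, new ArrayList<>(params));
    }

    // Immutable, so a worker can use it freely after leaving the lock.
    static final class Snapshot {
        final String function;
        final List<Double> params;

        Snapshot(String function, List<Double> params) {
            this.function = function;
            this.params = Collections.unmodifiableList(params);
        }
    }
}

class CopyThenWork {
    // The (potentially long) processing works only on local data; no lock is held.
    static String process(SharedSettings settings, int image) {
        SharedSettings.Snapshot s = settings.snapshot();
        return s.function + s.params + " applied to image " + image;
    }

    public static void main(String[] args) {
        SharedSettings settings = new SharedSettings();
        settings.update("threshold", Arrays.asList(0.5));
        System.out.println(process(settings, 7)); // threshold[0.5] applied to image 7
    }
}
```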
Furthermore I figured out that a short time delay right after the
submission of a callable solves the problem too.

Probably not. It probably only hides the problem.
Is there another way to regulate only the concurrent calls to a
method without synchronizing the whole method?

Why are you so hell-bent on not synchronizing the method? Shared state *must*
use synchronized access, whether via the 'synchronized' keyword or otherwise.

Perhaps one or more of the java.util.concurrent classes will help you.

Read /Java Concurrency in Practice/ by Brian Goetz, et al.
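On the question of regulating access other than with synchronized: java.util.concurrent.Semaphore caps how many threads may run a piece of code at once. The sketch below (BoundedGate is a hypothetical name) also records the peak concurrency so the cap can be checked. It limits the number of simultaneous callers; it does not make shared mutable state safe unless there is exactly one permit, in which case it serializes the calls just as synchronized does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Lets at most `permits` threads run the guarded code at the same time.
class BoundedGate {
    private final Semaphore semaphore;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger maxSeen = new AtomicInteger();

    BoundedGate(int permits) {
        semaphore = new Semaphore(permits);
    }

    <T> T run(Supplier<T> work) throws InterruptedException {
        semaphore.acquire();
        try {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max); // record peak concurrency
            return work.get();
        } finally {
            running.decrementAndGet();
            semaphore.release();
        }
    }

    int peakConcurrency() {
        return maxSeen.get();
    }

    public static void main(String[] args) throws Exception {
        BoundedGate gate = new BoundedGate(2);
        ExecutorService exec = Executors.newFixedThreadPool(16);
        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            final int image = i;
            Callable<Integer> job = () -> gate.run(() -> image * 2);
            results.add(exec.submit(job));
        }
        for (Future<Integer> f : results) {
            f.get();
        }
        exec.shutdown();
        System.out.println("peak concurrency: " + gate.peakConcurrency()); // never above 2
    }
}
```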
 

blue indigo

Concerning the image stack processing, the current function and the
parameterblock (without the source) are identical from image to image
in every thread but the PlanarImage (source) is different.

pb = IqmTools.getCurrParameterBlock();
//perhaps the following is critical
pb.removeSources(); // this removes the image
pb.addSource(pi); // this adds the current image

And there it is. The instance of ParameterBlock, which you imply is a
single one shared among threads, is used to store a reference to the
current image. Which gets clobbered.

If you want this to be fully concurrent, with no locking, you need to have
a separate copy of the ParameterBlock object for each thread, or else not
use it to hold per-image mutable state.
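The clobbering described above can be reproduced deterministically with the JDK's java.awt.image.renderable.ParameterBlock (the class JAI uses) and a CyclicBarrier. In this sketch SharedSettings-style names are avoided; SharedBlockRace is hypothetical and strings stand in for images. Two tasks share one block, each installs its own source, and the barrier makes both wait until both have written before either reads the source back, so both read the same source: one input is duplicated and the other is lost, as in the original report.

```java
import java.awt.image.renderable.ParameterBlock;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedBlockRace {
    static Object[] run() throws Exception {
        ParameterBlock shared = new ParameterBlock(); // stands in for the global block
        CyclicBarrier bothWritten = new CyclicBarrier(2);
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            Future<Object> a = exec.submit(() -> task(shared, "image A", bothWritten));
            Future<Object> b = exec.submit(() -> task(shared, "image B", bothWritten));
            return new Object[] {a.get(), b.get()};
        } finally {
            exec.shutdown();
        }
    }

    private static Object task(ParameterBlock pb, Object image, CyclicBarrier barrier)
            throws Exception {
        pb.removeSources();
        pb.addSource(image);
        barrier.await();        // both tasks have now written to the same block
        return pb.getSource(0); // so both read back the same source
    }

    public static void main(String[] args) throws Exception {
        Object[] r = run();
        // Two different inputs, one duplicated output.
        System.out.println(r[0] + " / " + r[1]);
    }
}
```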
 

helaha

Lew said:
Why are you so hell-bent on not synchronizing the method? Shared state *must*
use synchronized access, whether via the 'synchronized' keyword or otherwise.

Perhaps one or more of the java.util.concurrent classes will help you.

Read /Java Concurrency in Practice/ by Brian Goetz, et al.

Dear Lew,
synchronizing ManagerJ.processImage as a whole, as Tom suggested,
solves the synchronizig problem, but then the threads are calulated in
a serial manner rather than a parallel way. But you are right, it is
necessary to synchronize as small parts as possible.
Many thanks,
Helmut
 

helaha

And there it is. The instance of ParameterBlock, which you imply is a
single one shared among threads, is used to store a reference to the
current image. Which gets clobbered.

If you want this to be fully concurrent, with no locking, you need to have
a separate copy of the ParameterBlock object for each thread, or else not
use it to hold per-image mutable state.

Dear blue indigo,
in my opinion the ParameterBlock variable is already defined as a local
variable, am I wrong?

Many thanks,
Helmut
 

Mark Space

helaha said:
On 25 Feb., 17:43, blue indigo
Dear blue indigo,
in my opinion the ParameterBlock variable is already defined as a local
variable, am I wrong?

You are probably wrong. What is the point of removing a source
(removeSources()) or adding a source (addSource()) if this object is
thread confined ("local")? It seems from the semantics pb cannot be
truly local.

The reference is local. The object the reference points to is not.
Does that make sense?

In other words, how does getCurrParameterBlock() function? Does it just
return an existing reference? Or does it call "new" to make a new object
or call clone() to copy one? The latter two are threadsafe, but
returning a reference to a single object is not, because the single
object is still shared among all threads.
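The point about local references can be shown with a hypothetical Tools class whose getter, like getCurrParameterBlock() as described in this thread, returns one global ParameterBlock. Two local variables obtained from it refer to the same object. ParameterBlock.clone(), by contrast, copies the source and parameter Vectors (the source objects themselves are still shared), so changing the clone's sources leaves the global block alone.

```java
import java.awt.image.renderable.ParameterBlock;

// Hypothetical stand-in for IqmTools: a getter that hands out the one
// global ParameterBlock rather than a new one.
class Tools {
    private static final ParameterBlock CURRENT = new ParameterBlock();

    static ParameterBlock getCurrParameterBlock() {
        return CURRENT;
    }
}

class ReferenceVsObject {
    public static void main(String[] args) {
        ParameterBlock pb1 = Tools.getCurrParameterBlock(); // a local reference...
        ParameterBlock pb2 = Tools.getCurrParameterBlock(); // ...and another one
        System.out.println(pb1 == pb2);                      // true: one shared object

        pb1.removeSources();
        pb1.addSource("image 1");
        System.out.println(pb2.getSource(0));                // image 1, seen through pb2

        // clone() copies the source Vector, so the clone's sources can be
        // changed without affecting the shared block.
        ParameterBlock mine = (ParameterBlock) pb1.clone();
        mine.removeSources();
        mine.addSource("image 2");
        System.out.println(pb1.getSource(0));                // still image 1
    }
}
```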

I think you should try something like this:


pb = IqmTools.getCurrParameterBlock();
//perhaps the following is critical
synchronized( pb ) {
pb.removeSources(); // this removes the image
pb.addSource(pi); // this adds the current image
}

You might also need to add the call to JAI.create() in there too,
depending on how it uses the pb variable.
 

helaha

You are probably wrong. What is the point of removing a source
(removeSources()) or adding a source (addSource()) if this object is
thread confined ("local")? It seems from the semantics pb cannot be
truly local.

The reference is local. The object the reference points to is not.
Does that make sense?

In other words, how does getCurrParameterBlock() function? Does it just
return an existing reference? Or does it call "new" to make a new object
or call clone() to copy one? The latter two are threadsafe, but
returning a reference to a single object is not, because the single
object is still shared among all threads.

getCurrParameterBlock() returns a reference, simply to a global
ParameterBlock, which can be set by other methods. First, the
ParameterBlock defines the parameters, which are identical for
each image, so it isn't a problem if this variable is global. Second,
an image pi is added, which must be local for thread safety.
So, with the first line
ParameterBlock pb; //pb is local
then with
pb = IqmTools.getCurrParameterBlock(); //pb isn't local any more, it's global? Really?
// a local image is added but pb is global? I'm not really convinced at this point.
pb.addSource(pi);

I'll try the clone method for a thread-safe copy. I'll post the
results.
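A sketch of the clone approach, assuming the shared block holds only parameters that are the same for every image and that each call may work on its own copy. fakeCreate is a hypothetical stand-in for JAI.create(currFunc, pb, null), and strings stand in for images. Each call clones the shared template and installs its source in the clone, so no per-image data is ever written to the shared object and no lock is needed.

```java
import java.awt.image.renderable.ParameterBlock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ClonePerCall {
    // Shared template: its parameters are the same for every image, and it is
    // never modified after initialization.
    private static final ParameterBlock TEMPLATE = new ParameterBlock().add(0.5);

    // Hypothetical stand-in for JAI.create(currFunc, pb, null).
    private static Object fakeCreate(String func, ParameterBlock pb) {
        Thread.yield(); // give other threads a chance to interfere
        return func + "(" + pb.getSource(0) + ", " + pb.getDoubleParameter(0) + ")";
    }

    static Object processImage(Object image) {
        // Each call gets its own copy of the source/parameter lists, so the
        // per-image source never lands in the shared template.
        ParameterBlock pb = (ParameterBlock) TEMPLATE.clone();
        pb.removeSources();
        pb.addSource(image);
        return fakeCreate("invert", pb);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(8);
        List<Future<Object>> futures = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            final String image = "img" + i;
            Callable<Object> job = () -> processImage(image);
            futures.add(exec.submit(job));
        }
        for (Future<Object> f : futures) {
            System.out.println(f.get()); // invert(img0, 0.5), invert(img1, 0.5), ...
        }
        exec.shutdown();
    }
}
```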
I think you should try something like this:

pb = IqmTools.getCurrParameterBlock();
//perhaps the following is critical
synchronized( pb ) {
pb.removeSources(); // this removes the image
pb.addSource(pi); // this adds the current image
}

Sorry, but this synchronization doesn't solve the problem.
You might also need to add the call to JAI.create() in there too,
depending on how it uses the pb variable.

This would synchronize the long-running process, and unfortunately I
don't know anything about how JAI.create() uses pb.

Many thanks,
regards,
Helmut
 
