Will Future<?>.get() release the object monitor?

Discussion in 'Java' started by William Rose, Sep 22, 2005.

  1. William Rose

    William Rose Guest

    Hello,

    I'm want a thread pool controller like an ExecutorService that has the
    ability to stop all active tasks and suspend the adding of new tasks,
    before starting afresh and adding more tasks. The purpose is to create
    an interruptible background task (comprised of parallelisable sub-tasks
    performed by the thread pool attached to the ExecutorService) that can
    be stopped and restarted as the project is updated.

    I am developing this new class, which delegates to the ExecutorService,
    because ExecutorService doesn't provide the reset-able behaviour I want,
    and only supports a once-off shutdown. So my prototype code (below) is
    intended to do this. But it doesn't, at least in part because calling
    Future<?>.get() suspends the thread but does not release the current
    object's monitor (like wait() would). This leads to a deadlock in the
    current code.

    So I have a few of questions:
    * Is there an easier way to achieve what I described first (a
    reset-able ExecutorService)?
    * If not, is it possible to make the condition wait triggered by get()
    also lead to the release of some kind of mutex (either the object
    monitor, or some other arbitrary mutex)?

    If neither of the above has a good answer, I could instead be polling
    the Future<?>.isDone() method to detect completion (and using
    controller.wait() and a piggy-backed call to the controller.notify()
    method on the end of every task), but this seems a bit more hacked up
    than had hoped.

    cheers,
    Will

    /**
    * Code follows - FutureTest.java
    * Requires J2SE 5.0
    */
    package com.radiologyforstudents.builder.build;

    import java.text.MessageFormat;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    /**
    * FutureTest
    *
    * @author William Rose
    */
    public class FutureTest extends Thread {
    private static Log s_log = LogFactory.getLog(FutureTest.class);

    public static class Task implements Runnable {
    private static Log s_log = LogFactory.getLog(Task.class);

    private static AtomicInteger s_count = new AtomicInteger();

    private Integer _id;

    private Long _delay;

    public Task(long delay) {
    _id = s_count.incrementAndGet();
    _delay = delay;
    }

    /* (non-Javadoc)
    * @see java.lang.Runnable#run()
    */
    public void run() {
    s_log.info(MessageFormat.format("{0}:\tStarting task which will take
    {1} ms", new Object[] { _id, _delay }));

    process();

    s_log.info(MessageFormat.format("{0}:\t Completed task", new Object[]
    { _id }));
    }

    protected void process() {
    try {
    Thread.sleep(_delay);
    } catch (InterruptedException e) {
    s_log.error(MessageFormat.format("{0}:\t Interrupted task", new
    Object[] { _id }));
    }
    }
    }

    private ExecutorService _executor;

    private List<Future<?>> _running;

    private boolean _flushing;

    /**
    *
    */
    public FutureTest() {
    super("Control");

    _executor = Executors.newCachedThreadPool();
    _running = new LinkedList<Future<?>>();
    _flushing = false;
    }

    public synchronized void submit(Task task) {
    _running.add(_executor.submit(task));
    notifyAll();
    }

    protected synchronized void reset() {
    if (_flushing)
    return;

    _flushing = true;

    // Try to cancel as many as possible quickly
    for (Future<?> future : _running) {
    if (!future.isDone())
    future.cancel(false);
    }

    // Wait for all the futures to finish
    for (Future<?> future : _running) {
    try {
    future.get();
    } catch (CancellationException e) {
    } catch (InterruptedException e) {
    s_log.error(e);
    } catch (ExecutionException e) {
    s_log.error(e);
    }
    }

    _running.clear();
    _flushing = false;
    }

    /* (non-Javadoc)
    * @see java.lang.Thread#run()
    */
    @Override
    public synchronized void run() {
    while(!_executor.isShutdown()) {
    if(_running.size() < 1) {
    try {
    wait();
    } catch (InterruptedException e) {
    s_log.error("Control interrupted while waiting for new future");
    }
    } else {
    Future<?> future = _running.get(0);

    try {
    future.get();
    } catch (InterruptedException e) {
    s_log.error("Control interrupted while waiting future to arrive");
    } catch (ExecutionException e) {
    s_log.error("Control interrupted due future exception");
    }
    }
    }
    }


    public synchronized void quit() {
    _executor.shutdown();
    notifyAll();
    }

    /**
    * @param args
    */
    public static void main(String[] args) {
    final FutureTest controller = new FutureTest();

    controller.start();

    controller.submit(new FutureTest.Task(5000) {
    /* (non-Javadoc)
    * @see com.radiologyforstudents.builder.build.FutureTest.Task#process()
    */
    @Override
    protected void process() {
    super.process();

    controller.quit();
    }
    });
    }

    }
     
    William Rose, Sep 22, 2005
    #1
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.
Similar Threads
  1. Maciej
    Replies:
    1
    Views:
    395
    Chris Uppal
    Oct 26, 2006
  2. Replies:
    1
    Views:
    397
    Daniel Pitts
    Feb 25, 2008
  3. ScottZ
    Replies:
    2
    Views:
    412
    Jorgen Grahn
    Nov 2, 2008
  4. Simon Strandgaard

    [ann] aeditor-2.2 (future release)

    Simon Strandgaard, Jan 23, 2005, in forum: Ruby
    Replies:
    1
    Views:
    106
    Simon Strandgaard
    Jan 23, 2005
  5. Mike Schwab

    inline comments in future release?

    Mike Schwab, Jul 24, 2008, in forum: Ruby
    Replies:
    23
    Views:
    269
    Robert Klemme
    Jul 27, 2008
Loading...

Share This Page