how to use priority queue with multiprocessing

Marco Hornung

Hey,

------------------------------------------------------------------------------------------
question
------------------------------------------------------------------------------------------
How can I use a priority queue to schedule jobs within the "multiprocessing pool" module?

------------------------------------------------------------------------------------------
my scenario
------------------------------------------------------------------------------------------
I want to run several jobs on a server. The jobs are being sent by users. However, all jobs have a different priority, and high-priority jobs should be processed before any low-priority job gets touched.
Currently I just append all incoming jobs to the multiprocessing worker pool as follows:
### initialize worker pool
pool = PriorityPool(processes=worker_count)
process_handles = []

### distribute function execution over several processes
for job_parameter in job_parameter_list:
    handle = pool.apply_async(process_function, [job_parameter,])
    process_handles.append(handle)

This will only put the jobs in some kind of a list - and execute the jobs in the order they come in. Is it possible to use a priority queue for the process-pool?

Kind Regards,
Marco
 
John Nagle

> How can I use a priority queue to schedule jobs within the
> "multiprocessing pool" module?
>
> I want to run several jobs on a server. The jobs are being sent by
> users. However, all jobs have a different priority, and high-priority
> jobs should be processed before any low-priority job gets touched.
> Currently I just append all incoming jobs to the multiprocessing
> worker pool as follows:
>
> ### initialize worker pool
> pool = PriorityPool(processes=worker_count)
> process_handles = []
>
> ### distribute function execution over several processes
> for job_parameter in job_parameter_list:
>     handle = pool.apply_async(process_function, [job_parameter,])
>     process_handles.append(handle)
>
> This will only put the jobs in some kind of a list - and execute the
> jobs in the order they come in. Is it possible to use a priority
> queue for the process-pool?

You'll probably have to track the available processes yourself,
starting a new job when there's a process available.

One way to do this is to have a management thread for each
process. Each management thread starts a subprocess, gets
a work item from the priority queue (blocking if necessary),
gives it to the subprocess, waits for the subprocess to
return a result, and goes back to get another work item.
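The approach described above could be sketched roughly as follows. This is only an illustration under several assumptions: `process_function` is a stand-in that squares a number, `run_jobs`, `manager` and `worker_main` are hypothetical names, lower numbers mean higher priority (that is how `queue.PriorityQueue` orders items), and job parameters are never `None` because `None` is used as the stop marker. As a small deviation from the description, the worker processes are started before the management threads rather than by them, so that no process is created while other threads are running.

```python
import itertools
import math
import multiprocessing as mp
import queue
import threading


def process_function(job_parameter):
    """Stand-in for the real job (assumption): square a number."""
    return job_parameter * job_parameter


def worker_main(conn):
    """Runs in the child process: receive a job, send back its result."""
    while True:
        job_parameter = conn.recv()
        if job_parameter is None:      # stop marker sent by the manager
            break
        conn.send(process_function(job_parameter))
    conn.close()


def manager(jobs, results, conn):
    """Runs in a thread of the parent and feeds exactly one worker."""
    while True:
        priority, _, job_parameter = jobs.get()   # blocks if queue is empty
        if job_parameter is None:                 # end-of-work sentinel
            conn.send(None)
            break
        conn.send(job_parameter)
        results.append((priority, job_parameter, conn.recv()))


def run_jobs(prioritized_jobs, worker_count=2, ctx=None):
    """prioritized_jobs: iterable of (priority, job_parameter) pairs."""
    ctx = ctx or mp.get_context()
    jobs = queue.PriorityQueue()
    sequence = itertools.count()       # tie-breaker, keeps entries comparable
    for priority, job_parameter in prioritized_jobs:
        jobs.put((priority, next(sequence), job_parameter))
    for _ in range(worker_count):      # one sentinel per manager, sorted last
        jobs.put((math.inf, next(sequence), None))

    results, pairs = [], []
    for _ in range(worker_count):
        parent_conn, child_conn = ctx.Pipe()
        proc = ctx.Process(target=worker_main, args=(child_conn,))
        proc.start()
        child_conn.close()             # the parent only needs its own end
        thread = threading.Thread(target=manager,
                                  args=(jobs, results, parent_conn))
        pairs.append((proc, thread))
    for _, thread in pairs:
        thread.start()
    for proc, thread in pairs:
        thread.join()
        proc.join()
    return results
```

With a single worker, `run_jobs([(5, 2), (1, 3), (3, 4)], worker_count=1)` handles the jobs strictly in priority order. On platforms that start processes with "spawn", the call should sit under `if __name__ == "__main__":`.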

This is straightforward, except for working out a way
to cleanly shut the thing down. One way to do that is
to have a "shutdown" flag visible to all the threads.
That's checked before getting a new task. If it's set,
the thread terminates its subprocess and returns.
Set the terminate flag in a signal handler for control-C.
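A minimal sketch of that shutdown flag, assuming a `threading.Event` as the flag. The names `manager_loop`, `handle_job`, `cleanup`, `request_shutdown` and `install_sigint_handler` are made up for the illustration; `cleanup` stands in for whatever terminates the thread's subprocess (for example a `Process.terminate` bound method). The short timeout on `get()` is an addition so that an idle thread still notices the flag.

```python
import queue
import signal
import threading

shutdown = threading.Event()   # the "shutdown" flag, visible to all threads


def request_shutdown(signum=None, frame=None):
    """Usable both as a signal handler and as a plain function call."""
    shutdown.set()


def install_sigint_handler():
    """Set the flag on control-C; must be called from the main thread."""
    signal.signal(signal.SIGINT, request_shutdown)


def manager_loop(jobs, handle_job, cleanup):
    """Take (priority, job) pairs from jobs until the flag is set."""
    while not shutdown.is_set():          # checked before getting a new task
        try:
            priority, job = jobs.get(timeout=0.1)
        except queue.Empty:
            continue                      # nothing to do, re-check the flag
        handle_job(job)                   # hand the job to the subprocess
        jobs.task_done()
    cleanup()                             # e.g. terminate the subprocess
```

A job that is already running is allowed to finish here; only the next `get()` is skipped once the flag is set.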

(I have something that manages multiple processes
using a priority queue, where the queue is implemented
using MySQL. This allows me to put a whole cluster to
work.)

John Nagle
 
Adam Tauno Williams

On 1/13/2011 9:07 AM, Marco Hornung wrote:
>> I want to run several jobs on a server. The jobs are being sent by
>> users. However, all jobs have a different priority, and high-priority
>> jobs should be processed before any low-priority job gets touched.
>> Currently I just append all incoming jobs to the multiprocessing
>> worker pool as follows:
>>
>> ### initialize worker pool
>> pool = PriorityPool(processes=worker_count)
>> process_handles = []
>>
>> ### distribute function execution over several processes
>> for job_parameter in job_parameter_list:
>>     handle = pool.apply_async(process_function, [job_parameter,])
>>     process_handles.append(handle)
>>
>> This will only put the jobs in some kind of a list - and execute the
>> jobs in the order they come in. Is it possible to use a priority
>> queue for the process-pool?
>
> You'll probably have to track the available processes yourself,
> starting a new job when there's a process available.

Which is exactly what we do in OpenGroupware Coils' OIE.

There is a process [job] list which is sorted by priority and the next
available process is started when a worker is available. We use
multiprocessing to create a *process*, rather than a thread, for each
job.
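As a rough illustration of a priority-sorted job list with one process per job (this is not the OIE code, which is not shown in this thread), a scheduler might look like the sketch below. `JobScheduler`, `submit`, `poll` and `demo_job` are hypothetical names, lower numbers are assumed to mean higher priority, and `poll()` is assumed to be called periodically by whatever owns the scheduler.

```python
import heapq
import itertools
import multiprocessing as mp
import time


def demo_job(seconds):
    """Hypothetical job body: just sleep for a while."""
    time.sleep(seconds)


class JobScheduler:
    """Keep pending jobs ordered by priority; run each in its own process."""

    def __init__(self, max_workers, ctx=None):
        self.ctx = ctx or mp.get_context()
        self.max_workers = max_workers
        self.pending = []                 # heap of (priority, seq, target, args)
        self.running = []                 # Process objects that were started
        self.sequence = itertools.count() # tie-breaker for equal priorities

    def submit(self, priority, target, args=()):
        heapq.heappush(self.pending,
                       (priority, next(self.sequence), target, args))

    def poll(self):
        """Drop finished workers, then fill free slots from the job list.

        Returns the priorities of the jobs started by this call.
        """
        self.running = [p for p in self.running if p.is_alive()]
        started = []
        while self.pending and len(self.running) < self.max_workers:
            priority, _, target, args = heapq.heappop(self.pending)
            proc = self.ctx.Process(target=target, args=args)
            proc.start()
            self.running.append(proc)
            started.append(priority)
        return started
```

A job that is already running is never preempted; priority only decides which pending job gets the next free slot.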
> One way to do this is to have a management thread for each
> process. Each management thread starts a subprocess, gets
> a work item from the priority queue (blocking if necessary),
> gives it to the subprocess, waits for the subprocess to
> return a result, and goes back to get another work item.

We have a manager process and an executor process. These communicate
via AMQ, but you could use any mechanism. The manager process controls
the process [job] list. When a process needs to be started a message is
sent to the executor which creates a worker process if an opening is
available. Otherwise it messages the manager process to place the
process in a queued state. When a worker process completes it messages
the executor which in turn messages the manager that a process slot may
be available; then the manager looks up the next available process and
messages the executor to start it - provided a worker slot is still
available the executor will start the worker.... [otherwise the process
will go back into a queued state].
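The message flow described above can be simulated in a few lines. This is a toy model under loud assumptions: plain in-memory `deque` inboxes stand in for the AMQ bus, no real worker processes are created (the executor only tracks job IDs), and the message names "start", "queued", "done" and "slot_free" as well as the `Manager`, `Executor` and `pump` names are invented for the sketch.

```python
import heapq
from collections import deque


class Executor:
    """Owns the worker slots; starts a job only if a slot is open."""

    def __init__(self, slots):
        self.slots = slots
        self.running = set()      # job IDs that would have a worker process
        self.inbox = deque()
        self.manager = None       # set by Manager.__init__

    def handle(self, message):
        kind, job_id = message
        if kind == "start":
            if len(self.running) < self.slots:
                self.running.add(job_id)   # real system: create the worker
            else:
                self.manager.inbox.append(("queued", job_id))
        elif kind == "done":               # a worker reported completion
            self.running.discard(job_id)
            self.manager.inbox.append(("slot_free", None))


class Manager:
    """Owns the job list, ordered by priority (lower number first)."""

    def __init__(self, executor):
        self.executor = executor
        executor.manager = self
        self.priorities = {}
        self.queued = []          # heap of (priority, job_id)
        self.inbox = deque()

    def submit(self, priority, job_id):
        self.priorities[job_id] = priority
        self.executor.inbox.append(("start", job_id))

    def handle(self, message):
        kind, job_id = message
        if kind == "queued":
            heapq.heappush(self.queued, (self.priorities[job_id], job_id))
        elif kind == "slot_free" and self.queued:
            _, next_id = heapq.heappop(self.queued)
            self.executor.inbox.append(("start", next_id))


def pump(manager, executor):
    """Deliver messages until both inboxes are empty."""
    while manager.inbox or executor.inbox:
        if executor.inbox:
            executor.handle(executor.inbox.popleft())
        if manager.inbox:
            manager.handle(manager.inbox.popleft())
```

With one slot and jobs submitted as low, high, mid, the first arrival takes the slot; once a "done" message for it is delivered, the manager picks the highest-priority queued job next.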
> This is straightforward, except for working out a way
> to cleanly shut the thing down. One way to do that is
> to have a "shutdown" flag visible to all the threads.

Using a message bus helps a lot, and with multiprocessing you just do a
join()/is_alive() to make sure a worker is still working.
 
