multiprocessing problems


DoxaLogos

Hi,

I decided to play around with the multiprocessing module, and I'm
having some strange side effects that I can't explain. It makes me
wonder if I'm just overlooking something obvious or not. Basically, I
have a script that parses through a lot of files, doing search and
replace on key strings inside each file. I decided to split the work up
into multiple processes, one per processor core (4 total). I've tried
various ways of doing this, from using a pool to calling out separate
processes, but the result has been the same: the computer crashes from
endless process spawning.

Here's the guts of my latest incarnation.

def ProcessBatch(files):
    p = []
    for file in files:
        p.append(Process(target=ProcessFile,args=file))

    for x in p:
        x.start()

    for x in p:
        x.join()

    p = []
    return

Now, the function calling ProcessBatch looks like this:
def ReplaceIt(files):
    """
    All this does is walk through all the files passed to it and verify
    that each file is a legitimate file to be processed (project file).

    @param files: files to be processed
    """
    processFiles = []
    for replacefile in files:
        if(CheckSkipFile(replacefile)):
            processFiles.append(replacefile)
            if(len(processFiles) == 4):
                ProcessBatch(processFiles)
                processFiles = []

    #check for left over files once main loop is done and process them
    if(len(processFiles) > 0):
        ProcessBatch(processFiles)

    return

Specs:
Windows 7 64-bit
Python v2.6.2
Intel i5


Thanks
 

Adam Tauno Williams

I decided to play around with the multiprocessing module, and I'm
having some strange side effects that I can't explain. It makes me
wonder if I'm just overlooking something obvious or not. Basically, I
have a script that parses through a lot of files, doing search and
replace on key strings inside each file. I decided to split the work up
into multiple processes, one per processor core (4 total). I've tried
various ways of doing this, from using a pool to calling out separate
processes, but the result has been the same: the computer crashes from
endless process spawning.

Are you hitting a ulimit error? The number of processes you can create
is probably limited.

TIP: close sys.stdin on your subprocesses.
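
(A minimal sketch of that tip, not from the original post, assuming a worker
function like ProcessFile:)

    import sys

    def ProcessFile(path):
        # close stdin first thing in the child so it can never sit
        # waiting for console input
        sys.stdin.close()
        # ... the actual search-and-replace work on path would go here ...
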
Here's the guts of my latest incarnation.
def ProcessBatch(files):
    p = []
    for file in files:
        p.append(Process(target=ProcessFile,args=file))
    for x in p:
        x.start()
    for x in p:
        x.join()
    p = []
    return
Now, the function calling ProcessBatch looks like this:
def ReplaceIt(files):
    processFiles = []
    for replacefile in files:
        if(CheckSkipFile(replacefile)):
            processFiles.append(replacefile)
            if(len(processFiles) == 4):
                ProcessBatch(processFiles)
                processFiles = []
    #check for left over files once main loop is done and process them
    if(len(processFiles) > 0):
        ProcessBatch(processFiles)

According to this you will create processes in sets of four, but an unknown
number of sets of four.
 

DoxaLogos

I decided to play around with the multiprocessing module, and I'm
having some strange side effects that I can't explain. It makes me
wonder if I'm just overlooking something obvious or not. Basically, I
have a script that parses through a lot of files, doing search and
replace on key strings inside each file. I decided to split the work up
into multiple processes, one per processor core (4 total). I've tried
various ways of doing this, from using a pool to calling out separate
processes, but the result has been the same: the computer crashes from
endless process spawning.

Are you hitting a ulimit error?  The number of processes you can create
is probably limited.

TIP: close sys.stdin on your subprocesses.


Here's the guts of my latest incarnation.
def ProcessBatch(files):
    p = []
    for file in files:
        p.append(Process(target=ProcessFile,args=file))
    for x in p:
        x.start()
    for x in p:
        x.join()
    p = []
    return
Now, the function calling ProcessBatch looks like this:
def ReplaceIt(files):
    processFiles = []
    for replacefile in files:
        if(CheckSkipFile(replacefile)):
            processFiles.append(replacefile)
            if(len(processFiles) == 4):
                ProcessBatch(processFiles)
                processFiles = []
    #check for left over files once main loop is done and process them
    if(len(processFiles) > 0):
        ProcessBatch(processFiles)

According to this you will create processes in sets of four, but an unknown
number of sets of four.

What would be the proper way to do only a set of 4, stop, then do
another set of 4? I'm trying to process only 4 files at a time before
doing another set of 4.
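
(For reference, one way to keep at most 4 files in flight without hand-rolling
batches is a multiprocessing.Pool. A minimal sketch, not from the original
script; the '*.prj' pattern and the ProcessFile stub are only placeholders:)

    from multiprocessing import Pool
    import glob

    def ProcessFile(path):
        pass   # the real search-and-replace work goes here

    if __name__ == '__main__':
        files = glob.glob('*.prj')     # placeholder for the filtered project files
        pool = Pool(processes=4)       # at most 4 worker processes, one per core
        pool.map(ProcessFile, files)   # each worker grabs the next file as it frees up
        pool.close()
        pool.join()
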
 

DoxaLogos

Are you hitting a ulimit error?  The number of processes you can create
is probably limited.
TIP: close sys.stdin on your subprocesses.
Here's the guts of my latest incarnation.
def ProcessBatch(files):
    p = []
    for file in files:
        p.append(Process(target=ProcessFile,args=file))
    for x in p:
        x.start()
    for x in p:
        x.join()
    p = []
    return
Now, the function calling ProcessBatch looks like this:
def ReplaceIt(files):
    processFiles = []
    for replacefile in files:
        if(CheckSkipFile(replacefile)):
            processFiles.append(replacefile)
            if(len(processFiles) == 4):
                ProcessBatch(processFiles)
                processFiles = []
    #check for left over files once main loop is done and process them
    if(len(processFiles) > 0):
        ProcessBatch(processFiles)
According to this you will create processes in sets of four, but an unknown
number of sets of four.

What would be the proper way to do only a set of 4, stop, then do
another set of 4? I'm trying to process only 4 files at a time before
doing another set of 4.

I found my problems. One thing I did was follow the test queue
example in the documentation, but the biggest problem turned out to be
a Pool instantiated globally in my script; that was causing most of the
endless process spawning, even with the "if __name__ == "__main__":"
block.
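
(For anyone hitting the same thing: on Windows every child process re-imports
the main module, so anything created at module level gets created again in
each child. A minimal sketch of the safe layout, with placeholder names; the
broken variant is shown only as a comment:)

    from multiprocessing import Pool

    # pool = Pool(processes=4)   # DON'T create the pool at module level on
    #                            # Windows: each child re-imports this module
    #                            # and would try to spawn its own children.

    def ProcessFile(path):
        pass   # real work goes here

    if __name__ == '__main__':
        pool = Pool(processes=4)                   # created once, in the parent only
        pool.map(ProcessFile, ['a.prj', 'b.prj'])  # example inputs
        pool.close()
        pool.join()
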
 

Nils Ruettershoff

Hi Doxa,

DoxaLogos wrote:
[...]
I found my problems. One thing I did was follow the test queue
example in the documentation, but the biggest problem turned out to be
a Pool instantiated globally in my script; that was causing most of the
endless process spawning, even with the "if __name__ == "__main__":"
block.

Problems that solve themselves are the best problems ;)

One tip: currently your algorithm has some overhead, because you start
4 additional Python interpreters, process the files, close all the newly
spawned interpreters, and then start 4 interpreters again to process the
next files.

For this kind of job I prefer to start the processes once and feed them
with data via a queue. This reduces some overhead and increases runtime
performance.


This could look like the following
(it uses some pseudo functions, so it is not directly executable -> untested):

import multiprocessing
import Queue

class Worker(multiprocessing.Process):
    def __init__(self, feeder_q, queue_filled):
        multiprocessing.Process.__init__(self)
        self.feeder_q = feeder_q
        self.queue_filled = queue_filled

    def run(self):
        serve = True
        # start infinite loop
        while serve:
            try:
                # scan queue for work; blocks the process for up to 5 seconds.
                # If no item is in the queue by then, Queue.Empty is raised.
                text = self.feeder_q.get(True, timeout=5)
                if text:
                    do_stuff(text)
                # very important! tell the queue that the fetched work has
                # been finished, otherwise feeder_q.join() would block forever
                self.feeder_q.task_done()
            except Queue.Empty:
                # as soon as the queue is empty and all work has been enqueued
                # the process can terminate itself
                if self.queue_filled.is_set() and self.feeder_q.empty():
                    serve = False
        return


if __name__ == '__main__':
    number_of_processes = 4
    queue_filled = multiprocessing.Event()
    feeder_q = multiprocessing.JoinableQueue()
    process_list = []
    # get file names which need to be processed
    all_files = get_all_files()
    # start processes
    for i in xrange(0, number_of_processes):
        process = Worker(feeder_q, queue_filled)
        process.start()
        process_list.append(process)
    # start feeding
    for file in all_files:
        feeder_q.put(file)
    # inform processes that all work has been ordered
    queue_filled.set()
    # wait until queue is empty
    feeder_q.join()
    # wait until all processes have finished their jobs
    for process in process_list:
        process.join()
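
One thing worth noting for the Windows 7 setup in this thread: Windows has no
fork, so multiprocessing starts each Worker by launching a fresh interpreter
that imports the main module. That means the Worker class (and whatever
do_stuff ends up being) must be defined at module level, and the
if __name__ == '__main__': guard is required, which is the same reason the
module-level Pool was spawning processes endlessly.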



Cheers,
Nils
 
