multiprocessing & more


Andrea Crotti

Hi everyone, I have a few questions about my implementation, which doesn't make me totally happy.

Suppose I have a very long process which, during its execution, logs
something, and the logs end up in n different files in the same
directory.

In the meantime I want to be able to do real-time analysis in
Python, so this is what I've done (simplified):


import subprocess
from multiprocessing import Value, Process

def main():
    # 'h' (short) used as a simple boolean flag shared between processes
    is_over = Value('h', 0)
    Process(target=run, args=(conf, is_over)).start()
    # should also pass the directory with the results
    Process(target=analyze,
            args=(is_over, network, events, res_dir)).start()

def run(conf, is_over):
    sim = subprocess.Popen(TEST_PAD, shell=True, stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    out, err = sim.communicate()
    ret = sim.wait()  # communicate() already waited; this just fetches the code
    # at this point the simulation is over, independently from the result
    print "simulation over, can also stop the others"
    is_over.value = 1

def analyze(is_over, network, events, res_dir):
    ...

First of all, does it make sense to use multiprocessing and a short
Value as a boolean to check whether the simulation is over or not?

The other problem is that I need to read many files, and the idea
was a sort of "tail -f", but on all of them at the same time. Since I
have to keep track of the reading position for each of them, I ended up
with something like this:

class LogFileReader(object):
    def __init__(self, log_file):
        self.log_file = log_file
        self.pos = 0

    def get_line(self):
        # reopen the file, pick up where we left off, remember the new offset
        src = open(self.log_file)
        try:
            src.seek(self.pos)
            lines = src.readlines()
            self.pos = src.tell()
        finally:
            src.close()
        return lines

which I'm also not really sure is the best way; then in analyze()
I keep a dictionary which tracks all the "readers":

log_readers = {}
for out in glob(res_dir + "/*.out"):
    node = FILE_OUT.match(out).group(1)
    nodeid = hw_to_nodeid(node)
    log_readers[nodeid] = LogFileReader(out)

Maybe having more separate processes would be cleaner, but since I
have to merge the data it might become a mess...
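
For what it's worth, here is a minimal sketch of how the polling loop in
analyze() could drain all the readers and merge their output; process_line()
is a made-up placeholder for whatever you actually do with each line:

from time import sleep

def analyze(is_over, log_readers):
    while True:
        # remember the flag *before* reading, so one final pass still
        # picks up lines written just before the simulation ended
        finished = bool(is_over.value)
        for nodeid, reader in log_readers.items():
            for line in reader.get_line():
                process_line(nodeid, line)   # made-up per-line handler
        if finished:
            break
        sleep(0.2)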


Finally, to know when to start analyzing the data I thought of this:

while len(listdir(res_dir)) < len(network):
    sleep(0.2)

which in theory should be correct: when there are as many files as there
are nodes in the network, everything should have been written. BUT about
one time in five I get an error telling me that one file doesn't exist.

That means that for listdir the file is already there, but trying to
access it gives an error. How is that possible?

Thanks a lot, and sorry for the long mail.
 

Adam Skutt

> First of all, does it make sense to use multiprocessing and a short
> Value as a boolean to check whether the simulation is over or not?

Maybe, but without knowing exactly what you're doing it's difficult to
say if any other approach would be superior. Plus, most of the other
approaches I can think of would require code modifications or platform-
specific assumptions.
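
For what it's worth, the standard library also has multiprocessing.Event,
which is meant for exactly this kind of one-shot "done" flag. A minimal
sketch (the run/analyze bodies are placeholders):

from multiprocessing import Event, Process

def run(is_over):
    # ... launch the simulation and wait for it to finish ...
    is_over.set()              # tell the analyzer the simulation is over

def analyze(is_over):
    while not is_over.is_set():
        # ... read and process whatever new log lines have appeared ...
        is_over.wait(0.2)      # sleep, but wake up early if the flag is set

if __name__ == '__main__':
    is_over = Event()
    Process(target=run, args=(is_over,)).start()
    Process(target=analyze, args=(is_over,)).start()
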
> Finally, to know when to start analyzing the data I thought of this:
>
>     while len(listdir(res_dir)) < len(network):
>         sleep(0.2)
>
> which in theory should be correct: when there are as many files as there
> are nodes in the network, everything should have been written. BUT about
> one time in five I get an error telling me that one file doesn't exist.
>
> That means that for listdir the file is already there, but trying to
> access it gives an error. How is that possible?

File I/O is inherently a concurrent, unsynchronized activity. A
directory listing can become stale at any time, even while the
directory listing is being built (e.g., imagine running ls or dir in a
directory where rm or del is currently executing). When a directory
is being modified while you're listing it, the contents of the listing
essentially become "undefined": you may get entries for files that no
longer exist, and you may not get entries for files that do exist. A
directory listing may also return duplicate entries; this is what I
expect is happening to you.

The right thing to do is actually check to see if all the files you
want exist, if you can. If not, you'll have to keep waiting until
you've opened all the files you expect to open.
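
For example, something along these lines waits until the expected number of
result files can actually be opened rather than merely listed. It's only a
sketch; the "*.out" pattern and the expected_count argument are assumptions
based on your snippets:

import os
from glob import glob
from time import sleep

def wait_for_files(res_dir, expected_count, poll=0.2):
    # Wait until `expected_count` files in res_dir can really be opened,
    # not just listed, to avoid acting on a stale directory listing.
    while True:
        openable = []
        for path in glob(os.path.join(res_dir, '*.out')):
            try:
                open(path).close()
                openable.append(path)
            except IOError:        # listed but not accessible (yet)
                pass
        if len(openable) >= expected_count:
            return openable
        sleep(poll)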

Adam
 

Andrea Crotti

> Maybe, but without knowing exactly what you're doing it's difficult to
> say if any other approach would be superior. Plus, most of the other
> approaches I can think of would require code modifications or platform-
> specific assumptions.

Well, the other possibility I had in mind was to spawn the very long
process asynchronously, but then I still have the problem of notifying
the rest of the program that the simulation is over.

Is there an easy way to run the program asynchronously and still be
notified when it's over?

Otherwise I'll leave it like this; it apparently works well...
The only thing is that debugging (with pdb) is not so trivial, but
for that I can use unit tests and check against older simulation results.
> File I/O is inherently a concurrent, unsynchronized activity. A
> directory listing can become stale at any time, even while the
> directory listing is being built (e.g., imagine running ls or dir in a
> directory where rm or del is currently executing). When a directory
> is being modified while you're listing it, the contents of the listing
> essentially become "undefined": you may get entries for files that no
> longer exist, and you may not get entries for files that do exist. A
> directory listing may also return duplicate entries; this is what I
> expect is happening to you.
>
> The right thing to do is actually check to see if all the files you
> want exist, if you can. If not, you'll have to keep waiting until
> you've opened all the files you expect to open.
>
> Adam

Yes, that would be a solution, but I can't know in the Python program
what file names there will be in the directory; I can only know
how many there will be.

So I think the easy and stupid solution is just to wait a couple of
seconds, and everyone is happy ;)
 

Adam Skutt

> Well, the other possibility I had in mind was to spawn the very long
> process asynchronously, but then I still have the problem of notifying
> the rest of the program that the simulation is over.
>
> Is there an easy way to run the program asynchronously and still be
> notified when it's over?

Several; again, what's best depends on what you're doing:
* If you can modify the application, you can have it modify that
shared Value before it exits.
* On UNIX, you can do a non-blocking waitpid() call to determine when
the process has exited. The catch is that you can only do this from an
actual parent process, so you'd have to restructure the way you spawn
processes. Plus, if you need the stdout/stderr from the process, you
then must also consume the I/O in a non-blocking fashion, or dedicate
a thread solely to consuming the I/O (sketched below). It's possible
to pull similar trickery on Windows, however I don't think the standard
Python library makes it convenient (the right Win32 call is
GetExitCodeProcess, given a legitimate Win32 process HANDLE).
* You can have it write something to stdout/stderr and key off of that.

Those (or slight variants on them) are the common ways.
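
For the record, in the standard library the non-blocking check is
Popen.poll() (implemented with a WNOHANG waitpid on UNIX). A rough sketch
that also drains stdout/stderr in helper threads so the child can never
block on a full pipe; the script name here is made up:

import subprocess
import threading
from time import sleep

def drain(pipe, lines):
    # Consume the child's output so it never blocks on a full pipe buffer.
    for line in iter(pipe.readline, ''):
        lines.append(line)
    pipe.close()

sim = subprocess.Popen('./run_simulation.sh', shell=True,
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out_lines, err_lines = [], []
threading.Thread(target=drain, args=(sim.stdout, out_lines)).start()
threading.Thread(target=drain, args=(sim.stderr, err_lines)).start()

while sim.poll() is None:          # None means the process is still running
    # ... do the incremental log analysis here ...
    sleep(0.2)

print "simulation over with exit code", sim.returncode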

Adam
 
