Best way to report progress at fixed intervals


Slaunger

Hi comp.lang.python

I am a novice Python 2.5 programmer who writes command-line scripts
for processing large amounts of data.

I would like to be able to print out the progress made during the
processing at regular intervals, say every second, and I am wondering
what a proper, generic way to do this is.

I have created this test example to show the general problem. Running
the script gives me the output:

Work through all 20 steps reporting progress every 1.0 secs...
Work step 0
Work step 1
Work step 2
Work step 3
Work step 4
Processed 4 of 20
Work step 5
Work step 6
Work step 7
Work step 8
Processed 8 of 20
Work step 9
Work step 10
Work step 11
Work step 12
Work step 13
Processed 13 of 20
Work step 14
Work step 15
Work step 16
Work step 17
Processed 17 of 20
Work step 18
Work step 19
Finished working through 20 steps

The script that does this is as follows:

testregularprogress.py:

"""
Test module for testing generic ways of displaying progress
information at regular intervals.
"""
import random
import threading
import time

def work(i):
    """
    Dummy process function, which takes a random time in the interval
    0.0-0.5 secs to execute
    """
    print "Work step %d" % i
    time.sleep(0.5 * random.random())


def workAll(verbose=True, max_iter=20, progress_interval=1.0):

    class _Progress(object):

        def __init__(self):
            self.no = 0
            self.max = max_iter
            self.start_timer = verbose

        def __str__(self):
            self.start_timer = True # I do not like this approach
            return "Processed %d of %d" % (self.no, self.max)

    p = _Progress()

    def report_progress():
        print p

    if verbose:
        print "Work through all %d steps reporting progress every %3.1f secs..." % \
            (max_iter, progress_interval)

    for i in xrange(max_iter):
        if p.start_timer :
            p.start_timer = False # Let the progress instance set the flag
            timer = threading.Timer(progress_interval,
                                    report_progress)
            timer.start()
        work(i)
        p.no = i + 1

    # Kill the last timer, which is still active at this time
    timer.cancel()

    if verbose:
        print "Finished working through %d steps" % max_iter

if __name__ == "__main__":
    workAll()

Quite frankly, I do not like what I have made! It is a mess,
responsibilities are mixed, and it seems overly complicated. But I
can't figure out how to do this right.

I would therefore like some feedback on this proposed generic "report
progress at regular intervals" approach presented here. What could I
do better?

-- Slaunger
 

rdmurray

I felt like a little lunchtime challenge, so I wrote something that
I think matches your spec, based on your sample code. This is not
necessarily the best implementation, but I think it is simpler and
clearer than yours. The biggest change is that the work is being
done in the subthread, while the main thread does the monitoring.

It would be fairly simple to enhance this so that you could pass
arbitrary arguments in to the worker function, in addition to
or instead of the loop counter.

-----------------------------------------------------------------------
"""
Test module for testing generic ways of displaying progress
information at regular intervals.
"""
import random
import threading
import time

def work(i):
    """
    Dummy process function, which takes a random time in the interval
    0.0-0.5 secs to execute
    """
    print "Work step %d" % i
    time.sleep(0.5 * random.random())


class Monitor(object):
    """
    This class creates an object that will execute a worker function
    in a loop and at regular intervals emit a progress report on
    how many times the function has been called.
    """

    def dowork(self):
        """
        Call the worker function in a loop, keeping track of how
        many times it was called in self.no
        """
        for self.no in xrange(self.max_iter):
            self.func(self.no)

    def __call__(self, func, verbose=True, max_iter=20, progress_interval=1.0):
        """
        Repeatedly call 'func', passing it the loop count, for max_iter
        iterations, and every progress_interval seconds report how
        many times the function has been called.
        """
        # Not all of these need to be instance variables, but they might
        # as well be in case we want to reference them in an enhanced
        # dowork function.
        self.func = func
        self.verbose = verbose
        self.max_iter=max_iter
        self.progress_interval=progress_interval

        if self.verbose:
            print ("Work through all %d steps reporting progress every "
                "%3.1f secs...") % (self.max_iter, self.progress_interval)

        # Create a thread to run the loop, and start it going.
        worker = threading.Thread(target=self.dowork)
        worker.start()

        # Monitoring loop.
        loops = 0
        # We're going to loop ten times per second using an integer count,
        # so multiply the seconds parameter by 10 to give it the same
        # magnitude.
        intint = int(self.progress_interval*10)
        # isAlive will be false after dowork returns
        while worker.isAlive():
            loops += 1
            # Wait 0.1 seconds between checks so that we aren't chewing
            # CPU in a spin loop.
            time.sleep(0.1)
            # when the modulus (second element of divmod tuple) is zero,
            # then we have hit a new progress_interval, so emit the report.
            if not divmod(loops, intint)[1]:
                print "Processed %d of %d" % (self.no, self.max_iter)

        if verbose:
            print "Finished working through %d steps" % max_iter

if __name__ == "__main__":
    #Create the monitor.
    monitor = Monitor()
    #Run the work function under monitoring.
    monitor(work)
 

MRAB

I've come up with this:

"""
Test module for testing generic ways of displaying progress
information at regular intervals.
"""
import random
import threading
import time

def work(i):
    """
    Dummy process function, which takes a random time in the interval
    0.0-0.5 secs to execute
    """
    print "Work step %d" % i
    time.sleep(0.5 * random.random())

def workAll(verbose=True, max_iter=20, progress_interval=1.0):
    class _Progress(threading.Thread):
        def __init__(self, progress_interval=1.0):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.progress_interval = progress_interval
            self.progress = None
            self.active = True

        def stop(self):
            self.active = False

        def run(self):
            while self.active:
                if self.progress is not None:
                    print self.progress
                time.sleep(self.progress_interval)

    if verbose:
        print "Work through all %d steps reporting progress every %3.1f secs..." % \
            (max_iter, progress_interval)

    p = _Progress(progress_interval)
    p.start()

    for i in xrange(max_iter):
        p.progress = "Processed %d of %d" % (i + 1, max_iter)
        work(i)

    p.stop()

    if verbose:
        print "Finished working through %d steps" % max_iter

if __name__ == "__main__":
    workAll()
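
A possible refinement of the reporter-thread idea above, written as a minimal sketch in Python 3 syntax rather than the Python 2.5 used in the thread. It replaces the `active` flag plus `time.sleep` with a `threading.Event`, so that stopping the reporter does not have to wait out a full sleep. The names `ProgressReporter`, `work_all` and the `out` parameter are made up for this illustration and do not come from any post in the thread.

```python
import threading
import time


class ProgressReporter(threading.Thread):
    """Emit the latest progress message every `interval` seconds until stopped."""

    def __init__(self, interval=1.0, out=print):
        super().__init__(daemon=True)
        self.interval = interval
        self.progress = None      # latest message, updated by the working thread
        self._out = out           # hypothetical hook so the output can be captured
        self._stopped = threading.Event()

    def stop(self):
        # Setting the event wakes the thread at once instead of after a sleep.
        self._stopped.set()
        self.join()

    def run(self):
        # Event.wait() returns False on timeout and True once stop() was called.
        while not self._stopped.wait(self.interval):
            if self.progress is not None:
                self._out(self.progress)


def work_all(work, max_iter=20, interval=1.0, out=print):
    """Call work(i) for each step while a reporter thread prints progress."""
    reporter = ProgressReporter(interval, out)
    reporter.start()
    try:
        for i in range(max_iter):
            work(i)
            reporter.progress = "Processed %d of %d" % (i + 1, max_iter)
    finally:
        reporter.stop()           # also runs if work() raises


if __name__ == "__main__":
    work_all(lambda i: time.sleep(0.05), max_iter=10, interval=0.2)
```

Because the reporter runs in its own thread, it keeps printing the last known message even while a single work step takes longer than the interval.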
 

Slaunger

rdmurray said:
I felt like a little lunchtime challenge, so I wrote something that
I think matches your spec, based on your sample code.  This is not
necessarily the best implementation, but I think it is simpler and
clearer than yours.  The biggest change is that the work is being
done in the subthread, while the main thread does the monitoring.

Well, thank you for spending your lunch break on my little problem.

rdmurray said:
It would be fairly simple to enhance this so that you could pass
arbitrary arguments in to the worker function, in addition to
or instead of the loop counter.

Yes, I agree.

rdmurray said:
     def __call__(self, func, verbose=True, max_iter=20, progress_interval=1.0):

I had to look up the meaning of __call__ to grasp this, but I get
your methodology.

rdmurray said:
         # We're going to loop ten times per second using an integer count,
         # so multiply the seconds parameter by 10 to give it the same
         # magnitude.
         intint = int(self.progress_interval*10)

Is this not an unnecessary complication?

rdmurray said:
             # Wait 0.1 seconds between checks so that we aren't chewing
             # CPU in a spin loop.
             time.sleep(0.1)

Why not just call this with progress_interval directly?

rdmurray said:
             # when the modulus (second element of divmod tuple) is zero,
             # then we have hit a new progress_interval, so emit the report.
             if not divmod(loops, intint)[1]:
                 print "Processed %d of %d" % (self.no, self.max_iter)

And then avoid this if expression?

rdmurray said:
     #Run the work function under monitoring.
     monitor(work)

I was unfamiliar with this notation, but now I understand it simply
invokes __call__. Thank you for showing me that!

OK. I agree this is a more elegant implementation, although in my mind
I find it more natural if the reporting goes on in a subthread, but
that is a matter of taste, I guess. Anyway: Thank you again for
spending your lunch break on this!

-- Slaunger
 

rdmurray

Slaunger said:
Is this not an unnecessary complication?
Why not just call this with progress_interval directly?

Because then the program may take up to progress_interval seconds to
complete even after all the work is done. For a long-running program
and a short progress_interval that might not matter, so yes, that would
be a reasonable simplification depending on your requirements.

Slaunger said:
I was unfamiliar with this notation, but now I understand it simply
invokes __call__. Thank you for showing me that!

Yes, it is a very nice feature of Python :)

Slaunger said:
OK. I agree this is a more elegant implementation, although in my mind
I find it more natural if the reporting goes on in a subthread, but
that is a matter of taste, I guess.

You could pretty easily rewrite it to put the reporter in the subthread;
it was just more natural to _me_ to put the worker in the subthread,
so that's how I coded it. Note, however, that if you were to write a
GUI front end it might be important to put the worker in the background,
because on some OSes it is hard to update GUI windows from anything
other than the main thread. (I ran into this in a Windows GUI app I
wrote using wxPython.)

Slaunger said:
Anyway: Thank you again for spending your lunch break on this!

No problem, it was fun.

--RDM
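
One way to keep the worker in a subthread while avoiding both the 0.1-second polling loop and the delayed exit discussed above is to wait on the worker with a timeout. This is only a sketch, in Python 3 syntax (where the method is spelled `is_alive`), and the function name `monitor`, the `state` dictionary and the `out` parameter are assumptions made for the illustration, not part of the code posted earlier.

```python
import threading
import time


def monitor(func, max_iter=20, progress_interval=1.0, out=print):
    """Run func(i) for i in range(max_iter) in a worker thread and report
    progress from the calling thread every progress_interval seconds."""
    state = {"done": 0}           # number of completed steps

    def do_work():
        for i in range(max_iter):
            func(i)
            state["done"] = i + 1

    worker = threading.Thread(target=do_work)
    worker.start()
    while True:
        # join() returns as soon as the worker ends, or after the timeout,
        # whichever comes first, so no time is wasted once the work is done.
        worker.join(progress_interval)
        if not worker.is_alive():
            break
        out("Processed %d of %d" % (state["done"], max_iter))
    return state["done"]
```

Calling `monitor(work)` with a one-argument `work` function like the one in the thread would print a report roughly once per interval and return right after the last step finishes.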
 

Slaunger

rdmurray said:
Because then the program may take up to progress_interval seconds to
complete even after all the work is done.  For a long-running program
and a short progress_interval that might not matter, so yes, that would
be a reasonable simplification depending on your requirements.

Ah, OK. With the timer.cancel() statement in my original proposal I
avoided that.

rdmurray said:
You could pretty easily rewrite it to put the reporter in the subthread;
it was just more natural to _me_ to put the worker in the subthread,
so that's how I coded it.  Note, however, that if you were to write a
GUI front end it might be important to put the worker in the background,
because on some OSes it is hard to update GUI windows from anything
other than the main thread.  (I ran into this in a Windows GUI app I
wrote using wxPython.)

Ah, yes, you are right. For GUIs this is often quite important. I don't
do much GUI work, so this is not something I had strongly in mind.

Br,

-- Slaunger
 

Jon Morton

I think there is something I've run into, and it will only ever get
_loaded_ once. See below for sour
output of an easy example case of my problem. The wxPython lib is
big, so it may have bugs, but pySer
l and pure python (no C/C++ directly, event thought it uses os.open/
close & termios that themselves use
mistaking)) and I see nothing that could cause this. I used the
"ps" command to see the memory usage
recipe that I found that counts the number of objects. To test them
just run each example and run "ps
python" once in a few days. This is probably just bias from my last
programming environment, though.
 

Slaunger

It's a text progress bar

Sorry, apparently I did not realize that at first sight. Anyway, I'd
rather avoid using further external modules besides the standard
batteries, as I would have to update several workstations with
different OSes (some of which I do not have admin access to) to use
the new module.

-- Slaunger
 

eric

Do you mind if I give it a shot?

import random
import time

def work(i):
    """
    Dummy process function, which takes a random time in the interval
    0.0-0.5 secs to execute
    """
    print "Work step %d" % i
    time.sleep(0.5 * random.random())

def workAll(work, verbose=True, max_iter=20, progress_interval=1.0):
    '''
    pass the real job as a callable
    '''
    if verbose: # announce once, before the loop starts
        print "Work through all %d steps reporting progress every %3.1f secs..." % (max_iter, progress_interval)
    progress = time.time()
    for i in range(max_iter): # do the requested loop
        work(i)
        interval = time.time()-progress
        if interval>progress_interval:
            # i + 1 steps are complete once work(i) has returned
            print "Processed %d of %d at pace %s" % (i + 1, max_iter, interval)
            progress +=interval


if __name__=="__main__":
    workAll(work, False)


It works fine, and the "pace" is 'almost' the required one. You gain
a thread-free design and a cleaner algorithm.

But the loop is controlled by the caller (the workAll function). This
is also called an ass-backward algorithm, and you cannot expect
algorithms to be ass-backward (even if it's the best way to implement
them).

You can use the yield statement to easily turn your algorithm into a
nice, stoppable ass-backward one:

def work():
    """
    Dummy process generator; each step takes a random time in the
    interval 0.0-0.5 secs to execute
    """
    for i in range(50):
        print "Work step %d" % i
        time.sleep(0.5 * random.random())
        yield i # kind of "publish it" and let the caller do whatever it wants (good practice anyway)


def workAll(work, verbose=True, max_iter=20, progress_interval=1.0):
    '''
    pass the real job as a generator
    '''
    if verbose: # announce once, before the loop starts
        print "Work through all %d steps reporting progress every %3.1f secs..." % (max_iter, progress_interval)
    progress = time.time()
    i = 0
    for w in work: # do the requested loop
        i+=1
        interval = time.time()-progress
        if interval>progress_interval:
            print "Processed %d at pace %s" % (w, interval)
            progress +=interval
        if i>=max_iter:
            work.close() # stop the generator; the for loop then ends


if __name__=="__main__":
    workAll(work(), False) # note the calling difference


hope it helps.
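
The thread-free idea above can also be packaged as a reusable wrapper around any iterable. The following is a minimal sketch in Python 3 syntax; `with_progress` and its `out` and `clock` parameters are hypothetical names introduced only for this example (the `clock` hook exists so the timing can be tested without real delays).

```python
import time


def with_progress(iterable, interval=1.0, out=print, clock=time.monotonic):
    """Yield items from iterable unchanged, reporting the running count
    whenever at least `interval` seconds have passed since the last report."""
    last = clock()
    for count, item in enumerate(iterable, 1):
        yield item
        # Execution resumes here only after the consumer has finished with
        # the item, so `count` is the number of items fully processed.
        now = clock()
        if now - last >= interval:
            out("Processed %d items" % count)
            last = now
```

A loop such as `for i in with_progress(range(20)): work(i)` then gets progress output without any change to `work`. The trade-off for avoiding threads is that nothing can be reported while a single step is still running.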
 

Slaunger

Hi eric,

No, I certainly don't mind you giving a try ;-)

I actually started out doing something like your first version here,
but I am a little annoyed by the fact that the progress report
interval is not guaranteed. For instance, in my real applications I
have rarely occurring work steps which may take significantly longer
than the progress_interval, and I'd like the script to keep reporting
that it is still working, albeit on the same work step, to maintain
a sense of the script being alive.

I like your generator approach though.

Anyway, I have now given my own proposal another iteration based on
what I have seen here (and my personal preferences), and I have come
up with this:

============ src =======================
"""
Test module for testing generic ways of displaying progress
information at regular intervals.
"""
import random
import threading
import time

def work(i):
    """
    Dummy process function, which takes a random time in the interval
    0.0-0.5 secs to execute
    """
    print "Work step %d" % i
    time.sleep(0.5 * random.random())


def workAll(verbose=True, max_iter=20, progress_interval=1.0):

    class ProgressReporter(threading.Thread):

        def __init__(self):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.i = 0
            self.max = max_iter
            self.start_timer = verbose
            self.progress_interval = progress_interval

        def run(self):
            while self.start_timer:
                print "Processed %d of %d." % (self.i + 1, self.max)
                time.sleep(self.progress_interval)

    p = ProgressReporter()

    if verbose:
        print "Work through all %d steps reporting every %3.1f secs..." % \
            (max_iter, progress_interval)
        p.start()

    for i in xrange(max_iter):
        work(i)
        p.i = i

    if verbose:
        print "Finished working through %d steps" % max_iter

if __name__ == "__main__":
    workAll()

========= end src ================================

I like this much better than my own first attempt in my initial post
on this thread.

-- Slaunger
 

George Sakkis

Slaunger said:
Sorry, apparently I did not realize that at first sight. Anyway, I'd
rather avoid using further external modules besides the standard
batteries, as I would have to update several workstations with
different OSes (some of which I do not have admin access to) to use
the new module.

How is this different from writing your own module from scratch ? You
don't need admin access to use a 3rd party package.

George
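
To illustrate the point about not needing admin access: a pure-Python third-party package can be copied into a directory the user owns and made importable from there. This is a rough sketch in Python 3 syntax; the helper name `add_private_lib` and the directory `~/pylibs` are placeholders chosen for the example, and setting the `PYTHONPATH` environment variable is another way to get the same effect.

```python
import os
import sys


def add_private_lib(path="~/pylibs"):
    """Put a per-user directory at the front of the import search path."""
    full = os.path.abspath(os.path.expanduser(path))
    if full not in sys.path:
        # Modules copied into this directory become importable without
        # touching the system-wide Python installation.
        sys.path.insert(0, full)
    return full
```

Calling `add_private_lib()` at the top of a script, before importing the third-party module, is enough on each workstation once the package files have been copied over.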
 
