better scheduler with correct sleep times

Q

qvx

I need a scheduler which can delay execution of a
function for certain period of time.
My attempt was something like this:

def delay(self, func, arg, delay_sec=0):
fire_at = wallclock() + delay_sec
self.queue.put((fire_at, func, arg))

def runner(self):
while self.alive:
fire_at, func, arg = self.queue.get(block=True)
try:
now = wallclock()
if now < fire_at:
time.sleep(fire_at - now)
func(arg)
except Exception, e:
log('DelayedTaskManager %s: %s\n' % (self.name, e))
finally:
self.queue.task_done()

But then I came up with the following case:

1. I call delay with delay_sec = 10
2. The scheduler goes to sleep for 10 seconds
3. In the meantime (lets say 1 second later) I delay
another func but this time with delay_sec=0.5
4. The scheduler is sleeping and won't know call my
second function for another 9 seconds insted of 0.5

I started googling for scheduler and found one in standard library
but ih has the same code as mine (it calls the functions in the
right order and my doesn't, but it still waits too long).
The other schedulers from web are dealing with
repeating tasks and such.

So, I wrote this:

# modification of http://code.activestate.com/recipes/87369/
class PriorityMinQueue(Queue):
def top(self):
try:
return self.queue[0]
except IndexError:
return None
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = []
def _put(self, item):
return heappush(self.queue, item)
def _get(self):
return heappop(self.queue)

class DelayedTaskManager:

def __init__(self, name):
self.name = name
self.queue = PriorityMinQueue()
# can't use queue.not_empty condition because it isn't
# signaled with notifyAll so I have to use my own
self.sleeper = threading.Condition()

def start(self):
log('start delayed task manager %s with %d elements\n' %
(self.name, self.queue.qsize()))
self.alive = True
self.thread = threading.Thread(target=self.runner)
self.thread.setDaemon(True)
self.thread.start()

def stop(self):
log('stop delayed task manager %s with %d elements\n' %
(self.name, self.queue.qsize()))
self.alive = False
self._wake()
self.thread.join()

def delay(self, delay_sec, func, *arg, **kw):
# even if delay is 0 or less, put to queue
# so the function gets executed concurrently
fire_at = wallclock() + delay_sec
self.queue.put((fire_at, func, arg, kw))
self._wake()

def _wake(self):
with self.sleeper:
self.sleeper.notify()

def _wait(self, timeout):
with self.sleeper:
self.sleeper.wait(timeout)

def runner(self):
while self.alive:
fire_at, func, arg, kw = self.queue.get(block=True)
try:
now = wallclock()
while now < fire_at:
self._wait(fire_at - now)
if not self.alive: # canceled
log('delayed task manager %s was stoped\n',
self.name)
return self.queue.put((fire_at, func, arg,
kw))
top = self.queue.top()
if top is not None and top[0] < fire_at:
# temporally closer item, put back the old one
self.queue.put((fire_at, func, arg, kw))
self.queue.task_done()
fire_at, func, arg, kw = self.queue.get()
now = wallclock()
func(*arg, **kw)
except Exception, e:
log('delayed task manager %s: %s\n', self.name, e)
finally:
self.queue.task_done()


Is there a better way or some library that does that?

My observations:

1. Threading module uses time.sleep instead of time.clock
which results in less precise results (on windows platform)

if sys.platform=="win32": #take care of differences in clock
accuracy
wallclock = time.clock
else:
wallclock = time.time

2. while analyzing threading module i noticed that wait() is
implemented via loop and tiny sleep periods. I was expecting
the usage of underlaying OS primitives and functions but
then I remembered about GIL and quasi-multithreaded nature
of Python. But still, isn't there a more precise method
that interpreter itself could implement?

Thanks,
Tvrtko

P.S. This was Python 2.5
 
C

Chris Rebert

I need a scheduler which can delay execution of a
function for certain period of time.
My attempt was something like this:
[code snipped]

But then I came up with the following case:

1. I call delay with delay_sec = 10
2. The scheduler goes to sleep for 10 seconds
3. In the meantime (lets say 1 second later) I delay
another func but this time with delay_sec=0.5
4. The scheduler is sleeping and won't know call my
second function for another 9 seconds insted of 0.5

I started googling for scheduler and found one in standard library
but ih has the same code as mine (it calls the functions in the
right order and my doesn't, but it still waits too long).
The other schedulers from web are dealing with
repeating tasks and such.

So, I wrote this:
[more code snipped]

Is there a better way or some library that does that?

I believe you're looking for the 'sched' module:
http://www.python.org/doc/2.5.2/lib/module-sched.html

Cheers,
Chris
 
S

sokol

I believe you're looking for the 'sched' module:http://www.python.org/doc/2.5.2/lib/module-sched.html

The sched module behaves just like mine version because
it uses almost the same code. My observations include the
sched module as well. Check it's source code. It is flawed:
it calls the sleep method and while it sleeps (presumably
for a long time) all newly scheduled events are on hold.
See my example in original post.

My code solves this problem (or so it looks to me right now).
 
J

James Mills

[ ... ]
Is there a better way or some library that does that?

How about this ?

$ ./timerexamples.py
Time: 1224375945.336958
Timer 2 fired at: 1224375945.840600
Timer 1 fired at: 1224375955.336889

<code>
#!/usr/bin/env python

import time

from circuits.core import Manager, Component, Event, listener
from circuits.timers import Timer

class TimerExamples(Component):

@listener("timer1")
def onTIMER1(self):
print "Timer 1 fired at: %f" % time.time()

@listener("timer2")
def onTIMER2(self):
print "Timer 2 fired at: %f" % time.time()

m = Manager()
m += TimerExamples()

timers = []
timers.append(Timer(10, Event(), "timer1"))
timers.append(Timer(0.5, Event(), "timer2"))

for timer in timers:
m += timer

print "Time: %f" % time.time()

while True:
try:
m.flush()
for timer in timers:
timer.poll()
except KeyboardInterrupt:
break
</code>

cheers
James
 
S

sokol

qvx said:
I need a scheduler which can delay execution of a
function for certain period of time.
My attempt was something like this:  ... <<<code>>>
Is there a better way or some library that does that?

The trick is to use Queue's timeout argument to interrupt your sleep
when new requests come in.

def time_server(commands):
     '''Process all scheduled operations that arrive on queue commands'''
     pending = []
     while True:
         now = time.time()
         while pending and pending[0][0] <= now:
             when, function, args, kwargs = heapq.heappop(pending)
             function(*args, **kwargs)
         try:
             command = commands.get(timeout=pending[0][0] - now
                                            if pending else None)
         except Queue.Empty:
             pass
         else:
             if command is None:
                 break
             heapq.heappush(pending, command)

queue = Queue.Queue()
thread.thread.start_new_thread(queue)
queue.put((time.time() + dt, callable, args, {}))

I see what you did there. You are keeping the queue empty
so you get notified for free, while I introduced a new
threading Condition to detect insertions.
All that is missing in your version is to put back all
pending tasks when somebody sends the stop (None) request.

Shouldn't sched module do something similar?

Tvrtko
 
L

Lie

The sched module behaves just like mine version because
it uses almost the same code. My observations include the
sched module as well. Check it's source code. It is flawed:
it calls the sleep method and while it sleeps (presumably
for a long time) all newly scheduled events are on hold.
See my example in original post.

My code solves this problem (or so it looks to me right now).

The trick to your problem is the heart of any event-driven
programming: Event Queue. There will always be a mainloop, which is a
method that would check the queue for new events and execute the
appropriate callbacks for each event. For a Timer object, it will be
an event that returns a callback that simply reinserts itself to the
the queue until "the time" has arrived. When the time has arrived, it
will execute the function you've given before.

Note that there is no need to use threading here.
 
L

Lie

The sched module behaves just like mine version because
it uses almost the same code. My observations include the
sched module as well. Check it's source code. It is flawed:
it calls the sleep method and while it sleeps (presumably
for a long time) all newly scheduled events are on hold.
See my example in original post.

My code solves this problem (or so it looks to me right now).


Alternatively, if you're only handling Timers, you could use an event
loop that would check a list of timers and check their expiry time,
and execute the appropriate callback when the timer expired.
 
S

sokol

Alternatively, if you're only handling Timers, you could use an event
loop that would check a list of timers and check their expiry time,
and execute the appropriate callback when the timer expired.

And that is what I'm doing. The trick is to do it correctly while:

1. Not using the processor time in useless loops and checks
-> use sleep()

2. Sleep just as much as it is needed and not too much
-> wake from sleep if there are new tasks since new tasks
could be scheduled for some earlier point in time.

My solution was: introduce "new_task" event and sleep on it with
desired timeout. If new tasks arrive I will be awaken before the
timeout and get a chance to reexamine if I need to sleep less
than before.

Scott's solution was: keep the queue empty so you get awaken
automatically when new tasks are queued. Much simpler.

Also, threading is quite necessary in my case: this "event loop"
is not the main loop of the program as is found in GUI applications,
but rather a plain old scheduler which runs some tasks concurrently
to the main thread of execution.

What was a surprise to me was that python sched.py makes the same
mistake as I did in my first version.

Tvrtko
 
G

greg

sokol said:
What was a surprise to me was that python sched.py makes the same
mistake as I did in my first version.

The sched module is *not* designed for multithreading. It
assumes that the thread waiting for events is the only one
putting events into the queue, so it's impossible for an
event to get scheduled while in the midst of a sleep. It
also doesn't take any measures to protect its data structures
from concurrent access.

The documentation could make this clearer, especially since
it confusingly talks about "allowing other threads to run".
 
S

sokol

The sched module is *not* designed for multithreading. It
assumes that the thread waiting for events is the only one
putting events into the queue, so it's impossible for an
event to get scheduled while in the midst of a sleep. It
also doesn't take any measures to protect its data structures
from concurrent access.

The documentation could make this clearer, especially since
it confusingly talks about "allowing other threads to run".

I find that hard to believe. Scheduler in single threaded
application is useless (well, you can correct me because
right now I can't come up with an example). Also, the
scheduler runs inside a loop. How do you suppose to
run other code while the loop is executing? Remember, all
you have is a single thread. The consequence of this is
that the only way to insert something new inside a queue
is by doing it from scheduled function. Furthermore,
if scheduler is single threaded, why does is
check if the top event has changed after sleep period?

What I can agree is that python sched as it is (not
designed for multithreading) is quite useless.
 
C

Carl Banks

I find that hard to believe. Scheduler in single threaded
application is useless (well, you can correct me because
right now I can't come up with an example). Also, the
scheduler runs inside a loop. How do you suppose to
run other code while the loop is executing? Remember, all
you have is a single thread. The consequence of this is
that the only way to insert something new inside a queue
is by doing it from scheduled function.

Yeah, that does happen.

I often used this sort of thing (pre-Python days, so a long time ago)
in single-threaded, real-time simulations.

Don't know mcuh about the sched module, but time scheduling in general
doesn't need multithreading, not one bit.


Carl Banks
 
G

greg

sokol said:
Also, the
scheduler runs inside a loop. How do you suppose to
run other code while the loop is executing?

The sleep function could be doing a select with a
timeout on some other source of events, such as a
socket or a gui input event stream. One possible
response to such an event is to schedule another
event. There's not so much need for that nowadays,
since most gui libraries provide a way of scheduling
timed events as part of their built-in event loop,
but you might want to use something like this in
a server that deals with network connections.

Another possible use is discrete-event simulation,
where the "sleep" function doesn't physically sleep
but just advances a simulated time, and all events
(other than the first one that starts everything off)
are scheduled by callbacks for other events.

So while its uses are rather specialized, I wouldn't
say it's useless. The main problem is that its nature
needs to be much more clearly spelled out in the
docs -- it's something of an attractive nuisance the
way it is.
 
G

Gabriel Genellina

I find that hard to believe. Scheduler in single threaded
application is useless (well, you can correct me because
right now I can't come up with an example).

Imagine your environment doesn't provide any kind of multithreading
support. Couldn't you write an interactive game, a FTP server, a
messaging system? Programmers have done that for years. The fact that
you *can* write such things using multiple threads doesn't mean that't
the only way to do that, nor the best one. It's like `fork`, the Unix
programmer's hammer: every problem becomes a nail.
Also, the
scheduler runs inside a loop. How do you suppose to
run other code while the loop is executing? Remember, all
you have is a single thread. The consequence of this is
that the only way to insert something new inside a queue
is by doing it from scheduled function. Furthermore,
if scheduler is single threaded, why does is
check if the top event has changed after sleep period?

What I can agree is that python sched as it is (not
designed for multithreading) is quite useless.

The sched module (and mutex too, BTW) exists right from the beginning
of Python, ages before multithreading support were added to the
language. The algorithm hasn't changed in years; I wouldn't say it's
useless, it's just not suitable for the kind of scheduling one usually
wants to do in a multithreaded environment.
The latest release (2.6) contains this warning::

In multi-threaded environments, the scheduler class has
limitations with respect to thread-safety, inability to insert a new
task before the one currently pending in a running scheduler, and
holding up the main thread until the event queue is empty. Instead, the
preferred approach is to use the threading.Timer class instead.
 
S

sokol

The sleep function could be doing a select with a
timeout on some other source of events, such as a
socket or a gui input event stream. One possible
response to such an event is to schedule another
event. There's not so much need for that nowadays,
since most gui libraries provide a way of scheduling
timed events as part of their built-in event loop,
but you might want to use something like this in
a server that deals with network connections.

Another possible use is discrete-event simulation,
where the "sleep" function doesn't physically sleep
but just advances a simulated time, and all events
(other than the first one that starts everything off)
are scheduled by callbacks for other events.

So while its uses are rather specialized, I wouldn't
say it's useless. The main problem is that its nature
needs to be much more clearly spelled out in the
docs -- it's something of an attractive nuisance the
way it is.

I see. The delayfunc is user defined function so it
doesn't have to sleep at all. If you are creative
enough, you can use this scheduler in many ways.
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,755
Messages
2,569,534
Members
45,007
Latest member
obedient dusk

Latest Threads

Top