raise exceptions in generators/functions from other tasks/functions

Discussion in 'Python' started by Dominic, May 15, 2004.

  1. Dominic

    Dominic Guest

    Just in case someone is interested.
    I came up with this solution since my previous
    posting.
    Maybe you've got some other ideas or critcism.
    I'll listen ;-)

    Ciao,
    Dominic

    P.S. Python is really powerful!

    Use-Case:

    def driveTask(handler):
    print " enter drive"
    yield "current speed"

    while 1:
    try:
    handler.register(driveTask, (NoPower, WallHit))
    while 1:
    print " adjust engine"
    yield "current speed"
    print " steer"

    except WallHit:
    print "hit the wall :-("
    except NoPower:
    print "need some fresh batteries :)"


    ====================================================================

    Full-Source code:

    from types import GeneratorType

    class TaskHandler(object):

    def __init__(self):
    self.tasks = []

    def addTask(self, task):
    self.tasks.append(task)

    def removeTask(self, task):
    del self.tasks[task]

    def loop(self):
    for t in self.tasks:
    # generator task
    if type(t) == GeneratorType:
    if not t.next():
    # generator is done, remove it
    self.tasks.remove(t)
    else: # normal method/function
    t()


    from sys import settrace

    class SignalingTaskHandler(TaskHandler):

    def __init__(self):
    super(SignalingTaskHandler, self).__init__()

    self.function2signal = {}
    self.activeSignals = ()
    self.newSignals = []
    self.loopTask = self.__loop()

    def register(self, aFunction, signals):
    self.function2signal[id(aFunction.func_code)] = signals

    def signal(self, aSignal):
    if not aSignal in self.newSignals:
    self.newSignals.append(aSignal)

    def loop(self):
    self.loopTask.next()

    def __loop(self):
    while 1:
    settrace(self.__intercept)
    tmp = len(self.newSignals)

    try:
    super(SignalingTaskHandler, self).loop()
    finally:
    settrace(None)

    # if we've reached "steady state", we make a step
    # I think Esterel-language has similar signaling semantics
    if tmp == len(self.newSignals):
    self.__step()

    yield True


    def __step(self):
    # switch to new signal set, discard old ones
    self.activeSignals = tuple(self.newSignals)
    self.newSignals = []

    def __intercept(self, frame, event, arg):
    def _i(frame, event, arg):
    # we could return _i to trace inside function
    # and not only calls to it

    if self.activeSignals:
    signals = self.function2signal[key]
    # check if there is one signal to raise
    for s in self.activeSignals:
    if s in signals:
    raise s

    # every function gets it's own tracer, otherwise
    # exceptions cannot be catched :-( Why?
    key = id(frame.f_code)
    if key in self.function2signal:
    return _i

    return None


    # A Use Case :


    class WallHit(Exception):
    pass

    class NoPower(Exception):
    pass

    def driveTask(handler):
    print " enter drive"
    yield "current speed"

    while 1:
    try:
    handler.register(driveTask, (NoPower, WallHit))
    while 1:
    print " adjust engine"
    yield "current speed"
    print " steer"

    except WallHit:
    print "hit the wall :-("
    except NoPower:
    print "need some fresh batteries :)"

    counter = 0

    def miscTask():
    global counter

    if counter == 4:
    handler.signal(WallHit)
    if counter == 5:
    handler.signal(NoPower)

    counter += 1

    # "special" tasks with signaling
    handler = SignalingTaskHandler()
    tmp = driveTask(handler)
    handler.addTask(tmp)

    # main loop

    mainTaskHandler = TaskHandler()

    # well, there could be a lot more
    mainTaskHandler.addTask(miscTask)
    mainTaskHandler.addTask(handler.loop)


    for _ in range(10):
    mainTaskHandler.loop()


    # enter drive
    # adjust engine
    # steer
    # adjust engine
    # steer
    # adjust engine
    # steer
    # adjust engine
    # hit the wall :-(
    # adjust engine
    # need some fresh batteries :)
    # adjust engine
    # steer
    # adjust engine
    # steer
    # adjust engine
    # steer
    # adjust engine
    Dominic, May 15, 2004
    #1
    1. Advertising

  2. Dominic wrote:
    > ...
    > class TaskHandler(object):
    > def __init__(self):
    > self.tasks = []
    > def addTask(self, task):
    > self.tasks.append(task)
    > def removeTask(self, task):
    > del self.tasks[task]
    > ...


    I think you mean:
    def removeTask(self, task):
    self.tasks.remove(task)

    Or, perhaps:
    class TaskHandler(object):
    def __init__(self):
    self.tasks = {}
    def addTask(self, task):
    self.tasks[task] = task
    def removeTask(self, task):
    del self.tasks[task]

    --
    -Scott David Daniels
    Scott David Daniels, May 17, 2004
    #2
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.
Similar Threads
  1. Jacol

    raise or not to raise [Newbie]

    Jacol, Feb 3, 2007, in forum: Python
    Replies:
    5
    Views:
    405
    Gabriel Genellina
    Feb 5, 2007
  2. Alex G
    Replies:
    2
    Views:
    213
    Alex G
    Jun 25, 2008
  3. ernest
    Replies:
    2
    Views:
    288
    Roy Smith
    Nov 14, 2010
  4. Jack Bates
    Replies:
    0
    Views:
    276
    Jack Bates
    May 2, 2011
  5. bvdp

    Raise X or Raise X()?

    bvdp, Mar 11, 2012, in forum: Python
    Replies:
    10
    Views:
    365
    Stefan Behnel
    Mar 12, 2012
Loading...

Share This Page