Asynchronous Messaging

wink

Hello,

I'm getting my feet wet in Python and thought I'd see how well
Python works for asynchronous messaging. I've been using asynchronous
messaging for 5 years and find it advantageous for many things. In the
scheme I use, communication is not only asynchronous but also non-
blocking and inherently serialized via a queue.

This provides two benefits: a sender is never directly affected by the
receiver, and since the receiver handles only one message at a time
it generally never has to use mutexes or semaphores. This allows the
programmer to use multiple threads without having to contend with the
problems mutexes bring, such as deadlocks (deadly embraces) and race
conditions.
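
To make that concrete, here is a minimal sketch of the pattern (this is
not the implementation linked below; the Receiver and handle names are
made up for illustration): the sender only enqueues, and a single
consumer thread drains the queue, so the handler sees one message at a
time and needs no locks for its own state.

import threading
import Queue

class Receiver(threading.Thread):
    """Owns a queue and a thread; handles one message at a time."""

    def __init__(self):
        threading.Thread.__init__(self)
        self._queue = Queue.Queue()
        self.start()

    def send(self, msg):
        # Non-blocking from the sender's point of view: just enqueue.
        self._queue.put(msg)

    def handle(self, msg):
        # Only ever called from the receiver's own thread, one message
        # at a time, so no mutex is needed for the receiver's state.
        print "got:", msg

    def run(self):
        while True:
            msg = self._queue.get()
            if msg is None:         # shutdown sentinel
                break
            self.handle(msg)

r = Receiver()
for i in range(3):
    r.send("message %d" % i)
r.send(None)                        # ask the receiver thread to stop
r.join()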

Anyway, below is a link to a first pass at an implementation
in Python; comments are welcome:

http://www.saville.com/python/mproc.tgz

Regards,

Wink Saville
 
Fredrik Lundh

wink said:
This provides two benefits: a sender is never directly affected by the
receiver, and since the receiver handles only one message at a time
it generally never has to use mutexes or semaphores. This allows the
programmer to use multiple threads without having to contend with the
problems mutexes bring, such as deadlocks (deadly embraces) and race
conditions.

import Queue # ?

</F>
 
wink

To make it easier to comment on the code, I'm including the
"mproc.py" file below. Fredrik was commenting about using
Queue, and in fact I do. Queue is quite nice and is also
thread safe, which is a requirement for this implementation.
But its performance is poor if the number of items on a
Queue becomes large, because it is implemented using a list.

One of the things I was thinking of was doing another
implementation of Queue based on a deque.
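
A rough sketch of what that might look like, assuming collections.deque
is available (the DequeQueue name is made up; Queue.Queue's
_init/_put/_get hooks are the intended way to swap in a different
container):

import Queue
from collections import deque

class DequeQueue(Queue.Queue):
    """A Queue.Queue backed by a deque instead of a list."""

    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    def _put(self, item):
        self.queue.append(item)      # enqueue at the right

    def _get(self):
        return self.queue.popleft()  # dequeue from the left in O(1)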


"""Message Processor Module.

This modules allows programmers to create compnents
which communicate asynchronously via messages. In
addition the receiving component is will only handle
one message at a time. This allows the programmer to
create multi-threaded program with fewer shared memory
thus fewer mutex's and semaphore's.

The basic communication is via an instance of the
Msg class which is sent to mproc's using the send
method. An Mproc is an active component which has a
thread and executes asynchronously from all other
Mproc's by using one MprocDriver for each Mproc. It
is also possible for several mproc's to share one
MprocDriver by using BaseMproc.

Each mproc must override the _handler method. When a
message arrives for a mproc is it placed in a Queue
and the driver calls the _handler method passing the
Msg as a parameter. The driver uses the Queue to
serialize the message processing, thus the _handler
method will be invoked with one message at a time.
Thus the _handler method does not generally need to
use mutex's or semaphore's. But because each message's
processing must be completed before the next message
will be started it is important that the message be
processed as quickly as possible.

Add more documentation."""

import copy
import threading
import Queue
import traceback

class Msg:
    """A message"""

    def __init__(self):
        """Initializer"""
        self.dstMpId = None
        self.dstCnId = None
        self.srcMpId = None
        self.srcCnId = None
        self.mid = None
        self.cmd = None
        self.tag = None
        self.status = None
        self.data = None

    def dup(self):
        """Duplicate the message"""
        msgCopy = copy.deepcopy(self)
        return msgCopy

    def send(self):
        """Send a message.

        Returns True if the message was started on its way"""

        try:
            MprocDriver._mpList[self.dstMpId]._msgQueue.put(self)
            return True
        except:
            return False

class BaseMproc:
    """Message Processor.

    A message processor requires a handler method and an
    mproc driver which passes messages to it. The mproc driver has
    one overriding requirement, it must only pass one message at
    a time to the handler method. This eliminates any need for
    the programmer to worry about multi-threading issues while
    processing messages.

    This does put a burden on the handler routine, it must process
    messages quickly and in a non-blocking fashion so that the
    mproc may remain lively.

    The name of a BaseMproc must be unique, an exception is thrown
    if the name is already used."""

    def __init__(self, name, mprocDriver):
        """Initializer"""
        self.name = name
        addMproc(self, mprocDriver)

    def close(self):
        """Close the mproc"""
        #print "BaseMproc.close: ", self.name
        try:
            rmvMproc(self)
        except:
            #print "BaseMproc.close: exception"
            traceback.print_exc()
            self._unreg()

    def _handler(self):
        """Override this routine."""
        raise Exception("BaseMproc._handler needs to be overridden")

    def _reg(self, mprocDriver, id):
        """Register the mproc driver for this mproc"""
        self._mprocDriver = mprocDriver
        self._msgQueue = mprocDriver._msgQueue
        self.id = id

    def _unreg(self):
        """Unregister the mproc driver for this mproc"""
        self._mprocDriver = None
        self._msgQueue = None
        self.id = None

class Mproc(BaseMproc):
    """Active Message Processor.

    An active message processor is a BaseMproc but it always creates a
    MprocDriver instance as its driver"""

    def __init__(self, name):
        """Initializer"""
        BaseMproc.__init__(self, name,
                           MprocDriver("ActiveMprocDriver_" + name))

    def close(self):
        """Close the active mproc"""
        try:
            this_mprocDriver = self._mprocDriver
            BaseMproc.close(self)
            this_mprocDriver.close()
        except:
            print "Mproc.close: exception"
            traceback.print_exc()
            self._unreg()

class MprocDriver(threading.Thread, BaseMproc):
    """Message processor driver."""
    _mpList = []
    _mpDict = {}

    def __init__(self, name):
        """Initializer"""

        self._thisMpdDict = {}
        self._running = True
        self._msgQueue = Queue.Queue()

        threading.Thread.__init__(self)
        BaseMproc.__init__(self, name, self)

        self.start()

    def _regMproc(self, mproc, id):
        self._thisMpdDict[mproc.name] = mproc
        mproc._reg(self, id)

    def _unregMproc(self, mproc):
        #print "%s._unregMproc(%s):" % (self.name, mproc.name)
        del self._thisMpdDict[mproc.name]
        mproc._unreg()

    def _handler(self, msg):
        if (msg.mid == -1) and (msg.cmd == 0):
            self._running = False
            msg.completeQ.put(0)

    def close(self):
        """Close the mproc driver"""

        # Remove all mprocs related to this MprocDriver?????
        mprocs = self._thisMpdDict.values()
        for mproc in mprocs:
            if (mproc.name != self.name):
                rmvMproc(mproc)

        completeQ = Queue.Queue()
        msg = Msg()
        msg.dstMpId = self.id
        msg.mid = -1
        msg.cmd = 0
        msg.completeQ = completeQ
        msg.send()
        completeQ.get()

        # Remove ourself
        BaseMproc.close(self)
        del self._thisMpdDict

    def run(self):
        while (self._running):
            try:
                msg = self._msgQueue.get()
                mproc = MprocDriver._mpList[msg.dstMpId]
                mproc._handler(msg)
            except:
                if msg == None:
                    print "run: no message"
                elif mproc == None:
                    print "run: no mproc"
                elif not callable(mproc._handler):
                    print "run: mproc._handler is not callable"
                else:
                    print "run: mproc._handler caused an exception"
                traceback.print_exc()

class PsMgr:
    """Publish Subscribe Manager.

    Allow mprocs to subscribe to any message having
    a specified mid/cmd.

    Maybe this should be in a separate module instead
    of using class variables?
    """

    midDict = {}

    @classmethod
    def publish(self, msg):
        """Send the message to all subscribers"""
        pass

    @classmethod
    def subscribe(self, mproc, mid, cmd):
        """Subscribe the mproc to messages with mid/cmd"""
        self.publish = PsMgr._publish
        self.subscribe = PsMgr._subscribe
        self.unsubscribe = PsMgr._unsubscribe
        self.subscribe(mproc, mid, cmd)

    @classmethod
    def unsubscribe(self, mproc, mid, cmd):
        """Unsubscribe the mproc"""
        pass

    @classmethod
    def _publish(self, msg):
        """The actual publish routine where there is one or more
        subscribers"""
        try:
            #print "_publish: msg.mid=%d msg.cmd=%d" % (msg.mid, msg.cmd)
            subscribers = self.midDict[msg.mid][msg.cmd]
        except KeyError:
            #print "_publish: error no subscribers for msg.mid=%d msg.cmd=%d" % (msg.mid, msg.cmd)
            pass
        else:
            for mproc in subscribers:
                msgNew = msg.dup()
                #print "_publish mid=%d cmd=%d to %s" % (msgNew.mid, msgNew.cmd, mproc.name) #, mproc.id)
                msgNew.dstMpId = mproc.id
                msgNew.send()

    @classmethod
    def _subscribe(self, mproc, mid, cmd):
        """The actual subscribe routine"""
        #print "_subscribe: add mproc %s for mid=%d cmd=%d" % (mproc.name, mid, cmd)
        cmdDict = self.midDict.get(mid, {})
        subscribers = cmdDict.get(cmd, [])
        subscribers.append(mproc)
        cmdDict[cmd] = subscribers
        self.midDict[mid] = cmdDict

    @classmethod
    def _unsubscribe(self, mproc, mid, cmd):
        """The actual unsubscribe routine when there is one or more
        subscribers"""
        #print "_unsubscribe: remove mproc %s for mid=%d cmd=%d" % (mproc.name, mid, cmd)
        cmdDict = self.midDict.get(mid, {})
        subscribers = cmdDict.get(cmd, [])

        delList = []
        count = 0
        for mp in subscribers:
            if mp == mproc:
                delList.append(count)
            count += 1
        l = len(delList)
        for x in xrange(l-1, -1, -1):
            del subscribers[delList[x]]

        if len(self.midDict) == 0:
            self.publish = PsMgr.publish
            self.subscribe = PsMgr.subscribe
            self.unsubscribe = PsMgr.unsubscribe

def lookupMproc(name, onNotFound=None):
    """Lookup a message processor"""
    try:
        #print "lookupMproc: %s dict=%s" % (name, MprocDriver._mpDict)
        mproc = MprocDriver._mpDict[name]
    except: # Add not found exception?
        #print "lookupMproc: NOT FOUND", name
        mproc = onNotFound
    return mproc

def addMproc(mproc, mprocDriver):
    """Add a new message processor to the database"""
    if (lookupMproc(mproc.name) != None):
        raise NameError("%s BaseMproc already exists" % mproc.name)

    #print "addMproc:", mproc.name
    MprocDriver._mpList.append(mproc)
    MprocDriver._mpDict[mproc.name] = mproc
    #print "addMproc: dict=", MprocDriver._mpDict

    id = MprocDriver._mpList.index(mproc)
    mprocDriver._regMproc(mproc, id)

def rmvMproc(mproc):
    """Remove message processor from the database"""
    #print "rmvMproc:", mproc.name
    MprocDriver._mpList[mproc.id] = None
    del MprocDriver._mpDict[mproc.name]
    #print "rmvMproc: dict=", MprocDriver._mpDict
    mproc._mprocDriver._unregMproc(mproc)
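
A minimal usage sketch based on the module above (the Echo class, the
cmd/data values, and the sleep are just for illustration; they aren't
part of mproc.py):

import time
import mproc

class Echo(mproc.Mproc):
    """An mproc that just prints whatever it receives."""

    def _handler(self, msg):
        # Called on the Mproc's driver thread, one message at a time.
        print "%s got cmd=%s data=%s" % (self.name, msg.cmd, msg.data)

echo = Echo("echo")

msg = mproc.Msg()
msg.dstMpId = mproc.lookupMproc("echo").id   # messages are routed by mproc id
msg.cmd = 1
msg.data = "hello"
msg.send()

time.sleep(0.1)   # crude: give the driver thread a chance to deliver
echo.close()      # shuts down the Mproc and its MprocDriver thread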
 
Fredrik Lundh

wink said:
But its performance is poor if the number of items on a
Queue becomes large, because it is implemented using a list.
One of the things I was thinking of was doing another
implementation of Queue based on a deque.

Updating from 2.3 to something newer will fix that, of course:

$ more Queue.py

....

from collections import deque

....

class Queue:

    ...

    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

</F>
 
wink

Fredrik,

You are most correct, but Queue is still slow compared to deque, though
not for the reason I guessed. Apparently it's because deque is
implemented in C while Queue is in Python. Using the program below
it looks like there is about a 35:1 speed difference.

100 d.append 0.000011s 0.1097us per
100 d.popL 0.000011s 0.1097us per
100 q.put 0.000429s 4.2892us per
100 q.get 0.000391s 3.9077us per

So someday it _might_ warrant adding Queue to collections.


#!/usr/bin/python

import timeit
import Queue
from collections import deque

setupD = """
from collections import deque
d=deque()
cnt = %d
for x in xrange(cnt):
    d.append(x)
"""

setupQ = """
import Queue
q=Queue.Queue()
cnt = %d
for x in xrange(cnt):
    q.put(x)
"""

def main():
    cnt = 100
    t = timeit.Timer(setup=setupD % cnt, stmt="d.append(0)")
    time = min(t.repeat(10000, cnt))
    print " %9d d.append %fs %7.4fus per" % \
        (cnt, time, (time/cnt) * 1000000.0)

    t = timeit.Timer(setup=setupD % cnt, stmt="d.popleft()")
    time = min(t.repeat(10000, cnt))
    print " %9d d.popL %fs %7.4fus per" % \
        (cnt, time, (time/cnt) * 1000000.0)

    t = timeit.Timer(setup=setupQ % cnt, stmt="q.put(0)")
    time = min(t.repeat(10000, cnt))
    print " %9d q.put %fs %7.4fus per" % \
        (cnt, time, (time/cnt) * 1000000.0)

    t = timeit.Timer(setup=setupQ % cnt, stmt="q.get()")
    time = min(t.repeat(10000, cnt))
    print " %9d q.get %fs %7.4fus per" % \
        (cnt, time, (time/cnt) * 1000000.0)

if __name__ == "__main__":
    main()
 
Gabriel Genellina

You are most correct, but Queue is still slow compared to deque, though
not for the reason I guessed. Apparently it's because deque is
implemented in C while Queue is in Python. Using the program below
it looks like there is about a 35:1 speed difference.

That's not the reason. A Queue is built around a container, and it happens
to be a deque in the default implementation. But the important thing is
that a Queue is a synchronized object - it performs the necessary
synchronization to ensure proper operation even with multiple threads
attempting to use it at the same time.
So your comparison is meaningless, apart from telling you that using
mutexes is not cheap.
 
wink

That's not the reason. A Queue is built around a container, and it happens
to be a deque in the default implementation. But the important thing is
that a Queue is a synchronized object - it performs the necessary
synchronization to ensure proper operation even with multiple threads
attempting to use it at the same time.
So your comparison is meaningless, apart from telling you that using
mutexes is not cheap.

Interesting, the documentation for deque says
"Deques support thread-safe, ..." which would seem to
imply a mutex of some sort is used. But looking at
collectionsmodule.c for 2.5.1 I don't see any mutex,
which would seem to imply the locking is someplace
else, maybe the GIL, or the documentation is
incorrect.

On the other hand, looking at the code in Queue.py there
is more going on: the not_full condition variable, plus
the call to _put (which is a call to deque.append), plus
a notify on the not_empty condition, so it's not
surprising it's slower.
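
Stripped down to the unbounded case, the structure is roughly this (a
paraphrase for illustration, not the actual Queue.py source; TinyQueue
is just a name for the sketch). Every put and get takes a lock and goes
through a condition variable, which is where the extra microseconds go:

from collections import deque
import threading

class TinyQueue:
    """Roughly the unbounded core of Queue.Queue, paraphrased."""

    def __init__(self):
        self.queue = deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)

    def put(self, item):
        self.not_empty.acquire()       # lock round-trip on every put
        try:
            self.queue.append(item)    # the actual work (Queue calls this _put)
            self.not_empty.notify()    # wake a waiting consumer
        finally:
            self.not_empty.release()

    def get(self):
        self.not_empty.acquire()
        try:
            while not self.queue:      # block until something arrives
                self.not_empty.wait()
            return self.queue.popleft()
        finally:
            self.not_empty.release()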

If and when I think Queue is a performance impediment
to an mproc I'll take another look at it; for now there
are lots of things to learn.

Thanks,

Wink
 
Gabriel Genellina

Interesting, the documentation for deque says
"Deques support thread-safe, ..." which would seem to
imply a mutex of some sort is used. But looking at
collectionsmodule.c for 2.5.1 I don't see any mutex,
which would seem to imply the locking is someplace
else, maybe the GIL, or the documentation is
incorrect.

Uh, I had never noticed that append/pop claimed to be thread safe. OK,
they don't appear to call any Python code, nor is Py_DECREF used, so
the GIL would make those methods thread safe, I think.
R. Hettinger, any comments?
 
