asyncore-based port splitter code questions

George Trojan

The following code is an attempt at a port splitter: I want to forward data
coming in on a TCP connection to several host/port addresses. It sort of
works, but I am not happy with it. asyncore-based code is supposed to be
simple, but I need while loops and a lot of try/except clauses. Also, I
had to add suspend_channel/activate_channel methods to the Writer class that
use variables with leading underscores; otherwise the handle_write() method
is called in a tight loop. I designed the code by looking at the Python 2.3
source for asyncore and originally wanted to use the add_channel() and
del_channel() methods. However, in Python 2.6 del_channel() closes the
socket in addition to deleting it from the map. I do not want to open
one connection per message: the traffic may be high and there are no
message delimiters. The purpose of this exercise is to split incoming
operational data so I can test a new version of software.
Comments, please: I have cognitive dissonance about this code, and my little
yellow rubber duck is of no help here.
The code is run as:

python2.6 afwdport.py 50002 localhost 50003 catbert 50001

where 50002 is the local port for incoming data, and (localhost, 50003)
and (catbert, 50001) are the destinations.

George

import asyncore, os, socket, sys, time

TMOUT = 10      # seconds; used as the loop timeout and the reconnect delay

#----------------------------------------------------------------------
def log_msg(msg):
    print >> sys.stderr, '%s: %s' % (time.ctime(), msg)

#----------------------------------------------------------------------
class Reader(asyncore.dispatcher):
    """Incoming connection: copies everything it reads to every Writer."""
    def __init__(self, sock, writers):
        asyncore.dispatcher.__init__(self, sock)
        self.writers = writers

    def handle_read(self):
        data = self.recv(1024)
        for writer in self.writers:
            writer.add_data(data)

    def handle_expt(self):
        self.handle_close()

    def handle_close(self):
        log_msg('closing reader connection')
        self.close()

    def writable(self):
        return False

#----------------------------------------------------------------------
class Writer(asyncore.dispatcher):
    """Outgoing connection to one destination, with its own send buffer."""
    def __init__(self, address):
        asyncore.dispatcher.__init__(self)
        self.address = address
        self.data = ''
        self.mksocket()

    def suspend_channel(self, map=None):
        # remove this channel from the asyncore map without closing the socket
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            del map[fd]

    def activate_channel(self):
        # put the channel back into the map so the loop watches it again
        if self._fileno not in self._map:
            self._map[self._fileno] = self

    def mksocket(self):
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        # non-blocking connect: it may still be in progress when this returns
        self.connect(self.address)
        log_msg('connected to %s' % str(self.address))

    def add_data(self, data):
        self.data += data
        self.activate_channel()

    def handle_write(self):
        while self.data:
            log_msg('sending data to %s' % str(self.address))
            sent = self.send(self.data)
            self.data = self.data[sent:]
        self.suspend_channel()

    def handle_expt(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        log_msg(asyncore._strerror(err))
        self.handle_close()

    def handle_close(self):
        log_msg('closing writer connection')
        self.close()
        # try to reconnect
        time.sleep(TMOUT)
        self.mksocket()

    def readable(self):
        return False

#----------------------------------------------------------------------
class Dispatcher(asyncore.dispatcher):
    """Listening socket: accepts the incoming connection, wraps it in a Reader."""
    def __init__(self, port, destinations):
        asyncore.dispatcher.__init__(self)
        self.address = socket.gethostbyname(socket.gethostname()), port
        self.writers = [Writer(_) for _ in destinations]
        self.reader = None
        self.handle_connect()

    def handle_connect(self):
        # called directly from __init__ to set up the listening socket
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(self.address)
        self.listen(1)
        log_msg('listening on %s' % str(self.address))

    def handle_accept(self):
        conn, addr = self.accept()
        log_msg('connection from %s' % str(addr))
        # current read connection not closed for some reason
        if self.reader:
            self.reader.close()
        self.reader = Reader(conn, self.writers)

    def cleanup(self):
        try:
            if self.reader:
                self.reader.close()
        except socket.error, e:
            log_msg('error closing reader connection %s' % e)
        # writer might have unwatched connections
        for w in self.writers:
            try:
                w.close()
            except socket.error, e:
                log_msg('error closing writer connection %s' % e)

#----------------------------------------------------------------------
def main(port, destinations):
    disp = None
    try:
        # asyncore.loop() exits when input connection closes
        while True:
            try:
                disp = Dispatcher(port, destinations)
                asyncore.loop(timeout=TMOUT, use_poll=True)
            except socket.error, (errno, e):
                if errno == 98:     # EADDRINUSE on Linux
                    # format the message first: log_msg() takes one argument
                    log_msg('sleeping %d s: %s' % (30, e))
                    time.sleep(30)
    except BaseException, e:
        log_msg('terminating - uncaught exception: %s' % e)
        raise SystemExit
    finally:
        if disp:
            disp.cleanup()

#----------------------------------------------------------------------
if __name__ == '__main__':
    nargs = len(sys.argv)
    try:
        # expect: local-port followed by one or more host/port pairs
        assert nargs > 3 and nargs % 2 == 0
        port = int(sys.argv[1])
        destinations = [(sys.argv[n], int(sys.argv[n+1]))
                        for n in range(2, nargs-1, 2)]
        main(port, destinations)
    except (AssertionError, ValueError), e:
        print 'Error: %s' % e
        print 'Usage: python %s local-port host port ...' % sys.argv[0]
        raise SystemExit(1)
 
Giampaolo Rodola'

> asyncore based code is supposed to be simple,
> but I need while loops and a lot of try/except clauses.

First of all: you DON'T have to use while loops or anything else that
blocks, where by "blocking" I mean anything like time.sleep().
asyncore, just like Twisted, is an asynchronous abstraction on top of
select().
Anything you call must return *immediately*; otherwise the whole thing
will hang, i.e. "stop serving any connected client or server".
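
To make the "return immediately" point concrete: a non-blocking socket's send() never waits. It queues as much as fits in the kernel buffer and, once that buffer is full, fails straight away. The minimal Python 3 sketch below uses plain sockets rather than asyncore, and `fill_until_blocked` is a hypothetical helper; it fills a `socket.socketpair()` whose other end is never read until that happens:

```python
import socket

def fill_until_blocked(sock, chunk=b"x" * 65536):
    """Keep calling send() on a non-blocking socket until the kernel's
    send buffer is full. Returns (bytes accepted, number of short sends)."""
    sock.setblocking(False)
    total = 0
    short_sends = 0
    while True:
        try:
            sent = sock.send(chunk)
        except BlockingIOError:
            # Nothing more can be queued right now. An event-loop handler
            # should return here and wait to be told the socket is writable
            # again, instead of retrying in a loop.
            return total, short_sends
        total += sent
        if sent < len(chunk):
            short_sends += 1   # send() accepted only part of the chunk

# The receiving end never reads, so the buffer eventually fills up.
sender, receiver = socket.socketpair()
accepted, short = fill_until_blocked(sender)
print("accepted %d bytes (%d short sends) before send() would block"
      % (accepted, short))
```

asyncore's dispatcher.send() reports the would-block case as 0 bytes sent, so a `while self.data:` loop inside handle_write() can spin without making progress once a destination stops reading; sending once and returning avoids that.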

> I designed the code by looking at Python 2.3
> source for asyncore and originally wanted to use add_channel() and
> del_channel() methods. However in Python 2.6 del_channel() closes the
> socket in addition to deleting it from the map.

Don't look at the 2.3 source. Use the asyncore of Python 2.6, which is much
improved, bug-fixed and also *different*, by which I mean that it might
actually behave differently.
If you are forced to use Python 2.3, my advice is to get a copy of the
Python 2.6 asyncore.py and asynchat.py and include them in your code.

Secondly, to temporarily "sleep" your connections, *don't* remove
anything from your map.
The correct way of doing things here is to override the readable() and
writable() methods and make them return False for as long as you want
your connection to stay idle.
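
A minimal Python 3 sketch of the readable()/writable() approach, using the standard `selectors` module as a stand-in for asyncore (which was removed from the standard library in Python 3.12). `Channel` and `poll_once` are hypothetical names. As asyncore's loop does, `poll_once` asks every channel on every pass what it wants to wait for, so a channel with nothing to send is simply not watched for write events and nothing has to be deleted from the map:

```python
import selectors
import socket

class Channel:
    """Hypothetical stand-in for an asyncore.dispatcher subclass."""
    def __init__(self, sock):
        sock.setblocking(False)
        self.sock = sock
        self.outbuf = b""
        self.write_calls = 0

    def readable(self):
        return False                 # this channel only sends

    def writable(self):
        return bool(self.outbuf)     # want write events only while data is queued

    def handle_write(self):
        self.write_calls += 1
        sent = self.sock.send(self.outbuf)   # sends what fits, returns at once
        self.outbuf = self.outbuf[sent:]

def poll_once(channels, timeout=0.0):
    """One pass of an asyncore-style loop: rebuild the interest set from
    readable()/writable(), wait up to `timeout` seconds, then dispatch."""
    sel = selectors.DefaultSelector()
    try:
        for ch in channels:
            events = 0
            if ch.readable():
                events |= selectors.EVENT_READ
            if ch.writable():
                events |= selectors.EVENT_WRITE
            if events:
                sel.register(ch.sock, events, ch)
        if not sel.get_map():
            return                   # no channel is interested in anything
        for key, mask in sel.select(timeout):
            if mask & selectors.EVENT_READ:
                key.data.handle_read()
            if mask & selectors.EVENT_WRITE:
                key.data.handle_write()
    finally:
        sel.close()

a, b = socket.socketpair()
ch = Channel(a)
poll_once([ch])                   # empty buffer: handle_write() is not called
ch.outbuf = b"hello"
poll_once([ch], timeout=1.0)      # writable() is now True, so the data goes out
print(ch.write_calls, b.recv(5))  # 1 b'hello'
```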

Now I'm going to comment on some parts of your code.

> class Reader(asyncore.dispatcher):
>     def __init__(self, sock, writers):
>         asyncore.dispatcher.__init__(self, sock)
>         self.writers = writers
>
>     def handle_read(self):
>         data = self.recv(1024)
>         for writer in self.writers:
>             writer.add_data(data)
> [...]
>     def handle_write(self):
>         while self.data:
>             log_msg('sending data to %s' % str(self.address))
>             sent = self.send(self.data)
>             self.data = self.data[sent:]
>         self.suspend_channel()


Judging by how you append the data you want to send to a buffer,
it looks like you might want to use asynchat.async_chat rather than
asyncore.dispatcher.
The async_chat.push() and async_chat.push_with_producer() methods already
take care of the buffering logic and make sure that all the data gets sent
to the other peer without any of it getting lost.
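
The buffering that async_chat.push() provides can be sketched in a few lines. This is a hypothetical Python 3 imitation, not asynchat itself: `push()` only appends to a buffer, and `handle_write()` sends at most one chunk and returns, leaving the rest for the next write event. `fan_out` mirrors what Reader.handle_read() does in the splitter. The chunk size is an arbitrary assumption (asynchat's `ac_out_buffer_size` attribute plays a similar role):

```python
import socket

class PushWriter:
    """Hypothetical imitation of async_chat's outgoing side: push() only
    queues data; handle_write() sends at most one chunk and returns."""
    out_chunk = 1024   # arbitrary chunk size chosen for this demo

    def __init__(self, sock):
        sock.setblocking(False)
        self.sock = sock
        self.buf = bytearray()

    def push(self, data):
        self.buf += data             # never touches the socket

    def writable(self):
        return len(self.buf) > 0     # the loop should watch us only when True

    def handle_write(self):
        try:
            sent = self.sock.send(self.buf[:self.out_chunk])
        except BlockingIOError:
            return                   # buffer full: retry on the next write event
        del self.buf[:sent]          # drop only what the kernel accepted

def fan_out(data, writers):
    """The splitter's core: copy every chunk received from the input
    connection into each destination's buffer."""
    for w in writers:
        w.push(data)

def recv_exactly(sock, n):
    """Blocking helper for the demo: read n bytes (or until EOF)."""
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            break
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)

# Demo: two "destinations", each a socketpair whose far end we read from.
pairs = [socket.socketpair() for _ in range(2)]
writers = [PushWriter(local) for local, _ in pairs]
fan_out(b"x" * 3000, writers)
# Stand-in for the event loop; a real one would wait in select()/poll()
# between calls instead of looping like this.
while any(w.writable() for w in writers):
    for w in writers:
        if w.writable():
            w.handle_write()
received = [recv_exactly(peer, 3000) for _, peer in pairs]
print([len(r) for r in received])   # [3000, 3000]
```

With this shape, a slow destination only grows its own buffer; it does not stall the reader or the other writers.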

Actually there's no reason to use the asyncore.dispatcher class directly
except for creating a socket which listens on an interface and then
passes each connection to another class, inheriting from
asynchat.async_chat, which will actually handle that session.
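
The division of labour described above (a plain dispatcher that only accepts, handing each connection to a session object) might look like this in a hypothetical Python 3 sketch, again with `selectors` standing in for asyncore's map and loop. `Listener`, `Session` and `run_once` are illustrative names; in asyncore terms, `Listener.handle_accept` plays the role of Dispatcher.handle_accept and `Session` the role of an async_chat subclass:

```python
import selectors
import socket

class Session:
    """Hypothetical per-connection handler (the async_chat role)."""
    def __init__(self, sel, conn, addr):
        conn.setblocking(False)
        self.sel, self.conn, self.addr = sel, conn, addr
        self.received = bytearray()
        sel.register(conn, selectors.EVENT_READ, self.handle_read)

    def handle_read(self):
        data = self.conn.recv(4096)
        if data:
            self.received += data
        else:                        # peer closed: leave the loop and close
            self.sel.unregister(self.conn)
            self.conn.close()

class Listener:
    """Hypothetical listening endpoint: its only job is accept and hand-off."""
    def __init__(self, sel, host="127.0.0.1", port=0):
        self.sel = sel
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((host, port))
        self.sock.listen(5)
        self.sock.setblocking(False)
        self.sessions = []
        sel.register(self.sock, selectors.EVENT_READ, self.handle_accept)

    def handle_accept(self):
        conn, addr = self.sock.accept()
        self.sessions.append(Session(self.sel, conn, addr))

def run_once(sel, timeout):
    """Call the callback stored at register() time for every ready socket."""
    for key, _ in sel.select(timeout):
        key.data()

sel = selectors.DefaultSelector()
listener = Listener(sel)
client = socket.create_connection(listener.sock.getsockname())
client.sendall(b"hi")
run_once(sel, 1.0)    # listening socket is ready: accept and hand off
run_once(sel, 1.0)    # the new session's socket is ready: read the data
print(len(listener.sessions), bytes(listener.sessions[0].received))
```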

So my recommendation is to use asynchat.async_chat whenever possible.

>     def suspend_channel(self, map=None):
>         fd = self._fileno
>         if map is None:
>             map = self._map
>         if fd in map:
>             del map[fd]

As I said, this is unnecessary.
Override the readable() and writable() methods instead.

>     def handle_close(self):
>         log_msg('closing writer connection')
>         self.close()
>         # try to reconnect
>         time.sleep(TMOUT)
>         self.mksocket()

You really don't want to use time.sleep() there.
It blocks everything.
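
One non-blocking alternative to sleeping in handle_close() is to remember when the reconnect is due and let the event loop check that deadline on each pass. A minimal Python 3 sketch with a hypothetical `ReconnectTimer` class; the clock is injectable so the demo runs without real waiting:

```python
import time

class ReconnectTimer:
    """Hypothetical helper: remember *when* to reconnect instead of sleeping."""
    def __init__(self, delay, clock=time.monotonic):
        self.delay = delay
        self.clock = clock       # injectable so the demo below is deterministic
        self.due = None

    def schedule(self):
        # call this from handle_close() in place of time.sleep(delay)
        self.due = self.clock() + self.delay

    def ready(self):
        """Return True once, the first time it is polled after the delay."""
        if self.due is not None and self.clock() >= self.due:
            self.due = None
            return True
        return False

# Demo with a fake clock instead of real waiting.
now = [0.0]
timer = ReconnectTimer(10, clock=lambda: now[0])
timer.schedule()
now[0] = 5.0
early = timer.ready()         # only 5 of the 10 seconds have passed
now[0] = 10.0
on_time = timer.ready()       # time to call mksocket() again
again = timer.ready()         # the timer fires only once
print(early, on_time, again)  # False True False
```

To drive something like this from the Python 2.6 program, the loop has to come back to your own code regularly: asyncore.loop() takes a count argument, so calling asyncore.loop(timeout=1, use_poll=True, count=1) inside the existing while loop and checking each Writer's timer between passes is one possible arrangement.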

> while True:
>     try:
>         disp = Dispatcher(port, destinations)
>         asyncore.loop(timeout=TMOUT, use_poll=True)
>     except socket.error, (errno, e):
>         if errno == 98:
>             log_msg('sleeping %d s: %s' % (30, e))
>             time.sleep(30)

Same as above.


As a final note, I would recommend taking a look at the pyftpdlib code,
which uses asyncore/asynchat as part of its core:
http://code.google.com/p/pyftpdlib
It may help you figure out how things should be done.



--- Giampaolo
http://code.google.com/p/pyftpdlib/
 
George Trojan

Thanks for your help. Some comments below.

George
> Secondly, to temporarily "sleep" your connections, *don't* remove
> anything from your map.
> The correct way of doing things here is to override the readable() and
> writable() methods and make them return False for as long as you want
> your connection to stay idle.

Good advice.

> Now I'm going to comment on some parts of your code.
>
> > class Reader(asyncore.dispatcher):
> >     def __init__(self, sock, writers):
> >         asyncore.dispatcher.__init__(self, sock)
> >         self.writers = writers
> >
> >     def handle_read(self):
> >         data = self.recv(1024)
> >         for writer in self.writers:
> >             writer.add_data(data)
> > [...]
> >     def handle_write(self):
> >         while self.data:
> >             log_msg('sending data to %s' % str(self.address))
> >             sent = self.send(self.data)
> >             self.data = self.data[sent:]
> >         self.suspend_channel()
>
> Judging by how you append the data you want to send to a buffer,
> it looks like you might want to use asynchat.async_chat rather than
> asyncore.dispatcher.
> The async_chat.push() and async_chat.push_with_producer() methods already
> take care of the buffering logic and make sure that all the data gets sent
> to the other peer without any of it getting lost.
>
> Actually there's no reason to use the asyncore.dispatcher class directly
> except for creating a socket which listens on an interface and then
> passes each connection to another class, inheriting from
> asynchat.async_chat, which will actually handle that session.

My understanding is that asynchat is meant for bidirectional connections;
I don't see how it applies to my case (forwarding data). However, I
rewrote the Writer class following some of the asynchat code.

> So my recommendation is to use asynchat.async_chat whenever possible.

> You really don't want to use time.sleep() there.
> It blocks everything.

> Same as above.

I wanted to reconnect after the OS cleans up half-closed sockets.
Otherwise the program exits immediately with the message:

terminating - uncaught exception: [Errno 98] Address already in use
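
Errno 98 is EADDRINUSE on Linux. In the posted code, set_reuse_addr() is called only for the outgoing Writer sockets, while the listening socket created in Dispatcher.handle_connect() is bound without it. Setting SO_REUSEADDR on the *listening* socket, before bind(), is the usual way to let a server rebind a port that still has connections in TIME_WAIT, and may make the 30-second sleep unnecessary. A minimal Python 3 sketch (`make_listener` is a hypothetical helper) that also checks the portable `errno.EADDRINUSE` constant instead of the literal 98:

```python
import errno
import socket

def make_listener(host, port, backlog=1):
    """Create a listening TCP socket with SO_REUSEADDR set before bind(),
    so a restart is not refused just because old connections on the same
    port are still in TIME_WAIT."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Same effect as asyncore's dispatcher.set_reuse_addr(); it has to be
    # set before bind() to make any difference.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        sock.bind((host, port))
    except OSError as e:
        sock.close()
        if e.errno == errno.EADDRINUSE:   # 98 on Linux, but not everywhere
            raise RuntimeError("port %d is still in use" % port) from e
        raise
    sock.listen(backlog)
    return sock

first = make_listener("127.0.0.1", 0)   # port 0: let the OS pick a free port
port = first.getsockname()[1]
refused = False
try:
    # On Linux, SO_REUSEADDR does not let two sockets *listen* on the same
    # address and port, so this second attempt is still refused.
    make_listener("127.0.0.1", port).close()
except RuntimeError as e:
    refused = True
    print(e)
```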

> As a final note, I would recommend taking a look at the pyftpdlib code,
> which uses asyncore/asynchat as part of its core:
> http://code.google.com/p/pyftpdlib
> It may help you figure out how things should be done.

Thanks for the good example to study.
 
