Pipe IO Problem?

C

Chris S.

A wrote a small class to handle IO through pipes, but the connection
only seems to work in one direction. The following code defines
connection.py, engine.py, and controller.py. Once connected, the engine
is only able to send and the controller recieve. Also, this only works
when using popen2.popen3. Using os.popen3 doesn't work at all and seems
to behave completely differently.
What could be causing these problems? Any help is greatly appreciated.

##-------------------------------------------
# connection.py
import sys
import threading
import socket
import time

class ConnectionError(Exception):
'''Encapsulates connection errors.'''
pass

class Connection:
'''Base class for an interprocess connection.

This class is meant to be used in two primary ways:
First, as a caller, the initiator of the connection.
Second, as a callee, the recepiant of the connection.

However, once the connection has been made, general use should
behave symmetrically.'''

def __init__(self, in_handler=None):
self.connected = False
self._recv_handler = in_handler
self._recv_buffer = [] # stores data if no in_handler given
self._recv_lock = threading.Lock()
self._send_lock = threading.Lock()

def _send_raw(self, string):
'''Outgoing handler.'''
raise Exception, 'this method must be overridden'

def _recv_raw(self):
'''Incoming handler.'''
raise Exception, 'this method must be overridden'

def _launch_recv_handler(self):
'''Launches general reception handler.'''
assert not self.connected, 'connection already open'
print 'launching recv handler'
self.connected = True
t = threading.Thread(target=self._recv_raw)
t.setDaemon(True)
t.start()

def _launch_data_handler(self, data):
'''Launches user defined reception handler.'''

if self._recv_handler: # launch custom handler (ie in_handler)
if present
t = threading.Thread(target=self._recv_handler, args=(data,))
t.setDaemon(True)
t.start()
else: # otherwise append to buffer
self._recv_lock.acquire()
self._recv_buffer.append(data)
self._recv_lock.release()

def open(self):
raise Exception, 'this method must be overridden'

def close(self, message=None):
raise Exception, 'this method must be overridden'

def send(self, data):
assert self.connected, 'not connected'
self._send_raw(data)

def recv(self):
'''Blocks until it has data to return.
Only use this if you haven't specified a reception handler.'''
assert self.connected, 'not connected'

# wait for data
while not len(self._recv_buffer):
time.sleep(0.1)

# get data
self._recv_lock.acquire()
data = self._recv_buffer.pop(0)
self._recv_lock.release()

return data

def pending(self):
'''True if pending data in the buffer. False otherwise.'''
return bool(len(self._recv_buffer))

def name(self):
pass

class Pipe(Connection):
'''Base class for a pipe connection.'''

def __init__(self, *args, **kargs):
Connection.__init__(self, *args, **kargs)

def _send_raw(self, string):
assert self.connected, 'not connected'

try:
self._send_lock.acquire()
self._outp.write(string+'\n')
self._outp.flush()
self._send_lock.release()
except Exception, e:
self._send_lock.release()
self.close(e)
raise Exception, e

def _recv_raw(self):

while self.connected:
# get data
try:
data = self._inp.readline()
except Exception, e:
self.close(e)
break
if not len(data):
time.sleep(0.1)
continue

# launch handler
self._launch_data_handler(data)

def open(self, target=None):
assert not self.connected, 'connection already open'

if target:
#import os # different functionality?
#(self._inp, self._outp, self._errp) = os.popen3(target)
import popen2
(self._inp, self._outp, self._errp) = popen2.popen3(target)
else:
from sys import stdin, stdout
self._inp = stdin
self._outp = stdout

self._launch_recv_handler()

def close(self, message=None):
assert self.connected, 'connection already closed'

self.connected = False
self._inp.close()
self._inp = None
self._outp.close()
self._outp = None
self._errp.close()
self._errp = None

if message:
print message


##-------------------------------------------
# engine.py
import time

from connection import *

outfile = open('enginetest.data', 'w')

def print_received(data):
print >>outfile, data

conn = Pipe(in_handler=print_received)
conn.open()

for i in range(5):
time.sleep(1)
conn.send('engine '+str(i))

outfile.close()


##-------------------------------------------
# controller.py
import time

from connection import *

def print_received(data):
print 'controller received:',data

conn = Pipe(in_handler=print_received)
conn.open('pipetest_engine.py')

for i in range(5):
print 'controller',i
time.sleep(1)
conn.send(str(i))
 
D

Donn Cave

"Chris S. said:
A wrote a small class to handle IO through pipes, but the connection
only seems to work in one direction. The following code defines
connection.py, engine.py, and controller.py. Once connected, the engine
is only able to send and the controller recieve. Also, this only works
when using popen2.popen3. Using os.popen3 doesn't work at all and seems
to behave completely differently.

I'm too lazy to track down the problem in your code, but
os.popen3 vs. popen2.popen3 is an easy one. You can see
the code yourself, in the os.py module --

def popen3(cmd, mode="t", bufsize=-1):
import popen2
stdout, stdin, stderr = popen2.popen3(cmd, bufsize)
return stdin, stdout, stderr

Note different return value.

You can avoid some confounding factors, and possibly the
need for a separate thread, if you use the UNIX file descriptors
(e.g., outpfd = outp.fileno(), os.write(outpfd, string + '\n'),
etc. The thread could go away if select() turns out to be suitable
for your application. The buffering you get with file objects
is relatively useless here and likely not worth the trouble it causes.

Donn Cave, (e-mail address removed)
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,768
Messages
2,569,574
Members
45,050
Latest member
AngelS122

Latest Threads

Top