Duplex communication with pipes: is it possible?

Dara Durum

Hi!

I want to create a process pool object
that can hold started processes and communicate with them.

I tried many IPC methods, but every one of them had bugs or other problems.
Sockets are unavailable (because Windows Firewall blocks them).

I think I will use pipes.

The object's pseudocode:
while not Quit:
    CheckProcessOutputs
    ProcessReceivedData
    SendDataToSubProcesses
    if NoMoreData: Quit=1

When I used pipes and the subprocess module on Windows, I got long freezes
and deadlocks.

Main proc:
subprocpipe.write('aaaa\n')
subprocpipe.readlines()

Sub proc:
input=sys.stdin.readlines().strip()
print input+'!!!'

It works. But when I move this client code into a loop, I get a deadlock.
while not Quit:
    input=sys.stdin.readlines().strip()
    print input+'!!!'
    Quit=input=='q'

Why? Why can't I set up repeated (cyclic) communication with the client?
The subprocess must stay alive (not die) and remain in a
reusable state.

Simply put: I need a subprocess pool!

Thanks for your help:
dd
 
Daniel Dittmar

readlines() will try to read until the stream/socket is closed. Try to
read only one line. This of course means that you cannot send \n as part
of the data; you have to escape it somehow.
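
A minimal sketch of the read-one-line loop described above, written for Python 3 rather than the Python 2 used in the posts. The name `serve_lines` and the in-memory streams are illustrative assumptions; in a real child process the arguments would be `sys.stdin` and `sys.stdout`. The explicit `flush()` is also an assumption that the thread does not mention: output to a pipe is usually buffered, so without it the parent may not see the reply.

```python
import io

def serve_lines(inp, out):
    """Echo each input line with '!!!' appended; stop after the line 'q'."""
    # readline() returns as soon as one full line is available, unlike
    # readlines(), which waits for end-of-file.
    for line in iter(inp.readline, ""):
        text = line.rstrip("\n")
        out.write(text + "!!!\n")
        out.flush()  # push the reply out of the buffer immediately
        if text == "q":
            break

# In-memory streams stand in for the pipes here.
demo_in = io.StringIO("aaaa\nq\nnever read\n")
demo_out = io.StringIO()
serve_lines(demo_in, demo_out)
```

The loop stops after the line `q`, so the third line stays unread in `demo_in`.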

I'm not sure how the pipe code searches for the \n. Trying to read
too much could lead to deadlocks as well. (Although I'm sure that the
code is written to return fewer bytes than requested if there isn't
enough data pending in the pipe.) A safer variant might be to transfer a
fixed number of bytes containing the length n of the following data, and
then n bytes containing the actual data.
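
The length-prefix scheme could be sketched as follows in Python 3. The 4-byte big-endian header and the names `send_msg`, `recv_msg` and `read_exact` are assumptions made for illustration, not something specified in the thread. The helper loops because a single `read()` on a pipe may return fewer bytes than requested.

```python
import io
import struct

# Fixed-size header holding the payload length (width is an assumption).
HEADER = struct.Struct("!I")

def read_exact(stream, n):
    """Read exactly n bytes, looping over short reads."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError("stream closed with %d bytes missing" % remaining)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

def send_msg(stream, payload):
    """Write the length header, then the payload, then flush."""
    stream.write(HEADER.pack(len(payload)))
    stream.write(payload)
    stream.flush()

def recv_msg(stream):
    """Read one header, then exactly as many payload bytes as it announces."""
    (length,) = HEADER.unpack(read_exact(stream, HEADER.size))
    return read_exact(stream, length)

# A BytesIO buffer stands in for the pipe.
buf = io.BytesIO()
send_msg(buf, b"line one\nline two")
send_msg(buf, b"")
buf.seek(0)
```

Because the receiver knows the length in advance, the payload may contain newlines or any other byte without escaping.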

Daniel
 
alisonken1

<snip>
readlines() will try to read until the stream/socket is closed. Try to
read only one line. This of course means that you cannot send \n as part
of the data; you have to escape it somehow.
<snip>

If I remember correctly, if you want to pass '\n' so that readline won't
stop, you should be able to escape the escape character ('\\n'), then
remove the extra '\' when actually processing the data.

Of course, readline will continue to read until the buffer is filled or a
real '\n' is passed (g).
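
One way to make the escaping idea concrete, as a Python 3 sketch. The function names `escape` and `unescape` are hypothetical. Backslashes are doubled first so that a literal backslash followed by `n` in the data is not confused with an escaped newline.

```python
def escape(text):
    """Make text safe to send as a single line."""
    # Double existing backslashes first, then encode real newlines as \n.
    return text.replace("\\", "\\\\").replace("\n", "\\n")

def unescape(line):
    """Reverse escape(), scanning once from left to right."""
    out = []
    chars = iter(line)
    for ch in chars:
        if ch == "\\":
            nxt = next(chars, "")
            # Backslash + n is a newline; backslash + backslash is a backslash.
            out.append("\n" if nxt == "n" else nxt)
        else:
            out.append(ch)
    return "".join(out)

original = "first\nsecond with a literal \\n inside"
wire = escape(original)
```

The escaped form contains no real newline, so the receiver can read it with a single `readline()` call and then call `unescape` on the result.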
 
Dara Durum

Hi!

Ahhh! It's working!

This is the simple client code:
if 'C' in sys.argv:
    #sys.stdout=open(r'c:\tpp2client.log','w')
    print "clt start"
    while 1:
        head=sys.stdin.read(4)
        print "clt head",[head]
        if head.lower()=='quit':
            break
        dsize=int(head)
        if dsize:
            data=sys.stdin.read(dsize)
            print "clt get data"#,[data]
    print "clt end\n"
And the master:
else:
    print "MS"
    p=subprocess.Popen([r'c:\python24\python.exe','tpp2.py','C'], \
                       stdin=subprocess.PIPE,stdout=subprocess.PIPE)
    print "MSS"
    (child_stdout, child_stdin) = (p.stdout, p.stdin)
    data=range(1000)
    import cPickle
    bdata=cPickle.dumps(data,1)
    print len(bdata)
    import binascii
    hdata=binascii.hexlify(bdata)
    print len(hdata)
    import base64
    hdata=base64.encodestring(bdata)
    print len(hdata)
    child_stdin.write('%04d'%len(hdata))
    child_stdin.write(hdata)
    child_stdin.write('quit')
    #child_stdin.write('quit')
    #child_stdin.write('quit\n')
    output=child_stdout.readlines()
    print [output]
    print "MEE"

Now I am experimenting with smaller packet sizes. Can pipes handle
binary data packets, or do I need to encode them with base64?

Thanks for your help:
dd
 
Daniel Dittmar

Dara said:
Now I am experimenting with smaller packet sizes. Can pipes handle
binary data packets, or do I need to encode them with base64?

In the client, you need to set the mode of sys.stdin to binary;
otherwise, you get the DOS translation of linefeeds. See
http://mail.python.org/pipermail/python-list/2000-January/020463.html
for some hints.

I assume that the streams returned by subprocess.Popen are also opened in
text mode and that you have to change them to binary; see the second hint
in the link mentioned above.
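
The linked hints are not reproduced here. As a sketch of one commonly used approach, the standard library call `msvcrt.setmode` can switch a file descriptor to binary mode on Windows. The wrapper name `set_binary_mode` is hypothetical, and the platform check is an assumption added so the same code also runs on other systems, where no text/binary translation takes place.

```python
import os
import sys

def set_binary_mode(stream):
    """Switch a stream's file descriptor to binary mode on Windows.

    Returns True if the mode was changed, False on other platforms.
    """
    if sys.platform != "win32":
        return False  # nothing to do: no linefeed translation here
    import msvcrt  # Windows-only standard library module
    msvcrt.setmode(stream.fileno(), os.O_BINARY)
    return True

changed = set_binary_mode(sys.stdout)
```

In Python 3 the byte-oriented streams `sys.stdin.buffer` and `sys.stdout.buffer` are an alternative that avoids text-mode translation altogether.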

Daniel
 
Dara Durum

Hi!

Here is a shortened, simplified example. It does not work, and I don't
understand why...

# Client Process

import os, sys
from subprocess import Popen, PIPE
from time import sleep, time
from cPickle import loads, dumps
from binascii import hexlify, unhexlify
from base64 import encodestring, decodestring

def WriteData(WStm,Data):
    data=dumps(Data,1)
    bdata=hexlify(data)
    msg='%s#'%bdata
    WStm.write(msg)

def ReadData(RStm):
    tmpl=[]
    while 1:
        c=RStm.read(1)
        if c=='#':
            break
        tmpl.append(c)
    bdata=''.join(tmpl)
    data=unhexlify(bdata)
    orgdata=loads(data)
    return orgdata

def SubProcessFunctions():
    f=open('spp_clt.log','w')
    print >>f,"m1"
    while 1:
        print >>f,"m2"
        data=ReadData(sys.stdin)
        print >>f,"m3",[data]
        if data=='quit':
            print >>f,"m4"
            WriteData(sys.stdout,'')
            break
        print >>f,"m5"
        WriteData(sys.stdout,'>>%s<<'%[data])
        print >>f,"m6"

if __name__=='__main__':
    SubProcessFunctions()

# The Master process
from spp_clt import ReadData, WriteData
from subprocess import Popen, PIPE
import time

def MasterProcess():
    print "m1"
    p=Popen([r'c:\python24\python.exe','spp_clt.py'], \
            stdin=PIPE,stdout=PIPE)
    print "m2"
    (child_stdout, child_stdin) = (p.stdout, p.stdin)
    print "m3"
    for s in range(2):
        print "m4"
        WriteData(child_stdin,s)
        print "m5"
        ReadData(child_stdout)
        print "m6"
    print "m7"
    WriteData(child_stdin,'quit')
    print "m8"
    ReadData(child_stdout)
    print "m9"

MasterProcess()

It freezes because of a deadlock. Both processes end up blocked in a read
and never return.
What am I doing wrong?
I also tried this way of handling packets:
def WriteData(WStm,Data):
    data=dumps(Data,1)
    bdata=encodestring(data)
    dlen=len(bdata)
    msg='%12d%s'%(dlen,bdata)
    if WStm==None:
        print msg
    else:
        WStm.write(msg)

def ReadData(RStm):
    dlen=int(RStm.read(12))
    bdata=RStm.read(dlen)
    data=decodestring(bdata)
    orgdata=loads(data)
    return orgdata

but it also froze.
Why doesn't the master receive the packets sent by the client?

Thanks for your help:
dd
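
The thread does not record a resolution to this question. One possible cause, offered here as an assumption rather than a confirmed diagnosis, is output buffering: when stdout is a pipe, written data can stay in the writer's buffer until it is flushed, so the reader waits forever. The Python 3 sketch below reuses the '#'-terminated hex format from the post and flushes after every write. The names `write_data`, `read_data`, `echo_round_trips` and the echoing child in `CHILD_SOURCE` are hypothetical stand-ins, not the original spp_clt.py.

```python
import pickle
import subprocess
import sys
from binascii import hexlify, unhexlify

# Hypothetical child: echoes every '#'-terminated frame back unchanged and
# exits when the parent closes the pipe.
CHILD_SOURCE = r'''
import sys
inp, out = sys.stdin.buffer, sys.stdout.buffer
frame = b""
while True:
    c = inp.read(1)
    if not c:
        break                # end-of-file: the parent closed our stdin
    frame += c
    if c == b"#":
        out.write(frame)
        out.flush()          # send the reply now instead of at exit
        frame = b""
'''

def write_data(stream, obj):
    """Send obj as hex-encoded pickle bytes followed by '#', then flush."""
    stream.write(hexlify(pickle.dumps(obj)) + b"#")
    stream.flush()

def read_data(stream):
    """Read bytes up to the next '#' and unpickle them."""
    chunks = []
    while True:
        c = stream.read(1)
        if not c:
            raise EOFError("pipe closed before the '#' terminator")
        if c == b"#":
            break
        chunks.append(c)
    return pickle.loads(unhexlify(b"".join(chunks)))

def echo_round_trips(values):
    """Send each value to the child and collect the echoed replies."""
    child = subprocess.Popen([sys.executable, "-c", CHILD_SOURCE],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    replies = []
    for value in values:
        write_data(child.stdin, value)
        replies.append(read_data(child.stdout))
    child.stdin.close()      # end-of-file tells the child to exit
    child.stdout.close()
    child.wait()
    return replies
```

Calling `echo_round_trips([0, 1, "quit"])` performs three write/read cycles against the same live child, which is the reusable-subprocess behaviour the post is after. Hex output never contains '#', so the terminator cannot collide with the payload.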




2006/6/21 said:
Hi!
Sorry, but I need a multi-OS (cross-platform) application...

This text-mode version works:

import subprocess
import os,sys

if 'C' in sys.argv:
    #sys.stdout=open(r'c:\tpp2client.log','w')
    print "clt start"
    while 1:
        head=sys.stdin.read(4)
        print "clt head",[head]
        if head.lower()=='quit':
            break
        dsize=int(head)
        if dsize:
            data=sys.stdin.read(dsize)
            print "clt get data",len(data)
    print "clt end\n"
else:
    print "MS"
    p=subprocess.Popen([r'c:\python24\python.exe','tpp2.py','C'], \
                       stdin=subprocess.PIPE,stdout=subprocess.PIPE)
    print "MSS"
    (child_stdout, child_stdin) = (p.stdout, p.stdin)
    data=range(1000)
    import cPickle
    bdata=cPickle.dumps(data,1)
    print len(bdata)
    import binascii
    hdata=binascii.hexlify(bdata)
    print len(hdata)
    import base64
    hdata=base64.encodestring(bdata)
    print len(hdata)
    child_stdin.write('%04d'%len(hdata))
    child_stdin.write(hdata)
    child_stdin.write('quit')
    output=child_stdout.readlines()
    print "Client's answer:\n",'<'*80
    for s in output:
        print s.strip()
    print '>'*80
    print "MEE"

I will look into your idea: BitTorrent...

Thanks:
dd
 
