paramiko and stackless

L

Linnorm

I've written a port forwarding wrapper with paramiko for an app we use
and it works fine except that it consumes way too much processor.
(i.e. 25% +) I'm trying to write a stackless-based server class to
replace the threading one but I can't get the tunnel the wrapper
creates to work for more than one instance of the app. I based my
code on the example HTTP server on the stackless site. Relevant code
follows. The glovia variable is the result of subprocess.Popen().

<pre>
class ForwardServer (SocketServer.TCPServer):
allow_reuse_address = True
def __init__(self, *args, **kwargs):
SocketServer.TCPServer.__init__(self, *args, **kwargs)
self.socket.setblocking(0)

def serve_forever(self, glovia):
while glovia.poll() is None:
verbose("Handle request")
self.handle_request()

def handle_request(self):
try:
request, client_address = self.get_request()
except socket.error:
return
verbose("Adding handler tasklet")
stackless.tasklet(self.handle_request_tasklet)(request,
client_address)

def handle_request_tasklet(self, request, client_address):
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except:
self.handle_error(request, client_address)
self.close_request(request)


class Handler (SocketServer.BaseRequestHandler):
def handle(self):
verbose("Entering Handler.handle")
peername = self.request.getpeername()
try:
chan = self.ssh_transport.open_channel('direct-tcpip',
(self.chain_host,
self.chain_port), peername)
except Exception, e:
verbose('Incoming request to %s:%d failed: %s' %
(self.chain_host, self.chain_port, repr(e)))
return
if chan is None:
verbose('Incoming request to %s:%d was rejected by the SSH
server.' % (self.chain_host, self.chain_port))
return

verbose('Connected! Tunnel open %r -> %r -> %r' % (peername,

chan.getpeername(), (self.chain_host, self.chain_port)))
while True:
r, w, x = select.select([self.request, chan], [], [])
if self.request in r:
data = self.request.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
self.request.send(data)
verbose("Current Task: %s" %
(str(stackless.getcurrent()),))
verbose("Scheduling Tasks: %s" %
(str(stackless.getruncount()),))
stackless.schedule()
chan.close()
self.request.close()
verbose("Exiting Handler.handle")
verbose('Tunnel closed from %r' % (peername,))


def forward_tunnel(glovia, local_port, remote_host, remote_port,
transport):
class SubHandler (Handler):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
ForwardServer(('', local_port), SubHandler).serve_forever(glovia)

stackless.tasklet(forward_tunnel)(glovia, LOCAL_PORT, HOST,
REMOTE_PORT, client.get_transport())
stackless.run()
</pre>
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,769
Messages
2,569,579
Members
45,053
Latest member
BrodieSola

Latest Threads

Top