[Kamaelia] TCPClient: How to sense connection failure?

  • Thread starter Bjoern Schliessmann

Bjoern Schliessmann

Hello,

I'm currently trying to implement a simulation program with Kamaelia
and need a reliable TCP connection to a data server.

From Twisted, I know that a method is called if the connection fails
for whatever reason. I tried to get the same results with Kamaelia's
TCPClient component. If I start up the component and try to connect
to a closed TCP port, it fails and sends a message out of its "signal"
outbox; that's okay.

But if the connection attempt succeeds and, after some time, the
server drops the connection with full TCP handshake (FIN, FIN+ACK,
ACK), the component just hangs and does nothing. Is this by design,
or could there be an error in my setup?

Also, how long does a TCPClient component live -- or how can/should
I terminate it explicitly? I'm afraid that if I create
a "ReconnectingTCPClient" component, it could eat up memory over
long runtime with hanging TCPClients.

Regards,


Björn

Hendrik van Rooyen

Bjoern Schliessmann said:
> I'm currently trying to implement a simulation program with Kamaelia
> and need a reliable TCP connection to a data server.
>
> From Twisted, I know that a method is called if the connection fails
> for whatever reason. I tried to get the same results with Kamaelia's
> TCPClient component. If I start up the component and try to connect
> to a closed TCP port, it fails and sends a message out of its "signal"
> outbox; that's okay.
>
> But if the connection attempt succeeds and, after some time, the
> server drops the connection with full TCP handshake (FIN, FIN+ACK,
> ACK), the component just hangs and does nothing. Is this by design,
> or could there be an error in my setup?

Not sure about Kamaelia, but I have found that when a FIN comes along,
a socket.recv() gives back an empty string, just like EOF on a file.
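A minimal Python 3 sketch of that behaviour, using only the standard library. socket.socketpair() stands in for a real client/server pair so it runs without a network, and read_until_eof is a hypothetical helper name. shutdown(SHUT_WR) sends the FIN while leaving the socket open for reading.

```python
import socket

def read_until_eof(sock):
    """Collect everything the peer sends until it closes its side."""
    chunks = []
    while True:
        chunk = sock.recv(4096)
        if not chunk:          # b'' means the peer sent FIN: orderly end of file
            break
        chunks.append(chunk)
    return b"".join(chunks)

# Two connected sockets standing in for server and client
server, client = socket.socketpair()
server.sendall(b"last words")
server.shutdown(socket.SHUT_WR)   # like a server closing its side of the connection
print(read_until_eof(client))     # b'last words'
server.close()
client.close()
```

On a real TCP connection the same rule applies: recv() returning an empty bytes object means an orderly close, whereas a reset shows up as an exception instead.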

I always set my sockets up with timeouts rather than plain blocking calls,
but then I am basically a broken-down assembler programmer, so there are
probably better techniques around.
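For comparison, a Python 3 sketch of a per-socket timeout via settimeout() (the helper name recv_or_timeout is hypothetical; the client code below uses socket.setdefaulttimeout() instead). A timeout surfaces as the socket.timeout exception, which keeps it distinct from the empty result that signals end of file.

```python
import socket

def recv_or_timeout(sock, n, timeout):
    """Return received bytes, or None if nothing arrived within `timeout` seconds."""
    sock.settimeout(timeout)
    try:
        return sock.recv(n)       # b'' here would still mean end of file
    except socket.timeout:        # raised when the timeout expires with no data
        return None

a, b = socket.socketpair()
print(recv_or_timeout(b, 10, 0.2))   # None: nothing sent yet
a.sendall(b"ping")
print(recv_or_timeout(b, 10, 0.2))   # b'ping'
a.close()
b.close()
```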

Below is what I use: a sort of netstring, synced on a tilde, with a
human-readable length field and escaping of tildes and the escape character.

It seems to work reliably for me, and detects when the server goes down.
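As read from the client code below, each frame is '~', then a zero-padded 4-digit ASCII length, then the payload with '/' escaped as '/\xd0' and '~' as '/\x81'. A Python 3 bytes sketch of that format; encode_frame, decode_frame and the 9999-byte check are hypothetical additions, not part of Hendrik's code.

```python
ESC_SLASH = b"/\xd0"   # escaped form of '/'
ESC_TILDE = b"/\x81"   # escaped form of '~'

def encode_frame(payload):
    """'~' + 4-digit ASCII length + escaped payload."""
    body = payload.replace(b"/", ESC_SLASH).replace(b"~", ESC_TILDE)
    if len(body) > 9999:
        raise ValueError("escaped payload too long for a 4-digit length")
    return b"~" + b"%04d" % len(body) + body

def decode_frame(frame):
    """Inverse of encode_frame for one complete frame."""
    if frame[:1] != b"~":
        raise ValueError("frame must start with '~'")
    length = int(frame[1:5])              # int() accepts ASCII digits as bytes
    body = frame[5:5 + length]
    if len(body) != length:
        raise ValueError("truncated frame")
    # undo tildes first, then slashes, mirroring the order in the client code
    return body.replace(ESC_TILDE, b"~").replace(ESC_SLASH, b"/")

print(encode_frame(b"a~b/c"))   # b'~0007a/\x81b/\xd0c'
```

Because every '/' in an encoded body starts a two-byte escape, a literal '~' can never appear inside a frame, which is what lets the reader resynchronise on the tilde.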

The code for a typical client is below. If anybody is interested I will
post the server too, but it should be trivial to write, given the example
below.

I hope the tabs survive the journey.

- Hendrik

#start of code fragment

import socket
import sys
import time
import Queue                    # Python 2 name; it is "queue" in Python 3


def sockget_len(s, L, data):
    """
    This fills a buffer of given length from the socket s.

    s is the socket
    L is the length to receive
    data is the buffer
    Returns (error, data): 0 is all right, 1 is a socket error,
    2 is a time out, 3 is end of file.
    """

    while len(data) < L:
        try:
            chunk = s.recv(L - len(data))
        except socket.timeout:              # must come before socket.error (subclass)
            return 2, '2'*L                 # time out
        except socket.error, msg:           # broken pipes again
            print 'socket error while receiving', msg
            return 1, '1'*L                 # error = 1 is a snafu
        if not chunk:                       # recv() gives '' once the peer has sent FIN
            print 'end of file while receiving'
            return 3, '0'*L                 # This is end of file
        data = data + chunk                 # keep going until we have all L bytes
    return 0, data


def sockget(s):
    """
    Gets a transmission from host.
    """

    while True:
        error, tilde = sockget_len(s, 1, '')        # sync up on tilde
        if error:
            return error, ''
        if tilde == '~':
            break

    error, length = sockget_len(s, 4, '')           # get the length of the data
    if error:
        return error, ''                            # 1 real error, 2 time out, 3 end of file
    L = int(length)
    error, data = sockget_len(s, L, '')             # get the data of length L
    return error, data                              # same errors as above, 0 is all right


# client communications program

def comms_thread(qi, qo):
    """This listens for the latest values, and sends requests up."""

    while True:

        HOST = 'Linuxbox'                           # The remote host
        PORT = 50007                                # The same port as used by the server
        socket.setdefaulttimeout(10.00)

        while True:
            # a fresh socket per attempt: reusing one after a failed connect() is not portable
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                qi.put('Connecting,')
                s.connect((HOST, PORT))
                break
            except socket.error, msg:
                print 'error msg is:', msg
                s.close()
                time.sleep(10)
        print 'Connected - Time out is:', s.gettimeout()
        qi.put('Connected,')

        last_rx_time = time.time()
        last_tx_time = time.time()
        while True:

            while True:
                error, data = sockget(s)            # see if a message from host
                if error == 0 and data:
                    msg2 = data.replace('/\x81', '~')
                    msg1 = msg2.replace('/\xd0', '/')
                    qi.put(msg1)
                    print 'received', msg1
                    last_rx_time = time.time()
                    break
                elif error == 1:
                    print 'Error after sockget'
                    break
                if time.time() - last_rx_time > 180:
                    print 'no comms from host for 3 minutes'
                    error = 1
                    break                           # time out ok, unless they are too long
                if error == 2:
                    error = 0                       # time outs are all right here
                    break
                if error == 3:
                    error = 1
                    break                           # end of files are a snafu

            if error == 1:
                break

            try:
                i_string = qo.get(block=False)      # see if stuff to transmit
            except Queue.Empty:
                if time.time() - last_tx_time > 8.5:
                    i_string = 'Keepalive'          # if not for a while, tell server we are alive
                    print 'sending keepalive'
                else:
                    time.sleep(0.1)                 # else wait a while and carry on
                    continue

            msg1 = i_string.replace('/', '/\xd0')
            msg2 = msg1.replace('~', '/\x81')
            length = '%04d' % len(msg2)             # zero-padded 4 digit length
            try:
                s.sendall('~' + length + msg2)      # sendall: send() may write only part
                last_tx_time = time.time()
            except socket.error, msg:
                print 'Socket error on transmit', msg
                break
            time.sleep(0.1)

        s.close()                                   # Formally close the broken thing
        qi.put('Quit,')                             # Tell main thread its hopeless

        sys.exit()                                  # Clobber this thread (SystemExit ends only this thread)

Bjoern Schliessmann

Hendrik said:
> Not sure about Kamaelia, but I have found that when a FIN comes
> along, a socket.recv() gives back an empty string, just like EOF
> on a file.

I'm absolutely sure the Python socket interface can detect it; Twisted
handles it.

I even stepped through Kamaelia with pdb: when the connection closed,
control flow ended up at a different point, but the component yielded
control to the scheduler immediately and never continued.

> Below is what I use: a sort of netstring, synced on a tilde, with a
> human-readable length field and escaping of tildes and the escape
> character.

Thank you (though I hope I won't have to mess with socket
functions ;) ).

> I hope the tabs survive the journey.

Looks like they morphed to spaces.

Regards,


Björn

Michael Sparks

Bjoern said:
> Hello,
>
> I'm currently trying to implement a simulation program with Kamaelia
> and need a reliable TCP connection to a data server.

The behaviour you're seeing sounds odd (which is hopefully encouraging :),
but it's not clear from the description whether it's a bug in your code or
in Kamaelia. So one question I really have is: which version are you
using?

The current release (0.5.0), the version on /trunk, or the bleeding-edge
version on /branches/private_MPS_Scratch? (I'm using the latter to run my
greylisting server, as are a few others.)

All that said though, looking at the differences between versions, I'm not
convinced they're large enough to show the problem you're seeing.

I'm not about to rule out a bug I don't know about though :)

> From Twisted, I know that a method is called if the connection fails
> for whatever reason. I tried to get the same results with Kamaelia's
> TCPClient component. If I start up the component and try to connect
> to a closed TCP port, it fails and sends a message out of its "signal"
> outbox; that's okay.

If you'd prefer more information in that message, please let me know.
(All components send out a message when they shut down. Components that
send data out as one of their primary actions send out a producerFinished
message.)

> But if the connection attempt succeeds and, after some time, the
> server drops the connection with full TCP handshake (FIN, FIN+ACK,
> ACK), the component just hangs and does nothing. Is this by design,
> or could there be an error in my setup?

It sounds like an error in your setup... but I hate saying that. (It doesn't
tell me or you what it is, and doesn't help change things to discourage or
detect mistakes in usage.)

In my setups, the client disconnects cleanly when the server drops the
connection, with the client code doing this:

    self.send(producerFinished(self,self.howDied), "signal")

Meaning you get a message telling you how the component shut down as well
as the fact that it shut down. (If "howDied" is None, it's just a normal
shutdown, i.e. the result of being told to shut down.)

The function where this is managed is runClient in the class
Kamaelia.Internet.TCPClient.TCPClient
(fully qualified name)

The actual socket connections are handled by a class called
ConnectedSocketAdapter, which manages all the logic of checking for
errors, remote shutdown, etc. That works the same for both servers
and clients, so breakage in clients would show up as breakage in servers
as well, which would be particularly bad.

> Also, how long does a TCPClient component live -- or how can/should
> I terminate it explicitly? I'm afraid that if I create
> a "ReconnectingTCPClient" component, it could eat up memory over
> long runtime with hanging TCPClients.

That shouldn't be an issue (I hate the word "should"), but you can handle
reconnection using a Carousel component. (I ought to write that up as an
example of how to use the Carousel component.)

In the meantime - whilst I check to see if there's a bug I didn't know
about, the following 2 cookbook entries may be of use:
* http://kamaelia.sourceforge.net/Cookbook/TCPSystems
* http://kamaelia.sourceforge.net/Cookbook/Carousels - allows you to make
something that exits reusable. It's a little awkward to get your head
around, but is quite useful when you do. (I've heard of others using
Carousel & TCPClient to make a reconnecting TCPClient in the past)
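Carousel is the Kamaelia way to restart an exited component. Independent of Kamaelia, the retry policy itself can be sketched in plain Python 3; this is exponential backoff, a swapped-in technique that does not show how Carousel works, and connect_with_retry plus its parameters are hypothetical.

```python
import time

def connect_with_retry(connect, attempts=5, delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds; after each OSError wait `delay` seconds,
    doubling the wait up to max_delay. Re-raises the last error if every attempt fails."""
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except OSError:                    # covers refused, unreachable, timed out
            if attempt == attempts:
                raise                      # give up and let the caller decide
            sleep(delay)
            delay = min(delay * 2, max_delay)
```

For a real connection, connect could be `lambda: socket.create_connection(("127.0.0.1", 1850), timeout=10)`, which builds a fresh socket on every attempt; the injectable sleep is only there so the policy can be tested without waiting.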

All that said, I'm not ruling out a bug, and I'll look into it. (If you
have a simple example that fails, please forward it to me :)

*thinks*

The following code may also be useful when debugging:

import time
import Axon.Component
import Axon.ThreadedComponent
from Kamaelia.Chassis.Pipeline import Pipeline

class PeriodicWakeup(Axon.ThreadedComponent.threadedcomponent):
    interval = 300                      # seconds between ticks
    def main(self):
        while 1:
            time.sleep(self.interval)
            self.send("tick", "outbox")

class WakeableIntrospector(Axon.Component.component):
    def main(self):
        while 1:
            # list every microprocess the scheduler currently knows about
            Q = [ q.name for q in self.scheduler.listAllThreads() ]
            Q.sort()
            print "*debug* THREADS"+ str(Q)
            self.scheduler.debuggingon = False
            yield 1
            # sleep until the next tick arrives
            while not self.dataReady("inbox"):
                self.pause()
                yield 1
            while self.dataReady("inbox"):
                self.recv("inbox")

Pipeline(
    PeriodicWakeup(),
    WakeableIntrospector(),
).activate()

If you put this code somewhere before your "run" call, you'll get periodic
output to tell you what's running. When debugging manually I'd drop the
interval to 3-10 seconds or so. I use 300 for a server.
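For plain threaded code such as Hendrik's comms_thread there is no Axon scheduler to ask. A rough Python 3 analogue, assuming only the standard threading module, dumps the live thread names from a daemon thread; thread_names and start_periodic_dump are hypothetical names.

```python
import threading
import time

def thread_names():
    """Sorted names of the threads currently alive in this process."""
    return sorted(t.name for t in threading.enumerate())

def start_periodic_dump(interval=300.0):
    """Print thread_names() every `interval` seconds from a daemon thread."""
    def loop():
        while True:
            time.sleep(interval)
            print("*debug* THREADS", thread_names())
    dumper = threading.Thread(target=loop, name="PeriodicDump", daemon=True)
    dumper.start()       # daemon=True: it will not keep the process alive on exit
    return dumper
```

Calling start_periodic_dump(5) early in a program prints the live threads every five seconds, which is enough to spot a thread that should have exited but has not.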

Now, off to see if I can reproduce your problem... :)

Regards,


Michael.

Bjoern Schliessmann

Michael said:
> The behaviour you're seeing sounds odd (which is hopefully
> encouraging :), but it's not clear from the description whether
> it's a bug in your code or in Kamaelia. So one question I really
> have is: which version are you using?

Oh sorry, it's the versions from MegaPack 1.4.0.

> In the meantime - whilst I check to see if there's a bug I didn't
> know about, the following 2 cookbook entries may be of use:
> * http://kamaelia.sourceforge.net/Cookbook/TCPSystems
> * http://kamaelia.sourceforge.net/Cookbook/Carousels - allows you to
>   make something that exits reusable. It's a little awkward to get
>   your head around, but is quite useful when you do. (I've heard of
>   others using Carousel & TCPClient to make a reconnecting TCPClient
>   in the past)

Thanks for all the information.

> All that said, I'm not ruling out a bug, and I'll look into it.
> (If you have a simple example that fails, please forward it to
> me :)

Sure, here is my code (but see below ;) ):

----snip----------------------------------------------------------
from Kamaelia.Internet.TCPClient import TCPClient
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Console import ConsoleEchoer, ConsoleReader
from Axon.Component import component
from Axon.Ipc import shutdownMicroprocess, producerFinished

class Listener(component):

    Inboxes = {"inbox": "Inbox",
               "control": "control signals received here",
              }

    Outboxes = {"outbox": "(not used)",
                "signal": "(not used)"
               }

    def main(self):
        while True:
            if self.dataReady("inbox"):
                print "data from Inbox:", repr(self.recv("inbox"))
            if self.dataReady("control"):
                control_data = self.recv("control")
                print repr(control_data)
                if isinstance(control_data, shutdownMicroprocess):
                    print "Connection could not be established"
                    break
                elif isinstance(control_data, producerFinished):
                    print "Connection closed"
                    break
            yield 1

k = ConsoleReader(">>> ")
tcp_client = TCPClient("127.0.0.1", 1850)
listener = Listener()
Pipeline(tcp_client, listener).run()
----snip----------------------------------------------------------

So I'm just using a client with a helper component behind it that displays
all the data. I usually start the script in one VT, and "nc -l -p 1850" in
another. In Wireshark, the packet sequences for the two cases are almost
identical:

Client closes connection:
C: SYN
S: SYN,ACK
C: ACK
[connection established]
C: FIN,ACK
S: FIN,ACK
C: ACK

Server closes connection:
C: SYN
S: SYN,ACK
C: ACK
[connection established]
S: FIN,ACK
C: ACK
C: FIN,ACK
S: ACK

Looks like a perfectly normal handshake to me.

> The following code may also be useful when debugging:

Cool, I've been looking for a code piece like that. :)

Whoops, the TCP client does in fact quit if the server closes
connection :) For some reason, my Listener doesn't quit. I thought
it's sufficient to exit the main method in some way to quit a
component? That's what I do using "break" in the
'if self.dataReady("control")' part of main.

Regards,


Björn

Michael Sparks

Bjoern said:
> Whoops, the TCP client does in fact quit if the server closes
> connection :)

Great - so it wasn't a problem with the TCPClient after all :)

> For some reason, my Listener doesn't quit. I thought
> it's sufficient to exit the main method in some way to quit a
> component? That's what I do using "break" in the
> 'if self.dataReady("control")' part of main.

It is sufficient, and running with Kamaelia from /trunk, your listener does
indeed shut down correctly, so it looks like it's a bug in that release of
Kamaelia.

That's my bad really because we haven't done a full release in quite some
time.

Looking at the old code, it appears that in that code the TCPClient wasn't
forwarding shutdown messages correctly, so there *was* a bug, but there
doesn't seem to be now.
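A toy model of why the Listener hung. This is NOT Kamaelia's scheduler or API, just plain Python 3 generators standing in for components and lists standing in for boxes; all names are hypothetical. Returning from the generator does end the component, but only if the shutdown message is actually forwarded to it.

```python
def tcp_client(outbox, sig, forward_shutdown):
    """Toy producer: sends one message, then the 'server' closes the connection."""
    outbox.append(b"hello")
    yield
    if forward_shutdown:
        sig.append("producerFinished")      # tell the next component we are done

def listener(inbox, control, log):
    """Toy consumer: logs data and stops on the first control message."""
    while True:
        while inbox:
            log.append(inbox.pop(0))
        if control:
            log.append(control.pop(0))
            return                           # leaving the generator ends the component
        yield

def run(components, max_steps=100):
    """Round-robin scheduler; returns the components still alive after max_steps."""
    alive = list(components)
    for _ in range(max_steps):
        if not alive:
            break
        for gen in list(alive):
            try:
                next(gen)
            except StopIteration:            # generator finished: drop it
                alive.remove(gen)
    return alive

data, sig, log = [], [], []
print(run([tcp_client(data, sig, True), listener(data, sig, log)]))   # []
print(log)                                   # [b'hello', 'producerFinished']

data, sig, log = [], [], []
stuck = run([tcp_client(data, sig, False), listener(data, sig, log)])
print(len(stuck))                            # 1: the listener is still waiting
```

In the second run the producer exits cleanly, yet the consumer keeps yielding to the scheduler forever, which matches what Björn saw under pdb.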

My suggestion for the moment would be to use the code on /trunk since this
is stable at present (development happens on branches rather than /trunk)
and that I really ought to sort out the next release - I really hadn't
realised just how long it had been since the last release!

Steps:

~> svn co https://kamaelia.svn.sourceforge.net/svnroot/kamaelia/trunk kamaelia-trunk
~> cd kamaelia-trunk
~/kamaelia-trunk> cd Code/Python/Axon
~/kamaelia-trunk/Code/Python/Axon> sudo python setup.py install
~/kamaelia-trunk/Code/Python/Axon> cd ../Kamaelia
~/kamaelia-trunk/Code/Python/Kamaelia> sudo python setup.py install

I've tested using this, and your code works as you'd expect.

Regards,


Michael.

Bjoern Schliessmann

Michael said:
> It is sufficient, and running with Kamaelia from /trunk, your
> listener does indeed shut down correctly

Great, thanks for validating. :)

> My suggestion for the moment would be to use the code on /trunk
> since this is stable at present (development happens on branches
> rather than /trunk)

I will.

Regards,


Björn