Why does asynchat's initiate_send() get called twice after a reconnect?

Discussion in 'Python' started by davy zhang, Oct 25, 2008.

  1. davy zhang

    davy zhang Guest

    Python 3.0rc1, Windows XP

    in the lib\asynchat.py

    def handle_write(self):
        """Write-event callback: hand off to initiate_send()."""
        self.initiate_send()

    def push(self, data):
        """Queue *data* for sending and kick off a send attempt.

        Data longer than ``self.ac_out_buffer_size`` is split into slices of
        at most that size, each appended to ``self.producer_fifo`` in order;
        shorter data is appended as a single item.  ``initiate_send()`` is
        then called exactly once.
        """
        chunk_size = self.ac_out_buffer_size
        total = len(data)
        if total <= chunk_size:
            self.producer_fifo.append(data)
        else:
            for start in range(0, total, chunk_size):
                self.producer_fifo.append(data[start:start + chunk_size])
        self.initiate_send()

    When the client connects only once, the object works just fine. But
    problems appear when the client disconnects and then reconnects to
    the server. It seems there are two paths that call initiate_send():
    one from push(), which I call in my program, and one from
    handle_write(), which is called automatically by asyncore.loop(). I
    just can't understand why a single connection works fine but repeated
    connections go bad.

    I printed the traceback. I found that during the first connection
    handle_write() always stays silent, but on the second connection it
    gets called and starts calling initiate_send() at the same time as
    push() is called. So confusing....



    So I tried removing the initiate_send() call from push(), and the code
    magically works fine for me.

    The main program is listed below.
    Since it needs a Flash client, I attached a web page to reproduce the
    problem: clicking the connect button multiple times and then clicking
    the send button produces an error.

    import asyncore, asynchat
    import os, socket, string
    from multiprocessing import Process,Manager
    import pickle
    import _thread
    import threading

    # TCP port the server binds to (passed to HTTPServer in the main block).
    PORT = 80

    # Exact payload that found_terminator() treats as a policy-file request.
    policyRequest = b"<policy-file-request/>"
    # Reply pushed back for a policy request: allows any domain and any port.
    # The literal ends with a NUL byte, the same byte the channel uses as its
    # message terminator.
    policyReturn = b"""<cross-domain-policy>
    <allow-access-from domain="*" to-ports="*" />
    </cross-domain-policy> \x00"""

    def handler(taskList,msgList):
        """Worker loop: turn each queued task into a reply message.

        Repeatedly takes a pickled dict from *taskList*, appends a marker to
        its ``'msg'`` bytes, and puts the re-pickled dict on *msgList*.
        Never returns on its own; it blocks in ``taskList.get()`` between
        tasks.
        """
        while True:
            print('getting task')
            raw_task = taskList.get()
            item = pickle.loads(raw_task)
            print('item before handle ', item)
            # Placeholder for real processing: just tag the message.
            item['msg'] = item['msg'] + b' hanlded done'
            reply = pickle.dumps(item)
            msgList.put(reply)

    def findClient(id):
        """Return the first entry of ``clients`` whose ``idx`` equals *id*.

        Returns ``None`` when no registered client matches.
        """
        matches = (client for client in clients if client.idx == id)
        return next(matches, None)

    def pushData(ch,data):
        """Call ``ch.push(data)`` while holding the module-wide ``pushLock``.

        The lock is released even if ``push`` raises.
        """
        with pushLock:
            ch.push(data)


    def sender():
        """Thread loop: deliver processed messages back to their clients.

        Repeatedly takes a pickled dict from the global ``msgList`` queue,
        looks up the channel whose ``idx`` matches ``item['cid']`` and pushes
        ``item['msg']`` to it.  Never returns on its own.

        A message whose client is no longer registered (``findClient``
        returned ``None``, e.g. the client disconnected while the task was
        being processed) is dropped instead of crashing the thread with an
        ``AttributeError``.
        """
        print('thread started')
        while True:
            item = pickle.loads(msgList.get())
            c = findClient(item['cid'])
            if c is None:
                # The target channel has already been removed from `clients`;
                # there is nobody to send to, so skip this message.
                print('client gone, msg dropped ',item['msg'])
                continue
            # Calling c.push() directly from this thread is not thread safe,
            # so go through pushData(), which serialises pushes with a lock.
            pushData(c,item['msg'])
            print('msg sent ',item['msg'])

    class HTTPChannel(asynchat.async_chat):
        """One client connection carrying NUL-terminated messages.

        Each instance gets a unique ``idx`` (taken from the global ``cid``
        counter) and registers itself in the global ``clients`` list so that
        replies can be routed back to it via ``findClient``.
        """

        def __init__(self, server, sock, addr):
            """Wrap *sock*, assign the next client id and register the channel.

            *server* and *addr* are accepted for the caller's convenience but
            are not used.
            """
            global cid
            asynchat.async_chat.__init__(self, sock)
            self.set_terminator(b"\x00")
            self.data = b""
            cid += 1
            self.idx = cid
            # NOTE(review): `clients` is also iterated by findClient() from the
            # sender thread and no lock guards it -- confirm this is acceptable.
            if self not in clients:
                print('add to clients:',self)
                clients.append(self)

        def collect_incoming_data(self, data):
            """Accumulate a received chunk until the terminator is seen."""
            self.data = self.data + data
            print(data)

        def found_terminator(self):
            """Handle one complete message held in ``self.data``.

            A policy-file request is answered immediately and the channel is
            closed once the reply has been sent; anything else is pickled
            together with this channel's id and queued on ``taskList``.
            """
            print("found",self.data)
            if self.data == policyRequest:
                pushData(self,policyReturn)
                self.close_when_done()
            else:
                d = {'cid':self.idx,'msg':self.data}
                taskList.put(pickle.dumps(d))
            # Start the next message with an empty buffer.
            self.data = b""

        def handle_close(self):
            """Unregister the channel and close its socket.

            Overriding handle_close() replaces the dispatcher's default, which
            is what normally closes the socket.  Without the explicit close()
            the dead socket stays in the asyncore map and keeps being polled
            after the peer has gone away.
            """
            if self in clients:
                print('remove from clients:',self)
                clients.remove(self)
            self.close()

    class HTTPServer(asyncore.dispatcher):
        """Listening socket that spawns an HTTPChannel per accepted client."""

        def __init__(self, port):
            """Create a TCP socket, bind it to *port* on all interfaces, listen."""
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(("", port))
            self.listen(5)

        def handle_accept(self):
            """Accept a pending connection and wrap it in an HTTPChannel."""
            accepted = self.accept()
            if accepted is None:
                # accept() returns None when the connection could not be
                # completed (e.g. it would block); unpacking None would raise
                # TypeError, so just wait for the next accept event.
                return
            conn, addr = accepted
            print('a new customer!')
            HTTPChannel(self, conn, addr)


    #
    # try it out
    if __name__ == "__main__":
        # Create the listening socket; connections are only serviced once
        # asyncore.loop() runs at the end of this block.
        s = HTTPServer(PORT)
        print ("serving at port", PORT, "...")

        #push data lock
        # Held by pushData() around every ch.push() call.
        pushLock = threading.Lock()


        # Channels currently registered; HTTPChannel adds itself in __init__
        # and removes itself in handle_close().
        clients=[]

        # Last client id handed out; HTTPChannel.__init__ increments it.
        cid = 0

        # Manager that provides the two queues created below.
        manager = Manager()

        # Pickled tasks: filled by HTTPChannel.found_terminator(), read by handler().
        taskList = manager.Queue()

        # Pickled replies: filled by handler(), read by sender().
        msgList = manager.Queue()


        # handler() runs in a separate process.
        h = Process(target=handler,args=(taskList,msgList))
        h.start()


        # sender() runs in a background thread of this process.
        _thread.start_new_thread(sender,())
        print('entering loop')
        # Run the asyncore event loop in the main thread.
        asyncore.loop()
     
    davy zhang, Oct 25, 2008
    #1
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.

Share This Page