New to Python network programming: is async_chat.push thread-safe? (Python 3.0)

Discussion in 'Python' started by davy zhang, Oct 24, 2008.

  1. davy zhang

    davy zhang Guest

    I wrote this server to handle incoming messages in a separate process
    (created with multiprocessing) named "handler", and to send messages from
    a thread named "sender", because I think an async_chat object cannot be
    passed between processes.

    My project is a network gateway server with many complex logic handlers
    behind it, so I use multiprocessing to run them separately and send
    the results back to the clients later, when they are done.
    To make use of the server's multicore CPU I tried to separate the send and
    receive functions into different processes, but it seems that cannot be done :)

    I have two questions about this design:
    1. Is async_chat.push thread-safe? I ask because I sometimes get random
    errors reporting the push fifo queue index 0 as out of range.
    2. Is the whole design odd in any way?


    here is my code

    import asyncore, asynchat
    import os, socket, string
    from multiprocessing import Process,Manager
    import pickle
    import _thread

    PORT = 80

    policyRequest = b"<policy-file-request/>"
    policyReturn = b"""<cross-domain-policy>
    <allow-access-from domain="*" to-ports="*" />
    </cross-domain-policy> \x00"""

    def handler(taskList, msgList):
        """Worker loop: take pickled tasks, mark them handled, queue the reply.

        Each entry read from ``taskList`` is a pickled dict; its ``'msg'``
        bytes get a fixed suffix appended and the dict is pickled again onto
        ``msgList``. The loop never returns on its own.
        """
        while True:
            print('getting task')
            task = pickle.loads(taskList.get())
            print('item before handle ', task)
            task['msg'] = task['msg'] + b' hanlded done'
            msgList.put(pickle.dumps(task))

    def findClient(id):
        """Return the first entry of ``clients`` whose ``idx`` equals ``id``.

        Returns None when no entry matches.
        """
        return next((client for client in clients if client.idx == id), None)

    def sender():
        """Sender loop: deliver handled messages back to their clients.

        Reads pickled dicts from the module-level ``msgList`` queue, looks up
        the client by its ``'cid'`` and pushes the ``'msg'`` bytes to it.
        Replies for clients that are no longer registered are dropped instead
        of raising, so one disconnect cannot kill this loop. The loop never
        returns on its own.

        NOTE(review): push() is called here from a secondary thread while
        asyncore.loop() services the same channel on the main thread. asynchat
        does not document push() as thread-safe; this is the likely cause of
        intermittent fifo index errors -- consider serialising access.
        """
        while True:
            item = pickle.loads(msgList.get())
            c = findClient(item['cid'])
            if c is None:
                # The client disconnected while its message was being handled.
                print('client gone, msg dropped ', item['msg'])
                continue
            c.push(item['msg'])
            print('msg sent ', item['msg'])

    class HTTPChannel(asynchat.async_chat):
        """Per-connection channel that frames incoming data on a NUL byte.

        Every new channel takes the next value of the module-level ``cid``
        counter as its ``idx`` and registers itself in the module-level
        ``clients`` list. A complete message equal to ``policyRequest`` is
        answered directly with ``policyReturn``; any other message is pickled
        together with the channel id and put on ``taskList``.
        """

        def __init__(self, server, sock, addr):
            """Wrap ``sock`` and register the channel.

            ``server`` and ``addr`` are accepted but not used.
            """
            global cid
            asynchat.async_chat.__init__(self, sock)
            self.set_terminator(b"\x00")
            self.data = b""
            cid += 1
            self.idx = cid
            if self not in clients:
                clients.append(self)

        def collect_incoming_data(self, data):
            """Append a received chunk to the current message buffer."""
            self.data = self.data + data
            print(data)

        def found_terminator(self):
            """Dispatch the buffered message, then clear the buffer."""
            print("found", self.data)
            if self.data == policyRequest:
                self.push(policyReturn)
            else:
                d = {'cid': self.idx, 'msg': self.data}
                taskList.put(pickle.dumps(d))
            # Reset on both branches so the next message never starts with
            # the bytes of an already-answered policy request.
            self.data = b""

        def handle_close(self):
            """Unregister the channel and close its socket.

            Overriding handle_close replaces the base implementation, so the
            socket has to be closed here explicitly; otherwise the dead
            connection stays in the asyncore socket map.
            """
            if self in clients:
                clients.remove(self)
            self.close()

    class HTTPServer(asyncore.dispatcher):
        """Listening dispatcher that creates an HTTPChannel per connection."""

        def __init__(self, port):
            """Bind a TCP socket to ``port`` on all interfaces and listen."""
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(("", port))
            self.listen(5)

        def handle_accept(self):
            """Accept one pending connection and hand it to an HTTPChannel."""
            pair = self.accept()
            if pair is None:
                # accept() can return None when the connection did not
                # actually materialise; unpacking it would raise TypeError.
                return
            conn, addr = pair
            HTTPChannel(self, conn, addr)


    #
    # try it out
    if __name__ == "__main__":
        # Module-level state read by HTTPChannel, findClient and sender.
        # clients: channel objects kept so replies can be routed back later.
        clients = []

        # cid: last client id handed out; incremented for every new channel.
        cid = 0

        manager = Manager()
        taskList = manager.Queue()
        msgList = manager.Queue()

        # Start the worker before the listening socket exists, so a forked
        # worker does not inherit (and keep open) the server port. Marking it
        # as a daemon makes it exit together with the main process.
        h = Process(target=handler, args=(taskList, msgList))
        h.daemon = True
        h.start()

        s = HTTPServer(PORT)
        print ("serving at port", PORT, "...")

        # Replies are pushed from a background thread while the main thread
        # runs the asyncore event loop.
        _thread.start_new_thread(sender, ())
        print('entering loop')

        asyncore.loop()
     
    davy zhang, Oct 24, 2008
    #1
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.
Similar Threads
  1. Jos
    Replies:
    3
    Views:
    858
  2. XyZaa
    Replies:
    0
    Views:
    603
    XyZaa
    Jul 19, 2007
  3. Replies:
    5
    Views:
    513
    Josiah Carlson
    May 14, 2008
  4. Gabriel Rossetti
    Replies:
    0
    Views:
    1,399
    Gabriel Rossetti
    Aug 29, 2008
  5. John Nagle
    Replies:
    5
    Views:
    513
    John Nagle
    Mar 12, 2012
Loading...

Share This Page