New to Python network programming: is async_chat.push thread-safe? (Python 3.0)

davy zhang

I wrote this server to handle incoming messages in a separate process
(using multiprocessing) named "handler", and to send messages from a
thread named "sender", because I think an async_chat object cannot be
passed between processes.

My project is a network gate server with a lot of complex handler logic
behind it, so I use multiprocessing to run the handlers separately and
send the results back to the clients later, once they are done.
To make use of the server's multicore CPU, I tried to split the send and
receive functions into different processes, but it seems that cannot be done :)

I have two questions about this design:
1. Is async_chat.push thread-safe? I ask because I sometimes get random
errors reporting an out-of-range index 0 on the push FIFO queue.
2. Is the whole design odd in any way?
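
On question 1, here is a hedged sketch of one way to sidestep the issue rather than a confirmed answer: keep every `push` call on the thread that runs `asyncore.loop()`, and let other threads only put messages on a thread-safe `queue.Queue`. The names `Outbox`, `FakeChannel`, `post`, and `drain` are hypothetical, and `FakeChannel` is a stand-in for an `async_chat` object so that the example runs without sockets.

```python
import queue
import threading


class FakeChannel:
    """Stand-in for an async_chat object (assumption: only push() matters here)."""

    def __init__(self):
        self.sent = []
        self.push_threads = set()

    def push(self, data):
        # Record which thread called push so the hand-off can be checked.
        self.push_threads.add(threading.get_ident())
        self.sent.append(data)


class Outbox:
    """Hypothetical helper: any thread may post(), only the loop thread drain()s."""

    def __init__(self):
        # queue.Queue is designed to be shared between threads.
        self._pending = queue.Queue()

    def post(self, channel, data):
        # Safe to call from any thread: it never touches the channel itself.
        self._pending.put((channel, data))

    def drain(self):
        # Call this only from the thread that runs the event loop.
        count = 0
        while True:
            try:
                channel, data = self._pending.get_nowait()
            except queue.Empty:
                return count
            channel.push(data)
            count += 1


def demo():
    outbox = Outbox()
    channel = FakeChannel()

    def worker(n):
        for i in range(100):
            outbox.post(channel, b"msg %d-%d" % (n, i))

    threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    drained = outbox.drain()  # runs on the main ("loop") thread
    return drained, channel


drained, channel = demo()
print(drained, len(channel.sent))
```

In the real server, `drain()` could presumably be called between short iterations of the loop, for example `asyncore.loop(timeout=0.1, count=1)` inside a `while` loop. That wiring is an assumption and is not shown here.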


Here is my code:

import asyncore, asynchat
import os, socket, string
from multiprocessing import Process,Manager
import pickle
import _thread

PORT = 80

policyRequest = b"<policy-file-request/>"
policyReturn = b"""<cross-domain-policy>
<allow-access-from domain="*" to-ports="*" />
</cross-domain-policy> \x00"""

def handler(taskList, msgList):
    while 1:
        print('getting task')
        item = pickle.loads(taskList.get())
        print('item before handle ', item)
        item['msg'] += b' hanlded done'
        msgList.put(pickle.dumps(item))

def findClient(id):
    for item in clients:
        if item.idx == id:
            return item

def sender():
    global msgList
    while 1:
        item = pickle.loads(msgList.get())
        #print time()
        c = findClient(item['cid'])
        #print time()
        c.push(item['msg'])
        print('msg sent ', item['msg'])
        #print time()

class HTTPChannel(asynchat.async_chat):

    def __init__(self, server, sock, addr):
        global cid
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b"\x00")
        self.data = b""
        cid += 1
        self.idx = cid
        if not self in clients:
            clients.append(self)

    def collect_incoming_data(self, data):
        self.data = self.data + data
        print(data)

    def found_terminator(self):
        global taskList
        print("found", self.data)
        if self.data == policyRequest:
            self.push(policyReturn)
        else:
            d = {'cid': self.idx, 'msg': self.data}
            taskList.put(pickle.dumps(d))
        self.data = b""

    def handle_close(self):
        if self in clients:
            clients.remove(self)

class HTTPServer(asyncore.dispatcher):

    def __init__(self, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(5)

    def handle_accept(self):
        conn, addr = self.accept()
        HTTPChannel(self, conn, addr)


#
# try it out
if __name__ == "__main__":
    s = HTTPServer(PORT)
    print("serving at port", PORT, "...")

    # list of client socket objects, stored for later use
    clients = []

    # auto-incrementing client id
    cid = 0

    manager = Manager()
    taskList = manager.Queue()
    msgList = manager.Queue()


    h = Process(target=handler, args=(taskList, msgList))
    h.start()


    _thread.start_new_thread(sender, ())
    print('entering loop')

    asyncore.loop()
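
The message framing that `set_terminator(b"\x00")`, `collect_incoming_data`, and `found_terminator` provide in the code above can be illustrated without sockets. This is a minimal sketch of the idea, not asynchat's actual implementation; `NulFramer` and `feed` are hypothetical names.

```python
class NulFramer:
    """Hypothetical helper that splits a byte stream into NUL-terminated messages."""

    def __init__(self, terminator=b"\x00"):
        self.terminator = terminator
        self.buffer = b""

    def feed(self, data):
        """Add received bytes and return the messages completed by this call."""
        self.buffer += data
        messages = []
        while True:
            head, sep, tail = self.buffer.partition(self.terminator)
            if not sep:
                # No terminator yet: keep the partial message for the next feed().
                return messages
            messages.append(head)
            self.buffer = tail


framer = NulFramer()
first = framer.feed(b"<policy-file-req")
second = framer.feed(b"uest/>\x00hello\x00par")
third = framer.feed(b"tial\x00")
print(first, second, third)
```

A message can arrive split across several reads, or several messages can arrive in one read, which is why the buffer is kept between calls.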
 
