D
davy zhang
I wrote this server to handle incoming messages in a separate process
(created with multiprocessing) named "handler", and to send messages from a
thread named "sender", because I think the async_chat object cannot be
passed between processes.
My project is a network gate server with many complex logic handlers
behind it, so I use multiprocessing to run them separately and send the
results back to the clients later, when they are done.
To make use of the server's multicore CPU, I tried to put the send and
receive functions in different processes, but it seems that cannot be done.
I have two questions about this design:
1. Is async_chat.push thread-safe? I ask because I sometimes get random
errors reporting that the push fifo queue is out of index 0.
2. Is the whole design odd in any way?
Here is my code:
import asyncore, asynchat
import os, socket, string
from multiprocessing import Process,Manager
import pickle
import _thread
# TCP port the server binds to (passed to HTTPServer in the __main__ section).
PORT = 80
# Request payload that HTTPChannel answers directly instead of queueing it for
# the handler process (presumably the Flash socket policy handshake -- confirm).
policyRequest = b"<policy-file-request/>"
# Canned reply to policyRequest; it ends in \x00, the same byte the channels
# use as their message terminator.
policyReturn = b"""<cross-domain-policy>
<allow-access-from domain="*" to-ports="*" />
</cross-domain-policy> \x00"""
def handler(taskList, msgList):
    """Worker loop intended to run in a separate process.

    Blocks on ``taskList`` for pickled task dicts, appends a marker to the
    bytes stored under ``'msg'`` and puts the re-pickled dict on ``msgList``.
    The loop has no exit condition of its own.

    Args:
        taskList: queue-like object whose ``get()`` yields pickled dicts.
        msgList: queue-like object whose ``put()`` receives pickled results.
    """
    while True:
        print('getting task')
        raw_task = taskList.get()
        task = pickle.loads(raw_task)
        print('item before handle ', task)
        task['msg'] += b' hanlded done'
        result = pickle.dumps(task)
        msgList.put(result)
def findClient(id):
    """Return the first entry of ``clients`` whose ``idx`` equals ``id``.

    Returns None when no registered client carries that id.
    """
    matching = (channel for channel in clients if channel.idx == id)
    return next(matching, None)
def sender():
    """Thread loop that delivers handled messages back to their clients.

    Blocks on the module-level ``msgList`` queue for pickled dicts carrying
    a client id under ``'cid'`` and the payload bytes under ``'msg'``, looks
    the client up with ``findClient`` and pushes the payload to it.

    A message whose client is no longer registered (``findClient`` returned
    None, e.g. the client disconnected while its task was being handled) is
    dropped instead of raising AttributeError and killing this thread.

    NOTE(review): ``push()`` is called from this thread while the asyncore
    loop runs in another one; that cross-thread access is a likely source of
    sporadic producer-queue errors and probably needs a lock or a hand-off
    into the loop thread -- confirm.
    """
    while True:
        item = pickle.loads(msgList.get())
        channel = findClient(item['cid'])
        if channel is None:
            # The recipient is gone; there is nobody to deliver to.
            print('client gone, msg dropped ', item['cid'])
            continue
        channel.push(item['msg'])
        print('msg sent ', item['msg'])
class HTTPChannel(asynchat.async_chat):
    """Per-connection channel that frames input on a NUL byte.

    Each instance takes the next value of the module-level ``cid`` counter
    as its ``idx`` and registers itself in the module-level ``clients``
    list so that replies can be routed back to it later.
    """

    def __init__(self, server, sock, addr):
        """Wrap ``sock`` and register the channel.

        Args:
            server: accepted for the caller's convenience; not used here.
            sock: the connected socket handed to ``async_chat``.
            addr: peer address; not used here.
        """
        global cid
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b"\x00")
        # Bytes received so far for the message currently being assembled.
        self.data = b""
        cid += 1
        self.idx = cid
        if self not in clients:
            clients.append(self)

    def collect_incoming_data(self, data):
        """Append a received chunk to the pending message buffer."""
        self.data = self.data + data
        print(data)

    def found_terminator(self):
        """Dispatch one complete message.

        A policy request is answered immediately; anything else is pickled
        together with this channel's id and queued on ``taskList``.
        """
        print("found", self.data)
        if self.data == policyRequest:
            self.push(policyReturn)
        else:
            d = {'cid': self.idx, 'msg': self.data}
            taskList.put(pickle.dumps(d))
        # Reset for every message, including policy requests, so that the
        # next message never starts with leftover bytes.
        self.data = b""

    def handle_close(self):
        """Unregister the channel and release its socket."""
        if self in clients:
            clients.remove(self)
        # This override replaces the inherited handle_close, so the socket
        # must be closed explicitly here; otherwise the descriptor leaks and
        # the channel stays in the asyncore socket map.
        self.close()
class HTTPServer(asyncore.dispatcher):
    """Listening dispatcher that creates an HTTPChannel per connection."""

    def __init__(self, port):
        """Create a TCP socket, bind it to ``port`` on all interfaces and listen.

        Args:
            port: TCP port number to bind to.
        """
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(5)

    def handle_accept(self):
        """Accept a pending connection and wrap it in an HTTPChannel."""
        pair = self.accept()
        if pair is None:
            # accept() returns None when no connection could actually be
            # taken; unpacking it would raise TypeError.
            return
        conn, addr = pair
        HTTPChannel(self, conn, addr)
#
# try it out
if __name__ == "__main__":
    s = HTTPServer(PORT)
    print("serving at port", PORT, "...")

    # Registry of connected channel objects, searched by findClient().
    clients = []
    # Last client id handed out; HTTPChannel increments it per connection.
    cid = 0

    # Manager-backed queues: taskList carries work to the handler process,
    # msgList carries its results back to the sender thread.
    manager = Manager()
    taskList = manager.Queue()
    msgList = manager.Queue()

    h = Process(target=handler, args=(taskList, msgList))
    h.start()
    _thread.start_new_thread(sender, ())

    print('entering loop')
    asyncore.loop()
I wrote this server to handle incoming messages in a separate process
(created with multiprocessing) named "handler", and to send messages from a
thread named "sender", because I think the async_chat object cannot be
passed between processes.
My project is a network gate server with many complex logic handlers
behind it, so I use multiprocessing to run them separately and send the
results back to the clients later, when they are done.
To make use of the server's multicore CPU, I tried to put the send and
receive functions in different processes, but it seems that cannot be done.
I have two questions about this design:
1. Is async_chat.push thread-safe? I ask because I sometimes get random
errors reporting that the push fifo queue is out of index 0.
2. Is the whole design odd in any way?
Here is my code:
import asyncore, asynchat
import os, socket, string
from multiprocessing import Process,Manager
import pickle
import _thread
# TCP port the server binds to (passed to HTTPServer in the __main__ section).
PORT = 80
# Request payload that HTTPChannel answers directly instead of queueing it for
# the handler process (presumably the Flash socket policy handshake -- confirm).
policyRequest = b"<policy-file-request/>"
# Canned reply to policyRequest; it ends in \x00, the same byte the channels
# use as their message terminator.
policyReturn = b"""<cross-domain-policy>
<allow-access-from domain="*" to-ports="*" />
</cross-domain-policy> \x00"""
def handler(taskList, msgList):
    """Worker loop intended to run in a separate process.

    Blocks on ``taskList`` for pickled task dicts, appends a marker to the
    bytes stored under ``'msg'`` and puts the re-pickled dict on ``msgList``.
    The loop has no exit condition of its own.

    Args:
        taskList: queue-like object whose ``get()`` yields pickled dicts.
        msgList: queue-like object whose ``put()`` receives pickled results.
    """
    while True:
        print('getting task')
        raw_task = taskList.get()
        task = pickle.loads(raw_task)
        print('item before handle ', task)
        task['msg'] += b' hanlded done'
        result = pickle.dumps(task)
        msgList.put(result)
def findClient(id):
    """Return the first entry of ``clients`` whose ``idx`` equals ``id``.

    Returns None when no registered client carries that id.
    """
    matching = (channel for channel in clients if channel.idx == id)
    return next(matching, None)
def sender():
    """Thread loop that delivers handled messages back to their clients.

    Blocks on the module-level ``msgList`` queue for pickled dicts carrying
    a client id under ``'cid'`` and the payload bytes under ``'msg'``, looks
    the client up with ``findClient`` and pushes the payload to it.

    A message whose client is no longer registered (``findClient`` returned
    None, e.g. the client disconnected while its task was being handled) is
    dropped instead of raising AttributeError and killing this thread.

    NOTE(review): ``push()`` is called from this thread while the asyncore
    loop runs in another one; that cross-thread access is a likely source of
    sporadic producer-queue errors and probably needs a lock or a hand-off
    into the loop thread -- confirm.
    """
    while True:
        item = pickle.loads(msgList.get())
        channel = findClient(item['cid'])
        if channel is None:
            # The recipient is gone; there is nobody to deliver to.
            print('client gone, msg dropped ', item['cid'])
            continue
        channel.push(item['msg'])
        print('msg sent ', item['msg'])
class HTTPChannel(asynchat.async_chat):
    """Per-connection channel that frames input on a NUL byte.

    Each instance takes the next value of the module-level ``cid`` counter
    as its ``idx`` and registers itself in the module-level ``clients``
    list so that replies can be routed back to it later.
    """

    def __init__(self, server, sock, addr):
        """Wrap ``sock`` and register the channel.

        Args:
            server: accepted for the caller's convenience; not used here.
            sock: the connected socket handed to ``async_chat``.
            addr: peer address; not used here.
        """
        global cid
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b"\x00")
        # Bytes received so far for the message currently being assembled.
        self.data = b""
        cid += 1
        self.idx = cid
        if self not in clients:
            clients.append(self)

    def collect_incoming_data(self, data):
        """Append a received chunk to the pending message buffer."""
        self.data = self.data + data
        print(data)

    def found_terminator(self):
        """Dispatch one complete message.

        A policy request is answered immediately; anything else is pickled
        together with this channel's id and queued on ``taskList``.
        """
        print("found", self.data)
        if self.data == policyRequest:
            self.push(policyReturn)
        else:
            d = {'cid': self.idx, 'msg': self.data}
            taskList.put(pickle.dumps(d))
        # Reset for every message, including policy requests, so that the
        # next message never starts with leftover bytes.
        self.data = b""

    def handle_close(self):
        """Unregister the channel and release its socket."""
        if self in clients:
            clients.remove(self)
        # This override replaces the inherited handle_close, so the socket
        # must be closed explicitly here; otherwise the descriptor leaks and
        # the channel stays in the asyncore socket map.
        self.close()
class HTTPServer(asyncore.dispatcher):
    """Listening dispatcher that creates an HTTPChannel per connection."""

    def __init__(self, port):
        """Create a TCP socket, bind it to ``port`` on all interfaces and listen.

        Args:
            port: TCP port number to bind to.
        """
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(5)

    def handle_accept(self):
        """Accept a pending connection and wrap it in an HTTPChannel."""
        pair = self.accept()
        if pair is None:
            # accept() returns None when no connection could actually be
            # taken; unpacking it would raise TypeError.
            return
        conn, addr = pair
        HTTPChannel(self, conn, addr)
#
# try it out
if __name__ == "__main__":
    s = HTTPServer(PORT)
    print("serving at port", PORT, "...")

    # Registry of connected channel objects, searched by findClient().
    clients = []
    # Last client id handed out; HTTPChannel increments it per connection.
    cid = 0

    # Manager-backed queues: taskList carries work to the handler process,
    # msgList carries its results back to the sender thread.
    manager = Manager()
    taskList = manager.Queue()
    msgList = manager.Queue()

    h = Process(target=handler, args=(taskList, msgList))
    h.start()
    _thread.start_new_thread(sender, ())

    print('entering loop')
    asyncore.loop()