M
mk
Hello everyone,
This time I decided to test communication overhead in multithreaded /
multiprocess communication. The results are rather disappointing, that
is, communication overhead seems to be very high. In each of the
following functions, I send 10,000 numbers to the function / 10 threads
/ 10 processes, which simply returns it in its respective way.
Function: notfun Best: 0.00622 sec Average: 0.00633 sec
(simple function)
Function: threadsemfun Best: 0.64428 sec Average: 0.64791 sec
(10 threads synchronizing using semaphore)
Function: threadlockfun Best: 0.66288 sec Average: 0.66453 sec
(10 threads synchronizing using locks)
Function: procqueuefun Best: 1.16291 sec Average: 1.17217 sec
(10 processes communicating with main process using queues)
Function: procpoolfun Best: 1.18648 sec Average: 1.19577 sec
(a pool of 10 processes)
If I'm doing something wrong in the code below (something that would hurt
performance), please point it out.
Code:
import threading
import multiprocessing
import time
import timeit
def time_fun(fun):
    """Time *fun* with timeit and print its best and average runtime.

    The callable must be importable from __main__ by name, because the
    timeit setup statement re-imports it there.
    """
    t = timeit.Timer(stmt=fun,
                     setup="from __main__ import " + fun.__name__)
    # 10 repeats of a single call each; min() is the least noisy figure.
    results = t.repeat(repeat=10, number=1)
    best_result = min(results)
    avg = sum(results) / len(results)
    # BUG FIX: the original bare `print "..."` with the % operand on the
    # next line does not parse; the parenthesized form is valid in both
    # Python 2 (print statement) and Python 3 (print function).
    print("Function: %-15s Best: %5.5f sec Average: %5.5f sec"
          % (fun.__name__, best_result, avg))
def notfun():
    """Baseline: drain a 10,000-element list with no synchronization.

    Returns the drained values (highest first).  The original returned
    None; returning the result list is backward compatible and makes the
    function testable.
    """
    # list() so .pop() also works on Python 3, where range() is lazy.
    inputlist = list(range(10000))
    reslist = []
    # Idiomatic drain loop instead of indexing over range(len(...)).
    while inputlist:
        reslist.append(inputlist.pop())
    return reslist
def threadsemfun():
    """Drain 10,000 numbers with 10 threads synchronized by a semaphore
    (used as a mutex).  Returns the collected results.
    """
    def tcalc(sem, inputlist, reslist, tid, activitylist):
        # The outer emptiness check is racy by design; the pop is guarded,
        # and IndexError means another thread took the last item.
        while inputlist:
            sem.acquire()
            try:
                x = inputlist.pop()
            except IndexError:
                sem.release()
                return
            #activitylist[tid] += 1
            reslist.append(x)
            sem.release()

    # list() so .pop() also works on Python 3, where range() is lazy.
    inputlist = list(range(10000))
    reslist = []
    activitylist = [0] * 10
    sem = threading.Semaphore()
    tlist = [threading.Thread(target=tcalc,
                              args=(sem, inputlist, reslist, t, activitylist))
             for t in range(10)]
    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    return reslist
def threadlockfun():
    """Drain 10,000 numbers with 10 threads synchronized by a Lock.
    Returns the collected results.
    """
    def tcalc(lock, inputlist, reslist, tid, activitylist):
        while True:
            lock.acquire()
            # Emptiness is checked *inside* the lock, so no IndexError
            # guard is needed here (unlike the semaphore variant).
            if not inputlist:
                lock.release()
                return
            reslist.append(inputlist.pop())
            #activitylist[tid] += 1
            lock.release()

    # list() so .pop() also works on Python 3, where range() is lazy.
    inputlist = list(range(10000))
    reslist = []
    activitylist = [0] * 10
    # BUG FIX: the original built a threading.Semaphore() despite this
    # function's stated purpose of benchmarking a Lock.
    lock = threading.Lock()
    tlist = [threading.Thread(target=tcalc,
                              args=(lock, inputlist, reslist, t, activitylist))
             for t in range(10)]
    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    return reslist
def pf(x):
    """Identity worker for the process pool: echo the argument back."""
    return x
def procpoolfun():
    """Round-trip 10,000 numbers through a pool of 10 worker processes
    in synchronous batches of 10.  Returns the collected results.
    """
    pool = multiprocessing.Pool(processes=10)
    try:
        inputlist = list(range(10000))
        reslist = []
        i, j, jmax = 0, 10, len(inputlist)
        while j <= jmax:
            # map_async + immediate .get() is effectively a blocking map;
            # every 10-item batch pays a full dispatch round-trip, which
            # dominates the measured time.
            res = pool.map_async(pf, inputlist[i:j])
            reslist.extend(res.get())
            i += 10
            j += 10
        return reslist
    finally:
        # BUG FIX: the original never shut the pool down, leaking its
        # 10 worker processes on every timed call.
        pool.close()
        pool.join()
def procqueuefun():
    """Round-trip 10,000 numbers through 10 worker processes via a pair
    of multiprocessing queues, 10 items at a time.  Returns the results.

    NOTE(review): a nested function as a Process target only works with
    the 'fork' start method; it is not picklable under 'spawn'
    (Windows/macOS default) -- confirm the intended platform.
    """
    def pqf(qin, qout):
        # Worker: echo each item back tagged with our pid until 'STOP'.
        pid = multiprocessing.current_process().pid
        while True:
            x = qin.get()
            if x == 'STOP':
                return
            qout.put((pid, x))

    qin = multiprocessing.Queue()
    qout = multiprocessing.Queue()
    plist = []
    activity = dict()
    for i in range(10):
        p = multiprocessing.Process(target=pqf, args=(qin, qout))
        p.start()
        plist.append(p)
        activity[p.pid] = 0
    inputlist = list(range(10000))
    reslist = []
    ilen = len(inputlist)
    x = 0
    while x != ilen:
        for i in range(10):
            qin.put(inputlist[x + i])
        for i in range(10):
            pid, res = qout.get()
            #activity[pid] = activity[pid] + 1
            reslist.append(res)
        x += 10
    for i in range(10):
        qin.put('STOP')
    # BUG FIX: the original called plist.join() -- a list has no join();
    # each Process object must be joined individually.
    for p in plist:
        p.join()
    return reslist
if __name__ == "__main__":
    # Benchmark each variant in turn (same order as the reported numbers).
    # time_fun re-imports each callable from __main__, so this must run
    # as a script.
    for benchmark in (notfun, threadsemfun, threadlockfun,
                      procqueuefun, procpoolfun):
        time_fun(benchmark)
This time I decided to test communication overhead in multithreaded /
multiprocess communication. The results are rather disappointing, that
is, communication overhead seems to be very high. In each of the
following functions, I send 10,000 numbers to the function / 10 threads
/ 10 processes, which simply returns it in its respective way.
Function: notfun Best: 0.00622 sec Average: 0.00633 sec
(simple function)
Function: threadsemfun Best: 0.64428 sec Average: 0.64791 sec
(10 threads synchronizing using semaphore)
Function: threadlockfun Best: 0.66288 sec Average: 0.66453 sec
(10 threads synchronizing using locks)
Function: procqueuefun Best: 1.16291 sec Average: 1.17217 sec
(10 processes communicating with main process using queues)
Function: procpoolfun Best: 1.18648 sec Average: 1.19577 sec
(a pool of 10 processes)
If I'm doing something wrong in the code below (something that would hurt
performance), please point it out.
Code:
import threading
import multiprocessing
import time
import timeit
def time_fun(fun):
    """Time *fun* with timeit and print its best and average runtime.

    The callable must be importable from __main__ by name, because the
    timeit setup statement re-imports it there.
    """
    t = timeit.Timer(stmt=fun,
                     setup="from __main__ import " + fun.__name__)
    # 10 repeats of a single call each; min() is the least noisy figure.
    results = t.repeat(repeat=10, number=1)
    best_result = min(results)
    avg = sum(results) / len(results)
    # BUG FIX: the original bare `print "..."` with the % operand on the
    # next line does not parse; the parenthesized form is valid in both
    # Python 2 (print statement) and Python 3 (print function).
    print("Function: %-15s Best: %5.5f sec Average: %5.5f sec"
          % (fun.__name__, best_result, avg))
def notfun():
    """Baseline: drain a 10,000-element list with no synchronization.

    Returns the drained values (highest first).  The original returned
    None; returning the result list is backward compatible and makes the
    function testable.
    """
    # list() so .pop() also works on Python 3, where range() is lazy.
    inputlist = list(range(10000))
    reslist = []
    # Idiomatic drain loop instead of indexing over range(len(...)).
    while inputlist:
        reslist.append(inputlist.pop())
    return reslist
def threadsemfun():
    """Drain 10,000 numbers with 10 threads synchronized by a semaphore
    (used as a mutex).  Returns the collected results.
    """
    def tcalc(sem, inputlist, reslist, tid, activitylist):
        # The outer emptiness check is racy by design; the pop is guarded,
        # and IndexError means another thread took the last item.
        while inputlist:
            sem.acquire()
            try:
                x = inputlist.pop()
            except IndexError:
                sem.release()
                return
            #activitylist[tid] += 1
            reslist.append(x)
            sem.release()

    # list() so .pop() also works on Python 3, where range() is lazy.
    inputlist = list(range(10000))
    reslist = []
    activitylist = [0] * 10
    sem = threading.Semaphore()
    tlist = [threading.Thread(target=tcalc,
                              args=(sem, inputlist, reslist, t, activitylist))
             for t in range(10)]
    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    return reslist
def threadlockfun():
    """Drain 10,000 numbers with 10 threads synchronized by a Lock.
    Returns the collected results.
    """
    def tcalc(lock, inputlist, reslist, tid, activitylist):
        while True:
            lock.acquire()
            # Emptiness is checked *inside* the lock, so no IndexError
            # guard is needed here (unlike the semaphore variant).
            if not inputlist:
                lock.release()
                return
            reslist.append(inputlist.pop())
            #activitylist[tid] += 1
            lock.release()

    # list() so .pop() also works on Python 3, where range() is lazy.
    inputlist = list(range(10000))
    reslist = []
    activitylist = [0] * 10
    # BUG FIX: the original built a threading.Semaphore() despite this
    # function's stated purpose of benchmarking a Lock.
    lock = threading.Lock()
    tlist = [threading.Thread(target=tcalc,
                              args=(lock, inputlist, reslist, t, activitylist))
             for t in range(10)]
    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    return reslist
def pf(x):
    """Identity worker for the process pool: echo the argument back."""
    return x
def procpoolfun():
    """Round-trip 10,000 numbers through a pool of 10 worker processes
    in synchronous batches of 10.  Returns the collected results.
    """
    pool = multiprocessing.Pool(processes=10)
    try:
        inputlist = list(range(10000))
        reslist = []
        i, j, jmax = 0, 10, len(inputlist)
        while j <= jmax:
            # map_async + immediate .get() is effectively a blocking map;
            # every 10-item batch pays a full dispatch round-trip, which
            # dominates the measured time.
            res = pool.map_async(pf, inputlist[i:j])
            reslist.extend(res.get())
            i += 10
            j += 10
        return reslist
    finally:
        # BUG FIX: the original never shut the pool down, leaking its
        # 10 worker processes on every timed call.
        pool.close()
        pool.join()
def procqueuefun():
    """Round-trip 10,000 numbers through 10 worker processes via a pair
    of multiprocessing queues, 10 items at a time.  Returns the results.

    NOTE(review): a nested function as a Process target only works with
    the 'fork' start method; it is not picklable under 'spawn'
    (Windows/macOS default) -- confirm the intended platform.
    """
    def pqf(qin, qout):
        # Worker: echo each item back tagged with our pid until 'STOP'.
        pid = multiprocessing.current_process().pid
        while True:
            x = qin.get()
            if x == 'STOP':
                return
            qout.put((pid, x))

    qin = multiprocessing.Queue()
    qout = multiprocessing.Queue()
    plist = []
    activity = dict()
    for i in range(10):
        p = multiprocessing.Process(target=pqf, args=(qin, qout))
        p.start()
        plist.append(p)
        activity[p.pid] = 0
    inputlist = list(range(10000))
    reslist = []
    ilen = len(inputlist)
    x = 0
    while x != ilen:
        for i in range(10):
            qin.put(inputlist[x + i])
        for i in range(10):
            pid, res = qout.get()
            #activity[pid] = activity[pid] + 1
            reslist.append(res)
        x += 10
    for i in range(10):
        qin.put('STOP')
    # BUG FIX: the original called plist.join() -- a list has no join();
    # each Process object must be joined individually.
    for p in plist:
        p.join()
    return reslist
if __name__ == "__main__":
    # Benchmark each variant in turn (same order as the reported numbers).
    # time_fun re-imports each callable from __main__, so this must run
    # as a script.
    for benchmark in (notfun, threadsemfun, threadlockfun,
                      procqueuefun, procpoolfun):
        time_fun(benchmark)