thread, multiprocessing: communication overhead

M

mk

Hello everyone,

This time I decided to test communication overhead in multithreaded /
multiprocess communication. The results are rather disappointing, that
is, communication overhead seems to be very high. In each of the
following functions, I send 10,000 numbers to the function / 10 threads
/ 10 processes, which simply returns it in its respective way.


Function: notfun Best: 0.00622 sec Average: 0.00633 sec
(simple function)

Function: threadsemfun Best: 0.64428 sec Average: 0.64791 sec
(10 threads synchronizing using semaphore)

Function: threadlockfun Best: 0.66288 sec Average: 0.66453 sec
(10 threads synchronizing using locks)

Function: procqueuefun Best: 1.16291 sec Average: 1.17217 sec
(10 processes communicating with main process using queues)

Function: procpoolfun Best: 1.18648 sec Average: 1.19577 sec
(a pool of 10 processes)

If I'm doing smth wrong in the code below (smth that would result in
performance suffering), please point it out.

Code:

import threading
import multiprocessing
import time
import timeit


def time_fun(fun):
t = timeit.Timer(stmt = fun, setup = "from __main__ import " +
fun.__name__)
results = t.repeat(repeat=10, number=1)
best_result = min(results)
avg = sum(results) / len(results)
print "Function: %-15s Best: %5.5f sec Average: %5.5f sec"
% (fun.__name__, best_result, avg)


def notfun():
inputlist = range(0,10000)
reslist = []
for x in range(len(inputlist)):
reslist.append(inputlist.pop())

def threadsemfun():
def tcalc(sem, inputlist, reslist, tid, activitylist):
while len(inputlist) > 0:
sem.acquire()
try:
x = inputlist.pop()
except IndexError:
sem.release()
return
#activitylist[tid] += 1
reslist.append(x)
sem.release()
inputlist = range(0,10000)
#print "before: ", sum(inputlist)
reslist = []
tlist = []
activitylist = [ 0 for x in range(0,10) ]
sem = threading.Semaphore()
for t in range(0,10):
tlist.append(threading.Thread(target=tcalc, args=(sem,
inputlist, reslist, t, activitylist)))
for t in tlist:
t.start()
for t in tlist:
t.join()
#print "after: ", sum(reslist)
#print "thread action count:", activitylist


def threadlockfun():
def tcalc(lock, inputlist, reslist, tid, activitylist):
while True:
lock.acquire()
if len(inputlist) == 0:
lock.release()
return
x = inputlist.pop()
reslist.append(x)
#activitylist[tid] += 1
lock.release()
inputlist = range(0,10000)
#print "before: ", sum(inputlist)
reslist = []
tlist = []
activitylist = [ 0 for x in range(0,10) ]
sem = threading.Semaphore()
for t in range(0,10):
tlist.append(threading.Thread(target=tcalc, args=(sem,
inputlist, reslist, t, activitylist)))
for t in tlist:
t.start()
for t in tlist:
t.join()
#print "after: ", sum(reslist)
#print "thread action count:", activitylist

def pf(x):
return x

def procpoolfun():
pool = multiprocessing.Pool(processes=10)
inputlist = range(0,10000)
reslist = []
i, j, jmax = 0, 10, len(inputlist)
#print "before: ", sum(inputlist)
while j <= jmax:
res = pool.map_async(pf, inputlist[i:j])
reslist.extend(res.get())
i += 10
j += 10
#print "after: ", sum(reslist)

def procqueuefun():
def pqf(qin, qout):
pid = multiprocessing.current_process().pid
while True:
x = qin.get()
if x == 'STOP':
return
qout.put((pid, x))
qin = multiprocessing.Queue()
qout = multiprocessing.Queue()
plist = []
activity = dict()
for i in range(0,10):
p = multiprocessing.Process(target = pqf, args=(qin, qout))
p.start()
plist.append(p)
activity[p.pid] = 0
inputlist = range(0,10000)
reslist = []
#print "before:", sum(inputlist)
ilen = len(inputlist)
x = 0
while x != ilen:
for i in range(0,10):
qin.put(inputlist[x+i])
for i in range(0,10):
pid, res = qout.get()
#activity[pid] = activity[pid] + 1
reslist.append(res)
x += 10
for i in range(0,10):
qin.put('STOP')
for i in range(len(plist)):
plist.join()

#print "after:", sum(reslist)
#print "activity", activity

if __name__ == "__main__":
time_fun(notfun)
time_fun(threadsemfun)
time_fun(threadlockfun)
time_fun(procqueuefun)
time_fun(procpoolfun)
 
A

Aaron Brady

Hello everyone,

This time I decided to test communication overhead in multithreaded /
multiprocess communication. The results are rather disappointing, that
is, communication overhead seems to be very high. In each of the
following functions, I send 10,000 numbers to the function / 10 threads
/ 10 processes, which simply returns it in its respective way.

Function: notfun            Best: 0.00622 sec   Average: 0.00633 sec
(simple function)

Function: threadsemfun      Best: 0.64428 sec   Average: 0.64791 sec
(10 threads synchronizing using semaphore)

Function: threadlockfun     Best: 0.66288 sec   Average: 0.66453 sec
(10 threads synchronizing using locks)

Function: procqueuefun      Best: 1.16291 sec   Average: 1.17217 sec
(10 processes communicating with main process using queues)

Function: procpoolfun       Best: 1.18648 sec   Average: 1.19577 sec
(a pool of 10 processes)

If I'm doing smth wrong in the code below (smth that would result in
performance suffering), please point it out. snips
def threadsemfun():
         sem = threading.Semaphore()
def threadlockfun():
         sem = threading.Semaphore()

You used a Semaphore for both lock objects here.

'multiprocessing' is a really high level layer that makes a lot of
decisions about trade-offs, has highly redundant communication, and is
really easy to use. If you want to save a byte, you'll have to make
your own decisions about trade-offs and redundancies (possibly even
looking at real result data to make them).

I actually think 'multiprocessing' is really good, and even if I hand-
wrote my own IPC, it would be slower!

CMIIW, but I believe your timing function includes the time to launch
the actual processes and threads, create the synch. objects, etc. You
might try it again, creating them first, starting the timer, then
loading them.
 
M

mk

Aaron said:
snips

You used a Semaphore for both lock objects here.

Right... I corrected that (simply changed to threading.Lock() in
threadlockfun) and the result is much better, though still an order of
magnitude worse than plain function:

Function: threadlockfun Best: 0.08665 sec Average: 0.08910 sec
Function: notfun Best: 0.00987 sec Average: 0.01003 sec

'multiprocessing' is a really high level layer that makes a lot of
decisions about trade-offs, has highly redundant communication, and is
really easy to use. If you want to save a byte, you'll have to make
your own decisions about trade-offs and redundancies (possibly even
looking at real result data to make them).

Hmm, do you think that lower-level 'thread' module might work more
efficiently?
I actually think 'multiprocessing' is really good, and even if I hand-
wrote my own IPC, it would be slower!

CMIIW, but I believe your timing function includes the time to launch
the actual processes and threads, create the synch. objects, etc. You
might try it again, creating them first, starting the timer, then
loading them.

Except I don't know how to do that using timeit.Timer. :-/
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,874
Messages
2,569,924
Members
46,181
Latest member
GlycoRenewBlood

Latest Threads

Top