#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
|  |  | 
|  | import time, sys, multiprocessing, threading, Queue, gc | 
|  |  | 
# Timer shared by every benchmark below: time.clock on Windows,
# time.time on all other platforms.
_timer = time.clock if sys.platform == 'win32' else time.time

# Each benchmark keeps doubling its workload until a single timed run
# lasts at least this long (same units as the values returned by _timer).
delta = 1
|  |  | 
|  |  | 
#### TEST_QUEUESPEED
|  |  | 
def queuespeed_func(q, c, iterations):
    """Producer half of the queue benchmark.

    Notifies the condition ``c`` once (the consumer in test_queuespeed is
    waiting on it before it starts its timer), then puts ``iterations``
    copies of a 256-character string on ``q`` followed by the sentinel
    string 'STOP'.
    """
    a = '0' * 256

    # "with" releases the condition's lock even if notify() raises, so a
    # failure here can never leave the lock held.
    with c:
        c.notify()

    for i in xrange(iterations):
        q.put(a)

    # Sentinel telling the consumer that no more items will follow.
    q.put('STOP')
|  |  | 
|  | def test_queuespeed(Process, q, c): | 
|  | elapsed = 0 | 
|  | iterations = 1 | 
|  |  | 
|  | while elapsed < delta: | 
|  | iterations *= 2 | 
|  |  | 
|  | p = Process(target=queuespeed_func, args=(q, c, iterations)) | 
|  | c.acquire() | 
|  | p.start() | 
|  | c.wait() | 
|  | c.release() | 
|  |  | 
|  | result = None | 
|  | t = _timer() | 
|  |  | 
|  | while result != 'STOP': | 
|  | result = q.get() | 
|  |  | 
|  | elapsed = _timer() - t | 
|  |  | 
|  | p.join() | 
|  |  | 
|  | print iterations, 'objects passed through the queue in', elapsed, 'seconds' | 
|  | print 'average number/sec:', iterations/elapsed | 
|  |  | 
|  |  | 
#### TEST_PIPESPEED
|  |  | 
def pipe_func(c, cond, iterations):
    """Producer half of the pipe benchmark.

    Notifies ``cond`` once (the consumer in test_pipespeed is waiting on it
    before it starts its timer), then sends ``iterations`` copies of a
    256-character string through the connection ``c`` followed by the
    sentinel string 'STOP'.
    """
    a = '0' * 256

    # "with" releases the condition's lock even if notify() raises, so a
    # failure here can never leave the lock held.
    with cond:
        cond.notify()

    for i in xrange(iterations):
        c.send(a)

    # Sentinel telling the receiver that no more messages will follow.
    c.send('STOP')
|  |  | 
|  | def test_pipespeed(): | 
|  | c, d = multiprocessing.Pipe() | 
|  | cond = multiprocessing.Condition() | 
|  | elapsed = 0 | 
|  | iterations = 1 | 
|  |  | 
|  | while elapsed < delta: | 
|  | iterations *= 2 | 
|  |  | 
|  | p = multiprocessing.Process(target=pipe_func, | 
|  | args=(d, cond, iterations)) | 
|  | cond.acquire() | 
|  | p.start() | 
|  | cond.wait() | 
|  | cond.release() | 
|  |  | 
|  | result = None | 
|  | t = _timer() | 
|  |  | 
|  | while result != 'STOP': | 
|  | result = c.recv() | 
|  |  | 
|  | elapsed = _timer() - t | 
|  | p.join() | 
|  |  | 
|  | print iterations, 'objects passed through connection in',elapsed,'seconds' | 
|  | print 'average number/sec:', iterations/elapsed | 
|  |  | 
|  |  | 
#### TEST_SEQSPEED
|  |  | 
|  | def test_seqspeed(seq): | 
|  | elapsed = 0 | 
|  | iterations = 1 | 
|  |  | 
|  | while elapsed < delta: | 
|  | iterations *= 2 | 
|  |  | 
|  | t = _timer() | 
|  |  | 
|  | for i in xrange(iterations): | 
|  | a = seq[5] | 
|  |  | 
|  | elapsed = _timer()-t | 
|  |  | 
|  | print iterations, 'iterations in', elapsed, 'seconds' | 
|  | print 'average number/sec:', iterations/elapsed | 
|  |  | 
|  |  | 
#### TEST_LOCKSPEED
|  |  | 
|  | def test_lockspeed(l): | 
|  | elapsed = 0 | 
|  | iterations = 1 | 
|  |  | 
|  | while elapsed < delta: | 
|  | iterations *= 2 | 
|  |  | 
|  | t = _timer() | 
|  |  | 
|  | for i in xrange(iterations): | 
|  | l.acquire() | 
|  | l.release() | 
|  |  | 
|  | elapsed = _timer()-t | 
|  |  | 
|  | print iterations, 'iterations in', elapsed, 'seconds' | 
|  | print 'average number/sec:', iterations/elapsed | 
|  |  | 
|  |  | 
#### TEST_CONDITIONSPEED
|  |  | 
def conditionspeed_func(c, N):
    """Partner side of the condition ping-pong benchmark.

    Acquires the condition ``c``, notifies once to announce that it is
    running, then ``N`` times waits to be notified and notifies back.
    """
    # "with" keeps the lock held for the whole exchange, exactly as the
    # explicit acquire()/release() pair did, but also releases it if
    # wait() or notify() raises part-way through.
    with c:
        c.notify()

        for i in xrange(N):
            c.wait()
            c.notify()
|  |  | 
|  | def test_conditionspeed(Process, c): | 
|  | elapsed = 0 | 
|  | iterations = 1 | 
|  |  | 
|  | while elapsed < delta: | 
|  | iterations *= 2 | 
|  |  | 
|  | c.acquire() | 
|  | p = Process(target=conditionspeed_func, args=(c, iterations)) | 
|  | p.start() | 
|  |  | 
|  | c.wait() | 
|  |  | 
|  | t = _timer() | 
|  |  | 
|  | for i in xrange(iterations): | 
|  | c.notify() | 
|  | c.wait() | 
|  |  | 
|  | elapsed = _timer()-t | 
|  |  | 
|  | c.release() | 
|  | p.join() | 
|  |  | 
|  | print iterations * 2, 'waits in', elapsed, 'seconds' | 
|  | print 'average number/sec:', iterations * 2 / elapsed | 
|  |  | 
####
|  |  | 
|  | def test(): | 
|  | manager = multiprocessing.Manager() | 
|  |  | 
|  | gc.disable() | 
|  |  | 
|  | print '\n\t######## testing Queue.Queue\n' | 
|  | test_queuespeed(threading.Thread, Queue.Queue(), | 
|  | threading.Condition()) | 
|  | print '\n\t######## testing multiprocessing.Queue\n' | 
|  | test_queuespeed(multiprocessing.Process, multiprocessing.Queue(), | 
|  | multiprocessing.Condition()) | 
|  | print '\n\t######## testing Queue managed by server process\n' | 
|  | test_queuespeed(multiprocessing.Process, manager.Queue(), | 
|  | manager.Condition()) | 
|  | print '\n\t######## testing multiprocessing.Pipe\n' | 
|  | test_pipespeed() | 
|  |  | 
|  | print | 
|  |  | 
|  | print '\n\t######## testing list\n' | 
|  | test_seqspeed(range(10)) | 
|  | print '\n\t######## testing list managed by server process\n' | 
|  | test_seqspeed(manager.list(range(10))) | 
|  | print '\n\t######## testing Array("i", ..., lock=False)\n' | 
|  | test_seqspeed(multiprocessing.Array('i', range(10), lock=False)) | 
|  | print '\n\t######## testing Array("i", ..., lock=True)\n' | 
|  | test_seqspeed(multiprocessing.Array('i', range(10), lock=True)) | 
|  |  | 
|  | print | 
|  |  | 
|  | print '\n\t######## testing threading.Lock\n' | 
|  | test_lockspeed(threading.Lock()) | 
|  | print '\n\t######## testing threading.RLock\n' | 
|  | test_lockspeed(threading.RLock()) | 
|  | print '\n\t######## testing multiprocessing.Lock\n' | 
|  | test_lockspeed(multiprocessing.Lock()) | 
|  | print '\n\t######## testing multiprocessing.RLock\n' | 
|  | test_lockspeed(multiprocessing.RLock()) | 
|  | print '\n\t######## testing lock managed by server process\n' | 
|  | test_lockspeed(manager.Lock()) | 
|  | print '\n\t######## testing rlock managed by server process\n' | 
|  | test_lockspeed(manager.RLock()) | 
|  |  | 
|  | print | 
|  |  | 
|  | print '\n\t######## testing threading.Condition\n' | 
|  | test_conditionspeed(threading.Thread, threading.Condition()) | 
|  | print '\n\t######## testing multiprocessing.Condition\n' | 
|  | test_conditionspeed(multiprocessing.Process, multiprocessing.Condition()) | 
|  | print '\n\t######## testing condition managed by a server process\n' | 
|  | test_conditionspeed(multiprocessing.Process, manager.Condition()) | 
|  |  | 
|  | gc.enable() | 
|  |  | 
# Run the benchmarks only when this file is executed as a script.
if __name__ == '__main__':
    # freeze_support() is called first, before any benchmark creates a
    # process (see the multiprocessing documentation for when it matters).
    multiprocessing.freeze_support()
    test()