# -*- coding: utf-8 -*-
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
"""
ccbench, a Python concurrency benchmark.
"""
# The module docstring must be the first statement: a string placed after
# the __future__ imports is an ordinary expression and leaves __doc__ None.
from __future__ import division
from __future__ import print_function
import time | |
import os | |
import sys | |
import functools | |
import itertools | |
import threading | |
import subprocess | |
import socket | |
from optparse import OptionParser, SUPPRESS_HELP | |
import platform | |
# Compatibility shims: let the same code run on Python 2.6+ and 3.x.
try:
    xrange
except NameError:
    # Python 3 has no xrange builtin; range is used in its place.
    xrange = range
if hasattr(itertools, 'imap'):
    # Python 2: prefer the iterator-returning map().
    map = itertools.imap

# Benchmark tunables (times in seconds, packet size in characters/bytes).
THROUGHPUT_DURATION = 2.0
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
def task_pidigits():
    """Pi calculation (Python)"""
    # NOTE: the docstring above is printed verbatim as this task's label.
    # Returns (callable, args); calling it yields the first n digits of pi.
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        # The state is a 4-tuple of integers (q, r, s, t).
        def compose(first, second):
            q1, r1, s1, t1 = first
            q2, r2, s2, t2 = second
            return (q1 * q2,
                    q1 * r2 + r1 * t2,
                    s1 * q2 + t1 * s2,
                    s1 * r2 + t1 * t2)

        def extract(state, j):
            q, r, s, t = state
            return (q * j + r) // (s * j + t)

        def digits():
            state = (1, 0, 0, 1)
            terms = ((k, 4 * k + 2, 0, 2 * k + 1) for k in _count(1))
            while True:
                digit = extract(state, 3)
                # Fold in more terms until the estimates at 3 and 4 agree.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10 * digit, 0, 1), state)
                yield digit

        return list(_islice(digits(), n))

    return calc_ndigits, (50, )
def task_regex():
    """regular expression (C)"""
    # NOTE: the docstring above is printed verbatim as this task's label.
    # Returns (callable, args): the compiled pattern's findall() applied to
    # the first 2000 characters of this source file.
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    with open(__file__, "r") as f:
        arg = f.read(2000)
    return pat.findall, (arg, )
def task_sort():
    """list sorting (C)"""
    # NOTE: the docstring above is printed verbatim as this task's label.
    def list_sort(seq):
        # Sort a reversed copy, so every call does the same work and the
        # shared argument list is never modified.
        work = list(reversed(seq))
        work.sort()
    data = list(range(1000))
    return list_sort, (data, )
def task_compress_zlib():
    """zlib compression (C)"""
    # NOTE: the docstring above is printed verbatim as this task's label.
    import zlib
    with open(__file__, "rb") as src:
        sample = src.read(5000)

    def compress(s):
        # Round-trip at compression level 5; the output is discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)

    return compress, (sample * 3, )
def task_compress_bz2():
    """bz2 compression (C)"""
    # NOTE: the docstring above is printed verbatim as this task's label.
    import bz2
    with open(__file__, "rb") as src:
        sample = src.read(3000)

    def compress(s):
        # Compress only; the output is discarded.
        bz2.compress(s)

    return compress, (sample * 2, )
def task_hashing():
    """SHA1 hashing (C)"""
    # NOTE: the docstring above is printed verbatim as this task's label.
    import hashlib
    with open(__file__, "rb") as src:
        sample = src.read(5000)

    def compute(s):
        # Hash the whole buffer; the digest is discarded.
        hasher = hashlib.sha1(s)
        hasher.digest()

    return compute, (sample * 30, )
throughput_tasks = [task_pidigits, task_regex]

# Bind the optional C modules as module globals, or None when unavailable.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
throughput_tasks.append(
    task_compress_bz2 if bz2 is not None
    else task_hashing if hashlib is not None
    else task_compress_zlib
)

# The latency tests share the very same list object as the throughput tests.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
class TimedLoop:
    """Call ``func(*args)`` repeatedly in batches until a time budget is spent."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(niters, duration)``.

        *end_event* is a list used as a flag shared between threads. If it is
        already non-empty after a batch, the previous measurement is returned.
        When at least *min_duration* seconds have passed since *start_time*,
        ``None`` is appended to it to stop the other loops, and the current
        measurement is returned. If *do_yield* is true, the loop sleeps
        briefly between slow batches.
        """
        func, args = self.func, self.args
        now = time.time
        sleep = time.sleep
        batch = 20
        done = 0
        elapsed = 0.0
        last = start_time
        while True:
            for _ in range(batch):
                func(*args)
            current = now()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return done, elapsed
            done += batch
            elapsed = current - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return done, elapsed
            if current - last < 0.01:
                # Fast batch: grow it so clock reads cost relatively less.
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                sleep(0.0001)
            last = current
def run_throughput_test(func, args, nthreads):
    """Time repeated calls of ``func(*args)`` for THROUGHPUT_DURATION seconds.

    The callable runs in *nthreads* worker threads, or directly in the calling
    thread when *nthreads* is 1. Returns a list holding one
    ``(niters, duration)`` tuple per thread, as produced by TimedLoop.
    """
    assert nthreads >= 1
    # Warm up
    func(*args)
    results = []
    loop = TimedLoop(func, args)
    end_event = []
    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Report readiness, then block until the main thread sets `started`.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for _ in range(nthreads):
        t = threading.Thread(target=run)
        # The `daemon` attribute replaces the deprecated setDaemon() method.
        t.daemon = True
        threads.append(t)
    for t in threads:
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()
    return results
def run_throughput_tests(max_threads):
    """Print the throughput of each task with 1..max_threads threads.

    The first line per task shows iterations/s; later lines show the speed
    as a percentage of that single-threaded baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            total_iters = sum(niters for niters, _ in results)
            longest = max(duration for _, duration in results)
            speed = total_iters / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
        print()
# Sentinel text sent by latency_client() after its last ping; the receiving
# side stops reading once it appears in a received datagram.
LAT_END = "END"
def _sendto(sock, s, addr): | |
sock.sendto(s.encode('ascii'), addr) | |
def _recv(sock, n): | |
return sock.recv(n).decode('ascii') | |
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to *addr*; run in the client subprocess.

    First sends one "ready" ping and waits 1 second. Then sends *nb_pings*
    pings, each preceded by a sleep of *interval* seconds, and finally
    LAT_END. Each ping is ``repr(time.time())`` followed by a newline.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    _time = time.time
    _sleep = time.sleep

    def _ping():
        _sendto(sock, "%r\n" % _time(), addr)

    try:
        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for _ in range(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        # Release the socket even if sending fails.
        sock.close()
def run_latency_client(**kwargs):
    """Spawn this script as a latency client subprocess.

    *kwargs* are passed on the command line as ``repr(kwargs)`` via the
    hidden --latclient option. Returns the subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
def run_latency_test(func, args, nthreads):
    """Measure ping latency while *nthreads* threads run ``func(*args)``.

    A latency client subprocess sends timestamped UDP pings to a local
    socket, and the receive time of each ping is recorded. Returns a list of
    ``(send_time, recv_time)`` pairs. The initial "ready" ping is excluded.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()
        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        results = []
        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)
            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                # Budget exceeds the ping phase; the main thread normally
                # stops the loop first by appending to end_event.
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for _ in range(nthreads):
                t = threading.Thread(target=run)
                # The `daemon` attribute replaces the deprecated setDaemon().
                t.daemon = True
                threads.append(t)
            for t in threads:
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time
        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            chunks.append((_time(), s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # Each ping is repr() of a float. Parse it with float() rather
                # than eval(), since any local process can write to this port.
                send_time = float(line)
                results.append((send_time, recv_time))
    return results
def run_latency_tests(max_threads):
    """Print ping latency (ms) with 0..max_threads busy background threads."""
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            count = len(results)
            # We print out milliseconds
            lats = [1000 * (recv - sent) for sent, recv in results]
            avg = sum(lats) / count
            dev = (sum((lat - avg) ** 2 for lat in lats) / count) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                  % (nthreads, avg, dev))
        print()
# Sentinel message sent by bandwidth_client() after its last packet; the
# echoing side stops once it appears in a received packet.
BW_END = "END"
def bandwidth_client(addr, packet_size, duration):
    """Send packets to *addr* and wait for each echo; run in the subprocess.

    Each packet is ``repr(local_addr) + "#" + msg + "\\n"``, left-padded with
    spaces to *packet_size* characters. Packets are exchanged for
    ``2 * duration`` seconds, then BW_END is sent.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    _time = time.time

    def _send_chunk(msg):
        text = "%r#%s\n" % (local_addr, msg)
        _sendto(sock, text.rjust(packet_size), addr)

    # We give the parent some time to be ready.
    time.sleep(1.0)
    try:
        deadline = _time() + duration * 2.0
        count = 0
        while _time() < deadline:
            _send_chunk(str(count))
            reply = _recv(sock, packet_size)
            assert len(reply) == packet_size
            count += 1
        _send_chunk(BW_END)
    finally:
        sock.close()
def run_bandwidth_client(**kwargs):
    """Spawn this script as a bandwidth client subprocess.

    *kwargs* are passed on the command line as ``repr(kwargs)`` via the
    hidden --bwclient option. Returns the subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
def run_bandwidth_test(func, args, nthreads):
    """Measure the UDP echo rate while *nthreads* threads run ``func(*args)``.

    A bandwidth client subprocess sends BANDWIDTH_PACKET_SIZE-sized packets
    to a local UDP socket, and this function echoes each one back. Returns
    echo round-trips per second, timed from the end of the first round-trip
    until the exchange stops.
    """
    import ast
    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()
        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE
        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)
            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for _ in range(nthreads):
                t = threading.Thread(target=run)
                # The `daemon` attribute replaces the deprecated setDaemon().
                t.daemon = True
                threads.append(t)
            for t in threads:
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        # Packets are "<space padding>repr(client_addr)#payload\n". Parse the
        # address as a literal instead of eval()ing data read from a socket.
        # Strip the padding first: literal_eval rejects leading spaces
        # before Python 3.10.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())
        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)
        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()
        end_event.append(None)
        for t in threads:
            t.join()
        # The client may still be blocked waiting for an echo: kill it, then
        # reap it so no zombie process is left behind.
        process.kill()
        process.wait()
    finally:
        sock.close()
    return (n - 1) / (end_time - first_time)
def run_bandwidth_tests(max_threads):
    """Print the echo rate with 0..max_threads busy background threads.

    The first line per task shows packets/s; later lines show the rate as a
    percentage of that baseline (no background threads).
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
        print()
def main():
    """Parse command-line options and run the selected benchmarks.

    With the hidden --latclient/--bwclient options, this process instead acts
    as the ping or bandwidth client subprocess and returns once done. If no
    test category is selected, all three are run.
    """
    import ast
    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")
    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry repr() of a keyword-argument dict; parse it as
    # a literal rather than eval()ing command-line text.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return
    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    # 0 is a valid check interval, so compare against None explicitly.
    if options.check_interval is not None:
        if not hasattr(sys, "setcheckinterval"):
            # sys.setcheckinterval() was removed in Python 3.9.
            parser.error("-i/--interval is not supported by this Python; "
                         "use -I/--switch-interval instead")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            # sys.setswitchinterval() only exists on Python >= 3.2.
            parser.error("-I/--switch-interval is not supported by this Python")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
# Script entry point. It also runs when run_latency_client() or
# run_bandwidth_client() re-launch this file with --latclient/--bwclient.
if __name__ == "__main__":
    main()