Mandelbrot threads
In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
In [2]:
# Setting parameters (these values can be changed)
nrows = 800
ncols = 800
xcenter = -0.4601222
ycenter = .570286
bound = .002
x_domain = np.linspace(xcenter - bound, xcenter + bound, ncols)
y_domain = np.linspace(ycenter - bound, ycenter + bound, nrows)
max_iterations = 2000  # any positive integer value
colormap = "nipy_spectral"  # any valid matplotlib colormap name

def mandelbrot(x, y):
    # Escape-time iteration: count how many steps it takes |z| to exceed 2.
    # Returns 0 for points that never escape within max_iterations,
    # i.e. points treated as inside the set.
    z = 0
    p = 2
    c = complex(x, y)
    for iteration_number in range(max_iterations):
        if abs(z) >= 2:
            return iteration_number
        z = z**p + c
    return 0

def plot_data(iteration_array):
    # Plotting the data
    ax = plt.axes()
    ax.set_aspect("equal")
    graph = ax.pcolormesh(x_domain, y_domain, iteration_array, cmap=colormap,
                          norm=matplotlib.colors.LogNorm())
    plt.colorbar(graph)
    plt.xlabel("Real-Axis")
    plt.ylabel("Imaginary-Axis")
    plt.show()

iteration_array = np.zeros((nrows, ncols))
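A quick spot check of the escape-time function can help confirm the setup before the long runs below. This is a small sketch and not part of the timed benchmark.

# The origin never escapes (returns 0 by the convention above), while a point
# well outside the set escapes almost immediately.
print(mandelbrot(0.0, 0.0))   # expected: 0
print(mandelbrot(2.0, 2.0))   # expected: 1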
In [3]:
%%timeit -n 1 -r 3 -v runtime
for (i, x) in enumerate(x_domain):
    for (j, y) in enumerate(y_domain):
        iteration_array[j, i] = mandelbrot(x, y)
12 s ± 14.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
In [4]:
times_serial = {1: runtime}
In [5]:
plot_data(iteration_array)
In [6]:
import sys
print(sys._is_gil_enabled())
False
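As a cross-check (a sketch; assumes a CPython 3.13+ free-threaded build), the build-time flag can also be inspected: `Py_GIL_DISABLED` is the configuration variable set for free-threaded builds, and `sys._is_gil_enabled()` returning False above means the GIL is actually off at runtime, not just absent from the build.

import sys
import sysconfig
# 1 on free-threaded ("t") builds, 0 (or None on older Python versions) otherwise
print(sysconfig.get_config_var("Py_GIL_DISABLED"))
# the version string usually also notes the free-threading build
print(sys.version)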
In [7]:
import os
ncpus = os.cpu_count()
# Adjust this to fit your host; it is hard to detect programmatically.
# Set to 1 if hyperthreading (SMT) is disabled.
hyperthreads_per_core = 2
# Number of rows to dispatch to each task
chunk_size = 4
# Worker counts to sweep: 1, then every even count up to the number of logical CPUs
num_workers_domain = [1] + list(range(2, ncpus + 1, 2))
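For reference, the timings below go up to 32 workers, consistent with a host reporting 32 logical CPUs; printing the sweep is a quick illustrative check and not required for the benchmark.

# On a host where os.cpu_count() == 32, this prints 32 and
# [1, 2, 4, 6, ..., 32], matching the worker counts timed below.
print(ncpus)
print(num_workers_domain)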
In [8]:
def process_worker(j_y, x_domain, mandelbrot):
    # Each worker receives a chunk of (row index, y) pairs and computes its own
    # block of rows; processes do not share memory, so the block is returned.
    ret = np.empty((len(j_y), x_domain.shape[0]))
    for (i, x) in enumerate(x_domain):
        for (j, y) in j_y:
            # j is the global row index; offset by the chunk's first row
            ret[j - j_y[0][0], i] = mandelbrot(x, y)
    return ret
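A minimal sanity check of the chunk shape (a sketch, assuming the cells above have run): the worker should return one row per (row index, y) pair in its chunk.

import itertools
first_chunk = next(iter(itertools.batched(enumerate(y_domain), chunk_size, strict=True)))
block = process_worker(first_chunk, x_domain, mandelbrot)
print(block.shape)   # expected: (chunk_size, ncols) == (4, 800)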
In [9]:
import itertools
from joblib import Parallel, delayed

def run_process_pool(num_workers):
    # Split the rows into chunks and dispatch them to worker processes
    # (joblib's default "loky" backend runs each worker in its own process).
    chunks = itertools.batched(enumerate(y_domain), chunk_size, strict=True)
    return Parallel(n_jobs=num_workers)(
        delayed(process_worker)(arg, x_domain, mandelbrot) for arg in chunks
    )

times_processes = {}
for i in num_workers_domain:
    iteration_array = np.zeros((nrows, ncols))
    print(f"{i} workers:")
    %timeit -n 1 -r 3 -v runtime np.vstack(run_process_pool(i))
    times_processes[i] = runtime
1 workers: 12.2 s ± 69.9 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
2 workers: 6.77 s ± 167 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
4 workers: 3.54 s ± 80.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
6 workers: 2.3 s ± 127 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
8 workers: 1.84 s ± 165 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
10 workers: 1.5 s ± 114 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
12 workers: 1.31 s ± 130 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
14 workers: 1.15 s ± 38.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
16 workers: 1.07 s ± 55.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
18 workers: 1.14 s ± 161 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
20 workers: 1.01 s ± 27.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
22 workers: 880 ms ± 26.6 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
24 workers: 862 ms ± 16.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
26 workers: 845 ms ± 13.8 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
28 workers: 790 ms ± 10.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
30 workers: 780 ms ± 8.56 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
32 workers: 768 ms ± 12.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
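The timing loop above discards the stacked result. To get the image itself from the process pool, one more short run can rebuild and plot it (a sketch reusing plot_data from above).

# One full run with all logical CPUs, stacking the per-chunk row blocks in order
iteration_array = np.vstack(run_process_pool(ncpus))
plot_data(iteration_array)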
In [10]:
def thread_worker(j_y):
    # Threads share memory, so each worker writes its rows directly into the
    # global iteration_array; chunks cover disjoint rows, so no two threads
    # ever write to the same element.
    for (i, x) in enumerate(x_domain):
        for (j, y) in j_y:
            iteration_array[j, i] = mandelbrot(x, y)
In [11]:
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import itertools

def run_thread_pool(num_workers):
    with ThreadPoolExecutor(max_workers=num_workers) as tpe:
        chunks = itertools.batched(enumerate(y_domain), chunk_size, strict=True)
        try:
            futures = [tpe.submit(thread_worker, arg) for arg in chunks]
            # block until all work finishes
            concurrent.futures.wait(futures)
        finally:
            # check for exceptions in worker threads
            [f.result() for f in futures]

times_threads = {}
for i in num_workers_domain:
    iteration_array = np.zeros((nrows, ncols))
    print(f"{i} workers:")
    %timeit -n 1 -r 3 -v runtime run_thread_pool(i)
    times_threads[i] = runtime
1 workers: 12 s ± 113 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
2 workers: 6.15 s ± 39.2 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
4 workers: 3.13 s ± 11.8 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
6 workers: 2.11 s ± 11.9 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
8 workers: 1.6 s ± 7.43 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
10 workers: 1.29 s ± 4.75 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
12 workers: 1.07 s ± 1.58 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
14 workers: 944 ms ± 358 μs per loop (mean ± std. dev. of 3 runs, 1 loop each)
16 workers: 869 ms ± 40.8 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
18 workers: 886 ms ± 25.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
20 workers: 906 ms ± 16.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
22 workers: 930 ms ± 33.4 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
24 workers: 919 ms ± 4.01 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
26 workers: 921 ms ± 12.8 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
28 workers: 942 ms ± 5.99 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
30 workers: 918 ms ± 33.9 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
32 workers: 934 ms ± 12.6 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
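An optional consistency check (a sketch): after the last timed run, the shared iteration_array filled by the threads should match a freshly computed process-pool result.

reference = np.vstack(run_process_pool(ncpus))
print(np.array_equal(iteration_array, reference))   # expected: True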
In [12]:
def get_results(times):
    # Each value is an IPython TimeitResult; all_runs holds the wall time of
    # every repeat, so we can report a mean and standard deviation per worker count.
    all_runtimes = np.array(list(time.all_runs for time in times.values()))
    runtimes = np.mean(all_runtimes, axis=1)
    err = np.std(all_runtimes, axis=1)
    nworkers = np.array(list(times))
    return {'runtimes': runtimes, 'err': err, 'nworkers': nworkers}
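A quick look at the aggregated timings (a sketch): each field is an array with one entry per worker count swept above.

res = get_results(times_threads)
print(res['nworkers'])
print(res['runtimes'].round(3))   # mean seconds per run
print(res['err'].round(3))        # standard deviation across the 3 repeats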
In [15]:
baseline = times_serial[1].average
for label, times in (
    # ("serial", times_serial),
    ("threads", times_threads),
    ("processes", times_processes),
):
    results = get_results(times)
    plt.errorbar(
        results['nworkers'],
        baseline / results['runtimes'],
        # propagate the timing spread into speedup units: d(t1/t) = t1 * dt / t**2
        yerr=baseline * results['err'] / results['runtimes']**2,
        fmt='o',
        capsize=5,
        label=label,
    )
nworkers = get_results(times_threads)['nworkers']
plt.plot(nworkers, nworkers, label="perfect scaling")
plt.plot(
    [ncpus // hyperthreads_per_core, ncpus // hyperthreads_per_core],
    [0, ncpus + 1],
    linestyle="--",
    color="grey",
    label="# of physical cores on testing machine",
)
plt.legend()
plt.xlim(0, ncpus + 1)
plt.ylim(0, ncpus + 1)
plt.xlabel("Number of workers")
plt.ylabel("Parallel speedup ($t_1/t_N$)")
plt.show()
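Finally, a small textual summary to go with the plot (a sketch; assumes the timing dictionaries above are populated).

for label, times in (("threads", times_threads), ("processes", times_processes)):
    res = get_results(times)
    speedup = baseline / res['runtimes']
    best = int(np.argmax(speedup))
    print(f"{label}: best speedup {speedup[best]:.1f}x at {res['nworkers'][best]} workers")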