McUtils.Parallelizers
Provides utilities for setting up platform-independent parallelism in a hopefully unobtrusive way.
This is used more extensively in Psience, but the design is to unify the MPI and multiprocessing APIs so that one can simply pass a Parallelizer object to a function and obtain parallelism over as many processes as that object supports.
As a fallthrough, a SerialNonParallelizer is provided as a subclass that handles serial evaluation with the same API, so fewer special cases need to be checked.
Any function that supports parallelism should take the parallelizer keyword, which will be fed the Parallelizer object itself.
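As a rough illustration of this calling convention, the sketch below defines a toy serial stand-in that exposes the same method names used in the examples on this page (on_main, broadcast, scatter, gather, map, run). The names ToySerialParallelizer and total_length are hypothetical and invented here; this is not the McUtils implementation, only a picture of how one function body can serve both serial and parallel runs.

```python
class ToySerialParallelizer:
    """Hypothetical single-process stand-in (not the McUtils class)."""
    on_main = True   # the only process is the main one
    nprocs = 1

    def broadcast(self, value):
        return value            # nobody else to send to

    def scatter(self, data):
        return data             # the single process keeps everything

    def gather(self, value):
        return [value]          # one contribution per process

    def map(self, func, data):
        return [func(x) for x in data]

    def run(self, func, *args, **kwargs):
        # hand ourselves to the function through the `parallelizer` keyword
        return func(*args, parallelizer=self, **kwargs)


def total_length(items, parallelizer=None):
    """Written once against the shared API; no serial special case needed."""
    chunk = parallelizer.scatter(items if parallelizer.on_main else None)
    lengths = parallelizer.gather(len(chunk))
    # only the main process is assumed to hold the gathered values
    return sum(lengths) if parallelizer.on_main else None


result = ToySerialParallelizer().run(total_length, list(range(10)))
```

Because the function only talks to the object it is handed, swapping the toy class for a real parallelizer should not require touching total_length itself.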
Members
Examples
The simplest parallelism is just parallelizing a single function with multiprocessing:
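    import numpy as np
    from McUtils.Parallelizers import Parallelizer, MultiprocessingParallelizer

    def run_job(parallelizer=None):
        # only the main process starts out holding the data and the flag
        if parallelizer.on_main:
            data = np.arange(1000)
        else:
            data = None
        if parallelizer.on_main:
            flag = "woop"
        else:
            flag = None
        test = parallelizer.broadcast(flag)  # send a flag from the main process to all the workers
        data = parallelizer.scatter(data)  # each process receives its own chunk of the data
        lens = parallelizer.gather(len(data))  # collect the chunk sizes back together
        return lens

    MultiprocessingParallelizer().run(run_job)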
[67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 66, 66, 66, 66, 66]
This makes sure a Pool of workers gets set up and creates communication channels from the main process to the workers. Each process then runs run_job, spreading the data out with scatter and bringing it back with gather.
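The chunk sizes in the output above (ten 67s followed by five 66s) are what one would expect if 1000 items are cut into 15 near-equal contiguous pieces, the same rule numpy.array_split follows. The sketch below reproduces that arithmetic in plain Python. Both the 15-process count (read off the length of the output list) and the splitting rule are assumptions inferred from the output, not a statement of how scatter is implemented; chunk_sizes and split are hypothetical helper names.

```python
def chunk_sizes(n_items, n_procs):
    """Sizes of n_procs near-equal contiguous chunks covering n_items."""
    base, extra = divmod(n_items, n_procs)
    # the first `extra` chunks each take one additional item
    return [base + 1 if i < extra else base for i in range(n_procs)]


def split(data, n_procs):
    """Cut a sequence into the chunks described by chunk_sizes."""
    chunks, start = [], 0
    for size in chunk_sizes(len(data), n_procs):
        chunks.append(data[start:start + size])
        start += size
    return chunks


sizes = [len(c) for c in split(list(range(1000)), 15)]
print(sizes)
```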
This paradigm can be handled more simply with map. Here we’ll map a function over blocks of data:
    def mapped_func(data):
        # a plain function takes no `self`; it receives the data passed in by map
        return 1 + data

    def map_applier(n=10, parallelizer=None):
        if parallelizer.on_main:
            data = np.arange(n)
        else:
            data = None
        return parallelizer.map(mapped_func, data)

    MultiprocessingParallelizer().run(map_applier)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
All of these will work equally well if the parallelizer is an MPIParallelizer instead (with the correct run setup).
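One way to picture the map call is as a scatter, a per-process loop, and a gather rolled into one step. The single-process sketch below emulates that sequence so the ordering of the result is easy to follow. toy_map and add_one are hypothetical names, the three-way split is an arbitrary choice, and nothing here runs in parallel or reflects the library's internals.

```python
def toy_map(func, data, n_procs=3):
    """Illustrative only: emulate scatter -> per-process work -> gather in one process."""
    base, extra = divmod(len(data), n_procs)
    chunks, start = [], 0
    for i in range(n_procs):
        size = base + (1 if i < extra else 0)
        chunks.append(data[start:start + size])  # "scatter": one chunk per process
        start += size
    # each "worker" applies func to every element of its own chunk
    partial = [[func(x) for x in chunk] for chunk in chunks]
    # "gather": concatenate the partial results in process order
    return [y for part in partial for y in part]


def add_one(x):
    return 1 + x


mapped = toy_map(add_one, list(range(10)))
print(mapped)
```

Concatenating the partial results in process order is what keeps the output aligned with the input.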
This also adapts well to more object-oriented solutions. Here’s a sample class that can make effective use of a parallelizer:
    class SampleProgram:

        def __init__(self, nvals=1000, parallelizer=None):
            if not isinstance(parallelizer, Parallelizer):
                parallelizer = Parallelizer.lookup(parallelizer)  # `Parallelizer` supports a registry in case you want to give a name
            self.par = parallelizer
            self.nvals = nvals

        def initialize_data(self):
            data = np.random.rand(self.nvals)
            # could be more expensive too
            return data

        def eval_parallel(self, data, parallelizer=None):
            data = parallelizer.scatter(data)
            # this would usually be much more sophisticated
            res = data**2
            return parallelizer.gather(res)

        @Parallelizer.main_restricted
        def run_main(self, parallelizer=None):
            """
            A function to be run by the main process, setting
            up data, scattering, gathering, and post-processing
            """
            data = self.initialize_data()
            vals = self.eval_parallel(data, parallelizer=parallelizer)
            post_process = np.sqrt(vals)
            return post_process

        @Parallelizer.worker_restricted
        def run_worker(self, parallelizer=None):
            """
            A function to be run by the worker processes, really
            just doing the parallel work
            """
            self.eval_parallel(None, parallelizer=parallelizer)

        def run_par(self, parallelizer=None):
            """
            Something to be called by all processes
            """
            self.run_worker(parallelizer=parallelizer)
            return self.run_main(parallelizer=parallelizer)

        def run(self):
            """
            Boilerplate runner
            """
            print("Running with {}".format(self.par))
            return self.par.run(self.run_par)
We can then easily add in a parallelizer at run time. First, serial evaluation:
    SampleProgram(nvals=10).run()
Running with SerialNonParallelizer(id=0, nprocs=1)
array([0.08772434, 0.18266685, 0.11234067, 0.4918653 , 0.30925003,
       0.43065691, 0.8271145 , 0.52147149, 0.13801914, 0.92917295])
Adding in parallelism is straightforward:
    SampleProgram(nvals=10, parallelizer=MultiprocessingParallelizer()).run()
Running with MultiprocessingParallelizer(id=None, nprocs=None)
array([0.5852531 , 0.63836097, 0.40315219, 0.04769397, 0.5226616 ,
       0.68647924, 0.30869102, 0.01006922, 0.07439768, 0.83100183])
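The main_restricted and worker_restricted decorators used in SampleProgram can be pictured as guards that check the on_main flag of the parallelizer keyword before running the wrapped function. The sketch below is a hypothetical analogue written for illustration: main_only, worker_only, and FakeProcess are invented names, and the real decorators may behave differently (for instance in what they return on the skipped processes).

```python
import functools


def main_only(func):
    """Hypothetical analogue of a main-restricted decorator."""
    @functools.wraps(func)
    def wrapper(*args, parallelizer=None, **kwargs):
        if parallelizer is not None and parallelizer.on_main:
            return func(*args, parallelizer=parallelizer, **kwargs)
        return None  # workers skip the body entirely
    return wrapper


def worker_only(func):
    """Hypothetical analogue of a worker-restricted decorator."""
    @functools.wraps(func)
    def wrapper(*args, parallelizer=None, **kwargs):
        if parallelizer is not None and not parallelizer.on_main:
            return func(*args, parallelizer=parallelizer, **kwargs)
        return None  # the main process skips the body entirely
    return wrapper


class FakeProcess:
    """Stand-in exposing only the `on_main` flag."""
    def __init__(self, on_main):
        self.on_main = on_main


@main_only
def report(parallelizer=None):
    return "main ran"


@worker_only
def crunch(parallelizer=None):
    return "worker ran"


main, worker = FakeProcess(True), FakeProcess(False)
outcomes = [report(parallelizer=main), report(parallelizer=worker),
            crunch(parallelizer=main), crunch(parallelizer=worker)]
```

With guards like these, a method such as run_par can call both the worker and main functions unconditionally on every process, and each process only executes the part meant for it.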
To support MPI-style calling, a ClientServerRunner is also provided.