Source code for pyDive.algorithm

"""
Copyright 2014 Heiko Burau

This file is part of pyDive.

pyDive is free software: you can redistribute it and/or modify
it under the terms of either the GNU General Public License or
the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyDive is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License and the GNU Lesser General Public License
for more details.

You should have received a copy of the GNU General Public License
and the GNU Lesser General Public License along with pyDive.
If not, see <http://www.gnu.org/licenses/>.
"""
__doc__ = None

import os
# check whether this code is executed on an engine (the "target") or on the frontend
onTarget = os.environ.get("onTarget", 'False')
if onTarget == 'False':
    from arrays.ndarray import ndarray
    from arrays.h5_ndarray import h5_ndarray
    from cloned_ndarray.cloned_ndarray import cloned_ndarray
    import IPParallelClient as com
    from IPython.parallel import interactive
    from fragment import fragment, hdd_arraytypes
    from structured import VirtualArrayOfStructs
import numpy as np

def map(f, *arrays, **kwargs):
    """Applies *f* on each :term:`engine` to the local arrays belonging to *arrays*.

    Example: ::

        cluster_array = pyDive.ones(shape=[100], distaxes=0)

        cluster_array *= 2.0
        # equivalent to
        pyDive.map(lambda a: np.multiply(a, 2.0, a), cluster_array)
        # a is the local numpy-array of *cluster_array*

    Or, as a decorator: ::

        @pyDive.map
        def twice(a):
            a *= 2.0

        twice(cluster_array)

    :param callable f: function to be called on the :term:`engine`. Has to accept
        *numpy-arrays* and *kwargs*.
    :param arrays: list of arrays, including *pyDive.ndarrays*, *pyDive.h5_ndarrays*
        and *pyDive.cloned_ndarrays*
    :param kwargs: user-specified keyword arguments passed to *f*
    :raises AssertionError: if the *shapes* of *pyDive.ndarrays* and *pyDive.h5_ndarrays* do not match
    :raises AssertionError: if the *distaxes* attributes of *pyDive.ndarrays* and *pyDive.h5_ndarrays* do not match

    Notes:
        - If the hdf5 data exceeds the memory limit (currently 25% of the combined
          main memory of all cluster nodes), the data is read block-wise so that
          each block fits into memory.
        - *map* chooses the list of *engines* from the **first** element of *arrays*;
          *f* is called on these engines. If the first array is a *pyDive.h5_ndarray*,
          all engines are used.
        - *map* does not yet write data back to a *pyDive.h5_ndarray*.
        - *map* does not equalize the element distribution of *pyDive.ndarrays*
          before execution.
    """
    if not arrays:
        # decorator mode
        def map_deco(*arrays, **kwargs):
            map(f, *arrays, **kwargs)
        return map_deco

    def map_wrapper(f, array_names, **kwargs):
        arrays = [globals()[array_name] for array_name in array_names]
        f(*arrays, **kwargs)

    view = com.getView()
    tmp_targets = view.targets  # save current target list
    if type(arrays[0]) == VirtualArrayOfStructs:
        view.targets = arrays[0].firstArray.target_ranks
    else:
        view.targets = arrays[0].target_ranks

    hdd_arrays = [a for a in arrays
                  if (hasattr(a, "arraytype") and a.arraytype in hdd_arraytypes) or type(a) in hdd_arraytypes]
    if hdd_arrays:
        cloned_arrays = [a for a in arrays
                         if (hasattr(a, "arraytype") and a.arraytype is cloned_ndarray) or type(a) is cloned_ndarray]
        other_arrays = [a for a in arrays
                        if not ((hasattr(a, "arraytype") and a.arraytype is cloned_ndarray) or type(a) is cloned_ndarray)]

        cloned_arrays_ids = [id(a) for a in cloned_arrays]
        other_arrays_ids = [id(a) for a in other_arrays]

        for fragments in fragment(*other_arrays):
            it_other_arrays = iter(other_arrays)
            it_cloned_arrays = iter(cloned_arrays)

            array_names = []
            for a in arrays:
                if id(a) in cloned_arrays_ids:
                    array_names.append(repr(next(it_cloned_arrays)))
                    continue
                if id(a) in other_arrays_ids:
                    array_names.append(repr(next(it_other_arrays)))
                    continue

            view.apply(interactive(map_wrapper), interactive(f), array_names, **kwargs)
    else:
        array_names = [repr(a) for a in arrays]
        view.apply(interactive(map_wrapper), interactive(f), array_names, **kwargs)

    view.targets = tmp_targets  # restore target list
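
# Hedged usage sketch for map (not part of the original module): it assumes a
# pyDive session has been initialized and an IPython.parallel cluster is
# running; `density`, `potential` and `scale_add` are hypothetical names.
#
#   density = pyDive.ones(shape=[512, 512], distaxes=0)
#   potential = pyDive.ones(shape=[512, 512], distaxes=0)
#
#   def scale_add(pot, dens, factor=1.0):
#       # pot and dens are the engines' local numpy arrays
#       pot += factor * dens
#
#   pyDive.map(scale_add, potential, density, factor=0.5)
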
def reduce(array, op):
    """Performs a tree-like reduction over all axes of *array*.

    :param array: *pyDive.ndarray*, *pyDive.h5_ndarray* or *pyDive.cloned_ndarray* to be reduced
    :param numpy-ufunc op: reduce operation, e.g. *numpy.add*

    If the hdf5 data exceeds the memory limit (currently 25% of the combined main
    memory of all cluster nodes), the data is read block-wise so that each block
    fits into memory.
    """
    def reduce_wrapper(array_name, op_name):
        array = globals()[array_name]
        op = getattr(np, op_name)
        return algorithm.__tree_reduce(array, axis=None, op=op)  # reduction over all axes

    view = com.getView()
    tmp_targets = view.targets  # save current target list
    if type(array) == VirtualArrayOfStructs:
        view.targets = array.firstArray.target_ranks
    else:
        view.targets = array.target_ranks

    result = None
    if (hasattr(array, "arraytype") and array.arraytype in hdd_arraytypes) or type(array) in hdd_arraytypes:
        for chunk in fragment(array):
            array_name = repr(chunk)
            targets_results = view.apply(interactive(reduce_wrapper), array_name, op.__name__)
            chunk_result = op.reduce(targets_results)  # reduce over the targets' results

            if result is None:
                result = chunk_result
            else:
                result = op(result, chunk_result)
    else:
        array_name = repr(array)
        targets_results = view.apply(interactive(reduce_wrapper), array_name, op.__name__)
        result = op.reduce(targets_results)  # reduce over the targets' results

    view.targets = tmp_targets  # restore target list
    return result
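
# Hedged usage sketch for reduce (not part of the original module), under the
# same cluster assumptions as above:
#
#   cluster_array = pyDive.ones(shape=[100], distaxes=0)
#   total = pyDive.reduce(cluster_array, np.add)  # tree-reduce over all axes
#   assert total == 100
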
def mapReduce(map_func, reduce_op, *arrays, **kwargs):
    """Applies *map_func* on each :term:`engine` to the local arrays belonging to
    *arrays* and reduces the result in a tree-like fashion over all axes.

    Example: ::

        cluster_array = pyDive.ones(shape=[100], distaxes=0)

        s = pyDive.mapReduce(lambda a: a**2, np.add, cluster_array)
        # a is the local numpy-array of *cluster_array*
        assert s == 100

    :param callable map_func: function to be called on the :term:`engine`. Has to
        accept *numpy-arrays* and *kwargs*.
    :param numpy-ufunc reduce_op: reduce operation, e.g. *numpy.add*
    :param arrays: list of arrays, including *pyDive.ndarrays*, *pyDive.h5_ndarrays*
        and *pyDive.cloned_ndarrays*
    :param kwargs: user-specified keyword arguments passed to *map_func*
    :raises AssertionError: if the *shapes* of *pyDive.ndarrays* and *pyDive.h5_ndarrays* do not match
    :raises AssertionError: if the *distaxes* attributes of *pyDive.ndarrays* and *pyDive.h5_ndarrays* do not match

    Notes:
        - If the hdf5 data exceeds the memory limit (currently 25% of the combined
          main memory of all cluster nodes), the data is read block-wise so that
          each block fits into memory.
        - *mapReduce* chooses the list of *engines* from the **first** element of
          *arrays*; the mapReduce is executed on these engines. If the first array
          is a *pyDive.h5_ndarray*, all engines are used.
        - *mapReduce* does not yet write data back to a *pyDive.h5_ndarray*.
        - *mapReduce* does not equalize the element distribution of *pyDive.ndarrays*
          before execution.
    """
    def mapReduce_wrapper(map_func, reduce_op_name, array_names, **kwargs):
        arrays = [globals()[array_name] for array_name in array_names]
        reduce_op = getattr(np, reduce_op_name)
        return algorithm.__tree_reduce(map_func(*arrays, **kwargs), axis=None, op=reduce_op)

    view = com.getView()
    tmp_targets = view.targets  # save current target list
    if type(arrays[0]) == VirtualArrayOfStructs:
        view.targets = arrays[0].firstArray.target_ranks
    else:
        view.targets = arrays[0].target_ranks

    result = None
    hdd_arrays = [a for a in arrays
                  if (hasattr(a, "arraytype") and a.arraytype in hdd_arraytypes) or type(a) in hdd_arraytypes]
    if hdd_arrays:
        cloned_arrays = [a for a in arrays
                         if (hasattr(a, "arraytype") and a.arraytype is cloned_ndarray) or type(a) is cloned_ndarray]
        other_arrays = [a for a in arrays
                        if not ((hasattr(a, "arraytype") and a.arraytype is cloned_ndarray) or type(a) is cloned_ndarray)]

        cloned_arrays_ids = [id(a) for a in cloned_arrays]
        other_arrays_ids = [id(a) for a in other_arrays]

        for fragments in fragment(*other_arrays):
            it_other_arrays = iter(other_arrays)
            it_cloned_arrays = iter(cloned_arrays)

            array_names = []
            for a in arrays:
                if id(a) in cloned_arrays_ids:
                    array_names.append(repr(next(it_cloned_arrays)))
                    continue
                if id(a) in other_arrays_ids:
                    array_names.append(repr(next(it_other_arrays)))
                    continue

            targets_results = view.apply(interactive(mapReduce_wrapper),
                                         interactive(map_func), reduce_op.__name__, array_names, **kwargs)
            fragment_result = reduce_op.reduce(targets_results)  # reduce over the targets' results

            if result is None:
                result = fragment_result
            else:
                result = reduce_op(result, fragment_result)
    else:
        array_names = [repr(a) for a in arrays]
        targets_results = view.apply(interactive(mapReduce_wrapper),
                                     interactive(map_func), reduce_op.__name__, array_names, **kwargs)
        result = reduce_op.reduce(targets_results)  # reduce over the targets' results

    view.targets = tmp_targets  # restore target list
    return result
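
# Hedged sketch of mapReduce with forwarded keyword arguments (not part of the
# original module); `offset` is a hypothetical kwarg passed through to map_func:
#
#   cluster_array = pyDive.ones(shape=[100], distaxes=0)
#   s = pyDive.mapReduce(lambda a, offset=0.0: (a + offset)**2, np.add,
#                        cluster_array, offset=1.0)
#   assert s == 400   # (1 + 1)**2 summed over 100 elements
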
def __tree_reduce(array, axis=None, op=np.add):
    # reduce all axes
    if axis is None:
        result = array
        for axis in range(len(array.shape))[::-1]:
            result = __tree_reduce(result, axis=axis, op=op)
        return result

    assert 0 <= axis < len(array.shape)

    result = np.rollaxis(array, axis)
    while True:
        l = result.shape[0]
        # if the axis is small enough that rounding errors are negligible,
        # fall back to the faster numpy reduce
        if l < 10000:
            return op.reduce(result, axis=0)

        # pairwise (tree-like) reduction step: combine the first half with the
        # second half, halving the axis length each iteration
        if l % 2 == 0:
            result = op(result[0:l//2], result[l//2:])
        else:
            tmp = result[-1]
            result = op(result[0:l//2], result[l//2:-1])
            result[0] = op(result[0], tmp)
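
# Minimal sketch (not part of the original module) of why the pairwise scheme
# above helps: summing many float32 values strictly left-to-right accumulates
# rounding error roughly linearly in n, while pairwise summation keeps it
# roughly logarithmic. Pure numpy, no cluster required:
#
#   import numpy as np
#   data = np.full(10**6, 0.1, dtype=np.float32)
#
#   acc = np.float32(0.0)
#   for x in data:            # strictly sequential summation
#       acc += x
#
#   pairwise = __tree_reduce(data, axis=0, op=np.add)
#   # acc drifts noticeably away from the exact value 100000 (classically to
#   # about 100958), while pairwise stays close to it.
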