Tutorials

In this section we will go through a few use cases for pyDive. If you want to try out the code yourself you can download the sample hdf5-file. It has the following dataset structure:

$ h5ls -r sample.h5
/                        Group
/fields                  Group
/fields/fieldB           Group
/fields/fieldB/z         Dataset {256, 256}
/fields/fieldE           Group
/fields/fieldE/x         Dataset {256, 256}
/fields/fieldE/y         Dataset {256, 256}
/particles               Group
/particles/cellidx       Group
/particles/cellidx/x     Dataset {10000}
/particles/cellidx/y     Dataset {10000}
/particles/pos           Group
/particles/pos/x         Dataset {10000}
/particles/pos/y         Dataset {10000}
/particles/vel           Group
/particles/vel/x         Dataset {10000}
/particles/vel/y         Dataset {10000}
/particles/vel/z         Dataset {10000}

After launching the cluster (see Setup an IPython.parallel cluster configuration), the first step is to initialize pyDive:

import pyDive
pyDive.init()

Load a single dataset:

h5fieldB_z = pyDive.h5.open("sample.h5", "/fields/fieldB/z", distaxes='all')

assert type(h5fieldB_z) is pyDive.h5.h5_ndarray

h5fieldB_z just holds a dataset handle. To read the data into memory, call load():

fieldB_z = h5fieldB_z.load()

assert type(fieldB_z) is pyDive.ndarray

This loads the entire dataset into the main memory of all engines. The array elements are distributed along all axes.
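
If you do not want the array elements to be distributed along every axis, the distribution can be restricted when opening the dataset. A small sketch, assuming that distaxes also accepts an explicit axis index (or a list of indices) in addition to 'all':

# distribute the dataset along the first axis only
# (assumption: distaxes accepts an axis index besides 'all')
h5fieldB_z_rows = pyDive.h5.open("sample.h5", "/fields/fieldB/z", distaxes=0)
fieldB_z_rows = h5fieldB_z_rows.load()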

We can also load an hdf5-group:

h5fieldE = pyDive.h5.open("sample.h5", "/fields/fieldE", distaxes='all')
fieldE = h5fieldE.load()

h5fieldE and fieldE are so-called "virtual arrays-of-structs"; see pyDive.structured.

>>> print h5fieldE
VirtualArrayOfStructs<array-type: <class 'pyDive.distribution.multiple_axes.h5_ndarray'>, shape: [256, 256]>:
  y -> float32
  x -> float32

>>> print fieldE
VirtualArrayOfStructs<array-type: <class 'pyDive.distribution.multiple_axes.ndarray'>, shape: [256, 256]>:
  y -> float32
  x -> float32
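
The components of such a virtual array-of-structs are accessed as attributes; each component is an ordinary pyDive array with the structure's shape. A short sketch, following the access pattern used in the examples below:

Ex = fieldE.x                           # a pyDive.ndarray of shape [256, 256]
E_squared = fieldE.x**2 + fieldE.y**2   # element-wise math works per component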

Now, let’s do some calculations!

Example 1: Total field energy

Computing the total field energy of an electromagnetic field means squaring the field components and summing them up, or in pyDive's words:

import pyDive
import numpy as np
pyDive.init()

h5input = "sample.h5"

h5fields = pyDive.h5.open(h5input, "/fields") # defaults to distaxes='all'
fields = h5fields.load() # read out all fields into cluster's main memory in parallel

energy_field = fields.fieldE.x**2 + fields.fieldE.y**2 + fields.fieldB.z**2

total_energy = pyDive.reduce(energy_field, np.add)
print total_energy

Output:

$ python example1.py
557502.0

Well, this was just a very small hdf5 sample of 1.3 MB; in the real world we deal with much larger data volumes. So what happens if h5fields is too large to be stored in the main memory of the whole cluster? The line fields = h5fields.load() will crash. In this case we want to load the hdf5 data piece by piece. The function pyDive.fragment helps us do so:

import pyDive
import numpy as np
pyDive.init()

h5input = "sample.h5"

big_h5fields = pyDive.h5.open(h5input, "/fields")
# big_h5fields.load() # would cause a crash

total_energy = 0.0
for h5fields in pyDive.fragment(big_h5fields):
    fields = h5fields.load()

    energy_field = fields.fieldE.x**2 + fields.fieldE.y**2 + fields.fieldB.z**2

    total_energy += pyDive.reduce(energy_field, np.add)

print total_energy

An equivalent way to get this result is pyDive.mapReduce:

...
def square_fields(h5fields):
    fields = h5fields.load()
    return fields.fieldE.x**2 + fields.fieldE.y**2 + fields.fieldB.z**2

total_energy = pyDive.mapReduce(square_fields, np.add, h5fields)
print total_energy

square_fields is called on each engine, where h5fields is a structure (pyDive.arrayOfStructs) of h5_ndarrays representing a sub-part of the big h5fields. pyDive.algorithm.mapReduce() can be called with an arbitrary number of arrays, including pyDive.ndarrays, pyDive.h5.h5_ndarrays, pyDive.adios.ad_ndarrays and pyDive.cloned_ndarrays. If pyDive.h5.h5_ndarrays or pyDive.adios.ad_ndarrays are among them, it checks whether they fit into the combined main memory of all cluster nodes as a whole and loads them piece by piece if not.
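
Since pyDive.ndarrays are accepted as well, the same kind of reduction works on data that is already in memory. A minimal sketch, reusing fieldB_z from the beginning of this tutorial; as with pyDive.map, the user-function should receive the engine's local numpy array:

def square(field_z):
    # field_z is the engine's local part of the distributed array
    return field_z**2

fieldB_z_sum_of_squares = pyDive.mapReduce(square, np.add, fieldB_z)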

Now let’s say our dataset is really big and we just want to get a first estimate of the total energy:

...
total_energy = pyDive.mapReduce(square_fields, np.add, h5fields[::10, ::10]) * 10.0**2

Slicing on pyDive-arrays is always allowed.
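
As the estimate above suggests, slicing an hdf5 handle gives another handle covering just the selected sub-block, so only that part has to be read from disk. A small sketch:

h5corner = h5fieldB_z[0:128, 0:128]  # presumably still just a handle, nothing read yet
corner = h5corner.load()             # load only this 128x128 block into memory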

If you use picongpu, here is an example of how to get the total field energy for each timestep (see pyDive.picongpu):

import pyDive
import numpy as np
pyDive.init()

def square_field(h5field):
    field = h5field.load()
    return field.x**2 + field.y**2 + field.z**2

for step, h5field in pyDive.picongpu.loadAllSteps("/.../simOutput", "fields/FieldE"):
    total_energy = pyDive.mapReduce(square_field, np.add, h5field)

    print step, total_energy

Example 2: Particle density field

Given the list of particles in our sample.h5 we want to create a 2D density field out of it. For this particle-to-mesh mapping we need to apply a certain particle shape like cloud-in-cell (CIC), triangular-shaped-cloud (TSC), and so on. A list of these shapes together with the actual mapping functions can be found in the pyDive.mappings module. If you miss a shape you can easily create your own by defining a particle shape function (a sketch of the idea follows below). Note that if you have numba installed, the shape function will be compiled, resulting in a significant speed-up.
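
For illustration, here is the cloud-in-cell weighting in one dimension written as a plain numpy function. The interface actually expected by pyDive.mappings may differ, so treat this as a sketch of the math rather than of the API:

import numpy as np

def cic_weight(x):
    # CIC: the weight falls off linearly within one cell width and is zero outside
    x = np.abs(x)
    return np.where(x < 1.0, 1.0 - x, 0.0)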

We assume that the particle positions are distributed randomly. This means that although each engine loads a separate part of the particles, it needs to write to the entire density field. Therefore each participating engine must hold a full copy of the density field. This is the job of pyDive.cloned_ndarray.cloned_ndarray.cloned_ndarray.

import pyDive
import numpy as np
pyDive.init()

shape = [256, 256]
density = pyDive.cloned.zeros(shape)

h5input = "sample.h5"

particles = pyDive.h5.open(h5input, "/particles")

def particles2density(particles, density):
    particles = particles.load()
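    # absolute particle position in cell units: cell index plus in-cell offset
    # (as the dataset names /particles/cellidx and /particles/pos suggest)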
    total_pos = particles.cellidx.astype(np.float32) + particles.pos

    # convert total_pos to an (N, 2) shaped array
    total_pos = np.hstack((total_pos.x[:,np.newaxis],
                           total_pos.y[:,np.newaxis]))

    par_weighting = np.ones(particles.shape)
    import pyDive.mappings
    pyDive.mappings.particles2mesh(density, par_weighting, total_pos, pyDive.mappings.CIC)

pyDive.map(particles2density, particles, density)

final_density = density.sum() # add up all local copies

from matplotlib import pyplot as plt
plt.imshow(final_density)
plt.show()

Output:

_images/density.png

Here, as in the first example, particles2density is a function executed on the engines by pyDive.algorithm.map(). Its arguments are the engines' local parts of the arrays passed in, e.g. numpy-arrays, dataset handles or structures (pyDive.arrayOfStructs) of these.

pyDive.algorithm.map() can also be used as a decorator:

@pyDive.map
def particles2density(particles, density):
    ...

particles2density(particles, density)

Example 3: Particle energy spectrum

Finally, we compute a particle energy spectrum by histogramming the particles' kinetic energies. Each engine histograms its own chunk of particles into a local spectrum; summing up all local copies then gives the final spectrum:

import pyDive
import numpy as np
pyDive.init()

bins = 256
spectrum = pyDive.cloned.zeros([bins])

h5input = "sample.h5"

velocities = pyDive.h5.open(h5input, "/particles/vel")

@pyDive.map
def vel2spectrum(velocities, spectrum, bins):
    velocities = velocities.load()
    mass = 1.0
    energies = 0.5 * mass * (velocities.x**2 + velocities.y**2 + velocities.z**2)

    spectrum[:], bin_edges = np.histogram(energies, bins)
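    # note: np.histogram without an explicit range= picks bin edges from each
    # engine's local data, so the summed spectrum is only an approximation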

vel2spectrum(velocities, spectrum, bins=bins)

final_spectrum = spectrum.sum() # add up all local copies

from matplotlib import pyplot as plt
plt.plot(final_spectrum)
plt.show()

Output:

_images/spectrum.png