Parallel processing with a virtual HDF5 dataset

This example demonstrates splitting up some data to be processed by several worker processes, and collecting the results back together.

For this example, we’ll use data from an XGM, and find the average intensity of each pulse across all the trains in the run. This doesn’t actually need parallel processing: we can easily do it directly in the notebook. But the same techniques should work with much more data and more complex calculations.

[1]:
from extra_data import RunDirectory
import multiprocessing
import numpy as np

The data we want is spread across these seven sequence files:

[2]:
!ls /gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S*.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00000.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00001.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00002.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00003.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00004.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00005.h5
/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/RAW-R0034-DA01-S00006.h5
[3]:
run = RunDirectory('/gpfs/exfel/exp/XMPL/201750/p700000/raw/r0002/')

By making a virtual dataset, we can see the shape of the data, as if it were one big numpy array:

[4]:
vds_filename = 'xgm_vds.h5'
xgm_vds = run.get_virtual_dataset(
    'SA1_XTD2_XGM/XGM/DOOCS:output', 'data.intensityTD',
    filename=vds_filename
)
xgm_vds
[4]:
<HDF5 dataset "intensityTD": shape (3391, 1000), type "<f4">
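
Since this is a regular h5py Dataset object, we can also ask which files the virtual dataset maps onto. This is just an optional check, assuming a reasonably recent h5py (2.9 or newer):

[ ]:
# Optional: list the source files behind the virtual dataset
assert xgm_vds.is_virtual
sorted({vsrc.file_name for vsrc in xgm_vds.virtual_sources()})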

Let’s read this into memory and calculate the means directly, to check our parallel calculations against. We can do this for this example because the calculation is simple and the data is small; it wouldn’t be practical in real situations where parallelisation is useful.

These data are recorded in 32-bit floats, but to minimise rounding errors we’ll tell numpy to give the results as 64-bit floats. Try re-running this example with 32-bit floats to see how much the results change!

[5]:
simple_mean = xgm_vds[:, :40].mean(axis=0, dtype=np.float64)
simple_mean.round(4)
[5]:
array([834.2744, 860.0754, 869.2637, 891.4351, 899.6227, 899.3759,
       900.3555, 899.1162, 898.4991, 904.4979, 910.5669, 914.1612,
       922.5737, 925.8734, 930.093 , 935.3124, 938.9643, 941.4609,
       946.1351, 950.6574, 951.855 , 954.2491, 956.6414, 957.5584,
       961.7528, 961.1457, 958.9655, 957.6415, 953.8603, 947.9236,
         0.    ,   0.    ,   0.    ,   0.    ,   0.    ,   0.    ,
         0.    ,   0.    ,   0.    ,   0.    ])
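
For instance, we can compute the same means with a 32-bit accumulator and see how far they drift from the 64-bit result (the exact differences depend on the data):

[ ]:
# Compare 32-bit and 64-bit accumulation
mean32 = xgm_vds[:, :40].mean(axis=0, dtype=np.float32)
np.abs(mean32 - simple_mean).max()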

Now, we’ll define a chunk of the data for each of 4 worker processes.

[6]:
N_proc = 4
cuts = [int(xgm_vds.shape[0] * i / N_proc) for i in range(N_proc + 1)]
chunks = list(zip(cuts[:-1], cuts[1:]))
chunks
[6]:
[(0, 847), (847, 1695), (1695, 2543), (2543, 3391)]
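
A quick sanity check (not essential) confirms that these chunks tile the train axis exactly, with no gaps or overlaps:

[ ]:
# Every train should fall into exactly one chunk
assert chunks[0][0] == 0 and chunks[-1][1] == xgm_vds.shape[0]
assert all(end == nxt for (_, end), (nxt, _) in zip(chunks, chunks[1:]))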

Using multiprocessing

This is the function we’ll ask each worker process to run, adding up the data and returning a 1D numpy array.

We’re using default arguments as a convenient way to copy the filename and the dataset path into the worker process.

[7]:
def sum_chunk(chunk, filename=vds_filename, ds_name=xgm_vds.name):
    start, end = chunk
    # Reopen the file in the worker process:
    import h5py, numpy
    with h5py.File(filename, 'r') as f:
        ds = f[ds_name]
        data = ds[start:end]  # Read my chunk

    return data.sum(axis=0, dtype=numpy.float64)
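
Before farming this out to worker processes, we can run it once in the notebook to check that it does what we expect: it should return one total per pulse position.

[ ]:
# Quick check in the parent process: a 1D array of per-pulse totals
sum_chunk(chunks[0]).shape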

Using Python’s multiprocessing module, we start four workers, farm the chunks out to them, and collect the results back.

[8]:
with multiprocessing.Pool(N_proc) as pool:
    res = pool.map(sum_chunk, chunks)
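
An equivalent approach, if you prefer the concurrent.futures API, is a ProcessPoolExecutor; this sketch produces the same list of per-chunk sums:

[ ]:
# Alternative to multiprocessing.Pool using concurrent.futures
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor(max_workers=N_proc) as executor:
    res = list(executor.map(sum_chunk, chunks))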

res is now a list of 4 arrays, containing the sums from each chunk. To get the mean, we’ll add these up to get a grand total, and then divide by the number of trains we have data from.

[9]:
multiproc_mean = (np.stack(res).sum(axis=0, dtype=np.float64)[:40] / xgm_vds.shape[0])
np.testing.assert_allclose(multiproc_mean, simple_mean)

multiproc_mean.round(4)
[9]:
array([834.2744, 860.0754, 869.2637, 891.4351, 899.6227, 899.3759,
       900.3555, 899.1162, 898.4991, 904.4979, 910.5669, 914.1612,
       922.5737, 925.8734, 930.093 , 935.3124, 938.9643, 941.4609,
       946.1351, 950.6574, 951.855 , 954.2491, 956.6414, 957.5584,
       961.7528, 961.1457, 958.9655, 957.6415, 953.8603, 947.9236,
         0.    ,   0.    ,   0.    ,   0.    ,   0.    ,   0.    ,
         0.    ,   0.    ,   0.    ,   0.    ])

Using Slurm

What if we need more power? The example above is limited to one machine, but we can use Slurm to spread the work over multiple machines on the Maxwell cluster.

This is massive overkill for this example calculation: we’ll only use one CPU core for a fraction of a second on each machine. But we could do something similar for a much bigger problem.

[10]:
from getpass import getuser
import h5py
import subprocess

We’ll write a Python script for each worker to run. Like the sum_chunk function above, this reads a chunk of data from the virtual dataset and sums it along the train axis. It saves the result into another HDF5 file for us to collect.

[11]:
%%writefile parallel_eg_worker.py
#!/gpfs/exfel/sw/software/xfel_anaconda3/1.1/bin/python
import h5py
import numpy as np
import sys

filename = sys.argv[1]
ds_name = sys.argv[2]
chunk_start = int(sys.argv[3])
chunk_end = int(sys.argv[4])
worker_idx = sys.argv[5]

with h5py.File(filename, 'r') as f:
    ds = f[ds_name]
    data = ds[chunk_start:chunk_end]  # Read my chunk

chunk_totals = data.sum(axis=0, dtype=np.float64)

with h5py.File(f'parallel_eg_result_{worker_idx}.h5', 'w') as f:
    f['chunk_totals'] = chunk_totals
Writing parallel_eg_worker.py

The Maxwell cluster is divided into various partitions for different groups of users. If you’re running this as an external user, comment out the ‘Staff’ line below.

[12]:
partition = 'upex'   # External users
partition = 'exfel'  # Staff

Now we submit 4 jobs with the sbatch command:

[13]:
for i, (start, end) in enumerate(chunks):
    cmd = ['sbatch', '-p', partition, 'parallel_eg_worker.py', vds_filename, xgm_vds.name, str(start), str(end), str(i)]
    print(subprocess.check_output(cmd))
b'Submitted batch job 2631813\n'
b'Submitted batch job 2631814\n'
b'Submitted batch job 2631815\n'
b'Submitted batch job 2631816\n'

We can use squeue to monitor the running jobs. Re-run this until all the jobs have disappeared, meaning they’ve finished.

[14]:
!squeue -u {getuser()}
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
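
If you'd rather wait programmatically, a minimal polling loop like this sketch works too; note that it waits for all of your queued jobs, not just the four submitted above:

[ ]:
# Poll squeue until we have no jobs left (-h suppresses the header line)
import time

while subprocess.check_output(['squeue', '-h', '-u', getuser()]).strip():
    time.sleep(10)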

Now, so long as all the workers succeeded, we can collect the results.

If any workers failed, you’ll find tracebacks in slurm-*.out files in the working directory.

[15]:
res = []

for i in range(N_proc):
    with h5py.File(f'parallel_eg_result_{i}.h5', 'r') as f:
        res.append(f['chunk_totals'][:])

Now res is once again a list of 1D numpy arrays, representing the totals from each chunk. So we can finish the calculation as in the previous section:

[16]:
slurm_mean = np.stack(res).sum(axis=0)[:40] / xgm_vds.shape[0]
np.testing.assert_allclose(slurm_mean, simple_mean)

slurm_mean.round(4)
[16]:
array([834.2744, 860.0754, 869.2637, 891.4351, 899.6227, 899.3759,
       900.3555, 899.1162, 898.4991, 904.4979, 910.5669, 914.1612,
       922.5737, 925.8734, 930.093 , 935.3124, 938.9643, 941.4609,
       946.1351, 950.6574, 951.855 , 954.2491, 956.6414, 957.5584,
       961.7528, 961.1457, 958.9655, 957.6415, 953.8603, 947.9236,
         0.    ,   0.    ,   0.    ,   0.    ,   0.    ,   0.    ,
         0.    ,   0.    ,   0.    ,   0.    ])
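
Finally, if you want to tidy up, the temporary files this example created (the virtual dataset, the worker script, the per-chunk results and the Slurm log files) can be removed like this:

[ ]:
# Clean up the files written by this example
import glob, os

files = [vds_filename, 'parallel_eg_worker.py']
files += glob.glob('parallel_eg_result_*.h5') + glob.glob('slurm-*.out')
for path in files:
    os.remove(path)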