Averaging detector data with Dask

We often want to average large detector data across trains, keeping the pulses within each train separate, so we have an average image for pulse 0, another for pulse 1, etc.

This data may be too big to load into memory at once, but using Dask we can work with it like a numpy array. Dask takes care of splitting the job up into smaller pieces and assembling the result.
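As a minimal sketch of the idea (using random data rather than anything from a real run), a Dask array only records what to compute; nothing is evaluated until we ask for a result:

import dask.array as da

# Lazily define a large chunked array and a mean over its first axis
x = da.random.random((100_000, 512, 128), chunks=(10_000, 512, 128))
mean_img = x.mean(axis=0)    # builds a task graph, loads nothing
result = mean_img.compute()  # runs the graph and returns a numpy array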

[1]:
from extra_data import open_run

import dask.array as da
from dask.distributed import Client, progress
from dask_jobqueue import SLURMCluster
import numpy as np

First, we use Dask-Jobqueue to talk to the Maxwell cluster.

[2]:
partition = 'exfel'  # For EuXFEL staff
#partition = 'upex'   # For users

cluster = SLURMCluster(
    queue=partition,
    local_directory='/scratch',  # Local disk space for workers to use

    # Resources per SLURM job (per node, the way SLURM is configured on Maxwell)
    # processes=16 runs 16 Dask workers in a job, so each worker has 1 core & 32 GB RAM.
    processes=16, cores=16, memory='512GB',
)

# Get a notebook widget showing the cluster state
cluster
[3]:
# Submit 2 SLURM jobs, for 32 Dask workers
cluster.scale(32)

If the cluster is busy, you might need to wait a while for the jobs to start. The cluster widget above will update when they’re running.

Next, we’ll set Dask up to use those workers:

[4]:
client = Client(cluster)
print("Created dask client:", client)
Created dask client: <Client: scheduler='tcp://131.169.193.102:44986' processes=32 cores=32>
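If you would rather wait programmatically than watch the widget, dask.distributed's Client.wait_for_workers can block until a given number of workers have joined (a sketch; match the count to what you passed to cluster.scale):

# Optionally block until the requested 32 workers are available
client.wait_for_workers(32)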

Now that Dask is ready, let’s open the run we’re going to operate on:

[5]:
run = open_run(proposal=700000, run=2)
run.info()
# of trains:    3392
Duration:       0:05:39.2
First train ID: 79726751
Last train ID:  79730142

16 detector modules (SPB_DET_AGIPD1M-1)
  e.g. module SPB_DET_AGIPD1M-1 0 : 512 x 128 pixels
  SPB_DET_AGIPD1M-1/DET/0CH0:xtdf
  64 frames per train, up to 217088 frames total

3 instrument sources (excluding detectors):
  - SA1_XTD2_XGM/XGM/DOOCS:output
  - SPB_IRU_SIDEMIC_CAM:daqOutput
  - SPB_XTD9_XGM/XGM/DOOCS:output

13 control sources: (1 entry per train)
  - ACC_SYS_DOOCS/CTRL/BEAMCONDITIONS
  - SA1_XTD2_XGM/XGM/DOOCS
  - SPB_IRU_AGIPD1M/PSC/HV
  - SPB_IRU_AGIPD1M/TSENS/H1_T_EXTHOUS
  - SPB_IRU_AGIPD1M/TSENS/H2_T_EXTHOUS
  - SPB_IRU_AGIPD1M/TSENS/Q1_T_BLOCK
  - SPB_IRU_AGIPD1M/TSENS/Q2_T_BLOCK
  - SPB_IRU_AGIPD1M/TSENS/Q3_T_BLOCK
  - SPB_IRU_AGIPD1M/TSENS/Q4_T_BLOCK
  - SPB_IRU_AGIPD1M1/CTRL/MC1
  - SPB_IRU_AGIPD1M1/CTRL/MC2
  - SPB_IRU_VAC/GAUGE/GAUGE_FR_6
  - SPB_XTD9_XGM/XGM/DOOCS

We’re working with data from the AGIPD detector. In this run, it’s recording 64 frames for each train, as shown in the info above.
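If you want to check this programmatically, get_data_counts returns the number of frames recorded per train for a given source and key (a quick sketch, shown here for module 0):

counts = run.get_data_counts('SPB_DET_AGIPD1M-1/DET/0CH0:xtdf', 'image.data')
print(counts.max())  # expected to be 64 for this run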

We can get a dask array for each module. This doesn’t load the data yet, but it knows what shape it is:

[6]:
run.get_dask_array('SPB_DET_AGIPD1M-1/DET/0CH0:xtdf', 'image.data')
[6]:
        Array                    Chunk
Bytes   50.30 GB                 2.15 GB
Shape   (191872, 2, 512, 128)    (8192, 2, 512, 128)
Count   54 Tasks                 24 Chunks
Type    uint16                   numpy.ndarray

Now, we’ll define how we’re going to average over trains for each module:

[7]:
def average_module(modno, run, pulses_per_train=64):
    source = f'SPB_DET_AGIPD1M-1/DET/{modno}CH0:xtdf'
    # Frames recorded per train for this module (not used further here,
    # but handy for checking that every train has the expected pulses)
    counts = run.get_data_counts(source, 'image.data')

    # [:, :1] keeps only the first entry of the extra dimension (the image data)
    arr = run.get_dask_array(source, 'image.data')[:, :1]
    # Make a new dimension for trains: (trains, pulses, 512, 128)
    arr_trains = arr.reshape(-1, pulses_per_train, 512, 128)
    if modno == 0:
        print("array shape:", arr.shape)  # frames, dummy, 512, 128
        print("Reshaped to:", arr_trains.shape)

    # Average over trains (axis 0), accumulating in float32
    return arr_trains.mean(axis=0, dtype=np.float32)
[8]:
mod_averages = [
    average_module(i, run, pulses_per_train=64)
    for i in range(16)
]

mod_averages
array shape: (191872, 1, 512, 128)
Reshaped to: (2998, 64, 512, 128)
[8]:
[dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
 dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>]
[9]:
# Stack the averages into a single array
all_average = da.stack(mod_averages)
all_average
[9]:
        Array                  Chunk
Bytes   268.44 MB              16.78 MB
Shape   (16, 64, 512, 128)     (1, 64, 512, 128)
Count   2710 Tasks             16 Chunks
Type    float32                numpy.ndarray

Dask shows us what shape the result array will be, but so far, no real computation has happened. Now that we’ve defined what we want, let’s tell Dask to compute it.

This will take a minute or two. If you’re running it, scroll up to the Dask cluster widget and click the status link to see what it’s doing.
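If the widget isn’t displayed, the same dashboard can be reached via the link stored on the client (a small convenience, not required for the computation):

print(client.dashboard_link)  # URL of the Dask dashboard for this cluster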

[10]:
%%time
all_average_arr = all_average.compute()  # Get a concrete numpy array for the result
CPU times: user 29.5 s, sys: 1.99 s, total: 31.4 s
Wall time: 1min 50s

all_average_arr is a regular numpy array with our results. Here are the values from the corner of module 0, frame 0:

[11]:
print(all_average_arr[0, 0, :5, :5])
[[5172.2964 5027.5137 5265.5615 4804.2617 4851.1353]
 [5534.881  5434.519  5051.8687 4966.505  5019.865 ]
 [5271.6772 5522.6396 5437.736  5310.047  5110.2173]
 [5560.7173 5607.7104 4831.513  4956.135  5309.423 ]
 [5209.8374 5452.4673 5573.138  5163.399  4962.6494]]

Please shut down the cluster (or scale it down to 0 workers) if you won’t be using it for a while. This releases the resources for other people.
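If you want to keep the cluster object around but release the nodes for now, scaling to zero (the alternative mentioned above) does that:

# Release all SLURM jobs but keep the cluster object for later reuse
cluster.scale(0)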

[12]:
client.close()
cluster.close()