Averaging detector data with Dask
We often want to average large detector data across trains, keeping the pulses within each train separate, so we have an average image for pulse 0, another for pulse 1, etc.
This data may be too big to load into memory at once, but using Dask we can work with it like a numpy array. Dask takes care of splitting the job up into smaller pieces and assembling the result.
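As a small local illustration of this idea (plain dask.array, no detector data involved), operations on a Dask array only build a task graph; nothing is read or computed until .compute() is called:

```python
import dask.array as da
import numpy as np

# Build a lazy 10000 x 100 array, split into 10 chunks; no memory is
# allocated for the full data yet
arr = da.ones((10000, 100), chunks=(1000, 100))

lazy_mean = arr.mean(axis=0)   # still lazy: just a task graph

result = lazy_mean.compute()   # now the chunks are actually processed
print(result.shape)            # (100,)
assert np.allclose(result, 1.0)
```

With a distributed scheduler attached (as set up below), the same `.compute()` call farms the chunks out to the cluster workers instead of running locally.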
[1]:
from extra_data import open_run
import dask.array as da
from dask.distributed import Client, progress
from dask_jobqueue import SLURMCluster
import numpy as np
First, we use Dask-Jobqueue to talk to the Maxwell cluster.
[2]:
partition = 'exfel' # For EuXFEL staff
#partition = 'upex' # For users
cluster = SLURMCluster(
    queue=partition,
    local_directory='/scratch',  # Local disk space for workers to use

    # Resources per SLURM job (per node, the way SLURM is configured on Maxwell)
    # processes=16 runs 16 Dask workers in a job, so each worker has 1 core & 32 GB RAM.
    processes=16, cores=16, memory='512GB',
)

# Get a notebook widget showing the cluster state
cluster
[3]:
# Submit 2 Slurm jobs, for 32 Dask workers
cluster.scale(32)
If the cluster is busy, you might need to wait a while for the jobs to start. The cluster widget above will update when they’re running.
Next, we’ll set Dask up to use those workers:
[4]:
client = Client(cluster)
print("Created dask client:", client)
Created dask client: <Client: scheduler='tcp://131.169.193.102:44986' processes=32 cores=32>
Now Dask is ready, let’s open the run we’re going to operate on:
[5]:
run = open_run(proposal=700000, run=2)
run.info()
# of trains: 3392
Duration: 0:05:39.2
First train ID: 79726751
Last train ID: 79730142
16 detector modules (SPB_DET_AGIPD1M-1)
e.g. module SPB_DET_AGIPD1M-1 0 : 512 x 128 pixels
SPB_DET_AGIPD1M-1/DET/0CH0:xtdf
64 frames per train, up to 217088 frames total
3 instrument sources (excluding detectors):
- SA1_XTD2_XGM/XGM/DOOCS:output
- SPB_IRU_SIDEMIC_CAM:daqOutput
- SPB_XTD9_XGM/XGM/DOOCS:output
13 control sources: (1 entry per train)
- ACC_SYS_DOOCS/CTRL/BEAMCONDITIONS
- SA1_XTD2_XGM/XGM/DOOCS
- SPB_IRU_AGIPD1M/PSC/HV
- SPB_IRU_AGIPD1M/TSENS/H1_T_EXTHOUS
- SPB_IRU_AGIPD1M/TSENS/H2_T_EXTHOUS
- SPB_IRU_AGIPD1M/TSENS/Q1_T_BLOCK
- SPB_IRU_AGIPD1M/TSENS/Q2_T_BLOCK
- SPB_IRU_AGIPD1M/TSENS/Q3_T_BLOCK
- SPB_IRU_AGIPD1M/TSENS/Q4_T_BLOCK
- SPB_IRU_AGIPD1M1/CTRL/MC1
- SPB_IRU_AGIPD1M1/CTRL/MC2
- SPB_IRU_VAC/GAUGE/GAUGE_FR_6
- SPB_XTD9_XGM/XGM/DOOCS
We’re working with data from the AGIPD detector. In this run it recorded 64 frames per train, as shown in the run info above.
We can get a dask array for each module. This doesn’t load the data yet, but it knows what shape it is:
[6]:
run.get_dask_array('SPB_DET_AGIPD1M-1/DET/0CH0:xtdf', 'image.data')
[6]:
(Dask array HTML repr: a lazy array for this module's image.data, showing its shape and chunks)
Now, we’ll define how we’re going to average over trains for each module:
[7]:
def average_module(modno, run, pulses_per_train=64):
    source = f'SPB_DET_AGIPD1M-1/DET/{modno}CH0:xtdf'
    # [:, :1] keeps only the first entry of the extra axis (the image data)
    arr = run.get_dask_array(source, 'image.data')[:, :1]
    # Split the frames axis into separate train and pulse dimensions
    arr_trains = arr.reshape(-1, pulses_per_train, 512, 128)
    if modno == 0:
        print("array shape:", arr.shape)  # frames, dummy, 512, 128
        print("Reshaped to:", arr_trains.shape)
    # Average over trains, giving one mean image per pulse
    return arr_trains.mean(axis=0, dtype=np.float32)
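The reshape-and-mean trick this function relies on can be checked on a small numpy array: frames stored along one long axis become a (trains, pulses) pair of axes, and averaging over axis 0 leaves one mean image per pulse.

```python
import numpy as np

n_trains, pulses_per_train = 3, 4
# Fake data: 12 frames of 2x2 pixels, laid out train after train
frames = np.arange(n_trains * pulses_per_train * 2 * 2,
                   dtype=np.float32).reshape(-1, 2, 2)

# Insert a trains dimension, then average over it
by_train = frames.reshape(n_trains, pulses_per_train, 2, 2)
pulse_means = by_train.mean(axis=0)

print(pulse_means.shape)   # (4, 2, 2): one averaged image per pulse
# Pulse 0's mean combines frames 0, 4 and 8 of the flat layout
assert np.allclose(pulse_means[0], frames[[0, 4, 8]].mean(axis=0))
```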
[8]:
mod_averages = [
    average_module(i, run, pulses_per_train=64)
    for i in range(16)
]
mod_averages
array shape: (191872, 1, 512, 128)
Reshaped to: (2998, 64, 512, 128)
[8]:
[dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>,
dask.array<mean_agg-aggregate, shape=(64, 512, 128), dtype=float32, chunksize=(64, 512, 128), chunktype=numpy.ndarray>]
[9]:
# Stack the averages into a single array
all_average = da.stack(mod_averages)
all_average
[9]:
(Dask array HTML repr: the stacked lazy array, shape (16, 64, 512, 128), dtype float32)
Dask shows us what shape the result array will be, but so far, no real computation has happened. Now that we’ve defined what we want, let’s tell Dask to compute it.
This will take a minute or two. If you’re running it, scroll up to the Dask cluster widget and click the status link to see what it’s doing.
[10]:
%%time
all_average_arr = all_average.compute() # Get a concrete numpy array for the result
CPU times: user 29.5 s, sys: 1.99 s, total: 31.4 s
Wall time: 1min 50s
all_average_arr is a regular numpy array holding our results. Here are the values from the corner of module 0, frame 0:
[11]:
print(all_average_arr[0, 0, :5, :5])
[[5172.2964 5027.5137 5265.5615 4804.2617 4851.1353]
[5534.881 5434.519 5051.8687 4966.505 5019.865 ]
[5271.6772 5522.6396 5437.736 5310.047 5110.2173]
[5560.7173 5607.7104 4831.513 4956.135 5309.423 ]
[5209.8374 5452.4673 5573.138 5163.399 4962.6494]]
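A common follow-up (not part of the original notebook) is to store the computed averages for later analysis; a minimal sketch with np.save, using a small stand-in array and a hypothetical file name:

```python
import numpy as np

# Stand-in for all_average_arr; the real result has shape
# (16 modules, 64 pulses, 512, 128), here shrunk to keep the demo light
all_average_arr = np.zeros((2, 4, 8, 8), dtype=np.float32)

# File name is just an example
np.save('run2_pulse_averages.npy', all_average_arr)

# Reload to check the round trip
reloaded = np.load('run2_pulse_averages.npy')
assert reloaded.shape == all_average_arr.shape
assert reloaded.dtype == np.float32
```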
Please shut down the cluster (or scale it down to 0 workers) if you won’t be using it for a while. This releases the resources for other people.
[12]:
client.close()
cluster.close()