Streaming data over ZeroMQ

Karabo Bridge provides access to live data during the experiment over a ZeroMQ socket. The extra_data Python package can stream data from files using the same protocol. You can use this to test code which expects to receive data from Karabo Bridge, or use the same code for analysing live data and stored data.

To stream data from a saved run, use the karabo-bridge-serve-run command:

#                     Proposal run
karabo-bridge-serve-run 700000 40 --port 4545 \
     --include 'SPB_IRDA_JF4M/DET/JNGFR*:daqOutput' \
     --include '*/MOTOR/*[*Position]'

The port number (4545 above) must be an unused TCP port above 1024. Clients will need this port and the IP address of the sender to connect. For clients running on the same node, use the IP address 127.0.0.1. Command-line options are explained on the command reference page.

Note

If you install EXtra-data yourself, streaming data requires some optional dependencies. To ensure you have these, run:

pip install extra_data[bridge]

These dependencies are installed in the EuXFEL Anaconda installation on the Maxwell cluster.

We provide Karabo bridge clients as Python and C++ libraries.

If you want to do some processing on the data before streaming it, you can use this Python interface to send it out:

class extra_data.export.ZMQStreamer(port, sock='REP', maxlen=10, protocol_version='2.2', dummy_timestamps=False)
start()
feed(data, metadata=None, block=True, timeout=None)

Push data to the sending queue.

This blocks if the queue already has maxlen items waiting to be sent.

Parameters:
  • data (dict) – Contains train data. The dictionary has to follow the karabo_bridge see serialize() for details

  • metadata (dict, optional) – Contains train metadata. If the metadata dict is not provided it will be extracted from ‘data’ or an empty dict if ‘metadata’ key is missing from a data source. see serialize() for details

  • block (bool) – If True, block if necessary until a free slot is available or ‘timeout’ has expired. If False and there is no free slot, raises ‘queue.Full’ (timeout is ignored)

  • timeout (float) – In seconds, raises ‘queue.Full’ if no free slow was available within that time.