Streaming

Delta moves data between generator, middleman, and processor using the interface defined in this module. Delta employs the ADIOS2 library for data streaming.

Example code to instantiate a writer and stream data would look like this:

dataloader = get_loader(cfg)
writer = writer_gen(cfg["transport"], gen_channel_name(cfg["diagnostic"])
wrter.DefineVariable(gen_var_name(cfg), dataloader.get_chunk_shape(), dataloader.dtype)
writer.Open()
writer.DefineAttributes("stream_attrs", {"param1": 1.0, "param2": 2.0})

for nstep, chunk in enumerate(dataloader.batch_generator()):
    writer.BeginStep()
    writer.put_data(chunk)
    writer.EndStep()

writer.Close()

During instantiation, the writer uses cfg[‘transport’] to set appropriate ADIOS2 parameters, such as the streaming protocol , and additional parameters. For example, the configuration section

"transport":
{
    "engine": "BP4",
    "params":
    {
        "Threads: 4",
        "InitialBufferSize": "1Gb"
    }
}

will instantiate streaming.reader_mpi.reader_gen that expects input from a BP4-file. Details on BP4 and valid parameters can be found in the ADIOS2 documentation .

Since streaming.writers.writer_gen and streaming.reader_mpi.reader_gen need to open the same channel name, the convenience function data_models.helpers.gen_channel_name() generates such a name from the diagnostic object in the configuration file.

To send data through the stream, a variable has to be defined with a shape and an expected datatype. After the stream has been opened, one can define attributes on it. Data is written to the stream in a step-based manner. After writing has finished, the writer needs to be closed.

Example code to receive the data stream would look like this:

reader = reader_gen(cfg["transport"], gen_channel_name(cfg["diagnostic"])
reader.Open()
stream_attrs = None
stream_varname = gen_var_name(cfg)

while True:
    stepStatus = reader.BeginStep(timeoutSeconds=1.0)
    if stepStatus:
        if stream_attr is is None:
            stream_attrs = reader.get_attrs("stream_attrs")

        stream_data = reader.Get(stream_varname)
    else:
        break

Like the writer, the reader uses cfg[‘transport’[ during instantiation to set appropriate ADIOS2 parameters. The stream name to receive is defined by gen_channel_name helper function. Attributes are initialized empty and the variable name to receive on the stream is defined by the gen_var_name helper function. Entering the receive loop, stepStatus indicates whether a new step is available. If so, at first the attributes of the stream are inquired, if this has not been done before. Then a data packet is received. If no new steps are received after waiting for the defined timeout, it is assumed that the sender has stopped transmitting and receive is aborted.

reader_mpi

writers

stream_stats