Pre-processing
This module provides the building blocks to construct a composable pre-processing pipeline.
Data chunks that are passed into the pre-processing pipeline are sequentially transformed by
each pre-processing routine. The pipeline is defined by the preprocess section in the
configuration file. Each entry in this section is interpreted as the name of a preprocessing routine
and its parameters. The routines themselves are executed on a PEP 3148-style executor.
For example, the entry
"preprocess":
{
    "bandpass_fir": {"N": 5, "Wn": [0.02, 0.036], "btype": "bandpass", "output": "sos"},
    "plot": {"time_range": [2.7175, 2.7178], "plot_dir": "/home/user/delta_run/plots/"}
}
defines a pre-processing pipeline consisting of two routines,
preprocess.pre_bandpass.bandpass_fir and
preprocess.pre_plot.pre_plot. The function preprocess.helpers.get_preprocess_routine()
implements a mapping from keys such as bandpass_fir and
plot to the respective preprocessor classes.
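The key-to-class mapping described above can be pictured as a simple lookup table. The following is only a minimal sketch: the two classes are empty stand-ins, and both the table `_ROUTINES` and the function `get_preprocess_routine_sketch` are hypothetical names chosen for illustration. The actual preprocess.helpers.get_preprocess_routine() may have a different signature and implementation.

```python
# Hypothetical stand-ins for the real preprocessor classes.
class pre_bandpass_fir:
    def __init__(self, params):
        self.params = params


class pre_plot:
    def __init__(self, params):
        self.params = params


# Assumed lookup table from configuration keys to classes (illustration only).
_ROUTINES = {"bandpass_fir": pre_bandpass_fir, "plot": pre_plot}


def get_preprocess_routine_sketch(name, params):
    """Return an instance of the class registered under `name`."""
    try:
        cls = _ROUTINES[name]
    except KeyError:
        raise NameError(f"Unknown preprocessing routine: {name}") from None
    return cls(params)


cfg_preprocess = {
    "bandpass_fir": {"N": 5, "Wn": [0.02, 0.036], "btype": "bandpass", "output": "sos"},
    "plot": {"time_range": [2.7175, 2.7178], "plot_dir": "/home/user/delta_run/plots/"},
}

# One preprocessor object per configuration entry, in configuration order.
routines = [get_preprocess_routine_sketch(k, v) for k, v in cfg_preprocess.items()]
```

Because dictionaries preserve insertion order, the routines in this sketch are instantiated in the same order in which they appear in the configuration.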
Preprocessor classes are instantiated like this:
pp_fir = pre_bandpass_fir(params)
pp_plot = pre_plot(params)
The parameters are stored as member variables by the class objects. To preprocess time chunks,
each preprocessing class defines a process() member, for example preprocess.wavelet.process(). This member preprocesses a
time-chunk on an executor and returns the transformed time-chunk.
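To illustrate the shape of such a class, here is a minimal sketch. `Chunk` and `pre_scale` are hypothetical names that do not exist in the module, and the signature `process(chunk, executor)` is an assumption; the real classes may pass the executor differently.

```python
from concurrent.futures import ThreadPoolExecutor


class Chunk:
    """Hypothetical stand-in for a time-chunk: samples plus a chunk index."""
    def __init__(self, data, tb_idx):
        self.data = list(data)
        self.tb_idx = tb_idx


def _scale(values, factor):
    # The actual numerical work, executed on a worker of the executor.
    return [v * factor for v in values]


class pre_scale:
    """Hypothetical routine that multiplies every sample by a constant."""
    def __init__(self, params):
        # Parameters are stored as member variables.
        self.factor = params["factor"]

    def process(self, chunk, executor):
        # Submit the work to the executor and wait for the result.
        future = executor.submit(_scale, chunk.data, self.factor)
        # Return the same type that was passed in.
        return Chunk(future.result(), chunk.tb_idx)


with ThreadPoolExecutor(max_workers=2) as executor:
    out = pre_scale({"factor": 2.0}).process(Chunk([1.0, 2.0, 3.0], 0), executor)
```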
By returning the same data type as was passed into the process member, preprocessing routines can be combined into a preprocessing pipeline:
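from concurrent.futures import ThreadPoolExecutor

# Executor on which the pre-processing routines run
executor_pre = ThreadPoolExecutor(max_workers=4)
# Build the pipeline from the preprocess section of the configuration
my_preprocessor = preprocessor(executor_pre, cfg["preprocess"])
while True:
    stream_data = reader.Get(stream_varname)
    msg = data_model_gen.new_chunk(stream_data, reader.CurrentStep())
    ...
    # Run the chunk through every routine in the pipeline
    msg_pp = my_preprocessor.submit(msg)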
First, the executor on which the pre-processing will run is instantiated. In this example, it is a ThreadPoolExecutor that uses 4 worker threads on the root node. The pre-processing pipeline is then instantiated with this executor and the preprocess section of the configuration.
In the receiver loop, this pipeline is executed by calling
preprocess.preprocessor.submit() with the newly received
message. The pre-processed time-chunk is then available as msg_pp.
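The chaining performed by the pipeline can be sketched as follows. This is an illustration under stated assumptions, not the module's implementation: `preprocessor_sketch`, `pre_offset`, `pre_scale` and the `ROUTINES` table are hypothetical, plain lists stand in for time-chunks, and the real preprocess.preprocessor.submit() may schedule the work differently.

```python
from concurrent.futures import ThreadPoolExecutor


class pre_offset:
    """Hypothetical routine: adds a constant to every sample."""
    def __init__(self, params):
        self.offset = params["offset"]

    def process(self, chunk, executor):
        future = executor.submit(lambda: [v + self.offset for v in chunk])
        return future.result()


class pre_scale:
    """Hypothetical routine: multiplies every sample by a constant."""
    def __init__(self, params):
        self.factor = params["factor"]

    def process(self, chunk, executor):
        future = executor.submit(lambda: [v * self.factor for v in chunk])
        return future.result()


# Assumed mapping from configuration keys to classes (illustration only).
ROUTINES = {"offset": pre_offset, "scale": pre_scale}


class preprocessor_sketch:
    """Applies the configured routines one after another."""
    def __init__(self, executor, cfg_preprocess):
        self.executor = executor
        self.routines = [ROUTINES[name](params)
                         for name, params in cfg_preprocess.items()]

    def submit(self, chunk):
        # The output of each routine is the input of the next one.
        for routine in self.routines:
            chunk = routine.process(chunk, self.executor)
        return chunk


with ThreadPoolExecutor(max_workers=4) as executor_pre:
    cfg = {"preprocess": {"offset": {"offset": 1.0}, "scale": {"factor": 2.0}}}
    my_preprocessor = preprocessor_sketch(executor_pre, cfg["preprocess"])
    msg_pp = my_preprocessor.submit([1.0, 2.0])
```

In this sketch the offset is applied before the scaling because the routines run in configuration order, so the input [1.0, 2.0] becomes [4.0, 6.0].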
As a common interface, all preprocessor classes define the method process, which takes a
data_models.data_model as input and returns an object of the same type. This
allows preprocessing routines to be composed into a pipeline.
Contents