Parallelization

Feature extraction, feature selection, and rolling all offer the possibility of parallelization. By default, tsfresh parallelizes all of these tasks. Here we discuss the different settings to control the parallelization. To achieve the best results for your use case, you should experiment with the parameters.

Note

This document describes parallelization to speed up processing time. If you are dealing with large amounts of data (which might no longer fit into memory), you can also have a look at Large Input Data.

Please let us know about your results when tuning the parameters mentioned below! This will help improve both this document and the default settings.

Parallelization of Feature Selection

We use a multiprocessing.Pool to parallelize the calculation of the p-values for each feature. On instantiation we set the Pool’s number of worker processes to n_jobs. This field defaults to the number of processors on the current system. We recommend setting it to the maximum number of available (and otherwise idle) processors.

The chunksize of the Pool’s map function is another important parameter to consider. It can be set via the chunksize field. One data chunk is defined as a singular time series for one id and one kind, and the chunksize is the number of chunks that are submitted as one task to one worker process. If you set the chunksize to 10, one worker task corresponds to calculating all features for 10 id/kind time series combinations. If it is set to None (the default), heuristics (depending on the distributor) are used to find the optimal chunksize. The chunksize can have a crucial influence on performance and should be optimized in benchmarks for the problem at hand.
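
For illustration, such a feature selection run on the robot execution failures example data (which is also used in the examples below) could set both parameters explicitly. This is only a sketch with placeholder values; tune n_jobs and chunksize for your own data:

from tsfresh import select_features
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.dataframe_functions import impute
from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures

download_robot_execution_failures()
df, y = load_robot_execution_failures()

# build a feature matrix first; select_features needs a matrix without NaNs
X = extract_features(df, column_id='id', column_sort='time')
impute(X)

# n_jobs: number of worker processes in the Pool
# chunksize: how many chunks are submitted as one task to a worker process
X_selected = select_features(X, y, n_jobs=4, chunksize=10)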

Parallelization of Feature Extraction

For the feature extraction, tsfresh exposes the parameters n_jobs and chunksize. Both behave analogously to the parameters for the feature selection.

For performance studies and profiling, it is sometimes quite useful to turn off parallelization completely. This can be done by setting the parameter n_jobs to 0.
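
For illustration, a completely serial run on the robot execution failures example data (also used in the examples below) could look like this:

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features

download_robot_execution_failures()
df, y = load_robot_execution_failures()

# n_jobs=0 turns off multiprocessing completely, which makes
# profiler output (e.g. from cProfile) much easier to interpret
X = extract_features(timeseries_container=df,
                     column_id='id', column_sort='time',
                     n_jobs=0)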

Parallelization beyond a single machine

High volumes of time series data can demand analysis at scale. In that case, the time series need to be processed on a group of computational units instead of a single machine.

Accordingly, it may be necessary to distribute the extraction of time series features to a cluster. Indeed, it is possible to extract features with tsfresh in a distributed fashion. This page explains how to set up a distributed tsfresh.

To distribute the calculation of features, we use the Distributor class (contained in the tsfresh.utilities.distribution module).

Essentially, a Distributor organizes the application of feature calculators to data chunks. It maps the feature calculators to the data chunks and then reduces them, meaning that it combines the results of the individual mapping into one object, the feature matrix.

So, the Distributor will, in the following order,

  1. calculate an optimal chunk_size, based on the characteristics of the time series data at hand (by calculate_best_chunk_size())
  2. split the time series data into chunks (by partition())
  3. distribute the application of the feature calculators to the data chunks (by distribute())
  4. combine the results into the feature matrix (by map_reduce())
  5. close all connections, shut down all resources and clean everything up (by close())

So, how can you use such a Distributor to extract features with tsfresh? You will have to pass it as the distributor argument to the extract_features() method.

The following example shows how to define the MultiprocessingDistributor, which will distribute the calculations to a local pool of worker processes:

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import MultiprocessingDistributor

# download and load some time series data
download_robot_execution_failures()
df, y = load_robot_execution_failures()

# We construct a Distributor that will spawn the calculations
# over four worker processes on the local machine
Distributor = MultiprocessingDistributor(n_workers=4,
                                         disable_progressbar=False,
                                         progressbar_title="Feature Extraction")

# just pass the Distributor object to the feature extraction,
# along with the other parameters
X = extract_features(timeseries_container=df,
                     column_id='id', column_sort='time',
                     distributor=Distributor)

This example actually corresponds to the existing multiprocessing tsfresh API, where you just specify the number of jobs without the need to construct a Distributor yourself:

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features

download_robot_execution_failures()
df, y = load_robot_execution_failures()

X = extract_features(timeseries_container=df,
                     column_id='id', column_sort='time',
                     n_jobs=4)

Using dask to distribute the calculations

We provide a Distributor for the dask framework, where “Dask is a flexible parallel computing library for analytic computing.”

Note

This part of the documentation only handles parallelizing the computation using a dask cluster. The input and output are still pandas objects. If you want to use dask’s capabilities to scale to data beyond your local memory, have a look into Large Input Data.

Dask is a great framework to distribute analytic calculations to a cluster. It scales up and down, meaning that you can even use it on a single machine. The only thing that you will need to run tsfresh on a Dask cluster is the IP address and port number of the dask-scheduler.

Let’s say that your dask scheduler is running at 192.168.0.1:8786; then we can easily construct a ClusterDaskDistributor that connects to the scheduler and distributes the time series data and the calculations to the cluster:

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import ClusterDaskDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = ClusterDaskDistributor(address="192.168.0.1:8786")

X = extract_features(timeseries_container=df,
                     column_id='id', column_sort='time',
                     distributor=Distributor)

Compared to the MultiprocessingDistributor example from above, we only had to change one line to switch from one machine to a whole cluster. It is as easy as that. By changing the Distributor you can easily deploy your application to run on a cluster instead of your workstation.

You can also use a Dask cluster on your local machine to emulate a distributed Dask network. The following example shows how to set up a LocalDaskDistributor on a local cluster of 3 workers:

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import LocalDaskDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = LocalDaskDistributor(n_workers=3)

X = extract_features(timeseries_container=df,
                     column_id='id', column_sort='time',
                     distributor=Distributor)

Writing your own distributor

If you want to use another framework than Dask, you will have to write your own Distributor. To construct your custom Distributor, you will have to define an object that inherits from the abstract base class tsfresh.utilities.distribution.DistributorBaseClass. The tsfresh.utilities.distribution module contains more information about what you will need to implement.
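
As a rough, hypothetical sketch (the exact base class and method signatures can differ between tsfresh versions, so please check the tsfresh.utilities.distribution module of your installation), a custom Distributor mainly has to provide the distribute() and close() steps from the list above:

from tsfresh.utilities.distribution import DistributorBaseClass

class MyFrameworkDistributor(DistributorBaseClass):
    """Hypothetical example of handing the data chunks to your own framework."""

    def __init__(self, n_workers):
        # the number of workers is used by the chunk size heuristics
        self.n_workers = n_workers

    def distribute(self, func, partitioned_chunks, kwargs):
        # Apply the given function to every chunk of data and return the
        # results. Here this happens sequentially; a real implementation
        # would submit the chunks to its own framework (job queue,
        # cluster scheduler, ...) instead.
        return [func(chunk, **kwargs) for chunk in partitioned_chunks]

    def close(self):
        # Shut down connections and resources of your framework here.
        pass

An instance of such a class would then again be passed as the distributor argument to extract_features(), just like in the examples above.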