Source code for tsfresh.scripts.measure_execution_time

# This script measures the execution time of tsfresh
# for various feature settings
# and different input data.
# Attention: it will run for ~half a day.
# Do these calculations in a controlled environment
# (e.g. a cloud provider VM).
# You will need to have b2luigi installed.
import json
from time import perf_counter, time

import b2luigi as luigi
import numpy as np
import pandas as pd

from tsfresh.feature_extraction import (
    ComprehensiveFCParameters,
    MinimalFCParameters,
    extract_features,
)


class DataCreationTask(luigi.Task):
    """Create random data for testing.

    The task writes a single ``data.csv`` with the columns ``id``, ``time``
    and ``value``: ``num_ids`` time series, each consisting of
    ``time_series_length`` samples drawn with ``np.random.randn``.
    """

    # Number of distinct time series (values of the ``id`` column).
    num_ids = luigi.IntParameter(default=100)
    # Number of rows generated per time series.
    time_series_length = luigi.IntParameter()
    # Seed for NumPy's global random number generator.
    random_seed = luigi.IntParameter()

    def output(self):
        """Declare ``data.csv`` as the output of this task."""
        yield self.add_to_output("data.csv")

    def run(self):
        """Generate the random time series and store them as CSV."""
        # Seed the global NumPy RNG so identical parameters give identical data.
        np.random.seed(self.random_seed)

        df = pd.concat(
            [
                pd.DataFrame(
                    {
                        "id": [i] * self.time_series_length,
                        "time": range(self.time_series_length),
                        "value": np.random.randn(self.time_series_length),
                    }
                )
                for i in range(self.num_ids)
            ]
        )

        # Do not write the index. It would come back from ``pd.read_csv`` as an
        # extra "Unnamed: 0" column, and a consumer calling ``extract_features``
        # with only ``column_id``/``column_sort`` would treat that column as a
        # second time series and benchmark it as well.
        with self._get_output_target("data.csv").open("w") as f:
            df.to_csv(f, index=False)
@luigi.requires(DataCreationTask)
class TimingTask(luigi.Task):
    """Run tsfresh with the given parameters and record the elapsed time.

    The input data comes from ``DataCreationTask`` (declared through
    ``luigi.requires``). The measurement and its settings are written to
    ``result.json``.
    """

    # Feature calculator settings handed to ``extract_features``; only the
    # first key is reported in the result.
    feature_parameter = luigi.DictParameter(hashed=True)
    # Forwarded unchanged as ``n_jobs`` to ``extract_features``.
    n_jobs = luigi.IntParameter()
    # Repetition counter, stored in the result so repeated runs stay distinct.
    try_number = luigi.IntParameter()

    def output(self):
        """Declare ``result.json`` as the output of this task."""
        yield self.add_to_output("result.json")

    def run(self):
        """Time one ``extract_features`` call and dump the result as JSON."""
        input_file = self._get_input_targets("data.csv")[0]

        with input_file.open("r") as f:
            df = pd.read_csv(f)

        # perf_counter() is monotonic and high resolution; unlike time() it is
        # not affected by system clock adjustments during a long benchmark run.
        start_time = perf_counter()
        extract_features(
            df,
            column_id="id",
            column_sort="time",
            n_jobs=self.n_jobs,
            default_fc_parameters=self.feature_parameter,
            disable_progressbar=True,
        )
        elapsed = perf_counter() - start_time

        # Only the first configured feature calculator is reported.
        single_parameter_name = next(iter(self.feature_parameter))
        single_parameter_params = self.feature_parameter[single_parameter_name]

        result_json = {
            "time": elapsed,
            "n_ids": self.num_ids,
            "n_jobs": self.n_jobs,
            "feature": single_parameter_name,
            # Calculators without parameters have a falsy value here.
            "number_parameters": len(single_parameter_params)
            if single_parameter_params
            else 0,
            # Length is taken from the data itself: the row count of series 0.
            "time_series_length": int((df["id"] == 0).sum()),
            "try_number": self.try_number,
        }

        with self._get_output_target("result.json").open("w") as f:
            json.dump(result_json, f)
@luigi.requires(DataCreationTask)
class FullTimingTask(luigi.Task):
    """Run tsfresh with all calculators for comparison.

    ``extract_features`` is called without ``default_fc_parameters``, so its
    default feature set is used. The input data comes from
    ``DataCreationTask`` (declared through ``luigi.requires``) and the
    measurement is written to ``result.json``.
    """

    # Forwarded unchanged as ``n_jobs`` to ``extract_features``.
    n_jobs = luigi.IntParameter()

    def output(self):
        """Declare ``result.json`` as the output of this task."""
        yield self.add_to_output("result.json")

    def run(self):
        """Time one ``extract_features`` call and dump the result as JSON."""
        input_file = self._get_input_targets("data.csv")[0]

        with input_file.open("r") as f:
            df = pd.read_csv(f)

        # perf_counter() is monotonic and high resolution; unlike time() it is
        # not affected by system clock adjustments during a long benchmark run.
        start_time = perf_counter()
        extract_features(
            df,
            column_id="id",
            column_sort="time",
            n_jobs=self.n_jobs,
            disable_progressbar=True,
        )
        elapsed = perf_counter() - start_time

        result_json = {
            "time": elapsed,
            "n_ids": self.num_ids,
            "n_jobs": self.n_jobs,
            # Length is taken from the data itself: the row count of series 0.
            "time_series_length": int((df["id"] == 0).sum()),
        }

        with self._get_output_target("result.json").open("w") as f:
            json.dump(result_json, f)
class CombinerTask(luigi.Task):
    """Gather the results of all timing tasks into one results.csv file."""

    def complete(self):
        """Always report this task as not complete."""
        # NOTE(review): presumably this forces the combination step to run on
        # every invocation - confirm against b2luigi's scheduling semantics.
        return False

    def requires(self):
        """Yield every benchmark task whose result should be combined.

        For each ``n_jobs`` / time series length combination this yields two
        ``FullTimingTask`` runs (10 and 100 ids) and, per feature calculator
        of ``ComprehensiveFCParameters``, one ``TimingTask`` with 100 ids
        followed by three tries with 10 ids.
        """
        fc_settings = ComprehensiveFCParameters()

        for n_jobs in (0, 1, 4):
            for length in (100, 500, 1000, 5000):
                # Keyword arguments common to every task of this combination.
                shared = {
                    "time_series_length": length,
                    "n_jobs": n_jobs,
                    "random_seed": 42,
                }

                for id_count in (10, 100):
                    yield FullTimingTask(num_ids=id_count, **shared)

                for name in fc_settings:
                    yield TimingTask(
                        feature_parameter={name: fc_settings[name]},
                        num_ids=100,
                        try_number=0,
                        **shared,
                    )

                    for attempt in range(3):
                        yield TimingTask(
                            feature_parameter={name: fc_settings[name]},
                            num_ids=10,
                            try_number=attempt,
                            **shared,
                        )

    def output(self):
        """Declare ``results.csv`` as the output of this task."""
        yield self.add_to_output("results.csv")

    def run(self):
        """Load every ``result.json`` input and write them as one CSV table."""
        collected = []
        for target in self._get_input_targets("result.json"):
            with target.open("r") as handle:
                record = json.load(handle)
            collected.append(record)

        table = pd.DataFrame(collected)

        with self._get_output_target("results.csv").open("w") as handle:
            table.to_csv(handle)
if __name__ == "__main__":
    # Set b2luigi's "result_path" setting to "results" before processing.
    luigi.set_setting("result_path", "results")

    # Processing the combiner pulls in every timing task it requires.
    combiner = CombinerTask()
    luigi.process(combiner)