# This script measures the execution time of tsfresh
# for various settings
# and different input data.
# Attention: it will run for approximately half a day.
# Run these calculations in a controlled environment
# (e.g. a cloud provider VM).
# You will need to have b2luigi installed.
from tsfresh.feature_extraction import ComprehensiveFCParameters, MinimalFCParameters, extract_features
import pandas as pd
import numpy as np
from time import time
import b2luigi as luigi
import json
class DataCreationTask(luigi.Task):
    """Create random data for testing.

    The task writes a single ``data.csv`` file in long format with the
    columns ``id``, ``time`` and ``value``.  It contains ``num_ids`` time
    series with ``time_series_length`` rows each; the values are drawn with
    ``np.random.randn`` after seeding numpy with ``random_seed``.
    """

    # Number of distinct time series (ids) to generate.
    num_ids = luigi.IntParameter(default=100)
    # Number of rows generated for every id.
    time_series_length = luigi.IntParameter()
    # Seed passed to ``np.random.seed`` so the data is reproducible.
    random_seed = luigi.IntParameter()

    def output(self):
        """Declare ``data.csv`` as the output of this task."""
        yield self.add_to_output("data.csv")

    def run(self):
        """Generate the random time series and store them as CSV."""
        np.random.seed(self.random_seed)

        df = pd.concat([
            pd.DataFrame({
                "id": [i] * self.time_series_length,
                "time": range(self.time_series_length),
                "value": np.random.randn(self.time_series_length)
            })
            for i in range(self.num_ids)
        ])

        with self._get_output_target("data.csv").open("w") as f:
            # Do not persist the DataFrame index: it only repeats the
            # per-id row number, and a plain ``pd.read_csv`` would return it
            # as an extra "Unnamed: 0" column that a feature extraction
            # without explicit value columns picks up as additional data.
            df.to_csv(f, index=False)
@luigi.requires(DataCreationTask)
class TimingTask(luigi.Task):
    """Run tsfresh with the given parameters and record the elapsed time.

    The task reads the ``data.csv`` input, runs ``extract_features`` with
    ``feature_parameter`` as ``default_fc_parameters`` and writes a
    ``result.json`` file with the keys ``time``, ``n_ids``, ``n_jobs``,
    ``feature``, ``number_parameters``, ``time_series_length`` and
    ``try_number``.
    """

    # Feature calculator settings handed to tsfresh.  ``run`` reports the
    # first key as the feature name, so a single-entry mapping is expected.
    feature_parameter = luigi.DictParameter(hashed=True)
    # Forwarded unchanged to the ``n_jobs`` argument of ``extract_features``.
    n_jobs = luigi.IntParameter()
    # Repetition counter; only copied into the result.
    try_number = luigi.IntParameter()

    def output(self):
        """Declare ``result.json`` as the output of this task."""
        yield self.add_to_output("result.json")

    def run(self):
        """Time one feature extraction and dump the measurement as JSON."""
        # Monotonic, high-resolution clock: unlike the wall clock it is not
        # affected by system clock adjustments during the measurement.
        from time import perf_counter

        input_file = self._get_input_targets("data.csv")[0]

        with input_file.open("r") as f:
            # Load only the real data columns.  If the CSV also contains a
            # stored index column, it must not reach extract_features, which
            # would otherwise handle it like another value column and
            # inflate the measured time.
            df = pd.read_csv(f, usecols=["id", "time", "value"])

        start_time = perf_counter()
        extract_features(df, column_id="id", column_sort="time", n_jobs=self.n_jobs,
                         default_fc_parameters=self.feature_parameter,
                         disable_progressbar=True)
        end_time = perf_counter()

        single_parameter_name = list(self.feature_parameter.keys())[0]
        single_parameter_params = self.feature_parameter[single_parameter_name]

        result_json = {
            "time": end_time - start_time,
            # num_ids is not declared here; presumably it is provided by the
            # @luigi.requires(DataCreationTask) decorator -- TODO confirm.
            "n_ids": self.num_ids,
            "n_jobs": self.n_jobs,
            "feature": single_parameter_name,
            # A falsy parameter set (e.g. None) counts as zero parameters.
            "number_parameters": len(single_parameter_params) if single_parameter_params else 0,
            # Length of one series, measured as the number of rows of id 0.
            "time_series_length": int((df["id"] == 0).sum()),
            "try_number": self.try_number,
        }

        with self._get_output_target("result.json").open("w") as f:
            json.dump(result_json, f)
@luigi.requires(DataCreationTask)
class FullTimingTask(luigi.Task):
    """Run tsfresh with its default calculators for comparison.

    The task reads the ``data.csv`` input, runs ``extract_features`` without
    passing any feature settings and writes a ``result.json`` file with the
    keys ``time``, ``n_ids``, ``n_jobs`` and ``time_series_length``.
    """

    # Forwarded unchanged to the ``n_jobs`` argument of ``extract_features``.
    n_jobs = luigi.IntParameter()

    def output(self):
        """Declare ``result.json`` as the output of this task."""
        yield self.add_to_output("result.json")

    def run(self):
        """Time one full feature extraction and dump the result as JSON."""
        # Monotonic, high-resolution clock: unlike the wall clock it is not
        # affected by system clock adjustments during the measurement.
        from time import perf_counter

        input_file = self._get_input_targets("data.csv")[0]

        with input_file.open("r") as f:
            # Load only the real data columns.  If the CSV also contains a
            # stored index column, it must not reach extract_features, which
            # would otherwise handle it like another value column and
            # inflate the measured time.
            df = pd.read_csv(f, usecols=["id", "time", "value"])

        start_time = perf_counter()
        extract_features(df, column_id="id", column_sort="time", n_jobs=self.n_jobs,
                         disable_progressbar=True)
        end_time = perf_counter()

        result_json = {
            "time": end_time - start_time,
            # num_ids is not declared here; presumably it is provided by the
            # @luigi.requires(DataCreationTask) decorator -- TODO confirm.
            "n_ids": self.num_ids,
            "n_jobs": self.n_jobs,
            # Length of one series, measured as the number of rows of id 0.
            "time_series_length": int((df["id"] == 0).sum()),
        }

        with self._get_output_target("result.json").open("w") as f:
            json.dump(result_json, f)
class CombinerTask(luigi.Task):
    """Gather the results of all timing tasks in a single results.csv file"""

    def complete(self):
        """Always report this task as not complete."""
        return False

    def requires(self):
        """Yield every timing task whose result goes into the summary.

        For each combination of job count and time series length this
        yields two full runs (10 and 100 ids) and, per feature calculator
        of ``ComprehensiveFCParameters``, one run with 100 ids followed by
        three tries with 10 ids.
        """
        fc_settings = ComprehensiveFCParameters()

        for n_jobs in (0, 1, 4):
            for length in (100, 500, 1000, 5000):
                # Reference measurements with all calculators at once.
                for n_ids in (10, 100):
                    yield FullTimingTask(
                        time_series_length=length,
                        n_jobs=n_jobs,
                        num_ids=n_ids,
                        random_seed=42,
                    )

                # Measurements for each feature calculator on its own.
                for name in fc_settings:
                    yield TimingTask(
                        feature_parameter={name: fc_settings[name]},
                        time_series_length=length,
                        n_jobs=n_jobs,
                        num_ids=100,
                        try_number=0,
                        random_seed=42,
                    )

                    for attempt in range(3):
                        yield TimingTask(
                            feature_parameter={name: fc_settings[name]},
                            n_jobs=n_jobs,
                            try_number=attempt,
                            num_ids=10,
                            time_series_length=length,
                            random_seed=42,
                        )

    def output(self):
        """Declare ``results.csv`` as the output of this task."""
        yield self.add_to_output("results.csv")

    def run(self):
        """Load every ``result.json`` input and write them as one CSV table."""
        rows = []
        for target in self._get_input_targets("result.json"):
            with target.open("r") as handle:
                rows.append(json.load(handle))

        table = pd.DataFrame(rows)

        with self._get_output_target("results.csv").open("w") as handle:
            table.to_csv(handle)
if __name__ == "__main__":
    # Script entry point: set the "result_path" setting to "results",
    # then hand the combiner task to b2luigi for processing.
    luigi.set_setting("result_path", "results")
    root_task = CombinerTask()
    luigi.process(root_task)