Source code for tsfresh.convenience.bindings

from functools import partial

import pandas as pd

from tsfresh.feature_extraction.extraction import _do_extraction_on_chunk
from tsfresh.feature_extraction.settings import ComprehensiveFCParameters

def _feature_extraction_on_chunk_helper(
    Helper function wrapped around _do_extraction_on_chunk to use the correct format
    of the "chunk" and output a pandas dataframe.
    Is used e.g. in the convenience functions for dask and spark.

    For the definitions of the parameters, please see these convenience functions.
    if default_fc_parameters is None and kind_to_fc_parameters is None:
        default_fc_parameters = ComprehensiveFCParameters()
    elif default_fc_parameters is None and kind_to_fc_parameters is not None:
        default_fc_parameters = {}

    if column_sort is not None:
        df = df.sort_values(column_sort)

    chunk = df[column_id].iloc[0], df[column_kind].iloc[0], df[column_value]
    features = _do_extraction_on_chunk(
    features = pd.DataFrame(features, columns=[column_id, "variable", "value"])
    features["value"] = features["value"].astype("double")

    return features[[column_id, "variable", "value"]]

[docs] def dask_feature_extraction_on_chunk( df, column_id, column_kind, column_value, column_sort=None, default_fc_parameters=None, kind_to_fc_parameters=None, ): """ Extract features on a grouped dask dataframe given the column names and the extraction settings. This wrapper function should only be used if you have a dask dataframe as input. All format handling (input and output) needs to be done before or after that. Examples ======== For example if you want to extract features on the robot example dataframe (stored as csv): Import statements: >>> from dask import dataframe as dd >>> from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk >>> from tsfresh.feature_extraction.settings import MinimalFCParameters Read in the data >>> df = dd.read_csv("robot.csv") Prepare the data into correct format. The format needs to be a grouped dataframe (grouped by time series id and feature kind), where each group chunk consists of a dataframe with exactly 4 columns: ``column_id``, ``column_kind``, ``column_sort`` and ``column_value``. You can find the description of the columns in :ref:`data-formats-label`. Please note: for this function to work you need to have all columns present! If necessary create the columns and fill them with dummy values. >>> df = df.melt(id_vars=["id", "time"], ... value_vars=["F_x", "F_y", "F_z", "T_x", "T_y", "T_z"], ... var_name="kind", value_name="value") >>> df_grouped = df.groupby(["id", "kind"]) Call the feature extraction >>> features = dask_feature_extraction_on_chunk(df_grouped, column_id="id", column_kind="kind", ... column_sort="time", column_value="value", ... default_fc_parameters=MinimalFCParameters()) Write out the data in a tabular format >>> features = features.categorize(columns=["variable"]) >>> features = features.reset_index(drop=True) \\ ... .pivot_table(index="id", columns="variable", values="value", aggfunc="mean") >>> features.to_csv("output") :param df: A dask dataframe grouped by id and kind. :type df: dask.dataframe.groupby.DataFrameGroupBy :param default_fc_parameters: mapping from feature calculator names to parameters. Only those names which are keys in this dict will be calculated. See the class:`ComprehensiveFCParameters` for more information. :type default_fc_parameters: dict :param kind_to_fc_parameters: mapping from kind names to objects of the same type as the ones for default_fc_parameters. If you put a kind as a key here, the fc_parameters object (which is the value), will be used instead of the default_fc_parameters. This means that kinds, for which kind_of_fc_parameters doe not have any entries, will be ignored by the feature selection. :type kind_to_fc_parameters: dict :param column_id: The name of the id column to group by. :type column_id: str :param column_sort: The name of the sort column. :type column_sort: str or None :param column_kind: The name of the column keeping record on the kind of the value. :type column_kind: str :param column_value: The name for the column keeping the value itself. :type column_value: str :return: A dask dataframe with the columns ``column_id``, "variable" and "value". The index is taken from the grouped dataframe. :rtype: dask.dataframe.DataFrame (id int64, variable object, value float64) """ feature_extraction = partial( _feature_extraction_on_chunk_helper, column_id=column_id, column_kind=column_kind, column_sort=column_sort, column_value=column_value, default_fc_parameters=default_fc_parameters, kind_to_fc_parameters=kind_to_fc_parameters, ) return df.apply( feature_extraction, meta=[(column_id, "int64"), ("variable", "object"), ("value", "float64")], )
[docs] def spark_feature_extraction_on_chunk( df, column_id, column_kind, column_value, column_sort=None, default_fc_parameters=None, kind_to_fc_parameters=None, ): """ Extract features on a grouped spark dataframe given the column names and the extraction settings. This wrapper function should only be used if you have a spark dataframe as input. All format handling (input and output) needs to be done before or after that. Examples ======== For example if you want to extract features on the robot example dataframe (stored as csv): Import statements: >>> from tsfresh.convenience.bindings import spark_feature_extraction_on_chunk >>> from tsfresh.feature_extraction.settings import MinimalFCParameters Read in the data >>> df = Prepare the data into correct format. The format needs to be a grouped dataframe (grouped by time series id and feature kind), where each group chunk consists of a dataframe with exactly 4 columns: ``column_id``, ``column_kind``, ``column_sort`` and ``column_value``. You can find the description of the columns in :ref:`data-formats-label`. Please note: for this function to work you need to have all columns present! If necessary create the columns and fill them with dummy values. >>> df = ... >>> df_grouped = df.groupby(["id", "kind"]) Call the feature extraction >>> features = spark_feature_extraction_on_chunk(df_grouped, column_id="id", column_kind="kind", ... column_sort="time", column_value="value", ... default_fc_parameters=MinimalFCParameters()) Write out the data in a tabular format >>> features = features.groupby("id").pivot("variable").sum("value") >>> features.write.csv("output") :param df: A spark dataframe grouped by id and kind. :type df: :param default_fc_parameters: mapping from feature calculator names to parameters. Only those names which are keys in this dict will be calculated. See the class:`ComprehensiveFCParameters` for more information. :type default_fc_parameters: dict :param kind_to_fc_parameters: mapping from kind names to objects of the same type as the ones for default_fc_parameters. If you put a kind as a key here, the fc_parameters object (which is the value), will be used instead of the default_fc_parameters. This means that kinds, for which kind_of_fc_parameters doe not have any entries, will be ignored by the feature selection. :type kind_to_fc_parameters: dict :param column_id: The name of the id column to group by. :type column_id: str :param column_sort: The name of the sort column. :type column_sort: str or None :param column_kind: The name of the column keeping record on the kind of the value. :type column_kind: str :param column_value: The name for the column keeping the value itself. :type column_value: str :return: A dask dataframe with the columns ``column_id``, "variable" and "value". :rtype: pyspark.sql.DataFrame[id: bigint, variable: string, value: double] """ from pyspark.sql.functions import PandasUDFType, pandas_udf feature_extraction = partial( _feature_extraction_on_chunk_helper, column_id=column_id, column_kind=column_kind, column_sort=column_sort, column_value=column_value, default_fc_parameters=default_fc_parameters, kind_to_fc_parameters=kind_to_fc_parameters, ) type_string = "{column_id} long, variable string, value double".format( column_id=column_id ) feature_extraction_udf = pandas_udf(type_string, PandasUDFType.GROUPED_MAP)( feature_extraction ) return df.apply(feature_extraction_udf)