From a911f5649df85df5f1e41827a5ffebf120edba94 Mon Sep 17 00:00:00 2001 From: lidanyang Date: Fri, 24 Nov 2023 15:03:03 +0800 Subject: [PATCH] add feature engineering tools --- .../libs/machine_learning/__init__.py | 7 + .../machine_learning/feature_engineering.py | 174 ++++++++++++++++++ .../schemas/machine_learning/__init__.py | 6 + .../machine_learning/feature_engineering.py | 98 ++++++++++ 4 files changed, 285 insertions(+) create mode 100644 metagpt/tools/functions/libs/machine_learning/__init__.py create mode 100644 metagpt/tools/functions/libs/machine_learning/feature_engineering.py create mode 100644 metagpt/tools/functions/schemas/machine_learning/__init__.py create mode 100644 metagpt/tools/functions/schemas/machine_learning/feature_engineering.py diff --git a/metagpt/tools/functions/libs/machine_learning/__init__.py b/metagpt/tools/functions/libs/machine_learning/__init__.py new file mode 100644 index 000000000..5e9760c64 --- /dev/null +++ b/metagpt/tools/functions/libs/machine_learning/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/11/16 16:36 +# @Author : lidanyang +# @File : __init__.py +# @Desc : +from metagpt.tools.functions.libs.machine_learning.feature_engineering import * diff --git a/metagpt/tools/functions/libs/machine_learning/feature_engineering.py b/metagpt/tools/functions/libs/machine_learning/feature_engineering.py new file mode 100644 index 000000000..584bd125d --- /dev/null +++ b/metagpt/tools/functions/libs/machine_learning/feature_engineering.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/11/17 10:33 +# @Author : lidanyang +# @File : feature_engineering.py +# @Desc : Feature Engineering Functions +import itertools + +from dateutil.relativedelta import relativedelta +from pandas.api.types import is_numeric_dtype +from sklearn.preprocessing import PolynomialFeatures, OneHotEncoder + +from metagpt.tools.functions import registry +from metagpt.tools.functions.schemas.machine_learning.feature_engineering import * + + +@registry.register("feature_engineering", PolynomialExpansion) +def polynomial_expansion(df, cols, degree=2): + for col in cols: + if not is_numeric_dtype(df[col]): + raise ValueError(f"Column '{col}' must be numeric.") + + poly = PolynomialFeatures(degree=degree, include_bias=False) + ts_data = poly.fit_transform(df[cols].fillna(0)) + new_columns = poly.get_feature_names_out(cols) + ts_data = pd.DataFrame(ts_data, columns=new_columns, index=df.index) + ts_data = ts_data.drop(cols, axis=1) + df = pd.concat([df, ts_data], axis=1) + return df + + +@registry.register("feature_engineering", OneHotEncoding) +def one_hot_encoding(df, cols): + enc = OneHotEncoder(handle_unknown="ignore", sparse=False) + ts_data = enc.fit_transform(df[cols]) + new_columns = enc.get_feature_names_out(cols) + ts_data = pd.DataFrame(ts_data, columns=new_columns, index=df.index) + df.drop(cols, axis=1, inplace=True) + df = pd.concat([df, ts_data], axis=1) + return df + + +@registry.register("feature_engineering", FrequencyEncoding) +def frequency_encoding(df, cols): + for col in cols: + encoder_dict = df[col].value_counts().to_dict() + df[f"{col}_cnt"] = df[col].map(encoder_dict) + return df + + +@registry.register("feature_engineering", CatCross) +def cat_cross(df, cols, max_cat_num=100): + for col in cols: + if df[col].nunique() > max_cat_num: + cols.remove(col) + + for col1, col2 in itertools.combinations(cols, 2): + cross_col = f"{col1}_cross_{col2}" + df[cross_col] = df[col1].astype(str) + "_" + df[col2].astype(str) + return df + + +@registry.register("feature_engineering", GroupStat) +def group_stat(df, group_col, agg_col, agg_funcs): + group_df = df.groupby(group_col)[agg_col].agg(agg_funcs).reset_index() + group_df.columns = group_col + [ + f"{agg_col}_{agg_func}_by_{group_col}" for agg_func in agg_funcs + ] + df = df.merge(group_df, on=group_col, how="left") + return df + + +@registry.register("feature_engineering", ExtractTimeComps) +def extract_time_comps(df, time_col, time_comps): + time_s = pd.to_datetime(df[time_col], errors="coerce") + time_comps_df = pd.DataFrame() + + if "year" in time_comps: + time_comps_df["year"] = time_s.dt.year + if "month" in time_comps: + time_comps_df["month"] = time_s.dt.month + if "day" in time_comps: + time_comps_df["day"] = time_s.dt.day + if "hour" in time_comps: + time_comps_df["hour"] = time_s.dt.hour + if "dayofweek" in time_comps: + time_comps_df["dayofweek"] = time_s.dt.dayofweek + 1 + if "is_weekend" in time_comps: + time_comps_df["is_weekend"] = time_s.dt.dayofweek.isin([5, 6]).astype(int) + df = pd.concat([df, time_comps_df], axis=1) + return df + + +@registry.register("feature_engineering", FeShiftByTime) +def fe_shift_by_time(df, time_col, group_col, shift_col, periods, freq): + df[time_col] = pd.to_datetime(df[time_col]) + + def shift_datetime(date, offset, unit): + if unit in ["year", "y", "Y"]: + return date + relativedelta(years=offset) + elif unit in ["month", "m", "M"]: + return date + relativedelta(months=offset) + elif unit in ["day", "d", "D"]: + return date + relativedelta(days=offset) + elif unit in ["week", "w", "W"]: + return date + relativedelta(weeks=offset) + elif unit in ["hour", "h", "H"]: + return date + relativedelta(hours=offset) + else: + return date + + def shift_by_time_on_key( + inner_df, time_col, group_col, shift_col, offset, unit, col_name + ): + inner_df = inner_df.drop_duplicates() + inner_df[time_col] = inner_df[time_col].map( + lambda x: shift_datetime(x, offset, unit) + ) + inner_df = inner_df.groupby([time_col, group_col], as_index=False)[ + shift_col + ].mean() + inner_df.rename(columns={shift_col: col_name}, inplace=True) + return inner_df + + shift_df = df[[time_col, group_col, shift_col]].copy() + for period in periods: + new_col_name = f"{group_col}_{shift_col}_lag_{period}_{freq}" + tmp = shift_by_time_on_key( + shift_df, time_col, group_col, shift_col, period, freq, new_col_name + ) + df = df.merge(tmp, on=[time_col, group_col], how="left") + + return df + + +@registry.register("feature_engineering", FeRollingByTime) +def fe_rolling_by_time(df, time_col, group_col, rolling_col, periods, freq, agg_funcs): + df[time_col] = pd.to_datetime(df[time_col]) + + def rolling_by_time_on_key(inner_df, offset, unit, agg_func, col_name): + time_freq = { + "Y": [365 * offset, "D"], + "M": [30 * offset, "D"], + "D": [offset, "D"], + "W": [7 * offset, "D"], + "H": [offset, "h"], + } + + if agg_func not in ["mean", "std", "max", "min", "median", "sum", "count"]: + raise ValueError(f"Invalid agg function: {agg_func}") + + rolling_feat = inner_df.rolling( + f"{time_freq[unit][0]}{time_freq[unit][1]}", closed="left" + ) + rolling_feat = getattr(rolling_feat, agg_func)() + depth = df.columns.nlevels + rolling_feat = rolling_feat.stack(list(range(depth))) + rolling_feat.name = col_name + return rolling_feat + + rolling_df = df[[time_col, group_col, rolling_col]].copy() + for period in periods: + for func in agg_funcs: + new_col_name = f"{group_col}_{rolling_col}_rolling_{period}_{freq}_{func}" + tmp = pd.pivot_table( + rolling_df, + index=time_col, + values=rolling_col, + columns=group_col, + ) + tmp = rolling_by_time_on_key(tmp, period, freq, func, new_col_name) + df = df.merge(tmp, on=[time_col, group_col], how="left") + + return df diff --git a/metagpt/tools/functions/schemas/machine_learning/__init__.py b/metagpt/tools/functions/schemas/machine_learning/__init__.py new file mode 100644 index 000000000..c80872750 --- /dev/null +++ b/metagpt/tools/functions/schemas/machine_learning/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/11/16 16:37 +# @Author : lidanyang +# @File : __init__.py +# @Desc : diff --git a/metagpt/tools/functions/schemas/machine_learning/feature_engineering.py b/metagpt/tools/functions/schemas/machine_learning/feature_engineering.py new file mode 100644 index 000000000..8237c83f4 --- /dev/null +++ b/metagpt/tools/functions/schemas/machine_learning/feature_engineering.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/11/17 10:34 +# @Author : lidanyang +# @File : feature_engineering.py +# @Desc : Schema for feature engineering functions +from typing import List + +import pandas as pd + +from metagpt.tools.functions.schemas.base import field, ToolSchema + + +class PolynomialExpansion(ToolSchema): + """Generate polynomial and interaction features from selected columns, excluding the bias column.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + cols: list = field(description="Columns for polynomial expansion.") + degree: int = field(description="Degree of polynomial features.", default=2) + + +class OneHotEncoding(ToolSchema): + """Apply one-hot encoding to specified categorical columns in a DataFrame.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + cols: list = field(description="Categorical columns to be one-hot encoded.") + + +class FrequencyEncoding(ToolSchema): + """Convert categorical columns to frequency encoding.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + cols: list = field(description="Categorical columns to be frequency encoded.") + + +class CatCross(ToolSchema): + """Create pairwise crossed features from categorical columns, joining values with '_'.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + cols: list = field(description="Columns to be pairwise crossed.") + max_cat_num: int = field( + description="Maximum unique categories per crossed feature.", default=100 + ) + + +class GroupStat(ToolSchema): + """Perform aggregation operations on a specified column grouped by certain categories.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + group_col: str = field(description="Column used for grouping.") + agg_col: str = field(description="Column on which aggregation is performed.") + agg_funcs: list = field( + description="""List of aggregation functions to apply, such as ['mean', 'std']. + Each function must be supported by pandas.""" + ) + + +class ExtractTimeComps(ToolSchema): + """Extract specific time components from a designated time column in a DataFrame.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + time_col: str = field(description="The name of the column containing time data.") + time_comps: List[str] = field( + description="""List of time components to extract. + Each component must be in ['year', 'month', 'day', 'hour', 'dayofweek', 'is_weekend'].""" + ) + + +class FeShiftByTime(ToolSchema): + """Shift column values in a DataFrame based on specified time intervals.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + time_col: str = field(description="Column for time-based shifting.") + group_col: str = field(description="Column for grouping before shifting.") + shift_col: str = field(description="Column to shift.") + periods: list = field(description="Time intervals for shifting.") + freq: str = field( + description="Frequency unit for time intervals (e.g., 'D', 'M').", + enum=["D", "M", "Y", "W", "H"], + ) + + +class FeRollingByTime(ToolSchema): + """Calculate rolling statistics for a DataFrame column over time intervals.""" + + df: pd.DataFrame = field(description="DataFrame to process.") + time_col: str = field(description="Column for time-based rolling.") + group_col: str = field(description="Column for grouping before rolling.") + rolling_col: str = field(description="Column for rolling calculations.") + periods: list = field(description="Window sizes for rolling.") + freq: str = field( + description="Frequency unit for time windows (e.g., 'D', 'M').", + enum=["D", "M", "Y", "W", "H"], + ) + agg_funcs: list = field( + description="""List of aggregation functions for rolling, like ['mean', 'std']. + Each function must be in ['mean', 'std', 'min', 'max', 'median', 'sum', 'count'].""" + )