Following TorchArc's principle of building neural network architectures from configuration files, complex and tedious data transformation jobs should also be configuration-driven.
After many iterations, I found that the fastest way to do data transformation for an ML project of modest size is to lean on Scikit-learn's ColumnTransformer API, which also has a Dask-ML extension for larger workloads. The API is still code-heavy, however, so I wrote a wrapper that takes YAML and constructs ColumnTransformers.
And if you need parallelization, additionally install dask-ml:

```bash
pip install dask-ml
```
Let's see how it's used.
Example Usage
The ColumnTransformer class of Scikit-learn / Dask-ML allows us to build a complex pipeline of feature preprocessors/transformers that takes a dataframe as input and outputs NumPy arrays. However, using it requires maintaining Python code.
This project started with the vision of building the entire feature transformation pipeline by just specifying which preprocessors to apply to each of a dataframe's columns.
For example, take the iris dataset with columns: sepal length (cm), sepal width (cm), petal length (cm), petal width (cm), target. We want the first 4 columns to be the features of our input x, where each feature goes through a StandardScaler, and target to be the feature of our output y, where it is one-hot encoded. We then use this spec directly to fit_transform the iris dataframe and obtain NumPy arrays xs, ys. Here's the code:
```python
from feature_transform import transform
from sklearn import datasets
import pandas as pd

# specify transform for each feature
spec = {
    'dataset': {'transform': {'module': 'sklearn', 'n_jobs': 1}},
    'transform': {
        'x': {  # the "mode"
            'sepal length (cm)': {'StandardScaler': None},  # the column name and its {preprocessor: kwargs, ...}
            'sepal width (cm)': {'StandardScaler': None},
            'petal length (cm)': {'StandardScaler': None},
            'petal width (cm)': {'StandardScaler': None},
        },
        'y': {'target': {'OneHotEncoder': {'sparse': False, 'handle_unknown': 'ignore'}}},
    },
}

# load iris dataframe
data_df = pd.concat(datasets.load_iris(return_X_y=True, as_frame=True), axis=1)

# transform into numpy arrays ready for model
mode2data = transform.fit_transform(spec, stage='fit', df=data_df)
xs, ys = mode2data['x'], mode2data['y']

# to reload the fitted transformers for validation/test, specify stage='validate' or 'test'
val_df = data_df.copy()
mode2val_data = transform.fit_transform(spec, stage='validate', df=val_df)
val_xs, val_ys = mode2val_data['x'], mode2val_data['y']

# artifacts to get the column transformers and transformed names directly
artifacts = transform.get_artifacts(spec)
artifacts['mode2col_transfmr']
# {'x': ColumnTransformer(n_jobs=1, sparse_threshold=0, transformers=[('sepal length (cm)', Pipeline(steps=[('standardscaler',...
artifacts['mode2transformed_names']
# {'x': ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'],
#  'y': ['target_0', 'target_1', 'target_2']}
```
What happens in the background is as follows:
- for each mode in `spec.transform`:
  - for each column in the mode, create a pipeline of `[preprocessor(**kwargs), ...]`, and compose these per-column pipelines into one ColumnTransformer for the mode (see the sketch after this list)
- during `fit_transform`, each mode runs its ColumnTransformer's `fit_transform`
- it then saves the fitted ColumnTransformer to `./data/{hash}-{mode}-col_transfmr.pkl`
- these filenames are logged; they are the files loaded in `transform.get_artifacts` for uses such as validation/test
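For intuition, here is a minimal sketch of that construction step using plain sklearn. It is not the library's actual code, and `build_col_transformer` is a hypothetical helper named only for illustration:

```python
# Minimal sketch, not feature_transform's actual implementation:
# build one ColumnTransformer for a mode from its {column: {preprocessor: kwargs, ...}} spec.
from sklearn import preprocessing
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import make_pipeline


def build_col_transformer(mode_spec: dict, n_jobs=None) -> ColumnTransformer:
    transformers = []
    for col, preproc_spec in mode_spec.items():
        # one pipeline step per {preprocessor: kwargs} entry, in spec order
        steps = [getattr(preprocessing, name)(**(kwargs or {}))
                 for name, kwargs in preproc_spec.items()]
        transformers.append((col, make_pipeline(*steps), [col]))
    return ColumnTransformer(transformers, n_jobs=n_jobs, sparse_threshold=0)


# e.g. the 'x' mode of the iris spec above
x_col_transfmr = build_col_transformer({
    'sepal length (cm)': {'StandardScaler': None},
    'sepal width (cm)': {'StandardScaler': None},
    'petal length (cm)': {'StandardScaler': None},
    'petal width (cm)': {'StandardScaler': None},
}, n_jobs=1)
xs = x_col_transfmr.fit_transform(data_df)  # data_df from the example above
```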
Using YAML config
The goal of this library is to make feature transforms configuration-driven, so let's do the same as above, but with a YAML config file. The spec format is:
```yaml
dataset:
  transform:
    module: {str}  # options: 'sklearn' (serial-row) or 'dask_ml' (parallel-row)
    n_jobs: {null|int}  # parallelization; -1 to use all cores

transform:
  {mode}:
    {column}:
      {preprocessor}: {null|kwargs}  # optional kwargs for preprocessor
      {preprocessor}: {null|kwargs}
      ...
```
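For instance, assuming dask-ml is installed, switching to row-parallel preprocessing should only require changing the `dataset` section; the `transform` section stays the same. A hedged example:

```yaml
# assumption: pointing module at dask_ml swaps in Dask-ML's row-parallel preprocessors
dataset:
  transform:
    module: dask_ml
    n_jobs: -1  # use all cores
```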
The {preprocessor} value can be any of the preprocessor classes of Scikit-learn or Dask-ML. Additional custom ones are also registered in feature_transform/transform.py.
For example, the earlier spec can be rewritten in YAML as:
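Below is a direct YAML translation of the dict spec from the first example, saved as transform.yaml (the filename the code below assumes):

```yaml
# transform.yaml - the earlier dict spec, translated to YAML
dataset:
  transform:
    module: sklearn
    n_jobs: 1

transform:
  x:
    sepal length (cm):
      StandardScaler:
    sepal width (cm):
      StandardScaler:
    petal length (cm):
      StandardScaler:
    petal width (cm):
      StandardScaler:
  y:
    target:
      OneHotEncoder:
        sparse: false
        handle_unknown: ignore
```

Then read the file with util.read and run the same calls as before: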
```python
from feature_transform import transform, util
from sklearn import datasets
import pandas as pd

# convenient method to read YAML
spec = util.read('transform.yaml')

# load iris dataframe
data_df = pd.concat(datasets.load_iris(return_X_y=True, as_frame=True), axis=1)

# transform into numpy arrays ready for model
mode2data = transform.fit_transform(spec, stage='fit', df=data_df)
xs, ys = mode2data['x'], mode2data['y']

# to reload the fitted transformers for validation/test, specify stage='validate' or 'test'
val_df = data_df.copy()
mode2val_data = transform.fit_transform(spec, stage='validate', df=val_df)
val_xs, val_ys = mode2val_data['x'], mode2val_data['y']
```
Chain Preprocessors
To chain multiple preprocessors, simply add more steps:
```yaml
dataset:
  transform:
    module: sklearn
    n_jobs: null

transform:
  x:
    sepal length (cm):
      Log1pScaler: # custom preprocessor for np.log1p
      StandardScaler:
    sepal width (cm):
      Clipper: # custom preprocessor to clip values
        a_min: 0
        a_max: 10
      StandardScaler:
    petal length (cm):
      StandardScaler:
    petal width (cm):
      StandardScaler:
  y:
    target:
      OneHotEncoder:
        sparse: false
        handle_unknown: ignore
```
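For reference, Log1pScaler and Clipper are custom preprocessors registered by the library. Here's a hedged sketch of how such preprocessors could be built from sklearn's FunctionTransformer; the actual implementations in feature_transform/transform.py may differ:

```python
# Sketch only: the real Log1pScaler/Clipper in feature_transform/transform.py may differ.
import numpy as np
from sklearn.preprocessing import FunctionTransformer


def Log1pScaler():
    # element-wise np.log1p, with np.expm1 as the inverse transform
    return FunctionTransformer(np.log1p, inverse_func=np.expm1)


def Clipper(a_min=None, a_max=None):
    # clip values into the [a_min, a_max] range
    return FunctionTransformer(np.clip, kw_args={'a_min': a_min, 'a_max': a_max})
```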
Specify any module
By default the config refers to classes in the preprocessing module of sklearn/dask-ml. Use dot-notation to specify other modules:

```yaml
dataset:
  transform:
    module: sklearn
    n_jobs: null

transform:
  x:
    a_float_column:
      StandardScaler:
    a_column_with_dict_values:
      feature_extraction.DictVectorizer:
    a_column_with_na:
      StandardScaler:
      impute.SimpleImputer: # handle na values
        strategy: constant
        fill_value: -1
  y:
    a_target_column:
      Identity:
```
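Presumably the dotted name is resolved against the configured root module, defaulting to its preprocessing submodule. Here's a hedged sketch of how such a lookup could work; `resolve` is a hypothetical helper, not the library's API:

```python
# Hypothetical sketch of dot-notation resolution; feature_transform's actual lookup
# (and its registry of custom classes like Identity) may differ.
import importlib


def resolve(name: str, root: str = 'sklearn'):
    # 'StandardScaler'       -> sklearn.preprocessing.StandardScaler
    # 'impute.SimpleImputer' -> sklearn.impute.SimpleImputer
    if '.' in name:
        submodule, cls_name = name.rsplit('.', 1)
        module = importlib.import_module(f'{root}.{submodule}')
    else:
        module = importlib.import_module(f'{root}.preprocessing')
        cls_name = name
    return getattr(module, cls_name)


imputer = resolve('impute.SimpleImputer')(strategy='constant', fill_value=-1)
```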
dataset:transform:module:sklearnn_jobs:nulltransform:x:a_float_column:StandardScaler:a_column_with_dict_values:feature_extraction.DictVectorizer:a_column_with_na:StandardScaler:impute.SimpleImputer:# handle na valuesstrategy:constantfill_value:-1y:a_target_column:Identity: