Using Woodwork with Dask and Spark DataFrames

Woodwork allows you to add custom typing information to Dask DataFrames or Spark DataFrames when working with datasets that are too large to easily fit in memory. Although initializing Woodwork on a Dask or Spark DataFrame follows the same process as you follow when initializing on a pandas DataFrame, there are a few limitations to be aware of. This guide provides a brief overview of using Woodwork with a Dask or Spark DataFrame. Along the way, the guide highlights several key items to keep in mind when using a Dask or Spark DataFrame as input.

Using Woodwork with either Dask or Spark requires the installation of the Dask or Spark libraries respectively. These libraries can be installed directly with these commands:

python -m pip install "woodwork[dask]"
python -m pip install "woodwork[spark]"

Dask DataFrame Example

Create a Dask DataFrame to use in our example. Normally you create the DataFrame directly by reading in the data from saved files, but you will create it from a demo pandas DataFrame.

[1]:
import dask.dataframe as dd
import woodwork as ww

df_pandas = ww.demo.load_retail(nrows=1000, init_woodwork=False)
df_dask = dd.from_pandas(df_pandas, npartitions=10)
df_dask
[1]:
Dask DataFrame Structure:
order_product_id order_id product_id description quantity order_date unit_price customer_name country total cancelled
npartitions=10
0 int64 object object object int64 datetime64[ns] float64 object object float64 bool
100 ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ...
900 ... ... ... ... ... ... ... ... ... ... ...
999 ... ... ... ... ... ... ... ... ... ... ...
Dask Name: from_pandas, 10 tasks

Now that you have a Dask DataFrame, you can use it to create a Woodwork DataFrame, just as you would with a pandas DataFrame:

[2]:
df_dask.ww.init(index='order_product_id')
df_dask.ww
/home/docs/checkouts/readthedocs.org/user_builds/feature-labs-inc-datatables/envs/v0.16.3/lib/python3.8/site-packages/dask/dataframe/core.py:7377: UserWarning: Insufficient elements for `head`. 100000 elements requested, only 100 elements available. Try passing larger `npartitions` to `head`.
  warnings.warn(
/home/docs/checkouts/readthedocs.org/user_builds/feature-labs-inc-datatables/envs/v0.16.3/lib/python3.8/site-packages/dask/dataframe/core.py:7377: UserWarning: Insufficient elements for `head`. 100000 elements requested, only 100 elements available. Try passing larger `npartitions` to `head`.
  warnings.warn(
/home/docs/checkouts/readthedocs.org/user_builds/feature-labs-inc-datatables/envs/v0.16.3/lib/python3.8/site-packages/dask/dataframe/core.py:7377: UserWarning: Insufficient elements for `head`. 100000 elements requested, only 100 elements available. Try passing larger `npartitions` to `head`.
  warnings.warn(
/home/docs/checkouts/readthedocs.org/user_builds/feature-labs-inc-datatables/envs/v0.16.3/lib/python3.8/site-packages/dask/dataframe/core.py:7377: UserWarning: Insufficient elements for `head`. 100000 elements requested, only 100 elements available. Try passing larger `npartitions` to `head`.
  warnings.warn(
/home/docs/checkouts/readthedocs.org/user_builds/feature-labs-inc-datatables/envs/v0.16.3/lib/python3.8/site-packages/dask/dataframe/core.py:7377: UserWarning: Insufficient elements for `head`. 100000 elements requested, only 100 elements available. Try passing larger `npartitions` to `head`.
  warnings.warn(
/home/docs/checkouts/readthedocs.org/user_builds/feature-labs-inc-datatables/envs/v0.16.3/lib/python3.8/site-packages/dask/dataframe/core.py:7377: UserWarning: Insufficient elements for `head`. 100000 elements requested, only 100 elements available. Try passing larger `npartitions` to `head`.
  warnings.warn(
[2]:
Physical Type Logical Type Semantic Tag(s)
Column
order_product_id int64 Integer ['index']
order_id category Categorical ['category']
product_id string Unknown []
description string NaturalLanguage []
quantity int64 Integer ['numeric']
order_date datetime64[ns] Datetime []
unit_price float64 Double ['numeric']
customer_name category Categorical ['category']
country category Categorical ['category']
total float64 Double ['numeric']
cancelled bool Boolean []

As you can see from the output above, Woodwork was initialized successfully, and logical type inference was performed for all of the columns.

However, that illustrates one of the key issues in working with Dask DataFrames. In order to perform logical type inference, Woodwork needs to bring the data into memory so it can be analyzed. Currently, Woodwork reads data from the first partition of data only, and then uses this data for type inference. Depending on the complexity of the data, this could be a time consuming operation. Additionally, if the first partition is not representative of the entire dataset, the logical types for some columns may be inferred incorrectly.

Skipping or Overriding Type Inference

If this process takes too much time, or if the logical types are not inferred correctly, you can manually specify the logical types for each column. If the logical type for a column is specified, type inference for that column will be skipped. If logical types are specified for all columns, logical type inference will be skipped completely and Woodwork will not need to bring any of the data into memory during initialization.

To skip logical type inference completely or to correct type inference issues, define a logical types dictionary with the correct logical type defined for each column in the DataFrame, then pass that dictionary to the initialization call.

[3]:
logical_types = {
    'order_product_id': 'Integer',
    'order_id': 'Categorical',
    'product_id': 'Categorical',
    'description': 'NaturalLanguage',
    'quantity': 'Integer',
    'order_date': 'Datetime',
    'unit_price': 'Double',
    'customer_name': 'PersonFullName',
    'country': 'Categorical',
    'total': 'Double',
    'cancelled': 'Boolean',
}

df_dask.ww.init(index='order_product_id', logical_types=logical_types)
df_dask.ww
[3]:
Physical Type Logical Type Semantic Tag(s)
Column
order_product_id int64 Integer ['index']
order_id category Categorical ['category']
product_id category Categorical ['category']
description string NaturalLanguage []
quantity int64 Integer ['numeric']
order_date datetime64[ns] Datetime []
unit_price float64 Double ['numeric']
customer_name string PersonFullName []
country category Categorical ['category']
total float64 Double ['numeric']
cancelled bool Boolean []

DataFrame Statistics

There are some Woodwork methods that require bringing the underlying Dask DataFrame into memory: describe, value_counts and mutual_information. When called, these methods will call a compute operation on the DataFrame to calculate the desired information. This might be problematic for datasets that cannot fit in memory, so exercise caution when using these methods.

[4]:
df_dask.ww.describe(include=['numeric'])
[4]:
quantity unit_price total
physical_type int64 float64 float64
logical_type Integer Double Double
semantic_tags {numeric} {numeric} {numeric}
count 1000.0 1000.0 1000.0
nunique 43.0 61.0 232.0
nan_count 0 0 0
mean 12.735 5.003658 40.390465
mode 1 2.0625 24.75
std 38.401634 9.73817 123.99357
min -24.0 0.165 -68.31
first_quartile 2.0 2.0625 5.709
second_quartile 4.0 3.34125 17.325
third_quartile 12.0 6.1875 33.165
max 600.0 272.25 2684.88
num_true NaN NaN NaN
num_false NaN NaN NaN
[5]:
df_dask.ww.value_counts()
[5]:
{'order_id': [{'value': '536464', 'count': 81},
  {'value': '536520', 'count': 71},
  {'value': '536412', 'count': 68},
  {'value': '536401', 'count': 64},
  {'value': '536415', 'count': 59},
  {'value': '536409', 'count': 54},
  {'value': '536408', 'count': 48},
  {'value': '536381', 'count': 35},
  {'value': '536488', 'count': 34},
  {'value': '536446', 'count': 31}],
 'product_id': [{'value': '22632', 'count': 11},
  {'value': '85123A', 'count': 10},
  {'value': '22633', 'count': 10},
  {'value': '22961', 'count': 9},
  {'value': '84029E', 'count': 9},
  {'value': '22866', 'count': 7},
  {'value': '84879', 'count': 7},
  {'value': '22960', 'count': 7},
  {'value': '21212', 'count': 7},
  {'value': '22197', 'count': 7}],
 'country': [{'value': 'United Kingdom', 'count': 964},
  {'value': 'France', 'count': 20},
  {'value': 'Australia', 'count': 14},
  {'value': 'Netherlands', 'count': 2}]}
[6]:
df_dask.ww.mutual_information().head()
[6]:
column_1 column_2 mutual_info
0 order_id order_date 0.754317
1 quantity total 0.267705
2 order_id quantity 0.212196
3 product_id unit_price 0.209396
4 order_id total 0.189810

Spark DataFrame Example

As above, first create a Spark DataFrame to use in our example. Normally you create the DataFrame directly by reading in the data from saved files, but here you create it from a demo pandas DataFrame.

[8]:
import pyspark.pandas as ps

df_spark = ps.from_pandas(df_pandas)
df_spark.head()

[8]:
order_product_id order_id product_id description quantity order_date unit_price customer_name country total cancelled
0 0 536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 2010-12-01 08:26:00 4.2075 Andrea Brown United Kingdom 25.245 False
1 1 536365 71053 WHITE METAL LANTERN 6 2010-12-01 08:26:00 5.5935 Andrea Brown United Kingdom 33.561 False
2 2 536365 84406B CREAM CUPID HEARTS COAT HANGER 8 2010-12-01 08:26:00 4.5375 Andrea Brown United Kingdom 36.300 False
3 3 536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 2010-12-01 08:26:00 5.5935 Andrea Brown United Kingdom 33.561 False
4 4 536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 2010-12-01 08:26:00 5.5935 Andrea Brown United Kingdom 33.561 False

Now that you have a Spark DataFrame, you can initialize Woodwork, just as you would with a pandas DataFrame:

[9]:
df_spark.ww.init(index='order_product_id')
df_spark.ww
[9]:
Physical Type Logical Type Semantic Tag(s)
Column
order_product_id int64 Integer ['index']
order_id string Categorical ['category']
product_id string Unknown []
description string NaturalLanguage []
quantity int64 Integer ['numeric']
order_date datetime64[ns] Datetime []
unit_price float64 Double ['numeric']
customer_name string Categorical ['category']
country string Categorical ['category']
total float64 Double ['numeric']
cancelled bool Boolean []

As you can see from the output above, Woodwork has been initialized successfully, and logical type inference was performed for all of the columns.

Notes on Spark Dtype Conversions

In the types table above, one important thing to notice is that the physical types for the Spark DataFrame are different than the physical types for the Dask DataFrame. The reason for this is that Spark does not support the category dtype that is available with pandas and Dask.

When Woodwork is initialized, the dtype of the DataFrame columns are converted to a set of standard dtypes, defined by the LogicalType primary_dtype property. By default, Woodwork uses the category dtype for any categorical logical types, but this is not available with Spark.

For LogicalTypes that have primary_dtype properties that are not compatible with Spark, Woodwork will try to convert the column dtype, but will be unsuccessful. At that point, Woodwork will use a backup dtype that is compatible with Spark. The implication of this is that using Woodwork with a Spark DataFrame may result in dtype values that are different than the values you would get when working with an otherwise identical pandas DataFrame.

Since Spark does not support the category dtype, any column that is inferred or specified with a logical type of Categorical will have its values converted to strings and stored with a dtype of string. This means that a categorical column containing numeric values, will be converted into the equivalent string values.

Finally, Spark does not support the timedelta64[ns] dtype. For this, there is not a clean backup dtype, so the use of Timedelta LogicalType is not supported with Spark DataFrames.

Skipping or Overriding Type Inference

As with Dask, Woodwork must bring the data into memory so it can be analyzed for type inference. Currently, Woodwork reads the first 100,000 rows of data to use for type inference when using a Spark DataFrame as input. If the first 100,000 rows are not representative of the entire dataset, the logical types for some columns might be inferred incorrectly.

To skip logical type inference completely or to correct type inference issues, define a logical types dictionary with the correct logical type defined for each column in the dataframe.

[10]:
logical_types = {
    'order_product_id': 'Integer',
    'order_id': 'Categorical',
    'product_id': 'Categorical',
    'description': 'NaturalLanguage',
    'quantity': 'Integer',
    'order_date': 'Datetime',
    'unit_price': 'Double',
    'customer_name': 'PersonFullName',
    'country': 'Categorical',
    'total': 'Double',
    'cancelled': 'Boolean',
}

df_spark.ww.init(index='order_product_id', logical_types=logical_types)
df_spark.ww
[10]:
Physical Type Logical Type Semantic Tag(s)
Column
order_product_id int64 Integer ['index']
order_id string Categorical ['category']
product_id string Categorical ['category']
description string NaturalLanguage []
quantity int64 Integer ['numeric']
order_date datetime64[ns] Datetime []
unit_price float64 Double ['numeric']
customer_name string PersonFullName []
country string Categorical ['category']
total float64 Double ['numeric']
cancelled bool Boolean []

DataFrame Statistics

As with Dask, running describe, value_counts or mutual_information requires bringing the data into memory to perform the analysis. When called, these methods will call a to_pandas operation on the DataFrame to calculate the desired information. This may be problematic for very large datasets, so exercise caution when using these methods.

[11]:
df_spark.ww.describe(include=['numeric'])
[11]:
quantity unit_price total
physical_type int64 float64 float64
logical_type Integer Double Double
semantic_tags {numeric} {numeric} {numeric}
count 1000.0 1000.0 1000.0
nunique 43.0 61.0 232.0
nan_count 0 0 0
mean 12.735 5.003658 40.390465
mode 1 2.0625 24.75
std 38.401634 9.73817 123.99357
min -24.0 0.165 -68.31
first_quartile 2.0 2.0625 5.709
second_quartile 4.0 3.34125 17.325
third_quartile 12.0 6.1875 33.165
max 600.0 272.25 2684.88
num_true NaN NaN NaN
num_false NaN NaN NaN
[12]:
df_spark.ww.value_counts()
[12]:
{'order_id': [{'value': '536464', 'count': 81},
  {'value': '536520', 'count': 71},
  {'value': '536412', 'count': 68},
  {'value': '536401', 'count': 64},
  {'value': '536415', 'count': 59},
  {'value': '536409', 'count': 54},
  {'value': '536408', 'count': 48},
  {'value': '536381', 'count': 35},
  {'value': '536488', 'count': 34},
  {'value': '536446', 'count': 31}],
 'product_id': [{'value': '22632', 'count': 11},
  {'value': '22633', 'count': 10},
  {'value': '85123A', 'count': 10},
  {'value': '84029E', 'count': 9},
  {'value': '22961', 'count': 9},
  {'value': '22866', 'count': 7},
  {'value': '22197', 'count': 7},
  {'value': '22960', 'count': 7},
  {'value': '21212', 'count': 7},
  {'value': '84879', 'count': 7}],
 'country': [{'value': 'United Kingdom', 'count': 964},
  {'value': 'France', 'count': 20},
  {'value': 'Australia', 'count': 14},
  {'value': 'Netherlands', 'count': 2}]}
[13]:
df_spark.ww.mutual_information().head()
[13]:
column_1 column_2 mutual_info
0 order_id order_date 0.754317
1 quantity total 0.267705
2 order_id quantity 0.212196
3 product_id unit_price 0.209396
4 order_id total 0.189810

Data Validation Limitations

Woodwork performs several validation checks to confirm that the data in the DataFrame is appropriate for the specified parameters. Because some of these validation steps would require pulling the data into memory, they are skipped when using Woodwork with a Dask or Spark DataFrame. This section provides an overview of the validation checks that are performed with pandas input but skipped with Dask or Spark input.

Index Uniqueness

Normally a check is performed to verify that any column specified as the index contains no duplicate values. With Dask or Spark input, this check is skipped and you must manually verify that any column specified as an index column contains unique values.

Data Consistency with LogicalType (Dask Only)

If you manually define the LogicalType for a column when initializing Woodwork, a check is performed to verify that the data in that column is appropriate for the specified LogicalType. For example, with pandas input if you specify a LogicalType of Double for a column that contains letters such as ['a', 'b', 'c'], an error is raised because it is not possible to convert the letters into numeric values with the float dtype associated with the Double LogicalType.

With Dask input, no such error appears at the time initialization. However, behind the scenes, Woodwork attempts to convert the column physical type to float, and this conversion is added to the Dask task graph, without raising an error. However, an error is raised if a compute operation is called on the DataFrame as Dask attempts to execute the conversion step. Extra care should be taken when using Dask input to make sure any specified logical types are consistent with the data in the columns to avoid this type of error.

Ordinal Order Values Check

For the Ordinal LogicalType, a check is typically performed to make sure that the data column does not contain any values that are not present in the defined order values. This check will not be performed with Dask or Spark input. Users should manually verify that the defined order values are complete to avoid unexpected results.

Other Limitations

Reading from CSV Files

Woodwork provides the ability to read data directly from a CSV file into a Woodwork DataFrame. The helper function used for this, woodwork.read_file, currently only reads the data into a pandas DataFrame. At some point, this limitation may be removed, allowing data to be read into a Dask or Spark DataFrame. For now, only pandas DataFrames can be created with this function.

Sorting DataFrame on Time Index

When initializing with a time index, Woodwork, by default, will sort the input DataFrame first on the time index and then on the index, if specified. Because sorting a distributed DataFrame is a computationally expensive operation, this sorting is performed only when using a pandas DataFrame. If a sorted DataFrame is needed when using a Dask or Spark, the user should manually sort the DataFrame as needed.

Equality of Woodwork DataFrames

In order to avoid bringing a Dask DataFrame into memory, Woodwoork does not consider the equality of the data when checking whether Woodwork Dataframe initialized from a Dask or Spark DataFrame is equal to another Woodwork DataFrame. This means that two DataFrames with identical names, columns, indices, semantic tags, and LogicalTypes but different underlying data will be treated as equal if at least one of them uses Dask or Spark.

LatLong Columns

When working with the LatLong logical type, Woodwork converts all LatLong columns to a standard format of a tuple of floats for Dask DataFrames and a list of floats for Spark DataFrames. In order to do this, the data is read into memory, which may be problematic for large datatsets.

Integer Column Names

Woodwork allows column names of any format that is supported by the DataFrame. However, Dask DataFrames do not currently support integer column names.

Setting DataFrame Index

When specifying a Woodwork index with a pandas DataFrame, the underlying index of the DataFrame will be updated to match the column specified as the Woodwork index. When specifying a Woodwork index on a Dask or Spark DataFrame, however, the underlying index will remain unchanged.