import datetime
import json
import os
import tarfile
import tempfile
import pandas as pd
import woodwork as ww
from woodwork.accessor_utils import _is_dask_dataframe, _is_koalas_dataframe
from woodwork.s3_utils import get_transport_params, use_smartopen
from woodwork.type_sys.utils import _get_ltype_class, _get_specified_ltype_params
from woodwork.utils import _is_s3, _is_url
SCHEMA_VERSION = "11.3.0"
FORMATS = ["csv", "pickle", "parquet", "arrow", "feather", "orc"]
def typing_info_to_dict(dataframe):
"""Creates the description for a Woodwork table, including typing information for each column
and loading information.
Args:
dataframe (pd.DataFrame, dd.Dataframe, ks.DataFrame): DataFrame with Woodwork typing
information initialized.
Returns:
dict: Dictionary containing Woodwork typing information
"""
if _is_dask_dataframe(dataframe):
        # Need to determine the category info for Dask so it can be saved below
category_cols = [
colname
for colname, col in dataframe.ww._schema.columns.items()
if col.is_categorical
]
dataframe = dataframe.ww.categorize(columns=category_cols)
ordered_columns = dataframe.columns
def _get_physical_type_dict(column):
type_dict = {"type": str(column.dtype)}
if str(column.dtype) == "category":
type_dict["cat_values"] = column.dtype.categories.to_list()
type_dict["cat_dtype"] = str(column.dtype.categories.dtype)
return type_dict
column_typing_info = [
{
"name": col_name,
"ordinal": ordered_columns.get_loc(col_name),
"use_standard_tags": col.use_standard_tags,
"logical_type": {
"parameters": _get_specified_ltype_params(col.logical_type),
"type": str(_get_ltype_class(col.logical_type)),
},
"physical_type": _get_physical_type_dict(dataframe[col_name]),
"semantic_tags": sorted(list(col.semantic_tags)),
"description": col.description,
"origin": col.origin,
"metadata": col.metadata,
}
for col_name, col in dataframe.ww.columns.items()
]
if _is_dask_dataframe(dataframe):
table_type = "dask"
elif _is_koalas_dataframe(dataframe):
table_type = "koalas"
else:
table_type = "pandas"
return {
"schema_version": SCHEMA_VERSION,
"name": dataframe.ww.name,
"index": dataframe.ww.index,
"time_index": dataframe.ww.time_index,
"column_typing_info": column_typing_info,
"loading_info": {"table_type": table_type},
"table_metadata": dataframe.ww.metadata,
}
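# A minimal usage sketch for typing_info_to_dict, assuming a pandas DataFrame with
# Woodwork initialized (the column names below are hypothetical):
#
#     df = pd.DataFrame({"id": [0, 1, 2], "score": [1.5, 2.0, 3.5]})
#     df.ww.init(index="id")
#     info = typing_info_to_dict(df)
#     info["schema_version"]                           # -> "11.3.0"
#     info["loading_info"]                             # -> {"table_type": "pandas"}
#     [c["name"] for c in info["column_typing_info"]]  # -> ["id", "score"]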
def write_woodwork_table(dataframe, path, profile_name=None, **kwargs):
"""Serialize Woodwork table and write to disk or S3 path.
Args:
dataframe (pd.DataFrame, dd.DataFrame, ks.DataFrame): DataFrame with Woodwork typing information initialized.
path (str) : Location on disk to write the Woodwork table.
profile_name (str, bool): The AWS profile specified to write to S3. Will default to None and search for AWS credentials.
Set to False to use an anonymous profile.
        kwargs (keywords) : Additional keyword arguments to pass as keyword arguments to the underlying serialization method or to specify AWS profile.
"""
if _is_s3(path):
with tempfile.TemporaryDirectory() as tmpdir:
os.makedirs(os.path.join(tmpdir, "data"))
_dump_table(dataframe, tmpdir, **kwargs)
file_path = _create_archive(tmpdir)
transport_params = get_transport_params(profile_name)
use_smartopen(
file_path, path, read=False, transport_params=transport_params
)
elif _is_url(path):
raise ValueError("Writing to URLs is not supported")
else:
path = os.path.abspath(path)
os.makedirs(os.path.join(path, "data"), exist_ok=True)
_dump_table(dataframe, path, **kwargs)
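# A minimal usage sketch for write_woodwork_table (the local path and bucket below are
# hypothetical; df is assumed to have Woodwork typing information initialized):
#
#     df.ww.init(index="id")
#     write_woodwork_table(df, "./ww_table", format="parquet")
#     write_woodwork_table(df, "s3://my-bucket/ww_table.tar", profile_name=False)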
def _dump_table(dataframe, path, **kwargs):
"""Writes Woodwork table at the specified path, including both the data and the typing information."""
loading_info = write_dataframe(dataframe, path, **kwargs)
typing_info = typing_info_to_dict(dataframe)
typing_info["loading_info"].update(loading_info)
write_typing_info(typing_info, path)
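# Layout written by _dump_table for a pandas DataFrame with the default csv format
# (the data file name comes from the table name, falling back to "data"):
#
#     <path>/
#         woodwork_typing_info.json
#         data/
#             data.csv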
def write_typing_info(typing_info, path):
"""Writes Woodwork typing information to the specified path at woodwork_typing_info.json
Args:
typing_info (dict): Dictionary containing Woodwork typing information.
"""
    try:
        filepath = os.path.join(path, "woodwork_typing_info.json")
        with open(filepath, "w") as file:
            json.dump(typing_info, file)
except TypeError:
raise TypeError(
"Woodwork table is not json serializable. Check table and column metadata for values that may not be serializable."
)
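# A short sketch of write_typing_info on its own (the path is hypothetical and must
# already exist as a directory):
#
#     info = typing_info_to_dict(df)
#     write_typing_info(info, "./ww_table")  # creates ./ww_table/woodwork_typing_info.json
#
# Values in table or column metadata that json.dump cannot handle (e.g. datetime objects)
# surface here as the TypeError raised above.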
[docs]def write_dataframe(dataframe, path, format="csv", **kwargs):
"""Write underlying DataFrame data to disk or S3 path.
Args:
dataframe (pd.DataFrame, dd.DataFrame, ks.DataFrame): DataFrame with Woodwork typing information initialized.
path (str) : Location on disk to write the Woodwork table.
format (str) : Format to use for writing Woodwork data. Defaults to csv.
        kwargs (keywords) : Additional keyword arguments to pass as keyword arguments to the underlying serialization method.
Returns:
dict: Information on storage location and format of data.
"""
format = format.lower()
ww_name = dataframe.ww.name or "data"
if _is_dask_dataframe(dataframe) and format == "csv":
basename = "{}-*.{}".format(ww_name, format)
else:
basename = ".".join([ww_name, format])
location = os.path.join("data", basename)
file = os.path.join(path, location)
if format == "csv":
# engine kwarg not needed for writing, only reading
csv_kwargs = kwargs.copy()
if "engine" in csv_kwargs.keys():
del csv_kwargs["engine"]
if _is_koalas_dataframe(dataframe):
dataframe = dataframe.ww.copy()
columns = list(dataframe.select_dtypes("object").columns)
dataframe[columns] = dataframe[columns].astype(str)
csv_kwargs["compression"] = str(csv_kwargs["compression"])
dataframe.to_csv(file, **csv_kwargs)
elif format == "pickle":
# Dask and Koalas currently do not support to_pickle
if not isinstance(dataframe, pd.DataFrame):
msg = "DataFrame type not compatible with pickle serialization. Please serialize to another format."
raise ValueError(msg)
dataframe.to_pickle(file, **kwargs)
elif format in ["parquet", "arrow", "feather", "orc"]:
        # LatLong columns in pandas and Dask DataFrames contain tuples, which raise
        # an error when writing to parquet and arrow/feather formats.
latlong_columns = [
col_name
for col_name, col in dataframe.ww.columns.items()
if _get_ltype_class(col.logical_type) == ww.logical_types.LatLong
]
if len(latlong_columns) > 0:
dataframe = dataframe.ww.copy()
dataframe[latlong_columns] = dataframe[latlong_columns].astype(str)
if format == "parquet":
dataframe.to_parquet(file, **kwargs)
elif format == "orc":
# Serialization to orc relies on pyarrow.Table.from_pandas which doesn't work with Dask
if _is_dask_dataframe(dataframe):
msg = "DataFrame type not compatible with orc serialization. Please serialize to another format."
raise ValueError(msg)
save_orc_file(dataframe, file)
else:
dataframe.to_feather(file, **kwargs)
else:
error = "must be one of the following formats: {}"
raise ValueError(error.format(", ".join(FORMATS)))
return {"location": location, "type": format, "params": kwargs}
def _create_archive(tmpdir):
"""When seralizing to an S3 URL, writes a tar archive."""
file_name = "ww-{date:%Y-%m-%d_%H%M%S}.tar".format(date=datetime.datetime.now())
file_path = os.path.join(tmpdir, file_name)
tar = tarfile.open(str(file_path), "w")
tar.add(
str(tmpdir) + "/woodwork_typing_info.json", arcname="/woodwork_typing_info.json"
)
tar.add(str(tmpdir) + "/data", arcname="/data")
tar.close()
return file_path
def save_orc_file(dataframe, filepath):
    """Writes a pandas DataFrame to an ORC file, casting categorical columns to strings first."""
    from pyarrow import Table, orc
    df = dataframe.copy()
for c in df:
if df[c].dtype.name == "category":
df[c] = df[c].astype("string")
pa_table = Table.from_pandas(df, preserve_index=False)
orc.write_table(pa_table, filepath)
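# Usage sketch for the helper above (the path is hypothetical; its parent directory
# must already exist and the DataFrame is assumed to be a pandas DataFrame):
#
#     save_orc_file(df, "./ww_table/data/data.orc")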