Source code for woodwork.serialize

import datetime
import json
import os
import tarfile
import tempfile

import pandas as pd

import woodwork as ww
from woodwork.accessor_utils import _is_dask_dataframe, _is_koalas_dataframe
from woodwork.s3_utils import get_transport_params, use_smartopen
from woodwork.type_sys.utils import (
    _get_ltype_class,
    _get_specified_ltype_params
)
from woodwork.utils import _is_s3, _is_url

SCHEMA_VERSION = '11.3.0'
FORMATS = ['csv', 'pickle', 'parquet', 'arrow', 'feather', 'orc']


def typing_info_to_dict(dataframe):
    """Creates the description for a Woodwork table, including typing information
    for each column and loading information.

    Args:
        dataframe (pd.DataFrame, dd.DataFrame, ks.DataFrame): DataFrame with
            Woodwork typing information initialized.

    Returns:
        dict: Dictionary containing Woodwork typing information
    """
    if _is_dask_dataframe(dataframe):
        # Need to determine the category info for Dask so it can be saved below
        category_cols = [colname for colname, col in dataframe.ww._schema.columns.items()
                         if col.is_categorical]
        dataframe = dataframe.ww.categorize(columns=category_cols)
    ordered_columns = dataframe.columns

    def _get_physical_type_dict(column):
        type_dict = {'type': str(column.dtype)}
        if str(column.dtype) == 'category':
            type_dict['cat_values'] = column.dtype.categories.to_list()
            type_dict['cat_dtype'] = str(column.dtype.categories.dtype)
        return type_dict

    column_typing_info = [
        {'name': col_name,
         'ordinal': ordered_columns.get_loc(col_name),
         'use_standard_tags': col.use_standard_tags,
         'logical_type': {
             'parameters': _get_specified_ltype_params(col.logical_type),
             'type': str(_get_ltype_class(col.logical_type))
         },
         'physical_type': _get_physical_type_dict(dataframe[col_name]),
         'semantic_tags': sorted(list(col.semantic_tags)),
         'description': col.description,
         'origin': col.origin,
         'metadata': col.metadata,
         }
        for col_name, col in dataframe.ww.columns.items()
    ]

    if _is_dask_dataframe(dataframe):
        table_type = 'dask'
    elif _is_koalas_dataframe(dataframe):
        table_type = 'koalas'
    else:
        table_type = 'pandas'

    return {
        'schema_version': SCHEMA_VERSION,
        'name': dataframe.ww.name,
        'index': dataframe.ww.index,
        'time_index': dataframe.ww.time_index,
        'column_typing_info': column_typing_info,
        'loading_info': {
            'table_type': table_type
        },
        'table_metadata': dataframe.ww.metadata
    }

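# Usage sketch (illustrative; `_example_typing_info` is a hypothetical helper and
# the sample data is an assumption, not part of this module): initialize Woodwork
# typing information on a small pandas DataFrame and inspect the dictionary
# produced by typing_info_to_dict.
def _example_typing_info():
    df = pd.DataFrame({
        'id': [0, 1, 2],
        'signup': pd.to_datetime(['2020-01-01', '2020-01-02', '2020-01-03']),
    })
    df.ww.init(index='id', time_index='signup')
    info = typing_info_to_dict(df)
    assert info['schema_version'] == SCHEMA_VERSION
    assert info['index'] == 'id' and info['time_index'] == 'signup'
    assert [c['name'] for c in info['column_typing_info']] == ['id', 'signup']
    return info
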
def write_woodwork_table(dataframe, path, profile_name=None, **kwargs):
    """Serialize Woodwork table and write to disk or S3 path.

    Args:
        dataframe (pd.DataFrame, dd.DataFrame, ks.DataFrame): DataFrame with
            Woodwork typing information initialized.
        path (str): Location on disk to write the Woodwork table.
        profile_name (str, bool): The AWS profile specified to write to S3.
            Will default to None and search for AWS credentials. Set to False
            to use an anonymous profile.
        kwargs (keywords): Additional keyword arguments to pass to the underlying
            serialization method or to specify the AWS profile.
    """
    if _is_s3(path):
        with tempfile.TemporaryDirectory() as tmpdir:
            os.makedirs(os.path.join(tmpdir, 'data'))
            _dump_table(dataframe, tmpdir, **kwargs)
            file_path = _create_archive(tmpdir)
            transport_params = get_transport_params(profile_name)
            use_smartopen(file_path, path, read=False, transport_params=transport_params)
    elif _is_url(path):
        raise ValueError("Writing to URLs is not supported")
    else:
        path = os.path.abspath(path)
        os.makedirs(os.path.join(path, 'data'), exist_ok=True)
        _dump_table(dataframe, path, **kwargs)

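# Usage sketch (illustrative; the output directory name is an assumption):
# writing locally creates `path/woodwork_typing_info.json` plus the serialized
# data under `path/data/`. Extra kwargs such as `index=False` are forwarded to
# the underlying writer (here, DataFrame.to_csv).
def _example_write_local():
    df = pd.DataFrame({'id': [0, 1], 'value': [1.5, 2.5]})
    df.ww.init(index='id')
    write_woodwork_table(df, 'ww_output', format='csv', index=False)
    # ww_output/ now contains data/data.csv and woodwork_typing_info.json
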
def _dump_table(dataframe, path, **kwargs):
    """Writes Woodwork table at the specified path, including both the data
    and the typing information."""
    loading_info = write_dataframe(dataframe, path, **kwargs)
    typing_info = typing_info_to_dict(dataframe)
    typing_info['loading_info'].update(loading_info)
    write_typing_info(typing_info, path)

def write_typing_info(typing_info, path):
    """Writes Woodwork typing information to the specified path as
    woodwork_typing_info.json.

    Args:
        typing_info (dict): Dictionary containing Woodwork typing information.
        path (str): Directory in which to write the woodwork_typing_info.json file.
    """
    try:
        file_path = os.path.join(path, 'woodwork_typing_info.json')
        with open(file_path, 'w') as file:
            json.dump(typing_info, file)
    except TypeError:
        raise TypeError('Woodwork table is not json serializable. '
                        'Check table and column metadata for values that may not be serializable.')

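# Failure-mode sketch (illustrative; `_example_unserializable_metadata` is a
# hypothetical helper and `path` is assumed to be an existing directory):
# metadata values json cannot encode, such as a set, trigger the TypeError
# raised above.
def _example_unserializable_metadata(path):
    df = pd.DataFrame({'id': [0]})
    df.ww.init(index='id', table_metadata={'tags': {'a', 'b'}})  # sets are not JSON serializable
    write_typing_info(typing_info_to_dict(df), path)  # raises TypeError
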
def write_dataframe(dataframe, path, format='csv', **kwargs):
    """Write underlying DataFrame data to disk or S3 path.

    Args:
        dataframe (pd.DataFrame, dd.DataFrame, ks.DataFrame): DataFrame with
            Woodwork typing information initialized.
        path (str): Location on disk to write the Woodwork table.
        format (str): Format to use for writing Woodwork data. Defaults to csv.
        kwargs (keywords): Additional keyword arguments to pass to the underlying
            serialization method.

    Returns:
        dict: Information on storage location and format of data.
    """
    format = format.lower()
    ww_name = dataframe.ww.name or 'data'

    if _is_dask_dataframe(dataframe) and format == 'csv':
        # Dask writes one csv file per partition, so use a glob pattern for the name
        basename = "{}-*.{}".format(ww_name, format)
    else:
        basename = '.'.join([ww_name, format])
    location = os.path.join('data', basename)
    file = os.path.join(path, location)

    if format == 'csv':
        # engine kwarg not needed for writing, only reading
        csv_kwargs = kwargs.copy()
        if 'engine' in csv_kwargs.keys():
            del csv_kwargs['engine']
        if _is_koalas_dataframe(dataframe):
            dataframe = dataframe.ww.copy()
            columns = list(dataframe.select_dtypes('object').columns)
            dataframe[columns] = dataframe[columns].astype(str)
            csv_kwargs['compression'] = str(csv_kwargs['compression'])
        dataframe.to_csv(file, **csv_kwargs)
    elif format == 'pickle':
        # Dask and Koalas currently do not support to_pickle
        if not isinstance(dataframe, pd.DataFrame):
            msg = 'DataFrame type not compatible with pickle serialization. Please serialize to another format.'
            raise ValueError(msg)
        dataframe.to_pickle(file, **kwargs)
    elif format in ['parquet', 'arrow', 'feather', 'orc']:
        # LatLong columns in pandas and Dask DataFrames contain tuples, which raise
        # an error in parquet and arrow/feather formats.
        latlong_columns = [col_name for col_name, col in dataframe.ww.columns.items()
                           if _get_ltype_class(col.logical_type) == ww.logical_types.LatLong]
        if len(latlong_columns) > 0:
            dataframe = dataframe.ww.copy()
            dataframe[latlong_columns] = dataframe[latlong_columns].astype(str)

        if format == 'parquet':
            dataframe.to_parquet(file, **kwargs)
        elif format == 'orc':
            # Serialization to orc relies on pyarrow.Table.from_pandas, which doesn't work with Dask
            if _is_dask_dataframe(dataframe):
                msg = 'DataFrame type not compatible with orc serialization. Please serialize to another format.'
                raise ValueError(msg)
            save_orc_file(dataframe, file)
        else:
            dataframe.to_feather(file, **kwargs)
    else:
        error = 'must be one of the following formats: {}'
        raise ValueError(error.format(', '.join(FORMATS)))
    return {'location': location, 'type': format, 'params': kwargs}

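# Usage sketch (illustrative; the target directory is an assumption): calling
# write_dataframe directly returns the loading information that _dump_table
# later merges into the typing info. The `data` subdirectory must already exist,
# and parquet serialization requires pyarrow.
def _example_write_parquet(path):
    df = pd.DataFrame({'id': [0, 1], 'value': [1.5, 2.5]})
    df.ww.init(index='id')
    os.makedirs(os.path.join(path, 'data'), exist_ok=True)
    # returns e.g. {'location': 'data/data.parquet', 'type': 'parquet', 'params': {}}
    return write_dataframe(df, path, format='parquet')
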
def _create_archive(tmpdir):
    """When serializing to an S3 URL, writes a tar archive."""
    file_name = "ww-{date:%Y-%m-%d_%H%M%S}.tar".format(date=datetime.datetime.now())
    file_path = os.path.join(tmpdir, file_name)
    tar = tarfile.open(str(file_path), 'w')
    tar.add(str(tmpdir) + '/woodwork_typing_info.json', arcname='/woodwork_typing_info.json')
    tar.add(str(tmpdir) + '/data', arcname='/data')
    tar.close()
    return file_path


def save_orc_file(dataframe, filepath):
    """Writes a pandas DataFrame to an orc file using pyarrow."""
    from pyarrow import Table, orc

    df = dataframe.copy()
    for c in df:
        if df[c].dtype.name == 'category':
            # pyarrow cannot write pandas categorical dtypes to orc, so cast to string
            df[c] = df[c].astype('string')
    pa_table = Table.from_pandas(df, preserve_index=False)
    orc.write_table(pa_table, filepath)

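# Round-trip sketch (illustrative; reading back with pyarrow.orc is an
# assumption, not something this module does): categorical columns are cast to
# strings before writing, so they do not come back categorical.
def _example_save_orc(filepath):
    from pyarrow import orc
    df = pd.DataFrame({'color': pd.Categorical(['red', 'blue']), 'n': [1, 2]})
    save_orc_file(df, filepath)
    return orc.ORCFile(filepath).read().to_pandas()  # 'color' holds plain strings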