Source code for ccc.utils

import pkg_resources
from collections import OrderedDict
import warnings
import numbers
import os
import json
import pandas as pd

# Default year for model runs

# Year of asset data

# Start year for tax data (e.g. year of PUF)

# Latest year TaxData extrapolates to

def to_str(x):
    '''
    Return a decoded version of x when x knows how to decode itself.

    Any object exposing a ``decode`` attribute (for example ``bytes``)
    is decoded by calling ``x.decode()`` with its default arguments;
    every other object is handed back untouched.

    Args:
        x (string): string to decode

    Returns:
        x (string): decoded string
    '''
    return x.decode() if hasattr(x, 'decode') else x
def str_modified(i):
    '''
    Convert a number into the string form used for it.

    The value 27.5 is special-cased and rendered as '27_5'; every
    other value is truncated with ``int`` and then converted to a
    string.

    Args:
        i (scalar): number that will convert to string

    Returns:
        str_i (string): number converted to a string
    '''
    return '27_5' if i == 27.5 else str(int(i))
def diff_two_tables(df1, df2):
    '''
    Create the difference between two DataFrames.

    A column is differenced (``df1[c] - df2[c]``) when its first value
    in df1 is a number; any other column is carried over from df1
    unchanged. Columns that are empty in df1 are also carried over
    unchanged because there is no example value to inspect.

    Args:
        df1 (Pandas DataFrame): first DataFrame in difference
        df2 (Pandas DataFrame): second DataFrame in difference

    Returns:
        diff_df (Pandas DataFrame): DataFrame with differences between
            two DataFrames

    Raises:
        AssertionError: if the two DataFrames do not have the same
            columns in the same order
    '''
    assert tuple(df1.columns) == tuple(df2.columns)
    diffs = OrderedDict()
    for c in df1.columns:
        # Select with [] rather than getattr(): attribute access returns
        # the DataFrame *method* for columns named like 'count' or
        # 'mean', which used to make such columns vanish from the result.
        col1 = df1[c]
        can_diff = (
            len(col1) > 0 and isinstance(col1.iloc[0], numbers.Number)
        )
        diffs[c] = col1 - df2[c] if can_diff else col1
    diff_df = pd.DataFrame(diffs)
    return diff_df
def wavg(group, avg_name, weight_name):
    '''
    Computes a weighted average.

    The weighted average is ``sum(d * w) / sum(w)``. If that
    computation emits any warning (for example the NumPy warning
    produced when the weights sum to zero) or raises
    ZeroDivisionError, the unweighted mean of the variable is returned
    instead.

    Args:
        group (Pandas DataFrame): data for the particular grouping
        avg_name (string): name of variable to compute wgt avg with
        weight_name (string): name of weighting variables

    Returns:
        d (scalar): weighted avg for the group
    '''
    d = group[avg_name]
    w = group[weight_name]
    # Promote warnings to exceptions only for the duration of the
    # computation; catch_warnings() restores the caller's filters on
    # exit, so the rest of the process is not left with every warning
    # turned into an error.
    with warnings.catch_warnings():
        warnings.simplefilter('error')
        try:
            return (d * w).sum() / w.sum()
        except (Warning, ZeroDivisionError):
            return d.mean()
def read_egg_csv(fname, index_col=None):
    '''
    Read from egg the file named fname that contains CSV data and
    return pandas DataFrame containing the data.

    Args:
        fname (string): name of csv file
        index_col (string): name of column containing index

    Returns:
        vdf (Pandas DataFrame): data from csv file

    Raises:
        ValueError: if the data cannot be read for any reason; the
            underlying exception is attached as the cause
    '''
    try:
        # Resource names are always '/'-separated regardless of the
        # operating system, so the name is joined by hand rather than
        # with os.path.join (which would use '\\' on Windows).
        path_in_egg = '/'.join(('ccc', fname))
        # The with-block closes the resource stream once it is parsed.
        with pkg_resources.resource_stream(
                pkg_resources.Requirement.parse('ccc'),
                path_in_egg) as stream:
            vdf = pd.read_csv(stream, index_col=index_col)
    except Exception as err:
        raise ValueError(
            'could not read {} data from egg'.format(fname)
        ) from err
    # cannot call read_egg_ function in unit tests
    return vdf  # pragma: no cover
def read_egg_json(fname):
    '''
    Read from egg the file named fname that contains JSON data and
    return dictionary containing the data.

    Args:
        fname (string): name of JSON file

    Returns:
        pdict (dict): data from JSON file, as an OrderedDict that keeps
            the key order of the file

    Raises:
        ValueError: if the data cannot be read for any reason; the
            underlying exception is attached as the cause
    '''
    try:
        # Resource names are always '/'-separated regardless of the
        # operating system, so the name is joined by hand rather than
        # with os.path.join (which would use '\\' on Windows).
        path_in_egg = '/'.join(('ccc', fname))
        # The with-block closes the resource stream once it is read.
        with pkg_resources.resource_stream(
                pkg_resources.Requirement.parse('ccc'),
                path_in_egg) as stream:
            pdict = json.loads(
                stream.read().decode('utf-8'),
                object_pairs_hook=OrderedDict
            )
    except Exception as err:
        raise ValueError(
            'could not read {} data from egg'.format(fname)
        ) from err
    # cannot call read_egg_ function in unit tests
    return pdict  # pragma: no cover
def json_to_dict(json_text):
    '''
    Parse JSON text into an ordered Python dictionary.

    When parsing fails, the ValueError that is raised carries the
    parser's own message followed by the full input text, each line
    prefixed with a four-digit line number and framed by a column
    ruler, to make the offending location easier to find.

    Args:
        json_text (string): JSON text

    Raises:
        ValueError: if json_text contains a JSON syntax error

    Returns:
        ordered_dict (collections.OrderedDict): JSON data expressed as
            an ordered Python dictionary.
    '''
    try:
        return json.loads(json_text, object_pairs_hook=OrderedDict)
    except ValueError as parse_error:
        # Column ruler printed above and below the numbered text.
        ruler = ('XXXX----.----1----.----2----.----3----.----4'
                 '----.----5----.----6----.----7')
        numbered_lines = [
            '{:04d}{}'.format(number, text) + '\n'
            for number, text in enumerate(json_text.split('\n'), start=1)
        ]
        pieces = [
            'Text below contains invalid JSON:\n',
            str(parse_error) + '\n',
            'Above location of the first error may be approximate.\n',
            'The invalid JSON text is between the lines:\n',
            ruler + '\n',
        ]
        pieces.extend(numbered_lines)
        pieces.append(ruler + '\n')
        raise ValueError(''.join(pieces))
def save_return_table(table_df, output_type=None, path=None, precision=0):
    '''
    Function to save or return a table of data.

    With no path, the table is returned: as a string for the 'tex',
    'json' and 'html' output types, and as the DataFrame itself for any
    other output type. With a path, the table is written to that file
    and nothing is returned; the file extension must agree with
    output_type ('xlsx' or 'xls' for 'excel').

    Args:
        table_df (Pandas DataFrame): table
        output_type (string): specifies the type of file to save
            table to: 'csv', 'tex', 'excel', 'json', 'html'
            ('csv' and 'excel' are only meaningful with a path)
        path (string): specifies path to save file with table to
        precision (integer): number of digits printed after the
            decimal point. Defaults to 0.

    Returns:
        table_df (Pandas DataFrame or string or None): the table, in
            the requested format, when path is None; None when the
            table was written to path

    Raises:
        ValueError: if path is given and its extension does not match
            output_type, or output_type is not a supported file type
    '''
    # printf-style template with `precision` decimals, e.g. '%.2f'.
    # Built once so every branch formats floats the same way.
    float_fmt = '%.{}f'.format(precision)
    html_fmt = '%10.{}f'.format(precision)
    html_classes = "table table-striped table-hover"

    if path is None:
        if output_type == 'tex':
            return table_df.to_latex(
                buf=None, index=False, na_rep='',
                float_format=lambda x: float_fmt % x)
        elif output_type == 'json':
            return table_df.to_json(
                path_or_buf=None, double_precision=precision)
        elif output_type == 'html':
            with pd.option_context('display.precision', precision):
                return table_df.to_html(
                    index=False,
                    float_format=lambda x: html_fmt % x,
                    classes=html_classes)
        return table_df

    extension = path.split('.')[-1]
    extension_ok = (
        extension == output_type or
        (output_type == 'excel' and extension in ('xlsx', 'xls')))
    if not extension_ok:
        raise ValueError('Please enter a valid output format')

    if output_type == 'tex':
        table_df.to_latex(buf=path, index=False, na_rep='',
                          float_format=lambda x: float_fmt % x)
    elif output_type == 'csv':
        table_df.to_csv(path_or_buf=path, index=False, na_rep='',
                        float_format=float_fmt)
    elif output_type == 'json':
        table_df.to_json(path_or_buf=path, double_precision=precision)
    elif output_type == 'excel':
        table_df.to_excel(excel_writer=path, index=False, na_rep='',
                          float_format=float_fmt)
    elif output_type == 'html':
        with pd.option_context('display.precision', precision):
            table_df.to_html(buf=path, index=False,
                             float_format=lambda x: html_fmt % x,
                             classes=html_classes)
    else:
        # Extension and output_type agree, but the type is not one
        # this function can write; fail loudly rather than silently
        # writing nothing.
        raise ValueError('Please enter a valid output format')