Source code for ccc.utils

import pkg_resources
from collections import OrderedDict
import warnings
import numbers
import os
import json
import pandas as pd

# Name of the package directory; read_egg_csv and read_egg_json join it with
# a file name to build the resource path
PACKAGE_NAME = "ccc"
# Distribution name handed to pkg_resources.Requirement.parse when
# read_egg_csv and read_egg_json look up packaged data files
PYPI_PACKAGE_NAME = "cost-of-capital-calculator"

# Default year for model runs
DEFAULT_START_YEAR = 2024

# Year of asset data
ASSET_DATA_CSV_YEAR = 2013

# Start year for tax data (e.g. year of PUF)
RECORDS_START_YEAR = 2011

# Latest year TaxData extrapolates to
TC_LAST_YEAR = 2033


def to_str(x):
    """
    Decode a value when it supports decoding.

    Args:
        x (string): value to decode

    Returns:
        x (string): ``x.decode()`` when ``x`` has a ``decode`` attribute,
            otherwise ``x`` unchanged

    """
    return x.decode() if hasattr(x, "decode") else x
def str_modified(i):
    """
    Convert a number to the string form used for it.

    The value 27.5 maps to "27_5"; every other value is truncated to an
    integer and converted to a string.

    Args:
        i (scalar): number that will convert to string

    Returns:
        str_i (string): number converted to a string

    """
    return "27_5" if i == 27.5 else str(int(i))
def diff_two_tables(df1, df2):
    """
    Create the difference between two dataframes.

    Columns whose first value is a number are differenced (``df1 - df2``);
    all other columns, and columns with no rows, are copied from ``df1``.

    Args:
        df1 (Pandas DataFrame): first DataFrame in difference
        df2 (Pandas DataFrame): second DataFrame in difference

    Returns:
        diff_df (Pandas DataFrame): DataFrame with differences between
            two DataFrames

    Raises:
        AssertionError: if the two DataFrames do not have the same
            columns in the same order

    """
    assert tuple(df1.columns) == tuple(df2.columns)
    diffs = OrderedDict()
    for c in df1.columns:
        # Use bracket indexing rather than attribute access: attribute
        # access fails for column names that are not valid identifiers
        # (e.g. "tax rate") or that clash with DataFrame members
        # (e.g. "count"), which would silently drop those columns.
        left = df1[c]
        # The first value decides whether the column can be differenced;
        # an empty column has no first value, so it is passed through.
        can_diff = len(left) > 0 and isinstance(left.iloc[0], numbers.Number)
        diffs[c] = left - df2[c] if can_diff else left
    diff_df = pd.DataFrame(diffs)
    return diff_df
def wavg(group, avg_name, weight_name):
    """
    Computes a weighted average.

    Falls back to the unweighted mean of ``avg_name`` when computing the
    weighted average emits a warning (for example, division by a zero
    sum of weights).

    Args:
        group (Pandas DataFrame): data for the particular grouping
        avg_name (string): name of variable to compute wgt avg with
        weight_name (string): name of weighting variables

    Returns:
        d (scalar): weighted avg for the group

    """
    # Escalate warnings to exceptions only inside this block; the previous
    # filter state is restored on exit so the process-wide warning
    # configuration is not permanently changed by calling this function.
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        d = group[avg_name]
        w = group[weight_name]
        try:
            return (d * w).sum() / w.sum()
        except Warning:
            return d.mean()
def read_egg_csv(fname, index_col=None):
    """
    Read from egg the file named fname that contains CSV data and
    return pandas DataFrame containing the data.

    Args:
        fname (string): name of csv file
        index_col (string): name of column containing index

    Returns:
        vdf (Pandas DataFrame): data from csv file

    Raises:
        ValueError: if the resource cannot be located or parsed; the
            underlying exception is attached as the cause

    """
    path_in_egg = os.path.join(PACKAGE_NAME, fname)
    try:
        # The context manager closes the resource stream once the CSV has
        # been parsed, including when parsing fails.
        with pkg_resources.resource_stream(
            pkg_resources.Requirement.parse(PYPI_PACKAGE_NAME), path_in_egg
        ) as stream:
            vdf = pd.read_csv(stream, index_col=index_col)
    except Exception as err:
        raise ValueError(
            "could not read {} data from egg".format(fname)
        ) from err
    # cannot call read_egg_ function in unit tests
    return vdf  # pragma: no cover
def read_egg_json(fname):
    """
    Read from egg the file named fname that contains JSON data and
    return dictionary containing the data.

    Args:
        fname (string): name of JSON file

    Returns:
        pdict (dict): data from JSON file, with key order preserved

    Raises:
        ValueError: if the resource cannot be located, decoded or
            parsed; the underlying exception is attached as the cause

    """
    try:
        path_in_egg = os.path.join(PACKAGE_NAME, fname)
        # The context manager closes the resource stream after reading,
        # including when reading or decoding fails.
        with pkg_resources.resource_stream(
            pkg_resources.Requirement.parse(PYPI_PACKAGE_NAME), path_in_egg
        ) as stream:
            raw_text = stream.read().decode("utf-8")
        pdict = json.loads(raw_text, object_pairs_hook=OrderedDict)
    except Exception as err:
        raise ValueError(
            "could not read {} data from egg".format(fname)
        ) from err
    # cannot call read_egg_ function in unit tests
    return pdict  # pragma: no cover
def json_to_dict(json_text):
    """
    Parse JSON text into an ordered Python dictionary.

    Args:
        json_text (string): JSON text

    Raises:
        ValueError: if json_text contains a JSON syntax error; the
            message reproduces the text with numbered lines between two
            column rulers

    Returns:
        ordered_dict (collections.OrderedDict): JSON data expressed as
            an ordered Python dictionary.

    """
    try:
        ordered_dict = json.loads(json_text, object_pairs_hook=OrderedDict)
    except ValueError as valerr:
        # Column ruler printed above and below the offending text
        ruler = (
            "XXXX----.----1----.----2----.----3----.----4"
            "----.----5----.----6----.----7"
        )
        # Each source line is prefixed with its 1-based, 4-digit number
        numbered_lines = [
            "{:04d}{}".format(number, text) + "\n"
            for number, text in enumerate(json_text.split("\n"), start=1)
        ]
        pieces = [
            "Text below contains invalid JSON:\n",
            str(valerr) + "\n",
            "Above location of the first error may be approximate.\n",
            "The invalid JSON text is between the lines:\n",
            ruler + "\n",
        ]
        pieces.extend(numbered_lines)
        pieces.append(ruler + "\n")
        raise ValueError("".join(pieces))
    return ordered_dict
def save_return_table(table_df, output_type=None, path=None, precision=0):
    """
    Function to save or return a table of data.

    When ``path`` is None the table is returned: as a string for
    'tex', 'json' and 'html', otherwise as the DataFrame itself. When
    ``path`` is given the table is written to that file and nothing is
    returned.

    Args:
        table_df (Pandas DataFrame): table
        output_type (string): specifies the type of file to save
            table to: 'csv', 'tex', 'excel', 'json', 'html'
        path (string): specifies path to save file with table to
        precision (integer): number of digits after the decimal point
            to print. Defaults to 0.

    Returns:
        table_df (Pandas DataFrame or string or None): the formatted
            table when ``path`` is None, otherwise None

    Raises:
        ValueError: if the extension of ``path`` does not match
            ``output_type`` or ``output_type`` is not supported

    """
    # printf-style format with `precision` digits after the decimal point,
    # e.g. "%.2f" for precision=2
    float_fmt = "%.{}f".format(precision)

    def _render_html(buf=None):
        # NOTE(review): HTML output keeps the fixed "%10.0f" float format
        # and does not honour `precision` -- confirm whether intended.
        with pd.option_context("display.precision", precision):
            return table_df.to_html(
                buf=buf,
                index=False,
                float_format=lambda x: "%10.0f" % x,
                classes="table table-striped table-hover",
            )

    if path is None:
        if output_type == "tex":
            return table_df.to_latex(
                buf=None,
                index=False,
                na_rep="",
                float_format=lambda x: float_fmt % x,
            )
        if output_type == "json":
            return table_df.to_json(double_precision=precision)
        if output_type == "html":
            return _render_html()
        return table_df

    # The file extension must agree with the requested output type
    extension = path.split(".")[-1]
    extension_ok = extension == output_type or (
        output_type == "excel" and extension in ("xlsx", "xls")
    )
    if not extension_ok:
        raise ValueError("Please enter a valid output format")

    if output_type == "tex":
        table_df.to_latex(
            buf=path,
            index=False,
            na_rep="",
            float_format=lambda x: float_fmt % x,
        )
    elif output_type == "csv":
        table_df.to_csv(
            path_or_buf=path,
            index=False,
            na_rep="",
            float_format=float_fmt,
        )
    elif output_type == "json":
        table_df.to_json(path_or_buf=path, double_precision=precision)
    elif output_type == "excel":
        table_df.to_excel(
            excel_writer=path,
            index=False,
            na_rep="",
            float_format=float_fmt,
        )
    elif output_type == "html":
        _render_html(buf=path)
    else:
        # Unsupported type whose name happens to match the extension:
        # report it instead of silently writing nothing.
        raise ValueError("Please enter a valid output format")