Source code for ccc.utils
import importlib.resources as pkg_resources
from collections import OrderedDict
import warnings
import numbers
import os
import json
import pandas as pd
# Package identifiers: the package name used for resource lookups and
# the distribution name on PyPI
PACKAGE_NAME = "ccc"
PYPI_PACKAGE_NAME = "cost-of-capital-calculator"

# Year used by default when running the model
DEFAULT_START_YEAR = 2024

# Year the asset data CSV refers to
ASSET_DATA_CSV_YEAR = 2013

# First year of the tax data (e.g. year of PUF)
RECORDS_START_YEAR = 2011

# Last year that TaxData extrapolates to
TC_LAST_YEAR = 2033
[docs]
def to_str(x):
    """
    Decode a value into a string when it supports decoding.

    Args:
        x (string): value to decode; anything exposing a ``decode``
            attribute is decoded by calling it with no arguments

    Returns:
        x (string): decoded string, or the input unchanged when it has
            no ``decode`` attribute

    """
    return x.decode() if hasattr(x, "decode") else x
[docs]
def str_modified(i):
    """
    Convert a number to the string form used for it.

    The value 27.5 is special-cased to "27_5"; every other value is
    truncated to an integer before being converted.

    Args:
        i (scalar): number to convert to a string

    Returns:
        str_i (string): number converted to a string

    """
    return "27_5" if i == 27.5 else str(int(i))
[docs]
def diff_two_tables(df1, df2):
    """
    Create the difference between two dataframes.

    Columns whose first value in df1 is a number are differenced
    (df1 - df2); all other columns are copied from df1 unchanged.

    Args:
        df1 (Pandas DataFrame): first DataFrame in difference
        df2 (Pandas DataFrame): second DataFrame in difference

    Raises:
        AssertionError: if the two DataFrames do not have the same
            columns in the same order

    Returns:
        diff_df (Pandas DataFrame): DataFrame with differences between
            two DataFrames

    """
    # Raised explicitly (rather than via ``assert``) so the check is not
    # stripped when Python runs with -O; the exception type is unchanged.
    if tuple(df1.columns) != tuple(df2.columns):
        raise AssertionError("df1 and df2 must have identical columns")
    diffs = OrderedDict()
    for c in df1.columns:
        # Item access instead of attribute access: attribute lookup fails
        # for labels that are not valid identifiers and returns the
        # DataFrame attribute for labels such as "size", which previously
        # caused those columns to be dropped silently.
        col1 = df1[c]
        # An empty column has no example value; treat it as not
        # differenceable and copy it through.
        can_diff = len(col1) > 0 and isinstance(col1.iloc[0], numbers.Number)
        diffs[c] = col1 - df2[c] if can_diff else col1
    diff_df = pd.DataFrame(diffs)
    return diff_df
[docs]
def wavg(group, avg_name, weight_name):
    """
    Computes a weighted average.

    If evaluating the weighted average emits any warning (e.g. the
    weights sum to zero), the unweighted mean of the variable is
    returned instead.

    Args:
        group (Pandas DataFrame): data for the particular grouping
        avg_name (string): name of variable to compute wgt avg with
        weight_name (string): name of weighting variables

    Returns:
        d (scalar): weighted avg for the group

    """
    d = group[avg_name]
    w = group[weight_name]
    # Promote warnings to exceptions only for the duration of this
    # computation; catch_warnings restores the caller's warning filters
    # on exit instead of leaving every later warning in the process
    # turned into an error.
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        try:
            return (d * w).sum() / w.sum()
        except Warning:
            return d.mean()
[docs]
def read_egg_csv(fname, index_col=None):
    """
    Read from egg the file named fname that contains CSV data and
    return pandas DataFrame containing the data.

    Args:
        fname (string): name of csv file
        index_col (string): name of column containing index

    Raises:
        ValueError: if the file cannot be opened or parsed; the
            underlying exception is attached as the cause

    Returns:
        vdf (Pandas DataFrame): data from csv file

    """
    try:
        with pkg_resources.open_text(PACKAGE_NAME, fname) as file:
            vdf = pd.read_csv(file, index_col=index_col)
    except Exception as err:
        # Chain the original error so the real reason (missing file,
        # parse error, ...) remains visible in the traceback.
        raise ValueError(
            "could not read {} data from egg".format(fname)
        ) from err
    # cannot call read_egg_ function in unit tests
    return vdf  # pragma: no cover
[docs]
def read_egg_json(fname):
    """
    Read from egg the file named fname that contains JSON data and
    return dictionary containing the data.

    Args:
        fname (string): name of JSON file

    Raises:
        ValueError: if the file cannot be opened or parsed; the
            underlying exception is attached as the cause

    Returns:
        pdict (dict): data from JSON file, with key order preserved

    """
    try:
        with pkg_resources.open_text(PACKAGE_NAME, fname) as file:
            pdict = json.load(file, object_pairs_hook=OrderedDict)
    except Exception as err:
        # Chain the original error so the real reason (missing file,
        # JSON syntax error, ...) remains visible in the traceback.
        raise ValueError(
            "could not read {} data from egg".format(fname)
        ) from err
    # cannot call read_egg_ function in unit tests
    return pdict  # pragma: no cover
[docs]
def json_to_dict(json_text):
    """
    Parse JSON text into an ordered Python dictionary.

    Args:
        json_text (string): JSON text

    Raises:
        ValueError: if json_text contains a JSON syntax error; the
            message echoes the text with four-digit line numbers
            between two ruler lines

    Returns:
        ordered_dict (collections.OrderedDict): JSON data expressed as
            an ordered Python dictionary.

    """
    try:
        return json.loads(json_text, object_pairs_hook=OrderedDict)
    except ValueError as valerr:
        # Column ruler printed above and below the echoed text
        ruler = (
            "XXXX----.----1----.----2----.----3----.----4"
            "----.----5----.----6----.----7"
        )
        numbered = [
            "{:04d}{}".format(number, text)
            for number, text in enumerate(json_text.split("\n"), start=1)
        ]
        report = [
            "Text below contains invalid JSON:",
            str(valerr),
            "Above location of the first error may be approximate.",
            "The invalid JSON text is between the lines:",
            ruler,
            *numbered,
            ruler,
        ]
        # Every line of the report, including the last, ends in a newline
        raise ValueError("\n".join(report) + "\n")
[docs]
def save_return_table(table_df, output_type=None, path=None, precision=0):
    """
    Function to save or return a table of data.

    When path is None the table is returned: as a string for 'tex',
    'json' and 'html', and as the DataFrame itself for any other
    output_type. When path is given the table is written to that file
    and nothing is returned.

    Args:
        table_df (Pandas DataFrame): table
        output_type (string): specifies the type of file to save
            table to: 'csv', 'tex', 'excel', 'json', 'html'
        path (string): specifies path to save file with table to
        precision (integer): number of digits printed after the
            decimal point. Defaults to 0.

    Raises:
        ValueError: if path is given and its extension does not match
            output_type ('xlsx' and 'xls' match 'excel'), or if
            output_type is not one of the supported formats

    Returns:
        table_df (Pandas DataFrame or string or None): the formatted
            table when path is None, otherwise None

    """
    # Fixed-point format with ``precision`` decimals, e.g. "%.2f". The
    # previous '"%." + str(precision) + "0f"' produced "%.20f" for
    # precision=2 and, inside the tex lambda, applied "%" to "0f" only.
    float_fmt = "%.{}f".format(precision)

    if path is None:
        if output_type == "tex":
            return table_df.to_latex(
                index=False,
                na_rep="",
                float_format=lambda x: float_fmt % x,
            )
        if output_type == "json":
            return table_df.to_json(double_precision=precision)
        if output_type == "html":
            # Same width-10 layout as before, now honoring precision
            html_fmt = "%10.{}f".format(precision)
            with pd.option_context("display.precision", precision):
                return table_df.to_html(
                    index=False,
                    float_format=lambda x: html_fmt % x,
                    classes="table table-striped table-hover",
                )
        return table_df

    # The file extension must agree with the requested output type
    extension = str(path).split(".")[-1]
    matches = extension == output_type or (
        output_type == "excel" and extension in ("xlsx", "xls")
    )
    if not matches:
        raise ValueError("Please enter a valid output format")

    if output_type == "tex":
        table_df.to_latex(
            buf=path,
            index=False,
            na_rep="",
            float_format=lambda x: float_fmt % x,
        )
    elif output_type == "csv":
        table_df.to_csv(
            path_or_buf=path,
            index=False,
            na_rep="",
            float_format=float_fmt,
        )
    elif output_type == "json":
        table_df.to_json(path_or_buf=path, double_precision=precision)
    elif output_type == "excel":
        table_df.to_excel(
            excel_writer=path,
            index=False,
            na_rep="",
            float_format=float_fmt,
        )
    elif output_type == "html":
        # Previously an html path passed the extension check but nothing
        # was written.
        html_fmt = "%10.{}f".format(precision)
        with pd.option_context("display.precision", precision):
            table_df.to_html(
                buf=path,
                index=False,
                float_format=lambda x: html_fmt % x,
                classes="table table-striped table-hover",
            )
    else:
        # Extension matched an unsupported type; fail loudly instead of
        # silently writing nothing.
        raise ValueError("Please enter a valid output format")