"""
------------------------------------------------------------------------
This program extracts tax rate and income data from the microsimulation
model (Tax-Calculator).
------------------------------------------------------------------------
"""
from taxcalc import Records, Calculator, Policy
from pandas import DataFrame
from dask import delayed, compute
import dask.multiprocessing
import numpy as np
import os
import pickle
import pkg_resources
from ogcore import utils
from ogusa.constants import DEFAULT_START_YEAR, TC_LAST_YEAR, PUF_START_YEAR
CUR_PATH = os.path.split(os.path.abspath(__file__))[0]
def get_calculator(
    baseline,
    calculator_start_year,
    reform=None,
    data=None,
    gfactors=None,
    weights=None,
    records_start_year=PUF_START_YEAR,
):
    """
    This function creates the tax calculator object with the policy
    specified in reform and the data specified with the data kwarg.

    Args:
        baseline (boolean): True if baseline tax policy
        calculator_start_year (int): first year of budget window
        reform (dictionary): IIT policy reform parameters, None to
            leave the default Policy object unchanged
        data (DataFrame or str): DataFrame or path to datafile for
            Records object; if it contains "cps" the CPS constructor
            is used, and if None the default Records object is used
        gfactors (Tax-Calculator GrowthFactors object): growth factors
            to use to extrapolate data over budget window
        weights (DataFrame): weights for Records object
        records_start_year (int): the start year for the data and
            weights dfs (default is set to the PUF start year as defined
            in the Tax-Calculator project)

    Returns:
        calc1 (Tax-Calculator Calculator object): Calculator object
            built from the records and policy above; it is not
            advanced to calculator_start_year here, so callers advance
            it themselves

    Raises:
        RuntimeError: if calculator_start_year is after TC_LAST_YEAR
    """
    # Check that start_year is appropriate before doing any of the
    # (expensive) work of loading records and building the calculator
    if calculator_start_year > TC_LAST_YEAR:
        raise RuntimeError("Start year is beyond data extrapolation.")

    # create a calculator
    policy1 = Policy()
    if data is not None and "cps" in data:
        records1 = Records.cps_constructor()
        # impute short and long term capital gains if using CPS data
        # in 2012 SOI data 6.587% of CG as short-term gains
        records1.p22250 = 0.06587 * records1.e01100
        records1.p23250 = (1 - 0.06587) * records1.e01100
        # set total capital gains to zero
        records1.e01100 = np.zeros(records1.e01100.shape[0])
    elif data is not None:  # pragma: no cover
        records1 = Records(
            data=data,
            gfactors=gfactors,
            weights=weights,
            start_year=records_start_year,
        )  # pragma: no cover
    else:  # pragma: no cover
        records1 = Records()  # pragma: no cover

    if baseline:
        if not reform:
            print("Running current law policy baseline")
        else:
            print("Baseline policy is: ", reform)
    else:
        if not reform:
            print("Running with current law as reform")
        else:
            print("Reform policy is: ", reform)
    print("TYPE", type(reform))
    # reform defaults to None, which is not a reform dictionary; only
    # hand an actual dictionary (possibly empty) to Tax-Calculator
    if reform is not None:
        policy1.implement_reform(reform)

    # the default set up increments year to 2013
    calc1 = Calculator(records=records1, policy=policy1)

    return calc1
def get_data(
    baseline=False,
    start_year=DEFAULT_START_YEAR,
    reform=None,
    data=None,
    path=CUR_PATH,
    client=None,
    num_workers=1,
):
    """
    This function creates dataframes of micro data with marginal tax
    rates and information to compute effective tax rates from the
    Tax-Calculator output.  The resulting dictionary of dataframes is
    returned and saved to disk in a pickle file
    (micro_data_baseline.pkl if baseline, else micro_data_policy.pkl).

    Args:
        baseline (boolean): True if baseline tax policy
        start_year (int): first year of budget window
        reform (dictionary): IIT policy reform parameters, None (the
            default) is treated as an empty reform dictionary
        data (DataFrame or str): DataFrame or path to datafile for
            Records object
        path (str): path to save microdata files to
        client (Dask Client object): client for Dask multiprocessing
        num_workers (int): number of workers to use for Dask
            multiprocessing

    Returns:
        micro_data_dict (dict): dict of Pandas Dataframe, one for each
            year from start_year to the maximum year Tax-Calculator can
            analyze, keyed by the year as a string
        taxcalc_version (str): version of Tax-Calculator used
    """
    # Use a fresh dictionary per call rather than a shared mutable
    # default; the per-year workers expect a dictionary, not None
    if reform is None:
        reform = {}

    # Compute MTRs and taxes for each year, but not beyond TC_LAST_YEAR
    years = range(start_year, TC_LAST_YEAR + 1)
    lazy_values = [
        delayed(taxcalc_advance)(baseline, start_year, reform, data, year)
        for year in years
    ]
    if client:  # pragma: no cover
        futures = client.compute(lazy_values, num_workers=num_workers)
        results = client.gather(futures)
    else:
        results = compute(
            *lazy_values,
            scheduler=dask.multiprocessing.get,
            num_workers=num_workers,
        )

    # dictionary of data frames to return; results come back in the
    # same order as the years they were submitted for
    micro_data_dict = {
        str(year): DataFrame(result) for year, result in zip(years, results)
    }

    if baseline:
        pkl_path = os.path.join(path, "micro_data_baseline.pkl")
    else:
        pkl_path = os.path.join(path, "micro_data_policy.pkl")
    utils.mkdirs(path)
    with open(pkl_path, "wb") as f:
        pickle.dump(micro_data_dict, f)

    # Do some garbage collection
    del results

    # Pull Tax-Calc version for reference
    taxcalc_version = pkg_resources.get_distribution("taxcalc").version

    return micro_data_dict, taxcalc_version
def taxcalc_advance(baseline, start_year, reform, data, year):
    """
    This function builds a Tax-Calculator Calculator object, advances
    it to the given year, computes taxes and rates, and saves the
    results to a dictionary.

    Args:
        baseline (boolean): True if baseline tax policy
        start_year (int): first year of budget window
        reform (dictionary): IIT policy reform parameters
        data (DataFrame or str): DataFrame or path to datafile for
            Records object
        year (int): year to advance the calculator to

    Returns:
        tax_dict (dict): a dictionary of microdata with marginal tax
            rates and other information computed in TC
    """
    calc1 = get_calculator(
        baseline=baseline,
        calculator_start_year=start_year,
        reform=reform,
        data=data,
    )
    calc1.advance_to_year(year)
    calc1.calc_all()
    print("Year: ", str(calc1.current_year))

    # define market income - taking expanded_income and excluding gov't
    # transfer benefits found in the Tax-Calculator expanded income
    market_income = calc1.array("expanded_income") - calc1.array(
        "benefit_value_total"
    )

    # Compute mtr on capital income
    mtr_combined_capinc = cap_inc_mtr(calc1)

    # Compute weighted avg mtr for labor income
    # Note the index [2] in the mtr results means that we are pulling
    # the combined mtr from the IIT + FICA taxes
    # NOTE(review): the denominator is zero for observations with no
    # wage and no self-employment income - confirm downstream code
    # handles the resulting non-finite values
    mtr_combined_labinc = (
        calc1.mtr("e00200p")[2] * np.abs(calc1.array("e00200"))
        + calc1.mtr("e00900p")[2] * np.abs(calc1.array("sey"))
    ) / (np.abs(calc1.array("sey")) + np.abs(calc1.array("e00200")))

    # Put MTRs, income, tax liability, and other variables in dict
    length = len(calc1.array("s006"))
    tax_dict = {
        "mtr_labinc": mtr_combined_labinc,
        "mtr_capinc": mtr_combined_capinc,
        "age": calc1.array("age_head"),
        "total_labinc": calc1.array("sey") + calc1.array("e00200"),
        # capital income is market income net of total labor income,
        # so both labor components (sey and e00200) are subtracted
        "total_capinc": (
            market_income - (calc1.array("sey") + calc1.array("e00200"))
        ),
        "market_income": market_income,
        "total_tax_liab": calc1.array("combined"),
        "payroll_tax_liab": calc1.array("payrolltax"),
        "etr": (
            (calc1.array("combined") - calc1.array("ubi")) / market_income
        ),
        "year": calc1.current_year * np.ones(length),
        "weight": calc1.array("s006"),
    }

    # garbage collection
    del calc1

    return tax_dict
def cap_inc_mtr(calc1):  # pragma: no cover
    """
    This function computes the marginal tax rate on capital income,
    which is calculated as a weighted average of the marginal tax rates
    on different sources of capital income.  Weights are the absolute
    values of each income source.  Observations with no capital income
    are assigned the marginal tax rate on interest income (e00300).

    Args:
        calc1 (Tax-Calculator Calculator object): TC calculator

    Returns:
        mtr_combined_capinc (Numpy array): array with marginal tax rates
            for each observation in the TC Records object
    """
    # Note: PUF does not have variable for non-taxable IRA distributions
    # Exclude Sch E income (e02000) from this list since we'll compute
    # MTRs for this income in two parts - one for overall Sch E and one
    # for S Corp and Partnership income (e26270) (note that TaxCalc
    # doesn't allow for an MTR on rents and royalties alone)
    # e00300 = interest income
    # e00400 = nontaxable interest income
    # e00600 = ordinary dividend income
    # e00650 = qualified dividend income
    # e01400 = taxable IRA distributions
    # e01700 = pension and annuity income
    # p22250 = short term cap gain/loss
    # p23250 = long term cap gain/loss
    # e26270 = partnership and s corp income/loss
    # e02000 = Sch E income (includes e26270)
    capital_income_sources = (
        "e00300",
        "e00400",
        "e00600",
        "e00650",
        "e01400",
        "e01700",
        "p22250",
        "p23250",
        "e26270",
    )
    rent_royalty_inc = np.abs(calc1.array("e02000") - calc1.array("e26270"))
    # assign overall Sch E mtr to rent and royalties since TC can't do
    # this component separately
    rent_royalty_mtr = calc1.mtr("e02000")[2]
    # calculating MTRs separately - can skip items with zero tax
    all_mtrs = {
        income_source: calc1.mtr(income_source)
        for income_source in capital_income_sources
    }
    # Get each column of income sources, to include non-taxable income
    record_columns = [calc1.array(x) for x in capital_income_sources]
    # Compute weighted average of all those MTRs
    # first find total capital income
    total_cap_inc = sum(map(abs, record_columns)) + rent_royalty_inc
    # Note that all_mtrs gives fica (0), iit (1), and combined (2) mtrs
    # We'll use the combined - hence all_mtrs[source][2]
    capital_mtr = [
        abs(col) * all_mtrs[source][2]
        for col, source in zip(record_columns, capital_income_sources)
    ]
    # Sum the per-source terms first and only then add the rent and
    # royalty term.  Adding the rent/royalty array to the list itself
    # would broadcast it onto every source and count it once per source.
    weighted_mtr_sum = sum(capital_mtr) + rent_royalty_mtr * rent_royalty_inc
    has_cap_inc = total_cap_inc != 0
    no_cap_inc = total_cap_inc == 0
    mtr_combined_capinc = np.zeros_like(total_cap_inc, dtype=float)
    mtr_combined_capinc[has_cap_inc] = (
        weighted_mtr_sum[has_cap_inc] / total_cap_inc[has_cap_inc]
    )
    # with no capital income there are no weights, so fall back to the
    # marginal tax rate on interest income
    mtr_combined_capinc[no_cap_inc] = all_mtrs["e00300"][2][no_cap_inc]

    return mtr_combined_capinc