"""
------------------------------------------------------------------------
Functions for taxes in the steady state and along the transition path.
------------------------------------------------------------------------
"""
# Packages
import numpy as np
from ogcore import utils
from ogcore.txfunc import get_tax_rates
"""
------------------------------------------------------------------------
Functions
------------------------------------------------------------------------
"""
[docs]
def replacement_rate_vals(nssmat, wss, factor_ss, j, p):
"""
Calculates replacement rate values for the social security system.
Args:
nssmat (Numpy array): initial guess at labor supply, size = SxJ
new_w (scalar): steady state real wage rate
factor_ss (scalar): scaling factor converting model units to
dollars
j (int): index of lifetime income group
p (OG-Core Specifications object): model parameters
Returns:
theta (Numpy array): social security replacement rate value for
lifetime income group j
"""
if j is not None:
e = np.squeeze(p.e[-1, :, j]) # Only computes using SS earnings
else:
e = np.squeeze(p.e[-1, :, :]) # Only computes using SS earnings
# adjust number of calendar years AIME computed from int model periods
equiv_periods = int(round((p.S / 80.0) * p.AIME_num_years)) - 1
if e.ndim == 2:
dim2 = e.shape[1]
else:
dim2 = 1
earnings = (e * (wss * nssmat * factor_ss)).reshape(p.S, dim2)
# get highest earning years for number of years AIME computed from
highest_earn = (
-1.0 * np.sort(-1.0 * earnings[: p.retire[-1], :], axis=0)
)[:equiv_periods]
AIME = highest_earn.sum(0) / ((12.0 * (p.S / 80.0)) * equiv_periods)
PIA = np.zeros(dim2)
# Compute level of replacement using AIME brackets and PIA rates
for j in range(dim2):
if AIME[j] < p.AIME_bkt_1:
PIA[j] = p.PIA_rate_bkt_1 * AIME[j]
elif AIME[j] < p.AIME_bkt_2:
PIA[j] = p.PIA_rate_bkt_1 * p.AIME_bkt_1 + p.PIA_rate_bkt_2 * (
AIME[j] - p.AIME_bkt_1
)
else:
PIA[j] = (
p.PIA_rate_bkt_1 * p.AIME_bkt_1
+ p.PIA_rate_bkt_2 * (p.AIME_bkt_2 - p.AIME_bkt_1)
+ p.PIA_rate_bkt_3 * (AIME[j] - p.AIME_bkt_2)
)
# Set the maximum monthly replacment rate from SS benefits tables
PIA[PIA > p.PIA_maxpayment] = p.PIA_maxpayment
if p.PIA_minpayment != 0.0:
PIA[PIA < p.PIA_minpayment] = p.PIA_minpayment
theta = (PIA * (12.0 * p.S / 80.0)) / (factor_ss * wss)
return theta
[docs]
def ETR_wealth(b, h_wealth, m_wealth, p_wealth):
r"""
Calculates the effective tax rate on wealth.
.. math::
T_{j,s,t}^{w} = \frac{h^{w}p_{w}b_{j,s,t}}{h^{w}b_{j,s,t} + m^{w}}
Args:
b (Numpy array): savings
h_wealth (scalar): parameter of wealth tax function
p_wealth (scalar): parameter of wealth tax function
m_wealth (scalar): parameter of wealth tax function
Returns:
tau_w (Numpy array): effective tax rate on wealth, size = SxJ
"""
tau_w = (p_wealth * h_wealth * b) / (h_wealth * b + m_wealth)
return tau_w
[docs]
def MTR_wealth(b, h_wealth, m_wealth, p_wealth):
r"""
Calculates the marginal tax rate on wealth from the wealth tax.
.. math::
\frac{\partial T_{j,s,t}^{w}}{\partial b_{j,s,t}} =
\frac{h^{w}p_{w}b_{j,s,t}}{(b_{j,s,t}h^{w}+m^{w})}\left[2 -
\frac{h^{w}p_{w}b_{j,s,t}}{(b_{j,s,t}h^{w}+m^{w})}\right]
Args:
b (Numpy array): savings
h_wealth (scalar): parameter of wealth tax function
p_wealth (scalar): parameter of wealth tax function
m_wealth (scalar): parameter of wealth tax function
Returns:
tau_prime (Numpy array): marginal tax rate on wealth, size = SxJ
"""
tau_prime = ETR_wealth(b, h_wealth, m_wealth, p_wealth) * 2 - (
(h_wealth**2 * p_wealth * b**2) / ((b * h_wealth + m_wealth) ** 2)
)
return tau_prime
[docs]
def ETR_income(
r,
w,
b,
n,
factor,
e,
etr_params,
labor_noncompliance_rate,
capital_noncompliance_rate,
p,
):
"""
Calculates effective personal income tax rate.
Args:
r (array_like): real interest rate
w (array_like): real wage rate
b (Numpy array): savings
n (Numpy array): labor supply
factor (scalar): scaling factor converting model units to
dollars
e (Numpy array): effective labor units
etr_params (list): list of effective tax rate function
parameters or nonparametric function
labor_noncompliance_rate (Numpy array): income tax noncompliance rate for labor income
capital_noncompliance_rate (Numpy array): income tax noncompliance rate for capital income
p (OG-Core Specifications object): model parameters
Returns:
tau (Numpy array): effective tax rate on total income
"""
X = (w * e * n) * factor
Y = (r * b) * factor
noncompliance_rate = (
(X * labor_noncompliance_rate) + (Y * capital_noncompliance_rate)
) / (X + Y)
tau = get_tax_rates(
etr_params, X, Y, None, p.tax_func_type, "etr", for_estimation=False
)
return tau * (1 - noncompliance_rate)
[docs]
def MTR_income(
r,
w,
b,
n,
factor,
mtr_capital,
e,
etr_params,
mtr_params,
noncompliance_rate,
p,
):
r"""
Generates the marginal tax rate on labor income for households.
Args:
r (array_like): real interest rate
w (array_like): real wage rate
b (Numpy array): savings
n (Numpy array): labor supply
factor (scalar): scaling factor converting model units to
dollars
mtr_capital (bool): whether to compute the marginal tax rate on
capital income or labor income
e (Numpy array): effective labor units
etr_params (list): list of effective tax rate function
parameters or nonparametric function
mtr_params (list): list of marginal tax rate function
parameters or nonparametric function
noncompliance_rate (Numpy array): income tax noncompliance rate
p (OG-Core Specifications object): model parameters
Returns:
tau (Numpy array): marginal tax rate on income source
"""
X = (w * e * n) * factor
Y = (r * b) * factor
if p.analytical_mtrs:
tau = get_tax_rates(
etr_params,
X,
Y,
None,
p.tax_func_type,
"mtr",
p.analytical_mtrs,
mtr_capital,
for_estimation=False,
)
else:
tau = get_tax_rates(
mtr_params,
X,
Y,
None,
p.tax_func_type,
"mtr",
p.analytical_mtrs,
mtr_capital,
for_estimation=False,
)
return tau * (1 - noncompliance_rate)
[docs]
def get_biz_tax(w, Y, L, K, p_m, p, m, method):
r"""
Finds total business income tax revenue.
.. math::
R_{t}^{b} = \sum_{m=1}^{M}\tau_{m,t}^{b}(Y_{m,t} - w_{t}L_{m,t}) -
\tau_{m,t}^{b}\delta_{m,t}^{\tau}K_{m,t}^{\tau} - \tau^{inv}_{m,t}I_{m,t}
Args:
r (array_like): real interest rate
Y (array_like): aggregate output for each industry
L (array_like): aggregate labor demand for each industry
K (array_like): aggregate capital demand for each industry
p_m (array_like): output prices
p (OG-Core Specifications object): model parameters
m (int or None): index for production industry, if None, then
compute for all industries
Returns:
business_revenue (array_like): aggregate business tax revenue
"""
if m is not None:
if method == "SS":
delta_tau = p.delta_tau[-1, m]
tau_b = p.tau_b[-1, m]
tau_inv = p.inv_tax_credit[-1, m]
price = p_m[m]
Inv = p.delta * K[m] # compute gross investment
else:
delta_tau = p.delta_tau[: p.T, m].reshape(p.T)
tau_b = p.tau_b[: p.T, m].reshape(p.T)
tau_inv = p.inv_tax_credit[: p.T, m].reshape(p.T)
price = p_m[: p.T, m].reshape(p.T)
w = w.reshape(p.T)
Inv = p.delta * K
else:
if method == "SS":
delta_tau = p.delta_tau[-1, :]
tau_b = p.tau_b[-1, :]
tau_inv = p.inv_tax_credit[-1, :]
price = p_m
Inv = p.delta * K
else:
delta_tau = p.delta_tau[: p.T, :].reshape(p.T, p.M)
tau_b = p.tau_b[: p.T, :].reshape(p.T, p.M)
tau_inv = p.inv_tax_credit[: p.T, :].reshape(p.T, p.M)
price = p_m[: p.T, :].reshape(p.T, p.M)
w = w.reshape(p.T, 1)
Inv = p.delta * K
business_revenue = (
tau_b * (price * Y - w * L) - tau_b * delta_tau * K - tau_inv * Inv
)
return business_revenue
[docs]
def net_taxes(
r,
w,
b,
n,
bq,
factor,
tr,
ubi,
theta,
t,
j,
shift,
method,
e,
etr_params,
p,
):
"""
Calculate net taxes paid for each household.
Args:
r (array_like): real interest rate
w (array_like): real wage rate
b (Numpy array): savings
n (Numpy array): labor supply
bq (Numpy array): bequests received
factor (scalar): scaling factor converting model units to
dollars
tr (Numpy array): government transfers to the household
ubi (Numpy array): universal basic income payments to households
theta (Numpy array): social security replacement rate value for
lifetime income group j
t (int): time period
j (int): index of lifetime income group
shift (bool): whether computing for periods 0--s or 1--(s+1),
=True for 1--(s+1)
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
e (Numpy array): effective labor units
etr_params (list): list of effective tax rate function parameters
p (OG-Core Specifications object): model parameters
Returns:
net_tax (Numpy array): net taxes paid for each household
"""
T_I = income_tax_liab(r, w, b, n, factor, t, j, method, e, etr_params, p)
pension = pension_amount(w, n, theta, t, j, shift, method, e, p)
T_BQ = bequest_tax_liab(r, b, bq, t, j, method, p)
T_W = wealth_tax_liab(r, b, t, j, method, p)
net_tax = T_I - pension + T_BQ + T_W - tr - ubi
return net_tax
[docs]
def income_tax_liab(r, w, b, n, factor, t, j, method, e, etr_params, p):
"""
Calculate income and payroll tax liability for each household
Args:
r (array_like): real interest rate
w (array_like): real wage rate
b (Numpy array): savings
n (Numpy array): labor supply
factor (scalar): scaling factor converting model units to
dollars
t (int): time period
j (int): index of lifetime income group
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
e (Numpy array): effective labor units
etr_params (list): effective tax rate function parameters
p (OG-Core Specifications object): model parameters
Returns:
T_I (Numpy array): total income and payroll taxes paid for each
household
"""
if j is not None:
if method == "TPI":
if b.ndim == 2:
r = r.reshape(r.shape[0], 1)
w = w.reshape(w.shape[0], 1)
labor_income_tax_compliance_rate = (
p.labor_income_tax_noncompliance_rate[t, j]
)
capital_income_tax_compliance_rate = (
p.capital_income_tax_noncompliance_rate[t, j]
)
else:
labor_income_tax_compliance_rate = (
p.labor_income_tax_noncompliance_rate[-1, j]
)
capital_income_tax_compliance_rate = (
p.capital_income_tax_noncompliance_rate[-1, j]
)
else:
if method == "TPI":
r = utils.to_timepath_shape(r)
w = utils.to_timepath_shape(w)
labor_income_tax_compliance_rate = (
p.labor_income_tax_noncompliance_rate[t, :]
)
capital_income_tax_compliance_rate = (
p.capital_income_tax_noncompliance_rate[t, :]
)
else:
labor_income_tax_compliance_rate = (
p.labor_income_tax_noncompliance_rate[-1, :]
)
capital_income_tax_compliance_rate = (
p.capital_income_tax_noncompliance_rate[-1, :]
)
income = r * b + w * e * n
labor_income = w * e * n
T_I = (
ETR_income(
r,
w,
b,
n,
factor,
e,
etr_params,
labor_income_tax_compliance_rate,
capital_income_tax_compliance_rate,
p,
)
* income
)
if method == "SS":
T_P = p.tau_payroll[-1] * labor_income
elif method == "TPI":
length = w.shape[0]
if len(b.shape) == 1:
T_P = p.tau_payroll[t : t + length] * labor_income
elif len(b.shape) == 2:
T_P = (
p.tau_payroll[t : t + length].reshape(length, 1) * labor_income
)
else:
T_P = (
p.tau_payroll[t : t + length].reshape(length, 1, 1)
* labor_income
)
elif method == "TPI_scalar":
T_P = p.tau_payroll[0] * labor_income
income_payroll_tax_liab = T_I + T_P
return income_payroll_tax_liab
[docs]
def pension_amount(w, n, theta, t, j, shift, method, e, p):
"""
Calculate public pension benefit amounts for each household.
Args:
w (array_like): real wage rate
n (Numpy array): labor supply
theta (Numpy array): social security replacement rate value for
lifetime income group j
t (int): time period
j (int): index of lifetime income group
shift (bool): whether computing for periods 0--s or 1--(s+1),
=True for 1--(s+1)
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
e (Numpy array): effective labor units
p (OG-Core Specifications object): model parameters
Returns:
pension (Numpy array): pension amount for each household
"""
if j is not None:
if method == "TPI":
if n.ndim == 2:
w = w.reshape(w.shape[0], 1)
else:
if method == "TPI":
w = utils.to_timepath_shape(w)
pension = np.zeros_like(n)
if method == "SS":
# Depending on if we are looking at b_s or b_s+1, the
# entry for retirement will change (it shifts back one).
# The shift boolean makes sure we start replacement rates
# at the correct age.
if shift is False:
pension[p.retire[-1] :] = theta * w
else:
pension[p.retire[-1] - 1 :] = theta * w
elif method == "TPI":
length = w.shape[0]
if not shift:
# retireTPI is different from retire, because in TP income
# we are counting backwards with different length lists.
# This will always be the correct location of retirement,
# depending on the shape of the lists.
retireTPI = p.retire[t : t + length] - p.S
else:
retireTPI = p.retire[t : t + length] - 1 - p.S
if len(n.shape) == 1:
if not shift:
retireTPI = p.retire[t] - p.S
else:
retireTPI = p.retire[t] - 1 - p.S
pension[retireTPI:] = (
theta[j] * p.replacement_rate_adjust[t] * w[retireTPI:]
)
elif len(n.shape) == 2:
for tt in range(pension.shape[0]):
pension[tt, retireTPI[tt] :] = (
theta * p.replacement_rate_adjust[t + tt] * w[tt]
)
else:
for tt in range(pension.shape[0]):
pension[tt, retireTPI[tt] :, :] = (
theta.reshape(1, p.J)
* p.replacement_rate_adjust[t + tt]
* w[tt]
)
elif method == "TPI_scalar":
# The above methods won't work if scalars are used. This option
# is only called by the SS_TPI_firstdoughnutring function in TPI.
pension = theta * p.replacement_rate_adjust[0] * w
return pension
[docs]
def wealth_tax_liab(r, b, t, j, method, p):
"""
Calculate wealth tax liability for each household.
Args:
r (array_like): real interest rate
b (Numpy array): savings
t (int): time period
j (int): index of lifetime income group
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
p (OG-Core Specifications object): model parameters
Returns:
T_W (Numpy array): wealth tax liability for each household
"""
if j is not None:
if method == "TPI":
if b.ndim == 2:
r = r.reshape(r.shape[0], 1)
else:
if method == "TPI":
r = utils.to_timepath_shape(r)
if method == "SS":
T_W = ETR_wealth(b, p.h_wealth[-1], p.m_wealth[-1], p.p_wealth[-1]) * b
elif method == "TPI":
length = r.shape[0]
if len(b.shape) == 1:
T_W = (
ETR_wealth(
b,
p.h_wealth[t : t + length],
p.m_wealth[t : t + length],
p.p_wealth[t : t + length],
)
* b
)
elif len(b.shape) == 2:
T_W = (
ETR_wealth(
b,
p.h_wealth[t : t + length],
p.m_wealth[t : t + length],
p.p_wealth[t : t + length],
)
* b
)
else:
T_W = (
ETR_wealth(
b,
p.h_wealth[t : t + length].reshape(length, 1, 1),
p.m_wealth[t : t + length].reshape(length, 1, 1),
p.p_wealth[t : t + length].reshape(length, 1, 1),
)
* b
)
elif method == "TPI_scalar":
T_W = ETR_wealth(b, p.h_wealth[0], p.m_wealth[0], p.p_wealth[0]) * b
return T_W
[docs]
def bequest_tax_liab(r, b, bq, t, j, method, p):
"""
Calculate liability due from taxes on bequests for each household.
Args:
r (array_like): real interest rate
b (Numpy array): savings
bq (Numpy array): bequests received
t (int): time period
j (int): index of lifetime income group
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
p (OG-Core Specifications object): model parameters
Returns:
T_BQ (Numpy array): bequest tax liability for each household
"""
if j is not None:
lambdas = p.lambdas[j]
if method == "TPI":
if b.ndim == 2:
r = r.reshape(r.shape[0], 1)
else:
lambdas = np.transpose(p.lambdas)
if method == "TPI":
r = utils.to_timepath_shape(r)
if method == "SS":
T_BQ = p.tau_bq[-1] * bq
elif method == "TPI":
length = r.shape[0]
if len(b.shape) == 1:
T_BQ = p.tau_bq[t : t + length] * bq
elif len(b.shape) == 2:
T_BQ = p.tau_bq[t : t + length].reshape(length, 1) * bq / lambdas
else:
T_BQ = p.tau_bq[t : t + length].reshape(length, 1, 1) * bq
elif method == "TPI_scalar":
# The above methods won't work if scalars are used. This option
# is only called by the SS_TPI_firstdoughnutring function in TPI.
T_BQ = p.tau_bq[0] * bq
return T_BQ