import numpy as np
import pandas as pd
import scipy.stats as st
import scipy
from statsmodels.nonparametric.kernel_regression import KernelReg
from scipy.interpolate import UnivariateSpline
from scipy.linalg import lstsq
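# ("[docs]" source-link marker from the rendered HTML documentation removed; it is not part of the module.)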
class IOT:
    """
    Compute social welfare weights across the income distribution
    given micro data, tax policy parameters (marginal tax rates), and
    behavioral parameters (the elasticity of taxable income).

    Constructing an instance runs the whole pipeline: the income
    distribution is estimated on a fixed grid, the ETI is expanded
    over that grid if needed, the marginal tax rate schedule is
    smoothed, and the social welfare weights are computed.

    Args:
        data (Pandas DataFrame): micro data representing tax payers.
            Must include the columns named by ``income_measure`` and
            ``weight_var`` plus a column named ``mtr``
        income_measure (str): name of income measure from data to use
        weight_var (str): name of weight measure from data to use
        eti (scalar or dict): compensated elasticity of taxable income
            w.r.t. the marginal tax rate. Either a single number, or a
            dict with equal-length sequences under the keys
            ``"knot_points"`` (incomes) and ``"eti_values"`` that is
            interpolated over the income grid with a spline
        dist_type (str): how to model the income distribution, one of
            ``"log_normal"``, ``"kde"`` or ``"Pln"``
        kde_bw (None or scalar): accepted for compatibility but not
            used at present; the KDE is built with SciPy's default
            bandwidth
        mtr_smoother (str): method used to smooth the mtr function,
            one of ``"kreg"`` (local-linear kernel regression) or
            ``"HSV"`` (log-linear fit of ``1 - mtr`` on income)
        mtr_smooth_param (scalar): multiplied by 40,000 to give the
            kernel regression bandwidth; only used by ``"kreg"``

    Raises:
        ValueError: if ``dist_type`` or ``mtr_smoother`` is not one of
            the supported values, or if the ETI dict has a different
            number of knot points and ETI values

    Returns:
        class instance: IOT
    """

    def __init__(
        self,
        data,
        income_measure="e00200",
        weight_var="s006",
        eti=0.25,
        dist_type="log_normal",
        kde_bw=None,
        mtr_smoother="kreg",
        mtr_smooth_param=3,
    ):
        # keep the original data intact
        self.data_original = data.copy()
        # Get income distribution
        self.z, self.F, self.f, self.f_prime = self.compute_income_dist(
            data, income_measure, weight_var, dist_type, kde_bw
        )
        # A plain number (int, float or NumPy scalar) is used as-is.
        if isinstance(eti, (int, float, np.number)):
            self.eti = eti
        else:
            # otherwise it should be a dict with lists as values
            knot_points = eti["knot_points"]
            eti_values = eti["eti_values"]
            if len(knot_points) != len(eti_values):
                raise ValueError(
                    "eti['knot_points'] and eti['eti_values'] must have "
                    "the same length"
                )
            # A cubic spline needs more than three knots; otherwise
            # fall back to linear interpolation.
            spline_order = 3 if len(knot_points) > 3 else 1
            eti_spl = UnivariateSpline(
                knot_points, eti_values, k=spline_order, s=0
            )
            # NOTE(review): the spline is evaluated over the whole grid
            # and nothing here caps the extrapolated values -- confirm
            # that the knots span the incomes of interest.
            self.eti = eti_spl(self.z)
        # compute marginal tax rate schedule (reads self.z set above)
        self.mtr, self.mtr_prime = self.compute_mtr_dist(
            data, weight_var, income_measure, mtr_smoother, mtr_smooth_param
        )
        # compute theta_z, the elasticity of the tax base
        self.theta_z = 1 + ((self.z * self.f_prime) / self.f)
        # compute the social welfare weights
        self.g_z, self.g_z_numerical = self.sw_weights()

    @staticmethod
    def _trapezoid(y, x):
        """Integrate y over x with the trapezoid rule on any NumPy version."""
        # np.trapz is deprecated in NumPy 2.0 in favor of np.trapezoid.
        integrate = np.trapezoid if hasattr(np, "trapezoid") else np.trapz
        return integrate(y, x)

    @staticmethod
    def _weighted_log_moments(data, income_measure, weight_var):
        """Return the weighted mean and std. dev. of log income."""
        log_income = np.log(data[income_measure])
        total_weight = data[weight_var].sum()
        mu = (log_income * data[weight_var]).sum() / total_weight
        sigmasq = (
            (((log_income - mu) ** 2) * data[weight_var]).values
            / total_weight
        ).sum()
        return mu, np.sqrt(sigmasq)

    def df(self):
        """
        Return all vector attributes in a DataFrame format

        Args:
            None

        Returns:
            df (Pandas DataFrame): DataFrame with one row per point of
                the income grid and the columns z, f, F, f_prime, mtr,
                mtr_prime, theta_z, g_z and g_z_numerical
        """
        dict_out = {
            "z": self.z,
            "f": self.f,
            "F": self.F,
            "f_prime": self.f_prime,
            "mtr": self.mtr,
            "mtr_prime": self.mtr_prime,
            "theta_z": self.theta_z,
            "g_z": self.g_z,
            "g_z_numerical": self.g_z_numerical,
        }
        return pd.DataFrame.from_dict(dict_out)

    def compute_mtr_dist(
        self, data, weight_var, income_measure, mtr_smoother, mtr_smooth_param
    ):
        """
        Compute marginal tax rates over the income grid and their
        derivative with respect to income.

        This method reads ``self.z``, so ``compute_income_dist`` must
        have been used to set it first.

        Args:
            data (Pandas DataFrame): micro data representing tax payers.
                Must include the following columns: income_measure,
                weight_var, mtr
            weight_var (str): name of weight measure from data to use
            income_measure (str): name of income measure from data to
                use
            mtr_smoother (str): ``"kreg"`` or ``"HSV"``
            mtr_smooth_param (scalar): multiplied by 40,000 to give the
                kernel regression bandwidth (``"kreg"`` only)

        Raises:
            ValueError: if ``mtr_smoother`` is not a supported method

        Returns:
            tuple:
                * mtr (array_like): marginal tax rate at each point of
                    the income grid
                * mtr_prime (array_like): derivative of the marginal
                    tax rate with respect to income at each grid point
        """
        if mtr_smoother == "kreg":
            bins = 1000  # number of equal-width bins
            # Work on a copy so the caller's frame does not gain a
            # "z_bin" column as a side effect.
            data = data.copy()
            data.loc[:, ["z_bin"]] = pd.cut(
                data[income_measure], bins, include_lowest=True
            )
            binned_data = pd.DataFrame(
                data[["mtr", income_measure, "z_bin", weight_var]]
                .groupby(["z_bin"], observed=False)
                .apply(lambda x: wm(x[["mtr", income_measure]], x[weight_var]))
            )
            # make column 0 into two columns
            binned_data[["mtr", income_measure]] = pd.DataFrame(
                binned_data[0].tolist(), index=binned_data.index
            )
            binned_data.drop(columns=0, inplace=True)
            binned_data.reset_index(inplace=True)
            mtr_function = KernelReg(
                binned_data["mtr"].dropna(),
                binned_data[income_measure].dropna(),
                var_type="c",
                reg_type="ll",
                bw=[mtr_smooth_param * 40_000],
            )
            mtr, _ = mtr_function.fit(self.z)
            # Differentiate with respect to income (not grid index) so
            # the result is comparable with the analytic HSV derivative.
            mtr_prime = np.gradient(mtr, self.z, edge_order=2)
        elif mtr_smoother == "HSV":
            # estimate the HSV function on mtrs via weighted least squares
            # DATA CLEANING
            # drop rows with missing or inf mtr
            data = data[~data["mtr"].isna()]
            data = data[~data["mtr"].isin([np.inf, -np.inf])]
            # drop if MTR >= 100% (log(1 - mtr) must be defined)
            data = data[data["mtr"] < 1]
            # drop rows with missing, inf, or non-positive income
            data = data[data[income_measure] > 0]
            data = data[np.isfinite(data[income_measure])]
            # drop rows with missing, inf or non-positive weights
            data = data[~data[weight_var].isna()]
            data = data[~data[weight_var].isin([np.inf, -np.inf])]
            data = data[data[weight_var] > 0]
            # ESTIMATION: log(1 - mtr) = const - tau * log(income),
            # weighted by scaling rows with the square root of the weight
            X = np.log(data[income_measure].values)
            X = np.column_stack((np.ones(len(X)), X))
            w = np.array(data[weight_var].values)
            w_sqrt = np.sqrt(w)
            y = np.log(1 - data["mtr"].values)
            X_weighted = X * w_sqrt[:, np.newaxis]
            y_weighted = y * w_sqrt
            coef, _, _, _ = lstsq(X_weighted, y_weighted)
            tau = -coef[1]
            lambda_param = np.exp(coef[0]) / (1 - tau)
            mtr = 1 - lambda_param * (1 - tau) * self.z ** (-tau)
            mtr_prime = lambda_param * tau * (1 - tau) * self.z ** (-tau - 1)
        else:
            raise ValueError(
                "Please enter a valid mtr_smoother method "
                f"('kreg' or 'HSV'), got {mtr_smoother!r}"
            )
        return mtr, mtr_prime

    def compute_income_dist(
        self, data, income_measure, weight_var, dist_type, kde_bw=None
    ):
        """
        Compute the distribution of income (parametrically or not) from
        the raw data.

        The distribution is evaluated on a fixed grid of 100,000
        equally spaced incomes from 100 to 1,000,000. Observations
        with non-positive income are dropped before estimation.

        Args:
            data (Pandas DataFrame): micro data representing tax payers.
                Must include the following columns: income_measure,
                weight_var
            income_measure (str): name of income measure from data to
                use
            weight_var (str): name of weight measure from data to use
            dist_type (str): ``"log_normal"`` (weighted moments of log
                income), ``"kde"`` (weighted Gaussian kernel density)
                or ``"Pln"`` (three-parameter density fit by weighted
                maximum likelihood)
            kde_bw (None or scalar): currently unused

        Raises:
            ValueError: if ``dist_type`` is not a supported value

        Returns:
            tuple:
                * z (array_like): the income grid
                * F (array_like): cumulative distribution at each z
                * f (array_like): density at each z
                * f_prime (array_like): derivative of the density with
                    respect to income at each z
        """
        z_line = np.linspace(100, 1000000, 100000)
        # drop zero (and negative) income observations
        data = data[data[income_measure] > 0]
        if dist_type == "log_normal":
            mu, sigma = self._weighted_log_moments(
                data, income_measure, weight_var
            )
            log_dev = np.log(z_line) - mu
            kernel = np.exp(-(log_dev**2) / (2 * sigma**2))
            F = (1 / 2) * (
                1 + scipy.special.erf(log_dev / (np.sqrt(2) * sigma))
            )
            f = (1 / (sigma * np.sqrt(2 * np.pi))) * kernel * (1 / z_line)
            # analytical derivative of the lognormal density
            f_prime = (
                -1
                * kernel
                * (
                    (np.log(z_line) + sigma**2 - mu)
                    / (z_line**2 * sigma**3 * np.sqrt(2 * np.pi))
                )
            )
        elif dist_type == "kde":
            # fitted on the positive-income observations; kde_bw is not
            # passed on, so SciPy's default bandwidth rule applies
            f_function = st.gaussian_kde(
                data[income_measure],
                weights=data[weight_var],
            )
            f = f_function.pdf(z_line)
            # CDF = mass below the first grid point plus the running
            # trapezoid integral of the density over the grid
            mass_below = f_function.integrate_box_1d(-np.inf, z_line[0])
            increments = 0.5 * (f[1:] + f[:-1]) * np.diff(z_line)
            F = mass_below + np.concatenate(([0.0], np.cumsum(increments)))
            # derivative with respect to income, not grid index
            f_prime = np.gradient(f, z_line, edge_order=2)
        elif dist_type == "Pln":

            def pln_pdf(y, mu, sigma, alpha):
                x1 = alpha * sigma - (np.log(y) - mu) / sigma
                phi = st.norm.pdf((np.log(y) - mu) / sigma)
                # 1e-15 to avoid division by zero
                R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-15)
                pdf = alpha / y * phi * R
                return pdf

            def pln_cdf(y, mu, sigma, alpha):
                x1 = alpha * sigma - (np.log(y) - mu) / sigma
                R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-12)
                CDF = (
                    st.norm.cdf((np.log(y) - mu) / sigma)
                    - st.norm.pdf((np.log(y) - mu) / sigma) * R
                )
                return CDF

            def pln_dpdf(y, mu, sigma, alpha):
                x = (np.log(y) - mu) / sigma
                R = (1 - st.norm.cdf(alpha * sigma - x)) / (
                    st.norm.pdf(alpha * sigma - x) + 1e-15
                )
                left = (1 + x / sigma) * pln_pdf(y, mu, sigma, alpha)
                right = (
                    alpha
                    * st.norm.pdf(x)
                    * ((alpha * sigma - x) * R - 1)
                    / (sigma * y)
                )
                return -(left + right) / y

            def neg_weighted_log_likelihood(params, data, weights):
                mu, sigma, alpha = params
                # 1e-15 to avoid log(0)
                likelihood = np.sum(
                    weights * np.log(pln_pdf(data, mu, sigma, alpha) + 1e-15)
                )
                return -likelihood

            def fit_pln(data, weights, initial_guess):
                # mu is unbounded; sigma and alpha must stay positive
                bounds = [(None, None), (0.01, None), (0.01, None)]
                result = scipy.optimize.minimize(
                    neg_weighted_log_likelihood,
                    initial_guess,
                    args=(data, weights),
                    method="L-BFGS-B",
                    bounds=bounds,
                )
                return result.x

            # Start the optimizer from the weighted lognormal moments.
            mu_initial, sigma_initial = self._weighted_log_moments(
                data, income_measure, weight_var
            )
            # Initial guess for mu, sigma, alpha
            initial_guess = np.array([mu_initial, sigma_initial, 1.5])
            mu, sigma, alpha = fit_pln(
                data[income_measure], data[weight_var], initial_guess
            )
            f = pln_pdf(z_line, mu, sigma, alpha)
            F = pln_cdf(z_line, mu, sigma, alpha)
            f_prime = pln_dpdf(z_line, mu, sigma, alpha)
        else:
            raise ValueError(
                "Please enter a valid value for dist_type "
                f"('log_normal', 'kde' or 'Pln'), got {dist_type!r}"
            )
        return z_line, F, f, f_prime

    def sw_weights(self):
        r"""
        Returns the social welfare weights for a given tax policy.

        See Jacobs, Jongen, and Zoutman (2017) and
        Lockwood and Weinzierl (2016) for details.

        .. math::
            g_{z} = 1 + \theta_z \varepsilon^{c}\frac{T'(z)}{(1-T'(z))} +
            \varepsilon^{c}\frac{zT''(z)}{(1-T'(z))^{2}}

        Both sets of weights are rescaled so that they integrate to one
        against the income density over the grid.

        Args:
            None

        Returns:
            tuple:
                * g_z (array_like): weights from the analytical formula
                    above
                * g_z_numerical (array_like): weights obtained by
                    numerically differentiating the Lockwood and
                    Weinzierl expression
        """
        g_z = (
            1
            + ((self.theta_z * self.eti * self.mtr) / (1 - self.mtr))
            + ((self.eti * self.z * self.mtr_prime) / (1 - self.mtr) ** 2)
        )
        g_z = g_z / self._trapezoid(g_z * self.f, self.z)
        # use Lockwood and Weinzierl formula, which should be equivalent
        # but using numerical differentiation
        bracket_term = (
            1
            - self.F
            - (self.mtr / (1 - self.mtr)) * self.eti * self.z * self.f
        )
        # derivative with respect to income, not grid index
        d_dz_bracket = np.gradient(bracket_term, self.z, edge_order=2)
        g_z_numerical = -(1 / self.f) * d_dz_bracket
        g_z_numerical = g_z_numerical / self._trapezoid(
            g_z_numerical * self.f, self.z
        )
        return g_z, g_z_numerical
def find_eti(iot1, iot2, g_z_type="g_z"):
    r"""
    This function solves for the ETI that would result in the
    policy represented via MTRs in iot2 be consistent with the
    social welfare function inferred from the policies of iot1.

    .. math::
        \varepsilon_{z} = \frac{(1-T'(z))}{T'(z)}\frac{(1-F(z))}{zf(z)}
        \int_{z}^{\infty}\frac{1-g_{\tilde{z}}}{1-F(z)}dF(\tilde{z})

    Args:
        iot1 (IOT): IOT class instance representing baseline policy
        iot2 (IOT): IOT class instance representing reform policy
        g_z_type (str): type of social welfare function to use
            Options are:
                * 'g_z' for the analytical formula
                * 'g_z_numerical' for the numerical approximation
              (any value other than 'g_z' selects the numerical one)

    Returns:
        tuple:
            * eti_beliefs_lw (array-like): ETI beliefs over z from the
                integral (Lockwood and Weinzierl) expression
            * eti_beliefs_jjz (array-like): ETI beliefs over z from the
                analytical expression that needs no integration
    """
    g_z = iot1.g_z if g_z_type == "g_z" else iot1.g_z_numerical
    # The (1 - F) factors in the formula cancel, leaving
    #   (1 - T') / (T' z f) * [1 - F(z) - int_z^inf g dF].
    # The tail integral of g against the density is taken with the
    # trapezoid rule over the income grid (upper limit = end of grid).
    weighted = g_z * iot2.f
    steps = 0.5 * (weighted[1:] + weighted[:-1]) * np.diff(iot2.z)
    running = np.concatenate(([0.0], np.cumsum(steps)))
    tail_integral = running[-1] - running
    eti_beliefs_lw = ((1 - iot2.mtr) / (iot2.z * iot2.f * iot2.mtr)) * (
        1 - iot2.F - tail_integral
    )
    # derivation from the JJZ analytical solution that doesn't involve
    # integration
    eti_beliefs_jjz = (g_z - 1) / (
        (iot2.theta_z * (iot2.mtr / (1 - iot2.mtr)))
        + (iot2.z * (iot2.mtr_prime / (1 - iot2.mtr) ** 2))
    )
    return eti_beliefs_lw, eti_beliefs_jjz
def wm(value, weight):
    """
    Weighted mean along the first axis that tolerates weights which
    sum to zero.

    Args:
        value (array_like): values to be averaged
        weight (array_like): weights for each value

    Returns:
        scalar or array_like: the weighted average over axis 0, or a
            two-element list of NaNs when NumPy reports a zero
            division (e.g. the weights sum to zero)
    """
    try:
        averaged = np.average(value, axis=0, weights=weight)
    except ZeroDivisionError:
        # the fallback is sized for the two-column input used in this file
        averaged = [np.nan, np.nan]
    return averaged