import numpy as np
import pandas as pd
import scipy.stats as st
import scipy
from statsmodels.nonparametric.kernel_regression import KernelReg
from scipy.interpolate import UnivariateSpline
from scipy.linalg import lstsq
[docs]
class IOT:
"""
Constructor for the IOT class.
This IOT class can be used to compute the social welfare weights
across the income distribution given data, tax policy parameters,
and behavioral parameters.
Args:
data (Pandas DataFrame): micro data representing tax payers.
Must include the following columns: income_measure,
weight_var, mtr
income_measure (str): name of income measure from data to use
weight_var (str): name of weight measure from data to use
eti (scalar): compensated elasticity of taxable income
w.r.t. the marginal tax rate
bandwidth (scalar): size of income bins in units of income
lower_bound (scalar): minimum income to consider
upper_bound (scalar): maximum income to consider
dist_type (None or str): type of distribution to use if
parametric, if None, then non-parametric bin weights
mtr_smoother (None or str): method used to smooth our mtr
function, if None, then use bin average mtrs
mtr_smooth_param (scalar): parameter for mtr_smoother
kreg_bw (array_like): bandwidth for kernel regression
Returns:
class instance: IOT
"""
def __init__(
self,
data,
income_measure="e00200",
weight_var="s006",
eti=0.25,
dist_type="log_normal",
kde_bw=None,
mtr_smoother="kreg",
mtr_smooth_param=1000,
kreg_bw=[120_000],
):
# keep the original data intact
self.data_original = data.copy()
# clean data based on upper and lower bounds
# data = data[
# (data[income_measure] >= lower_bound)
# & (data[income_measure] <= upper_bound)
# ]
# Get income distribution
self.z, self.F, self.f, self.f_prime = self.compute_income_dist(
data, income_measure, weight_var, dist_type, kde_bw
)
# see if eti is a scalar
if isinstance(eti, float):
self.eti = eti
else: # if not, then it should be a dict with keys containing lists as values
# check that same number of ETI values as knot points
assert len(eti["knot_points"]) == len(eti["eti_values"])
# want to interpolate across income distribution with knot points
# assume that eti can't go beyond 1 (or the max of the eti_values provided)
if len(eti["knot_points"]) > 3:
spline_order = 3
else:
spline_order = 1
eti_spl = UnivariateSpline(
eti["knot_points"], eti["eti_values"], k=spline_order, s=0
)
self.eti = eti_spl(self.z)
# compute marginal tax rate schedule
self.mtr, self.mtr_prime = self.compute_mtr_dist(
data,
weight_var,
income_measure,
mtr_smoother,
mtr_smooth_param,
kreg_bw,
)
# compute theta_z, the elasticity of the tax base
self.theta_z = 1 + ((self.z * self.f_prime) / self.f)
# compute the social welfare weights
self.g_z, self.g_z_numerical = self.sw_weights()
[docs]
def df(self):
"""
Return all vector attributes in a DataFrame format
Args:
None
Returns:
df (Pandas DataFrame): DataFrame with all inputs/outputs
for each income bin
"""
dict_out = {
"z": self.z,
"f": self.f,
"F": self.F,
"f_prime": self.f_prime,
"mtr": self.mtr,
"mtr_prime": self.mtr_prime,
"theta_z": self.theta_z,
"g_z": self.g_z,
"g_z_numerical": self.g_z_numerical,
}
df = pd.DataFrame.from_dict(dict_out)
return df
[docs]
def compute_mtr_dist(
self,
data,
weight_var,
income_measure,
mtr_smoother,
mtr_smooth_param,
kreg_bw,
):
"""
Compute marginal tax rates over the income distribution and
their derivative.
Args:
data (Pandas DataFrame): micro data representing tax payers.
Must include the following columns: income_measure,
weight_var, mtr
weight_var (str): name of weight measure from data to use
mtr_smoother (None or str): method used to smooth our mtr
function, if None, then use bin average mtrs
mtr_smooth_param (scalar): parameter for mtr_smoother
kreg_bw (array_like): bandwidth for kernel regression
Returns:
tuple:
* mtr (array_like): mean marginal tax rate for each income bin
* mtr_prime (array_like): rate of change in marginal tax rates
for each income bin
"""
if mtr_smoother == "kreg":
bins = mtr_smooth_param # number of equal-width bins
data.loc[:, ["z_bin"]] = pd.cut(
data[income_measure], bins, include_lowest=True
)
binned_data = pd.DataFrame(
data[["mtr", income_measure, "z_bin", weight_var]]
.groupby(["z_bin"], observed=False)
.apply(lambda x: wm(x[["mtr", income_measure]], x[weight_var]))
)
# make column 0 into two columns
binned_data[["mtr", income_measure]] = pd.DataFrame(
binned_data[0].tolist(), index=binned_data.index
)
binned_data.drop(columns=0, inplace=True)
binned_data.reset_index(inplace=True)
mtr_function = KernelReg(
binned_data["mtr"].dropna(),
binned_data[income_measure].dropna(),
var_type="c",
reg_type="ll",
bw=kreg_bw,
)
mtr, _ = mtr_function.fit(self.z)
mtr_prime = np.gradient(mtr, self.z, edge_order=2)
elif mtr_smoother == "HSV":
# estimate the HSV function on mtrs via weighted least squares
# DATA CLEANING
# drop rows with missing or inf mtr
data = data[~data["mtr"].isna()]
data = data[~data["mtr"].isin([np.inf, -np.inf])]
# drop if MTR > 100%
data = data[data["mtr"] < 1]
# drop rows with missing, inf, or zero income
data = data[data[income_measure] > 0]
# drop rows with missing, inf or negative weights
data = data[~data[weight_var].isna()]
data = data[~data[weight_var].isin([np.inf, -np.inf])]
data = data[data[weight_var] > 0]
# ESTIMATION
X = np.log(data[income_measure].values)
X = np.column_stack((np.ones(len(X)), X))
w = np.array(data[weight_var].values)
w_sqrt = np.sqrt(w)
y = np.log(1 - data["mtr"].values)
X_weighted = X * w_sqrt[:, np.newaxis]
y_weighted = y * w_sqrt
coef, _, _, _ = lstsq(X_weighted, y_weighted)
tau = -coef[1]
lambda_param = np.exp(coef[0]) / (1 - tau)
mtr = 1 - lambda_param * (1 - tau) * self.z ** (-tau)
mtr_prime = lambda_param * tau * (1 - tau) * self.z ** (-tau - 1)
else:
print("Please enter a value mtr_smoother method")
assert False
return mtr, mtr_prime
[docs]
def compute_income_dist(
self, data, income_measure, weight_var, dist_type, kde_bw=None
):
"""
Compute the distribution of income (parametrically or not) from
the raw data.
This method computes the probability density function and its
derivative.
Args:
data (Pandas DataFrame): micro data representing tax payers.
Must include the following columns: income_measure,
weight_var, mtr
income_measure (str): name of income measure from data to
use
weight_var (str): name of weight measure from data to use
dist_type (None or str): type of distribution to use if
parametric, if None, then non-parametric bin weights
kde_bw (array_like): bandwidth for kernel regression
Returns:
tuple:
* z (array_like): mean income at each bin in the income
distribution
* f (array_like): density for income bin z
* f_prime (array_like): slope of the density function for
income bin z
"""
z_line = np.linspace(100, 1000000, 100000)
# drop zero income observations
data = data[data[income_measure] > 0]
if dist_type == "log_normal":
mu = (
np.log(data[income_measure]) * data[weight_var]
).sum() / data[weight_var].sum()
sigmasq = (
(
((np.log(data[income_measure]) - mu) ** 2)
* data[weight_var]
).values
/ data[weight_var].sum()
).sum()
# F = st.lognorm.cdf(z_line, s=(sigmasq) ** 0.5, scale=np.exp(mu))
# f = st.lognorm.pdf(z_line, s=(sigmasq) ** 0.5, scale=np.exp(mu))
# f = f / np.sum(f)
# f_prime = np.gradient(f, edge_order=2)
# analytical derivative of lognormal
sigma = np.sqrt(sigmasq)
F = (1 / 2) * (
1
+ scipy.special.erf(
(np.log(z_line) - mu) / (np.sqrt(2) * sigma)
)
)
f = (
(1 / (sigma * np.sqrt(2 * np.pi)))
* np.exp(-((np.log(z_line) - mu) ** 2) / (2 * sigma**2))
* (1 / z_line)
)
f_prime = (
-1
* np.exp(-((np.log(z_line) - mu) ** 2) / (2 * sigma**2))
* (
(np.log(z_line) + sigma**2 - mu)
/ (z_line**2 * sigma**3 * np.sqrt(2 * np.pi))
)
)
elif dist_type == "kde":
# uses the original full data for kde estimation
f_function = st.gaussian_kde(
data[income_measure],
# bw_method=kde_bw,
weights=data[weight_var],
)
f = f_function.pdf(z_line)
F = np.cumsum(f)
f_prime = np.gradient(f, edge_order=2)
elif dist_type == "Pln":
def pln_pdf(y, mu, sigma, alpha):
x1 = alpha * sigma - (np.log(y) - mu) / sigma
phi = st.norm.pdf((np.log(y) - mu) / sigma)
R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-15)
# 1e-15 to avoid division by zero
pdf = alpha / y * phi * R
return pdf
def neg_weighted_log_likelihood(params, data, weights):
mu, sigma, alpha = params
likelihood = np.sum(
weights * np.log(pln_pdf(data, mu, sigma, alpha) + 1e-15)
)
# 1e-15 to avoid log(0)
return -likelihood
def fit_pln(data, weights, initial_guess):
bounds = [(None, None), (0.01, None), (0.01, None)]
result = scipy.optimize.minimize(
neg_weighted_log_likelihood,
initial_guess,
args=(data, weights),
method="L-BFGS-B",
bounds=bounds,
)
return result.x
mu_initial = (
np.log(data[income_measure]) * data[weight_var]
).sum() / data[weight_var].sum()
sigmasq = (
(
((np.log(data[income_measure]) - mu_initial) ** 2)
* data[weight_var]
).values
/ data[weight_var].sum()
).sum()
sigma_initial = np.sqrt(sigmasq)
# Initial guess for m, sigma, alpha
initial_guess = np.array([mu_initial, sigma_initial, 1.5])
mu, sigma, alpha = fit_pln(
data[income_measure], data[weight_var], initial_guess
)
def pln_cdf(y, mu, sigma, alpha):
x1 = alpha * sigma - (np.log(y) - mu) / sigma
R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-12)
CDF = (
st.norm.cdf((np.log(y) - mu) / sigma)
- st.norm.pdf((np.log(y) - mu) / sigma) * R
)
return CDF
def pln_dpdf(y, mu, sigma, alpha):
x = (np.log(y) - mu) / sigma
R = (1 - st.norm.cdf(alpha * sigma - x)) / (
st.norm.pdf(alpha * sigma - x) + 1e-15
)
left = (1 + x / sigma) * pln_pdf(y, mu, sigma, alpha)
right = (
alpha
* st.norm.pdf(x)
* ((alpha * sigma - x) * R - 1)
/ (sigma * y)
)
return -(left + right) / y
f = pln_pdf(z_line, mu, sigma, alpha)
F = pln_cdf(z_line, mu, sigma, alpha)
f_prime = pln_dpdf(z_line, mu, sigma, alpha)
else:
print("Please enter a valid value for dist_type")
assert False
return z_line, F, f, f_prime
[docs]
def sw_weights(self):
r"""
Returns the social welfare weights for a given tax policy.
See Jacobs, Jongen, and Zoutman (2017) and
Lockwood and Weinzierl (2016) for details.
.. math::
g_{z} = 1 + \theta_z \varepsilon^{c}\frac{T'(z)}{(1-T'(z))} +
\varepsilon^{c}\frac{zT''(z)}{(1-T''(z))^{2}}
Args:
None
Returns:
array_like: vector of social welfare weights across
the income distribution
"""
g_z = (
1
+ ((self.theta_z * self.eti * self.mtr) / (1 - self.mtr))
+ ((self.eti * self.z * self.mtr_prime) / (1 - self.mtr) ** 2)
)
integral = np.trapz(
g_z * self.f, self.z
) # renormalize to integrate to 1
g_z = g_z / integral
# use Lockwood and Weinzierl formula, which should be equivalent but using numerical differentiation
bracket_term = (
1
- self.F
- (self.mtr / (1 - self.mtr)) * self.eti * self.z * self.f
)
d_dz_bracket = np.gradient(bracket_term, edge_order=2)
# d_dz_bracket = np.diff(bracket_term) / np.diff(self.z)
# d_dz_bracket = np.append(d_dz_bracket, d_dz_bracket[-1])
g_z_numerical = -(1 / self.f) * d_dz_bracket
integral = np.trapz(g_z_numerical * self.f, self.z)
g_z_numerical = g_z_numerical / integral
return g_z, g_z_numerical
def find_eti(iot, g_z=None, eti_0=0.25):
"""
This function solves for the ETI that would result in the
policy represented via MTRs in IOT being consistent with the
social welfare function supplied. It solves a first order
ordinary differential equation.
.. math::
\varepsilon'(z)\left[\frac{zT'(z)}{1-T'(z)}\right] + \varepsilon(z)\left[\theta_z \frac{T'(z)}{1-T'(z)} +\frac{zT''(z)}{(1-T'(z))^2}\right]+ (1-g(z))
Args:
iot (IOT): instance of the I
g_z (None or array_like): vector of social welfare weights
eti_0 (scalar): guess for ETI at z=0
Returns:
eti_beliefs (array-like): vector of ETI beliefs over z
"""
if g_z is None:
g_z = iot.g_z
# we solve an ODE of the form f'(z) + P(z)f(z) = Q(z)
P_z = (
1 / iot.z
+ iot.f_prime / iot.f
+ iot.mtr_prime / (iot.mtr * (1 - iot.mtr))
)
# integrating factor for ODE: mu(z) * f'(z) + mu(z) * P(z) * f(z) = mu(z) * Q(z)
mu_z = np.exp(np.cumsum(P_z))
Q_z = (g_z - 1) * (1 - iot.mtr) / (iot.mtr * iot.z)
# integrate Q(z) * mu(z), as we integrate both sides of the ODE
int_mu_Q = np.cumsum(mu_z * Q_z)
eti_beliefs = (eti_0 + int_mu_Q) / mu_z
return eti_beliefs
def wm(value, weight):
"""
Weighted mean function that allows for zero division
Args:
value (array_like): values to be averaged
weight (array_like): weights for each value
Returns:
scalar: weighted average
"""
try:
return np.average(value, weights=weight, axis=0)
except ZeroDivisionError:
return [np.nan, np.nan]