"""
-------------------------------------------------------------------------------
Functions to compute economic aggregates.
-------------------------------------------------------------------------------
"""
# Packages
import numpy as np
from ogcore import tax, pensions
"""
-------------------------------------------------------------------------------
Functions
-------------------------------------------------------------------------------
"""
[docs]
def get_L(n, p, method):
r"""
Calculate aggregate labor supply.
.. math::
L_{t} = \sum_{s=E}^{E+S}\sum_{j=0}^{J}\omega_{s,t}\lambda_{j}n_{j,s,t}
Args:
n (Numpy array): labor supply of households
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
Returns:
L (array_like): aggregate labor supply
"""
if method == "SS":
L_presum = (
np.squeeze(p.e[-1, :, :])
* np.transpose(p.omega_SS * p.lambdas)
* n
)
L = L_presum.sum()
elif method == "TPI":
L_presum = (n * (p.e * np.squeeze(p.lambdas))) * np.tile(
np.reshape(p.omega[: p.T, :], (p.T, p.S, 1)), (1, 1, p.J)
)
L = L_presum.sum(1).sum(1)
return L
[docs]
def get_I(b_splus1, K_p1, K, p, method):
r"""
Calculate aggregate investment.
.. math::
I_{t} = (1 + g_{n,t+1})e^{g_{y}}(K_{t+1} - \sum_{s=E}^{E+S}
\sum_{j=0}^{J}\omega_{s+1,t}i_{s+1,t}\lambda_{j}b_{j,s+1,t+1} \
(1+ g_{n,t+1})) - (1 - \delta)K_{t}
Args:
b_splus1 (Numpy array): savings of households
K_p1 (array_like): aggregate capital, one period ahead
K (array_like): aggregate capital
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on 'SS' or
'TPI', also compute total investment (not net of immigrants)
Returns:
aggI (array_like): aggregate investment
"""
if method == "SS":
omega_extended = np.append(p.omega_SS[1:], [0.0])
imm_extended = np.append(p.imm_rates[-1, 1:], [0.0])
part2 = (
(
b_splus1
* np.transpose((omega_extended * imm_extended) * p.lambdas)
).sum()
) / (1 + p.g_n_ss)
aggI = (1 + p.g_n_ss) * np.exp(p.g_y) * (K_p1 - part2) - (
1.0 - p.delta
) * K
elif method == "TPI":
omega_shift = np.append(p.omega[: p.T, 1:], np.zeros((p.T, 1)), axis=1)
imm_shift = np.append(
p.imm_rates[: p.T, 1:], np.zeros((p.T, 1)), axis=1
)
part2 = (
(
(b_splus1 * np.squeeze(p.lambdas))
* np.tile(
np.reshape(imm_shift * omega_shift, (p.T, p.S, 1)),
(1, 1, p.J),
)
)
.sum(1)
.sum(1)
) / (1 + np.squeeze(np.hstack((p.g_n[1 : p.T], p.g_n_ss))))
aggI = (
1 + np.squeeze(np.hstack((p.g_n[1 : p.T], p.g_n_ss)))
) * np.exp(p.g_y) * (K_p1 - part2) - (1.0 - p.delta) * K
elif method == "total_ss":
aggI = ((1 + p.g_n_ss) * np.exp(p.g_y) - 1 + p.delta) * K
elif method == "total_tpi":
aggI = (1 + p.g_n[1 : p.T + 1]) * np.exp(p.g_y) * K_p1 - (
1.0 - p.delta
) * K
return aggI
[docs]
def get_B(b, p, method, preTP):
r"""
Calculate aggregate savings
.. math::
B_{t} = \sum_{s=E}^{E+S}\sum_{j=0}^{J}\omega_{s,t}\lambda_{j}b_{j,s,t}
Args:
b (Numpy array): savings of households
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
preTP (bool): whether calculation is for the pre-time path
period amount of savings. If True, then need to use
`omega_S_preTP`.
Returns:
B (array_like): aggregate supply of savings
"""
if method == "SS":
if preTP:
part1 = b * np.transpose(p.omega_S_preTP * p.lambdas)
omega_extended = np.append(p.omega_S_preTP[1:], [0.0])
imm_extended = np.append(p.imm_rates[0, 1:], [0.0])
pop_growth_rate = p.g_n[0]
else:
part1 = b * np.transpose(p.omega_SS * p.lambdas)
omega_extended = np.append(p.omega_SS[1:], [0.0])
imm_extended = np.append(p.imm_rates[-1, 1:], [0.0])
pop_growth_rate = p.g_n_ss
part2 = b * np.transpose(omega_extended * imm_extended * p.lambdas)
B_presum = part1 + part2
B = B_presum.sum()
B /= 1.0 + pop_growth_rate
elif method == "TPI":
part1 = (b * np.squeeze(p.lambdas)) * np.tile(
np.reshape(p.omega[: p.T, :], (p.T, p.S, 1)), (1, 1, p.J)
)
omega_shift = np.append(p.omega[: p.T, 1:], np.zeros((p.T, 1)), axis=1)
imm_shift = np.append(
p.imm_rates[: p.T, 1:], np.zeros((p.T, 1)), axis=1
)
part2 = (b * np.squeeze(p.lambdas)) * np.tile(
np.reshape(imm_shift * omega_shift, (p.T, p.S, 1)), (1, 1, p.J)
)
B_presum = part1 + part2
B = B_presum.sum(1).sum(1)
B /= 1.0 + np.hstack((p.g_n[1 : p.T], p.g_n_ss))
return B
[docs]
def get_BQ(r, b_splus1, j, p, method, preTP):
r"""
Calculation of aggregate bequests. If `use_zeta` is False, then
computes aggregate bequests within each lifetime income group.
.. math::
BQ_{t} = \sum_{s=E}^{E+S}\sum_{j=0}^{J}\rho_{s}\omega_{s,t}
\lambda_{j}b_{j,s+1,1}
Args:
r (array_like): the real interest rate
b_splus1 (numpy array): household savings one period ahead
j (int): index of lifetime income group
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on SS or
TPI
preTP (bool): whether calculation is for the pre-time path
period amount of savings. If True, then need to use
`omega_S_preTP`.
Returns:
BQ (array_like): aggregate bequests, overall or by lifetime
income group, depending on `use_zeta` value.
"""
if method == "SS":
if preTP:
omega = p.omega_S_preTP
pop_growth_rate = p.g_n[0]
rho = p.rho[0, :]
else:
omega = p.omega_SS
pop_growth_rate = p.g_n_ss
rho = p.rho[-1, :]
if j is not None:
BQ_presum = omega * rho * b_splus1 * p.lambdas[j]
else:
BQ_presum = np.transpose(omega * (rho * p.lambdas)) * b_splus1
BQ = BQ_presum.sum(0)
BQ *= (1.0 + r) / (1.0 + pop_growth_rate)
elif method == "TPI":
pop = np.append(
p.omega_S_preTP.reshape(1, p.S), p.omega[: p.T - 1, :], axis=0
)
rho = np.append(
p.rho[0, :].reshape(1, p.S), p.rho[: p.T - 1, :], axis=0
)
if j is not None:
BQ_presum = (b_splus1 * p.lambdas[j]) * (pop * rho)
BQ = BQ_presum.sum(1)
BQ *= (1.0 + r) / (1.0 + p.g_n[: p.T])
else:
BQ_presum = (b_splus1 * np.squeeze(p.lambdas)) * np.tile(
np.reshape(pop * rho, (p.T, p.S, 1)), (1, 1, p.J)
)
BQ = BQ_presum.sum(1)
BQ *= np.tile(
np.reshape((1.0 + r) / (1.0 + p.g_n[: p.T]), (p.T, 1)),
(1, p.J),
)
if p.use_zeta:
if method == "SS":
BQ = BQ.sum()
else:
if not j:
BQ = BQ.sum(1)
return BQ
[docs]
def get_RM(Y, p, method):
r"""
Calculation of aggregate remittances.
.. math::
\hat{RM}_{t} = \begin{cases}
\alpha_{RM,1}\hat{Y}_t \quad\text{for}\quad t=1 \\
\frac{(1+g_{RM,t})}{e^{g_y}(1+\tilde{g}_{n,t})}\hat{RM}_{t-1}
\quad\text{for}\quad 2\leq t\leq T_{G1} \\
\rho_{RM}\alpha_{RM,T}\hat{Y}_t
+ (1-\rho_{RM})\frac{(1+g_{RM,t})}{e^{g_y}(1+\tilde{g}_{n,t})}
\hat{RM}_{t-1} \quad\text{for}\quad T_{G1}< t<T_{G2} \\
\alpha_{RM,T}\hat{Y}_t \quad\text{for}\quad t\geq T_{G2}
\end{cases}
Args:
Y (array_like): GDP steady state or time path
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on SS or TPI
Returns:
RM (array_like): aggregate remittances
"""
if method == "SS":
RM = p.alpha_RM_T * Y
elif method == "TPI":
RM = np.zeros_like(Y)
RM[0] = p.alpha_RM_1 * Y[0]
for t in range(1, p.tG1):
RM[t] = ((1 + p.g_RM[t]) / (np.exp(p.g_y) * (1 + p.g_n[t]))) * RM[
t - 1
]
rho_vec = np.linspace(0, 1, p.tG2 - p.tG1)
for t in range(p.tG1, p.tG2 - 1):
RM[t] = (
rho_vec[t - p.tG1] * p.alpha_RM_T * Y[t]
+ (1 - rho_vec[t - p.tG1])
* ((1 + p.g_RM[t]) / (np.exp(p.g_y) * (1 + p.g_n[t])))
* RM[t - 1]
)
RM[p.tG2 - 1 :] = p.alpha_RM_T * Y[p.tG2 - 1 :]
return RM
[docs]
def get_C(c, p, method):
r"""
Calculation of aggregate consumption.
Set up to only take one consumption good at a time. This
function is called in a loop to get consumption for all goods.
.. math::
C_{t} = \sum_{s=E}^{E+S}\sum_{j=0}^{J}\omega_{s,t}
\lambda_{j}c_{j,s,t}
Args:
c (Numpy array): consumption of households
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
Returns:
C (array_like): aggregate consumption
"""
if method == "SS":
aggC = (
(c * np.transpose(p.omega_SS * p.lambdas).reshape(1, p.S, p.J))
.sum(-1)
.sum(-1)
)
elif method == "TPI":
aggC = (
(
(c * np.squeeze(p.lambdas))
* np.tile(
np.reshape(p.omega[: p.T, :], (p.T, p.S, 1)), (1, 1, p.J)
)
)
.sum(-1)
.sum(-1)
)
return aggC
[docs]
def revenue(
r,
w,
b,
n,
bq,
c,
Y,
L,
K,
p_m,
factor,
ubi,
theta,
etr_params,
e,
p,
m,
method,
):
r"""
Calculate aggregate tax revenue.
.. math::
\begin{split}
R_{t} &= \sum_{s=E}^{E+S}\sum_{j=0}^{J}\omega_{s,t}\lambda_{j}
(T_{j,s,t} + \tau^{p}_{t}w_{t}e_{j,s}n_{j,s,t} - \theta_{j}
w_{t} + \tau^{bq}bq_{j,s,t} + \tau^{c}_{s,t}c_{j,s,t} +
\tau^{w}_{t}b_{j,s,t}) ... \\
&\qquad + \sum_{m=1}^{M}\tau^{b}_{m,t}(Y_{m,t}-w_{t}L_{m,t}) -
\tau^{b}_{m,t}\delta^{\tau}_{m,t}K^{\tau}_{m,t}
\end{split}
Args:
r (array_like): the real interest rate
w (array_like): the real wage rate
b (Numpy array): household savings
n (Numpy array): household labor supply
bq (Numpy array): household bequests received
c (Numpy array): household consumption
Y (array_like): aggregate output
L (array_like): aggregate labor
K (array_like): aggregate capital
p_m (array_like): output prices
factor (scalar): scaling factor converting model units to
dollars
ubi (array_like): universal basic income household distributions
theta (Numpy array): social security replacement rate for each
lifetime income group
etr_params (list): list of parameters of the effective tax rate
functions
e (Numpy array): effective labor units
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
Returns:
total_tax_revenue (array_like): aggregate tax revenue
iit_payroll_tax_revenue (array_like): aggregate income and
payroll tax revenue
agg_pension_outlays (array_like): aggregate outlays for gov't
pensions
UBI_outlays (array_like): aggregate universal basic income (UBI)
outlays
bequest_tax_revenue (array_like): aggregate bequest tax revenue
wealth_tax_revenue (array_like): aggregate wealth tax revenue
cons_tax_revenue (array_like): aggregate consumption tax revenue
business_tax_revenue (array_like): aggregate business tax
revenue
payroll_tax_revenue (array_like): aggregate payroll tax revenue
iit_tax_revenue (array_like): aggregate income tax revenue
"""
inc_pay_tax_liab = tax.income_tax_liab(
r, w, b, n, factor, 0, None, method, e, etr_params, p
)
pension_benefits = pensions.pension_amount(
r, w, n, Y, theta, 0, None, False, method, e, factor, p
)
bq_tax_liab = tax.bequest_tax_liab(r, b, bq, 0, None, method, p)
w_tax_liab = tax.wealth_tax_liab(r, b, 0, None, method, p)
if method == "SS":
p_i = np.dot(p.io_matrix, p_m)
pop_weights = np.transpose(p.omega_SS * p.lambdas)
iit_payroll_tax_revenue = (inc_pay_tax_liab * pop_weights).sum()
agg_pension_outlays = (pension_benefits * pop_weights).sum()
UBI_outlays = (ubi * pop_weights).sum()
wealth_tax_revenue = (w_tax_liab * pop_weights).sum()
bequest_tax_revenue = (bq_tax_liab * pop_weights).sum()
cons_tax_revenue = (
((p.tau_c[-1, :] * p_i).reshape(p.I, 1, 1) * c).sum(axis=0)
* pop_weights
).sum()
payroll_tax_revenue = p.frac_tax_payroll[-1] * iit_payroll_tax_revenue
elif method == "TPI":
p_i = (
np.tile(p.io_matrix.reshape(1, p.I, p.M), (p.T, 1, 1))
* np.tile(p_m[: p.T, :].reshape(p.T, 1, p.M), (1, p.I, 1))
).sum(axis=2)
pop_weights = np.squeeze(p.lambdas) * np.tile(
np.reshape(p.omega[: p.T, :], (p.T, p.S, 1)), (1, 1, p.J)
)
iit_payroll_tax_revenue = (
(inc_pay_tax_liab * pop_weights).sum(1).sum(1)
)
agg_pension_outlays = (pension_benefits * pop_weights).sum(1).sum(1)
UBI_outlays = (ubi[: p.T, :, :] * pop_weights).sum(1).sum(1)
wealth_tax_revenue = (w_tax_liab * pop_weights).sum(1).sum(1)
bequest_tax_revenue = (bq_tax_liab * pop_weights).sum(1).sum(1)
cons_tax_revenue = (
(
((p.tau_c[: p.T, :] * p_i).reshape(p.T, p.I, 1, 1) * c).sum(
axis=1
)
* pop_weights
)
.sum(1)
.sum(1)
)
payroll_tax_revenue = (
p.frac_tax_payroll[: p.T] * iit_payroll_tax_revenue
)
business_tax_revenue = tax.get_biz_tax(w, Y, L, K, p_m, p, m, method).sum(
-1
)
iit_revenue = iit_payroll_tax_revenue - payroll_tax_revenue
total_tax_revenue = (
iit_payroll_tax_revenue
+ wealth_tax_revenue
+ bequest_tax_revenue
+ cons_tax_revenue
+ business_tax_revenue
)
return (
total_tax_revenue,
iit_payroll_tax_revenue,
agg_pension_outlays,
UBI_outlays,
bequest_tax_revenue,
wealth_tax_revenue,
cons_tax_revenue,
business_tax_revenue,
payroll_tax_revenue,
iit_revenue,
)
[docs]
def get_r_p(r, r_gov, p_m, K_vec, K_g, D, MPKg_vec, p, method):
r"""
Compute the interest rate on the household's portfolio of assets,
a mix of government debt and private equity.
.. math::
r_{p,t} = \frac{r_{gov,t}D_{t} + r_{K,t}K_{t}}{D_{t} + K_{t}}
Args:
r (array_like): the real interest rate
r_gov (array_like): the real interest rate on government debt
p_m (array_like): good prices
K_vec (array_like): aggregate capital demand from each industry
K_g (array_like): aggregate public capital
D (array_like): aggregate government debt
MPKg_vec (array_like): marginal product of government capital
for each industry
p (OG-Core Specifications object): model parameters
method (str): adjusts calculation dimensions based on 'SS' or
'TPI'
Returns:
r_p (array_like): the real interest rate on the household portfolio
"""
if method == "SS":
tau_b = p.tau_b[-1, :]
T = 1
else:
T = p.T
tau_b = p.tau_b[: p.T, :].reshape((p.T, p.M))
K_g = K_g.reshape((p.T, 1))
r = r.reshape((p.T, 1))
r_gov = r_gov.reshape((p.T, 1))
D = D.reshape((p.T, 1))
p_m = p_m.reshape((p.T, p.M))
MPKg_vec = MPKg_vec.reshape((p.T, p.M))
K_vec = K_vec.reshape((p.T, p.M))
r_K = r + (
((1 - tau_b) * p_m * MPKg_vec * K_g).sum(axis=-1).reshape((T, 1))
/ K_vec.sum(axis=-1).reshape((T, 1))
)
r_p = ((r_gov * D) + (r_K * K_vec.sum(axis=-1).reshape((T, 1)))) / (
D + K_vec.sum(axis=-1).reshape((T, 1))
)
return np.squeeze(r_p)
[docs]
def resource_constraint(Y, C, G, I_d, I_g, net_capital_flows, RM):
r"""
Compute the error in the resource constraint.
.. math::
\begin{split}
\text{rc_error} &= \hat{Y}_t - \hat{C}_t -
\Bigl(e^{g_y}\bigl[1 + \tilde{g}_{n,t+1}\bigr]\hat{K}^d_{t+1} -
\hat{K}^d_t\Bigr) - \delta\hat{K}_t - \hat{G}_t - \hat{I}_{g,t} ... \\
&\qquad -\: \hat{\text{net capital outflows}}_t - \hat{RM}_t
\end{split}
Args:
Y (array_like): aggregate output by industry
C (array_like): aggregate consumption by industry
G (array_like): aggregate government spending by industry
I_d (array_like): aggregate private investment from domestic households
I_g (array_like): investment in government capital
net_capital_flows (array_like): net capital outflows
RM (array_like): aggregate remittances
Returns:
rc_error (array_like): error in the resource constraint
"""
rc_error = Y - C - I_d - I_g - G - net_capital_flows - RM
return rc_error
def get_capital_outflows(r, K_f, new_borrowing_f, debt_service_f, p):
r"""
Compute net capital outflows for open economy parameterizations
.. math::
\text{net capital flows} &= r_{p,t}\hat{K}^f_t ... \\
&\quad\quad + \Bigl(e^{g_y}\bigl[1 +
\tilde{g}_{n,t+1}\bigr]\hat{D}^f_{t+1} - \hat{D}^f_t\Bigr) -
r_{p,t}\hat{D}^f_t \quad\forall t
Args:
r (array_like): the real interest rate
K_f (array_like): aggregate capital that is foreign-owned
new_borrowing_f (array_like): new borrowing of government debt
from foreign investors
debt_service_f (array_like): interest payments on government
debt owned by foreigners
p (OG-Core Specifications object): model parameters
Returns:
new_flow (array_like): net capital outflows
"""
net_flow = (r + p.delta) * K_f - new_borrowing_f + debt_service_f
return net_flow
[docs]
def get_K_splits(B, K_demand_open, D_d, zeta_K):
r"""
Returns total domestic capital as well as amounts of domestic
capital held by domestic and foreign investors separately.
.. math::
\begin{split}
\hat{K}_{t} &= \hat{K}^{f}_{t} + \hat{K}^{d}_{t}\\
\hat{K}^{d}_{t} &= \hat{B}_{t} + \hat{D}^{d}_{t}\\
\hat{K}^{f}_{t} &= \zeta_{D}\left(\hat{K}^{open}_{t} -
K^{d}_{t}\right)
\end{split}
Args:
B (array_like): aggregate savings by domestic households
K_demand_open (array_like): capital demand at the world
interest rate
D_d (array_like): governmet debt held by domestic households
zeta_K (array_like): fraction of excess capital demand satisfied
by foreign investors
Returns:
(tuple): series of capital stocks:
* K (array_like): total capital
* K_d (array_like): capital held by domestic households
* K_f (array_like): capital held by foreign households
"""
K_d = B - D_d
if np.any(K_d < 0):
print(
"K_d has negative elements. Setting them "
+ "positive to prevent NAN."
)
K_d = np.fmax(K_d, 0.05 * B)
K_f = zeta_K * (K_demand_open - B + D_d)
K = K_f + K_d
return K, K_d, K_f
[docs]
def get_ptilde(p_i, tau_c, alpha_c, method="SS"):
r"""
Calculate price of composite good.
.. math::
\tilde{p}_{t} = \prod_{i=1}^{I} \left(\frac{(1 +
\tau^{c}_{i,t})p_{i,j}}{\alpha_{i,j}}\right)^{\alpha_{i,j}}
Args:
p_i (array_like): prices for consumption good i
tau_c (array_like): consumption taxes on good i
alpha_c (array_like): consumption share parameters
Returns:
p_tilde (array_like): tax-inclusive price of composite good
"""
if method == "SS":
p_tilde = np.prod((((1 + tau_c) * p_i) / alpha_c) ** alpha_c)
else: # TPI case
alpha_c = alpha_c.reshape(1, alpha_c.shape[0])
p_tilde = np.prod((((1 + tau_c) * p_i) / alpha_c) ** alpha_c, axis=1)
return p_tilde