"""
-------------------------------------------------------------------------------
Functions for generating demographic objects necessary for the OG-UK model
This module contains the following functions:
get_un_fert_data()
get_un_mort_data()
get_wb_infmort_rate()
get_un_pop_data()
get_fert()
get_mort()
pop_rebin()
get_imm_resid()
immsolve()
get_pop_objs()
extrap_exp_3()
extrap_arctan_3()
ab_zero_eqs_exp_func()
b_zero_eq_artctan_func()
-------------------------------------------------------------------------------
"""
# Import packages
import os
import numpy as np
import pandas as pd
import scipy.optimize as opt
from ogcore import parameter_plots as pp
from pandas_datareader import wb
import matplotlib.pyplot as plt
# create output director for figures
CUR_PATH = os.path.split(os.path.abspath(__file__))[0]
DATA_DIR = os.path.join(CUR_PATH, "data", "demographic")
OUTPUT_DIR = os.path.join(CUR_PATH, "OUTPUT", "Demographics")
if os.access(OUTPUT_DIR, os.F_OK) is False:
os.makedirs(OUTPUT_DIR)
"""
------------------------------------------------------------------------
Define functions
------------------------------------------------------------------------
"""
[docs]
def get_un_fert_data(
country_id: str = "826",
start_year: int = 2021,
end_year: int = None,
download: bool = True,
) -> pd.DataFrame:
"""
Get UN fertility rate data for a country for some range of years (at least
one year) and by age. The country_id=826 is for United Kingdom. These data
come from the United Nations Data Portal API for UN population data (see
https://population.un.org/dataportal/about/dataapi)
Args:
country_id (str): 3-digit country id (numerical)
start_year (int): beginning year of the data
end_year (int or None): end year of the data
download (bool): whether to download the data from the UN Data Portal.
If False, a path must be specified in the path_folder argument.
path_folder (None or str): string path to folder where data are stored
Returns:
fert_rates_df (DataFrame): dataset with fertility rates by age
"""
if start_year > 2021: # 2021 is the most recent non-forecast data
start_year = 2021
if end_year is None:
end_year = start_year
# UN variable code for Population by 1-year age groups and sex
pop_code = "47"
# UN variable code for Fertility rates by age of mother (1-year)
fert_code = "68"
if download:
pop_target = (
"https://population.un.org/dataportalapi/api/v1/data/indicators/"
+ pop_code
+ "/locations/"
+ country_id
+ "/start/"
+ str(start_year)
+ "/end/"
+ str(end_year)
+ "?format=csv"
)
fert_target = (
"https://population.un.org/dataportalapi/api/v1/data/indicators/"
+ fert_code
+ "/locations/"
+ country_id
+ "/start/"
+ str(start_year)
+ "/end/"
+ str(end_year)
+ "?format=csv"
)
else:
pop_target = os.path.join(DATA_DIR, "un_uk_pop.csv")
fert_target = os.path.join(DATA_DIR, "un_uk_fert.csv")
# Convert .csv file to Pandas DataFrame
pop_df = pd.read_csv(
pop_target,
sep="|",
header=1,
usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"],
float_precision="round_trip",
)
fert_rates_df = pd.read_csv(
fert_target,
sep="|",
header=1,
usecols=["TimeLabel", "AgeStart", "Value"],
float_precision="round_trip",
)
# Rename variables in the population and fertility rates data
pop_df.rename(
columns={
"TimeLabel": "year",
"SexId": "sex_num",
"Sex": "sex_str",
"AgeStart": "age",
"Value": "pop",
},
inplace=True,
)
fert_rates_df.rename(
columns={
"TimeLabel": "year",
"AgeStart": "age",
"Value": "births_p_1000f",
},
inplace=True,
)
# Merge in the male and female population by age data
fert_rates_df = fert_rates_df.merge(
pop_df[["year", "age", "pop"]][pop_df["sex_num"] == 1],
how="left",
on=["year", "age"],
)
fert_rates_df.rename(columns={"pop": "pop_male"}, inplace=True)
fert_rates_df = fert_rates_df.merge(
pop_df[["year", "age", "pop"]][pop_df["sex_num"] == 2],
how="left",
on=["year", "age"],
)
fert_rates_df.rename(columns={"pop": "pop_female"}, inplace=True)
fert_rates_df["fert_rate"] = fert_rates_df["births_p_1000f"] / (
1000 * (1 + (fert_rates_df["pop_male"] / fert_rates_df["pop_female"]))
)
fert_rates_df = fert_rates_df[
(
(fert_rates_df["year"] >= start_year)
& (fert_rates_df["year"] <= end_year)
)
]
return fert_rates_df
[docs]
def get_un_mort_data(
country_id: str = "826",
start_year: int = 2021,
end_year: int = None,
download: bool = True,
) -> pd.DataFrame:
"""
Get UN mortality rate data for a country for some range of years (at least
one year) and by age, and get infant mortality rate data. The
country_id=826 is for United Kingdom. These data come from the United
Nations Population Data Portal API for UN population data (see
https://population.un.org/dataportal/about/dataapi)
Args:
country_id (str): 3-digit country id (numerical)
start_year (int): beginning year of the data
end_year (int or None): end year of the data
download (bool): whether to download the data from the UN Data Portal.
If False, a path must be specified in the path_folder argument.
path_folder (None or str): string path to folder where data are stored
Returns:
infmort_rate_df (DataFrame): dataset with infant mortality rates by yr
mort_rates_df(DataFrame): dataset with mortality rates by age
"""
if start_year > 2021: # 2021 is the most recent non-forecast data
start_year = 2021
if end_year is None:
end_year = start_year
# UN variable code for Population by 1-year age groups and sex
pop_code = "47"
# # UN variable code for Age specific mortality rate
# mort_code = "80"
# We use deaths and population to compute mortality rates rather than the
# mortality rates data so that we have the option to have totpers<100.
# UN variable code for Deaths by 1-year age groups
deaths_code = "69"
# UN variable code for Age specific mortality rate
infmort_code = "22"
if download:
pop_target = (
"https://population.un.org/dataportalapi/api/v1/data/indicators/"
+ pop_code
+ "/locations/"
+ country_id
+ "/start/"
+ str(start_year)
+ "/end/"
+ str(end_year)
+ "?format=csv"
)
infmort_target = (
"https://population.un.org/dataportalapi/api/v1/data/indicators/"
+ infmort_code
+ "/locations/"
+ country_id
+ "/start/"
+ str(start_year)
+ "/end/"
+ str(end_year)
+ "?format=csv"
)
deaths_target = (
"https://population.un.org/dataportalapi/api/v1/data/indicators/"
+ deaths_code
+ "/locations/"
+ country_id
+ "/start/"
+ str(start_year)
+ "/end/"
+ str(end_year)
+ "?format=csv"
)
else:
pop_target = os.path.join(DATA_DIR, "un_uk_pop.csv")
infmort_target = os.path.join(DATA_DIR, "un_uk_infmort.csv")
deaths_target = os.path.join(DATA_DIR, "un_uk_deaths.csv")
# Convert .csv file to Pandas DataFrame
pop_df = pd.read_csv(
pop_target,
sep="|",
header=1,
usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"],
float_precision="round_trip",
)
infmort_rate_df = pd.read_csv(
infmort_target,
sep="|",
header=1,
usecols=["TimeLabel", "SexId", "Sex", "Value"],
float_precision="round_trip",
)
deaths_df = pd.read_csv(
deaths_target,
sep="|",
header=1,
usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"],
float_precision="round_trip",
)
# Rename variables in the population and mortality rates data
pop_df.rename(
columns={
"TimeLabel": "year",
"SexId": "sex_num",
"Sex": "sex_str",
"AgeStart": "age",
"Value": "pop",
},
inplace=True,
)
infmort_rate_df.rename(
columns={
"TimeLabel": "year",
"SexId": "sex_num",
"Sex": "sex_str",
"Value": "inf_deaths_p_1000",
},
inplace=True,
)
deaths_df.rename(
columns={
"TimeLabel": "year",
"SexId": "sex_num",
"Sex": "sex_str",
"AgeStart": "age",
"Value": "deaths",
},
inplace=True,
)
# Merge in the male and female population by age data to the deaths_df
deaths_df = deaths_df.merge(
pop_df,
how="left",
on=["year", "sex_num", "sex_str", "age"],
)
deaths_df["mort_rate"] = deaths_df["deaths"] / deaths_df["pop"]
deaths_df = deaths_df[
((deaths_df["year"] >= start_year) & (deaths_df["year"] <= end_year))
]
mort_rates_df = deaths_df.copy()
# Clean the data
infmort_rate_df["infmort_rate"] = (
infmort_rate_df["inf_deaths_p_1000"] / 1000
)
infmort_rate_df = infmort_rate_df[
(
(infmort_rate_df["year"] >= start_year)
& (infmort_rate_df["year"] <= end_year)
)
]
return infmort_rate_df, mort_rates_df
[docs]
def get_wb_infmort_rate(
country: str = "GBR",
start_year: int = 2020,
end_year: int = None,
download: bool = True,
) -> np.float64:
"""
Get World Bank infant mortality rate measure from neonatal mortality rate
(deaths per 1,000 live births, divided by 1,0000)
https://data.worldbank.org/indicator/SH.DYN.NMRT
Args:
country (str): 3-digit country id (alphabetic)
start_year (int): beginning year of the data
end_year (int or None): end year of the data
download (bool): whether to download the data from the UN Data Portal.
If False, a path must be specified in the path_folder argument.
Returns:
wb_infmort_rate (float): neonatal infant mortality rate
"""
if start_year > 2020: # 2020 is the most recent data
start_year = 2020
if end_year is None:
end_year = start_year
if download:
wb_infmort_rate = (
wb.download(
indicator="SH.DYN.NMRT",
country=country,
start=start_year,
end=end_year,
).squeeze()
/ 1000
)
else:
# Hard code the infant mortality rate for South Africa from the most
# recent year (2020)
wb_infmort_rate = 0.0027
return wb_infmort_rate
[docs]
def get_un_pop_data(
country_id: str = "826",
start_year: int = 2021,
end_year: int = None,
download: bool = True,
) -> pd.DataFrame:
"""
Get UN population data for a country for some range of years (at least
one year) and by age. The country_id=826 is for United Kingdom. These data
come from the United Nations Data Portal API for UN population data (see
https://population.un.org/dataportal/about/dataapi)
Args:
country_id (str): 3-digit country id (numerical)
start_year (int): beginning year of the data
end_year (int): end year of the data
download (bool): whether to download the data from the UN Data Portal.
If False, a path must be specified in the path_folder argument.
path_folder (None or str): string path to folder where data are stored
Returns:
pop_df (DataFrame): dataset with total population by age
"""
if end_year is None:
end_year = start_year
# UN variable code for Population by 1-year age groups and sex
pop_code = "47"
if download:
pop_target = (
"https://population.un.org/dataportalapi/api/v1/data/indicators/"
+ pop_code
+ "/locations/"
+ country_id
+ "/start/"
+ str(start_year)
+ "/end/"
+ str(end_year)
+ "?format=csv"
)
else:
pop_target = os.path.join(DATA_DIR, "un_uk_pop.csv")
# Convert .csv file to Pandas DataFrame
pop_df = pd.read_csv(
pop_target,
sep="|",
header=1,
usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"],
float_precision="round_trip",
)
# Rename variables in the population and fertility rates data
pop_df.rename(
columns={
"TimeLabel": "year",
"SexId": "sex_num",
"Sex": "sex_str",
"AgeStart": "age",
"Value": "pop",
},
inplace=True,
)
# Clean the data
pop_df = pop_df[
((pop_df["year"] >= start_year) & (pop_df["year"] <= end_year))
]
return pop_df
[docs]
def get_fert(totpers, start_year=2021, end_year=None, graph=False):
"""
This function generates a vector of fertility rates by model period
age that corresponds to the fertility rate data by age in years.
Args:
totpers (int): total number of agent life periods (E+S), >= 3
start_year (int): first year data to download
end_year (int or None): end year data to download
graph (bool): =True if want graphical output
Returns:
fert_rates (Numpy array): fertility rates for each model period
of life
"""
if totpers > 100:
err_msg = "ERROR get_fert(): totpers must be <= 100."
raise ValueError(err_msg)
# Get UN fertility rates for United Kingdom for ages 15-49
ages_15_49 = np.arange(15, 50)
fert_rates_15_49 = (
get_un_fert_data(
start_year=start_year, end_year=end_year, download=False
)["fert_rate"]
.to_numpy()
.flatten()
)
# Extrapolate fertility rates for ages 1-14 and 50-100 using exponential
# function
ages_1_14 = np.arange(1, 15)
slope_15 = (fert_rates_15_49[1] - fert_rates_15_49[0]) / (
ages_15_49[1] - ages_15_49[0]
)
fert_rates_1_14 = extrap_exp_3(
ages_1_14, (15, fert_rates_15_49[0]), slope_15, (9, 0.0001), low=True
)
ages_50_100 = np.arange(50, 101)
slope_49 = (fert_rates_15_49[-1] - fert_rates_15_49[-2]) / (
ages_15_49[-1] - ages_15_49[-2]
)
fert_rates_50_100 = extrap_exp_3(
ages_50_100,
(49, fert_rates_15_49[-1]),
slope_49,
(57, 0.0001),
low=False,
)
fert_rates_1_100 = np.hstack(
(fert_rates_1_14, fert_rates_15_49, fert_rates_50_100)
)
if totpers == 100:
fert_rates = fert_rates_1_100.copy()
ages = np.arange(1, 101)
elif totpers < 100:
# Create population weighted average fertility rates across bins
# Get population data for ages 1-100
pop_df = get_un_pop_data(
start_year=start_year, end_year=end_year, download=False
)
pop_1_100 = (
pop_df[((pop_df["age"] < 100) & (pop_df["sex_num"] == 3))]["pop"]
.to_numpy()
.flatten()
)
fert_rates = np.zeros(totpers)
len_subbins = len_subbins = np.float64(100 / totpers)
end_sub_bin = int(0)
end_pct = 0.0
for i in range(totpers):
if end_pct < 1.0:
beg_sub_bin = int(end_sub_bin)
beg_pct = 1 - end_pct
elif end_pct == 1.0:
beg_sub_bin = 1 + int(end_sub_bin)
beg_pct = 1.0
end_sub_bin = int((i + 1) * len_subbins)
if (i + 1) * len_subbins - end_sub_bin == 0.0:
end_sub_bin = end_sub_bin - 1
end_pct = 1
elif (i + 1) * len_subbins - end_sub_bin > 0.0:
end_pct = (i + 1) * len_subbins - end_sub_bin
fert_rates_sub = fert_rates_1_100[beg_sub_bin : end_sub_bin + 1]
pop_sub = pop_1_100[beg_sub_bin : end_sub_bin + 1]
pop_sub[0] = beg_pct * pop_sub[0]
pop_sub[-1] = end_pct * pop_sub[-1]
fert_rates[i] = ((fert_rates_sub * pop_sub) / pop_sub.sum()).sum()
ages = np.arange(1, totpers + 1)
if graph: # Plot fertility rates
plt.plot(ages, fert_rates)
plt.scatter(ages, fert_rates, marker="d")
plt.xlabel(r"Age $s$")
plt.ylabel(r"Fertility rate $f_{s}$")
plt.text(
-0,
-0.023,
"Source: UN Population Data",
fontsize=9,
)
plt.tight_layout(rect=(0, 0.035, 1, 1))
output_path = os.path.join(OUTPUT_DIR, "fert_rates")
plt.savefig(output_path)
plt.close()
return fert_rates
[docs]
def get_mort(totpers, start_year=2021, end_year=None, graph=False):
"""
This function generates a vector of mortality rates by model period
age. Source: UN Population Data portal.
Args:
totpers (int): total number of agent life periods (E+S), >= 3
start_year (int): first year data to download
end_year (int or None): end year data to download
graph (bool): =True if want graphical output
Returns:
mort_rates (Numpy array) mortality rates that correspond to each
period of life
infmort_rate (scalar): infant mortality rate
"""
if totpers > 100:
err_msg = "ERROR get_mort(): totpers must be <= 100."
raise ValueError(err_msg)
# Get UN infant mortality and mortality rate data by age
un_infmort_rate_df, mort_rates_df = get_un_mort_data(
start_year=start_year, end_year=end_year, download=False
)
un_infmort_rate = un_infmort_rate_df["infmort_rate"][
un_infmort_rate_df["sex_num"] == 3
].to_numpy()[0]
# Use World Bank infant mortality rate data (neonatal mortality rate) from
# World Bank World Development Indicators database
most_recent_wb_infmort_datayear = 2020
if start_year > most_recent_wb_infmort_datayear:
wb_infmort_rate = get_wb_infmort_rate(
start_year=most_recent_wb_infmort_datayear, download=False
)
else:
wb_infmort_rate = get_wb_infmort_rate(
start_year=start_year, download=False
)
infmort_rate = wb_infmort_rate
if totpers == 100:
mort_rates = (
mort_rates_df["mort_rate"][
(
(mort_rates_df["sex_num"] == 3)
& (mort_rates_df["age"] < 100)
)
]
.to_numpy()
.flatten()
)
elif totpers < 100:
# Create population weighted average mortality rates across bins
mort_rates = np.zeros(totpers)
len_subbins = np.float64(100 / totpers)
end_sub_bin = int(0)
end_pct = 0.0
deaths_0_99 = (
mort_rates_df[
(
(mort_rates_df["sex_num"] == 3)
& (mort_rates_df["age"] < 100)
)
]["deaths"]
.to_numpy()
.flatten()
)
pop_0_99 = (
mort_rates_df[
(
(mort_rates_df["sex_num"] == 3)
& (mort_rates_df["age"] < 100)
)
]["pop"]
.to_numpy()
.flatten()
)
deaths_pop_0_99 = mort_rates_df[
((mort_rates_df["sex_num"] == 3) & (mort_rates_df["age"] < 100))
][["age", "deaths", "pop"]]
for i in range(totpers):
if end_pct < 1.0:
beg_sub_bin = int(end_sub_bin)
beg_pct = 1 - end_pct
elif end_pct == 1.0:
beg_sub_bin = 1 + int(end_sub_bin)
beg_pct = 1.0
end_sub_bin = int((i + 1) * len_subbins)
if (i + 1) * len_subbins - end_sub_bin == 0.0:
end_sub_bin = end_sub_bin - 1
end_pct = 1
elif (i + 1) * len_subbins - end_sub_bin > 0.0:
end_pct = (i + 1) * len_subbins - end_sub_bin
deaths_sub = deaths_0_99[beg_sub_bin : end_sub_bin + 1]
pop_sub = pop_0_99[beg_sub_bin : end_sub_bin + 1]
deaths_sub[0] = beg_pct * deaths_sub[0]
pop_sub[0] = beg_pct * pop_sub[0]
deaths_sub[-1] = end_pct * deaths_sub[-1]
pop_sub[-1] = end_pct * pop_sub[-1]
mort_rates[i] = deaths_sub.sum() / pop_sub.sum()
# Artificially set the mortality rate of the oldest age to 1.
orig_end_mort_rate = mort_rates[-1]
mort_rates[-1] = 1.0
ages = np.arange(1, totpers + 1)
if graph:
mort_rates_all = np.hstack((infmort_rate, mort_rates))
mort_rates_all[-1] = orig_end_mort_rate
plt.plot(np.arange(0, totpers + 1), mort_rates_all)
plt.scatter(
0,
infmort_rate,
c="green",
marker="d",
label="Infant mortality rate",
)
plt.scatter(
ages,
np.hstack((mort_rates[:-1], orig_end_mort_rate)),
c="blue",
marker="d",
label="Mortality rates, model ages 1 to " + str(totpers),
)
plt.scatter(
totpers,
1.0,
c="red",
marker="d",
label="Artificial mortality limit, model age " + str(totpers),
)
plt.xlabel(r"Age $s$")
plt.ylabel(r"Mortality rate $\rho_{s}$")
plt.legend(loc="upper left")
plt.text(
0,
-0.23,
"Source: UN Population Data",
fontsize=9,
)
plt.tight_layout(rect=(0, 0.035, 1, 1))
output_path = os.path.join(OUTPUT_DIR, "mort_rates")
plt.savefig(output_path)
plt.close()
return mort_rates, infmort_rate
[docs]
def pop_rebin(curr_pop_dist, totpers_new):
"""
For cases in which totpers (E+S) is less than the number of periods
in the population distribution data, this function calculates a new
population distribution vector with totpers (E+S) elements.
Args:
curr_pop_dist (Numpy array): population distribution over N
periods
totpers_new (int): number of periods to which we are
transforming the population distribution, >= 3
Returns:
curr_pop_new (Numpy array): new population distribution over
totpers (E+S) periods that approximates curr_pop_dist
"""
assert totpers_new >= 3
# Number of periods in original data
totpers_orig = len(curr_pop_dist)
if int(totpers_new) == totpers_orig:
curr_pop_new = curr_pop_dist
elif int(totpers_new) < totpers_orig:
curr_pop_new = np.zeros(totpers_new)
len_subbins = np.float64(totpers_orig / totpers_new)
end_sub_bin = int(0)
end_pct = 0.0
for i in range(totpers_new):
if end_pct < 1.0:
beg_sub_bin = int(end_sub_bin)
beg_pct = 1 - end_pct
elif end_pct == 1.0:
beg_sub_bin = 1 + int(end_sub_bin)
beg_pct = 1.0
end_sub_bin = int((i + 1) * len_subbins)
if (i + 1) * len_subbins - end_sub_bin == 0.0:
end_sub_bin = end_sub_bin - 1
end_pct = 1
elif (i + 1) * len_subbins - end_sub_bin > 0.0:
end_pct = (i + 1) * len_subbins - end_sub_bin
curr_pop_sub = curr_pop_dist[beg_sub_bin : end_sub_bin + 1]
curr_pop_sub[0] = beg_pct * curr_pop_sub[0]
curr_pop_sub[-1] = end_pct * curr_pop_sub[-1]
curr_pop_new[i] = curr_pop_sub.sum()
return curr_pop_new
[docs]
def get_imm_resid(totpers, start_year=2021, end_year=None, graph=False):
"""
Calculate immigration rates by age as a residual given population levels in
different periods, then output average calculated immigration rate. We have
to replace the first mortality rate in this function in order to adjust the
first implied immigration rate. Source: UN Population Data portal.
Args:
totpers (int): total number of agent life periods (E+S), >= 3
start_year (int): first year data to download
end_year (int or None): end year data to download
graph (bool): =True if want graphical output
Returns:
imm_rates (Numpy array):immigration rates that correspond to
each period of life, length E+S
"""
pop_df = get_un_pop_data(start_year=2019, end_year=2021, download=False)
pop_2019 = (
pop_df["pop"][
(
(pop_df["sex_num"] == 3)
& (pop_df["year"] == 2019)
& (pop_df["age"] < 100)
)
]
.to_numpy()
.flatten()
)
pop_2020 = (
pop_df["pop"][
(
(pop_df["sex_num"] == 3)
& (pop_df["year"] == 2020)
& (pop_df["age"] < 100)
)
]
.to_numpy()
.flatten()
)
pop_2021 = (
pop_df["pop"][
(
(pop_df["sex_num"] == 3)
& (pop_df["year"] == 2021)
& (pop_df["age"] < 100)
)
]
.to_numpy()
.flatten()
)
pop_2019_EpS = pop_rebin(pop_2019, totpers)
pop_2020_EpS = pop_rebin(pop_2020, totpers)
pop_2021_EpS = pop_rebin(pop_2021, totpers)
fert_rates = get_fert(totpers, start_year=start_year, end_year=end_year)
mort_rates, infmort_rate = get_mort(
totpers, start_year=start_year, end_year=end_year
)
imm_rate_1_2020 = (
pop_2021_EpS[0]
- (1 - infmort_rate) * (fert_rates * pop_2020_EpS).sum()
) / pop_2020_EpS[0]
imm_rate_1_2019 = (
pop_2020_EpS[0]
- (1 - infmort_rate) * (fert_rates * pop_2019_EpS).sum()
) / pop_2019_EpS[0]
imm_rate_1 = (imm_rate_1_2020 + imm_rate_1_2019) / 2
imm_rates_sp1_2020 = (
pop_2021_EpS[1:] - (1 - mort_rates[:-1]) * pop_2020_EpS[:-1]
) / pop_2020_EpS[:-1]
imm_rates_sp1_2019 = (
pop_2020_EpS[1:] - (1 - mort_rates[:-1]) * pop_2019_EpS[:-1]
) / pop_2019_EpS[:-1]
imm_rates_sp1 = (imm_rates_sp1_2020 + imm_rates_sp1_2019) / 2
imm_rates = np.hstack((imm_rate_1, imm_rates_sp1))
if graph:
ages = np.arange(1, totpers + 1)
plt.plot(ages, imm_rates, label="Residual data")
plt.xlabel(r"Age $s$")
plt.ylabel(r"immigration rates $\i_s$")
output_path = os.path.join(OUTPUT_DIR, "imm_rates")
plt.savefig(output_path)
plt.close()
return imm_rates
[docs]
def immsolve(imm_rates, *args):
"""
This function generates a vector of errors representing the
difference in two consecutive periods stationary population
distributions. This vector of differences is the zero-function
objective used to solve for the immigration rates vector, similar to
the original immigration rates vector from get_imm_resid(), that
sets the steady-state population distribution by age equal to the
population distribution in period int(1.5*S)
Args:
imm_rates (Numpy array):immigration rates that correspond to
each period of life, length E+S
args (tuple): (fert_rates, mort_rates, infmort_rate, omega_cur,
g_n_ss)
Returns:
omega_errs (Numpy array): difference between omega_new and
omega_cur_pct, length E+S
"""
fert_rates, mort_rates, infmort_rate, omega_cur_lev, g_n_ss = args
omega_cur_pct = omega_cur_lev / omega_cur_lev.sum()
totpers = len(fert_rates)
OMEGA = np.zeros((totpers, totpers))
OMEGA[0, :] = (1 - infmort_rate) * fert_rates + np.hstack(
(imm_rates[0], np.zeros(totpers - 1))
)
OMEGA[1:, :-1] += np.diag(1 - mort_rates[:-1])
OMEGA[1:, 1:] += np.diag(imm_rates[1:])
omega_new = np.dot(OMEGA, omega_cur_pct) / (1 + g_n_ss)
omega_errs = omega_new - omega_cur_pct
return omega_errs
[docs]
def get_pop_objs(E, S, T, curr_year, GraphDiag=False):
"""
This function produces the demographics objects to be used in the OG-ZAF
model package.
Args:
E (int): number of model periods in which agent is not
economically active, >= 1
S (int): number of model periods in which agent is economically
active, >= 3
T (int): number of periods to be simulated in TPI, > 2*S
curr_year (int): current year for which analysis will begin,
>= 2016
GraphDiag (bool): =True if want graphical output and printed
diagnostics
Returns:
pop_dict (dict): includes:
omega_path_S (Numpy array), time path of the population
distribution from the current state to the steady-state,
size T+S x S
g_n_ss (scalar): steady-state population growth rate
omega_SS (Numpy array): normalized steady-state population
distribution, length S
surv_rates (Numpy array): survival rates that correspond to
each model period of life, length S
mort_rates (Numpy array): mortality rates that correspond to
each model period of life, length S
g_n_path (Numpy array): population growth rates over the time
path, length T + S
"""
assert curr_year >= 2021
fert_rates = get_fert(E + S, start_year=curr_year)
mort_rates, infmort_rate = get_mort(E + S, start_year=curr_year)
mort_rates_S = mort_rates[-S:]
imm_rates_orig = get_imm_resid(E + S, start_year=curr_year)
OMEGA_orig = np.zeros((E + S, E + S))
OMEGA_orig[0, :] = (1 - infmort_rate) * fert_rates
OMEGA_orig[1:, :-1] += np.diag(1 - mort_rates[:-1])
OMEGA_orig += np.diag(imm_rates_orig)
# Solve for steady-state population growth rate and steady-state
# population distribution by age using eigenvalue and eigenvector
# decomposition
eigvalues_orig, eigvectors_orig = np.linalg.eig(OMEGA_orig)
g_n_ss_orig = (eigvalues_orig[np.isreal(eigvalues_orig)].real).max() - 1
eigvec_raw_orig = eigvectors_orig[
:, (eigvalues_orig[np.isreal(eigvalues_orig)].real).argmax()
].real
omega_SS_orig = eigvec_raw_orig / eigvec_raw_orig.sum()
# Generate time path of the nonstationary population distribution
omega_path_lev = np.zeros((E + S, T + S))
pop_df = get_un_pop_data(start_year=2019, end_year=2021, download=False)
pop_2020 = (
pop_df["pop"][
(
(pop_df["sex_num"] == 3)
& (pop_df["year"] == 2020)
& (pop_df["age"] < 100)
)
]
.to_numpy()
.flatten()
)
pop_2021 = (
pop_df["pop"][
(
(pop_df["sex_num"] == 3)
& (pop_df["year"] == 2021)
& (pop_df["age"] < 100)
)
]
.to_numpy()
.flatten()
)
age_per_EpS = np.arange(1, E + S + 1)
pop_2020_EpS = pop_rebin(pop_2020, E + S)
pop_2021_EpS = pop_rebin(pop_2021, E + S)
pop_2021_pct = pop_2021_EpS / pop_2021_EpS.sum()
# Age most recent population data to the current year of analysis
most_recent_data_year = 2021
pop_curr = pop_2020_EpS.copy()
# pop_past = pop_2020_EpS.copy()
if curr_year == most_recent_data_year:
pop_past = pop_curr.copy()
pop_curr = np.dot(OMEGA_orig, pop_past)
g_n_curr = (pop_curr[-S:].sum() - pop_past[-S:].sum()) / pop_past[
-S:
].sum()
omega_path_lev[:, 0] = pop_curr
elif curr_year > most_recent_data_year:
for per in range(curr_year - most_recent_data_year):
pop_past = pop_curr.copy()
pop_curr = np.dot(OMEGA_orig, pop_past)
g_n_curr = (pop_curr[-S:].sum() - pop_past[-S:].sum()) / pop_past[
-S:
].sum()
omega_path_lev[:, 0] = pop_curr
for per in range(1, T + S):
pop_next = np.dot(OMEGA_orig, pop_curr)
omega_path_lev[:, per] = pop_next.copy()
pop_curr = pop_next.copy()
# Force the population distribution after 1.5*S periods to be the
# steady-state distribution by adjusting immigration rates, holding
# constant mortality, fertility, and SS growth rates
imm_tol = 1e-14
fixper = int(1.5 * S)
omega_SSfx = omega_path_lev[:, fixper] / omega_path_lev[:, fixper].sum()
imm_objs = (
fert_rates,
mort_rates,
infmort_rate,
omega_path_lev[:, fixper],
g_n_ss_orig,
)
imm_fulloutput = opt.fsolve(
immsolve,
imm_rates_orig,
args=(imm_objs),
full_output=True,
xtol=imm_tol,
)
imm_rates_adj = imm_fulloutput[0]
imm_diagdict = imm_fulloutput[1]
omega_path_S = omega_path_lev[-S:, :] / np.tile(
omega_path_lev[-S:, :].sum(axis=0), (S, 1)
)
omega_path_S[:, fixper:] = np.tile(
omega_path_S[:, fixper].reshape((S, 1)), (1, T + S - fixper)
)
g_n_path = np.zeros(T + S)
g_n_path[0] = g_n_curr.copy()
g_n_path[1:] = (
omega_path_lev[-S:, 1:].sum(axis=0)
- omega_path_lev[-S:, :-1].sum(axis=0)
) / omega_path_lev[-S:, :-1].sum(axis=0)
# Compute adjusted population growth rate
OMEGA2 = np.zeros((E + S, E + S))
OMEGA2[0, :] = (1 - infmort_rate) * fert_rates
OMEGA2[1:, :-1] += np.diag(1 - mort_rates[:-1])
OMEGA2 += np.diag(imm_rates_adj)
eigvalues2, eigvectors2 = np.linalg.eig(OMEGA2)
g_n_ss_adj = (eigvalues2[np.isreal(eigvalues2)].real).max() - 1
g_n_ss = g_n_ss_adj.copy()
# g_n_path[fixper + 1 :] = g_n_ss
omega_S_preTP = pop_past[-S:] / pop_past[-S:].sum()
imm_rates_mat = np.hstack(
(
np.tile(np.reshape(imm_rates_orig[E:], (S, 1)), (1, fixper)),
np.tile(
np.reshape(imm_rates_adj[E:], (S, 1)), (1, T + S - fixper)
),
)
)
if GraphDiag:
# Check whether original SS population distribution is close to
# the period-T population distribution
omegaSSmaxdif = np.absolute(
omega_SS_orig - (omega_path_lev[:, T] / omega_path_lev[:, T].sum())
).max()
if omegaSSmaxdif > 0.0003:
print(
"POP. WARNING: Max. abs. dist. between original SS "
+ "pop. dist'n and period-T pop. dist'n is greater than"
+ " 0.0003. It is "
+ str(omegaSSmaxdif)
+ "."
)
else:
print(
"POP. SUCCESS: orig. SS pop. dist is very close to "
+ "period-T pop. dist'n. The maximum absolute "
+ "difference is "
+ str(omegaSSmaxdif)
+ "."
)
# Plot the adjusted steady-state population distribution versus
# the original population distribution. The difference should be
# small
omegaSSvTmaxdiff = np.absolute(omega_SS_orig - omega_SSfx).max()
if omegaSSvTmaxdiff > 0.0003:
print(
"POP. WARNING: The maximimum absolute difference "
+ "between any two corresponding points in the original"
+ " and adjusted steady-state population "
+ "distributions is"
+ str(omegaSSvTmaxdiff)
+ ", "
+ "which is greater than 0.0003."
)
else:
print(
"POP. SUCCESS: The maximum absolute difference "
+ "between any two corresponding points in the original"
+ " and adjusted steady-state population "
+ "distributions is "
+ str(omegaSSvTmaxdiff)
)
# Print whether or not the adjusted immigration rates solved the
# zero condition
immtol_solved = np.absolute(imm_diagdict["fvec"].max()) < imm_tol
if immtol_solved:
print(
"POP. SUCCESS: Adjusted immigration rates solved "
+ "with maximum absolute error of "
+ str(np.absolute(imm_diagdict["fvec"].max()))
+ ", which is less than the tolerance of "
+ str(imm_tol)
)
else:
print(
"POP. WARNING: Adjusted immigration rates did not "
+ "solve. Maximum absolute error of "
+ str(np.absolute(imm_diagdict["fvec"].max()))
+ " is greater than the tolerance of "
+ str(imm_tol)
)
# Test whether the steady-state growth rates implied by the
# adjusted OMEGA matrix equals the steady-state growth rate of
# the original OMEGA matrix
if np.max(np.absolute(g_n_ss_adj - g_n_ss_orig)) > 10 ** (-8):
print(
"FAILURE: The steady-state population growth rate"
+ " from adjusted OMEGA is different (diff is "
+ str(g_n_ss_adj - g_n_ss_orig)
+ ") than the steady-"
+ "state population growth rate from the original"
+ " OMEGA."
)
elif np.max(np.absolute(g_n_ss_adj - g_n_ss_orig)) <= 10 ** (-8):
print(
"SUCCESS: The steady-state population growth rate"
+ " from adjusted OMEGA is close to (diff is "
+ str(g_n_ss_adj - g_n_ss_orig)
+ ") the steady-"
+ "state population growth rate from the original"
+ " OMEGA."
)
# Do another test of the adjusted immigration rates. Create the
# new OMEGA matrix implied by the new immigration rates. Plug in
# the adjusted steady-state population distribution. Hit is with
# the new OMEGA transition matrix and it should return the new
# steady-state population distribution
omega_new = np.dot(OMEGA2, omega_SSfx)
omega_errs = np.absolute(omega_new - omega_SSfx)
print(
"The maximum absolute difference between the adjusted "
+ "steady-state population distribution and the "
+ "distribution generated by hitting the adjusted OMEGA "
+ "transition matrix is "
+ str(omega_errs.max())
)
# Plot the original immigration rates versus the adjusted
# immigration rates
immratesmaxdiff = np.absolute(imm_rates_orig - imm_rates_adj).max()
print(
"The maximum absolute distance between any two points "
+ "of the original immigration rates and adjusted "
+ "immigration rates is "
+ str(immratesmaxdiff)
)
# plots
pp.plot_omega_fixed(
age_per_EpS, omega_SS_orig, omega_SSfx, E, S, output_dir=OUTPUT_DIR
)
pp.plot_imm_fixed(
age_per_EpS,
imm_rates_orig,
imm_rates_adj,
E,
S,
output_dir=OUTPUT_DIR,
)
pp.plot_population_path(
age_per_EpS,
pop_2021_pct,
omega_path_lev,
omega_SSfx,
curr_year,
E,
S,
output_dir=OUTPUT_DIR,
)
# return omega_path_S, g_n_ss, omega_SSfx, survival rates,
# mort_rates_S, and g_n_path
pop_dict = {
"omega": omega_path_S.T,
"g_n_ss": g_n_ss,
"omega_SS": omega_SSfx[-S:] / omega_SSfx[-S:].sum(),
"surv_rate": 1 - mort_rates_S,
"rho": mort_rates_S,
"g_n": g_n_path,
"imm_rates": imm_rates_mat.T,
"omega_S_preTP": omega_S_preTP,
}
return pop_dict
[docs]
def ab_zero_eqs_exp_func(ab_vals, params):
""" "
This function returns a vector of error values for the two zero equations
in terms of parameters a and b for given values of a and b.
"""
con_slope, x_con, y_con, x_eps, eps = params
a, b = ab_vals
c = np.log(y_con) - a * (x_con**2) - b * x_con
error_1 = (2 * a * x_con + b) * np.exp(
a * (x_con**2) + b * x_con + c
) - con_slope
error_2 = (2 * a * x_eps + b) * np.exp(
a * (x_eps**2) + b * x_eps + c
) - eps
error_vec = np.array([error_1, error_2])
return error_vec
[docs]
def b_zero_eq_arctan_func(b, params):
""" "
This function returns a scalar error value of the univariate error function
in parameter b for given values of b.
"""
con_slope, x_con, y_con, x_eps, low = params
if low:
a = y_con / (
(1 / np.pi) * np.arctan((b / 3) * (x_con - x_eps)) + (1 / 2)
)
a_other = (
con_slope * np.pi * (1 + ((b / 3) ** 2) * ((x_con - x_eps) ** 2))
) / b
error_val = a_other - a
else:
a = y_con / (
(-1 / np.pi) * np.arctan((b / 3) * (x_con - x_eps)) + (1 / 2)
)
a_other = (
-con_slope * np.pi * (1 + ((b / 3) ** 2) * ((x_con - x_eps) ** 2))
) / b
error_val = a_other - a
return error_val