-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
158 lines (136 loc) · 5.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import argparse
import contextlib
import datetime

import pandas as pd

from gap_filling.data_handler import DataHandler
from gap_filling.edgar_projection import ProjectData
from gap_filling.fill_gaps import fill_all_sector_gaps, prepare_df, update_based_on_activity
from gap_filling.utils import (
    parse_and_format_data_to_insert,
    get_all_edgar_data,
    get_all_faostat_data,
    generate_carbon_equivalencies,
    assemble_data,
    get_all_ceds_data,
    get_all_ceds_derived_data,
)
from gap_filling.constants import get_gap_equations, get_sectors
from gap_filling import ceds_derived_sectors
# Table that receives both the projections and the final gap-filled rows.
_STAGING_TABLE = "country_emissions_staging"


def _project_source(params_file, source, write_conn=None):
    """Project one data source and optionally write the result to staging.

    Args:
        params_file: Path of the DB connection parameters file, forwarded to
            ``ProjectData`` as ``db_params_file_path``.
        source: Source identifier forwarded to ``ProjectData`` (this file
            uses "edgar", "faostat" and "ceds").
        write_conn: Handler whose ``insert_with_update`` receives the
            projections. When ``None`` the projections are computed but not
            written.

    Returns:
        The frame returned by ``prepare_to_write`` with the
        ``measurement_method_doi_or_url`` column dropped.
    """
    print(f"Projecting {source.upper()} data...")
    projector = ProjectData(db_params_file_path=params_file, source=source)
    projector.load()
    projector.clean()
    projections = projector.prepare_to_write(projector.project())
    projections = projections.drop(columns="measurement_method_doi_or_url")
    if write_conn is not None:
        write_conn.insert_with_update(projections, _STAGING_TABLE)
    return projections


def process_all(args):
    """Run the projection and gap-filling pipeline end to end.

    Projects the EDGAR, FAOSTAT and CEDS sources, reloads the projected data
    together with the "climate-trace" data, fills the sector gaps, computes
    the carbon equivalencies for ``co2e_to_compute`` 20 and 100, and writes
    the assembled rows to the staging table.

    Every ``DataHandler`` opened here is closed when the function exits,
    whether it returns normally or raises.

    Args:
        args: Parsed command-line arguments; only ``args.params_file`` is
            read.
    """
    with contextlib.ExitStack() as stack:

        def open_handler():
            # Register the close immediately so a later failure (including a
            # failing DataHandler() call further down) cannot leak this one.
            handler = DataHandler()
            stack.callback(handler.close_conn)
            return handler

        ############################
        # Get the data
        ############################
        # NOTE: ExitStack closes these in reverse order of opening.
        getedgar_conn = open_handler()
        getct_conn = open_handler()
        getceds_conn = open_handler()
        getcedsderived_conn = open_handler()
        write_conn = open_handler()

        # Get Gap equations
        gap_equations = get_gap_equations()
        sectors = get_sectors(gap_equations)

        ############################
        # Project Data
        ############################
        # NOTE(review): the EDGAR projections are computed but NOT written to
        # staging -- that write was already disabled in this script. Confirm
        # whether that is still intended before re-enabling it.
        _project_source(args.params_file, "edgar")
        _project_source(args.params_file, "faostat", write_conn)
        _project_source(args.params_file, "ceds", write_conn)

        ############################
        # Recalculate ceds-derived data with projected data
        ############################
        # NOTE(review): this step is currently disabled:
        # ceds_derived_sectors.main("country_emissions_staging")

        ############################
        # Fill Gaps
        ############################
        # Projected EDGAR data (the original comment says this comes from the
        # OLD db -- TODO confirm which database getedgar_conn points at).
        edgar_data = get_all_edgar_data(getedgar_conn, get_projected=True)
        # FAOSTAT is read through the EDGAR connection, as in the original.
        faostat_data = get_all_faostat_data(getedgar_conn, get_projected=True)
        ceds_data = get_all_ceds_data(getceds_conn, get_projected=True)
        ceds_derived_data = get_all_ceds_derived_data(
            getcedsderived_conn, get_projected=True
        )
        ct_data = getct_conn.load_data("climate-trace", years_to_columns=True)

        # Gap fill on projected data
        concat_df = pd.concat(
            [edgar_data, faostat_data, ct_data, ceds_data, ceds_derived_data]
        )
        df = prepare_df(concat_df)
        gap_filled_data = fill_all_sector_gaps(df, gap_equations)

        # Update emissions based on country level activity, where applicable
        # (i.e. where activity is nonzero but emissions are zero)
        gap_filled_data = update_based_on_activity(gap_filled_data)

        # Generate the co2e data
        co2e_20_data = generate_carbon_equivalencies(
            getedgar_conn, gap_filled_data, co2e_to_compute=20
        )
        co2e_100_data = generate_carbon_equivalencies(
            getedgar_conn, gap_filled_data, co2e_to_compute=100
        )

        # This function generates placeholders for every sector, country, and
        # gas combination and merges the dataframes
        assembled_df = assemble_data(
            gap_filled_data, co2e_20_data, co2e_100_data, SECTORS=sectors
        )

        # These data need to undergo a transformation before we can insert
        # them into the db
        data_to_insert = parse_and_format_data_to_insert(assembled_df)

        # Take the timestamp once so both date columns agree for this run.
        run_timestamp = datetime.datetime.now().isoformat()
        data_to_insert["created_date"] = run_timestamp
        data_to_insert["recency_date"] = run_timestamp

        # Write results to the DB
        write_conn.insert_with_update(data_to_insert, _STAGING_TABLE)
if __name__ == "__main__":
    # Script entry point: read the DB parameter file location from the
    # command line (defaults to "params.json") and run the whole pipeline.
    cli_parser = argparse.ArgumentParser(
        description="Code to fill gaps in the Climate-Trace data using EDGAR",
    )
    cli_parser.add_argument(
        "-p",
        "--params_file",
        default="params.json",
        dest="params_file",
        help="location of db connection params",
        type=str,
    )
    process_all(cli_parser.parse_args())