"""
Script to scrape food delivery orders from Gmail
"""
import json
import multiprocessing
import os
import sys

import pandas as pd

# Add src directory to Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
sys.path.append(src_dir)
from gmail import Gmail  # noqa: E402
from gsheets import GoogleSheets  # noqa: E402
from scrapers import scrape_doordash_command_factory  # noqa: E402
from validators import (  # noqa: E402
    validate_order_modifier_counts,
    validate_order_subtotal,
)


def load_from_cache(cache_path: str):
    """Return the cached JSON payload, or an empty dict if no cache exists."""
    if os.path.exists(cache_path):
        with open(cache_path, "r") as cache_file:
            return json.load(cache_file)
    return {}


def save_to_cache(messages, cache_path: str):
    with open(cache_path, "w") as cache_file:
        json.dump(messages, cache_file)
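

# Fetched messages are cached under data/cache/ so repeated runs only download
# mail that has not been seen before; delete the cache file to force a full
# re-fetch.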
def get_messages(gmail: Gmail, query: str, use_cache: bool):
    cache_path = "data/cache/cached_messages.json"
    cached_messages = load_from_cache(cache_path) if use_cache else {}
    search_results = gmail.search(query)
    # Only fetch full message bodies for ids not already in the cache.
    new_messages = {
        m["id"]: gmail.get_message_by_id(id=m["id"])
        for m in search_results
        if m["id"] not in cached_messages
    }
    all_messages = {**cached_messages, **new_messages}
    save_to_cache(all_messages, cache_path)
    return all_messages
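

# Scraping is fanned out across a process pool. extract_order is the
# per-message worker: it returns a (status, payload, message) triple so the
# parent can sort results without exceptions crossing process boundaries, and
# it lives at module level so multiprocessing can pickle it.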
def extract_order(scrape_command, msg):
    try:
        order = scrape_command(message_obj=msg)
        return "success", order, msg
    except AssertionError:
        # A failed scraper assertion marks the message as invalid
        # (not a parsable order confirmation).
        return "invalid", None, msg
    except Exception as error:
        return "failed", str(error), msg


def extract_orders(scrape_command, messages):
    orders = []
    success_msgs = []
    failed_msgs = []
    invalid_msgs = []
    # Leave a couple of cores free for the rest of the system.
    num_processes = max(1, multiprocessing.cpu_count() - 2)
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.starmap(
            extract_order, [(scrape_command, msg) for msg in messages.values()]
        )
    for result_type, order_or_error, msg in results:
        if result_type == "success":
            orders.append(order_or_error)
            success_msgs.append(msg)
        elif result_type == "failed":
            failed_msgs.append({"error": order_or_error, "msg": msg})
        else:
            invalid_msgs.append(msg)
    print(
        f"Total Messages: {len(messages)}"
        f"\n - Success: {len(orders)}"
        f"\n - Failure: {len(failed_msgs)}"
        f"\n - Invalid: {len(invalid_msgs)}"
    )
    # Persist problem messages so failed scrapes can be inspected offline.
    save_to_cache(invalid_msgs, "data/cache/invalid_messages.json")
    save_to_cache(failed_msgs, "data/cache/scraping_errors.json")
    return orders, success_msgs
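

# Validation cross-checks each parsed order against its source message
# (order subtotal and per-item modifier counts) before anything is exported.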
def validate_order(order, msg):
    if not validate_order_subtotal(order=order):
        return None
    if not validate_order_modifier_counts(order=order, msg=msg):
        return None
    return order


def validate_results(orders, messages):
    num_processes = max(1, multiprocessing.cpu_count() - 2)
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.starmap(validate_order, zip(orders, messages))
    # Filter out None results (failed validations)
    orders_clean = [order for order in results if order is not None]
    return orders_clean
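

# Each order is flattened into two relational tables: one row per order and
# one row per line item, joinable on the Gmail message id.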
def process_orders(orders):
    orders_lst = []
    order_items_lst = []
    for order in orders:
        flattened_order = {}
        flattened_order["message_id"] = order["message_id"]
        flattened_order["date"] = order["date"]
        flattened_order["store_name"] = order["store_name"]
        flattened_order["delivery_address"] = order["delivery_address"]
        flattened_order["eta"] = order["eta"]
        flattened_order.update(order["cost_summary"])
        orders_lst.append(flattened_order)
        for item in order["items"]:
            item_dict = {"order_message_id": order["message_id"], **item}
            item_dict["modifiers"] = " | ".join(item["modifiers"])
            order_items_lst.append(item_dict)
    orders_df = pd.DataFrame(orders_lst)
    # Columns starting with an uppercase letter come from cost_summary
    # (presumably numeric amounts), so fill their gaps with 0; the lowercase
    # metadata columns are text and get an empty string.
    for col in orders_df.columns:
        fill_value = 0 if col[0].isupper() else ""
        orders_df.loc[:, col] = orders_df[col].fillna(fill_value)
    order_items_df = pd.DataFrame(order_items_lst)
    order_items_df.fillna("", inplace=True)
    return orders_df, order_items_df
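

# Results land in data/*.csv and in the "Doordash Orders" spreadsheet, with
# orders and order items written to separate worksheets.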
def export_results(orders_df, order_items_df, save_local_copy: bool = True):
    if save_local_copy:
        orders_df.to_csv("data/orders.csv", index=False)
        order_items_df.to_csv("data/order_items.csv", index=False)
    sheets = GoogleSheets()
    dest = "Doordash Orders"
    sheets.write_df_to_sheet(
        orders_df, sheet_name=dest, worksheet_title="Orders", worksheet_index=0
    )
    sheets.write_df_to_sheet(
        order_items_df,
        sheet_name=dest,
        worksheet_title="Order Items",
        worksheet_index=1,
    )
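

# Pipeline: fetch -> extract -> validate -> export.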
def main():
    print("🌟 EXECUTION STARTED!")

    print("\n😺 Fetching emails...")
    gmail = Gmail()
    query = 'from:no-reply@doordash.com subject:"Order Confirmation for"'
    messages = get_messages(gmail, query, use_cache=True)

    print("\n🍕 Extracting orders...")
    orders, success_msgs = extract_orders(
        scrape_command=scrape_doordash_command_factory(),
        messages=messages,
    )

    print("\n❓ Validating results...")
    orders = validate_results(orders=orders, messages=success_msgs)

    print("\n📤 Exporting results for analysis")
    orders_df, order_items_df = process_orders(orders=orders)
    export_results(orders_df=orders_df, order_items_df=order_items_df)

    print("\n🏁 EXECUTION COMPLETE.")


if __name__ == "__main__":
    main()