-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbest_price_scraper.py
193 lines (148 loc) · 8.29 KB
/
best_price_scraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import configparser
import time
from selenium import webdriver
from selenium.common import NoSuchElementException, TimeoutException
from selenium.webdriver import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from tqdm import tqdm
from Base_Scraper import Base_Scraper
import json
class best_price_scraper(Base_Scraper):
    """Interactive Selenium scraper for product listings on bestprice.gr.

    The browser session is stored in the module-level global ``driver``
    (created by :meth:`setup`) and is closed at the end of
    :meth:`select_products`.

    NOTE(review): ``self.all_products`` and ``self.select_items`` are not
    defined in this class; they are presumably provided by ``Base_Scraper``
    -- confirm against that module.
    """

    def __init__(self):
        """Initialise the base scraper and start the configured browser."""
        super().__init__()
        self.setup()

    def setup(self):
        """Set up the web driver with the customised configuration.

        Reads ``driver_path`` and ``browser_type`` from the
        ``General_settings`` section of ``config.ini``. A ``driver_path`` of
        ``"0"`` means "let Selenium locate the driver itself"; any other value
        is used as the path of the driver executable. The browser is always
        started headless.

        Raises:
            ValueError: if ``browser_type`` is not chrome, firefox or edge.
            configparser.Error: if the section or an option is missing.
        """
        global driver

        # Imported locally so this class does not depend on extra file-level
        # imports; these modules ship with the selenium package already used.
        from selenium.webdriver.chrome.service import Service as ChromeService
        from selenium.webdriver.edge.service import Service as EdgeService
        from selenium.webdriver.firefox.service import Service as FirefoxService

        config = configparser.ConfigParser()
        config.read("config.ini")
        driver_path = config.get('General_settings', 'driver_path')
        browser_type = config.get("General_settings", "browser_type").lower()

        # browser name -> (driver class, options class, service class, headless flag)
        supported_browsers = {
            "chrome": (webdriver.Chrome, webdriver.ChromeOptions, ChromeService, '--headless=new'),
            "firefox": (webdriver.Firefox, webdriver.FirefoxOptions, FirefoxService, "-headless"),
            "edge": (webdriver.Edge, webdriver.EdgeOptions, EdgeService, "--headless=new"),
        }
        if browser_type not in supported_browsers:
            # Previously `driver` was silently left undefined, which only
            # surfaced later as an unrelated NameError.
            raise ValueError(
                f"Unsupported browser_type {browser_type!r} in config.ini; "
                f"expected one of: {', '.join(supported_browsers)}"
            )

        driver_cls, options_cls, service_cls, headless_flag = supported_browsers[browser_type]
        browser_options = options_cls()
        browser_options.add_argument(headless_flag)

        if driver_path == "0":
            driver = driver_cls(options=browser_options)
        else:
            # The executable path must be wrapped in a Service object; passing
            # it positionally collides with `options` on current Selenium 4.
            driver = driver_cls(
                service=service_cls(executable_path=driver_path),
                options=browser_options,
            )

    def categories_are_available(self):
        """Return True if the current page offers categories to choose from."""
        return bool(driver.find_elements(By.CLASS_NAME, "categories__category"))

    @staticmethod
    def _parse_choice(raw, upper):
        """Return ``raw`` as an int if it lies in ``1..upper``, else None."""
        try:
            number = int(raw)
        except ValueError:
            return None
        return number if 1 <= number <= upper else None

    @staticmethod
    def _rating_at(entries, index):
        """Return ``(ratingValue, reviewCount)`` of ``entries[index]``.

        Falls back to ``(0, 0)`` when the entry or its rating is missing.
        """
        try:
            rating = entries[index]["item"]["aggregateRating"]
            return rating["ratingValue"], rating["reviewCount"]
        except (IndexError, KeyError, TypeError):
            return 0, 0

    def _scroll_to_bottom(self, pause_seconds):
        """Scroll down repeatedly until the page height stops growing."""
        last_height = driver.execute_script("return document.body.scrollHeight")
        while True:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            # Give lazily loaded content time to be appended to the page.
            time.sleep(pause_seconds)
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height

    def _product_cards(self, container):
        """Return the direct children of ``container`` that are not section markers."""
        try:
            return [
                element
                for element in container.find_elements(By.XPATH, "./child::*")
                # get_attribute may return None when there is no class attribute.
                if 'p__products-section' not in (element.get_attribute('class') or '')
            ]
        except NoSuchElementException:
            # Defensive fallback kept from the original implementation.
            print("No such element")
            return container.find_elements(By.CLASS_NAME, "p")

    def _load_listing_entries(self):
        """Return ``itemListElement`` of the page's last JSON-LD script, or ``[]``."""
        scripts = driver.find_elements(By.XPATH, '//script[@type="application/ld+json"]')
        if not scripts:
            return []
        try:
            payload = json.loads(scripts[-1].get_attribute("innerHTML"))
            entries = payload["itemListElement"]
        except (TypeError, ValueError, KeyError):
            # Ratings are optional extras; never fail the scrape over them.
            return []
        return entries if isinstance(entries, list) else []

    def process_items(self, products_number):
        """Collect every product of the current listing into ``self.all_products``.

        Each stored product is a dict with the keys ``name``, ``link``,
        ``price`` (float), ``review_score`` and ``reviews_count``. Products
        without a current price, without a readable title link, or with an
        unparsable price are skipped.

        Args:
            products_number: expected number of products. It is replaced by
                the number of product elements actually found once the page
                has been scrolled to the bottom.
        """
        processed_product_links = set()
        scroll_pause_time = 2  # Adjust this based on how fast new content loads

        with tqdm(total=products_number, desc="Processing product items...", colour="GREEN", unit="product") as pbar:
            while len(self.all_products) < products_number:
                products_list = WebDriverWait(driver, timeout=10).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "p__products"))
                )
                self._scroll_to_bottom(scroll_pause_time)

                products_info_list = self._product_cards(products_list)
                products_number = len(products_info_list)
                # Keep the progress bar in sync with the corrected target.
                pbar.total = products_number
                pbar.refresh()

                # Parsed once per pass instead of once per product.
                # NOTE(review): assumes the JSON-LD entries are in the same
                # order as the product elements -- confirm against the site.
                listing_entries = self._load_listing_entries()

                added_this_pass = 0
                # The position comes from enumerate so that skipped products
                # cannot shift the rating lookup of the products after them.
                for index, product in enumerate(products_info_list):
                    try:
                        product_element = product.find_element(By.CLASS_NAME, "p__meta")
                        anchor = WebDriverWait(product_element, timeout=10).until(
                            EC.presence_of_element_located((By.TAG_NAME, "a"))
                        )
                        product_name = anchor.get_attribute("title")
                        product_link = anchor.get_attribute("href")
                    except (NoSuchElementException, TimeoutException):
                        continue  # No usable title link in this element

                    if product_link in processed_product_links:
                        continue  # Skip already processed products

                    try:
                        product_price_elem = product.find_element(By.CLASS_NAME, "p__price--current").text
                    except NoSuchElementException:
                        continue  # Product is unavailable

                    # Turn "1.234,56€" style text into "1234.56".
                    product_price = product_price_elem.strip().replace('€', '').replace('.', '').replace(",", ".")
                    try:
                        price = float(product_price)
                    except ValueError:
                        continue  # Price text is not a number

                    review_score, reviews_count = self._rating_at(listing_entries, index)

                    self.all_products.append({
                        "name": product_name,
                        "link": product_link,
                        "price": price,
                        "review_score": review_score,
                        "reviews_count": reviews_count
                    })
                    processed_product_links.add(product_link)
                    added_this_pass += 1

                    # Update tqdm progress bar
                    pbar.update(1)

                    if len(self.all_products) >= products_number:
                        break

                if added_this_pass == 0:
                    # Everything left on the page was skipped; another pass
                    # would find nothing new and the loop would never end.
                    break

        print("Processed all products")

    def select_products(self):
        """Search BestPrice interactively and let the user pick products.

        Prompts for a search term, optionally for a category, collects the
        listed products via :meth:`process_items`, prints them and stores the
        result of ``self.select_items`` in ``self.selected_products``. The
        browser is closed afterwards, even when an error occurs.
        """
        # Imported locally so this class does not need a new file-level import.
        from urllib.parse import quote_plus

        url = "https://www.bestprice.gr/"
        try:
            driver.get(url)
            WebDriverWait(driver, 10).until(EC.url_to_be(url))

            search = driver.find_element(By.CLASS_NAME, "search__field").find_element(By.NAME, "q")
            product = input("Enter the product you're looking for on BestPrice: ")
            search.send_keys(product)
            search.send_keys(Keys.RETURN)
            # The form submission is followed by a direct navigation to the
            # search URL; the term is encoded so that spaces, '&', '#', ...
            # cannot corrupt the query string.
            driver.get("https://www.bestprice.gr/search?q=" + quote_plus(product))
            time.sleep(2)

            # if there are categories to select, select one and browse to that category
            if self.categories_are_available():
                categories_tag = driver.find_elements(By.CLASS_NAME, "categories__category")
                categories = [category.get_attribute("title") for category in categories_tag]

                print()  # empty line
                print("Categories", end="\n\n")
                print("-----------------------------")
                for number, category in enumerate(categories, start=1):
                    print(f"{number}. {category}")
                print("-----------------------------", end="\n\n")

                category_number = self._parse_choice(
                    input("Choose the category of the product: "), len(categories)
                )
                print()  # empty line
                # Re-prompt on non-numeric input and on numbers outside the menu.
                while category_number is None:
                    category_number = self._parse_choice(
                        input("Wrong number, enter again: "), len(categories)
                    )

                chosen_category = categories_tag[category_number - 1]
                category_div = chosen_category.find_element(By.CLASS_NAME, "categories__cnt")
                # Keep only the digits so the text suffix and any thousands
                # separators in the count label do not break the conversion.
                products_number = int("".join(filter(str.isdigit, category_div.text)))

                # go to the right category
                link = chosen_category.get_attribute("href")
                driver.get(link)
            else:
                # No category menu: estimate the count from the page itself.
                # NOTE(review): assumes such result pages use the same
                # "p__products" container as category pages -- confirm.
                containers = driver.find_elements(By.CLASS_NAME, "p__products")
                products_number = len(self._product_cards(containers[0])) if containers else 0

            # save all the products to a list
            self.process_items(products_number)

            # print all products
            products_number = len(self.all_products)
            print("-----------------------------")
            for position, item in enumerate(self.all_products, start=1):
                print(f"{position}: {item['name']}")

            # Select product number(s) and save them in a list
            self.selected_products = self.select_items(products_number)
        finally:
            # Always release the browser, even if scraping failed midway.
            driver.quit()