-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathclean_fulltext.py
79 lines (66 loc) · 2.67 KB
/
clean_fulltext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import time
import traceback
from datetime import datetime
from threading import Thread
from bs4 import BeautifulSoup
import os
from sqlalchemy import create_engine, text
# Fail fast with a clear message instead of an opaque
# "AttributeError: 'NoneType' object has no attribute 'replace'".
_database_url = os.getenv("OPENALEX_DATABASE_URL")
if not _database_url:
    raise RuntimeError('OPENALEX_DATABASE_URL environment variable is not set')
# SQLAlchemy 1.4+ no longer accepts the legacy "postgres://" scheme name.
# Rewrite only the scheme prefix so the rest of the URL (credentials, host,
# database name) is never altered.
if _database_url.startswith('postgres://'):
    _database_url = 'postgresql://' + _database_url[len('postgres://'):]
engine = create_engine(_database_url)
# Single shared connection; update_fulltexts() runs all of its statements on it.
conn = engine.connect()
# Rows cleaned so far; incremented by the worker, read by print_stats().
PROCESSED_COUNT = 0
# Maximum number of queue rows claimed per round trip (the SQL LIMIT).
CHUNK_SIZE = 100
def print_stats():
started = datetime.now()
while True:
now = datetime.now()
hrs_passed = (now - started).total_seconds() / (60 * 60)
rate = round(PROCESSED_COUNT / hrs_passed, 2)
print(f'[*] Processing rate: {rate}/hr')
time.sleep(5)
def clean_fulltext(fulltext, truncate_limit=None):
    """Strip markup from *fulltext* and return its text, truncated.

    The markup is parsed with BeautifulSoup's lxml HTML builder, and the text
    fragments are joined with a single space.

    :param fulltext: markup string to clean; ``None`` or empty yields ``''``.
    :param truncate_limit: maximum number of characters to keep; ``None``
        keeps the whole text.
    :return: the extracted text with NUL characters removed, at most
        ``truncate_limit`` characters long.
    """
    if not fulltext:
        return ''
    # Note: BeautifulSoup has no ``parser`` option; the ``features`` argument
    # alone selects the lxml HTML builder.
    soup = BeautifulSoup(fulltext, features='lxml')
    cleaned = soup.get_text(separator=' ')
    # PostgreSQL text values cannot contain NUL (0x00), and psycopg2 refuses
    # to adapt such strings. Drop any that survive parsing so the later
    # INSERT cannot fail on them.
    cleaned = cleaned.replace('\x00', '')
    return cleaned[:truncate_limit]
def update_fulltexts():
global PROCESSED_COUNT
stmnt = f'''
with queue as (
SELECT record_fulltext_truncate_queue.recordthresher_id, rf.fulltext FROM mid.record_fulltext_truncate_queue
JOIN mid.record_fulltext rf on record_fulltext_truncate_queue.recordthresher_id = rf.recordthresher_id
WHERE started is FALSE AND rf.fulltext IS NOT NULL
LIMIT {CHUNK_SIZE} FOR UPDATE SKIP LOCKED
)
update mid.record_fulltext_truncate_queue update_rows SET started = TRUE
FROM queue WHERE update_rows.recordthresher_id = queue.recordthresher_id
RETURNING queue.*;
'''
updates = []
with conn.connection.cursor() as cursor:
while True:
try:
rows = conn.execute(text(stmnt)).fetchall()
if not rows:
break
for row in rows:
try:
recordthresher_id, fulltext = row
cleaned = clean_fulltext(fulltext, truncate_limit=200_000)
updates.append((recordthresher_id, cleaned))
PROCESSED_COUNT += 1
except Exception as e:
print(e)
if len(updates) > 0 and ((len(updates) % CHUNK_SIZE) == 0):
updates_template = ','.join(['%s'] * len(updates))
query = cursor.mogrify(
f'INSERT INTO mid.tmp_cleaned_fulltext (recordthresher_id, fulltext) VALUES {updates_template};',
updates)
cursor.execute(query.decode())
conn.connection.commit()
updates = []
except Exception as e:
print(traceback.format_exc())
conn.close()
if __name__ == '__main__':
    # Throughput reporter runs as a daemon thread, so it never keeps the
    # process alive once update_fulltexts() returns.
    reporter = Thread(target=print_stats, daemon=True)
    reporter.start()
    update_fulltexts()