-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdatabase.py
153 lines (129 loc) · 5.07 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import psycopg2
from psycopg2 import Error
from psycopg2.extras import execute_batch
import logging
from podcast_model import Podcast, UserReview
class DatabaseStore:
    """PostgreSQL-backed storage for podcasts and their user reviews.

    The instance owns a single psycopg2 connection, exposed as
    ``self.connection``.  The insert/fetch/create-table methods log failures
    instead of raising them; after a failure they roll the transaction back
    so the connection stays usable for the next call.
    """

    # Database that is created (if missing) and used when the caller does not
    # name one.
    DEFAULT_DATABASE = 'podcasts'

    def __init__(self, user: str, password: str, host: str, port: str, database=None):
        """Open a connection to PostgreSQL.

        Args:
            user: Role used to authenticate.
            password: Password for ``user``.
            host: Server host name or address.
            port: Server port.
            database: Database to connect to.  When ``None``, the server is
                contacted without a database name, the ``podcasts`` database
                is created if it does not exist yet, and the store then
                reconnects to it.

        Raises:
            Exception: Whatever psycopg2 raised while connecting or creating
                the database; it is logged and re-raised unchanged.
        """
        try:
            self.connection = psycopg2.connect(
                user=user,
                password=password,
                host=host,
                port=port,
                database=database,
            )
            # CREATE DATABASE cannot run inside a transaction block, so the
            # first connection is switched to autocommit.  When a database
            # name was supplied this connection is the one the store keeps.
            self.connection.autocommit = True
            if database is None:
                try:
                    self._create_default_database()
                finally:
                    # The bootstrap connection is no longer needed, whether
                    # or not the database could be created.
                    self.connection.close()
                self.connection = psycopg2.connect(
                    user=user,
                    password=password,
                    host=host,
                    port=port,
                    database=self.DEFAULT_DATABASE,
                )
            logging.info("Succesfully connected to postgres")
        except Exception as e:
            logging.error(f" Error while connecting to postgres: {e}")
            raise

    def _create_default_database(self):
        """Create the default database unless the server already has it."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(
                "select 1 from pg_database where datname = %s",
                (self.DEFAULT_DATABASE,),
            )
            if cursor.fetchone() is None:
                # An identifier cannot be sent as a bound parameter; the name
                # is a class constant, never caller-supplied input.
                cursor.execute(f"create database {self.DEFAULT_DATABASE}")
        finally:
            cursor.close()

    def _rollback(self):
        """Abort the current transaction, logging (not raising) any failure."""
        try:
            self.connection.rollback()
        except Exception as e:
            logging.error(f" Error while rolling back transaction: {e}")

    @staticmethod
    def create_database_from_config(config):
        """
        Create and connect to a database from configuration

        ``config`` must provide the keys ``USER``, ``HOST``, ``PASSWORD``,
        ``PORT`` and ``DBNAME``.  The podcast and review tables are created
        if they are missing, and the connected store is returned.
        """
        data_store = DatabaseStore(
            user=config['USER'],
            password=config['PASSWORD'],
            port=config['PORT'],
            host=config['HOST'],
            database=config['DBNAME'],
        )
        data_store.create_podcast_and_reviews_tables()
        return data_store

    def create_podcast_and_reviews_tables(self):
        """Create the ``podcasts`` and ``user_reviews`` tables if missing.

        Failures are logged and the transaction is rolled back; nothing is
        raised.
        """
        create_podcast_table = '''
            create table if not exists podcasts(
                id text primary key unique,
                name text not null,
                url text not null,
                studio text,
                category text,
                episode_count int,
                avg_rating float,
                total_ratings int,
                description text
            ) '''
        create_reviews_table = '''
            create table if not exists user_reviews(
                id serial primary key,
                podcast_id text REFERENCES podcasts(id) not null,
                username text not null,
                title text not null,
                review text not null,
                rating float not null,
                date date not null
            )
            '''
        cursor = self.connection.cursor()
        try:
            cursor.execute(create_podcast_table)
            cursor.execute(create_reviews_table)
            self.connection.commit()
            logging.info("Podcast Table created")
        except Exception as e:
            self._rollback()
            logging.error(f" Error while creating tables: {e}")
        finally:
            cursor.close()

    def insert_podcast(self, podcast: "Podcast"):
        """
        Insert podcast information to a database

        Returns: the podcast's id when the row was stored, ``None`` when the
        insert failed (the failure is logged and rolled back).
        """
        query = """INSERT INTO podcasts(id,name,url,studio,category,episode_count,avg_rating,total_ratings,description)
            VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s);"""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, (
                podcast.id,
                podcast.name,
                podcast.url,
                podcast.studio,
                podcast.category,
                podcast.total_episodes,  # stored in the episode_count column
                podcast.avg_rating,
                podcast.total_ratings,
                podcast.description,
            ))
            self.connection.commit()
            logging.info("Successfully inserted podcast into database")
            return podcast.id
        except Exception as e:
            self._rollback()
            logging.error(f" Error while inserting podcast : {e}")
            return None
        finally:
            cursor.close()

    def insert_reviews(self, podcast_id: str, reviews):
        """
        Insert a list of reviews belonging to a podcast to a database

        Each review must expose ``title``, ``username``, ``rating``,
        ``review`` and ``date``.  ``podcast_id`` is the text id of the row in
        ``podcasts`` that the reviews refer to.  Failures are logged and
        rolled back; nothing is raised.
        """
        query = """INSERT INTO user_reviews(podcast_id,title,username,rating,review,date)
            VALUES(%s,%s,%s,%s,%s,%s)"""
        cursor = self.connection.cursor()
        try:
            rows = [
                (podcast_id, review.title, review.username, review.rating, review.review, review.date)
                for review in reviews
            ]
            execute_batch(cursor, query, rows)
            self.connection.commit()
            logging.info("reviews inserted into database")
        except Exception as e:
            self._rollback()
            logging.error(f" Error while inserting record : {e}")
        finally:
            cursor.close()

    def fetch_podcasts(self):
        """
        Fetch the name of every stored podcast

        Returns: a list with one single-column row (the name) per podcast,
        or ``None`` when the query failed (the failure is logged).
        """
        query = """select name from podcasts"""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            # Ends the transaction the SELECT opened on a non-autocommit
            # connection.
            self.connection.commit()
            return rows
        except Exception as e:
            self._rollback()
            logging.error(f" Error while fetching record : {e}")
            return None
        finally:
            cursor.close()

    def close(self):
        """Close the database connection."""
        self.connection.close()