9
9
import traceback
10
10
from urllib .parse import urlparse
11
11
from redisrpc .version import VERSION
12
+ from redisrpc .errors import ChannelActiveException
12
13
13
14
try :
14
15
import redis
15
- except :
16
- pass
16
+ except ImportError :
17
+ raise ImportError ( "redis package must be installed" )
17
18
18
19
19
20
class BasePubSub (object ):
20
21
"""
21
- Base publish–subscribe is a
22
- messaging pattern class
22
+ Base publish–subscribe is a
23
+ messaging pattern class
23
24
"""
24
25
25
26
def __init__ (self , channel_name ):
26
27
"""
27
- Initialize and subscribe channel
28
+ Initialize and subscribe channel
28
29
"""
30
+ self .validate_env ()
29
31
self .rdb = redis .StrictRedis .from_url (os .getenv ("REDIS_URI" ))
30
32
self .check_connection_redis ()
31
33
self .channel = channel_name
@@ -44,7 +46,19 @@ def check_connection_redis(self):
44
46
f"redis_uri: { os .getenv ('REDIS_URI' )} "
45
47
)
46
48
49
+ def validate_env (self ):
50
+ if not os .getenv ("REDIS_URI" ):
51
+ raise ValueError ("REDIS_URI environment variable must be set" )
52
+
53
+ def check_before_connect (self ):
54
+ # Get the list of currently active channels
55
+ active_channels = self .rdb .pubsub_channels ()
56
+ if self .channel .encode ("utf-8" ) in active_channels :
57
+ print ("Please use different channel name or kill existing channel" )
58
+ raise ChannelActiveException (self .channel )
59
+
47
60
def listen (self ):
61
+ self .check_before_connect ()
48
62
try :
49
63
self .print_start ()
50
64
self .log_print ("Pubsub is listen..." )
@@ -72,20 +86,20 @@ def event_handler(self, event_name, data):
72
86
response = event (data )
73
87
if response :
74
88
self .log_print (f"Success response from { event_name } " , "DEBUG" )
75
- self .__send_reponse ({
76
- "token" : self .token ,
77
- "event_name" : event_name ,
78
- "data" : response
79
- })
89
+ self .__send_reponse (
90
+ {
91
+ "token" : self .token ,
92
+ "event_name" : event_name ,
93
+ "data" : response ,
94
+ }
95
+ )
80
96
else :
81
97
self .log_print (f"Empty response from { event_name } " , "WARNING" )
82
98
except :
83
99
self .log_print (traceback .format_exc (), "FATAL" )
84
100
else :
85
101
self .log_print (f"Can't find `{ event_name } ` event name" , "ERROR" )
86
- return {
87
- "error" : f"Can't find `{ event_name } ` event name"
88
- }
102
+ return {"error" : f"Can't find `{ event_name } ` event name" }
89
103
90
104
def print_start (self ):
91
105
start_text = f"""
@@ -122,10 +136,11 @@ def log_print(self, text, type="INFO"):
122
136
def connection_uri (self ):
123
137
uri = urlparse (os .getenv ("REDIS_URI" ))
124
138
host = uri .netloc
125
- paswd = ""
126
139
if ":" in host and "@" in host :
127
- paswd = host [host .index (":" ):host .index ("@" )]
128
- return os .getenv ("REDIS_URI" ).replace (paswd , "****" )
140
+ paswd = host [host .index (":" ) : host .index ("@" )]
141
+ return os .getenv ("REDIS_URI" ).replace (paswd , "***" )
142
+ else :
143
+ return os .getenv ("REDIS_URI" )
129
144
130
145
def __encode_base64 (self , data ):
131
146
return base64 .b64encode (json .dumps (data ).encode ("utf-8" ))
@@ -153,11 +168,7 @@ def __send_reponse(self, data):
153
168
self .rdb .publish (self .channel , decode )
154
169
155
170
def send (self , event_name , data , wait_response_time = 2 ):
156
- resp = {
157
- "token" : self .token ,
158
- "event_name" : event_name ,
159
- "data" : data
160
- }
171
+ resp = {"token" : self .token , "event_name" : event_name , "data" : data }
161
172
decode = self .__encode_base64 (resp )
162
173
self .rdb .publish (self .channel , decode )
163
174
print ("Send" )
0 commit comments