9
9
from celery import Celery
10
10
from celery import Task
11
11
from celery import subtask
12
+ from celery .utils .dispatch import Signal
12
13
from celery .signals import worker_shutdown
13
14
from celery .signals import task_success
14
15
from catalog .store import RedlandStore , EntryNotFound
29
30
CELERY_IMPORTS = ("catalog.store" , "catalog.log" , "catalog_backend" ),
30
31
)
31
32
33
+ on_work_updated = Signal (providing_args = ('task' , 'update_subtask' ))
34
+
32
35
class FileLock (object ):
33
36
def __init__ (self , id , timeout = 15 , lockdir = '.' ):
34
37
self ._filename = os .path .join (lockdir , 'lock-%s' % id )
@@ -111,7 +114,7 @@ def create_work(self, store='main', **kwargs):
111
114
payload = json .dumps (work .get_data ())
112
115
113
116
log_event .apply_async (args = ('create_work' , time , user , resource , payload ))
114
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
117
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
115
118
116
119
return work .get_data ()
117
120
@@ -129,7 +132,7 @@ def update_work(self, store='main', **kwargs):
129
132
payload = json .dumps (work .get_data ())
130
133
131
134
log_event .apply_async (args = ('update_work' , time , user , resource , payload ))
132
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
135
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
133
136
134
137
return work .get_data ()
135
138
@@ -149,7 +152,7 @@ def delete_work(self, store='main', **kwargs):
149
152
payload = None
150
153
151
154
log_event .apply_async (args = ('delete_work' , time , user , resource , payload ))
152
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
155
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
153
156
154
157
return kwargs
155
158
@@ -183,7 +186,7 @@ def add_source(self, store='main', **kwargs):
183
186
payload = json .dumps (source .get_data ())
184
187
185
188
log_event .apply_async (args = ('add_source' , time , user , resource , payload ))
186
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
189
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
187
190
188
191
return source .get_data ()
189
192
else :
@@ -195,7 +198,7 @@ def add_source(self, store='main', **kwargs):
195
198
payload = json .dumps (source .get_data ())
196
199
197
200
log_event .apply_async (args = ('add_source' , time , user , resource , payload ))
198
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
201
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
199
202
200
203
return source .get_data ()
201
204
@@ -227,7 +230,7 @@ def update_source(self, store='main', **kwargs):
227
230
payload = json .dumps (source .get_data ())
228
231
229
232
log_event .apply_async (args = ('update_source' , time , user , resource , payload ))
230
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
233
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
231
234
232
235
return source .get_data ()
233
236
else :
@@ -239,7 +242,7 @@ def update_source(self, store='main', **kwargs):
239
242
payload = json .dumps (source .get_data ())
240
243
241
244
log_event .apply_async (args = ('update_source' , time , user , resource , payload ))
242
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
245
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
243
246
244
247
return source .get_data ()
245
248
@@ -264,7 +267,7 @@ def delete_source(self, store='main', **kwargs):
264
267
payload = json .dumps (kwargs )
265
268
266
269
log_event .apply_async (args = ('delete_source' , time , user , resource , payload ))
267
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
270
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
268
271
269
272
return kwargs
270
273
else :
@@ -276,7 +279,7 @@ def delete_source(self, store='main', **kwargs):
276
279
payload = json .dumps (kwargs )
277
280
278
281
log_event .apply_async (args = ('delete_source' , time , user , resource , payload ))
279
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
282
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
280
283
281
284
return kwargs
282
285
@@ -300,7 +303,7 @@ def add_post(self, store='main', **kwargs):
300
303
payload = json .dumps (post .get_data ())
301
304
302
305
log_event .apply_async (args = ('add_post' , time , user , resource , payload ))
303
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
306
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
304
307
305
308
return post .get_data ()
306
309
@@ -330,7 +333,7 @@ def delete_post(self, store='main', **kwargs):
330
333
payload = None
331
334
332
335
log_event .apply_async (args = ('delete_post' , time , user , resource , payload ))
333
- if store != self .public_store : on_work_updated .apply_async ( args = ( self .subtask (kwargs = kwargs ), ))
336
+ if store != self .public_store : on_work_updated .send ( sender = self , task = self , update_subtask = self .subtask (kwargs = kwargs ))
334
337
335
338
store .delete_post (** kwargs )
336
339
return kwargs
@@ -340,6 +343,10 @@ def get_complete_metadata(self, store='main', **kwargs):
340
343
store = self .main_store if store == 'main' else self .public_store
341
344
return store .get_complete_metadata (** kwargs )
342
345
346
+ @app .task (base = StoreTask , bind = True )
347
+ def query_sparql (self , ** kwargs ):
348
+ return self .public_store .query_sparql (** kwargs )
349
+
343
350
@app .task (base = StoreTask , bind = True )
344
351
def log_event (self , type , time , user , resource , data ):
345
352
self .log .log_event (type , time , user , resource , data )
@@ -348,29 +355,28 @@ def log_event(self, type, time, user, resource, data):
348
355
def query_events (self , type = None , user = None , time_min = None , time_max = None , resource = None , limit = 100 , offset = 0 ):
349
356
return self .log .query_events (type , user , time_min , time_max , resource , limit , offset )
350
357
351
- @app .task (base = StoreTask , bind = True )
352
- def on_work_updated (self , update_subtask ):
353
- task = update_subtask ['task' ]
354
- kwargs = update_subtask ['kwargs' ]
358
+ @on_work_updated .connect
359
+ def work_updated_handler (sender = None , task = None , update_subtask = None , ** kwargs ):
360
+ subtask_kwargs = update_subtask ['kwargs' ]
355
361
356
- if task == "catalog_backend. create_work" :
357
- visibility = kwargs .get ('visibility' , None )
362
+ if sender == create_work :
363
+ visibility = subtask_kwargs .get ('visibility' , None )
358
364
if visibility != "public" :
359
365
return False
360
- elif task == "catalog_backend. update_work" :
361
- work_id = kwargs ['id' ]
362
- work = self .main_store .get_work (user = kwargs ['user' ], id = work_id )
363
- visibility = kwargs .get ('visibility' , work ['visibility' ])
366
+ elif sender == update_work :
367
+ work_id = subtask_kwargs ['id' ]
368
+ work = task .main_store .get_work (user = subtask_kwargs ['user' ], id = work_id )
369
+ visibility = subtask_kwargs .get ('visibility' , work ['visibility' ])
364
370
if visibility != "public" :
365
371
return False
366
- elif task == "catalog_backend. add_source" or \
367
- task == "catalog_backend. update_source" or \
368
- task == "catalog_backend. add_post" :
369
- work_id = kwargs ['work_id' ]
370
- work = self .main_store .get_work (user = kwargs ['user' ], id = work_id )
372
+ elif sender == add_source or \
373
+ sender == update_source or \
374
+ sender == add_post :
375
+ work_id = subtask_kwargs ['work_id' ]
376
+ work = task .main_store .get_work (user = subtask_kwargs ['user' ], id = work_id )
371
377
if work ['visibility' ] != 'public' :
372
378
return False
373
379
374
380
# work is public or deleted, ok to re-run the updater task for public store now
375
381
sub = subtask (update_subtask )
376
- sub .apply_async (kwargs = {"store" : "public" })
382
+ sub .apply_async (kwargs = {"store" : "public" })
0 commit comments