diff --git a/.travis.yml b/.travis.yml index 3ed2151..aab3b68 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ before_install: - export DJANGO_SETTINGS_MODULE=celery_haystack.test_settings install: - pip install -e . - - pip install -r requirements/v2.txt $DJANGO + - pip install -r requirements/v2.txt $DJANGO $CELERY before_script: - flake8 celery_haystack --ignore=E501,E731 script: @@ -19,6 +19,8 @@ env: - DJANGO="Django>=1.9,<1.10" - DJANGO="Django>=1.10,<1.11" - DJANGO="Django>=1.11,<1.12" + - CELERY="celery>=3.1,<4.0" + - CELERY="celery>=4.0,<5.0" notifications: irc: "irc.freenode.org#haystack" diff --git a/README.rst b/README.rst index 656fba1..c6edacb 100644 --- a/README.rst +++ b/README.rst @@ -14,7 +14,7 @@ Requirements * Django 1.8+ * Haystack_ `2.X`_ -* Celery_ 3.X +* Celery_ 3.X / 4.X You also need to install your choice of one of the supported search engines for Haystack and one of the supported backends for Celery. diff --git a/celery_haystack/conf.py b/celery_haystack/conf.py index 26f278a..b8d32e2 100644 --- a/celery_haystack/conf.py +++ b/celery_haystack/conf.py @@ -15,7 +15,9 @@ class CeleryHaystack(AppConf): #: The number of retries that are done MAX_RETRIES = 1 #: The default Celery task class - DEFAULT_TASK = 'celery_haystack.tasks.CeleryHaystackSignalHandler' + DEFAULT_TASK = 'celery_haystack.tasks.haystack_signal_handler' + #: The update handler class + HANDLER = 'celery_haystack.handler.CeleryHaystackSignalHandler' #: The name of the celery queue to use, or None for default QUEUE = None #: Whether the task should be handled transaction safe diff --git a/celery_haystack/exceptions.py b/celery_haystack/exceptions.py new file mode 100644 index 0000000..af37c0e --- /dev/null +++ b/celery_haystack/exceptions.py @@ -0,0 +1,35 @@ +import six + + +class CeleryHaystackException(Exception): + pass + + +@six.python_2_unicode_compatible +class IndexOperationException(CeleryHaystackException): + def __init__(self, index, reason): + self.index = 
index + self.reason = reason + + def __str__(self): + return "Failed to update index %s. %s" % (self.index, self.reason) + + +@six.python_2_unicode_compatible +class InstanceNotFoundException(CeleryHaystackException): + def __init__(self, model_class, pk, reason): + self.model_class = model_class + self.pk = pk + self.reason = reason + + def __str__(self): + return "Unable to load instance %s with pk=%s. %s" % (self.model_class, self.pk, self.reason) + + +@six.python_2_unicode_compatible +class UnrecognizedActionException(CeleryHaystackException): + def __init__(self, action): + self.action = action + + def __str__(self): + return "Unrecognized action '%s'" % self.action diff --git a/celery_haystack/handler.py b/celery_haystack/handler.py new file mode 100644 index 0000000..44cdc76 --- /dev/null +++ b/celery_haystack/handler.py @@ -0,0 +1,129 @@ +import logging +from django.apps import apps +from django.core.exceptions import ImproperlyConfigured +from haystack import connection_router, connections +from haystack.exceptions import NotHandled as IndexNotFoundException + +from celery_haystack import exceptions + + +logger = logging.getLogger(__name__) + + +class CeleryHaystackSignalHandler(object): + def __init__(self, identifier): + self.identifier = identifier + + # First get the object path and pk (e.g. ('notes.note', '23')) + self.object_path, self.pk = self.split_identifier(identifier) + + @staticmethod + def split_identifier(identifier, **kwargs): + """ + Break down the identifier representing the instance. + + Converts 'notes.note.23' into ('notes.note', '23'). + """ + bits = identifier.split('.') + + if len(bits) < 2: + raise ValueError("Unable to parse object identifer '%s'" % identifier) + + pk = bits[-1] + # In case Django ever handles full paths... + object_path = '.'.join(bits[:-1]) + return object_path, pk + + def get_model_class(self): + """ + Fetch the model's class in a standardized way.
+ """ + bits = self.object_path.split('.') + app_name = '.'.join(bits[:-1]) + classname = bits[-1] + model_class = apps.get_model(app_name, classname) + + if model_class is None: + raise ImproperlyConfigured("Could not load model '%s'." % + self.object_path) + return model_class + + @staticmethod + def get_instance(model_class, pk): + """ + Fetch the instance in a standardized way. + """ + try: + instance = model_class._default_manager.get(pk=pk) + except (model_class.DoesNotExist, model_class.MultipleObjectsReturned) as exc: + raise exceptions.InstanceNotFoundException(model_class, pk, reason=exc) + + return instance + + @staticmethod + def get_indexes(model_class): + """ + Fetch the model's registered ``SearchIndex`` in a standardized way. + """ + try: + using_backends = connection_router.for_write(**{'models': [model_class]}) + for using in using_backends: + index_holder = connections[using].get_unified_index() + yield index_holder.get_index(model_class), using + except IndexNotFoundException: + raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." % model_class) + + @staticmethod + def get_index_name(index): + """ + Return the dotted path (module and class name) of the index's class. + """ + return ".".join([index.__class__.__module__, + index.__class__.__name__]) + + def handle_delete(self, current_index, using, model_class): + # If the object is gone, we'll use just the identifier + # against the index.
+ try: + current_index.remove_object(self.identifier, using=using) + except Exception as exc: + raise exceptions.IndexOperationException(index=current_index, reason=exc) + else: + + msg = ("Deleted '%s' (with %s)" % + (self.identifier, self.get_index_name(current_index))) + logger.debug(msg) + + def handle_update(self, current_index, using, model_class): + # Load the instance of the model class with the pk + + instance = self.get_instance(model_class, self.pk) + + # Call the appropriate handler of the current index and + # handle exception if necessary + try: + current_index.update_object(instance, using=using) + except Exception as exc: + raise exceptions.IndexOperationException(index=current_index, reason=exc) + else: + msg = ("Updated '%s' (with %s)" % + (self.identifier, self.get_index_name(current_index))) + logger.debug(msg) + + def handle(self, action): + """ + Trigger the actual index handler depending on the + given action ('update' or 'delete'). + """ + + # Get the model class for the object path + model_class = self.get_model_class() + + for current_index, using in self.get_indexes(model_class): + + if action == 'delete': + self.handle_delete(current_index, using, model_class) + elif action == 'update': + self.handle_update(current_index, using, model_class) + else: + raise exceptions.UnrecognizedActionException(action) diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index d8acedd..9a2629e 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -1,155 +1,48 @@ -from django.core.exceptions import ImproperlyConfigured +from celery.app import shared_task +from celery.utils.log import get_task_logger from django.core.management import call_command -from django.apps import apps +from .utils import get_handler +from . 
import exceptions from .conf import settings -from haystack import connections, connection_router -from haystack.exceptions import NotHandled as IndexNotFoundException - -from celery.task import Task # noqa -from celery.utils.log import get_task_logger - logger = get_task_logger(__name__) -class CeleryHaystackSignalHandler(Task): - using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS - max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES - default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY - - def split_identifier(self, identifier, **kwargs): - """ - Break down the identifier representing the instance. - - Converts 'notes.note.23' into ('notes.note', 23). - """ - bits = identifier.split('.') - - if len(bits) < 2: - logger.error("Unable to parse object " - "identifer '%s'. Moving on..." % identifier) - return (None, None) - - pk = bits[-1] - # In case Django ever handles full paths... - object_path = '.'.join(bits[:-1]) - return (object_path, pk) - - def get_model_class(self, object_path, **kwargs): - """ - Fetch the model's class in a standarized way. - """ - bits = object_path.split('.') - app_name = '.'.join(bits[:-1]) - classname = bits[-1] - model_class = apps.get_model(app_name, classname) - - if model_class is None: - raise ImproperlyConfigured("Could not load model '%s'." % - object_path) - return model_class - - def get_instance(self, model_class, pk, **kwargs): - """ - Fetch the instance in a standarized way. - """ - instance = None - try: - instance = model_class._default_manager.get(pk=pk) - except model_class.DoesNotExist: - logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" % - (model_class._meta.app_label.lower(), - model_class._meta.object_name.lower(), pk)) - except model_class.MultipleObjectsReturned: - logger.error("More than one object with pk %s. Oops?" % pk) - return instance - - def get_indexes(self, model_class, **kwargs): - """ - Fetch the model's registered ``SearchIndex`` in a standarized way. 
- """ - try: - using_backends = connection_router.for_write(**{'models': [model_class]}) - for using in using_backends: - index_holder = connections[using].get_unified_index() - yield index_holder.get_index(model_class), using - except IndexNotFoundException: - raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." % - model_class) - - def run(self, action, identifier, **kwargs): - """ - Trigger the actual index handler depending on the - given action ('update' or 'delete'). - """ - # First get the object path and pk (e.g. ('notes.note', 23)) - object_path, pk = self.split_identifier(identifier, **kwargs) - if object_path is None or pk is None: - msg = "Couldn't handle object with identifier %s" % identifier - logger.error(msg) - raise ValueError(msg) - - # Then get the model class for the object path - model_class = self.get_model_class(object_path, **kwargs) - for current_index, using in self.get_indexes(model_class, **kwargs): - current_index_name = ".".join([current_index.__class__.__module__, - current_index.__class__.__name__]) - - if action == 'delete': - # If the object is gone, we'll use just the identifier - # against the index. 
- try: - current_index.remove_object(identifier, using=using) - except Exception as exc: - logger.exception(exc) - self.retry(exc=exc) - else: - msg = ("Deleted '%s' (with %s)" % - (identifier, current_index_name)) - logger.debug(msg) - elif action == 'update': - # and the instance of the model class with the pk - instance = self.get_instance(model_class, pk, **kwargs) - if instance is None: - logger.debug("Failed updating '%s' (with %s)" % - (identifier, current_index_name)) - raise ValueError("Couldn't load object '%s'" % identifier) - - # Call the appropriate handler of the current index and - # handle exception if neccessary - try: - current_index.update_object(instance, using=using) - except Exception as exc: - logger.exception(exc) - self.retry(exc=exc) - else: - msg = ("Updated '%s' (with %s)" % - (identifier, current_index_name)) - logger.debug(msg) - else: - logger.error("Unrecognized action '%s'. Moving on..." % action) - raise ValueError("Unrecognized action %s" % action) - - -class CeleryHaystackUpdateIndex(Task): +@shared_task(bind=True, + using=settings.CELERY_HAYSTACK_DEFAULT_ALIAS, + max_retries=settings.CELERY_HAYSTACK_MAX_RETRIES, + default_retry_delay=settings.CELERY_HAYSTACK_RETRY_DELAY) +def haystack_signal_handler(self, action, identifier, **kwargs): + try: + get_handler()(identifier).handle(action) + except exceptions.IndexOperationException as exc: + logger.exception(exc) + self.retry(exc=exc) + except exceptions.InstanceNotFoundException as exc: + logger.error(exc) + except exceptions.UnrecognizedActionException as exc: + logger.error("%s. Moving on..." % exc) + + +@shared_task() +def celery_haystack_update_index(apps=None, **kwargs): """ A celery task class to be used to call the update_index management command from Celery. 
""" - def run(self, apps=None, **kwargs): - defaults = { - 'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, - 'age': settings.CELERY_HAYSTACK_COMMAND_AGE, - 'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE, - 'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS], - 'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS, - 'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY, - } - defaults.update(kwargs) - if apps is None: - apps = settings.CELERY_HAYSTACK_COMMAND_APPS - # Run the update_index management command - logger.info("Starting update index") - call_command('update_index', *apps, **defaults) - logger.info("Finishing update index") + defaults = { + 'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, + 'age': settings.CELERY_HAYSTACK_COMMAND_AGE, + 'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE, + 'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS], + 'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS, + 'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY, + } + defaults.update(kwargs) + if apps is None: + apps = settings.CELERY_HAYSTACK_COMMAND_APPS + # Run the update_index management command + logger.info("Starting update index") + call_command('update_index', *apps, **defaults) + logger.info("Finishing update index") diff --git a/celery_haystack/utils.py b/celery_haystack/utils.py index 9cb4938..8a05fb3 100644 --- a/celery_haystack/utils.py +++ b/celery_haystack/utils.py @@ -10,20 +10,24 @@ from .conf import settings -def get_update_task(task_path=None): - import_path = task_path or settings.CELERY_HAYSTACK_DEFAULT_TASK - module, attr = import_path.rsplit('.', 1) +def get_class(import_path): + module_name, attr = import_path.rsplit('.', 1) try: - mod = import_module(module) + mod = import_module(module_name) except ImportError as e: raise ImproperlyConfigured('Error importing module %s: "%s"' % - (module, e)) + (module_name, e)) try: - Task = getattr(mod, attr) + update_task = getattr(mod, attr) except AttributeError: raise 
ImproperlyConfigured('Module "%s" does not define a "%s" ' - 'class.' % (module, attr)) - return Task() + 'class.' % (module_name, attr)) + return update_task + + +def get_update_task(task_path=None): + import_path = task_path or settings.CELERY_HAYSTACK_DEFAULT_TASK + return get_class(import_path) def enqueue_task(action, instance, **kwargs): @@ -53,3 +57,7 @@ def enqueue_task(action, instance, **kwargs): ) else: task_func() + + +def get_handler(): + return get_class(settings.CELERY_HAYSTACK_HANDLER) diff --git a/requirements/v2.txt b/requirements/v2.txt index 7e08873..18dca78 100644 --- a/requirements/v2.txt +++ b/requirements/v2.txt @@ -1,6 +1,4 @@ django-discover-runner -django-haystack -celery Whoosh flake8 coverage diff --git a/setup.cfg b/setup.cfg index ad6de1c..a482717 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,8 +25,11 @@ classifier = Programming Language :: Python :: 3.4 Topic :: Utilities requires-dist = + django>=1.8,<1.12 django-appconf>=0.4.1 - + six + celery>=3.1,<5.0 + django-haystack>=2.0,<3.0 [files] packages =