diff --git a/celery_haystack/conf.py b/celery_haystack/conf.py index 26f278a..248cf6e 100644 --- a/celery_haystack/conf.py +++ b/celery_haystack/conf.py @@ -1,8 +1,8 @@ +from appconf import AppConf from django.conf import settings # noqa from django.core.exceptions import ImproperlyConfigured from haystack import constants from haystack.management.commands import update_index as cmd -from appconf import AppConf class CeleryHaystack(AppConf): @@ -15,11 +15,13 @@ class CeleryHaystack(AppConf): #: The number of retries that are done MAX_RETRIES = 1 #: The default Celery task class - DEFAULT_TASK = 'celery_haystack.tasks.CeleryHaystackSignalHandler' + DEFAULT_TASK = "celery_haystack.tasks.CeleryHaystackSignalHandler" #: The name of the celery queue to use, or None for default QUEUE = None #: Whether the task should be handled transaction safe TRANSACTION_SAFE = True + #: Whether the task results should be ignored + IGNORE_RESULT = False #: The batch size used by the CeleryHaystackUpdateIndex task COMMAND_BATCH_SIZE = None @@ -35,26 +37,29 @@ class CeleryHaystack(AppConf): COMMAND_VERBOSITY = 1 def configure_default_alias(self, value): - return value or getattr(constants, 'DEFAULT_ALIAS', None) + return value or getattr(constants, "DEFAULT_ALIAS", None) def configure_command_batch_size(self, value): - return value or getattr(cmd, 'DEFAULT_BATCH_SIZE', None) + return value or getattr(cmd, "DEFAULT_BATCH_SIZE", None) def configure_command_age(self, value): - return value or getattr(cmd, 'DEFAULT_AGE', None) + return value or getattr(cmd, "DEFAULT_AGE", None) def configure(self): data = {} for name, value in self.configured_data.items(): - if name in ('RETRY_DELAY', 'MAX_RETRIES', - 'COMMAND_WORKERS', 'COMMAND_VERBOSITY'): + if name in ( + "RETRY_DELAY", + "MAX_RETRIES", + "COMMAND_WORKERS", + "COMMAND_VERBOSITY", + ): value = int(value) data[name] = value return data -signal_processor = getattr(settings, 'HAYSTACK_SIGNAL_PROCESSOR', None) - +signal_processor = getattr(settings, "HAYSTACK_SIGNAL_PROCESSOR", None) if signal_processor is None: raise ImproperlyConfigured("When using celery-haystack with Haystack 2.X " diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index d8acedd..c07af20 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -1,14 +1,13 @@ +from celery.task import Task # noqa +from celery.utils.log import get_task_logger +from django.apps import apps from django.core.exceptions import ImproperlyConfigured from django.core.management import call_command -from django.apps import apps - -from .conf import settings - -from haystack import connections, connection_router +from haystack import connection_router +from haystack import connections from haystack.exceptions import NotHandled as IndexNotFoundException -from celery.task import Task # noqa -from celery.utils.log import get_task_logger +from .conf import settings logger = get_task_logger(__name__) @@ -17,6 +16,8 @@ class CeleryHaystackSignalHandler(Task): using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY + ignore_result = settings.CELERY_HAYSTACK_IGNORE_RESULT + store_errors_even_if_ignored = True def split_identifier(self, identifier, **kwargs): """ @@ -24,7 +25,7 @@ def split_identifier(self, identifier, **kwargs): Converts 'notes.note.23' into ('notes.note', 23). """ - bits = identifier.split('.') + bits = identifier.split(".") if len(bits) < 2: logger.error("Unable to parse object " @@ -33,15 +34,15 @@ def split_identifier(self, identifier, **kwargs): pk = bits[-1] # In case Django ever handles full paths... - object_path = '.'.join(bits[:-1]) + object_path = ".".join(bits[:-1]) return (object_path, pk) def get_model_class(self, object_path, **kwargs): """ Fetch the model's class in a standarized way. """ - bits = object_path.split('.') - app_name = '.'.join(bits[:-1]) + bits = object_path.split(".") + app_name = ".".join(bits[:-1]) classname = bits[-1] model_class = apps.get_model(app_name, classname) @@ -58,9 +59,11 @@ def get_instance(self, model_class, pk, **kwargs): try: instance = model_class._default_manager.get(pk=pk) except model_class.DoesNotExist: - logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" % - (model_class._meta.app_label.lower(), - model_class._meta.object_name.lower(), pk)) + logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" % ( + model_class._meta.app_label.lower(), + model_class._meta.object_name.lower(), + pk, + )) except model_class.MultipleObjectsReturned: logger.error("More than one object with pk %s. Oops?" % pk) return instance @@ -70,7 +73,8 @@ def get_indexes(self, model_class, **kwargs): Fetch the model's registered ``SearchIndex`` in a standarized way. """ try: - using_backends = connection_router.for_write(**{'models': [model_class]}) + using_backends = connection_router.for_write( + **{"models": [model_class]}) for using in using_backends: index_holder = connections[using].get_unified_index() yield index_holder.get_index(model_class), using @@ -93,10 +97,12 @@ def run(self, action, identifier, **kwargs): # Then get the model class for the object path model_class = self.get_model_class(object_path, **kwargs) for current_index, using in self.get_indexes(model_class, **kwargs): - current_index_name = ".".join([current_index.__class__.__module__, - current_index.__class__.__name__]) + current_index_name = ".".join([ + current_index.__class__.__module__, + current_index.__class__.__name__ + ]) - if action == 'delete': + if action == "delete": # If the object is gone, we'll use just the identifier # against the index. try: @@ -105,10 +111,10 @@ def run(self, action, identifier, **kwargs): logger.exception(exc) self.retry(exc=exc) else: - msg = ("Deleted '%s' (with %s)" % - (identifier, current_index_name)) + msg = "Deleted '%s' (with %s)" % (identifier, + current_index_name) logger.debug(msg) - elif action == 'update': + elif action == "update": # and the instance of the model class with the pk instance = self.get_instance(model_class, pk, **kwargs) if instance is None: @@ -124,8 +130,8 @@ def run(self, action, identifier, **kwargs): logger.exception(exc) self.retry(exc=exc) else: - msg = ("Updated '%s' (with %s)" % - (identifier, current_index_name)) + msg = "Updated '%s' (with %s)" % (identifier, + current_index_name) logger.debug(msg) else: logger.error("Unrecognized action '%s'. Moving on..." % action) @@ -137,19 +143,20 @@ class CeleryHaystackUpdateIndex(Task): A celery task class to be used to call the update_index management command from Celery. """ + def run(self, apps=None, **kwargs): defaults = { - 'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, - 'age': settings.CELERY_HAYSTACK_COMMAND_AGE, - 'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE, - 'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS], - 'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS, - 'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY, + "batchsize": settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, + "age": settings.CELERY_HAYSTACK_COMMAND_AGE, + "remove": settings.CELERY_HAYSTACK_COMMAND_REMOVE, + "using": [settings.CELERY_HAYSTACK_DEFAULT_ALIAS], + "workers": settings.CELERY_HAYSTACK_COMMAND_WORKERS, + "verbosity": settings.CELERY_HAYSTACK_COMMAND_VERBOSITY, } defaults.update(kwargs) if apps is None: apps = settings.CELERY_HAYSTACK_COMMAND_APPS # Run the update_index management command logger.info("Starting update index") - call_command('update_index', *apps, **defaults) + call_command("update_index", *apps, **defaults) logger.info("Finishing update index")