Tag: celery

  • Celery Worker wide configuration

    Celery is a distributed task queue for Python. While the emphasis is on distributed execution, the worker concept also allows for configuration that goes beyond the individual task. The first rule of optimisation may be “don’t”, but sharing a database connection between tasks is low-hanging fruit in most cases, and it can be configured per worker with the signals Celery provides. To create one database connection per worker instance, use these signals to open the connection when the worker process starts.

    This can be achieved by connecting to the worker_process_init signal, and to the corresponding worker_process_shutdown signal to clean up when the worker shuts down.

    The code has to be picked up at worker start, so the tasks.py file is a good place to keep these handlers.

    Example tasks.py:

    import logging
    import sqlite3

    from celery import Celery
    from celery.signals import worker_process_init
    from celery.signals import worker_process_shutdown

    log = logging.getLogger(__name__)

    # CELERY_BROKER_URL is assumed to be defined elsewhere, e.g. imported from the settings.
    app = Celery('tasks', broker=CELERY_BROKER_URL)
    db = None

    @worker_process_init.connect
    def init_worker(**kwargs):
      global db
      log.debug('Initializing database connection for worker.')
      db = sqlite3.connect("urls.sqlite")

    @worker_process_shutdown.connect
    def shutdown_worker(**kwargs):
      global db
      if db:
        log.debug('Closing database connection for worker.')
        db.close()


    The example above opens a connection to a sqlite3 database, which has concurrency issues of its own, but it is only meant as an example. The connection is established once per worker process at startup and can be reused by every task that worker executes.
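
    A task defined in the same tasks.py can then reuse the worker-level connection through the module-level db variable. A minimal sketch, assuming the app and db globals from the example above; the urls table is made up for illustration:

    @app.task
    def count_urls():
      # Reuse the per-worker connection instead of opening a new one per task call.
      cursor = db.execute("SELECT COUNT(*) FROM urls")
      return cursor.fetchone()[0]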

  • Django with Celery

    Getting started with Celery and django-celery

    Installation:

    On Debian wheezy, simply install the following packages:

    python-celery
    python-django-celery
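
    With apt this comes down to (run as root or via sudo):

    apt-get install python-celery python-django-celery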

    settings.py:

    INSTALLED_APPS = (
      ...,
      'djcelery',
      'kombu.transport.django',  # allows the Django database to act as the broker
      ...,
    )

    import djcelery
    djcelery.setup_loader()

    # Route task messages through the Django database instead of a separate broker.
    BROKER_URL = "django://"
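
    Because the Django database acts as the broker, the tables used by djcelery and kombu.transport.django have to exist. With the Django versions of that era they are typically created with the usual syncdb step:

    ./manage.py syncdb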

    PROJECT/APP/tasks.py:

    from celery.task import Task
    from celery.registry import tasks

    class SomeTask(Task):
      def run(self, some_arg, **kwargs):
        # Do the actual work here and return its result.
        return some_arg

    tasks.register(SomeTask)
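
    The task can then be queued from anywhere in the Django project, for example from a view. A minimal sketch, assuming the Celery 2.x API shipped with wheezy, where delay() is available on the Task class; the argument value is made up:

    from PROJECT.APP.tasks import SomeTask

    # Queue the task; the worker started with ./manage.py celeryd will pick it up.
    result = SomeTask.delay("some value")
    # result.get() blocks until the worker has finished and returns the result.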

    celeryd:

    ./manage.py celeryd

    However, settings.DEBUG produces the following warning (with DEBUG = True, Django records every executed SQL query in memory, which grows without bound in a long-running worker process):

    UserWarning: Using settings.DEBUG leads to a memory leak, never use
    this setting in production environments!

    See also: