Tag: concurrency

  • Celery Worker wide configuration

    Celery is a distributed task queue for Python. While the emphasis is on distributed execution, the worker concept also allows for settings that live beyond the individual task. The first rule of optimisation may be “don’t”, but sharing a database connection between tasks is low-hanging fruit in most cases, and Celery provides signals to configure this per worker: use them to create the connection once when the worker starts, instead of once per task.

    This can be achieved with the worker_process_init signal, paired with the corresponding worker_process_shutdown signal to clean up the connection when the worker shuts down.

    The signal handlers have to be imported when the worker starts, so the tasks.py file that defines the Celery app is a natural place to keep them.

    Example tasks.py:

    import logging
    import sqlite3

    from celery import Celery
    from celery.signals import worker_process_init
    from celery.signals import worker_process_shutdown

    log = logging.getLogger(__name__)

    # Broker URL shown here as an example value; use your own broker.
    CELERY_BROKER_URL = 'redis://localhost:6379/0'

    app = Celery('tasks', broker=CELERY_BROKER_URL)
    db = None

    @worker_process_init.connect
    def init_worker(**kwargs):
        # Open one connection per worker process, shared by all its tasks.
        global db
        log.debug('Initializing database connection for worker.')
        db = sqlite3.connect('urls.sqlite')

    @worker_process_shutdown.connect
    def shutdown_worker(**kwargs):
        # Close the per-process connection when the worker exits.
        global db
        if db:
            log.debug('Closing database connection for worker.')
            db.close()
    

    The example above opens a connection to a SQLite database, which brings problems of its own (a sqlite3 connection can by default only be used from the thread that created it, and SQLite handles concurrent writers poorly), but it is only meant as an illustration. The connection is established once per worker process at startup; with the default prefork pool, worker_process_init fires in every child process.
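
    To round the example off, the sketch below shows a task reading through the shared connection. The count_urls task and the urls table are hypothetical, assumed only to fit the urls.sqlite file above; they are not part of Celery's API.

    @app.task
    def count_urls():
        # Runs in a worker child process and reuses the connection
        # opened by init_worker, so no setup cost is paid per task.
        cur = db.execute('SELECT COUNT(*) FROM urls')  # hypothetical table
        (count,) = cur.fetchone()
        return count

    Starting the worker with, for instance, celery -A tasks worker --concurrency=4 spawns four child processes, each holding its own connection. Since a prefork child executes one task at a time, tasks within a process never contend for that connection.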