Monitoring Celery | An Actionable Guide

By: Rajkumar Venkatasamy|Last Updated: Feb 13, 2023

Have you ever experienced a web/mobile application accepting your request to upload videos or images immediately but performing the actual video or image upload processing work behind the scenes? If so, you’ve seen an application that works based on the task queue concept.

Task queues aid in the registration of time-consuming tasks and execute them in another compute layer, thereby freeing up the user-facing HTTP request and response lifecycle. In addition to freeing up the application server to handle other incoming requests, this kind of approach also enhances the user experience. You don’t need to wait around for your long-running task to get completed. After the request registration in the queue, you will be immediately given an acknowledgment with a task ID for tracking so you can focus on other things.

Once the task processing is complete, you’ll be updated on the task status in one of the following ways:

  • By making a call to an API exposed for such a tracking purpose
  • By checking the report/dashboard page designed for that purpose
  • Via a notification service (if you have an active notification subscription in the system)
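
The flow described above (register the task, get an ID back at once, look up the status later) can be sketched with nothing but the Python standard library. This is only an illustration of the concept, not how Celery is implemented: task_queue, results, submit, and slow_upload are hypothetical names standing in for a broker, a result backend, a producer call, and a long-running job.

```python
import queue
import threading
import time
import uuid

# Hypothetical in-memory stand-ins for a broker and a result backend.
task_queue = queue.Queue()
results = {}


def submit(func, *args):
    """Register a task and return its ID immediately, without running it."""
    task_id = str(uuid.uuid4())
    results[task_id] = {"status": "PENDING", "result": None}
    task_queue.put((task_id, func, args))
    return task_id


def worker():
    """Consume tasks until a None sentinel arrives, recording each outcome."""
    while True:
        item = task_queue.get()
        if item is None:
            break
        task_id, func, args = item
        try:
            results[task_id] = {"status": "SUCCESS", "result": func(*args)}
        except Exception as exc:
            results[task_id] = {"status": "FAILURE", "result": repr(exc)}


def slow_upload(name):
    time.sleep(0.1)  # stands in for a long-running job
    return f"{name} processed"


# The producer gets its task ID back before any processing has happened.
task_id = submit(slow_upload, "video.mp4")
status_right_after_submit = results[task_id]["status"]

# The worker runs in a separate thread, outside the "request" path.
worker_thread = threading.Thread(target=worker)
worker_thread.start()
task_queue.put(None)   # ask the worker to stop once the queue is drained
worker_thread.join()   # wait for processing to finish
final_state = results[task_id]
print(status_right_after_submit, final_state)
```

Looking up results[task_id] plays the role of the tracking API in the first bullet. A real deployment would replace the in-process queue and dictionary with a broker and a result backend so that producers and workers can run on different machines.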

Celery, “a task queue implementation for Python web applications used to asynchronously execute work outside the HTTP request-response cycle,” was designed and created to facilitate the above characteristics in an application. It is especially useful for serving use cases that deal with long-running processes.

In this article, you will learn more about Celery by exploring how it works, how and why it should be monitored, and how Cronitor can help you ensure that your scheduled and background tasks in Celery are properly executed.

To access the source code associated with this article, check out this GitHub repository.

Components of a Celery-Based Tech Stack

Any tech stack involving Celery has the following key components:

  • Task producer: your application that, on receiving a manual or system-generated event-based request or one based on a schedule, registers the task in the task queue
  • Task queue: the message broker, which can also act as a storage layer
  • Task scheduler: handles the scheduled jobs via celery beat, the default scheduler for recurring jobs in the Celery ecosystem
  • Task consumer: the Celery workers, which consume the registered tasks and execute the actual processing logic
  • Backend: stores the result of the task

For more information, refer to Celery’s documentation.

Why Do You Need to Monitor Celery?

The key components that make up a Celery-based tech stack must operate in harmony to meet the goal of processing an assigned task. However, things can go wrong in production for any of these components, leading to process failure. For instance, common issues might involve any of the following scenarios:

  • The queue fills up faster than usual (an anomaly).
  • The same task is processed by multiple workers when it should run only once (duplicate task processing).
  • Slow Celery workers add processing latency, so the task queue drains slowly.

Monitoring Celery helps you keep an eye out for issues like these and more. You’ll be able to easily identify things like the following:

  • How many workers are online, and how many are offline?
  • How many active tasks are handled by a specific worker?
  • How many scheduled tasks are involved?
  • How many tasks failed in processing?
  • How many tasks got registered in the queue?
  • What is the latency or elapsed time of a task?
  • What is the configured frequency of a scheduled job/task?
  • What details are available about system performance?

How to Monitor Celery

Let’s get started and take a closer look at monitoring Celery. In this section, you will learn how to do so using the following methods:

  • A command-line utility of Celery
  • Flower, a GUI-based monitoring tool
  • Cronitor

Monitoring Celery Tasks Using Command-Line Utilities

This section provides example commands for the celery command-line utility.

Execute the following commands in the terminal of the machine or container where Celery is installed or where the Celery worker is running. All of the commands apply to both regular and scheduled tasks. Depending on your setup, you may need to point the utility at your application with the -A option (for example, celery -A tasks inspect active).

To List All Active Tasks Processed by Workers

celery inspect active

To List All Registered Tasks Processed by Workers

celery inspect registered

To List All Active Queues in Celery

celery inspect active_queues

To Monitor System Performance

celery inspect stats

To Monitor the Scheduled Job Frequency

celery inspect conf

Output snippet:

->
    {
        "beat_schedule": {
            "scheduled_delayed_greetings": {
                "schedule": "<crontab: * * * * * (m/h/d/dM/MY)>",
                "task": "celery_beat_tasks.tasks.scheduled_delayed_greetings"
            }
        },

Here, the schedule field holds the task frequency. In this example, celery beat runs the task every minute.
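
If you want to read the frequency in code rather than by eye, a small helper can pull the beat entries out of a conf-style reply. The sketch below assumes the reply has already been loaded into a dictionary keyed by worker name and shaped like the output snippet above; beat_frequencies and the worker name celery@worker1 are hypothetical.

```python
def beat_frequencies(conf_reply):
    """Map each worker to {entry name: (task, schedule)} from a conf-style reply."""
    summary = {}
    for worker, conf in (conf_reply or {}).items():
        entries = conf.get("beat_schedule", {})
        summary[worker] = {
            name: (entry["task"], entry["schedule"])
            for name, entry in entries.items()
        }
    return summary


# Sample data copied from the output snippet; the worker name is made up.
sample_reply = {
    "celery@worker1": {
        "beat_schedule": {
            "scheduled_delayed_greetings": {
                "schedule": "<crontab: * * * * * (m/h/d/dM/MY)>",
                "task": "celery_beat_tasks.tasks.scheduled_delayed_greetings",
            }
        }
    }
}

frequencies = beat_frequencies(sample_reply)
for worker, entries in frequencies.items():
    for name, (task, schedule) in entries.items():
        print(worker, name, task, schedule)
```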

To List Workers and Their Availability Status

celery status

This command will list all workers and tell you how many are online.
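
The same online/offline question can be answered in code by comparing the workers you expect against the workers that reply. The sketch below assumes a reply shaped like the one Celery’s inspect().ping() call returns, a dictionary keyed by worker name, or None when no worker answers. The helper worker_availability and the worker names are hypothetical.

```python
def worker_availability(expected_workers, ping_reply):
    """Split the expected workers into those that replied and those that did not."""
    replies = ping_reply or {}  # treat "no reply at all" as an empty reply
    online = sorted(w for w in expected_workers if w in replies)
    offline = sorted(w for w in expected_workers if w not in replies)
    return {"online": online, "offline": offline}


expected = ["celery@worker1", "celery@worker2"]          # hypothetical names
sample_ping_reply = {"celery@worker1": {"ok": "pong"}}   # only worker1 answered

availability = worker_availability(expected, sample_ping_reply)
print(availability)
```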

Monitoring Celery Tasks Using Flower

Flower is a web-based GUI tool for monitoring various aspects of Celery. Using the Flower dashboard, you can monitor the following:

  • The number of workers and their status
  • The number of active tasks, failed tasks, and successful tasks
  • The workload of the worker instance

Flower Dashboard

With the Flower Tasks view, you can monitor the following:

  • The tasks assigned to each worker
  • The task status
  • Latency in the queue for a particular task (the time difference between “Received” and “Started” is the latency for that task to be picked up from the queue)
  • The elapsed time (“Runtime” field) of each task by the worker
  • Arguments, if any are supplied as part of the task processing
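
The two timing figures in the list above are simple differences between timestamps. As a rough sketch, assuming you have the received, started, and finished times of a task as datetime values (the sample values below are made up):

```python
from datetime import datetime


def queue_latency_seconds(received, started):
    """Time the task waited in the queue before a worker picked it up."""
    return (started - received).total_seconds()


def runtime_seconds(started, finished):
    """Time the worker spent executing the task."""
    return (finished - started).total_seconds()


# Hypothetical timestamps for a single task.
received = datetime(2023, 2, 13, 10, 0, 0)
started = datetime(2023, 2, 13, 10, 0, 3, 500000)
finished = datetime(2023, 2, 13, 10, 0, 23, 500000)

latency = queue_latency_seconds(received, started)
runtime = runtime_seconds(started, finished)
print(latency, runtime)
```

A latency that keeps growing while the runtime stays flat usually points to too few workers rather than slow task code.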

Flower Tasks

Flower also provides relevant system usage statistics on the “System” tab:

Flower system usage statistics

If you have scheduled a celery beat task, Flower can help you view the job/task frequency on the “Config” tab:

Flower configuration details

As you can see, Flower offers some helpful monitoring benefits. However, note that no features exist to support alert or notification mechanisms out of the box.

In the next section, you will learn about a state-of-the-art monitoring tool called Cronitor that, in addition to alert notification capabilities, offers several other features to make monitoring Celery and its tasks a breeze.

Monitoring Celery with Cronitor

Cronitor is a popular choice for end-to-end monitoring of websites, scheduled jobs, APIs, and any application or program that supports sending or receiving an HTTP request. It also offers powerful assertion capabilities to verify an API response. Of course, the capabilities of the Cronitor application do not end there. To learn about all of Cronitor’s features, check out the official site.

Cronitor’s easy-to-use GUI-based application and powerful SDKs make the integration of Celery with Cronitor a painless process. In this section, you’ll learn how to make use of the Cronitor application and its Python SDK to monitor a Celery-based application.

Set Up Your Cronitor Account

To begin, sign up for a free account, in which you can view and manage all of your alerts/notifications and monitored jobs, websites, or APIs. Once signed in, you should see a homepage like this:

Cronitor home page

Since you don’t yet have any active monitors in place, the welcome page provides links to help you get started with the Cronitor application.

Install Cronitor SDK

Next, move on to integrating Cronitor’s SDK libraries with your application. In this case, you’ll be using a Python-based Celery application built on top of the Flask framework. The package installer used in this example is pip.

Execute the following commands in your Python virtual environment to install the cronitor and celery libraries:

pip install cronitor==4.4.4
pip install celery==5.2.6

Cronitor and Celery: Example Integration Scenario

In Celery, you may need to deal with regular background tasks, scheduled tasks, or both. The Cronitor-Celery integration example shown here covers both kinds so that you can see how each is handled.

As part of this example, the Cronitor application monitors a scheduled task in Celery. If the scheduled task fails to execute within the expected time frame (every twenty seconds in this case), an alert is generated. The application in this example also exposes an endpoint, /celery/stats, that returns system statistics to the caller. Cronitor monitors this endpoint with assertions on the response code, the response time, and one of the elements in the response body. If any assertion fails, an alert is triggered.
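
The idea behind the scheduled-task alert can be sketched as a heartbeat check: if no run has been seen within the expected interval plus some tolerance, the job counts as missed. This is a conceptual illustration only, not Cronitor’s implementation. The function missed_run and the five-second grace period are assumptions made for the example.

```python
from datetime import datetime, timedelta


def missed_run(last_seen, now, interval, grace=timedelta(seconds=5)):
    """Return True when no run has been seen within interval + grace."""
    return (now - last_seen) > (interval + grace)


interval = timedelta(seconds=20)             # the schedule used in this example
last_seen = datetime(2023, 2, 13, 12, 0, 0)  # hypothetical time of the last run

# 22 seconds since the last run: inside the 25-second window, so no alert.
on_time = missed_run(last_seen, datetime(2023, 2, 13, 12, 0, 22), interval)

# 40 seconds since the last run: outside the window, so an alert would fire.
late = missed_run(last_seen, datetime(2023, 2, 13, 12, 0, 40), interval)
print(on_time, late)
```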

Create a Job Monitor in Cronitor Application

Open a browser and sign in to your Cronitor account. Then select Create A Monitor on the Jobs page:

Create Job Monitor

Fill in the configuration options displayed on the Create A Monitor screen. In this case, the configuration is set to expect the scheduled task every twenty seconds. If the scheduled task doesn’t execute every twenty seconds, a failure alert is generated. Furthermore, an email is sent to the default email address registered with the Cronitor application. Email is the default alert notification mechanism in Cronitor. This notification configuration can be viewed or edited on the Settings page:

Alert settings

Now that you’ve seen how to set up manual monitoring through the Cronitor application, let’s take a look at another example in which you’ll set up monitoring programmatically using the Cronitor SDK.

Define tasks.py

Create a Python file named tasks.py and paste the following code:


import os
import time

import cronitor.celery
from celery import Celery

celery = Celery(__name__, backend=os.getenv("CELERY_RESULT_BACKEND"), broker=os.getenv("CELERY_BROKER_URL"))

# See Celery's config page (https://docs.celeryq.dev/en/stable/userguide/configuration.html) for more details
celery.conf.imports = ('tasks',)
celery.conf.timezone = 'UTC'
celery.conf.result_expires = 120
celery.conf.beat_schedule = {
    'scheduled_delayed_greetings': {
        'task': 'tasks.scheduled_delayed_greetings',
        # Every 20 Seconds
        'schedule': 20
    }
}

# Auto discover all tasks for monitoring in cronitor
cronitor.celery.initialize(celery, api_key="<Paste your Cronitor API Key here>")


@celery.task()
def scheduled_delayed_greetings():
    print("Inside scheduled delayed greetings")
    time.sleep(20)
    print("Completed scheduled delayed greetings")


@celery.task()
def get_celery_stats():
    print("Inside get_celery_stats")
    inspect_output = celery.control.inspect()
    print("Stats ", inspect_output.stats(), flush=True)
    return inspect_output.stats()


cronitor.Monitor.put([{
    'type': 'check',
    'key': 'Celery Stats',
    'request': {
        'url': "https://<Your App's Public HOST/IP and port>/celery/stats",
        'regions': ['us-east-1', 'eu-central-1', 'ap-south-1']
    },
    'assertions': [
        'response.code = 200',
        'response.time < 1s',
        'response.body contains tasks.get_celery_stats'
    ]
}
])

Explanation of tasks.py

Here’s an overview of what’s happening in each part of the above code.

To begin with, you need to define a Celery instance with the first argument as app name, followed by the backend to store the results and a broker URL to register and process the messages/events sent to the queues:

celery = Celery(__name__, backend=os.getenv("CELERY_RESULT_BACKEND"), broker=os.getenv("CELERY_BROKER_URL"))

Then the configuration of the Celery instance is updated as required. An important thing to note here is the scheduled frequency of the Celery task, which is a function defined in tasks.py named scheduled_delayed_greetings. The task is scheduled to run every twenty seconds:

# See Celery's config page (https://docs.celeryq.dev/en/stable/userguide/configuration.html) for more details
celery.conf.imports = ('tasks',)
celery.conf.timezone = 'UTC'
celery.conf.result_expires = 120
celery.conf.beat_schedule = {
    'scheduled_delayed_greetings': {
        'task': 'tasks.scheduled_delayed_greetings',
        # Every 20 Seconds
        'schedule': 20
    }
}

Next, the cronitor object is initialized programmatically by providing the Celery instance, which was defined earlier, along with the API key of the Cronitor application. You can find the API key by signing in to your Cronitor application account and visiting this page.

cronitor.celery.initialize(celery, api_key="<paste your Cronitor API Key here>")

The function scheduled_delayed_greetings defines the scheduled task’s processing logic. In this case, it just prints some information to the log and sleeps for twenty seconds. Note that the decorator @celery.task denotes a Celery task. The decorator syntax is the same for regular and scheduled tasks. What makes this task a scheduled one is that the function (scheduled_delayed_greetings) is referenced in the celery beat schedule config, which you saw earlier.

@celery.task()
def scheduled_delayed_greetings():
    print("Inside scheduled delayed greetings")
    time.sleep(20)
    print("Completed scheduled delayed greetings")

Then comes the function get_celery_stats, which is a regular Celery background task. This function uses Celery’s inspect() method (exposed by Celery’s SDK) to get the performance statistics and print them to the log. It also returns the stats to the caller. You will see the caller definition in a later section.

@celery.task()
def get_celery_stats():
    print("Inside get_celery_stats")
    inspect_output = celery.control.inspect()
    print("Stats ", inspect_output.stats(), flush=True)
    return inspect_output.stats()

Here is the sample output of system performance stats returned by the Celery inspect() method call.

{'celery@b1903b37c511': {'total': {'tasks.get_celery_stats': 13, 'tasks.scheduled_delayed_greetings': 21}, 'pid': 1, 'clock': '700', 'uptime': 479, 'pool': {'max-concurrency': 1, 'processes': [8], 'max-tasks-per-child': 'N/A', 'put-guarded-by-semaphore': False, 'timeouts': [0, 0], 'writes': {'total': 34, 'avg': '1.00', 'all': '1.00', 'raw': '34', 'strategy': 'fair', 'inqueues': {'total': 1, 'active': 0}}}, 'broker': {'hostname': 'redis', 'userid': None, 'virtual_host': '0', 'port': 6379, 'insist': False, 'ssl': False, 'transport': 'redis', 'connect_timeout': 4, 'transport_options': {}, 'login_method': None, 'uri_prefix': None, 'heartbeat': 120.0, 'failover_strategy': 'round-robin', 'alternates': []}, 'prefetch_count': 4, 'rusage': {'utime': 0.9243969999999999, 'stime': 0.16507, 'maxrss': 44788, 'ixrss': 0, 'idrss': 0, 'isrss': 0, 'minflt': 17220, 'majflt': 13, 'nswap': 0, 'inblock': 0, 'oublock': 688, 'msgsnd': 0, 'msgrcv': 0, 'nsignals': 0, 'nvcsw': 1877, 'nivcsw': 25}}}
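
The raw reply is dense, so it can help to reduce it to the handful of fields you actually watch. The sketch below works on a trimmed copy of the sample output above. The helper summarize_stats is hypothetical, and it treats a None reply (which inspect calls can return when no worker answers) as an empty one.

```python
def summarize_stats(stats):
    """Reduce a stats-style reply to a few headline numbers per worker."""
    summary = {}
    for worker, info in (stats or {}).items():
        totals = info.get("total", {})
        summary[worker] = {
            "tasks_executed": sum(totals.values()),
            "per_task": dict(totals),
            "uptime_seconds": info.get("uptime"),
            "max_concurrency": info.get("pool", {}).get("max-concurrency"),
        }
    return summary


# Trimmed copy of the sample output shown above.
sample_stats = {
    "celery@b1903b37c511": {
        "total": {
            "tasks.get_celery_stats": 13,
            "tasks.scheduled_delayed_greetings": 21,
        },
        "uptime": 479,
        "pool": {"max-concurrency": 1},
    }
}

stats_summary = summarize_stats(sample_stats)
print(stats_summary)
```

Here the total of 34 executed tasks matches the 'writes' total in the full sample output.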

Finally, the code snippet below creates a Cronitor monitor programmatically. This monitor checks the endpoint /celery/stats from three different locations. This monitor type is called check, and it is used to monitor a website or an API endpoint. Note that the assertions feature of the Cronitor platform is used here to validate the response. The check conditions include the following:

  • The response code should be returned as 200.
  • The response time of the API call should be less than one second.
  • The response body should contain the string tasks.get_celery_stats, which is the name of the regular task function defined in tasks.py.

cronitor.Monitor.put([{
    'type': 'check',
    'key': 'Celery Stats',
    'request': {
        'url': "https://<Your App's Public HOST/IP and port>/celery/stats",
        'regions': ['us-east-1', 'eu-central-1', 'ap-south-1']
    },
    'assertions': [
        'response.code = 200',
        'response.time < 1s',
        'response.body contains tasks.get_celery_stats'
    ]
}
])
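
To make the three assertions concrete, here is a rough local equivalent that evaluates the same conditions against a response you already have in hand. It is only a sketch of what the conditions mean, not how Cronitor evaluates them. The function check_response and the sample values are hypothetical.

```python
def check_response(status_code, elapsed_seconds, body):
    """Evaluate the three check conditions and report each one separately."""
    return {
        "response.code = 200": status_code == 200,
        "response.time < 1s": elapsed_seconds < 1.0,
        "response.body contains tasks.get_celery_stats":
            "tasks.get_celery_stats" in body,
    }


# Hypothetical healthy response from /celery/stats.
healthy = check_response(200, 0.35, '{"total": {"tasks.get_celery_stats": 13}}')

# Hypothetical slow response: correct code and body, but over the time limit.
slow = check_response(200, 2.4, '{"total": {"tasks.get_celery_stats": 13}}')

print(all(healthy.values()), all(slow.values()))
```

Reporting each condition separately makes it clear which assertion caused a failure, which is the detail you want in an alert.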

Define app.py

Here is the app.py code for the sample Flask application:

from flask import Flask, jsonify
from tasks import get_celery_stats

app = Flask(__name__)

@app.route('/celery/stats')
def get_celery_stats_route():
    response = get_celery_stats.delay().get()
    print("Get AsyncResult response ", response, flush=True)
    return jsonify(response), 200


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=True)

This file defines the route /celery/stats, which fetches the Celery system’s performance stats via a regular Celery task. The following call queues the get_celery_stats task from tasks.py with delay() and then blocks with get() until a worker returns the result:

get_celery_stats.delay().get()
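
Because get() blocks the request until the worker answers, a stalled worker would leave the endpoint hanging. Celery’s get() accepts a timeout argument for this situation. The sketch below shows the same submit-then-wait-with-a-deadline pattern using the standard library’s concurrent.futures as a stand-in, so it runs without a broker. The names fetch_with_timeout and slow_stats, and the choice of a 504 status code, are assumptions made for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_stats():
    time.sleep(0.5)  # stands in for a worker that is slow to answer
    return {"status": "ok"}


def fetch_with_timeout(func, timeout):
    """Submit func and wait at most `timeout` seconds for its result."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(func)  # comparable to task.delay()
        try:
            # Comparable to result.get(timeout=...)
            return future.result(timeout=timeout), 200
        except TimeoutError:
            # Note: leaving the "with" block still waits for the thread to end.
            return {"error": "timed out"}, 504


fast_response = fetch_with_timeout(lambda: {"status": "ok"}, timeout=1.0)
slow_response = fetch_with_timeout(slow_stats, timeout=0.05)
print(fast_response, slow_response)
```

Returning an explicit error status on timeout also gives a response-code assertion something definite to fail on.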

Screenshots from the Cronitor Application

Once you’ve finished setting up your monitors, your Cronitor account will show you all kinds of helpful information. On the home page, the green color bars refer to successful monitoring conditions, and the red color corresponds to failed monitoring conditions:

Home page screenshot

You’ll also receive an email notification when a scheduled task fails to execute:

Email alert for failure

And you’ll receive another email when the scheduled task has recovered from failure:

Email alert for recovered

Finally, you’ll be able to view the success or failure status of the check monitor performed on the /celery/stats endpoint on the “Latest Activity” page:

Check monitor activities

Conclusion

Monitoring Celery is critical for your applications running in production. In this article, you explored several methods for monitoring your tasks in Celery. While you can use the Celery command-line utility or Flower to get a broad overview of what your tasks are doing, be sure to consider setting up monitoring with Cronitor so that you can receive proactive alerts when things don’t go according to plan.

Cronitor’s easy-to-use GUI-based application and powerful SDK make monitoring simple. Whenever a failure is encountered, Cronitor notifies you via channels such as email, SMS, or messaging apps like Slack, allowing you to resolve issues fast. Sign up for free today or request a demo to learn more.
