<img height="1" width="1" style="display:none" src="https://www.facebook.com/tr?id=278116885016877&amp;ev=PageView&amp;noscript=1">

, , ,

May 14, 2024 | 7 Minute Read

How To Schedule Periodic Tasks Using Celery Beat

Hanush Kumar, Marketing Associate

Table of Contents

Developers use Cron and Crontabs to schedule repetitive, asynchronous tasks that have to be carried out at specific intervals.

A Cron allows developers to schedule jobs in Unix-like operating systems to run shell scripts or commands periodically at fixed intervals, dates, or times. This automation of tasks is beneficial when you need to conduct periodical system maintenance, download files from the internet, perform repetitive admin tasks, and other such use cases.

Developers face challenges when reconfiguring Crons. First, they have to log into the system running the service. Then, they can access the system configuration file (crontabs) to update Crons. This process may require them to restart services. Additionally, there is no observability, monitoring, logging, and visibility on top of those Cron Jobs.

If you want to run very small commands, Crons perform and transact well because they retain that simplicity. But when you’re using them on a higher level, in complex projects where you need to monitor and visualize their activities, Crons don’t quite meet the expectations.

In that case, developers should move to something more advanced like Celery and Celery Beat to schedule their repetitive or periodic asynchronous tasks.

What Is Celery And How Does It Work?

Celery is an asynchronous distributed task queue system that allows developers to efficiently use compute resources.

Let’s assume you’re calling an API. Now, the connection is open until this API returns some data. As a result, the CPU resources must process something like database queries or wait for some third-party APIs to return data. But that would mean it is wasting CPU cycles.

The CPU keeps programs running continuously. Celery optimizes resource use by running tasks asynchronously, where you run the job, pass it to a queue, and then let the CPU do other tasks, such as responding to other requests.

And one of its key advantages is dynamic task scheduling. In other words, it allows developers to manage tasks programmatically at runtime rather than limiting them to making direct edits only in Crontab files.

What is Celery Beat?

Celery Beat is a periodic task scheduler that belongs to the Celery distributed task queue system. As the name suggests, just like how heartbeats are continuous, Celery Beat has the beat internally, which keeps checking internally (ticking): Are there any schedules I need to run?  

While Celery and Celery Beat are Python-specific tools, they are highly flexible. They are built on an architecture that appreciates diversity and enables them to work with workers written in other languages. All you have to do is configure the Celery package accordingly so it ensures seamless interoperability with the workers.

Why Aren’t Crons A Viable Solution?

If you're using it in a local environment, you can always edit this crontab file. But imagine this situation where you have a production system deployed on multiple machines and multiple VMs right now. 

Assume you want to add a Cron to clean some data from your database. Before proceeding, you may have to answer a couple of questions.

Which machine would you choose to do this? If you’re using an auto-scaling system, you have to know and decide which type of machine will always be available.

How do you know that your resources aren’t misused? You have multiple instances running, and Crons will also be part of these instances. Now, for example, in the first instance where a cron ran, it could’ve replicated itself into the second one also unnecessarily, and it reruns the same thing. Now, that's a clear case of misuse of resources.

Also, there can be situations where you intentionally want Crons to run across multiple services. However, that control is lost in the case of Cron jobs. While you don't get that control while using Crons, you get the same when you use Celery Beat.

Celery Beat’s Real-Life Use Cases

Use Case 1: Generating Periodic Reports

Developers can define a scheduled task within Celery by specifying the time, and it will automatically run, ensuring all those failsafe mechanisms are met and will generate a report.

After generating the report, whether you want to publish it to your users, push it to centralized storage, or pass it to some model is part of your application's use case and how you have to handle that.

However, Celery itself automates task scheduling; no manual intervention is required. You define the task once, and then Celery will take care of it.

Now, how do you schedule the tasks? For example, you can run it every five hours, run it every day at 5 pm, or run it every alternate day at 6 pm. All these nuances in terms of scheduling are also taken care of by using the Crontab mechanisms, such as how Crons are defined.

The Crons are defined as an asterisk. You can use five asterisks to define the time as follows:

The first asterisk denotes the minute 
The second asterisk denotes the hour
The third asterisk denotes the day 
The fourth asterisk denotes the month 
The fifth asterisk denotes the year 

This is how you define when a job should run and at what intervals. Using this syntax, you can now define your Crontab within Celery, and Celery Beat automatically runs the task that needs to be run.

All you have to do is set up the Celery workers. How the workers fetch the tasks and then run them is Celery's job. It’s like delegating everything to Celery: define the repetitive tasks and assign corresponding workers. Now, Celery will handle everything on its own.

Use Case 2: Dynamic Addition Of Periodic Tasks In Run-Time

Processing employee payslips is another repetitive task that occurs every month. But imagine the case where a new employee joins and is onboarded to an HR tool.

Now, you need to generate that person’s payslip and the leave allowances that should be newly added to this user. If the system does not support dynamically adding a new entry into schedules, you may have to stop the service to schedule the jobs and then restart your service.

Since Celery Beat supports dynamic editing, when you create and submit a new employee profile in the HR tool, a new entry is added to Celery Beat’s scheduling system. The job is now scheduled, and the new employee is processed every month.

Scheduling Periodic Tasks With Celery Beat

# First, install celery
$ pip install celery

# Next, have the required dependencies up, like in this case, we will use Redis as our 'broker' and 'result backend.'

# To start a Redis server from docker

docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest


****** celery_app.py ******

import os
from celery import Celery

# define workers who run the tasks in the background
app = Celery(
"Worker",
backend=f"redis://{os.environ.get('REDIS_HOST', 'localhost')}:{os.environ.get('REDIS_PORT', '6379')}/0",
broker=f"redis://{os.environ.get('REDIS_HOST', 'localhost')}:{os.environ.get('REDIS_PORT', '6379')}/0",
)


# define tasks


@app.task
def send_welcome_email(user_id):
    # Fetch user details from the database
    user = User.objects.get(id=user_id)
   
    # Construct the email subject and body
    subject = f"Welcome to Our Platform, {user.username}!"
    message = f"Dear {user.username},\n\nWelcome to our platform We're glad to have you.\n\nBest,\nThe Team"
   
    # Send the email
    send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [user.email])


@app.task(queue="<queue-name>", soft_time_limit=300, ignore_result=True, acks_late=True)
def periodic_task_to_do() -> None:

    # business logic here e.g., generating hourly report
    # Load data from your data source (e.g., database, CSV files)
    data = load_data()  # Placeholder function to load data
   
    # Process and aggregate data
    processed_data = process_data(data)  # Placeholder function to process data
   
    # Generate report
    report = generate_report(processed_data)  # Placeholder function to generate report
   
    # Save the report to a file or send it via email
    save_or_send_report(report)  # Placeholder function to save or send report


****** periodic_beats.py ******

# define celery beat schedules
from celery.schedules import crontab

from celery_app import app, periodic_task_to_do

app.conf.timezone = "UTC"

# Another way
app.conf.beat_schedule = {
    'add-every-minute': {
        'task': 'periodic_task_to_do',
        'schedule': 10.0,   # This runs the task every minute
    },
}

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Schedule process periodic_task_to_domarkets settlement to run every hour 0th minute
    sender.add_periodic_task(
        crontab(minute="0", hour="*"),  # 0th minute of every hour
        periodic_task_to_do.s(),
        name="process-periodic-task-every-hour",
    )

bg_executor.autodiscover_tasks() # to discover tasks

# To run:

# First, run the workers which listen to the queue for tasks

python -m celery --app=celery_app worker --queues=main-queue --concurrency=10 --loglevel=info

# and then run celery beat

python -m celery -A periodic_beats beat --loglevel=info

You can also use custom schedulers with Celery Beat.

For more details, please refer to Celery Beat’s documentation here.

Handling Failures With Celery Beat

Task Retries

Celery Beat automatically retries a task when it fails due to an exception or error. Instead of marking it as failed immediately, Celery retries executing the task based on a predefined policy.

This predefined policy specifies parameters such as delay between retry attempts, maximum number of retry attempts, and back-off strategy.

 

logger = logging.getLogger(__name__)

@app.task(bind=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True)
def send_welcome_email(self, user_id):
    try:
        # Fetch user details from the database
        user = User.objects.get(id=user_id)
       
        # Construct the email subject and body
        subject = f"Welcome to Our Platform, {user.username}!"
        message = f"Dear {user.username},\n\nWelcome to our platform We're glad to have you.\n\nBest,\nThe Team"
       
        # Send the email
        send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [user.email])
    except Exception as ex:
        logger.exception("Failed to send welcome email")
        self.retry(countdown=3**self.request.retries)

 

@app.task: This decorator defines a Celery task within a module other than the main module of your Celery application. It's similar to shared_task but is used when you want to define tasks within a specific application or feature module.

bind=True: This argument allows the task to access the self-keyword argument, which is necessary for using the retry method within the task. Without bind=True, the task would not be able to call self.retry.

autoretry_for=(Exception,): This specifies that the task should automatically retry on any exception. If you want to retry only on certain types of errors, you can replace 'Exception' with specific exceptions.

max_retries=3: This sets the maximum number of retries for the task. If the task fails after being retried this many times, it will stop retrying and raise the last exception.

retry_backoff=True: This enables exponential backoff for retries, meaning the delay between retries increases exponentially, reducing the load on the system and giving transient issues time to resolve themselves.

self.retry(countdown=3**self.request.retries): This line attempts to retry the task. The countdown parameter specifies the initial delay before the first retry. It uses an exponential backoff strategy, where the delay increases exponentially with each retry attempt.

Timeouts

Developers can configure the maximum duration for a task to get executed. Celery Beat terminates a task and marks it as failed when it exceeds the defined timeout period. As a result, you can ensure that tasks don’t run for prolonged periods and consume all the resources at your disposal.

Monitoring & Troubleshooting

Developers can install Flower, a monitoring tool that can be integrated with Celery. Flower provides all the information that a developer requires about all the Celery jobs, including the scheduled periodic tasks, which tasks have already been run, and receives alerts for abnormal behaviors. As a result, it helps you track task execution status and detect failures beforehand.

Error Handling & Logging

Celery Beat comes bundled with error handling and logging mechanisms. Developers can use these mechanisms to capture and report errors while executing a task. Also, Celery can log the stack traceback, message, and other relevant information to the console, file, or syslog whenever a task fails. This provides an option for developers to go back to troubleshoot issues.

Final Thoughts

Scheduling automated, repetitive tasks like database cleaning or report generation relieves developers of a huge burden and aids with proper resource utilization.

Unlike Crons requiring access and updates to system-level files and crontabs to make changes to a job, Celery Beat is a solution that helps developers schedule tasks programmatically. Moreover, it comes with fail-proof mechanisms such as error handling and logging, timeouts, and retries to ensure all processes run efficiently.

Schedule a meeting with our experts to learn more about how Celery Beat can improve your work.

About the Author
Qais Qadri, Senior Software Engineer
About the Author

Qais Qadri, Senior Software Engineer

Qais enjoys exploring places, going on long drives, and hanging out with close ones. He likes to read books about life and self-improvement, as well as blogs about technology. He also likes to play around with code. Qais lives by the values of Gratitude, Patience, and Positivity.


Hanush_img

Hanush Kumar, Marketing Associate

Hanush finds joy in YouTube content on automobiles and smartphones, prefers watching thrillers, and enjoys movie directors' interviews where they give out book recommendations. His essential life values? Positivity, continuous learning, self-respect, and integrity.

Back to Top