首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Django Celery定时任务的接口化开发

Django Celery定时任务的接口化开发

原创
作者头像
保持热爱奔赴山海
发布2025-12-22 21:38:07
发布2025-12-22 21:38:07
1700
举报
文章被收录于专栏:DevOpsDevOps

新建一个django项目,这里假设项目名叫 demo ,我这里用的是一个在跑的项目做演示,所以用的包相对比较多。

代码语言:txt
复制
Django==4.1.2
djangorestframework==3.14.0
django-celery-beat==2.4.0
django-celery-results==2.4.0
celery==5.2.7

创建应用

代码语言:txt
复制
python manage.py startapp tasks

编辑 demo/demo/settings.py

代码语言:txt
复制
INSTALLED_APPS = [
   .. 省略 ...
    "rest_framework",
    "celery",
    "django_celery_results",
    "django_celery_beat",
    "tasks",
]

其余的配置,例如celery的配置和数据库的配置,不是这里的重点,不贴代码了

编辑 demo/demo/celery.py

代码语言:txt
复制
import os
import logging

import pymysql

pymysql.install_as_MySQLdb()
pymysql.version_info = (1, 4, 6, "final", 0)


from celery import Celery

# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')

# 实例化
app = Celery('demo')
logger = logging.getLogger(__name__)

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动从Django的已注册app中发现任务
app.autodiscover_tasks()

# 日志格式
app.conf.update(
    worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
    worker_task_log_format='[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id).8s)] %(message)s',
    worker_log_level='INFO',
)


# 一个测试任务
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

编辑 demo/demo/__init__.py

代码语言:txt
复制
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)

import pymysql

pymysql.install_as_MySQLdb()
pymysql.version_info = (1, 4, 6, "final", 0)

编辑 tasks相关的文件

1 demo/tasks/models.py

代码语言:txt
复制
from django.db import models

# Create your models here.
from django.db import models
from django.utils import timezone


class ScheduledTask(models.Model):
    TASK_TYPES = (
        ('periodic', '周期性任务'),
        ('one_time', '一次性任务'),
    )

    name = models.CharField('任务名称', max_length=100)
    task_type = models.CharField('任务类型', max_length=20, choices=TASK_TYPES)
    task_function = models.CharField('任务函数', max_length=200)
    cron_expression = models.CharField('Cron表达式', max_length=100, blank=True, null=True)
    interval_seconds = models.IntegerField('间隔秒数', blank=True, null=True)
    next_run_time = models.DateTimeField('下次执行时间', blank=True, null=True)
    is_active = models.BooleanField('是否激活', default=True)
    created_at = models.DateTimeField('创建时间', auto_now_add=True)
    updated_at = models.DateTimeField('更新时间', auto_now=True)

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = '定时任务'
        verbose_name_plural = '定时任务列表'


class TaskExecutionLog(models.Model):
    task = models.ForeignKey(ScheduledTask, on_delete=models.CASCADE, related_name='logs')
    execution_time = models.DateTimeField('执行时间', auto_now_add=True)
    status = models.CharField('执行状态', max_length=20, choices=(
        ('success', '成功'),
        ('failed', '失败'),
    ))
    result = models.TextField('执行结果', blank=True, null=True)
    error_message = models.TextField('错误信息', blank=True, null=True)

    def __str__(self):
        return f"{self.task.name} - {self.execution_time}"

    class Meta:
        verbose_name = '任务执行日志'
        verbose_name_plural = '任务执行日志列表'

然后,迁移数据库

代码语言:txt
复制
python manage.py makemigrations
python manage.py migrate

2 demo/tasks/serializers.py

代码语言:txt
复制
import json
from rest_framework import serializers
from django_celery_beat.models import PeriodicTask, CrontabSchedule


class OneOffTaskSerializer(serializers.Serializer):
    task_name = serializers.CharField(help_text="e.g. 'myapp.tasks.send_email'")
    args = serializers.JSONField(required=False, default=list)
    kwargs = serializers.JSONField(required=False, default=dict)
    countdown = serializers.IntegerField(required=False, min_value=0, help_text="Delay in seconds")
    eta = serializers.DateTimeField(required=False, help_text="Exact execution time (ISO 8601)")

    def validate_task_name(self, value):
        from celery import current_app
        if value not in current_app.tasks:
            raise serializers.ValidationError(f"Task '{value}' is not registered.")
        return value


class CrontabScheduleSerializer(serializers.ModelSerializer):
    cron_expression = serializers.SerializerMethodField()

    class Meta:
        model = CrontabSchedule
        fields = ['id', 'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year', 'cron_expression']

    def get_cron_expression(self, obj):
        return f"{obj.minute} {obj.hour} {obj.day_of_month} {obj.month_of_year} {obj.day_of_week}"


class PeriodicTaskSerializer(serializers.ModelSerializer):
    # 写入时使用字符串 cron_expression
    cron_expression = serializers.CharField(write_only=True, required=False, help_text="e.g. '*/10 * * * *'")

    # 读取时展示结构化 crontab 或 cron 表达式
    crontab_detail = CrontabScheduleSerializer(source='crontab', read_only=True)
    cron_expression_display = serializers.SerializerMethodField(read_only=True)

    class Meta:
        model = PeriodicTask
        fields = [
            'id', 'name', 'task', 'enabled',
            'args', 'kwargs',
            'cron_expression',  # write-only
            'cron_expression_display',  # read-only
            'crontab_detail',
            'interval',  # 注意:cron 任务应为 null
            'last_run_at', 'total_run_count', 'description'
        ]
        read_only_fields = ['id', 'last_run_at', 'total_run_count', 'interval']

    def get_cron_expression_display(self, obj):
        if obj.crontab:
            return f"{obj.crontab.minute} {obj.crontab.hour} {obj.crontab.day_of_month} {obj.crontab.month_of_year} {obj.crontab.day_of_week}"
        return None

    def validate_cron_expression(self, value):
        parts = value.strip().split()
        if len(parts) != 5:
            raise serializers.ValidationError("Cron expression must contain exactly 5 parts.")
        return parts

    def validate_args(self, value):
        if not value:
            return "[]"
        try:
            parsed = json.loads(value)
            if not isinstance(parsed, list):
                raise serializers.ValidationError("args must be a JSON array.")
        except json.JSONDecodeError as e:
            raise serializers.ValidationError(f"Invalid JSON in args: {str(e)}")
        return value

    def validate_kwargs(self, value):
        if not value:
            return "{}"
        try:
            parsed = json.loads(value)
            if not isinstance(parsed, dict):
                raise serializers.ValidationError("kwargs must be a JSON object.")
        except json.JSONDecodeError as e:
            raise serializers.ValidationError(f"Invalid JSON in kwargs: {str(e)}")
        return value

    def create(self, validated_data):
        cron_parts = validated_data.pop('cron_expression', None)
        crontab = None

        if cron_parts:
            minute, hour, day_of_month, month_of_year, day_of_week = cron_parts
            crontab, _ = CrontabSchedule.objects.get_or_create(
                minute=minute,
                hour=hour,
                day_of_week=day_of_week,
                day_of_month=day_of_month,
                month_of_year=month_of_year,
            )

        # 确保 interval 为 None(因为是 cron 任务)
        validated_data['interval'] = None
        validated_data['crontab'] = crontab
        return super().create(validated_data)

    def update(self, instance, validated_data):
        cron_parts = validated_data.pop('cron_expression', None)
        crontab = None

        if cron_parts:
            minute, hour, day_of_month, month_of_year, day_of_week = cron_parts
            crontab, _ = CrontabSchedule.objects.get_or_create(
                minute=minute,
                hour=hour,
                day_of_week=day_of_week,
                day_of_month=day_of_month,
                month_of_year=month_of_year,
            )

        # 更新 crontab,清空 interval(防止混用)
        instance.crontab = crontab
        instance.interval = None

        # 更新其他字段
        for attr, value in validated_data.items():
            setattr(instance, attr, value)

        instance.save()
        return instance


class RegisteredTaskSerializer(serializers.Serializer):
    name = serializers.CharField(read_only=True)

3 demo/tasks/tasks.py 创建一个测试用的task

代码语言:txt
复制
from celery import shared_task
import logging

logger = logging.getLogger(__name__)


@shared_task
def add_task(a, b):
    return a + b

3 demo/tasks/urls.py

代码语言:txt
复制
from django.urls import include, path
from rest_framework import routers
# from .views import CreatePeriodicTaskView
from . import views

router = routers.DefaultRouter()
router.register(r'periodic_tasks', views.PeriodicTaskViewSet)

urlpatterns = [
    path('', include(router.urls)),
    path('oneoff_task/', views.RunOneOffTaskView.as_view(), name='run-one-off-task'),
    path('registered_tasks/', views.RegisteredTasksListView.as_view(), name='registered-tasks'),  # ← 新增

]

4 demo/tasks/views.py

代码语言:txt
复制
from rest_framework import viewsets
from django_celery_beat.models import PeriodicTask
from .serializers import PeriodicTaskSerializer
from .serializers import OneOffTaskSerializer
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from celery import current_app
from utils.pagination import PageNum
from utils.custom_renderer import CustomRenderer
from rest_framework.filters import OrderingFilter, SearchFilter
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.generics import ListAPIView


# 周期性任务管理
class PeriodicTaskViewSet(viewsets.ModelViewSet):
    """
    周期性任务管理 API
    - 支持通过 cron_expression 创建/更新 cron 任务
    - 自动校验 args/kwargs 为合法 JSON
    - 禁止同时设置 interval 和 crontab(本实现强制 interval=None)
    """
    queryset = PeriodicTask.objects.all()
    serializer_class = PeriodicTaskSerializer
 
    # 注意我这里用了些自定义的分页和renderer,如果你没有的话,这些都可以先注释掉,不影响使用
    pagination_class = PageNum

    filter_backends = [OrderingFilter, DjangoFilterBackend, SearchFilter]
    ordering_fields = "__all__"
    filterset_fields = "__all__"
    search_fields = ("name", "task", "description",)

    renderer_classes = [CustomRenderer]

    def get_queryset(self):
        # 可选:只返回用户自定义任务(排除 Celery 内置任务)
        return PeriodicTask.objects.exclude(name__startswith='celery.')


# 创建一次性任务
class RunOneOffTaskView(APIView):
    def post(self, request):
        serializer = OneOffTaskSerializer(data=request.data)
        if serializer.is_valid():
            task_name = serializer.validated_data['task_name']
            args = serializer.validated_data.get('args', [])
            kwargs = serializer.validated_data.get('kwargs', {})
            countdown = serializer.validated_data.get('countdown')
            eta = serializer.validated_data.get('eta')

            task = current_app.tasks[task_name]

            async_result = task.apply_async(
                args=args,
                kwargs=kwargs,
                countdown=countdown,
                eta=eta
            )

            return Response({
                "task_id": async_result.id,
                "status": "submitted",
                "task_name": task_name
            }, status=status.HTTP_202_ACCEPTED)

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


class RegisteredTasksListView(ListAPIView):

    def list(self, request, *args, **kwargs):
        # 获取所有任务名(排除 Celery 内置任务)
        all_tasks = list(current_app.tasks.keys())

        # 过滤掉 Celery 系统任务(可选)
        user_tasks = [name for name in all_tasks if not name.startswith('celery.')]

        return Response({
            "tasks": sorted(user_tasks)
        })

5 vim demo/demo/urls.py 编辑项目的urls.py
代码语言:txt
复制
urlpatterns = [
    path('api/v1/tasks/', include('tasks.urls')),
    ...  省略 ....
]

这样就完成了。

接口示例

代码语言:txt
复制
1 周期性任务
http://127.0.0.1:8000/api/v1/tasks/periodic_tasks/
{
  "name": "test every 3 mins",
  "description":"描述信息-1122",
  "task": "tasks.tasks.add_task",
  "cron_expression": "*/3 * * * *",
  "args": "[11, 22]",
  "kwargs": "{}",
  "enabled": true
}

注意name名称不能相同,相同的话则无法提交。


2 一次性任务
POST /api/v1/tasks/oneoff_task/
{
  "task_name": "tasks.tasks.send_email",
  "kwargs": {
    "to": "[email protected]",
    "subject": "ddddd",
    "body": "Hello"
  }
}

3 修改任务,也是标准的restful接口
PUT /api/v1/tasks/periodic_tasks/25/
{
    "id": 25,
    "name": "test",
    "task": "demo.tasks.add",
    "enabled": true,
    "args": "[2,3]",
    "kwargs": "{}",
    "cron_expression": "*/5 * * * *",
    "description": ""
}

后端接口化改造完成后,我们就可以写个前端封装下,大致效果如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 [email protected] 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 [email protected] 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档