运维咖啡吧

享受技术带来的乐趣,体验生活给予的感动

Django动态添加定时任务之django-celery的使用

定时任务和周期任务在我们日常工作中应用广泛,例如定时发布、周期巡检等,通常我们会借助Linux下的Crontab来实现,但如何将这一功能搬进我们自研的运维系统呢?借助django-celery即可轻松完成,本篇文章就通过自定义任务引擎Probius中计划任务的实现来介绍django-celery的使用

Celery是基于Python开发的一个分布式任务队列框架,主要用来实现异步任务,具体介绍和用法可以看我之前写的这篇文章:Django配置Celery执行异步任务和定时任务,Django本身不支持异步,要想实现异步的话借助Celery是个不错的选择,上边这篇文章就提供了django集成celery的方法,但其配置稍微复杂,且不支持动态添加定时任务,django-celery的出现很好的解决了这个问题

django-celery为django提供了celery集成,同时支持将结果存储在django的orm或cache中,最重要的是支持从数据库动态读取计划任务并执行,这也就是说我们只需要将需要执行的加护任务插入数据库,django-celery就可以自动发现并执行了,接下来就具体看下如何实现

安装配置

1.安装django-celery

pip install django-celery

2.settings.pyINSTALLED_APPS中加入djcelery

INSTALLED_APPS = [
    ...
    'djcelery',
]

3.settings.py中添加如下配置

import djcelery
djcelery.setup_loader()

# BROKER和BACKEND配置,这里用了本地的redis,其中1和2表示分别用redis的第一个和第二个db
BROKER_URL = 'redis://localhost/1'
CELERY_RESULT_BACKEND = 'redis://localhost/2'

# celery 关闭UTC时区
CELERY_ENABLE_UTC = False

# celery 并发数设置,最多可以有20个任务同时运行
CELERYD_CONCURRENCY = 20
CELERYD_MAX_TASKS_PER_CHILD = 4

# celery开启数据库调度器,数据库修改后即时生效
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

from celery import platforms

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

4.启动celery

python manage.py celery worker -l info

至此就可以使用djcelery来处理异步任务了

异步调用

具体的用法为在app根目录下添加一个tasks.py文件,在文件中编写函数,给函数添加上shared_task装饰器即可

假设一个项目有如下目录结构

project
    - coffee
        - __init__.py
        - admin.py
        - app.py
        - models.py
        - tasks.py
        - tests.py
        - views.py
    - webapp
        - __init__.py
        - settings.py
        - urls.py
        - wsgi.py
    - manage.py

我们在名为coffee的app下添加文件tasks.pytasks.py文件内容如下

from celery import shared_task


@shared_task
def welcome():
    print('Welcome to ops-coffee.cn')

    return True

然后就可以在view或其他地方异步调用这个welcome函数了

from django.http import JsonResponse
from coffee.tasks import welcome


def index(request):
    welcome.delay()

    return JsonResponse({"state": 1, "message": "welcome"})

周期任务

以上只是将任务变成了异步,如果我们想要周期执行welcome任务,该如何操作呢?

1.首先需要执行migrate命令在数据库创建表

python manage.py migrate

2.然后修改settings.py文件中添加CELERYBEAT_SCHEDULE配置

CELERYBEAT_SCHEDULE = {
    'ops-coffee-1': {
        'task': 'coffee.tasks.welcome',
        'schedule': timedelta(seconds=20)
    },
    'ops-coffee-2': {
        'task': 'coffee.tasks.welcome',
        'schedule': crontab(hour=17, minute=30),
    }
}

以上配置详细的解释可以看文章:Django配置Celery执行异步任务和定时任务

3.最后启动beat

python manage.py celery beat -l info

至此welcome任务就会在设置的时间执行了

报错处理

PS: 启动beat时可能会有如下报错

TypeError: can't subtract offset-naive and offset-aware datetimes

这主要是因为时区引起的,请修改时区相关的配置

TIME_ZONE = 'Asia/Shanghai'
USE_TZ = False

动态添加

至此已经可以添加周期任务或定时任务了,但操作方式比较麻烦,还需要改动配置文件,说好的动态添加呢,别急这就来了,打开数据库会看到几张以djcelery_开头的表

其中对于动态添加计划任务有用的是计划任务时间定义表djcelery_crontabschedule,循环任务时间定义表djcelery_crontabschedule,以及任务表djcelery_periodictask

只需要在对应的表里插入数据就ok了,以我在自定义任务引擎Probius中的使用为例,前端会传递时间到view,view中关于定时任务的的大概处理逻辑如下

from djcelery.models import CrontabSchedule, PeriodicTask

with transaction.atomic():
    save_id = transaction.savepoint()

    try:
        _c, created = CrontabSchedule.objects.get_or_create(
            minute=str(minute),
            hour=str(hour),
            day_of_week=str(day_of_week),
            day_of_month=str(day_of_month),
            month_of_year=str(month_of_year)
        )

        dt = datetime.now().strftime('%Y%m%d%H%M%S')
        _p = PeriodicTask.objects.create(
            name=dt + '-' + '运维咖啡吧周期任务A',
            task='coffee.tasks.welcome',
            args=[37],
            enabled=True,
            crontab=_c
        )

        print('计划任务添加成功')
    except Exception as e:
        transaction.savepoint_rollback(save_id)
        print('添加计划任务失败,错误原因:' + str(e))

通过with transaction.atomic()创建一个事物,保证时间和任务都能同时添加成功,否则就回滚

然后通过get_or_create方法去检索循环任务时间定义表CrontabSchedule,如果有就获取到实例,没有就创建

最后往任务表PeriodicTask里插入任务,name为任务名称,具有唯一性,所以这里加了时间前缀防止重复,task为celery的task任务,字符串类型,在启动celery的时候就可以看到,args传给任务的参数,这里也可以用kwargs的字典形式传,就把args字段改成kwargs即可,enabled定义了这个任务是启动或关闭状态,crontab为循环任务时间实例,如果这里要用周期任务,就是每n秒n分循环执行这样的,只需要将crontab关联换成interval即可,那就需要事先往IntervalSchedule表里插入数据

还记得开头settings.py配置文件中我们配置的CELERYBEAT_SCHEDULER吗?就因为有这个配置,所以当数据表里的数据变更之后,celery的beat程序就能监听到从而在配置的时间触发worker去执行任务

至此,主要功能我们都已实现,django-celery的计划任务只能支持固定时间吗?其实不然,他支持的语法与linux下的crontab类似,像hour='*/3,9-18'这样的复杂语法也是支持的,在Probius中对于复杂语法我们也直接提供了更为灵活的自定义任务执行方式

为了便于使用,减少学习成本,这里就直接用了linux下crontab的格式,传到后端后解析成对应的时间写入数据库。有些小伙伴跟我说开发出来的工具没人用,或者是其他部门推不动,项目开发中的这些个细节考虑到位,尽量减少用户学习使用成本,做的足够好用、易用,还怕没有人用嘛