运维咖啡吧

享受技术带来的乐趣,体验生活给予的感动

任务系统之手动审批

今日阳光明媚,心情也跟着好了许多,坐在安静的图书馆里任阳光透过窗户照在脸上,暖洋洋很舒服

入职新公司近两月,到目前为止感觉还不错,重写了整个任务系统并推进了项目接入落地,昨天刚跟团队的小伙伴们吃过饭,也算是给这个阶段性的成果一点奖励。进入新团队我主要负责整个DevOps相关工具的开发及落地,对于开发之前我有过相当丰富的经验非常自信没有问题,但对于落地说实在的我之前还是有点担忧的,毕竟大多数人都习惯于自己的舒适圈不愿改变,见过太多太多DevOps推进失败的例子,但我幸得上苍眷顾,遇到了一群非常靠谱的小伙伴,到目前为止落地还算顺利

之所以说是重写了任务系统,是因为我上份工作写过任务系统Probius,到了新公司的第一项工作就是要解决发布部署问题,把原本分散于各个不同开源工具的部署方案给统一,那任务系统首当其冲的要承担重任了,我跟团队内的小伙伴们都逐一聊了聊,了解了当前的项目架构跟日常管理方式,发现曾经写过的任务系统并不能很好的满足当下的需求,所以重新设计,从前端模板到后端逻辑都进行了全新的开发

全新开发的任务系统加入了诸多非常好用的功能,例如资源隔离、并行任务、数据隔离、定时任务、周期任务,以及这篇文章将要讲到的手动审批等等。这里顺便说一下,我写DevOps相关的文章都倾向于讲思路和方法,少有代码,我觉得思路比代码更重要,况且代码属于公司,除非得到允许,否则不允许也不能够公开分享,所以各位也不要问我有没有源码,肯定没有

言归正传,接下来讲一下手动审批,在开始之前需要先看下这篇文章了解下任务系统的基本设计:Probius:一个功能强大的自定义任务系统,虽然我整个重写了,但基本的设计思路没太大变化,只是将命令给改成了子任务,现在全新的任务系统三个核心模块变成了子任务、模板和任务

子任务: 任务系统的最小粒度,子任务有多种类型,例如命令、脚本、通知以及审批

模板: 一组子任务的集合,也可以理解为任务流,主要对模板下的子任务进行编排

任务: 模板+参数=任务,模板是静态的不可执行,而任务是动态的可以执行

手动审批就是子任务中的审批类型任务,在一个任务执行的过程中,有需要人工确认的地方就用到了审批,拿游戏维护更新为例来说,游戏更新完成后需要QA介入测试,测试通过才能对外开放,那在整个任务流中就需要一个审批任务:QA测试通过,而这在之前的任务系统中是无法实现的

单Celery任务

任务系统的核心逻辑是:模板包含多个子任务,任务关联模板,执行任务时实际上就是按照编排好的顺序依次执行模板下的子任务

之所以之前的任务系统无法实现手动审批任务,主要是因为任务流是通过单一Celery任务来处理的,我们看一下之前的代码,用户执行任务,进入taskrun函数,先入库做记录,然后直接调用名为run_task的Celery异步任务执行

def taskrun(request, data):
    # 入库做记录
    _t = tasklog = TaskLog.objects.create(
        name=data.get('task_name'),
        template_id=int(data.get('template_id'))
    )

    # 调用celery任务异步执行
    run_task.delay(tasklog.id)
    return 1, '任务正在执行中,请稍后查看状态', tasklog.id

run_task中会先获取tasklog,然后拿到tasklog关联的模板,从模板里取出全部子任务,循环执行,直到全部执行成功

@shared_task
def run_task(tid):
    try:
        tasklog = TaskLog.objects.get(id=tid)
    except TaskLog.DoesNotExist:
        return True, '当前要执行的Task不存在,tid:%d' % tid

    # 获取所有子任务
    subtasks = tasklog.template.get_temptasks

    # 循环执行子任务
    for sub in subtasks:
        Logger(tid).add('开始执行子任务 - %s\n' % sub.subtask.name)

    # 全部子任务执行完成,修改tasklog状态
    tasklog.state = 1
    tasklog.save()

    return True, 'success'

一个任务流实际上就是启动了一个celery异步任务去运行,异步任务内通过for循环依次执行子任务,而对于这种模式来说想要加入可以中断执行的手动审批任务就比较困难了

多Celery任务

在任务流执行的过程中遇到手动审批类型的子任务,要先中断执行等待用户审批通过才能继续往下执行,既然单Celery任务无法实现,那很自然的就想到了对任务流的执行由单Celery任务进行拆分,控制任务流执行的逻辑拿出来,仅仅在遇到可以自动执行的子任务时才丢给celery去处理,这样整个任务流就可以随意中断了

看下下边的代码,用户执行会新进入TaskRun类,TaskRun类会进行一系列的基础处理,例如参数解析,之后交给task_run函数继续处理

class TaskRun(RetrieveUpdateDestroyView):
    model = Task
    permission = {'post': 'engine.task_run'}

    def post(self, request, pk):
        task = self.model.objects.get(id=int(pk))

        # 获取post提交数据并转为字典
        validated_data = post_data_to_dict(request)

        # 获取到前台传入的参数与任务默认参数合并
        args = dict(literal_eval(task.args) if task.args else {}, **validated_data)

        # 调用任务执行函数执行任务
        state, message, tid = task_run(request, task, args)
        return JsonResponse({'state': state, 'message': message, 'tid': tid})

task_run函数则会进行参数校验,数据入库等等一系列的逻辑处理,然后传给了exec_subtask函数去处理

def task_run(request, task, args):
    # 判断subtask所需参数是否都存在
    success, cmd_args = diff_args(_all_subtasks_args, args)
    if not success:
        return 0, '参数匹配失败:' + str(cmd_args), 0

    # 添加tasklog记录
    _t = tasklog = TaskLog.objects.create(
        create_user=request.user,
        task=task,
    )

    exec_subtask(tasklog)
    return 1, '任务正在执行中,请稍后查看状态', tasklog.id

exec_subtask函数则会获取下一步要执行的子任务,然后交给名为run_subtask的Celery任务去异步执行

def exec_subtask(tasklog, subtasklog=None):
    # 获取所有子任务
    subtasklogs = tasklog.get_subtasklogs

    if subtasklog:
        # 如果所传subtask为任务下的最后一个子任务,则结束整个任务
        if subtasklog == subtasklogs.last():
            subtasklog.tasklog.state = 1
            subtasklog.tasklog.save()

            return True, 'Finished'
        else:
            # 否则获取下一条要执行的subtask
            next_subtask = subtasklogs.filter(id__gt=subtasklog.id).first()
    else:
        # 如果没有传subtask则默认取第一条
        next_subtask = subtasklogs.first()

    # 将要执行的子任务传给run_subtask异步执行
    celery_task = run_subtask.delay(next_subtask.id)

    # 记录celery任务的ID,终止任务执行时使用
    next_subtask.celery_task_id = celery_task
    next_subtask.save()

    return True, 'Next'

run_subtask则会判断任务类型,如果是命令或脚本之类的任务则会自动执行,如果是审批就发送审批通知并中止任务执行

@shared_task
def run_subtask(tid):
    try:
        subtasklog = SubTaskLog.objects.get(id=tid)
    except SubTaskLog.DoesNotExist:
        return True, '当前要执行的Task不存在,tid:%d' % tid

    Logger(tid).add('开始执行子任务 - %s\n' % subtasklog.temptask.subtask.name)

    # 修改任务状态,当任务类型为审批时改为7待审批,否则为9执行中
    subtasklog.state = 7 if subtasklog.type == 6 else 9
    subtasklog.save()

    if subtasklog.type == 1 or subtasklog.type == 2:
        # 如果是命令或脚本类型子任务则调用执行系统执行

    if subtasklog.type == 6:
        # 如果是审批类型子任务,则修改子任务状态为待审批,等待后续审批
        Logger(tid, 7).add('\n通知已发送,等待审批\n通知用户:%s' % _notify_users)

        # 中止流程,待审批之后再进行后续流程
        return True, 'Notify Done, Waitting Approve'

    # 最后继续交给exec_subtask去循环执行剩余子任务
    exec_subtask(subtasklog.tasklog, subtasklog)

    return True, 'Subtask Done, Next Subtask'

此时有审批权限的用户会收到审批通知,在页面上可以对任务进行审批

审批请求会发送给SubTasklogApprove类,SubTasklogApprove记录用户审批结果,如果审批通过则继续调用exec_subtask执行接下来的流程,若审批拒绝就直接结束整个流程了

class SubTasklogApprove(RetrieveUpdateDestroyView):
    model = SubTaskLog

    def put(self, request, pk):
        _t = self.model.objects.get(id=pk)

        _t.state = postdata.get('state')
        _t.approve_user = request.user
        _t.details += _details
        _t.save()

        if _t.state == 11:
            # 审批通过继续走接下来的流程
            exec_subtask(_t.tasklog, _t)

        if _t.state == 12:
            # 审批拒绝则直接结束整个流程
            _t.tasklog.state = 12
            _t.tasklog.save()

        return JsonResponse({"state": 1, "message": "审批完成"})

至此整个审批任务流完成了,对于以上代码只保留了主线逻辑,做了大量精简,所以不要吹毛求疵了,主要目的是介绍清楚逻辑,你有收获吗