Skip to content

Sanic 异步任务队列

如果使用的是 django 框架,肯定是使用 celery 作为异步任务队列了
这里介绍的是自实现的一个简单的异步任务队列(无需额外进程来运行)

异步任务

定义实现

python
@app.after_server_start
async def create_task_queue(app, loop):
    """创建任务队列"""
    app.ctx.queue = asyncio.Queue()
    for x in range(app.config.QUEUE_WORKERS):
        app.add_task(worker(f"Worker-{x}", app))
python
async def worker(name, app):
    print(f'{name} start ...')
    while True:
        job = await app.ctx.queue.get()
        app.add_task(job)
        size = app.ctx.queue.qsize()
        print(f"{name} is sleeping on the job for {job}. {size} remaining")
        await asyncio.sleep(1)
  • app.config.QUEUE_WORKERS 为 worker 的数量,我取值为 4
  • worker 一直在后台死循环,等待从队列中取出任务然后执行

使用方式

python
app.ctx.queue.put_nowait(function(*params))
app.ctx.queue.put_nowait(feishu_detail_page('sql', _id))
  • 直接往队列里添加函数调用即可,推荐直接使用 put_nowait

定时任务

定义实现

python
@app.after_server_start
async def create_task_queue(app, loop):
    """创建任务队列"""
    app.ctx.queue = asyncio.Queue()
    for x in range(app.config.QUEUE_WORKERS):
        app.add_task(worker(f"Worker-{x}", app))

    app.add_task(cron_job(app), name='cron')
python
async def cron_job(app):
    """定时任务
    每分钟从库中获取当前需要运行的计划任务
    """
    import time
    import importlib

    while True:
        now_ts = int(time.time())
        await asyncio.sleep(1)
        if now_ts % 60 != 0:
            continue
        print(now_ts, '分钟整...', time.strftime('%Y-%m-%d %H:%M'))
        res = app.ctx.db['crontab_jobs'].find({
            'is_done': False,
            'time': now_ts
        })
        async for r in res:
            params = []
            for param in r['params']:
                if param == 'app':
                    param = app
                params.append(param)
            func = r['function']
            print(f'开始执行计划任务:{func}, {params}')
            function = getattr(importlib.import_module('api.tasks'), func)
            app.ctx.queue.put_nowait(function(*params))
            await app.ctx.db['crontab_jobs'].update_one(
                {'_id': r['_id']},
                {'$set': {'is_done': True}}
            )
  • create_task_queue 函数中单独启动一个后台任务来运行启动定时 worker cron_job
  • 定时逻辑步骤:
    1. 每秒检测当前时间,判断是否为整分钟时间
    2. 在整分钟时间时,从数据库中取出当前整分钟时间的计划任务(使用整分钟一是不需要秒级别;二是每秒从库中查询、判断、执行一套下来可能耗时超过1秒,继而导致漏掉)
    3. 异步执行此计划任务(字符串变为函数使用的是 importlib 库)
    4. 更新计划任务的状态为已执行

使用方式

python
async def sql_execute_cron(app, _id, step, ts):
    """SQL 上线执行-定时执行"""
    now = datetime.datetime.now()
    data = {
        'name': 'SQL 定时执行',
        'function': 'sql_execute_now',
        'x_id': str(_id),
        'params': ['app', str(_id), step],
        'time': ts,
        'is_done': False,
        'create_at': now,
        'update_at': now,
    }
    await app.ctx.db['crontab_jobs'].insert_one(data)
  • 往库中插入数据即可
  • 考虑到使用的简易性,function 对应的函数最好是在同一个模块中,后使用 importlib 库来引入

还需考虑解决的问题

  • 重启服务时,如果保证当前的异步任务执行完成;而不是执行中途被杀掉

参考