Sanic 异步任务队列
如果使用的是 django 框架,肯定是使用 celery 作为异步任务队列了
这里介绍的是自实现的一个简单的异步任务队列(无需额外进程来运行)
异步任务
定义实现
python
@app.after_server_start
async def create_task_queue(app, loop):
"""创建任务队列"""
app.ctx.queue = asyncio.Queue()
for x in range(app.config.QUEUE_WORKERS):
app.add_task(worker(f"Worker-{x}", app))
python
async def worker(name, app):
print(f'{name} start ...')
while True:
job = await app.ctx.queue.get()
app.add_task(job)
size = app.ctx.queue.qsize()
print(f"{name} is sleeping on the job for {job}. {size} remaining")
await asyncio.sleep(1)
app.config.QUEUE_WORKERS
为 worker 的数量,我取值为 4- worker 一直在后台死循环,等待从队列中取出任务然后执行
使用方式
python
app.ctx.queue.put_nowait(function(*params))
app.ctx.queue.put_nowait(feishu_detail_page('sql', _id))
- 直接往队列里添加函数调用即可,推荐直接使用
put_nowait
定时任务
定义实现
python
@app.after_server_start
async def create_task_queue(app, loop):
"""创建任务队列"""
app.ctx.queue = asyncio.Queue()
for x in range(app.config.QUEUE_WORKERS):
app.add_task(worker(f"Worker-{x}", app))
app.add_task(cron_job(app), name='cron')
python
async def cron_job(app):
"""定时任务
每分钟从库中获取当前需要运行的计划任务
"""
import time
import importlib
while True:
now_ts = int(time.time())
await asyncio.sleep(1)
if now_ts % 60 != 0:
continue
print(now_ts, '分钟整...', time.strftime('%Y-%m-%d %H:%M'))
res = app.ctx.db['crontab_jobs'].find({
'is_done': False,
'time': now_ts
})
async for r in res:
params = []
for param in r['params']:
if param == 'app':
param = app
params.append(param)
func = r['function']
print(f'开始执行计划任务:{func}, {params}')
function = getattr(importlib.import_module('api.tasks'), func)
app.ctx.queue.put_nowait(function(*params))
await app.ctx.db['crontab_jobs'].update_one(
{'_id': r['_id']},
{'$set': {'is_done': True}}
)
- 在
create_task_queue
函数中单独启动一个后台任务来运行启动定时 workercron_job
- 定时逻辑步骤:
- 每秒检测当前时间,判断是否为整分钟时间
- 在整分钟时间时,从数据库中取出当前整分钟时间的计划任务(使用整分钟一是不需要秒级别;二是每秒从库中查询、判断、执行一套下来可能耗时超过1秒,继而导致漏掉)
- 异步执行此计划任务(字符串变为函数使用的是
importlib
库) - 更新计划任务的状态为已执行
使用方式
python
async def sql_execute_cron(app, _id, step, ts):
"""SQL 上线执行-定时执行"""
now = datetime.datetime.now()
data = {
'name': 'SQL 定时执行',
'function': 'sql_execute_now',
'x_id': str(_id),
'params': ['app', str(_id), step],
'time': ts,
'is_done': False,
'create_at': now,
'update_at': now,
}
await app.ctx.db['crontab_jobs'].insert_one(data)
- 往库中插入数据即可
- 考虑到使用的简易性,
function
对应的函数最好是在同一个模块中,后使用importlib
库来引入
还需考虑解决的问题
- 重启服务时,如果保证当前的异步任务执行完成;而不是执行中途被杀掉