Celery-异步任务处理

Posted on 三 15 2月 2017 in python

之前有一个小站,要备份数据,打算用celery来做一下。 有几个需求:

  • 每隔6小时备份数据

  • 备份成功与失败时邮件通知

  • 把多个任务分解成小任务

  • 以服务方式启动celery

  • 备份数据加密之后上传到百度网盘

Celery的结构图如下:

Celery

  • task producer
    就是具体要执行的任务,也就是一个一个的task

  • broker
    消息发送和接收的中间件,官方推荐rabbitmq,所以我也就用这个

  • celery worker
    消息执行的一方,负责处理消息中间件发过来的任务,返回执行结果。celery支持分布式部署和扩展,可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。

  • celery beat
    Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。定时备份就采用这个

  • result
    celery支持把结果保存在数据库里,在django中有django-celery扩展可用,不过这里用不到,暂不描述.

主要步骤

  • 配置celery,使用2个worker, 一个负责备份,发邮件,一个负责上传。
    新建一个项目celery_test, celery.py包含以下:
app = Celery('celery_test', broker='pyamqp://', backend='rpc://', include=['celery_test.tasks'])
app.conf.update(
    task_routes={
      'celery_test.tasks.upload_database': {'queue': 'upload_db', 'deliver_mod': 'transient'},
      'celery_test.tasks.backup_database': {'queue': 'backup_db', 'deliver_mod': 'transient'},
      'celery_test.tasks.success_handler': {'queue': 'backup_db', 'deliver_mod': 'transient'},
      'celery_test.tasks.error_handler': {'queue': 'backup_db', 'deliver_mod': 'transient'},
      'celery_test.tasks.send_mail_ok': {'queue': 'backup_db', 'deliver_mod': 'transient'},
      'celery_test.tasks.send_mail_fail': {'queue': 'backup_db', 'deliver_mod': 'transient'},
    },
)
  • task.py中新增上面6个任务

其中success_handler 用到了chain, chain方式是官方推荐的多个任务顺序执行时采用的方式.

@app.task
def success_handler(args):
    print 'success callback:', args
    task_chain = send_mail_ok.s(args) | upload_database.s()
    task_chain()
  • 配置任务定时执行
    定时任务的实现方式有多种,我采用其中一种
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from .tasks import backup_database, success_handler, error_handler
    s = backup_database.s()
    s.link(success_handler.s())
    s.link_error(error_handler.s())
    sender.add_periodic_task(43200.0, s, name='### backup ent data task ###')

linkbackup_database成功的回调,link_error是失败的回调。432000.0`是秒,表示 126060,也就是每隔12小时执行一次

  • 测试celery 执行命令的目录要与celery_test目录同级,不是task.py的目录 最直接的方法是
celery multi start 4 -A celery_test -l INFO -B -Q:1,2 backup_db -Q:3,4 upload_db --logfile=/tmp/celery.log

配置了4个worker, 2个queue。1,2个worker接收backup_db队列的消息; 3、4个worker接收upload_db队列的消息 日志是在 /tmp/celery.log

  • 以服务方式运行 上面的方式不适合生产,只能用于调试。 新建3个文件: /etc/init.d/celeryd, /etc/init.d/celerybeat, /etc/default/celeryd
    第1个文件是celery worker启动文件,第2个文件是celery beat启动文件, 第3个是celery的配置文件 文件内容参考celery官方参考文件地址
    主要修改下面几个地方
# /etc/default/celeryd
CELERYD_NODES="worker1_backup_ent worker2_upload_ent"
#   but you can also start multiple and configure settings
#   for each in CELERYD_OPTS
#CELERYD_NODES="worker1 worker2 worker3"
#   alternatively, you can specify the number of nodes to start:
#CELERYD_NODES=10

# Absolute or relative path to the 'celery' command:
CELERY_BIN="/home/maemo/.local/bin/celery"
#CELERY_BIN="/virtualenvs/def/bin/celery"

# App instance to use
# comment out this line if you don't use an app
CELERY_APP="celery_test"
# or fully qualified:
#CELERY_APP="proj.tasks:app"

# Where to chdir at start.
CELERYD_CHDIR="/home/maemo/workspace/python"

# Extra command-line arguments to the worker
CELERYD_OPTS="--time-limit=300 -c 2 -Q:worker1_backup_ent backup_db -Q:worker2_upload_ent upload_db"
# 不能加 -B选项,celery运行时,只能有一个beat实例,否则任务会重复添加执行
#CELERYD_OPTS="--time-limit=300 -c 2 -B -Q:worker1_backup_ent backup_db -Q:worker2_upload_ent upload_db"
# Configure node-specific settings by appending node name to arguments:
#CELERYD_OPTS="--time-limit=300 -c 8 -c:worker2 4 -c:worker3 2 -Ofair:worker1"

# Set logging level to DEBUG
#CELERYD_LOG_LEVEL="DEBUG"

# %n will be replaced with the first part of the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
#   You need to create this user manually (or you can choose
#   a user/group combination that already exists (e.g., nobody).
CELERYD_USER="maemo"

启动任务

# ubuntu
/etc/init.d/celeryd start
/etc/init.d/celerybeat start

log和pid文件分别在/var/log/celery, /var/run/celery中

  • 其它
    4.x的版本在启动时可能会crash,官方目前还未合入,可以看这里
    目前我打算把spider的定时任务也放在里面
    上传百度网盘,可以用bypy这个库,非常方便
    发邮件,我用的是mysubmail,每天免费500封普通邮件。用它来做通知邮件非常合适。

参考链接:
http://docs.celeryproject.org/en/latest/index.html
http://www.ibm.com/developerworks/cn/opensource/os-cn-celery-web-service/index.html#ibm-pcon
https://pythad.github.io/articles/2016-12/how-to-run-celery-as-a-daemon-in-production
https://github.com/celery/django-celery/issues/215
http://emadmokhtar.com/how-to-run-celery-as-daemon.html