Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis。所以我们需要先去安装这个软件。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等,这里我先去了解RabbitMQ,Redis。
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等,这里我先不去看它是如何存储的,就先选用Redis来存储任务执行结果。
一般通过启动一个或多个worker进程来部署Celery。
这些worker进程连接上消息代理(以下称之为broker)来获取任务请求。
broker随机将任务请求分发给worker。
通过调用Celery的API,用户生成一个任务请求,并且将这个请求发布给broker。
在worker完成任务后,将完成的任务信息发送给result store(设置的backend),从中获取信息。
通过启动新的worker进程并让这些进程连上broker,可以很方便的扩展worker池。
每个worker可以和其他的worker同步执行任务。
简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
快速:一个单进程的celery每分钟可处理上百万个任务
灵活: 几乎celery的各个组件都可以被扩展及自定制
pip3 install celery
from celery import Celery app = Celery(‘tasks‘, broker=‘redis://127.0.0.1‘, #这里面存放任务,worker去里面获取任务,将执行结果放入backend中 注意:由于我们安装的redis配置为本机监听,所以使用127.0.0.1才可,若是使用localhost可能无法连接 backend=‘redis://127.0.0.1‘) #从这里面获取我们任务执行的结果 @app.task def add(x, y): print("running...", x, y) return x + y @app.task def cmd(comm): print(comm) return comm
pip install eventlet
celery -A <任务文件> worker -l info -P eventlet
1.启动Celery Worker来开始监听并执行任务
celery -A cel worker -l info -P eventlet
D:\MyPython\day25\twisted_test>python Python 3.5.4 (v3.5.4:3f56838, Aug 8 2017, 02:17:05) [MSC v.1900 64 bit (AMD64 on win32 Type "help", "copyright", "credits" or "license" for more information. >>> from cel import add,cmd >>> t = add.delay(45,1) >>> t.get() 46
from celery import Celery import time app = Celery(‘tasks‘, broker=‘redis://127.0.0.1‘, backend=‘redis://127.0.0.1‘) @app.task def add(x, y): print("running...", x, y) return x + y @app.task def cmd(comm): time.sleep(10) return comm
>>> c = cmd.delay("ccc") >>> c.get() #会等待到10秒才会获取到值 ‘ccc‘
[2018-06-30 15:21:11,836: INFO/MainProcess] Task cel.cmd[ff9379e7-520e-41c2-ac1d -79400e042fb8] succeeded in 10.014999999999418s: ‘ccc‘ #会一直等待10秒才会返回给get
>>> c.get(timeout=1) #我们将任务delay交给远程后,远程已经开始执行了,不过当我们调用get时远程还是没有执行完毕而已 Traceback (most recent call last): File "C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\site-p ackages\celery\backends\async.py", line 255, in _wait_for_pending on_interval=on_interval): File "C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\site-p ackages\celery\backends\async.py", line 54, in drain_events_until raise socket.timeout() socket.timeout During handling of the above exception, another exception occurred: Traceback (most recent call last): File "<stdin>", line 1, in <module> File "C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\site-p ackages\celery\result.py", line 224, in get on_message=on_message, File "C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\site-p ackages\celery\backends\async.py", line 188, in wait_for_pending for _ in self._wait_for_pending(result, **kwargs): File "C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\site-p ackages\celery\backends\async.py", line 259, in _wait_for_pending raise TimeoutError(‘The operation timed out.‘) celery.exceptions.TimeoutError: The operation timed out.
>>> c = cmd.delay("cccde") >>> c.ready() False >>> c.ready() False >>> c.ready() False >>> c.ready() True >>> c.get() ‘cccde‘
celery multi start w1 -A proj -l info
celery multi restart w1 -A proj -l info
celery multi stop w1 -A proj -l info
celery multi stopwait w1 -A proj -l info
celProject 目录 ---celery.py #这个文件必须这样命名 ---tasks.py ---tasks2.py
from __future__ import absolute_import, unicode_literals #是说下面的celery是python安装包决定路径引入,而不是当前项目 from celery import Celery #celery是指python安装包决定路径引入,.celery是当前目录引入 app = Celery(‘proj‘, #celery APP名称 broker=‘redis://127.0.0.1‘, backend=‘redis://127.0.0.1‘, include=[‘celProject.tasks‘,‘celProject.tasks2‘]) #include引入的是当前项目下的任务,为列表,可以引入多个 # Optional configuration, see the application user guide. app.conf.update( #设置配置 result_expires=3600, #设置结果缓存时间 ) if __name__ == ‘__main__‘: app.start()
from __future__ import absolute_import, unicode_literals from .celery import app ##用到他的装饰器 @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
from __future__ import absolute_import, unicode_literals from .celery import app #用到他的装饰器 @app.task def cmd(comm): print("running cmd...") return comm @app.task def file_transfer(filename): print("send file")
celery -A celProject worker -l info -P eventlet
>>> from celProject import tasks,tasks2 >>> t2 = tasks2.cmd.delay(‘ff‘) >>> t2.get() ‘ff‘ >>> t2 = tasks.xsum.delay([1,3,5,44]) >>> t2.get() 53
celery -A 项目.具体任务 beat #启动任务调度器
celProject 目录 ---celery.py #这个文件必须这样命名 ---tasks.py ---tasks2.py ---ontime_task.py #任务调度器
from __future__ import absolute_import, unicode_literals from celery.schedules import crontab from .celery import app @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test(‘hello‘) every 10 seconds. sender.add_periodic_task(10.0, test.s(‘hello‘), name=‘add every 10‘) #.s就是相当于delay,发送到worker执行 # Calls test(‘world‘) every 30 seconds sender.add_periodic_task(30.0, test.s(‘world‘), expires=10) #默认是秒单位 # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), #可以通过crontab设置其他格式时间 test.s(‘Happy Mondays!‘), ) @app.task def test(arg): #定时执行的任务 print(arg)
celery -A celProject worker -l info -P eventlet #开启一个worker,一会任务调度器中任务由他执行
celery -A celProject.ontime_task beat -l debug #开启任务调度器,由于我们是同项目中app,所以我们需要在项目同级下启动该任务
from __future__ import absolute_import, unicode_literals from celery.schedules import crontab from .celery import app app.conf.beat_schedule = { ‘add-every-monday-morning‘: { ‘task‘: ‘celProject.tasks.add‘, #这里的任务名,我们需要和启动的worker显示的任务列表中任务一致 ‘schedule‘: crontab(hour=7, minute=30, day_of_week=1), ‘args‘: (16, 16), }, ‘add-every-5-seconds‘: { ‘task‘: ‘celProject.tasks.add‘, # 这里的任务名,我们需要和启动的worker显示的任务列表中任务一致 ‘schedule‘: 5, ‘args‘: (13, 26), }, ‘add-every-10-seconds‘: { ‘task‘: ‘celProject.tasks.mul‘, # 这里的任务名,我们需要和启动的worker显示的任务列表中任务一致 ‘schedule‘: 10, ‘args‘: (13, 26), }, } app.conf.timezone = "UTC" @app.task def test(arg): print(arg)
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the ‘celery‘ program. os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘CRM.settings‘) #需要和项目名一致 app = Celery(‘CRM‘) #这个是设置worker名称,随便写,与项目一致吧 # Using a string here means the worker don‘t have to serialize # the configuration object to child processes. # - namespace=‘CELERY‘ means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object(‘django.conf:settings‘, namespace=‘CELERY‘) # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): #测试任务,可以不要 print(‘Request: {0!r}‘.format(self.request))
from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def cmd(comm): print("running cmd...") return comm @shared_task def file_transfer(filename): print("send file")
任务创建完毕。
celery -A CRM worker -l info -P eventlet #是在settings.py文件的上级目录的同级去执行的启动命令,因为celery放在这个目录下面
pip install django-celery-beat
INSTALLED_APPS = ( ..., ‘django_celery_beat‘, )
python manage.py migrate
celery -A CRM beat -l info -S django #也是需要在settings的上级目录下执行
原文:https://www.cnblogs.com/ssyfj/p/9247254.html