首页 > 其他 > 详细

celery+django+redis使用介绍

时间:2018-10-19 17:10:12      阅读:223      评论:0      收藏:0      [点我收藏+]

版本:

  • celery==3.1.25
  • django==1.8.6
  • redis==2.10.6

安装:

  • 进入虚拟环境(虚拟环境创建不同,进入方式不同)
  • pip install celery==3.1.25(以celery安装为例,其他安装方式相同)

运行环境:

  • window10(celery4.0以后不支持windows)
  • linux

目录结构:
技术分享图片

  • 最外面的test1为项目名称
  • 里面的test1与app同级目录,里面都是一些配置(其中celery.py放在其中)。
  • app2为项目中的一个app
  • tasks.py在app2中,这个tasks.py只针对app2使用。若其他app也有celery任务。建立同样的文件名即可。

运行celery,需要的几个文件:

  • test1下的celery.py,代码如下(里面有介绍):
技术分享图片
 1 from __future__ import absolute_import, unicode_literals
 2 import os
 3 from django.conf import settings
 4 from celery import Celery
 5 
 6 
 7 #设置 Django 的配置文件
 8 os.environ.setdefault(DJANGO_SETTINGS_MODULE, test.settings)
 9 
10 # 创建 celery 实例
11 app = Celery(test1)
12 
13 # Using a string here means the worker will not have to
14 # pickle the object when using Windows.
15 app.config_from_object(django.conf:settings)
16 
17 # 搜索所有 app 中的 tasks
18 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
19  
20 @app.task(bind=True)
21 def debug_task(self):
22 print(Request: {0!r}.format(self.request))
View Code
  • test1下的settings.py,代码如下:
技术分享图片
 1 """原来的django的settings内容"""
 2 
 3 #celery config
 4 #消息中间件(使用redis),消息代理,用于发布者传递消息给消费者
 5 BROKER_URL = redis://127.0.0.1:6379
 6 #消息结果返回中间件(使用redis),用于存储任务执行结果
 7 CELERY_RESULT_BACKEND = redis://127.0.0.1:6379
 8 #允许的内容类型,
 9 CELERY_ACCEPT_CONTENT = [json]
10 #任务的序列化方式
11 CELERY_TASK_SERIALIZER = json
12 #任务结果的序列化方式
13 CELERY_RESULT_SERIALIZER = json
14 #celery时区,定时任务使用
15 CELERY_TIMEZONE = Asia/Shanghai
16 from datetime import timedelta
17 #定时任务处理,使用的schedule,里面的task,写上任务的“路径”,schedule设置时间,args设置参数。
18 CELERYBEAT_SCHEDULE = {
19 add_every_10_seconds: {
20 task: app2.tasks.add,
21 schedule: timedelta(seconds=10),
22 args: (4,4)
23 },
24 }
View Code
  • test1下的__init__.py,代码如下:

 

技术分享图片
1 from __future__ import absolute_import
2 from .celery import app as celery_app
3 # 这是为了确保在django启动时启动 celery
View Code
  • app2下的tasks.py,这个tasks文件只针对该app2使用,其他app可以根据实际情况添加tasks.py文件,里面写相应的任务代码。代码如下:
技术分享图片
1 # -*- coding:utf-8 -*-
2 from __future__ import absolute_import
3 from auto_model_platform.celery import app
4 
5 @app.task
6 def add(x, y):
7     time.sleep(30)
8     print("running...", x, y)
9     return x + y
View Code

 

调用tasks.py内的函数:

  • 在app2.views.py中的某个视图函数中直接调用,比如调用tasks.py中的add(x,y)函数,这样调用。代码如下:
技术分享图片
 1 from app2.tasks import add
 2 
 3 class TestView(View):
 4      def get(self, request):
 5 
 6     """其它逻辑"""
 7 
 8     #celery处理的其它任务(异步处理),下面这个代码,celery会去处理,django直接执行下面的其它逻辑
 9     r = add.delay(x,y)
10     task_id = r.id
11 
12      """其它逻辑"""
13 
14     return JsonResponse({"data":"123"})
View Code
  • 通过task_id可以查询celery异步处理完的结果
技术分享图片
 1 from test1.celery import app
 2 status = app.AsyncResult(task_id).status
 3 result = app.AsyncResult(task_id).result
 4 
 5 #状态有这几种情况
 6 CELERY_STATUS = {
 7     PENDING: 等待开始,
 8     STARTED: 任务开始,
 9     SUCCESS: 成功,
10     FAILURE: 失败,
11     RETRY: 重试,
12     REVOKED: 任务取消,
13 }
View Code

 

服务器启动celery worker(消费者)任务,和定时任务:

  • 打开项目的虚拟环境,进入项目名称目录下,即跟app同一级目录下运行:
  • 定时任务:celery -A auto_model_platform worker -l info --beat
  • worker任务:celery -A auto_model_platform worker -l info

  

celery+django+redis使用介绍

原文:https://www.cnblogs.com/sunweiyang/p/9817444.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!