【Python】django+redis+celery异步任务执行

一、安装redis

参考redis文件夹下:redis安装

二、django工程配置

1、安装依赖包

pip install celery

pip install celery-with-redis

pip install django-celery

2、配置

settings.py文件

python">import djcelery

#注册jdcelery

INSTALLED_APPS = [

... ,

'djcelery',

]

# celery 设置

# celery中间人 redis://redis服务所在的ip地址:端口/数据库号

BROKER_URL = 'redis://redis_ip:6379/0'

# celery结果返回,可用于跟踪结果 CELERY_RESULT_BACKEND = 'redis://redis_ip:6379/0'

# celery内容等消息的格式设置,这里使用pickle,如果不使用,会序列化报错

CELERY_ACCEPT_CONTENT = ['pickle', ] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'

# celery时区设置,使用settings中TIME_ZONE同样的时区 CELERY_TIMEZONE = TIME_ZONE

在工程目录下,创建celery.py文件

# coding:utf-8

from __future__ import absolute_import, unicode_literals

from celery import Celery

from django.conf import settings

import os

# 获取当前文件夹名,即为该Django的项目名

project_name = os.path.split(os.path.abspath('.'))[-1]

project_settings = '%s.settings' % project_name

# 设置环境变量

os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)

# 实例化Celery,网上很多教程这里都是没有设置broker造成启动失败

app = Celery('tasks', broker='redis://redis_ip:6379/0', backend='redis://redis_ip:6379/0')

# 使用django的settings文件配置celery

app.config_from_object('django.conf:settings')

# Celery加载所有注册的应用

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在工程目录的__init__.py文件中添加:

# 引入celery实例对象

# 一定要添加在文件最前面,否则会报错

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = [celery_app]

在对应app的目录下创建tasks.py文件

注意:tasks.py文件一定要创建在应用根目录下,且文件名固定,在该文件里添加你对应的后台任务代码,并注解@task

import time

from celery import task, Celery

app = Celery('tasks', broker='redis://redis_ip:6379/0', backend='redis://redis_ip:6379/0')

#在这里添加你的后台任务方法代码,并注解task

@task

def add(a, b):

print("这是任务开始")

c = a + b

print(c)

time.sleep(10)

print("这是任务结束")

3、启动celery worker

在python manage.py shell命令中输入

celery -A project_name worker --pool=solo -l info

如下图,即为启动成功
【Python】django+redis+celery异步任务执行

4、测试

在tasks.py文件中加入测试代码,这里复用上面tasks.py代码中的add方法

在views.py文件中创建测试view方法:

from . import tasks

def add(request,*args,**kwargs):

tasks.add.delay(1, 2)

result = {'code': 0, 'msg': '这是一个后台任务'}

return JsonResponse(result)

在url中配置url路径

from django.urls import path

from . import views

urlpatterns = [ path('add', views.add, name="add") ]

启动django

然后调用对应url,发现响应只用63ms
【Python】django+redis+celery异步任务执行

在manage.py shell日志中静候后台任务执行,发现任务已经执行,耗时10s多(因为上述代码sleep了10s)
【Python】django+redis+celery异步任务执行
到这里,celery已成功运行。

三、问题积累

1、启动celery worker时,报链接失败

(1)可能是限制bind没有注释,仅允许本机访问;参考redis配置项

(2)保护模式为关闭,参考redis配置项

(3)需要鉴权

2、运行后台任务时,报错 kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)

https://blog.csdn.net/libing_thinking/article/details/78622943

kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted

content of type pickle (application/x-python-serialize)

需要将settings.py文件中修改celery配置

CELERY_ACCEPT_CONTENT = ['pickle', ]

3、运行后台任务时,报错AttributeError: 'str' object has no attribute 'items'

python版本为3.6

#貌似是redis版本高了,重新安装后得以解决 pip install redis==2.10.6

4、启动celery时报错:TypeError: can only concatenate tuple (not “NoneType”) to tuple

这个问题是由于安装包不全导致的,分别检查是否安装了以下安装包:

pip install celery

pip install celery-with-redis

pip install django-celery

以上是 【Python】django+redis+celery异步任务执行 的全部内容, 来源链接: utcz.com/a/74852.html

回到顶部