Python Flask with celery 脱离了应用程序上下文

我正在使用python Flask建立一个网站。一切进展顺利,现在我正在尝试实施celery。

在我尝试使用celery的flask-mail发送电子邮件之前,这种情况也很好。现在,我收到“在应用程序上下文之外工作”错误。

完整的追溯是

  Traceback (most recent call last):

File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 228, in trace_task

R = retval = fun(*args, **kwargs)

File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 415, in __protected_call__

return self.run(*args, **kwargs)

File "/home/ryan/www/CG-Website/src/util/mail.py", line 28, in send_forgot_email

msg = Message("Recover your Crusade Gaming Account")

File "/usr/lib/python2.7/site-packages/flask_mail.py", line 178, in __init__

sender = current_app.config.get("DEFAULT_MAIL_SENDER")

File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 336, in __getattr__

return getattr(self._get_current_object(), name)

File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 295, in _get_current_object

return self.__local()

File "/usr/lib/python2.7/site-packages/flask/globals.py", line 26, in _find_app

raise RuntimeError('working outside of application context')

RuntimeError: working outside of application context

这是我的邮件功能:

@celery.task

def send_forgot_email(email, ref):

global mail

msg = Message("Recover your Crusade Gaming Account")

msg.recipients = [email]

msg.sender = "Crusade Gaming stuff@cg.com"

msg.html = \

"""

Hello Person,<br/>

You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />

If you did not request that your password be reset, please ignore this.

""".format(url_for('account.forgot', ref=ref, _external=True))

mail.send(msg)

这是我的celery文件:

from __future__ import absolute_import

from celery import Celery

celery = Celery('src.tasks',

broker='amqp://',

include=['src.util.mail'])

if __name__ == "__main__":

celery.start()

回答:

这是一个与flask应用程序工厂模式一起使用的解决方案,并且无需使用即可创建具有上下文的celery任务app.app_context()。在避免循环导入的同时获取该应用程序确实很棘手,但这可以解决问题。这是针对celery4.2的,这是撰写本文时的最新版本。

结构体:

repo_name/

manage.py

base/

base/__init__.py

base/app.py

base/runcelery.py

base/celeryconfig.py

base/utility/celery_util.py

base/tasks/workers.py

因此base是主要的应用程序包在这个例子中。在中,base/__init__.py我们创建celery实例,如下所示:

from celery import Celery

celery = Celery('base', config_source='base.celeryconfig')

该base/app.py文件包含flask应用程序工厂,create_app并注意init_celery(app, celery)其中包含:

from base import celery

from base.utility.celery_util import init_celery

def create_app(config_obj):

"""An application factory, as explained here:

http://flask.pocoo.org/docs/patterns/appfactories/.

:param config_object: The configuration object to use.

"""

app = Flask('base')

app.config.from_object(config_obj)

init_celery(app, celery=celery)

register_extensions(app)

register_blueprints(app)

register_errorhandlers(app)

register_app_context_processors(app)

return app

转到base/runcelery.py内容:

from flask.helpers import get_debug_flag

from base.settings import DevConfig, ProdConfig

from base import celery

from base.app import create_app

from base.utility.celery_util import init_celery

CONFIG = DevConfig if get_debug_flag() else ProdConfig

app = create_app(CONFIG)

init_celery(app, celery)

接下来,base/celeryconfig.py文件(作为示例):

# -*- coding: utf-8 -*-

"""

Configure Celery. See the configuration guide at ->

http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration

"""

## Broker settings.

broker_url = 'pyamqp://guest:guest@localhost:5672//'

broker_heartbeat=0

# List of modules to import when the Celery worker starts.

imports = ('base.tasks.workers',)

## Using the database to store task state and results.

result_backend = 'rpc'

#result_persistent = False

accept_content = ['json', 'application/text']

result_serializer = 'json'

timezone = "UTC"

# define periodic tasks / cron here

# beat_schedule = {

# 'add-every-10-seconds': {

# 'task': 'workers.add_together',

# 'schedule': 10.0,

# 'args': (16, 16)

# },

# }

现在在base/utility/celery_util.py文件中定义init_celery :

# -*- coding: utf-8 -*-

def init_celery(app, celery):

"""Add flask app context to celery.Task"""

TaskBase = celery.Task

class ContextTask(TaskBase):

abstract = True

def __call__(self, *args, **kwargs):

with app.app_context():

return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask

对于以下方面的工人base/tasks/workers.py:

from base import celery as celery_app

from flask_security.utils import config_value, send_mail

from base.bp.users.models.user_models import User

from base.extensions import mail # this is the flask-mail

@celery_app.task

def send_async_email(msg):

"""Background task to send an email with Flask-mail."""

#with app.app_context():

mail.send(msg)

@celery_app.task

def send_welcome_email(email, user_id, confirmation_link):

"""Background task to send a welcome email with flask-security's mail.

You don't need to use with app.app_context() here. Task has context.

"""

user = User.query.filter_by(id=user_id).first()

print(f'sending user {user} a welcome email')

send_mail(config_value('EMAIL_SUBJECT_REGISTER'),

email,

'welcome', user=user,

confirmation_link=confirmation_link)

然后,你需要从文件夹内部repo_name以两个不同的cmd提示符启动celery beat和celery worker 。

在一个cmd提示符下,执行a celery -A base.runcelery:celery beat和另一个celery -A base.runcelery:celery worker

然后,执行需要flask上下文的任务。应该管用。

以上是 Python Flask with celery 脱离了应用程序上下文 的全部内容, 来源链接: utcz.com/qa/431591.html

回到顶部