Python Flask with celery 脱离了应用程序上下文
我正在使用python Flask建立一个网站。一切进展顺利,现在我正在尝试实施celery。
在我尝试使用celery的flask-mail发送电子邮件之前,这种情况也很好。现在,我收到“在应用程序上下文之外工作”错误。
完整的追溯是
Traceback (most recent call last): File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 228, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 415, in __protected_call__
return self.run(*args, **kwargs)
File "/home/ryan/www/CG-Website/src/util/mail.py", line 28, in send_forgot_email
msg = Message("Recover your Crusade Gaming Account")
File "/usr/lib/python2.7/site-packages/flask_mail.py", line 178, in __init__
sender = current_app.config.get("DEFAULT_MAIL_SENDER")
File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 336, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 295, in _get_current_object
return self.__local()
File "/usr/lib/python2.7/site-packages/flask/globals.py", line 26, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context
这是我的邮件功能:
@celery.taskdef send_forgot_email(email, ref):
global mail
msg = Message("Recover your Crusade Gaming Account")
msg.recipients = [email]
msg.sender = "Crusade Gaming stuff@cg.com"
msg.html = \
"""
Hello Person,<br/>
You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />
If you did not request that your password be reset, please ignore this.
""".format(url_for('account.forgot', ref=ref, _external=True))
mail.send(msg)
这是我的celery文件:
from __future__ import absolute_importfrom celery import Celery
celery = Celery('src.tasks',
broker='amqp://',
include=['src.util.mail'])
if __name__ == "__main__":
celery.start()
回答:
这是一个与flask应用程序工厂模式一起使用的解决方案,并且无需使用即可创建具有上下文的celery任务app.app_context()。在避免循环导入的同时获取该应用程序确实很棘手,但这可以解决问题。这是针对celery4.2的,这是撰写本文时的最新版本。
结构体:
repo_name/ manage.py
base/
base/__init__.py
base/app.py
base/runcelery.py
base/celeryconfig.py
base/utility/celery_util.py
base/tasks/workers.py
因此base是主要的应用程序包在这个例子中。在中,base/__init__.py
我们创建celery实例,如下所示:
from celery import Celerycelery = Celery('base', config_source='base.celeryconfig')
该base/app.py文件包含flask应用程序工厂,create_app并注意init_celery(app, celery)其中包含:
from base import celeryfrom base.utility.celery_util import init_celery
def create_app(config_obj):
"""An application factory, as explained here:
http://flask.pocoo.org/docs/patterns/appfactories/.
:param config_object: The configuration object to use.
"""
app = Flask('base')
app.config.from_object(config_obj)
init_celery(app, celery=celery)
register_extensions(app)
register_blueprints(app)
register_errorhandlers(app)
register_app_context_processors(app)
return app
转到base/runcelery.py内容:
from flask.helpers import get_debug_flagfrom base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)
接下来,base/celeryconfig.py
文件(作为示例):
# -*- coding: utf-8 -*-"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""
## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0
# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)
## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False
accept_content = ['json', 'application/text']
result_serializer = 'json'
timezone = "UTC"
# define periodic tasks / cron here
# beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'workers.add_together',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
# }
现在在base/utility/celery_util.py文件中定义init_celery :
# -*- coding: utf-8 -*-def init_celery(app, celery):
"""Add flask app context to celery.Task"""
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
对于以下方面的工人base/tasks/workers.py:
from base import celery as celery_appfrom flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
from base.extensions import mail # this is the flask-mail
@celery_app.task
def send_async_email(msg):
"""Background task to send an email with Flask-mail."""
#with app.app_context():
mail.send(msg)
@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
"""Background task to send a welcome email with flask-security's mail.
You don't need to use with app.app_context() here. Task has context.
"""
user = User.query.filter_by(id=user_id).first()
print(f'sending user {user} a welcome email')
send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
email,
'welcome', user=user,
confirmation_link=confirmation_link)
然后,你需要从文件夹内部repo_name以两个不同的cmd提示符启动celery beat和celery worker 。
在一个cmd提示符下,执行a celery -A base.runcelery:celery beat
和另一个celery -A base.runcelery:celery worker
。
然后,执行需要flask上下文的任务。应该管用。
以上是 Python Flask with celery 脱离了应用程序上下文 的全部内容, 来源链接: utcz.com/qa/431591.html