如何在Celery任务中使用Flask-SQLAlchemy
我最近切换到芹菜3.0。在此之前,我使用Flask-Celery来将Celery与Flask集成。尽管它有很多问题,例如隐藏了一些强大的Celery功能,但它使我可以使用Flask应用程序的完整上下文,尤其是Flask-
SQLAlchemy。
在我的后台任务中,我正在处理数据和SQLAlchemy ORM来存储数据。Flask-
Celery的维护者已放弃对该插件的支持。该插件正在任务中腌制Flask实例,因此我可以完全访问SQLAlchemy。
我试图在我的task.py文件中复制此行为,但没有成功。您对如何实现这一目标有任何暗示吗?
回答:
更新:从那以后,我们就开始使用一种更好的方式来处理应用程序拆卸,并根据最新的flask文档中描述的模式在每个任务的基础上进行设置。
extensions.py
import flaskfrom flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flaskfrom extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
通过这种方式设置应用程序后,您可以运行和使用celery,而不必从应用程序上下文中显式运行它,因为如果需要,所有任务将自动在应用程序上下文中运行,而您不必明确担心任务后拆卸,这是一个重要的管理问题(请参见下面的其他回复)。
故障排除
那些不断得到的人with _celery.app.app_context(): AttributeError: 'FlaskCelery' object
has no attribute 'app'要确保:
- 将
celery
导入保持在app.py
文件级别。避免:
app.py
from flask import Flaskdef create_app():
app = Flask()
initiliaze_extensions(app)
return app
def initiliaze_extensions(app):
from extensions import celery, db # DOOMED! Keep celery import at the FILE level
db.init_app(app)
celery.init_app(app)
在您
flask run
使用之前开始使用芹菜工作者celery worker -A app:celery -l info -f celery.log
注意app:celery
,即从加载app.py
。
您仍然可以从扩展名导入来装饰任务,即from extensions import celery
。
下面的旧答案仍然有效,但不是一个干净的解决方案
我更喜欢通过在应用程序上下文中创建一个单独的文件来调用celery.start()来在应用程序上下文中运行所有celery。这意味着您的任务文件不必因上下文设置和拆卸而乱七八糟。它也很适合烧瓶的“应用程序工厂”模式。
extensions.py
from from flask.ext.sqlalchemy import SQLAlchemyfrom celery import Celery
db = SQLAlchemy()
celery = Celery()
task.py
from extensions import celery, dbfrom flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
app.py
from extensions import celery, dbdef create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_appfrom extensions import celery
app = create_app()
if __name__ == '__main__':
with app.app_context():
celery.start()
以上是 如何在Celery任务中使用Flask-SQLAlchemy 的全部内容, 来源链接: utcz.com/qa/424729.html