如何在Celery任务中使用Flask-SQLAlchemy

我最近切换到芹菜3.0。在此之前,我使用Flask-Celery来将Celery与Flask集成。尽管它有很多问题,例如隐藏了一些强大的Celery功能,但它使我可以使用Flask应用程序的完整上下文,尤其是Flask-

SQLAlchemy。

在我的后台任务中,我正在处理数据和SQLAlchemy ORM来存储数据。Flask-

Celery的维护者已放弃对该插件的支持。该插件正在任务中腌制Flask实例,因此我可以完全访问SQLAlchemy。

我试图在我的task.py文件中复制此行为,但没有成功。您对如何实现这一目标有任何暗示吗?

回答:

更新:从那以后,我们就开始使用一种更好的方式来处理应用程序拆卸,并根据最新的flask文档中描述的模式在每个任务的基础上进行设置。

extensions.py

import flask

from flask.ext.sqlalchemy import SQLAlchemy

from celery import Celery

class FlaskCelery(Celery):

def __init__(self, *args, **kwargs):

super(FlaskCelery, self).__init__(*args, **kwargs)

self.patch_task()

if 'app' in kwargs:

self.init_app(kwargs['app'])

def patch_task(self):

TaskBase = self.Task

_celery = self

class ContextTask(TaskBase):

abstract = True

def __call__(self, *args, **kwargs):

if flask.has_app_context():

return TaskBase.__call__(self, *args, **kwargs)

else:

with _celery.app.app_context():

return TaskBase.__call__(self, *args, **kwargs)

self.Task = ContextTask

def init_app(self, app):

self.app = app

self.config_from_object(app.config)

celery = FlaskCelery()

db = SQLAlchemy()

app.py

from flask import Flask

from extensions import celery, db

def create_app():

app = Flask()

#configure/initialize all your extensions

db.init_app(app)

celery.init_app(app)

return app

通过这种方式设置应用程序后,您可以运行和使用celery,而不必从应用程序上下文中显式运行它,因为如果需要,所有任务将自动在应用程序上下文中运行,而您不必明确担心任务后拆卸,这是一个重要的管理问题(请参见下面的其他回复)。

故障排除

那些不断得到的人with _celery.app.app_context(): AttributeError: 'FlaskCelery' object

has no attribute 'app'要确保:

  1. celery导入保持在app.py文件级别。避免:

app.py

from flask import Flask

def create_app():

app = Flask()

initiliaze_extensions(app)

return app

def initiliaze_extensions(app):

from extensions import celery, db # DOOMED! Keep celery import at the FILE level

db.init_app(app)

celery.init_app(app)

  1. 在您flask run使用之前开始使用芹菜工作者

    celery worker -A app:celery -l info -f celery.log

注意app:celery,即从加载app.py

您仍然可以从扩展名导入来装饰任务,即from extensions import celery

下面的旧答案仍然有效,但不是一个干净的解决方案

我更喜欢通过在应用程序上下文中创建一个单独的文件来调用celery.start()来在应用程序上下文中运行所有celery。这意味着您的任务文件不必因上下文设置和拆卸而乱七八糟。它也很适合烧瓶的“应用程序工厂”模式。

extensions.py

from from flask.ext.sqlalchemy import SQLAlchemy

from celery import Celery

db = SQLAlchemy()

celery = Celery()

task.py

from extensions import celery, db

from flask.globals import current_app

from celery.signals import task_postrun

@celery.task

def do_some_stuff():

current_app.logger.info("I have the application context")

#you can now use the db object from extensions

@task_postrun.connect

def close_session(*args, **kwargs):

# Flask SQLAlchemy will automatically create new sessions for you from

# a scoped session factory, given that we are maintaining the same app

# context, this ensures tasks have a fresh session (e.g. session errors

# won't propagate across tasks)

db.session.remove()

app.py

from extensions import celery, db

def create_app():

app = Flask()

#configure/initialize all your extensions

db.init_app(app)

celery.config_from_object(app.config)

return app

RunCelery.py

from app import create_app

from extensions import celery

app = create_app()

if __name__ == '__main__':

with app.app_context():

celery.start()

以上是 如何在Celery任务中使用Flask-SQLAlchemy 的全部内容, 来源链接: utcz.com/qa/424729.html

回到顶部