烧瓶,蓝图使用芹菜任务并获得循环输入
我有一个应用程序与蓝图和芹菜的代码是在这里:
config.py
import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or ''
SQLALCHEMY_COMMIT_ON_TEARDOWN = True
RECORDS_PER_PAGE = 40
SQLALCHEMY_DATABASE_URI = ''
CELERY_BROKER_URL = ''
CELERY_RESULT_BACKEND = ''
CELERY_RESULT_DBURI = ''
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {}
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
WTF_CSRF_ENABLED = True
APP_HOME = ''
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
CELERY_BROKER_URL = 'sqla+mysql://...'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://...'
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
'send-email-every-morning': {
'task': 'app.workers.tasks.send_email_task',
'schedule': crontab(hour=6, minute=15),
},
}
class TestConfig(Config):
DEBUG = True
WTF_CSRF_ENABLED = False
TESTING = True
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
class ProdConfig(Config):
DEBUG = False
WTF_CSRF_ENABLED = True
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
CELERY_BROKER_URL = 'sqla+mysql://...celery'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://.../celery'
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
'send-email-every-morning': {
'task': 'app.workers.tasks.send_email_task',
'schedule': crontab(hour=6, minute=15),
},
}
config = {
'development': DevelopmentConfig,
'default': ProdConfig,
'production': ProdConfig,
'testing': TestConfig,
}
class AppConf:
"""
Class to store current config even out of context
"""
def __init__(self):
self.app = None
self.config = {}
def init_app(self, app):
if hasattr(app, 'config'):
self.app = app
self.config = app.config.copy()
else:
raise TypeError
init .py:import os
from flask import Flask
from celery import Celery
from config import config, AppConf
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
app_conf.init_app(app)
# Connect to Staging view
from staging.views import staging as staging_blueprint
app.register_blueprint(staging_blueprint)
return app
def make_celery(app=None):
app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
celery.conf.update(app.conf)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
tasks.py:从应用程序导入make_celery,app_conf
cel = make_celery(app_conf.app)
@cel.task
def send_realm_to_fabricdb(realm, form):
some actions...
这里是问题:蓝图“分段”使用任务send_realm_to_fabricdb,因此它使得: from tasks import send_realm_to_fabricdb
比当我刚运行应用程序时,一切顺利但是,当我试图运行芹菜: celery -A app.tasks worker -l info --beat
,它会转到cel = make_celery(app_conf.app)
,获得app = None并尝试再次创建应用程序:注册一个蓝图...所以我有循环导入。 你能告诉我如何打破这个循环? 提前致谢。
我没有代码来尝试这一点,但我认为如果将Celery实例的创建从tasks.py
并放入create_app
函数中,情况会更好,以便它同时发生在app
实例被建造。
您在-A
选项中给Celery worker的参数不需要任务,Celery只需要celery对象,例如,您可以创建一个单独的启动脚本,例如celery_worker.py
,它调用create_app
来创建app
和cel
,然后把它交给工人为-A celery_worker.cel
,而根本不涉及蓝图。
希望这可以帮助。
链接地址: http://www.djcxy.com/p/84541.html上一篇: Flask, blueprints uses celery task and got cycle import
下一篇: replace a string not located between two specific words