烧瓶,蓝图使用芹菜任务并获得循环输入

我有一个应用程序与蓝图和芹菜的代码是在这里:

config.py

import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }

config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """
    Class to store current config even out of context
    """
    def __init__(self):
        self.app = None
        self.config = {}

    def init_app(self, app):
        if hasattr(app, 'config'):
            self.app = app
            self.config = app.config.copy()
        else:
            raise TypeError

init .py:import os

from flask import Flask
from celery import Celery
from config import config, AppConf

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)

    return app


def make_celery(app=None):
    app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
    celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
    celery.conf.update(app.conf)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py:从应用程序导入make_celery,app_conf

cel = make_celery(app_conf.app)

@cel.task
def send_realm_to_fabricdb(realm, form):
    some actions...

这里是问题:蓝图“分段”使用任务send_realm_to_fabricdb,因此它使得: from tasks import send_realm_to_fabricdb比当我刚运行应用程序时,一切顺利但是,当我试图运行芹菜: celery -A app.tasks worker -l info --beat ,它会转到cel = make_celery(app_conf.app) ,获得app = None并尝试再次创建应用程序:注册一个蓝图...所以我有循环导入。 你能告诉我如何打破这个循环? 提前致谢。


我没有代码来尝试这一点,但我认为如果将Celery实例的创建从tasks.py并放入create_app函数中,情况会更好,以便它同时发生在app实例被建造。

您在-A选项中给Celery worker的参数不需要任务,Celery只需要celery对象,例如,您可以创建一个单独的启动脚本,例如celery_worker.py ,它调用create_app来创建appcel ,然后把它交给工人为-A celery_worker.cel ,而根本不涉及蓝图。

希望这可以帮助。

链接地址: http://www.djcxy.com/p/84541.html

上一篇: Flask, blueprints uses celery task and got cycle import

下一篇: replace a string not located between two specific words