最新消息:20210816 当前crifan.com域名已被污染,为防止失联,请关注(页面右下角的)公众号

【已解决】Flask中如何用工厂模式初始化Celery

Flask crifan 252浏览 0评论
折腾:
【未解决】用蓝图和工厂模式去优化现有Flask项目代码结构
期间,需要把之前的直接都写在app.py中的Celery的初始化:
celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)

#----------------------------------------
# Celery tasks
#----------------------------------------

# @celeryApp.task()
@celeryApp.task
# @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    global log

    log.info("deleteTmpAudioFile: filename=%s", filename)

    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)

# log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)
# log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)
# log.info("celeryApp.tasks=%s", celeryApp.tasks)

@celeryApp.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()

@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)

    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
其中,celery还调用了其他一些函数,比如上面的refreshAzureSpeechToken
也要想办法,如何更好的合并进去
flask celery factory
zenyui/celery-flask-factory: Implementing Celery within a Flask application factory
Celery and the Flask Application Factory Pattern – miguelgrinberg.com
https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern
from celery import Celery
from config import config, Config

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_name):
    # ...
    celery.conf.update(app.config)
    # ...
    return app
还需要导读导入config,感觉也不是很完美
而且此处的__name__,估计不一定是到时候的app.name
Using Celery with Flask Factories
Flask with create_app, SQLAlchemy and Celery – Stack Overflow
python – Celery/Flask Receiving unregistered task of type (App Factory + Blueprints) – Stack Overflow
python – Celery using default broker instead of reddis. Flask + Celery + Factory pattern – Stack Overflow
python – Flask and Celery large application structure – Stack Overflow
在Celery中使用Flask的上下文
https://jiayi.space/post/zai-celeryzhong-shi-yong-flaskde-shang-xia-wen
在 Flask 项目中使用 Celery – 李林克斯
目前写成:
resources/celery_task.py
import os
from app import app, celery, log

from resources.tts import refreshAzureSpeechToken


#----------------------------------------
# Celery tasks
#----------------------------------------

# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    log.info("deleteTmpAudioFile: filename=%s", filename)

    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)


@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)

    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")


# log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)
# log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)
# log.info("celery.tasks=%s", celery.tasks)
app.py
from factory import celery

app = create_app(settings)
log.debug("celery=%s", celery)
factory.py
from celery import Celery


celery = Celery()
print("celery=%" % celery)


def create_app(config_object):
    app = Flask(__name__)
    CORS(app)

    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')

    app.config.from_object(config_object)

    create_extensions(app)


def create_extensions(app):
    global log, mongo, fsCollection, api, celery

    log = create_log(app)
    ...
    celery = create_celery(app)
    log.info("celery=%s", celery)

    return app
待后续调试,看看是否正常运行。
【总结】
此处,目前对于Celery改为工厂模式初始化,代码是:
import os
# from app import celery
# from app import app, log
# from flask import current_app as app
# from factory import celery
from flask import g
from resources.tts import refreshAzureSpeechToken

app = g.app
log = g.log
celery = g.celery

#----------------------------------------
# Celery tasks
#----------------------------------------

# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # log = app.logger
    with celery.app.app_context():
        log.info("deleteTmpAudioFile: filename=%s", filename)

        audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
        # audioTmpFolder = "tmp/audio"
        ...

@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    with celery.app.app_context():
        refreshAzureSpeechToken()

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    with celery.app.app_context():
        # log = app.logger
        log.info("celerySetupPeriodicTasks: sender=%s", sender)

        sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                                 celeryRefreshAzureSpeechToken.s(),
                                 name="refresh ms Azure token every less than 10 minutes")
而app.py是:
from conf.app import settings

from factory import create_app

app = create_app(settings)
# register_extensions(app)
log = app.logger

if __name__ == "__main__":
    app.run(
        host=app.config["FLASK_HOST"],
        port=app.config["FLASK_PORT"],
        debug=app.config["DEBUG"],
        use_reloader=False
    )
factory.py
import os
from flask import Flask
...
from celery import Celery

from flask import g

################################################################################
# Global Function
################################################################################

def create_app(config_object):
    # global log

    # app = Flask(__name__) #<Flask 'factory'>
    app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'>
    CORS(app)

    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')

    app.config.from_object(config_object)
    with app.app_context():
        g.app = app

        log = create_log(app)
        g.log = log

        ...

        register_extensions(app)

    return app


def register_extensions(app):
    # global log, mongo, fsCollection, api, celery

    log = g.log
...
    celery = create_celery(app)
    g.celery = celery
    log.info("celery=%s", celery)
...
    return app

...


def create_celery(app):
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery
...
目前,至少对于Flask的app的初始化是可以的,可以运行的。不会出现循环导入和全局变量的问题。
【后记】
后来去真正本地调试运行时,尤其是本地运行celery的work时,出错了:
【已解决】Flask的Celery改为工厂模式后本地调试worker出错:RuntimeError: Working outside of application context
所以之前的写法,还是不行。
最终是:
想办法解决了循环导入,以及全局变量的问题,期间用了很多折中的做法:
先说celery的库:
抛弃了之前的Flask-Celery-Helper,只用最原始的Celery
# from flask_celery import Celery
from celery import Celery
-》因为其中会让我们在配置中去加上CELERY_IMPORTS:
# for Flask-Celery-Helper
# CELERY_IMPORTS = ('tasks.deleteTmpAudioFile', 'tasks.celeryRefreshAzureSpeechToken', )
# CELERY_IMPORTS = ('resources.tasks.deleteTmpAudioFile', 'resources.tasks.celeryRefreshAzureSpeechToken', )
-》会导致找不到task
-》所以果断放弃。
再说每个文件的代码和其中的改动:
文件结构是:
➜  xxxRobotDemoServer git:(master) ✗ tree .       
...
├── app.py
...
├── conf
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-36.pyc
│   ├── app
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-36.pyc
│   │   │   └── settings.cpython-36.pyc
│   │   ├── development
│   │   │   └── __init__.py
│   │   ├── production
│   │   │   └── __init__.py
│   │   └── settings.py
...
├── factory.py
├── resources
│   ├── __init__.py
│   ├── asr.py
│   ├── extensions_celery.py
│   ├── files.py
│   ├── qa.py
│   ├── tasks.py
│   └── tts.py
...
详细解释每个文件:
(1)resources/extensions_celery.py
celery的初始化中:只是初始化celery,而不去做其他事情
# from flask_celery import Celery
from conf.app import settings
from celery import Celery
from celery.utils.log import get_task_logger

# celery = Celery()
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
celery_logger = get_task_logger(__name__)
print("in extensions_celery: celery=%s" % celery)
(2)resources/tasks.py
背景是:
之前celery中依赖app和log
-》那是因为task中:
  • 配置用到app.config[“xxx”] -》 后来改为settings.xxx了-》单独引入settings不会产生循环导入
  • log是task中需要打印输出 -》 希望尽量和flask的app的log用同一个log
这样会导致:
A:否则始终无法很好的处理,如何再去得到app
因为,即使最开始运行app时,可以方便在的得到app后,但是celery work -A时,还是会无法得到app
而如果再去调用create_app时,就又会导致app初始化时,引入flask-restful时,引用到别的模块,比如:
resources/tts.py
而其中由于用到celery的异步task:deleteTmpAudioFile,所以要导入
from resources.tasks import deleteTmpAudioFile
从而会触发tasks中,多了一次app的初始化
-》变成了:
Flask的app的初始化运行,初始化了2次,这是无法接受的
B:无法方便的获取(app的)log
最后的折中的解决办法是:
celery的task中:去除之前依赖的flask的app和log
-》
  • 把app.config[“xxx”]  改为settings.xxx了
    • -》而单独引入settings不会产生循环导入
  • 用另外的办法去获得log
    • 此处用的是从resources/extensions_celery.py获取celery的logger
完整代码:
import os

# from resources.extensions_celery import celery
# print("celery=%s" % celery)
# print("celery.app=%s" % celery.app)
# print("celery.app.log=%s" % celery.app.log)
# print("celery.log=%s" % celery.log)

from conf.app import settings

# try:
#     from flask import g
#     print("tasks: import flask g ok")
#     print("g=%s" % g)
#     log = g.log
#     print("log=%s" % log)
# except RuntimeError as err:
#     # except:
#     print("tasks: failed to import flask g, err=%s" % err)
#     from factory import create_app
#     print("tasks: import create_app ok")
#     app = create_app(settings)
#     print("create_app ok, app=%s" % app)
#     log = app.logger
#     print("log=%s" % log)

# import logging
# log = logging.getLogger(settings.FLASK_APP_NAME)
# log.info("test logging getLogger from flask app, work?")

# print("-----before: from factory import create_celery_app")
# from factory import create_celery_app
# from factory import create_log, create_app
# print("from factory import create_celery_app ok")
# celery = create_celery_app()
# app = create_app(settings)
# celery = create_celery_app(app)
# log = app.logger

from resources.extensions_celery import celery, celery_logger as log
print("create_celery_app return: celery=%s, log=%s" % (celery, log))

#----------------------------------------
# Celery tasks
#----------------------------------------

# @celery.task()
@celery.task
# @celery.task(name=settings.CELERY_TASK_NAME + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # print("deleteTmpAudioFile: celery=%s, filename=%s" % (celery, filename))

    # log = app.logger
    # with celery.app.app_context():
    #     log = celery.app.logger
    #     app = celery.app

    # log = celery.log
    # print("celery.log=%s" % celery.log)
    # print("log=%s" % log)

    log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename)

    audioTmpFolder = settings.AUDIO_TEMP_FOLDER
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)


@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s" % celery)

    # with celery.app.app_context():
    from resources.tts import refreshAzureSpeechToken
    refreshAzureSpeechToken()

@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s" % (celery, sender))

    # with celery.app.app_context():
    # log = app.logger
    # log = celery.app.logger
    # app = celery.app

    # log = celery.log
    # # print("celery.log=%s" % celery.log)
    # # print("log=%s" % log)
    # print("sender=%s" % sender)

    # log.info("celerySetupPeriodicTasks: sender=%s", sender)
    # print("celerySetupPeriodicTasks: log is usable")

    sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
(3)factory.py
而factory中没有全局的变量celery(和其他全局变量)
celery的初始化,都放在create_celery_app
并且create_celery_app中,在最开始app初始化时是传入app的
而在celery work -A时,则只是从resources/tasks.py得到celery,而不会再去新建flask的app
import os
from flask import Flask
import logging
from logging.handlers import RotatingFileHandler
# from flask_pymongo import PyMongo
from gridfs import GridFS
from pymongo import MongoClient
from flask_restful import Api
from flask_cors import CORS

from conf.app import settings

from celery import Celery
# from flask_celery import Celery
# from  resources.extensions_celery import celery

from flask import g

################################################################################
# Global Variables
################################################################################
# # log = logging.getLogger() #<RootLogger root (WARNING)>
# log = None
# print("log=%s" % log)
#
# # mongo = MongoClient() # MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True)
# mongo = None
# print("mongo=%s" % mongo)
# fsCollection = None #None
# print("fsCollection=%s" % fsCollection)
#
# celery = Celery() #<Celery __main__ at 0x1068d8b38>
# celery = None
# print("celery=%s" % celery)

################################################################################
# Global Function
################################################################################

def create_app(config_object, init_extensions=True):
    # global log

    # app = Flask(__name__) #<Flask 'factory'>
    app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'>
    CORS(app)

    # app.config.from_object('config.DevelopmentConfig')
    # # app.config.from_object('config.ProductionConfig')

    app.config.from_object(config_object)
    with app.app_context():
        g.app = app

        log = create_log(app)
        g.log = log

        log.debug("after load from object: app.config=%s", app.config)
        log.debug('app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s',
                  app.config["DEBUG"], app.config["MONGODB_HOST"], app.config["FILE_URL_HOST"])

        if init_extensions:
            register_extensions(app)
            log.info("flask app extensions init completed")

    return app


def register_extensions(app):
    # global log, mongo, fsCollection, api, celery

    log = g.log

    mongo = create_mongo(app)
    g.mongo = mongo
    log.info("mongo=%s", mongo)
    mongoServerInfo = mongo.server_info()
    log.debug("mongoServerInfo=%s", mongoServerInfo)

    fsCollection = create_gridfs_fs_collection(mongo)
    g.fsCollection = fsCollection
    log.info("fsCollection=%s", fsCollection)

    celery = create_celery_app(app)
    g.celery = celery
    log.info("celery=%s", celery)

    # api = Api(app)
    api = create_rest_api(app)
    log.debug("api=%s", api)
    g.api = api

    return app


def create_rest_api(app):
    from resources.qa import RobotQaAPI
    from resources.asr import RobotAsrAPI
    from resources.files import GridfsAPI, TmpAudioAPI

    rest_api = Api()
    rest_api.add_resource(RobotQaAPI, '/qa', endpoint='qa')
    rest_api.add_resource(RobotAsrAPI, '/asr/language/<string:language>', endpoint='asr')
    rest_api.add_resource(GridfsAPI, '/files/<fileId>', '/files/<fileId>/<fileName>', endpoint='gridfs')
    rest_api.add_resource(TmpAudioAPI, '/tmp/audio/<filename>', endpoint='TmpAudio')

    rest_api.init_app(app)
    return rest_api


def create_log(app):
    print("create_log: before init log: app.logger=%s" % app.logger)
    logFormatterStr = app.config["LOG_FORMAT"]
    logFormatter = logging.Formatter(logFormatterStr)

    fileHandler = RotatingFileHandler(
        app.config['LOG_FILE_FILENAME'],
        maxBytes=app.config["LOG_FILE_MAX_BYTES"],
        backupCount=app.config["LOG_FILE_BACKUP_COUNT"],
        encoding="UTF-8")
    fileHandler.setLevel(logging.DEBUG)
    fileHandler.setFormatter(logFormatter)
    app.logger.addHandler(fileHandler)
    # Note: should NOT set StreamHandler here, otherwise will duplicate debug log
    app.logger.setLevel(logging.DEBUG)  # set root log level

    log = app.logger
    log.info("app=%s", app)
    # log.debug("app.config=%s", app.config)
    print("create_log: after init log: app.logger=%s" % app.logger)

    return log


def create_mongo(app):
    # mongo_client = MongoClient(
    #     host=app.config["MONGODB_HOST"],
    #     port=app.config["MONGODB_PORT"],
    #     username=app.config["MONGODB_USERNAME"],
    #     password=app.config["MONGODB_PASSWORD"],
    #     authSource=app.config["MONGODB_AUTH_SOURCE"]
    # )

    if settings.MONGODB_AUTH_SOURCE:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
            authSource=settings.MONGODB_AUTH_SOURCE
        )
    elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
        )
    elif settings.MONGODB_PORT:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
        )
    elif settings.MONGODB_HOST:
        mongo_client = MongoClient(
            host=settings.MONGODB_HOST,
        )
    else:
        mongo_client = MongoClient()


    return mongo_client


def create_gridfs_fs_collection(mongo_db):
    # Pure PyMongo
    gridfs_db = mongo_db.gridfs  # Database(MongoClient(host=['xxx:32018'], document_class=dict, tz_aware=False, connect=True, authsource='gridfs'), 'gridfs')
    gridfs_fs_collection = GridFS(gridfs_db)  # <gridfs.GridFS object at 0x1107b2390>
    return gridfs_fs_collection


def create_celery_app(app=None):
    print("create_celery_app: app=%s" % app)

    app = app or create_app(settings, init_extensions=False)
    app_import_name = app.import_name
    # app_name = app.name
    # celery_app_name = app_name
    celery_app_name = app_import_name
    celery = Celery(celery_app_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    # celery.log = app.logger

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                # g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask


    # celery.init_app(app)

    print("init celery ok")

    return celery
(4)app.py
app中只是调用create_app
而不去初始化celery了
import os

from conf.app import settings
from factory import create_app
# from factory import log
# from factory import create_celery_app


################################################################################
# Global Definitions
################################################################################

################################################################################
# Global Variables
################################################################################


################################################################################
# Global Function
################################################################################


################################################################################
# Global Init App
################################################################################

print("in flask app: settings=%s" % (settings))
app = create_app(settings)
app.app_context().push()
# register_extensions(app)
log = app.logger
log.debug("app=%s", app)

log.debug("log=%s", log)
log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV)
log.debug("settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s",
          settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST)

# celery = None
# with app.app_context():
# celery = create_celery_app(app)
# print("celery=%s" % celery)

if __name__ == "__main__":
    app.run(
        host=settings.FLASK_HOST,
        port=settings.FLASK_PORT,
        debug=settings.DEBUG,
        use_reloader=False
    )
(5)其他模块用flask的g获取要的全局变量
其他模块中,为了获取最初app初始化后的app,log,mongo
通过factory初始化时加上with app.app_context(),
以及其他每个模块中用from flask import g
再去用log=g.log, app=g.app等去获取自己要的值
resources/files.py
from flask import g
from resources.tts import gTempAudioFolder

log = g.log
mongo = g.mongo
fsCollection = g.fsCollection

class GridfsAPI(Resource):

    def get(self, fileId, fileName=None):
        # log = app.logger
        log.info("fileId=%s, file_name=%s", fileId, fileName)
        ...
resources/qa.py
from flask import g

app = g.app
log = g.log
fsCollection = g.fsCollection
resources/tts.py
from flask import g

app = g.app
log = g.log
这样每个模块中,都可以用上全局的log,全局的app了。
然后PyCharm去debug初始化时,就正常了:
接着再去Mac的终端中去运行celery的worker
celery worker -A resources.tasks.celery --loglevel=DEBUG
然后就可以有正常输出了:
➜  xxxRobotDemoServer git:(master) ✗ celery worker -A resources.tasks.celery --loglevel=DEBUG
cur_flask_environ=None
FLASK_ENV=development
cur_dir=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app
env_folder=development
dotenv_path=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app/development/.env
dotenv_load_ok=True
After  load .env: DEBUG=True, MONGODB_HOST=xxx, FILE_URL_HOST=127.0.0.1
in extensions_celery: celery=<Celery RobotQA at 0x104277e80>
create_celery_app return: celery=<Celery RobotQA at 0x104277e80>, log=<Logger resources.extensions_celery (WARNING)>
[2018-08-26 11:11:45,796: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-08-26 11:11:45,801: DEBUG/MainProcess] | Worker: Building graph...
[2018-08-26 11:11:45,802: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer}
[2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Building graph...
[2018-08-26 11:11:45,876: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Agent, event loop}


[email protected]
 v4.2.1 (windowlicker)

Darwin-17.7.0-x86_64-i386-64bit-PE 2018-08-26 11:11:45

[config]
.> app:         RobotQA:0x104277e80
.> transport:   
redis://localhost:6379/0
.> results:     disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . resources.tasks.celeryRefreshAzureSpeechToken
  . resources.tasks.deleteTmpAudioFile

[2018-08-26 11:11:45,921: DEBUG/MainProcess] | Worker: Starting Hub
[2018-08-26 11:11:45,922: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:45,922: DEBUG/MainProcess] | Worker: Starting Pool
[2018-08-26 11:11:46,059: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,060: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-08-26 11:11:46,061: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-08-26 11:11:46,098: INFO/MainProcess] Connected to 
redis://localhost:6379/0
[2018-08-26 11:11:46,098: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,098: DEBUG/MainProcess] | Consumer: Starting Events
[2018-08-26 11:11:46,114: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,114: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-08-26 11:11:46,117: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,119: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-08-26 11:11:46,119: INFO/MainProcess] mingle: searching for neighbors
[2018-08-26 11:11:47,155: INFO/MainProcess] mingle: all alone
[2018-08-26 11:11:47,156: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,156: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-08-26 11:11:47,161: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,161: DEBUG/MainProcess] | Consumer: Starting Control
[2018-08-26 11:11:47,167: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,167: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-08-26 11:11:47,172: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,172: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-08-26 11:11:47,173: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2018-08-26 11:11:47,173: INFO/MainProcess] 
[email protected]
 ready.
[2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16
其中的task是:
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . resources.tasks.celeryRefreshAzureSpeechToken
  . resources.tasks.deleteTmpAudioFile
后续和Flask中的task是一致的,那Flask就可以互相识别了,就可以调用task了。
后续调用时task就可以正常运行了:
2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-08-26 11:14:28,913: INFO/MainProcess] Received task: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]  ETA:[2018-08-26 03:14:38.521681+00:00]
[2018-08-26 11:14:28,915: DEBUG/MainProcess] basic.qos: prefetch_count->17
[2018-08-26 11:14:38,527: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10433c840> (args:('resources.tasks.deleteTmpAudioFile', '6ea6c103-7b58-4d38-ad54-06666b0ebb59', {'lang': 'py', 'task': 'resources.tasks.deleteTmpAudioFile', 'id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'shadow': None, 'eta': '2018-08-26T03:14:38.521681+00:00', 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'parent_id': None, 'argsrepr': "['42667515-281a-4cab-bfac-5d2c23c25a34.mp3']", 'kwargsrepr': '{}', 'origin': '
[email protected]
', 'reply_to': '10c57fe8-23c6-3e20-bea6-0576fa07fe57', 'correlation_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[["42667515-281a-4cab-bfac-5d2c23c25a34.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-08-26 11:14:38,564: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-08-26 11:14:38,626: DEBUG/MainProcess] Task accepted: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] pid:47617
[2018-08-26 11:14:38,628: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: deleteTmpAudioFile: celery=<Celery RobotQA at 0x104277e80>, filename=42667515-281a-4cab-bfac-5d2c23c25a34.mp3
[2018-08-26 11:14:38,632: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolder=tmp/audio
[2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: curFolderAbsPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer
[2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolderFullPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio
[2018-08-26 11:14:38,635: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: Ok to delete file /Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio/42667515-281a-4cab-bfac-5d2c23c25a34.mp3
[2018-08-26 11:14:38,638: INFO/ForkPoolWorker-2] Task resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] succeeded in 0.012065329006873071s: None
加上celery beat的输出,经过调试后,确保celery的task中,不要调用到别的模块的函数后才正常。
否则celery的task一旦调用别的模块中的东西,则别的模块中的flask的g,就会报错:
    raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
经过调整后的代码:
resources/tasks.py
@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken():
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s" % celery)

    # with celery.app.app_context():
    # from resources.tts import refreshAzureSpeechToken
    # refreshAzureSpeechToken()

    # global gMsToken
    # log = app.logger
    # log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken)
    # log.info("refreshAzureSpeechToken")

    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders)
    log.info("resp=%s", resp)
    respTokenText = resp.text  # eyxxxxiJ9.xxx.xxx
    log.info("respTokenText=%s", respTokenText)
    # gMsToken = respTokenText
    updatedToken = respTokenText

    # # for debug
    # gMsToken = "eyJ0eXAiOxxxxnez"

    # log.info("after refresh: gMsToken=%s", gMsToken)

    return updatedToken
避免了之前的:
from resources.tts import refreshAzureSpeechToken
refreshAzureSpeechToken()
的报错:
ImportError: cannot import name ‘refreshAzureSpeechToken’
resources/tts.py
中的,对于:
from common.util import generateUUID
必要要加上导入sys的path:
import sys
resouces_dir = os.path.dirname(__file__)
project_root_dir = os.path.abspath(os.path.join(resouces_dir, ".."))
if project_root_dir not in sys.path:
    sys.path.append(project_root_dir)
否则会出现无法导入
以及tts内的token的代码也去更新了:
from resources.tasks import refreshAzureSpeechToken


def getAzureSpeechToken():
    """get Microsoft Azure speech service token key"""
    # log = app.logger
    global gMsToken
    gMsToken = refreshAzureSpeechToken()
...
    else:
        # if errNo == BAIDU_ERR_TOKEN_INVALID:
        if errNo == settings.MS_ERR_UNAUTHORIZED:
            log.warning("Token invalid -> retry one for refresh token")
            # refreshBaiduToken()
            gMsToken = refreshAzureSpeechToken()

            # isOk, audioBinData, errNo, errMsg = baiduText2Audio(unicodeText)
            isOk, audioBinData, errNo, errMsg = msTTS(unicodeText)
心得:
其实,此处能成功把Celery转换为工厂模式,核心的几个点是:
基本上明白了:
Celery的初始化,不一定非要是传入Flask的app
其实别人的,类似于
# extensions.py
from flask_celery import Celery
celery = Celery()

# application.py
from flask import Flask
from extensions import celery

def create_app():
    app = Flask(__name__)
    app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
    app.config['CELERY_BROKER_URL'] = 'redis://localhost'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
    celery.init_app(app)
    return app
或:
from celery import Celery
from flask import Flask

from app.config import BaseConfig

celery = Celery(__name__, broker=BaseConfig.CELERY_BROKER_URL)


def create_app():
    app = Flask(__name__)
    # ....
    celery.conf.update(app.config)    # 更新 celery 的配置
    # ...
    return app
核心在于:
最开始初始化Celery的话,一定要加上broker参数,指定了
之后再去在create_app中初始化app后,再去更新配置:
celery.conf.update(app.config)
celery.init_app(app)
而Celery其实本身和Flask的app,并没有什么本质的关联。
主要是flask中的配置参数:
conf/app/settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
# CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result
# for periodical celery task
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
起到了效果。
所以此处改为:
celery初始化,从settings传入celery相关的配置:
resources/extensions_celery.py
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
就可以了。
然后别的地方,去导入,引用celery的时候
-》感觉会重新触发Celery的初始化,和(一般做法中会有的)Flask的app中的初始化中的flask,会不相同
-》其实是一样的
-》主要就是上面的Celery,传入了name和broker是一致的,就可以了
-》celery work -A 中就可以了
celery worker -A resources.tasks.celery --loglevel=DEBUG
调用task中的celery,就不会有什么,非要和Flask中的app关联在一起了。
其次是,celery的task中,不能依赖导入别的模块
-》否则celery的work或beat时,再去导入别的模块时,就会出现各种问题
比如模块路径问题导致无法导入
比如flask的g无法正常识别
等等。
最终的上述的celery的工厂模式的代码,去运行了flask的app后,再分别去运行worker和beat:
pipenv shell
celery worker -A resources.tasks.celery --loglevel=DEBUG
和:
pipenv shell
celery beat -A resources.tasks.celery -s runtime/celerybeat-schedule --loglevel=DEBUG
终于可以正常执行celery的异步函数和定期任务了。

转载请注明:在路上 » 【已解决】Flask中如何用工厂模式初始化Celery

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
86 queries in 0.132 seconds, using 20.80MB memory